Changes In Branch interleaved-queries Through [b85732a36a] Excluding Merge-Ins
This is equivalent to a diff from 8c476e8627 to b85732a36a
2012-11-19
| ||
01:55 | server, list-runs and repl now working check-in: 0cb9ad87a9 user: matt tags: interleaved-queries | |
2012-11-18
| ||
23:30 | Initial coding for interleaved queries done and compiles check-in: b85732a36a user: matt tags: interleaved-queries | |
2012-11-17
| ||
17:07 | Increased rigour of mocktest, added timeout and retry on dbaccess and added count of all accesses check-in: 79ae44c0b4 user: matt tags: interleaved-queries | |
2012-11-12
| ||
19:50 | Cherrypicked the fix to building for deploy check-in: e4ac93792c user: matt tags: trunk | |
00:04 | Interleaving query enablement, server side coded check-in: 4980ca9f98 user: matt tags: interleaved-queries | |
2012-11-06
| ||
09:44 | Testing ordering of loading zmq, fixes to deploy script check-in: 8c476e8627 user: mrwellan tags: trunk | |
06:21 | Added deploy script check-in: af55ffc7d7 user: matt tags: trunk | |
Modified common.scm from [48dba0a8c5] to [cc3866e918].
︙ | ︙ | |||
45 46 47 48 49 50 51 | (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* #f) ;; if set up for server communication this will hold <host port> (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) | > | > | 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* #f) ;; if set up for server communication this will hold <host port> (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) (define *server-info* #f) (define *time-to-exit* #f) (define *received-response* #f) (define *default-numtries* 2) (define *target* (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN (define *keys* (make-hash-table)) ;; cache the keys here (define *keyvals* (make-hash-table)) (define *toptest-paths* (make-hash-table)) ;; cache toptest path settings here (define *test-paths* (make-hash-table)) ;; cache test-id to test run paths here (define *test-ids* (make-hash-table)) ;; cache run-id, testname, and item-path => test-id |
︙ | ︙ |
Modified db.scm from [108dfe0472] to [c306724289].
︙ | ︙ | |||
9 10 11 12 13 14 15 | ;; PURPOSE. ;;====================================================================== ;;====================================================================== ;; Database access ;;====================================================================== | | | | | 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | ;; PURPOSE. ;;====================================================================== ;;====================================================================== ;; Database access ;;====================================================================== (require-extension (srfi 18) extras tcp) ;; rpc) ;; (import (prefix rpc rpc:)) (use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use zmq) (declare (unit db)) (declare (uses common)) (declare (uses keys)) |
︙ | ︙ | |||
779 780 781 782 783 784 785 | (if currstatus (conc "status='" currstatus "' AND ") "") " run_id=? AND testname=? AND NOT (item_path='' AND testname in (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));"))) ;;(debug:print 0 "QRY: " qry) (sqlite3:execute db qry run-id newstate newstatus testname testname))) testnames)) (define (cdb:delete-tests-in-state zmqsocket run-id state) | | | 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 | (if currstatus (conc "status='" currstatus "' AND ") "") " run_id=? AND testname=? AND NOT (item_path='' AND testname in (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));"))) ;;(debug:print 0 "QRY: " qry) (sqlite3:execute db qry run-id newstate newstatus testname testname))) testnames)) (define (cdb:delete-tests-in-state zmqsocket run-id state) (cdb:client-call zmqsocket 'delete-tests-in-state #t *default-numtries* run-id state)) ;; speed up for common cases with a little logic (define (db:test-set-state-status-by-id db test-id newstate newstatus newcomment) (cond ((and newstate newstatus newcomment) (sqlite3:exectute db "UPDATE tests SET state=?,status=?,comment=? WHERE id=?;" newstate newstatus test-id)) ((and newstate newstatus) |
︙ | ︙ | |||
958 959 960 961 962 963 964 | (define (db:test-set-comment db test-id comment) (sqlite3:execute db "UPDATE tests SET comment=? WHERE id=?;" comment test-id)) (define (cdb:test-set-rundir! zmqsocket run-id test-name item-path rundir) | | | | | 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 | (define (db:test-set-comment db test-id comment) (sqlite3:execute db "UPDATE tests SET comment=? WHERE id=?;" comment test-id)) (define (cdb:test-set-rundir! zmqsocket run-id test-name item-path rundir) (cdb:client-call zmqsocket 'test-set-rundir #t *default-numtries* rundir run-id test-name item-path)) (define (cdb:test-set-rundir-by-test-id zmqsocket test-id rundir) (cdb:client-call zmqsocket 'test-set-rundir-by-test-id #t *default-numtries* rundir test-id)) (define (db:test-get-rundir-from-test-id db test-id) (let ((res #f)) ;; (hash-table-ref/default *test-paths* test-id #f))) ;; (if res ;; res ;; (begin (sqlite3:for-each-row (lambda (tpath) (set! res tpath)) db "SELECT rundir FROM tests WHERE id=?;" test-id) ;; (hash-table-set! *test-paths* test-id res) res)) ;; )) (define (cdb:test-set-log! zmqsocket test-id logf) (if (string? logf)(cdb:client-call zmqsocket 'test-set-log #f *default-numtries* logf test-id))) ;;====================================================================== ;; Misc. test related queries ;;====================================================================== (define (db:test-get-paths-matching db keynames target fnamepatt #!key (res '())) (let* ((testpatt (if (args:get-arg "-testpatt")(args:get-arg "-testpatt") "%")) |
︙ | ︙ | |||
1093 1094 1095 1096 1097 1098 1099 | ;; (define (db:updater) ;; (debug:print-info 4 "Starting cache processing") ;; (let loop () ;; (thread-sleep! 10) ;; move save time around to minimize regular collisions? ;; (db:write-cached-data) ;; (loop))) | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < > > > | | > > > > | | < | | < > | > > > | > | > > | > > | > > > > > > > > > | | | | | | | | | | | | | | | | | | 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 | ;; (define (db:updater) ;; (debug:print-info 4 "Starting cache processing") ;; (let loop () ;; (thread-sleep! 10) ;; move save time around to minimize regular collisions? ;; (db:write-cached-data) ;; (loop))) (define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj)))) (define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize)))) (define (cdb:use-non-blocking-mode proc) (set! *client-non-blocking-mode* #t) (let ((res (proc))) (set! *client-non-blocking-mode* #f) res)) ;; params = 'target cached remparams ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; (define (cdb:client-call zmq-sockets qtype immediate numretries . params) (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets " params=" params) (let* ((push-socket (vector-ref zmq-sockets 0)) (sub-socket (vector-ref zmq-sockets 1)) (client-sig (server:get-client-signature)) (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) (res #f) (send-receive (lambda () (send-message push-socket zdat) (db:string->obj (let ((rmsg (if *client-non-blocking-mode* receive-message* receive-message))) ;; get the sender info ;; this should match (server:get-client-signature) ;; we will need to process "all" messages here some day (rmsg sub-socket) ;; now get the actual message (rmsg sub-socket))))) (timeout (lambda () (thread-sleep! 5) (if (not res) (if (> numretries 0) (begin (debug:print 0 "WARNING: no reply to query " params ", trying again") (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)) (begin (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") (exit 5))))))) (let ((th1 (make-thread send-receive "send receive")) (th2 (make-thread timeout "timeout"))) (thread-start! th1) (thread-start! th2) (thread-join! th1) res))) (define (cdb:set-verbosity zmq-socket val) (cdb:client-call zmq-socket 'set-verbosity #f *default-numtries* val)) (define (cdb:login zmq-sockets keyval signature) (cdb:client-call zmq-sockets 'login #t *default-numtries* keyval megatest-version signature)) (define (cdb:logout zmq-socket keyval signature) (cdb:client-call zmq-socket 'logout #t *default-numtries* keyval signature)) (define (cdb:num-clients zmq-socket) (cdb:client-call zmq-socket 'numclients #t *default-numtries*)) (define (cdb:test-set-status-state zmqsocket test-id status state msg) (if msg (cdb:client-call zmqsocket 'state-status-msg #t *default-numtries* state status msg test-id) (cdb:client-call zmqsocket 'state-status #t *default-numtries* state status test-id))) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) (define (cdb:test-rollup-test_data-pass-fail zmqsocket test-id) (cdb:client-call zmqsocket 'test_data-pf-rollup #t *default-numtries* test-id test-id test-id test-id)) (define (cdb:pass-fail-counts zmqsocket test-id fail-count pass-count) (cdb:client-call zmqsocket 'pass-fail-counts #t *default-numtries* fail-count pass-count test-id)) (define (cdb:tests-register-test zmqsocket run-id test-name item-path) (let ((item-paths (if (equal? item-path "") (list item-path) (list item-path "")))) (cdb:client-call zmqsocket 'register-test #t *default-numtries* run-id test-name item-path))) (define (cdb:flush-queue zmqsocket) (cdb:client-call zmqsocket 'flush #f *default-numtries*)) (define (cdb:kill-server zmqsocket) (cdb:client-call zmqsocket 'killserver #f *default-numtries*)) (define (cdb:roll-up-pass-fail-counts zmqsocket run-id test-name item-path status) (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:roll-up-pass-fail-counts #f run-id test-name item-path status)) (define (cdb:get-test-info zmqsocket run-id test-name item-path) (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:get-test-info #f run-id test-name item-path)) (define (cdb:get-test-info-by-id zmqsocket test-id) (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:get-test-info-by-id #f test-id)) ;; db should be db open proc or #f (define (cdb:remote-run proc db . params) (apply cdb:client-call *runremote* 'immediate #f *default-numtries* open-run-close proc #f params)) (define (db:test-get-logfile-info db run-id test-name) (let ((res #f)) (sqlite3:for-each-row (lambda (path final_logf) (set! logf final_logf) (set! res (list path final_logf)) |
︙ | ︙ | |||
1288 1289 1290 1291 1292 1293 1294 | ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; | | | | < < < | | > | | > | > | | > | > | 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 | ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; (define (db:process-queue pubsock indata) (open-run-close (lambda (db . junkparams) (let ((queries (make-hash-table)) (data (sort indata (lambda (a b) (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b)))))) (if (> (length data) 0) (debug:print-info 4 "Writing cached data " data)) ;; prepare the needed statements, do each only once (for-each (lambda (request-item) (let ((stmt-key (cdb:get-qtype request-item))) (if (not (hash-table-ref/default queries stmt-key #f)) (let ((stmt (alist-ref stmt-key db:queries))) (if stmt (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt))) (if (procedure? stmt-key) (hash-table-set! queries stmt-key #f) (debug:print 0 "ERROR: Missing query spec for " stmt-key "!"))))))) data) ;; outer loop to handle special queries that cannot be handled in the ;; transaction. (let outerloop ((special-qry #f) (stmts data)) (if special-qry ;; handle a query that cannot be part of the grouped queries (let* ((stmt-key (cdb:get-qtype special-qry)) (return-address (cdb:get-client-sig special-qry)) (qry (hash-table-ref queries stmt-key)) (params (cdb:get-params special-qry))) (if (string? qry) (begin (apply sqlite3:execute db qry params) (server:reply return-address #t)) (if (procedure? stmt-key) (begin ;; we are being handed a procedure so call it (debug:print-info 11 "Running (apply " stmt-key " " db " " params ")") (server:reply return-address (apply stmt-key db params))) (debug:print 0 "ERROR: Unrecognised queued call " qry " " params))) (if (not (null? stmts)) (outerloop #f stmts))) ;; handle normal queries (let ((rem (sqlite3:with-transaction db (lambda () (debug:print-info 11 "flushing " stmts " to db") (if (null? stmts) stmts (let innerloop ((hed (car stmts)) (tal (cdr stmts))) (let ((params (cdb:packet-get-params hed)) (return-address (cdb:packet-get-client-sig hed)) (stmt-key (cdb:packet-get-qtype hed))) (if (or (procedure? stmt-key) (member stmt-key db:special-queries)) (begin (debug:print-info 11 "Handling special statement " stmt-key) (cons hed tal)) (begin (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) (server:reply return-address #t) (if (not (null? tal)) (innerloop (car tal)(cdr tal)) '())) )))))))) (if (not (null? rem)) (outerloop (car rem)(cdr rem)))))) (for-each (lambda (stmt-key) |
︙ | ︙ |
Modified db_records.scm from [685480077c] to [b53402355f].
︙ | ︙ | |||
116 117 118 119 120 121 122 | (define-inline (db:step-stable-set-end! vec val)(vector-set! vec 2 val)) (define-inline (db:step-stable-set-status! vec val)(vector-set! vec 3 val)) (define-inline (db:step-stable-set-runtime! vec val)(vector-set! vec 4 val)) ;; use this one for db-get-run-info (define-inline (db:get-row vec)(vector-ref vec 1)) | > > > > > > > > > > > > > > | 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 | (define-inline (db:step-stable-set-end! vec val)(vector-set! vec 2 val)) (define-inline (db:step-stable-set-status! vec val)(vector-set! vec 3 val)) (define-inline (db:step-stable-set-runtime! vec val)(vector-set! vec 4 val)) ;; use this one for db-get-run-info (define-inline (db:get-row vec)(vector-ref vec 1)) ;; The data structure for handing off requests via wire (define (make-cdb:packet)(make-vector 6)) (define-inline (cdb:packet-get-client-sig vec) (vector-ref vec 0)) (define-inline (cdb:packet-get-qtype vec) (vector-ref vec 1)) (define-inline (cdb:packet-get-immediate vec) (vector-ref vec 2)) (define-inline (cdb:packet-get-query-sig vec) (vector-ref vec 3)) (define-inline (cdb:packet-get-params vec) (vector-ref vec 4)) (define-inline (cdb:packet-get-qtime vec) (vector-ref vec 5)) (define-inline (cdb:packet-set-client-sig! vec val)(vector-set! vec 0 val)) (define-inline (cdb:packet-set-qtype! vec val)(vector-set! vec 1 val)) (define-inline (cdb:packet-set-immediate! vec val)(vector-set! vec 2 val)) (define-inline (cdb:packet-set-query-sig! vec val)(vector-set! vec 3 val)) (define-inline (cdb:packet-set-params! vec val)(vector-set! vec 4 val)) (define-inline (cdb:packet-set-qtime! vec val)(vector-set! vec 5 val)) |
Modified megatest.scm from [8b945e5bb2] to [b74cc84dd9].
︙ | ︙ | |||
95 96 97 98 99 100 101 | -rebuild-db : bring the database schema up to date -update-meta : update the tests metadata for all tests -env2file fname : write the environment to fname.csh and fname.sh -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname | | < | 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | -rebuild-db : bring the database schema up to date -update-meta : update the tests metadata for all tests -env2file fname : write the environment to fname.csh and fname.sh -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname -list-servers : list the servers -repl : start a repl (useful for extending megatest) Spreadsheet generation -extract-ods fname.ods : extract an open document spreadsheet from the database -pathmod path : insert path, i.e. path/runame/itempath/logfile.html will clear the field if no rundir/testname/itempath/logfile if it contains forward slashes the path will be converted |
︙ | ︙ | |||
119 120 121 122 123 124 125 126 127 128 129 130 131 132 | megatest -test-files 'logs/*.log' -target ubuntu/n%/no% :runname w49% -testpatt test_mt% Called as " (string-intersperse (argv) " ") " Built from " megatest-fossil-hash )) ;; -gui : start a gui interface ;; -config fname : override the runconfig file with fname ;; process args (define remargs (args:get-args (argv) (list "-runtests" ;; run a specific test "-config" ;; override the config file name "-execute" ;; run the command encoded in the base64 parameter | > | 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | megatest -test-files 'logs/*.log' -target ubuntu/n%/no% :runname w49% -testpatt test_mt% Called as " (string-intersperse (argv) " ") " Built from " megatest-fossil-hash )) ;; -gui : start a gui interface ;; -config fname : override the runconfig file with fname ;; -kill-server host:port|pid : kill server specified by host:port or pid ;; process args (define remargs (args:get-args (argv) (list "-runtests" ;; run a specific test "-config" ;; override the config file name "-execute" ;; run the command encoded in the base64 parameter |
︙ | ︙ | |||
155 156 157 158 159 160 161 | ":variable" ":value" ":expected" ":tol" ":units" ;; misc "-server" | | | 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | ":variable" ":value" ":expected" ":tol" ":units" ;; misc "-server" "-kill-server" "-port" "-extract-ods" "-pathmod" "-env2file" "-setvars" "-set-state-status" "-debug" ;; for *verbosity* > 2 |
︙ | ︙ | |||
182 183 184 185 186 187 188 | "-summarize-items" "-gui" ;; misc "-archive" "-repl" "-lock" "-unlock" | | | 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 | "-summarize-items" "-gui" ;; misc "-archive" "-repl" "-lock" "-unlock" "-list-servers" ;; queries "-test-paths" ;; get path(s) to a test, ordered by youngest first "-runall" ;; run all tests "-remove-runs" "-usequeue" "-rebuild-db" |
︙ | ︙ | |||
268 269 270 271 272 273 274 | ;;====================================================================== (if (args:get-arg "-server") (begin (debug:print 1 "Launching server...") (server:launch))) | | | | | | | | | | > | | | | | > | | > | | < < < < | < < < < | | | 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 | ;;====================================================================== (if (args:get-arg "-server") (begin (debug:print 1 "Launching server...") (server:launch))) (if (args:get-arg "-list-servers") ;; (args:get-arg "-kill-server")) (let ((tl (setup-for-run))) (if tl (let ((servers (open-run-close tasks:get-all-servers tasks:open-db)) (fmtstr "~5a~8a~8a~20a~20a~10a~10a~10a~10a\n") (servers-to-kill '())) (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State") (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====") (for-each (lambda (server) (let* (;; (killinfo (args:get-arg "-kill-server")) ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f)) ;; (kpid (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f)) (id (vector-ref server 0)) (pid (vector-ref server 1)) (hostname (vector-ref server 2)) (interface (vector-ref server 3)) (pullport (vector-ref server 4)) (pubport (vector-ref server 5)) (start-time (vector-ref server 6)) (priority (vector-ref server 7)) (state (vector-ref server 8)) (mt-ver (vector-ref server 9)) (last-update (vector-ref server 10)) ;; (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port)) (killed #f) (status (< last-update 20))) ;; (zmq-sockets (if status (server:client-connect hostname port) #f))) ;; no need to login as status of #t indicates we are connecting to correct ;; server (if (equal? state "dead") (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day. (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete)) (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid))) (format #t fmtstr id mt-ver pid hostname interface pullport pubport last-update (if status "alive" "dead")))) servers) (debug:print-info 1 "Done with listservers") (set! *didsomething* #t) (exit) ;; must do, would have to add checks to many/all calls below ) (exit))) ;; if not list or kill then start a client (if appropriate) (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test") (eq? (length (hash-table-keys args:arg-hash)) 0)) (debug:print-info 1 "Server connection not needed") (server:client-launch))) ;;====================================================================== ;; Remove old run(s) ;;====================================================================== ;; since several actions can be specified on the command line the removal |
︙ | ︙ |
Modified server.scm from [c2beddecc4] to [67b3f4e15b].
1 2 3 4 5 6 7 8 9 10 | ;; Copyright 2006-2012, Matthew Welland. ;; ;; This program is made available under the GNU GPL version 2.0 or ;; greater. See the accompanying file COPYING for details. ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. | | | > > > > > > > > > > > > > > > > > > > > > > > > > | > > > > | | | | | | | | | > > > > > > > | > > > | | | | | | | | | | < | | | | > | | < | > | > | | | | > > | | < < < | | < < < < < | | < < < < < | < | | > > > > > > > > > > > > > > > > > > | | < > | | | | < < < < < < < < < < < < < < < | < < | > | | | | | | | | < | | | | | | | | | | | | | | > | > > > > > > > | | | | | | | | | | > > > > > > > | < | | > > > > > > > > > > > > > > > > > > > | | | > | | | | | | | | | | | | < < < < < < < < < < < < | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 | ;; Copyright 2006-2012, Matthew Welland. ;; ;; This program is made available under the GNU GPL version 2.0 or ;; greater. See the accompanying file COPYING for details. ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. (require-extension (srfi 18) extras tcp s11n) ;; (import (prefix rpc rpc:)) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use zmq) (declare (unit server)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (include "common_records.scm") (include "db_records.scm") ;; Transition to pub --> sub with pull <-- push ;; ;; 1. client sends request to server via push to the pull port ;; 2. server puts request in queue or processes immediately as appropriate ;; 3. server puts responses from completed requests into pub port ;; ;; TODO ;; ;; Done Tested ;; [x] [ ] 1. Add columns pullport pubport to servers table ;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 ;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports ;; [x] [ ] 4. Add client compose of request ;; [x] [ ] - name of client: testname/itempath-test_id-hostname ;; [x] [ ] - name of request: callname, params ;; [x] [ ] - request key: f(clientname, callname, params) ;; [ ] [ ] 5. Add processing of subscription hits ;; [ ] [ ] - done when get key ;; [ ] [ ] - return results ;; [ ] [ ] 6. Add timeout processing ;; [ ] [ ] - after 60 seconds ;; [ ] [ ] i. check server alive, connect to new if necessary ;; [ ] [ ] ii. resend request ;; [ ] [ ] 7. Turn self ping back on (define (server:make-server-url hostport) (if (not hostport) #f (conc "tcp://" (car hostport) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) ;; (define (server:self-ping server-info) ;; ;; server-info: server-id interface pullport pubport ;; (let ((iface (list-ref server-info 1)) ;; (pullport (list-ref server-info 2)) ;; (pubport (list-ref server-info 3))) ;; (server:client-connect iface pullport pubport) ;; (let loop () ;; (thread-sleep! 2) ;; (cdb:client-call *runremote* 'ping #t) ;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") ;; (mutex-lock! *heartbeat-mutex*) ;; (set! *server-loop-heart-beat* (current-seconds)) ;; (mutex-unlock! *heartbeat-mutex*) ;; (loop)))) (define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) (define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) (define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0)) (define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0)) (define (server:run hostn) (debug:print 0 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) (let* ((zmq-sdat1 #f) (zmq-sdat2 #f) (pull-socket #f) (pub-socket #f) (p1 #f) (p2 #f) (zmq-sockets-dat #f) (iface (if (string=? "-" hostn) "*" ;; (get-host-name) hostn)) (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) (if ipstr ipstr hostname))) (last-run 0)) (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port") (string->number (args:get-arg "-port")) (+ 5000 (random 1001))))) (set! zmq-sdat1 (car zmq-sockets-dat)) (set! pull-socket (cadr zmq-sdat1)) ;; (iface s port) (set! p1 (caddr zmq-sdat1)) (set! zmq-sdat2 (cadr zmq-sockets-dat)) (set! pub-socket (cadr zmq-sdat2)) (set! p2 (caddr zmq-sdat2)) (set! *cache-on* #t) ;; what to do when we quit ;; (on-exit (lambda () (if (and *toppath* *server-info*) (begin (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr p1 p2)) (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) (mutex-lock! *incoming-mutex*) (set! queue-len (length *incoming-data*)) (mutex-unlock! *incoming-mutex*) (if (> queue-len 0) (begin (debug:print-info 0 "Queue not flushed, waiting ...") (loop)))))))) ;; The heavy lifting ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; (let loop ((queue-lst '())) ;; (print "GOT HERE EH?") (let* ((rawmsg (receive-message* pull-socket)) (packet (db:string->obj rawmsg))) (debug:print-info 12 "server=> received packet=" packet) (if (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin (db:process-queue pubsock (cons packet queue)) (loop '())) (loop (cons packet queue))))))) (define (server:reply pubsock target result) (send-message pubsock target send-more: #t) (send-message pubsock result)) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (let* ((server-info (let loop () (let ((sdat #f)) (mutex-lock! *heartbeat-mutex*) (set! sdat *server-info*) (mutex-unlock! *heartbeat-mutex*) (if sdat sdat (begin (sleep 4) (loop)))))) (iface (cadr server-info)) (pullport (caddr server-info)) (pubport (cadddr server-info)) ;; id interface pullport pubport) ;; (zmq-sockets (server:client-connect iface pullport pubport))) ) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1)))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) ;; NOTE: Get rid of this mechanism! It really is not needed... (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access (if (> (+ *last-db-access* ;; (* 48 60 60) ;; 48 hrs ;; 60 ;; one minute (* 60 60) ;; one hour ) (current-seconds)) (begin (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*)) (loop 0)) (begin (debug:print-info 0 "Starting to shutdown the server.") ;; need to delete only *my* server entry (future use) (set! *time-to-exit* #t) (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit)))))) (define (server:find-free-port-and-open iface s port stype #!key (trynum 50)) (let ((s (if s s (make-socket stype))) (p (if (number? port) port 5555)) (old-handler (current-exception-handler))) (handle-exceptions exn (begin (debug:print 0 "Failed to bind to port " p ", trying next port") (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) ;; (old-handler) ;; (print-call-chain) (if (> trynum 0) (server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1)) (debug:print-info 0 "Tried ports up to " p " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")) (exit)) ;; To exit or not? That is the question. (let ((zmq-url (conc "tcp://" iface ":" p))) (debug:print 0 "Trying to start server on " zmq-url) (bind-socket s zmq-url) (list iface s port))))) (define (server:setup-ports ipaddrstr startport) (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pub)) (p1 (caddr s1)) (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull)) (p2 (caddr s2))) (set! *runremote* #f) (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) (mutex-lock! *heartbeat-mutex*) (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live)) (mutex-unlock! *heartbeat-mutex*) (list s1 s2))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) ;; (define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions) (let ((connect-ok #f) (zmq-socket (if context (make-socket type context) (make-socket type))) (conurl (server:make-server-url (list iface port)))) (if (socket? zmq-socket) (begin ;; first apply subscriptions (for-each (lambda (subscription) (debug:print 2 "Subscribing to " subscription) (socket-option-set! zmq-socket 'subscribe subscription)) subscriptions) (connect-socket zmq-socket conurl) zmq-socket) (begin (debug:print 0 "ERROR: Failed to open socket to " conurl) #f)))) (define (server:client-login zmq-sockets) (cdb:login zmq-sockets *toppath* (server:get-client-signature))) (define (server:client-logout zmq-socket) (let ((ok (and (socket? zmq-socket) (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) ;; (close-socket zmq-socket) ok)) (define (server:client-connect iface pullport pubport) (let* ((push-socket (server:client-socket-connect iface pullport type: 'push)) (sub-socket (server:client-socket-connect iface pubport type: 'sub subscriptions: (list (server:get-client-signature) "all"))) (zmq-sockets (vector push-socket sub-socket)) (login-res #f)) (set! login-res (server:client-login zmq-sockets)) (if (and (not (null? login-res)) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") (set! *runremote* zmq-sockets) zmq-sockets) (begin (debug:print-info 2 "Failed to login or connect to " conurl) (set! *runremote* #f) #f)))) ;; Do all the connection work, start a server if not already running (define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (list-ref hostinfo 0)) (iface (list-ref hostinfo 1)) (pullport (list-ref hostinfo 2)) (pubport (list-ref hostinfo 3))) (debug:print-info 2 "Setting up to connect to " hostinfo) ;; (handle-exceptions ;; exn ;; (begin ;; ;; something went wrong in connecting to the server. In this scenario it is ok ;; ;; to try again ;; (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) ;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) ;; (debug:print 0 " perhaps jobs killed with -9? Removing server records") ;; (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) ;; (server:client-setup (- numtries 1)) ;; #f) (server:client-connect iface pullport pubport)) ;; ) (if (> numtries 0) (let ((exe (car (argv)))) (debug:print-info 1 "No server available, attempting to start one...") (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) (sleep 5) ;; give server time to start ;; we are starting a server, do not try again! That can lead to ;; recursively starting many processes!!! |
︙ | ︙ | |||
291 292 293 294 295 296 297 | (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) (debug:print-info 1 "Starting the standalone server") (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* | | | | | | | | | | | | | > > | < > | | > > | > | 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 | (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) (debug:print-info 1 "Starting the standalone server") (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* (let* (;; (th1 (make-thread (lambda () ;; (let ((server-info #f)) ;; ;; wait for the server to be online and available ;; (let loop () ;; (debug:print-info 1 "Waiting for the server to come online before starting heartbeat") ;; (thread-sleep! 2) ;; (mutex-lock! *heartbeat-mutex*) ;; (set! server-info *server-info* ) ;; (mutex-unlock! *heartbeat-mutex*) ;; (if (not server-info)(loop))) ;; (debug:print 1 "Server alive, starting self-ping") ;; (server:self-ping server-info) ;; )) ;; "Self ping")) (th2 (make-thread (lambda () (server:run (args:get-arg "-server"))) "Server run")) (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) ) (set! *client-non-blocking-mode* #t) ;; (thread-start! th1) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) ;; (thread-join! th3) (thread-join! th2) ) (debug:print 0 "ERROR: Failed to setup for megatest"))) (exit))) (define (server:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") (let ((th1 (make-thread (lambda () (if (not *received-response*) (receive-message* *runremote*))) ;; flush out last call if applicable "eat response")) (th2 (make-thread (lambda () (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") (thread-sleep! 3) ;; give the flush three seconds to do it's stuff (debug:print 0 " Done.") (exit 4)) "exit on ^C timer"))) (thread-start! th2) (thread-start! th1) (thread-join! th2)))) (define (server:client-launch) (set-signal-handler! signal/int server:client-signal-handler) (if (server:client-setup) (debug:print-info 2 "connected as client") (begin (debug:print 0 "ERROR: Failed to connect as client") (exit)))) ;; ping a server and return number of clients or #f (if no response) ;; NOT IN USE! (define (server:ping host port #!key (secs 10)(return-socket #f)) (cdb:use-non-blocking-mode (lambda () (let* ((res #f) (th1 (make-thread (lambda () (let* ((zmq-context (make-context 1)) |
︙ | ︙ |
Modified tasks.scm from [53e3d6e8bd] to [acea04adee].
︙ | ︙ | |||
20 21 22 23 24 25 26 | ;;====================================================================== ;; Tasks db ;;====================================================================== (define (tasks:open-db) (let* ((dbpath (conc *toppath* "/monitor.db")) | | > > > > > > > > | 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | ;;====================================================================== ;; Tasks db ;;====================================================================== (define (tasks:open-db) (let* ((dbpath (conc *toppath* "/monitor.db")) (exists (if (file-exists? dbpath) ;; BUGGISHNESS: Remove this code in six months. Today is 11/13/2012 (if (< (file-change-time dbpath) 1352851396.0) (begin (debug:print 0 "NOTE: removing old db file " dbpath) (delete-file dbpath) #f) #t) #f)) (mdb (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath)) (handler (make-busy-timeout 36000))) (sqlite3:set-busy-handler! mdb handler) (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;")) (if (not exists) (begin (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY, |
︙ | ︙ | |||
50 51 52 53 54 55 56 | hostname TEXT, username TEXT, CONSTRAINT monitors_constraint UNIQUE (pid,hostname));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, | | > | | | > | | > | | | > | | | > | | | | | | | | 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 | hostname TEXT, username TEXT, CONSTRAINT monitors_constraint UNIQUE (pid,hostname));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, pullport INTEGER, pubport INTEGER, start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, CONSTRAINT servers_constraint UNIQUE (pid,hostname,pullport,pubport));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY, server_id INTEGER, pid INTEGER, hostname TEXT, cmdline TEXT, login_time TIMESTAMP, logout_time TIMESTAMP DEFAULT -1, CONSTRAINT clients_constraint UNIQUE (pid,hostname));") )) mdb)) ;;====================================================================== ;; Server and client management ;;====================================================================== ;; state: 'live, 'shutting-down, 'dead (define (tasks:server-register mdb pid interface pullport pubport priority state) (sqlite3:execute mdb "INSERT OR REPLACE INTO servers (pid,hostname,pullport,pubport,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" pid (get-host-name) pullport pubport priority (conc state) megatest-version interface) (list (tasks:server-get-server-id mdb (get-host-name) pullport pid) interface pullport pubport)) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! (define (tasks:server-deregister mdb hostname #!key (pullport #f)(pid #f)(action 'markdead)) (debug:print-info 11 "server-deregister " hostname ", pullport " pullport ", pid " pid) (if pid (case action ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)) (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid))) (if pullport (case action ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND pullport=?;" hostname port)) (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport))) (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) (define (tasks:server-get-server-id mdb hostname pullport pid) (let ((res #f)) (sqlite3:for-each-row (lambda (id) (set! res id)) mdb (if (and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;" "SELECT id FROM servers WHERE hostname=? AND pullport=?;") hostname (if pid pid pullport)) res)) (define (tasks:server-update-heartbeat mdb server-id) (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)) ;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds (define (tasks:server-alive? mdb server-id #!key (hostname #f)(pullport #f)(pid #f)) (let* ((server-id (if server-id server-id (tasks:server-get-server-id mdb hostname pullport pid))) (heartbeat-delta 99e9)) (sqlite3:for-each-row (lambda (delta) (set! heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) (< heartbeat-delta 10))) |
︙ | ︙ | |||
156 157 158 159 160 161 162 | ;; ping each server in the db and return first found that responds. ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb) (let ((res '()) (best #f)) (sqlite3:for-each-row | | | | > > | < | | > | | | | | | > > | > > > > > > > | 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 | ;; ping each server in the db and return first found that responds. ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb) (let ((res '()) (best #f)) (sqlite3:for-each-row (lambda (id hostname interface pullport pubport pid) (set! res (cons (list hostname interface pullport pubport pid) res)) (debug:print-info 2 "Found existing server " hostname ":" pullport " registered in db")) mdb "SELECT id,hostname,interface,pullport,pubport,pid FROM servers WHERE strftime('%s','now')-heartbeat < 10 AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) (if (null? res) #f (let loop ((hed (car res)) (tal (cdr res))) ;; (print "hed=" hed ", tal=" tal) (let* ((host (list-ref hed 0)) (iface (list-ref hed 1)) (pullport (list-ref hed 2)) (pubport (list-ref hed 3)) (pid (list-ref hed 4)) (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host pullport: pullport))) (if alive (begin (debug:print-info 2 "Found an existing, alive, server " host ", " pullport " and " pubport ".") (list host iface pullport pubport)) (begin (debug:print-info 1 "Marking " host ":" pullport " as dead in server registry.") (if pullport (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) (if (null? tal) #f (loop (car tal)(cdr tal)))))))))) (define (tasks:mark-server hostname pullport pid state) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))) ;; NOTE: NOT PORTED TO WORK WITH pullport/pubport (define (tasks:kill-server status hostname port pid) (debug:print-info 1 "Removing defunct server record for " hostname ":" port) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)) (if status ;; #t means alive (begin |
︙ | ︙ | |||
210 211 212 213 214 215 216 217 218 219 220 | (if (equal? hostname (get-host-name)) (begin (debug:print-info 1 "Sending signal/term to " pid " on " hostname) (process-signal pid signal/term) ;; local machine, send sig term (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill (process-signal pid signal/kill)) (debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname)))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row | > > | | | | 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 | (if (equal? hostname (get-host-name)) (begin (debug:print-info 1 "Sending signal/term to " pid " on " hostname) (process-signal pid signal/term) ;; local machine, send sig term (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill (process-signal pid signal/kill)) (debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname)))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row (lambda (id pid hostname interface pullport pubport start-time priority state mt-version last-update) (set! res (cons (vector id pid hostname interface pullport pubport start-time priority state mt-version last-update) res))) mdb "SELECT id,pid,hostname,interface,pullport,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors ;;====================================================================== |
︙ | ︙ |
Added testzmq/mockupclient.scm version [3f321e6f75].
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | (use zmq posix) (define cname "Bob") (define runtime 10) (let ((args (argv))) (if (< (length args) 3) (begin (print "Usage: mockupclient clientname runtime") (exit)) (begin (set! cname (cadr args)) (set! runtime (string->number (caddr args)))))) ;; (define start-delay (/ (random 100) 9)) ;; (define runtime (+ 1 (/ (random 200) 2))) (print "Starting client " cname " with runtime " runtime) (include "mockupclientlib.scm") (set! endtime (+ (current-seconds) runtime)) (let loop () (let ((x (random 15)) (varname (list-ref (list "hello" "goodbye" "saluton" "kiaorana")(random 4)))) (case x ;; ((1)(dbaccess cname 'sync "nodat" #f)) ((2 3 4 5)(dbaccess cname 'set varname (random 999))) ((6 7 8 9 10)(print cname ": Get \"" varname "\" " (dbaccess cname 'get varname #f))) (else (thread-sleep! 0.011))) (if (< (current-seconds) endtime) (loop)))) (print "Client " cname " all done!!") |
Added testzmq/mockupserver.scm version [5ff56c6e40].
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 | ;; pub/sub with envelope address ;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon ;; as a client disconnects. Also a remaining client may receive tons of ;; messages afterward. (use zmq srfi-18 sqlite3) (define pub (make-socket 'pub)) (define pull (make-socket 'pull)) (define cname "server") (define total-db-accesses 0) (define start-time (current-seconds)) (bind-socket pub "tcp://*:5563") (bind-socket pull "tcp://*:5564") (define (open-db) (let* ((dbpath "mockup.db") (dbexists (file-exists? dbpath)) (db (open-database dbpath)) ;; (never-give-up-open-db dbpath)) (handler (make-busy-timeout 10))) (set-busy-handler! db handler) (if (not dbexists) (for-each (lambda (stmt) (execute db stmt)) (list "PRAGMA SYNCHRONOUS=0;" "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);" "CREATE TABLE vars (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));"))) db)) (define cid-cache (make-hash-table)) (define (get-client-id db cname) (let ((cid (hash-table-ref/default cid-cache cname #f))) (if cid cid (begin (execute db "INSERT OR REPLACE INTO clients (name) VALUES(?);" cname) (for-each-row (lambda (id) (set! cid id)) db "SELECT id FROM clients WHERE name=?;" cname) (hash-table-set! cid-cache cname cid) (set! total-db-accesses (+ total-db-accesses 2)) cid)))) (define (count-client db cname) (let ((cid (get-client-id db cname))) (execute db "UPDATE clients SET num_accesses=num_accesses+1 WHERE id=?;" cid) (set! total-db-accesses (+ total-db-accesses 1)) )) (define db (open-db)) ;; (define queuelst '()) ;; (define mx1 (make-mutex)) (define (process-queue queuelst) (let ((queuelen (length queuelst))) (for-each (lambda (item) (let ((cname (vector-ref item 1)) (clcmd (vector-ref item 2)) (cdata (vector-ref item 3))) (send-message pub cname send-more: #t) (send-message pub (case clcmd ((sync) (conc queuelen)) ((set) (set! total-db-accesses (+ total-db-accesses 1)) (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata)) "ok") ((get) (set! total-db-accesses (+ total-db-accesses 1)) (let ((res "noval")) (for-each-row (lambda (val) (set! res val)) db "SELECT val FROM vars WHERE var=?;" cdata) res)) (else (conc "unk cmd: " clcmd)))))) queuelst))) (define th1 (make-thread (lambda () (let ((last-run 0)) ;; current-seconds when run last (let loop ((queuelst '())) (let* ((indat (receive-message* pull)) (parts (string-split indat ":")) (cname (car parts)) ;; client name (clcmd (string->symbol (cadr parts))) ;; client cmd (cdata (caddr parts)) ;; client data (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue (count-client db cname) (case clcmd ((sync) ;; just process the queue (print "Got sync from " cname) (process-queue (cons svect queuelst)) (loop '())) ((get) (process-queue (cons svect queuelst)) (loop '())) (else (loop (cons svect queuelst)))))))) "server thread")) (include "mockupclientlib.scm") ;; send a sync to the pull port (define th2 (make-thread (lambda () (let ((last-action-time (current-seconds))) (let loop () (thread-sleep! 5) (let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f))) (last-action-delta #f)) (if (> queuelen 1)(set! last-action-time (current-seconds))) (set! last-action-delta (- (current-seconds) last-action-time)) (print "Server: Got queuelen=" queuelen ", last-action-delta=" last-action-delta) (if (< last-action-delta 60) (loop) (print "Server exiting, 25 seconds since last access")))))) "sync thread")) (thread-start! th1) (thread-start! th2) (thread-join! th2) (let* ((run-time (- (current-seconds) start-time)) (queries/second (/ total-db-accesses run-time))) (print "Server exited! Total db accesses=" total-db-accesses " in " run-time " seconds for " queries/second " queries/second")) |
Added testzmq/testmockup.sh version [15deaa0e30].
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | #!/bin/bash rm -f mockup.db echo Compiling mockupserver.scm and mockupclient.scm csc mockupserver.scm csc mockupclient.scm echo Starting server ./mockupserver & sleep 1 echo Starting clients for i in a b c d e f g h i j k l m n o p q s t u v w x y z; do for k in a b; do for j in 0 1 2 3 4 5 6 7 8 9; do waittime=`random 0 60` runtime=`random 5 120` echo "Starting client $i$k$j with waittime $waittime and runtime $runtime" (sleep $waittime;./mockupclient $i$k$j $runtime) & done done done wait echo testmockup.sh script done # echo "Waiting for 5 seconds then killing all mockupserver and mockupclient processes" # sleep 30 # killall -v mockupserver mockupclient |
Modified utils/installall.sh from [e461189985] to [6bb115f66a].
|
| | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | #! /bin/env bash set -x # Copyright 2007-2010, Matthew Welland. # # This program is made available under the GNU GPL version 2.0 or # greater. See the accompanying file COPYING for details. # # This program is distributed WITHOUT ANY WARRANTY; without even the # implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR # PURPOSE. echo You may need to do the following first: echo sudo apt-get install libreadline-dev echo sudo apt-get install libwebkitgtk-dev echo sudo apt-get install libmotif3 -OR- set KTYPE=26g4 echo KTYPE can be 26, 26g4, or 32 echo KTYPE=$KTYPE echo You are using PREFIX=$PREFIX echo You are using proxy="$proxy" echo echo "Set additional_libpath to help find gtk or other libraries, don't forget a leading :" |
︙ | ︙ | |||
266 267 268 269 270 271 272 | if [[ -e ${ZEROMQ}${zpatchlev}.tar.gz ]] ; then tar xfz ${ZEROMQ}.tar.gz cd ${ZEROMQ} ln -s $PREFIX/include/uuid src # LDFLAGS=-L$PREFIX/lib ./configure --prefix=$PREFIX | | > | 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 | if [[ -e ${ZEROMQ}${zpatchlev}.tar.gz ]] ; then tar xfz ${ZEROMQ}.tar.gz cd ${ZEROMQ} ln -s $PREFIX/include/uuid src # LDFLAGS=-L$PREFIX/lib ./configure --prefix=$PREFIX ./configure --enable-static --prefix=$PREFIX --with-uuid=$PREFIX LDFLAGS="-L$PREFIX/lib" CPPFLAGS="-fPIC -I$PREFIX/include" LIBS="-lgcc" # --disable-shared CPPFLAGS="-fPIC # LDFLAGS="-L/usr/lib64 -L$PREFIX/lib" ./configure --enable-static --prefix=$PREFIX make make install CSC_OPTIONS="-I$PREFIX/include -L$CSCLIBS" chicken-install $PROX zmq # CSC_OPTIONS="-I$PREFIX/include -L$CSCLIBS" chicken-install $PROX -deploy -prefix $DEPLOYTARG zmq fi |
︙ | ︙ |