Overview
Comment: | (no comment) |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | interleaved-queries |
Files: | files | file ages | folders |
SHA1: |
2a8e99f4af5bd363a1cf7af407e765a7 |
User & Date: | mrwellan on 2012-11-20 14:58:15 |
Other Links: | branch diff | manifest | tags |
Context
2012-11-20
| ||
18:09 | (no comment) check-in: fc67718610 user: mrwellan tags: interleaved-queries | |
14:58 | (no comment) check-in: 2a8e99f4af user: mrwellan tags: interleaved-queries | |
10:52 | brought tests up-to-date, increased timeout on query roundtime to 120 seconds check-in: fbe00a0b5c user: mrwellan tags: interleaved-queries | |
Changes
Modified common.scm from [cc3866e918] to [df3547d145].
︙ | ︙ | |||
48 49 50 51 52 53 54 | (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) (define *server-info* #f) (define *time-to-exit* #f) (define *received-response* #f) | | | 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) (define *server-info* #f) (define *time-to-exit* #f) (define *received-response* #f) (define *default-numtries* 5) (define *target* (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN (define *keys* (make-hash-table)) ;; cache the keys here (define *keyvals* (make-hash-table)) (define *toptest-paths* (make-hash-table)) ;; cache toptest path settings here (define *test-paths* (make-hash-table)) ;; cache test-id to test run paths here (define *test-ids* (make-hash-table)) ;; cache run-id, testname, and item-path => test-id |
︙ | ︙ |
Modified db.scm from [89de92b23e] to [1b1ceb5ed3].
︙ | ︙ | |||
1118 1119 1120 1121 1122 1123 1124 | (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) (res #f) (send-receive (lambda () (debug:print-info 11 "sending message") (send-message push-socket zdat) (debug:print-info 11 "message sent") | | | | > > > > | | | | | > > > > | | | | | 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 | (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) (res #f) (send-receive (lambda () (debug:print-info 11 "sending message") (send-message push-socket zdat) (debug:print-info 11 "message sent") (let loop () ;; get the sender info ;; this should match (server:get-client-signature) ;; we will need to process "all" messages here some day (receive-message* sub-socket) ;; now get the actual message (let ((myres (db:string->obj (receive-message* sub-socket)))) (if (equal? query-sig (vector-ref myres 1)) (set! res (vector-ref myres 2)) (loop)))))) (timeout (lambda () (let loop ((n numretries)) (thread-sleep! 20) (if (not res) (if (> numretries 0) (begin (debug:print 0 "WARNING: no reply to query " params ", trying resend") (debug:print-info 11 "re-sending message") (send-message push-socket zdat) (debug:print-info 11 "message re-sent") (loop (- n 1))) ;; (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)) (begin (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") (exit 5)))))))) (debug:print-info 11 "Starting threads") (let ((th1 (make-thread send-receive "send receive")) (th2 (make-thread timeout "timeout"))) (thread-start! th1) (thread-start! th2) (thread-join! th1) (debug:print-info 11 "cdb:client-call returning res=" res) |
︙ | ︙ | |||
1228 1229 1230 1231 1232 1233 1234 | '(test-set-rundir-by-test-id "UPDATE tests SET rundir=? WHERE id=?") '(test-set-rundir "UPDATE tests SET rundir=? WHERE run_id=? AND testname=? AND item_path=?;") '(delete-tests-in-state "DELETE FROM tests WHERE state=? AND run_id=?;") '(tests:test-set-toplog "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';") )) ;; do not run these as part of the transaction | | | > | 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 | '(test-set-rundir-by-test-id "UPDATE tests SET rundir=? WHERE id=?") '(test-set-rundir "UPDATE tests SET rundir=? WHERE run_id=? AND testname=? AND item_path=?;") '(delete-tests-in-state "DELETE FROM tests WHERE state=? AND run_id=?;") '(tests:test-set-toplog "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';") )) ;; do not run these as part of the transaction (define db:special-queries '(rollup-tests-pass-fail db:roll-up-pass-fail-counts login immediate flush sync set-verbosity killserver)) ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to |
︙ | ︙ | |||
1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 | (let outerloop ((special-qry #f) (stmts data)) (debug:print-info 11 "special-qry=" special-qry ", stmts=" stmts) (if special-qry ;; handle a query that cannot be part of the grouped queries (let* ((stmt-key (cdb:packet-get-qtype special-qry)) (return-address (cdb:packet-get-client-sig special-qry)) (qry (hash-table-ref/default queries stmt-key #f)) (params (cdb:packet-get-params special-qry))) (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", qry=" qry ", params=" params) (cond ;; Special queries ((string? qry) (apply sqlite3:execute db qry params) | > | | | | | | | | > | | 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 | (let outerloop ((special-qry #f) (stmts data)) (debug:print-info 11 "special-qry=" special-qry ", stmts=" stmts) (if special-qry ;; handle a query that cannot be part of the grouped queries (let* ((stmt-key (cdb:packet-get-qtype special-qry)) (qry-sig (cdb:packet-get-query-sig special-qry)) (return-address (cdb:packet-get-client-sig special-qry)) (qry (hash-table-ref/default queries stmt-key #f)) (params (cdb:packet-get-params special-qry))) (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", qry=" qry ", params=" params) (cond ;; Special queries ((string? qry) (apply sqlite3:execute db qry params) (server:reply pubsock return-address qry-sig #t #t)) ;; ((and (not (null? params)) ;; (procedure? (car params))) ;; (let ((proc (car params)) ;; (remparams (cdr params))) ;; ;; we are being handed a procedure so call it ;; (debug:print-info 11 "Running (apply " proc " " db " " remparams ")") ;; (server:reply pubsock return-address (apply proc db remparams)))) (else (case stmt-key ((immediate) (let ((proc (car params)) (remparams (cdr params))) ;; we are being handed a procedure so call it (debug:print-info 11 "Running (apply " proc " " remparams ")") (server:reply pubsock return-address qry-sig #t (apply proc remparams)))) ((login) (if (< (length params) 3) ;; should get toppath, version and signature '(#f "login failed due to missing params") ;; missing params (let ((calling-path (car params)) (calling-vers (cadr params)) (client-key (caddr params))) (if (and (equal? calling-path *toppath*) (equal? megatest-version calling-vers)) (begin (hash-table-set! *logged-in-clients* client-key (current-seconds)) (server:reply pubsock return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))) ((flush sync) (server:reply pubsock return-address qry-sig #t (length data))) ((set-verbosity) (set! *verbosity* (car params)) (server:reply pubsock return-address qry-sig #t '(#t *verbosity*))) ((killserver) (debug:print 0 "WARNING: Server going down in 15 seconds by user request!") (open-run-close tasks:server-deregister tasks:open-db (cadr *server-info*) pullport: (caddr *server-info*)) (thread-start! (make-thread (lambda ()(thread-sleep! 15)(exit)))) (server:reply pubsock return-address qry-sig #t '(#t "exit process started"))) (else (debug:print 0 "ERROR: Unrecognised queued call " qry " " params) (server:reply pubsock return-address qry-sig #f #t))))) (if (not (null? stmts)) (outerloop #f stmts))) ;; handle normal queries (let ((rem (sqlite3:with-transaction db (lambda () (debug:print-info 11 "flushing " stmts " to db") (if (null? stmts) stmts (let innerloop ((hed (car stmts)) (tal (cdr stmts))) (let ((params (cdb:packet-get-params hed)) (return-address (cdb:packet-get-client-sig hed)) (qry-sig (cdb:packet-get-query-sig hed)) (stmt-key (cdb:packet-get-qtype hed))) (if (or (not (hash-table-ref/default queries stmt-key #f)) (member stmt-key db:special-queries)) (begin (debug:print-info 11 "Handling special statement " stmt-key) (cons hed tal)) (begin (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) (server:reply pubsock return-address qry-sig #t #t) (if (not (null? tal)) (innerloop (car tal)(cdr tal)) '())) )))))))) (if (not (null? rem)) (outerloop (car rem)(cdr rem)))))) (for-each (lambda (stmt-key) |
︙ | ︙ |
Modified server.scm from [5c1d13d6ac] to [cfe6037649].
︙ | ︙ | |||
116 117 118 119 120 121 122 | (set! *cache-on* #t) ;; what to do when we quit ;; (on-exit (lambda () (if (and *toppath* *server-info*) | < | | 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | (set! *cache-on* #t) ;; what to do when we quit ;; (on-exit (lambda () (if (and *toppath* *server-info*) (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) (mutex-lock! *incoming-mutex*) (set! queue-len (length *incoming-data*)) (mutex-unlock! *incoming-mutex*) (if (> queue-len 0) |
︙ | ︙ | |||
143 144 145 146 147 148 149 | (debug:print-info 12 "server=> received packet=" packet) (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin (db:process-queue pub-socket (cons packet queue-lst)) (loop '())) (loop (cons packet queue-lst))))))) | | | | > | | | | | | | | | | | | | | | | | | | | | | | | | | 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 | (debug:print-info 12 "server=> received packet=" packet) (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin (db:process-queue pub-socket (cons packet queue-lst)) (loop '())) (loop (cons packet queue-lst))))))) (define (server:reply pubsock target query-sig success/fail result) (debug:print-info 11 "server:reply target=" target ", result=" result) (send-message pubsock target send-more: #t) (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (let* ((server-info (let loop () (let ((sdat #f)) (mutex-lock! *heartbeat-mutex*) (set! sdat *server-info*) (mutex-unlock! *heartbeat-mutex*) (if sdat sdat (begin (sleep 4) (loop)))))) (iface (cadr server-info)) (pullport (caddr server-info)) (pubport (cadddr server-info)) ;; id interface pullport pubport) (zmq-sockets (server:client-connect iface pullport pubport)) ) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) ;; NOTE: Get rid of this mechanism! It really is not needed... (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access (if (> (+ *last-db-access* ;; (* 48 60 60) ;; 48 hrs ;; 60 ;; one minute (* 60 60) ;; one hour ) (current-seconds)) (begin (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*)) (loop 0)) (begin (debug:print-info 0 "Starting to shutdown the server.") ;; need to delete only *my* server entry (future use) (set! *time-to-exit* #t) (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) (define (server:find-free-port-and-open iface s port stype #!key (trynum 50)) (let ((s (if s s (make-socket stype))) (p (if (number? port) port 5555)) (old-handler (current-exception-handler))) (handle-exceptions exn |
︙ | ︙ |
Modified tests/tests.scm from [6502e24ce5] to [491ed287ed].
︙ | ︙ | |||
157 158 159 160 161 162 163 | (list "pass" "fail" "n/a")) (test "write env files" "nada.csh" (begin (save-environment-as-files "nada") (and (file-exists? "nada.sh") (file-exists? "nada.csh")))) | | | 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 | (list "pass" "fail" "n/a")) (test "write env files" "nada.csh" (begin (save-environment-as-files "nada") (and (file-exists? "nada.sh") (file-exists? "nada.csh")))) (test #f #t (cdb:client-call *runremote* 'immediate #t 1 (lambda ()(display "Got here eh!?") #t))) ;; (set! *verbosity* 20) (test #f *verbosity* (cadr (cdb:set-verbosity *runremote* *verbosity*))) (test #f #f (cdb:roll-up-pass-fail-counts *runremote* 1 "test1" "" "PASS")) ;; (set! *verbosity* 1) ;; (cdb:set-verbosity *runremote* *verbosity*) |
︙ | ︙ | |||
309 310 311 312 313 314 315 | (begin (vector-ref (hash-table-ref (open-run-close db:get-steps-table #f test-id) "step1") 4))) ;; (exit) (test #f "myrun" (cdb:remote-run db:get-run-name-from-id #f 1)) | | > | 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 | (begin (vector-ref (hash-table-ref (open-run-close db:get-steps-table #f test-id) "step1") 4))) ;; (exit) (test #f "myrun" (cdb:remote-run db:get-run-name-from-id #f 1)) (test #f #f (cdb:remote-run db:roll-up-pass-fail-counts #f 1 "nada" "" "PASS")) ;;====================================================================== ;; R E M O T E C A L L S ;;====================================================================== (define start-wait (current-seconds)) (print "Starting intensive cache and rpc test") (for-each (lambda (params) (print "Intensive: params=" params) (cdb:tests-register-test *runremote* 1 (conc "test" (random 20)) "") (apply cdb:test-set-status-state *runremote* test-id params) (cdb:pass-fail-counts *runremote* test-id (random 100) (random 100)) (cdb:test-rollup-test_data-pass-fail *runremote* test-id) (cdb:roll-up-pass-fail-counts *runremote* 1 "test1" "" (cadr params)) (thread-sleep! 0.01)) ;; cache ordering granularity is at the second level. Should really be at the ms level '(("COMPLETED" "PASS" #f) ("NOT_STARTED" "FAIL" "Just testing") ("NOT_STARTED" "FAIL" "Just testing") ("NOT_STARTED" "FAIL" "Just testing") ("COMPLETED" "PASS" #f) ("NOT_STARTED" "FAIL" "Just testing") |
︙ | ︙ |