Changes In Branch v1.80-servload Through [0ba83c29bb] Excluding Merge-Ins
This is equivalent to a diff from 66f2b72697 to 0ba83c29bb
2023-05-06
| ||
16:39 | Turn off find-and-mark-incomplete fully check-in: 3f75938dff user: matt tags: v1.80 | |
2023-05-03
| ||
21:49 | Sorta working but not really... check-in: 81dd2a2efe user: matt tags: v1.80-servload | |
19:05 | wip check-in: 0ba83c29bb user: mrwellan tags: v1.80-servload | |
2023-05-02
| ||
12:08 | Added warning on template changes. check-in: 1356471a2d user: mrwellan tags: v1.80-servload | |
2023-04-17
| ||
17:28 | cache get-test-info-by-id check-in: 1443998a16 user: matt tags: v1.80-servload | |
16:51 | possible fixes to test Leaf check-in: 4ef17eb32c user: matt tags: v1.80-possible-fixes | |
2023-04-16
| ||
16:43 | possible fix for bind issue check-in: 66f2b72697 user: matt tags: v1.80 | |
2023-04-15
| ||
09:14 | merged fork check-in: 532a2581c9 user: matt tags: v1.80 | |
Modified Makefile from [3b43ac8a9b] to [6812b9630b].
︙ | ︙ | |||
33 34 35 36 37 38 39 | diff-report.scm cgisetup/models/pgdb.scm # module source files MSRCFILES = dbfile.scm debugprint.scm mtargs.scm commonmod.scm dbmod.scm \ tcp-transportmod.scm rmtmod.scm portlogger.scm transport-mode.scm : transport-mode.scm.template | > > | > > | | 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | diff-report.scm cgisetup/models/pgdb.scm # module source files MSRCFILES = dbfile.scm debugprint.scm mtargs.scm commonmod.scm dbmod.scm \ tcp-transportmod.scm rmtmod.scm portlogger.scm transport-mode.scm : transport-mode.scm.template @if [[ -e transport-mode.scm ]];then \ echo "WARNING: transport-mode.scm.template is newer than transport-mode.scm"; else \ cp transport-mode.scm.template transport-mode.scm; fi dashboard-transport-mode.scm : dashboard-transport-mode.scm.template @if [[ -e dashboard-transport-mode.scm ]];then \ echo "WARNING: dashboard-transport-mode.scm.template is newer than dashboard-transport-mode.scm"; else \ cp dashboard-transport-mode.scm.template dashboard-transport-mode.scm; fi megatest.scm : transport-mode.scm dashboard.scm : dashboard-transport-mode.scm # dbmod.import.o is just a hack here mofiles/portlogger.o : mofiles/dbmod.o |
︙ | ︙ |
Modified TODO from [fa3d981ca6] to [4f2c585def].
︙ | ︙ | |||
55 56 57 58 59 60 61 | . Re-work the dbstruct data structure? .. Move main.db to global? .. [ run-id.db inmemdb last-mod last-read last-sync inuse ] . Re-work all queries to use run-id to dereference server . Open main.db directly in calls to -runtests etc. No need to talk remote? . remove common:faux-lock | > > > > > > > > > > > > > > > > > > > > > > > > | 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | . Re-work the dbstruct data structure? .. Move main.db to global? .. [ run-id.db inmemdb last-mod last-read last-sync inuse ] . Re-work all queries to use run-id to dereference server . Open main.db directly in calls to -runtests etc. No need to talk remote? . remove common:faux-lock db:get-test-info-by-id db:get-test-state-status-by-id db:get-test-info - do a get id by name/item-path cache the id- use test id plus run id to get from cache need to do db:get-test-info-db look at html gen for items - rollup needs deduplication nonoverlap ;; cache write these with transaction db:teststep-set-status! db:test-set-top-process-id ;; called a lot, maybe from rollup? db:get-all-state-status-counts-for-test ;; load to move from server to client tests:summarize-items ;; appears to be on client tests:summarize-tests ;; converting rmt:set-tests-state-status 1. db:get-test-id needs rmt equiv 2. |
Modified api.scm from [9008afe383] to [1ab4975f7c].
︙ | ︙ | |||
317 318 319 320 321 322 323 | ((test-set-top-process-pid) (apply db:test-set-top-process-pid dbstruct params)) ((set-state-status-and-roll-up-items) (apply db:set-state-status-and-roll-up-items dbstruct params)) ((set-state-status-and-roll-up-run) (apply db:set-state-status-and-roll-up-run dbstruct params)) ((top-test-set-per-pf-counts) (apply db:top-test-set-per-pf-counts dbstruct params)) ((test-set-archive-block-id) (apply db:test-set-archive-block-id dbstruct params)) ((insert-test) (db:insert-test dbstruct run-id params)) | > | | 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 | ((test-set-top-process-pid) (apply db:test-set-top-process-pid dbstruct params)) ((set-state-status-and-roll-up-items) (apply db:set-state-status-and-roll-up-items dbstruct params)) ((set-state-status-and-roll-up-run) (apply db:set-state-status-and-roll-up-run dbstruct params)) ((top-test-set-per-pf-counts) (apply db:top-test-set-per-pf-counts dbstruct params)) ((test-set-archive-block-id) (apply db:test-set-archive-block-id dbstruct params)) ((insert-test) (db:insert-test dbstruct run-id params)) ((set-state-status-by-state-status) (apply db:set-state-status-by-state-status dbstruct params)) ;; RUNS ((register-run) (apply db:register-run dbstruct params)) ((set-tests-state-status) (apply db:set-tests-state-status dbstruct params)) ((delete-run) (apply db:delete-run dbstruct params)) ((lock/unlock-run) (apply db:lock/unlock-run dbstruct params)) ((update-run-event_time) (apply db:update-run-event_time dbstruct params)) ((update-run-stats) (apply db:update-run-stats dbstruct params)) |
︙ | ︙ | |||
395 396 397 398 399 400 401 402 403 404 405 406 407 408 | ((test-toplevel-num-items) (apply db:test-toplevel-num-items dbstruct params)) ((get-test-info-by-id) (apply db:get-test-info-by-id dbstruct params)) ((get-test-state-status-by-id) (apply db:get-test-state-status-by-id dbstruct params)) ((test-get-rundir-from-test-id) (apply db:test-get-rundir-from-test-id dbstruct params)) ((get-count-tests-running-for-testname) (apply db:get-count-tests-running-for-testname dbstruct params)) ((get-count-tests-running) (apply db:get-count-tests-running dbstruct params)) ((get-count-tests-running-in-jobgroup) (apply db:get-count-tests-running-in-jobgroup dbstruct params)) ;; ((delete-test-step-records) (apply db:delete-test-step-records dbstruct params)) ;; ((get-previous-test-run-record) (apply db:get-previous-test-run-record dbstruct params)) ((get-matching-previous-test-run-records)(apply db:get-matching-previous-test-run-records dbstruct params)) ((test-get-logfile-info) (apply db:test-get-logfile-info dbstruct params)) ((test-get-records-for-index-file) (apply db:test-get-records-for-index-file dbstruct params)) ((get-testinfo-state-status) (apply db:get-testinfo-state-status dbstruct params)) ((test-get-top-process-pid) (apply db:test-get-top-process-pid dbstruct params)) | > | 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 | ((test-toplevel-num-items) (apply db:test-toplevel-num-items dbstruct params)) ((get-test-info-by-id) (apply db:get-test-info-by-id dbstruct params)) ((get-test-state-status-by-id) (apply db:get-test-state-status-by-id dbstruct params)) ((test-get-rundir-from-test-id) (apply db:test-get-rundir-from-test-id dbstruct params)) ((get-count-tests-running-for-testname) (apply db:get-count-tests-running-for-testname dbstruct params)) ((get-count-tests-running) (apply db:get-count-tests-running dbstruct params)) ((get-count-tests-running-in-jobgroup) (apply db:get-count-tests-running-in-jobgroup dbstruct params)) ((get-all-state-status-counts-for-test) (apply db:get-all-state-status-counts-for-test dbstruct params)) ;; ((delete-test-step-records) (apply db:delete-test-step-records dbstruct params)) ;; ((get-previous-test-run-record) (apply db:get-previous-test-run-record dbstruct params)) ((get-matching-previous-test-run-records)(apply db:get-matching-previous-test-run-records dbstruct params)) ((test-get-logfile-info) (apply db:test-get-logfile-info dbstruct params)) ((test-get-records-for-index-file) (apply db:test-get-records-for-index-file dbstruct params)) ((get-testinfo-state-status) (apply db:get-testinfo-state-status dbstruct params)) ((test-get-top-process-pid) (apply db:test-get-top-process-pid dbstruct params)) |
︙ | ︙ |
Modified common.scm from [ec316c51cd] to [708849cceb].
︙ | ︙ | |||
168 169 170 171 172 173 174 | ;; (define *db-last-sync* 0) ;; last time the sync to megatest.db happened (define *db-sync-in-progress* #f) ;; if there is a sync in progress do not try to start another ;; (define *db-multi-sync-mutex* (make-mutex)) ;; protect access to *db-sync-in-progress*, *db-last-sync* ;; task db (define *task-db* #f) ;; (vector db path-to-db) (define *db-access-allowed* #t) ;; flag to allow access ;; (define *db-access-mutex* (make-mutex)) ;; moved to dbfile | < | 168 169 170 171 172 173 174 175 176 177 178 179 180 181 | ;; (define *db-last-sync* 0) ;; last time the sync to megatest.db happened (define *db-sync-in-progress* #f) ;; if there is a sync in progress do not try to start another ;; (define *db-multi-sync-mutex* (make-mutex)) ;; protect access to *db-sync-in-progress*, *db-last-sync* ;; task db (define *task-db* #f) ;; (vector db path-to-db) (define *db-access-allowed* #t) ;; flag to allow access ;; (define *db-access-mutex* (make-mutex)) ;; moved to dbfile (define *db-cache-path* #f) ;; (define *db-with-db-mutex* (make-mutex)) (define *db-api-call-time* (make-hash-table)) ;; hash of command => (list of times) ;; SERVER (define *transport-type* 'http) ;; override with [server] transport http|rpc|nmsg (define *runremote* #f) ;; if set up for server communication this will hold <host port> |
︙ | ︙ |
Modified dashboard.scm from [63dbb0e44d] to [3d1081a532].
︙ | ︙ | |||
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 | ;; RA => sets the tabdat passed to the hashkey at commondat:tabdats hash table ;; (define (dboard:common-set-tabdat! commondat tabnum tabdat) (hash-table-set! (dboard:commondat-tabdats commondat) tabnum tabdat)) ;; gets and calls updater list based on curr-tab-num ;; (define (dboard:common-run-curr-updaters commondat #!key (tab-num #f)) ;; (sync-db-to-tmp (dboard:common-get-tabdat commondat tab-num: tab-num)) ;; no longer applies ;; maybe need sleep here? (if (dboard:common-get-tabdat commondat tab-num: tab-num) ;; only update if there is a tabdat (let* ((tnum (or tab-num (dboard:commondat-curr-tab-num commondat))) (updaters (hash-table-ref/default (dboard:commondat-updaters commondat) tnum '()))) | > > > > > | | | > | | > > > | > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 | ;; RA => sets the tabdat passed to the hashkey at commondat:tabdats hash table ;; (define (dboard:common-set-tabdat! commondat tabnum tabdat) (hash-table-set! (dboard:commondat-tabdats commondat) tabnum tabdat)) (define *updaters-running* #f) ;; gets and calls updater list based on curr-tab-num ;; (define (dboard:common-run-curr-updaters commondat #!key (tab-num #f)) ;; (sync-db-to-tmp (dboard:common-get-tabdat commondat tab-num: tab-num)) ;; no longer applies ;; maybe need sleep here? (if (dboard:common-get-tabdat commondat tab-num: tab-num) ;; only update if there is a tabdat (let* ((tnum (or tab-num (dboard:commondat-curr-tab-num commondat))) (updaters (hash-table-ref/default (dboard:commondat-updaters commondat) tnum '()))) (if *updaters-running* (debug:print 0 *default-log-port* "updaters still running.") (let* ((th1 (make-thread (lambda () (debug:print 4 *default-log-port* "Found these updaters: " updaters " for tab-num: " tnum) (for-each ;; perform the function calls for the complete updaters list (lambda (updater) (let ((start-ms (current-milliseconds))) (debug:print 0 *default-log-port* "Running updater for tnum "tnum", "updater) (updater) (debug:print 0 *default-log-port* "Running updater for tnum "tnum" took "(- (current-milliseconds) start-ms)"ms") )) updaters)) "updaters thread")) (th2 (make-thread (lambda () (let loop () (case (thread-state th1) ((terminated) (debug:print 0 *default-log-port* "th1 terminated, all done for now.")) ((running) (debug:print 0 *default-log-port* "th1 running, suspending now.") (thread-suspend! th1) (thread-sleep! 0.1) (loop)) ((sleeping) (debug:print 0 *default-log-port* "th1 sleeping, resuming now.") (thread-resume! th1) (thread-sleep! 0.9) (loop)) ((dead) (debug:print 0 *default-log-port* "th1 "(thread-state th1)", what's next?")) (else (debug:print 0 *default-log-port* "th1 "(thread-state th1)", what's next?") (thread-sleep! 0.5) (loop)))))))) (set! *updaters-running* #t) (thread-start! th1) (thread-sleep! 0.1) (thread-start! th2) (thread-join! th1) (set! *updaters-running* #f)))))) ;; if tab-num passed in then use it, otherwise look in commondat at curr-tab-num ;; adds the updater passed in the updaters list at that hashkey ;; (define (dboard:commondat-add-updater commondat updater #!key (tab-num #f)) (let* ((tnum (or tab-num (dboard:commondat-curr-tab-num commondat))) |
︙ | ︙ |
Modified db.scm from [3353e0f2ec] to [79e052fa8d].
︙ | ︙ | |||
2272 2273 2274 2275 2276 2277 2278 | (sqlite3:with-transaction db (lambda () (sqlite3:execute db "DELETE FROM test_steps WHERE test_id IN (SELECT id FROM tests WHERE state='DELETED' AND event_time<?);" targtime) (sqlite3:execute db "DELETE FROM test_data WHERE test_id IN (SELECT id FROM tests WHERE state='DELETED' AND event_time<?);" targtime) (sqlite3:execute db "DELETE FROM tests WHERE state='DELETED' AND event_time<?;" targtime))))))) | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < > > > > | 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 | (sqlite3:with-transaction db (lambda () (sqlite3:execute db "DELETE FROM test_steps WHERE test_id IN (SELECT id FROM tests WHERE state='DELETED' AND event_time<?);" targtime) (sqlite3:execute db "DELETE FROM test_data WHERE test_id IN (SELECT id FROM tests WHERE state='DELETED' AND event_time<?);" targtime) (sqlite3:execute db "DELETE FROM tests WHERE state='DELETED' AND event_time<?;" targtime))))))) ;; ;; speed up for common cases with a little logic ;; ;; NB// Ultimately this will be deprecated in deference to mt:test-set-state-status-by-id ;; ;; NOTE: run-id is not used ;; ;; (define (db:test-set-state-status dbstruct run-id test-id newstate newstatus newcomment) (db:with-db dbstruct run-id #t (lambda (dbdat db) (db:test-set-state-status-db db run-id test-id newstate newstatus newcomment)))) (define (db:test-set-state-status-db db run-id test-id newstate newstatus newcomment) ;; clear cache after this, I think that makes sense (cond ((and newstate newstatus newcomment) (sqlite3:execute db "UPDATE tests SET state=?,status=?,comment=? WHERE id=?;" newstate newstatus newcomment ;; (sdb:qry 'getid newcomment) test-id)) ((and newstate newstatus) (sqlite3:execute db "UPDATE tests SET state=?,status=? WHERE id=?;" newstate newstatus test-id)) (else (if newstate (sqlite3:execute db "UPDATE tests SET state=? WHERE id=?;" newstate test-id)) (if newstatus (sqlite3:execute db "UPDATE tests SET status=? WHERE id=?;" newstatus test-id)) (if newcomment (sqlite3:execute db "UPDATE tests SET comment=? WHERE id=?;" newcomment ;; (sdb:qry 'getid newcomment) test-id)))) ;; (mt:process-triggers dbstruct run-id test-id newstate newstatus)) ;; NOTE: Moved into calling function (let* ((hash-key (cons run-id test-id))) (hash-table-delete! *db:get-test-info-by-id-cache* hash-key) (hash-table-delete! *db:get-test-state-status-by-id-cache* hash-key)) ) ;; NEW BEHAVIOR: Count tests running in all runs! ;; (define (db:get-count-tests-running dbstruct run-id) ;; fastmode) (let* ((qry ;; (if fastmode ;; "SELECT count(id) FROM tests WHERE state in ('RUNNING','LAUNCHED','REMOTEHOSTSTART') AND NOT (uname = 'n/a' AND item_path = '') LIMIT 1;" |
︙ | ︙ | |||
2448 2449 2450 2451 2452 2453 2454 2455 2456 | #f (lambda (dbdat db) (sqlite3:first-result db "SELECT count(id) FROM tests WHERE state in ('LAUNCHED','NOT_STARTED','REMOTEHOSTSTART','RUNNING','KILLREQ') AND run_id=?;") run-id))) ;; map run-id, testname item-path to test-id (define (db:get-test-id dbstruct run-id testname item-path) | > > > > > > | | | | | | | | | | > > | 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 | #f (lambda (dbdat db) (sqlite3:first-result db "SELECT count(id) FROM tests WHERE state in ('LAUNCHED','NOT_STARTED','REMOTEHOSTSTART','RUNNING','KILLREQ') AND run_id=?;") run-id))) (define *db:get-test-id-cache* (make-hash-table)) ;; map run-id, testname item-path to test-id (define (db:get-test-id dbstruct run-id testname item-path) (let* ((hash-key (list run-id testname item-path)) (cache-result (hash-table-ref/default *db:get-test-id-cache* hash-key #f))) (if cache-result (cdr cache-result) (let* ((res (db:with-db dbstruct run-id #f (lambda (dbdat db) (db:first-result-default db "SELECT id FROM tests WHERE testname=? AND item_path=? AND run_id=?;" #f ;; the default testname item-path run-id))))) (if res (hash-table-set! *db:get-test-id-cache* hash-key (cons (current-seconds) res))) res)))) ;; overload the unused attemptnum field for the process id of the runscript or ;; ezsteps step script in progress ;; (define (db:test-set-top-process-pid dbstruct run-id test-id pid) (db:with-db dbstruct |
︙ | ︙ | |||
2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 | (let* ((run-ids (db:get-all-run-ids mtdb))) (for-each (lambda (run-id) (let ((testrecs (db:get-all-tests-info-by-run-id mtdb run-id))) (db:prep-megatest.db-adj-test-ids (dbr:dbdat-dbh mtdb) run-id testrecs))) run-ids))) ;; Get test data using test_id ;; (define (db:get-test-info-by-id dbstruct run-id test-id) | > > > > > > | | | | | | | | | | | | | | | > | > > > > > > | | | | | | | | | | | | | > | < < < < < < < < < < < < < < < > > | | | | | | | 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 | (let* ((run-ids (db:get-all-run-ids mtdb))) (for-each (lambda (run-id) (let ((testrecs (db:get-all-tests-info-by-run-id mtdb run-id))) (db:prep-megatest.db-adj-test-ids (dbr:dbdat-dbh mtdb) run-id testrecs))) run-ids))) (define *db:get-test-info-by-id-cache* (make-hash-table)) ;; Get test data using test_id ;; (define (db:get-test-info-by-id dbstruct run-id test-id) (let* ((hash-key (cons run-id test-id)) (cache-result (hash-table-ref/default *db:get-test-info-by-id-cache* hash-key #f))) (if cache-result (cdr cache-result) (db:with-db dbstruct run-id #f (lambda (dbdat db) (let ((res #f)) (sqlite3:for-each-row ;; attemptnum added to hold pid of top process (not Megatest) controlling a test (lambda (id run-id testname state status event-time host cpuload diskfree uname rundir-id item-path run_duration final-logf-id comment short-dir-id attemptnum archived last-update) ;; 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 (set! res (vector id run-id testname state status event-time host cpuload diskfree uname rundir-id item-path run_duration final-logf-id comment short-dir-id attemptnum archived last-update))) db ;; (db:get-cache-stmth dbdat db ;; (conc "SELECT " db:test-record-qry-selector " FROM tests WHERE id=? AND run_id=?;")) (conc "SELECT " db:test-record-qry-selector " FROM tests WHERE id=? AND run_id=?;") test-id run-id) (hash-table-set! *db:get-test-info-by-id-cache* hash-key (cons (current-seconds) res)) res)))))) (define *db:get-test-state-status-by-id-cache* (make-hash-table)) ;; Get test state, status using test_id ;; (define (db:get-test-state-status-by-id dbstruct run-id test-id) (let* ((hash-key (cons run-id test-id)) (cache-result (hash-table-ref/default *db:get-test-state-status-by-id-cache* hash-key #f))) (if cache-result (cdr cache-result) (db:with-db dbstruct run-id #f (lambda (dbdat db) (let ((res (cons #f #f))) ;; (stmth (db:get-cache-stmth dbdat db "SELECT state,status FROM tests WHERE id=?;"))) (sqlite3:for-each-row ;; attemptnum added to hold pid of top process (not Megatest) controlling a test (lambda (state status) (cons state status)) db "SELECT state,status FROM tests WHERE id=? AND run_id=?;" ;; stmth try not compiling this one - yes, this fixed the bind issue test-id run-id) (hash-table-set! *db:get-test-state-status-by-id-cache* hash-key (cons (current-seconds) res)) res)))))) ;; Use db:test-get* to access ;; Get test data using test_ids. NB// Only works within a single run!! ;; (define (db:get-test-info-by-ids dbstruct run-id test-ids) (db:with-db dbstruct run-id #f (lambda (dbdat db) (let ((res '())) (sqlite3:for-each-row (lambda (a . b) ;; 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 (set! res (cons (apply vector a b) res))) db (conc "SELECT " db:test-record-qry-selector " FROM tests WHERE id in (" (string-intersperse (map conc test-ids) ",") ");")) res)))) (define (db:get-test-info dbstruct run-id test-name item-path) (let* ((test-id (db:get-test-id dbstruct run-id test-name item-path))) (db:get-test-info-by-id dbstruct run-id test-id))) ;; (db:with-db ;; dbstruct ;; run-id ;; #f ;; (lambda (dbdat db) ;; (db:get-test-info-db db run-id test-name item-path)))) (define (db:get-test-info-db db run-id test-name item-path) (let ((res #f)) (sqlite3:for-each-row (lambda (a . b) (set! res (apply vector a b))) db |
︙ | ︙ | |||
3097 3098 3099 3100 3101 3102 3103 | (begin (debug:print-error 0 *default-log-port* "reception failed. Received \"" msg "\" but cannot translate it.") (print-call-chain (current-error-port)) msg))) ;; crude reply for when things go awry ((zmq nmsg)(with-input-from-string msg (lambda ()(deserialize)))) (else msg))) ;; rpc | < < < < < < < < < < < < < < < | 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 | (begin (debug:print-error 0 *default-log-port* "reception failed. Received \"" msg "\" but cannot translate it.") (print-call-chain (current-error-port)) msg))) ;; crude reply for when things go awry ((zmq nmsg)(with-input-from-string msg (lambda ()(deserialize)))) (else msg))) ;; rpc ;; state is the priority rollup of all states ;; status is the priority rollup of all completed statesfu ;; ;; if test-name is an integer work off that as test-id instead of test-name test-path ;; (define (db:set-state-status-and-roll-up-items dbstruct run-id test-name item-path state status comment) ;; establish info on incoming test followed by info on top level test |
︙ | ︙ | |||
3150 3151 3152 3153 3154 3155 3156 | (let ((tr-res (sqlite3:with-transaction db (lambda () ;; NB// Pass the db so it is part fo the transaction (db:test-set-state-status-db db run-id test-id state status comment) ;; this call sets the item state/status (if (not (equal? item-path "")) ;; only roll up IF incoming test is an item | | | > | 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 | (let ((tr-res (sqlite3:with-transaction db (lambda () ;; NB// Pass the db so it is part fo the transaction (db:test-set-state-status-db db run-id test-id state status comment) ;; this call sets the item state/status (if (not (equal? item-path "")) ;; only roll up IF incoming test is an item (let* ((state-status-counts (db:get-all-state-status-counts-for-test-db db run-id test-name item-path state status)) ;; item-path is used to exclude current state/status of THIS test (state-statuses (db:roll-up-rules state-status-counts state status)) (newstate (car state-statuses)) (newstatus (cadr state-statuses))) (set! new-state-eh newstate) (set! new-status-eh newstatus) (debug:print 4 *default-log-port* "BB> tl-test-id="tl-test-id" ; "test-name":"item-path " newstate="newstate" newstatus="newstatus" len(sscs)="(length state-status-counts) " state-status-counts: " (apply conc (map (lambda (x) (conc (with-output-to-string (lambda () (pp (dbr:counts->alist x)))) " | ")) state-status-counts))); end debug:print (if tl-test-id (db:test-set-state-status-db db run-id tl-test-id newstate newstatus #f)) ;; we are still in the transaction - must access the db and not the dbstruct |
︙ | ︙ | |||
3263 3264 3265 3266 3267 3268 3269 | (define (db:get-all-state-status-counts-for-run dbstruct run-id) (db:with-db dbstruct #f #f (lambda (dbdat db) (db:get-all-state-status-counts-for-run-db dbdat db run-id)))) | > > > > > | | | 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 | (define (db:get-all-state-status-counts-for-run dbstruct run-id) (db:with-db dbstruct #f #f (lambda (dbdat db) (db:get-all-state-status-counts-for-run-db dbdat db run-id)))) (define (db:get-all-state-status-counts-for-test dbstruct run-id test-name item-path item-state-in item-status-in) (db:with-db dbstruct run-id #f (lambda (dbdat db) (db:get-all-state-status-counts-for-test-db db run-id test-name item-path item-state-in item-status-in)))) ;; BBnote: db:get-all-state-status-counts-for-test returns dbr:counts object aggregating state and status of items of a given test, *not including rollup state/status* ;; ;; NOTE: This is called within a transaction ;; (define (db:get-all-state-status-counts-for-test-db db run-id test-name item-path item-state-in item-status-in) (let* ((test-info (db:get-test-info-db db run-id test-name item-path)) (item-state (or item-state-in (db:test-get-state test-info))) (item-status (or item-status-in (db:test-get-status test-info))) (other-items-count-recs (sqlite3:map-row (lambda (state status count) (make-dbr:counts state: state status: status count: count)) db |
︙ | ︙ |
Modified dbmod.scm from [b9113d296b] to [4ac7149f64].
︙ | ︙ | |||
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | (module dbmod * (import scheme chicken data-structures extras (prefix sqlite3 sqlite3:) posix typed-records srfi-1 srfi-18 srfi-69 commonmod | > > | 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | (module dbmod * (import scheme chicken data-structures extras files (prefix sqlite3 sqlite3:) matchable posix typed-records srfi-1 srfi-18 srfi-69 commonmod |
︙ | ︙ | |||
153 154 155 156 157 158 159 160 161 162 163 164 165 166 | (lambda () (let* ((db (sqlite3:open-database dbfullname)) (handler (sqlite3:make-busy-timeout 136000))) (sqlite3:set-busy-handler! db handler) (if write-access (init-proc db)) db)))) (define *sync-in-progress* #f) ;; Open the inmem db and the on-disk db ;; populate the inmem db with data ;; ;; Updates fields in dbstruct | > > > > > > > > > > > > > > > > | 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | (lambda () (let* ((db (sqlite3:open-database dbfullname)) (handler (sqlite3:make-busy-timeout 136000))) (sqlite3:set-busy-handler! db handler) (if write-access (init-proc db)) db)))) ;; try every second until tries times proc ;; (define (db:keep-trying-until-true proc params tries) (let* ((res (apply proc params))) (if res res (if (> tries 0) (begin (thread-sleep! 1) (db:keep-trying-until-true proc params (- tries 1))) (begin ;; (debug:print-info 0 *default-log-port* "proc never returned true, params="params) (print"db:keep-trying-until-true proc never returned true, proc = " proc " params =" params " tries = " tries) #f))))) (define *sync-in-progress* #f) ;; Open the inmem db and the on-disk db ;; populate the inmem db with data ;; ;; Updates fields in dbstruct |
︙ | ︙ | |||
205 206 207 208 209 210 211 | (dbr:dbstruct-sync-proc-set! dbstruct (lambda (last-update) (if *sync-in-progress* (debug:print 3 *default-log-port* "WARNING: overlapping calls to sync to disk") (begin (mutex-lock! *db-with-db-mutex*) ;; this mutex is used when overloaded or during a query that modifies the db (set! *sync-in-progress* #t) | | | > | > | | 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 | (dbr:dbstruct-sync-proc-set! dbstruct (lambda (last-update) (if *sync-in-progress* (debug:print 3 *default-log-port* "WARNING: overlapping calls to sync to disk") (begin (mutex-lock! *db-with-db-mutex*) ;; this mutex is used when overloaded or during a query that modifies the db (set! *sync-in-progress* #t) #;(dbmod:sync-gasket tables last-update inmem db dbfullname syncdir) (system (conc "megatest -db2db -from "tmpdb" -to "dbfname"&")) (mutex-unlock! *db-with-db-mutex*) (thread-sleep! 0.5) ;; ensure at least 1/2 second down time between sync calls (set! *sync-in-progress* #f))))) ;; (dbmod:sync-tables tables #f db inmem) ;; (if db (dbmod:sync-gasket tables #f inmem db dbfullname 'fromdest) ;; ) ;; load into inmem (dbr:dbstruct-last-update-set! dbstruct (current-seconds)) ;; should this be offset back in time by one second? dbstruct)) ;; (if (eq? syncdir 'todisk) ;; sync to disk normally, sync from in dashboard ;; (dbmod:sync-tables tables last-update inmem db) ;; (dbmod:sync-tables tables last-update db inmem)))) ;; direction: 'fromdest 'todest ;; (define (dbmod:sync-gasket tables last-update inmem dbh dbfname direction) (assert (sqlite3:database? inmem) "FATAL: sync-gasket: inmem is not a db") (assert (sqlite3:database? dbh) "FATAL: sync-gasket: dbh is not a db") (debug:print-info 0 *default-log-port* "Db sync using "(dbfile:sync-method)" method") (case (dbfile:sync-method) ((none) #f) ((attach) (dbmod:attach-sync tables inmem dbfname direction)) ((newsync) ;; DON'T USE THIS ONE. IT IS BORKED (dbmod:new-sync tables inmem dbh dbfname direction)) (else (case direction ((todisk) (dbmod:sync-tables tables last-update inmem dbh) ) (else |
︙ | ︙ | |||
276 277 278 279 280 281 282 | ;; IFF field-name exists ;; ;; Use (db:sync-all-tables-list keys) to get the tbls input ;; (define (dbmod:sync-tables tbls last-update fromdb todb) (assert (sqlite3:database? fromdb) "FATAL: dbmod:sync-tables called with fromdb not a database" fromdb) (assert (sqlite3:database? todb) "FATAL: dbmod:sync-tables called with fromdb not a database" todb) | > > | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | < < < < < < < | | < < < < < < < < < < < < < | < < < | < < < | < < < < < < < < < < < | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | < < < < | < < < < < < < < < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 | ;; IFF field-name exists ;; ;; Use (db:sync-all-tables-list keys) to get the tbls input ;; (define (dbmod:sync-tables tbls last-update fromdb todb) (assert (sqlite3:database? fromdb) "FATAL: dbmod:sync-tables called with fromdb not a database" fromdb) (assert (sqlite3:database? todb) "FATAL: dbmod:sync-tables called with fromdb not a database" todb) (let ((specials '(("keys" . "fieldname") ("meta" . "var"))) (stmts (make-hash-table)) ;; table-field => stmt (all-stmts '()) ;; ( ( stmt1 value1 ) ( stml2 value2 )) (numrecs (make-hash-table)) (start-time (current-milliseconds)) (tot-count 0)) (for-each ;; table (lambda (tabledat) (let* ((count (match tabledat ((tablename . fields) (debug:print-info 0 *default-log-port* "Syncing table "tablename) (dbmod:sync-table tablename fields fromdb todb (alist-ref tablename specials equal?))) (else (debug:print-warn 0 *default-log-port* "Bad tabledat entry: "tabledat) 0)))) (set! tot-count (+ tot-count count)))) tbls) (debug:print-info 0 *default-log-port* "dbmod:sync-tables completed in "(- (current-milliseconds) start-time)"ms") tot-count)) (define (dbmod:sync-table tablename fields from-db to-db keyfield) (let* ((field-names (map car fields)) (has-last-update (member "last_update" field-names)) (fields-sans-lu (filter (lambda (x) (not (member x '("id" "last_update")))) field-names)) (get-ids (lambda (db) (sqlite3:fold-row (lambda (res id) (cons id res)) '() db (conc "SELECT id FROM "tablename";")))) (get-val (lambda (db fieldname id) (let* ((res #f) (sql (conc "SELECT "fieldname" FROM "tablename" WHERE id=?;"))) (sqlite3:for-each-row (lambda (val) (set! res val)) db sql id) ;; (debug:print-info 0 *default-log-port* "get-val "db" "fieldname" "id", sql="sql", res="res) res))) (get-row (lambda (db id) (let* ((res #f)) (sqlite3:for-each-row (lambda tuple (set! res tuple)) db (conc "SELECT " (string-intersperse fields-sans-lu ",") " FROM "tablename" WHERE id=?;") id) res))) (ins-row (lambda (db id row) (let* ((qry (conc "INSERT INTO "tablename" (id," (string-intersperse fields-sans-lu ",") ") VALUES ("id"," (string-intersperse (make-list (length fields-sans-lu) "?") ",") ");"))) ;; (debug:print-info 0 *default-log-port* "qry="qry) (apply sqlite3:execute db qry row)))) (num-inserts 0) (num-updates 0) ) ;; (debug:print-info 0 *default-log-port* "field-names: "field-names", fields-sans-lu: "fields-sans-lu) ;; (sqlite3:with-transaction ;; from-db ;; (lambda () (let* ((from-ids (get-ids from-db))) ;; (debug:print-info 0 *default-log-port* "Table "tablename", has "(length from-ids)" records.") ;; (sqlite3:with-transaction ;; to-db ;; (lambda () (let* ((to-ids (get-ids to-db))) ;; (debug:print 0 *default-log-port* "to-ids="to-ids) (for-each ;; from-id (lambda (from-id) (if (member from-id to-ids) (for-each ;; case where record exists, do one by one the fields if different (lambda (fieldname) (let* ((from-val (get-val from-db fieldname from-id)) (dest-val (get-val to-db fieldname from-id))) #;(debug:print 0 *default-log-port* "fieldname="fieldname ", from-id="from-id ", from-val="from-val ", dest-val="dest-val ) (if (not (equal? from-val dest-val)) (begin (sqlite3:execute to-db (conc "UPDATE "tablename" SET "fieldname"=? WHERE id=?;") from-val from-id) (set! num-updates (+ num-updates 1)))))) fields-sans-lu) (let ((row (get-row from-db from-id))) ;; need to insert the row ;; (debug:print 0 *default-log-port* "row="row) (set! num-inserts (+ num-inserts 1)) (ins-row to-db from-id row)))) from-ids)));; )))) (+ num-inserts num-updates))) ;; (for-each ;; table ;; (lambda (tabledat) ;; (let* ((tablename (car tabledat)) ;; (fields (cdr tabledat)) ;; (has-last-update (member "last_update" fields)) ;; (use-last-update (dbmod:calc-use-last-update has-last-update fields last-update)) ;; (last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for ;; (if (number? last-update) ;; last-update ;; (cdr last-update)) ;; #f)) ;; (last-update-field (if use-last-update ;; (if (number? last-update) ;; "last_update" ;; (car last-update)) ;; #f)) ;; (num-fields (length fields)) ;; (field->num (make-hash-table)) ;; (num->field (apply vector (map car fields))) ;; BBHERE ;; (full-sel (conc "SELECT " (string-intersperse (map car fields) ",") ;; " FROM " tablename (if use-last-update ;; apply last-update criteria ;; (conc " WHERE " last-update-field " >= " last-update-value) ;; "") ;; ";")) ;; (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) " ;; " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );")) ;; (fromdat '()) ;; (fromdats '()) ;; (totrecords 0) ;; (batch-len 10000000) ;; (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100"))) ;; (todat (make-hash-table)) ;; (count 0) ;; (field-names (map car fields))) ;; ;; (debug:print-info 0 *default-log-port* "Syncing table "tablename) ;; ;; ;; set up the field->num table ;; (for-each ;; (lambda (field) ;; (hash-table-set! field->num field count) ;; (set! count (+ count 1))) ;; fields) ;; ;; ;; read the source table ;; ;; store a list of all rows in the table in fromdat, up to batch-len. ;; ;; Then add fromdat to the fromdats list, clear fromdat and repeat. ;; (sqlite3:for-each-row ;; (lambda (a . b) ;; (set! fromdat (cons (apply vector a b) fromdat)) ;; (if (> (length fromdat) batch-len) ;; (begin ;; (set! fromdats (cons fromdat fromdats)) ;; (set! fromdat '()) ;; (set! totrecords (+ totrecords 1))))) ;; fromdb ;; full-sel) ;; ;; (debug:print-info 0 *default-log-port* "Have "totrecords" records to update.") ;; ;; Count less than batch-len as a record ;; (if (> (length fromdat) 0) ;; (set! totrecords (+ totrecords 1))) ;; ;; ;; tack on remaining records in fromdat ;; (if (not (null? fromdat)) ;; (set! fromdats (cons fromdat fromdats))) ;; ;; (sqlite3:for-each-row ;; (lambda (a . b) ;; (hash-table-set! todat a (apply vector a b))) ;; todb ;; full-sel) ;; ;; ;; first pass implementation, just insert all changed rows ;; ;; (let* ((db todb) ;; (has-last-update (member "last_update" field-names)) ;; (drp-trigger (if has-last-update ;; (db:drop-trigger db tablename) ;; #f)) ;; (is-trigger-dropped (if has-last-update ;; (db:is-trigger-dropped db tablename) ;; #f)) ;; (stmth (sqlite3:prepare db full-ins)) ;; (changed-rows 0)) ;; (for-each ;; (lambda (fromdat-lst) ;; (mutex-lock! *db-transaction-mutex*) ;; (sqlite3:with-transaction ;; db ;; (lambda () ;; (for-each ;; ;; (lambda (fromrow) ;; (let* ((a (vector-ref fromrow 0)) ;; (curr (hash-table-ref/default todat a #f)) ;; (same #t)) ;; (let loop ((i 0)) ;; (if (or (not curr) ;; (not (equal? (vector-ref fromrow i)(vector-ref curr i)))) ;; (set! same #f)) ;; (if (and same ;; (< i (- num-fields 1))) ;; (loop (+ i 1)))) ;; (if (not same) ;; (begin ;; (apply sqlite3:execute stmth (vector->list fromrow)) ;; (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))) ;; (set! changed-rows (+ changed-rows 1)))))) ;; fromdat-lst))) ;; (mutex-unlock! *db-transaction-mutex*)) ;; fromdats) ;; ;; (sqlite3:finalize! stmth) ;; (if (member "last_update" field-names) ;; (db:create-trigger db tablename))) ;; )) ;; tbls) ;; (let* ((runtime (- (current-milliseconds) start-time)) ;; (should-print (or ;; (debug:debug-mode 12) ;; (common:low-noise-print 120 "db sync") ;; (> runtime 500)))) ;; low and high sync times treated as separate. ;; (for-each ;; (lambda (dat) ;; (let ((tblname (car dat)) ;; (count (cdr dat))) ;; (set! tot-count (+ tot-count count)))) ;; (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b)))))) (define (has-last-update dbh tablename) (let* ((has-last #f)) (sqlite3:for-each-row (lambda (name) (if (equal? name "last_update") (set! has-last #t))) dbh |
︙ | ︙ | |||
431 432 433 434 435 436 437 | (mode 'full) (no-update '("keys")) ;; do ) (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile) (if (not (sqlite3:auto-committing? dbh)) (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.") (let* ((table-names (map car tables)) | | > | 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 | (mode 'full) (no-update '("keys")) ;; do ) (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile) (if (not (sqlite3:auto-committing? dbh)) (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.") (let* ((table-names (map car tables)) (dest-exists (file-exists? destdbfile)) (start-time (current-milliseconds))) (assert dest-exists "FATAL: sync called with non-existant file, "destdbfile) ;; attach the destdbfile ;; for each table ;; insert into dest.<table> select * from src.<table> where last_update>last_update ;; done (debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb") (sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;")) |
︙ | ︙ | |||
468 469 470 471 472 473 474 | ;; (debug:print 0 *default-log-port* "stmt8="stmt8) ;; (if (sqlite3:auto-committing? dbh) ;; (begin (mutex-lock! *db-transaction-mutex*) (sqlite3:with-transaction dbh (lambda () | | | | | > > | 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 | ;; (debug:print 0 *default-log-port* "stmt8="stmt8) ;; (if (sqlite3:auto-committing? dbh) ;; (begin (mutex-lock! *db-transaction-mutex*) (sqlite3:with-transaction dbh (lambda () (debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using INSERT OR UPDATE") (sqlite3:execute dbh stmt1) ;; get all new rows (if (member "last_update" fields) (sqlite3:execute dbh stmt8)) ;; get all updated rows ;; (sqlite3:execute dbh stmt5) ;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up ;; (sqlite3:execute dbh stmt6) )) (debug:print 0 *default-log-port* "Synced table "table " in "(- (current-milliseconds) start-ms)"ms") ;; ) (mutex-unlock! *db-transaction-mutex*))) ;; (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight.")))) table-names) (sqlite3:execute dbh "DETACH auxdb;") (debug:print-info 0 *default-log-port* "Total sync time: "(- (current-milliseconds) start-time)"ms") -1))) ;; FAILED ATTEMPTS ;; (if (not (has-last-update dbh table)) ;; (sqlite3:execute dbh (conc "ALTER TABLE "table" ADD COLUMN last_update INTEGER;"))) ;; (if (not (has-last-update dbh (conc "auxdb."table))) ;; (sqlite3:execute dbh (conc "ALTER TABLE auxdb."table" ADD COLUMN last_update INTEGER;"))) |
︙ | ︙ | |||
747 748 749 750 751 752 753 754 | (begin (debug:print-info 0 *default-log-port* "Found old test in LAUNCHED state, test-id=" test-id " 1 day since event_time marked") (set! oldlaunched (cons (list test-id run-dir uname testname item-path run-id) oldlaunched))))) stmth3 run-id)))) (list incompleted oldlaunched toplevels))) | > | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 | (begin (debug:print-info 0 *default-log-port* "Found old test in LAUNCHED state, test-id=" test-id " 1 day since event_time marked") (set! oldlaunched (cons (list test-id run-dir uname testname item-path run-id) oldlaunched))))) stmth3 run-id)))) (list incompleted oldlaunched toplevels))) (define (db:set-state-status-by-state-status dbstruct run-id testname currstate currstatus newstate newstatus) ;; clear caches needed (let* ((qry (conc "UPDATE tests SET state=?,status=? WHERE " (if currstate (conc "state='" currstate "' AND ") "") (if currstatus (conc "status='" currstatus "' AND ") "") " run_id=? AND testname LIKE ?;"))) (db:with-db dbstruct run-id #t (lambda (dbdat db) (sqlite3:execute db qry (or newstate currstate "NOT_STARTED") (or newstatus currstate "UNKNOWN") run-id testname))))) ;;====================================================================== ;; db to db sync ;;====================================================================== (define (dbmod:db-to-db-sync src-db dest-db last-update init-proc keys) (if (and (file-exists? src-db) (file-read-access? src-db)) (let* ((d-wr (or (and (file-exists? dest-db) (file-write-access? dest-db)) ;; exists and writable (let* ((dirname (or (pathname-directory dest-db) "."))) (if dirname (file-exists? dirname) (file-write-access? dirname))))) (tables (db:sync-all-tables-list keys)) (sdb (dbmod:safely-open-db src-db init-proc #t)) (ddb (dbmod:safely-open-db dest-db init-proc d-wr))) (dbmod:sync-gasket tables last-update sdb ddb dest-db 'todest)))) ) |
Modified launch.scm from [7e82dfb83e] to [4dac8c9f9e].
︙ | ︙ | |||
739 740 741 742 743 744 745 | ;; NO NEED TO CALL set-state-status-and-roll-up-items HERE, THIS IS DONE IN set-state-status-and-roll-up-items called by tests:test-set-status! ) ) ;; for automated creation of the rollup html file this is a good place... (if (not (equal? item-path "")) | | > | 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 | ;; NO NEED TO CALL set-state-status-and-roll-up-items HERE, THIS IS DONE IN set-state-status-and-roll-up-items called by tests:test-set-status! ) ) ;; for automated creation of the rollup html file this is a good place... (if (not (equal? item-path "")) (tests:summarize-items run-id test-id test-name #f)) ;; BUG was this meant to be the antecnt of the if above? (tests:summarize-test run-id test-id) ;; don't force - just update if no ;; Leave a .final-status file for the top level test (tests:save-final-status run-id test-id) (rmt:update-run-stats run-id (rmt:get-raw-run-stats run-id))) ;; end of let* (mutex-unlock! m) (launch:end-of-run-check run-id ) |
︙ | ︙ | |||
1153 1154 1155 1156 1157 1158 1159 | ;; if have -append-config then read and append here (let ((cfname (args:get-arg "-append-config"))) (if (and cfname (file-read-access? cfname)) (read-config cfname *configdat* #t))) ;; values are added to the hash, no need to do anything special. ;; have config at this time, this is a good place to set params based on config file settings | | > | | > > > | 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 | ;; if have -append-config then read and append here (let ((cfname (args:get-arg "-append-config"))) (if (and cfname (file-read-access? cfname)) (read-config cfname *configdat* #t))) ;; values are added to the hash, no need to do anything special. ;; have config at this time, this is a good place to set params based on config file settings (let* ((dbmode (configf:lookup *configdat* "setup" "dbcache-mode")) (syncmode (configf:lookup *configdat* "setup" "sync-mode"))) (if dbmode (begin (debug:print-info 0 *default-log-port* "Overriding dbmode to "dbmode) (dbcache-mode (string->symbol dbmode)))) (if syncmode (begin (debug:print-info 0 *default-log-port* "Overriding syncmode to "syncmode) (dbfile:sync-method (string->symbol syncmode))))) *toppath*))) (define (get-best-disk confdat testconfig) (let* ((disks (or (and testconfig (hash-table-ref/default testconfig "disks" #f)) (hash-table-ref/default confdat "disks" #f))) (minspace (let ((m (configf:lookup confdat "setup" "minspace"))) |
︙ | ︙ |
Modified megatest.scm from [adec828972] to [4b22029f00].
︙ | ︙ | |||
282 283 284 285 286 287 288 289 290 291 292 293 294 295 | -list-run-time : list time requered to complete runs. It supports following switches -run-patt <patt> -target-patt <patt> -dumpmode <csv,json,plain-text> -list-test-time : list time requered to complete each test in a run. It following following arguments -runname <patt> -target <patt> -dumpmode <csv,json,plain-text> -syscheck : do some very basic checks; write access and space in tmp, home, runs, links and is $DISPLAY valid -list-waivers : dump waivers for specified target, runname, testpatt to stdout Diff report -diff-rep : generate diff report (must include -src-target, -src-runname, -target, -runname and either -diff-email or -diff-html) -src-target <target> -src-runname <target> -diff-email <emails> : comma separated list of email addresses to send diff report | > | 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 | -list-run-time : list time requered to complete runs. It supports following switches -run-patt <patt> -target-patt <patt> -dumpmode <csv,json,plain-text> -list-test-time : list time requered to complete each test in a run. It following following arguments -runname <patt> -target <patt> -dumpmode <csv,json,plain-text> -syscheck : do some very basic checks; write access and space in tmp, home, runs, links and is $DISPLAY valid -list-waivers : dump waivers for specified target, runname, testpatt to stdout -db2db : sync db to db, use -from and -to to specify the databases Diff report -diff-rep : generate diff report (must include -src-target, -src-runname, -target, -runname and either -diff-email or -diff-html) -src-target <target> -src-runname <target> -diff-email <emails> : comma separated list of email addresses to send diff report |
︙ | ︙ | |||
344 345 346 347 348 349 350 351 352 353 354 355 356 357 | "-runstep" "-logpro" "-m" "-rerun" "-days" "-rename-run" "-to" "-dest" "-source" "-time-stamp" ;; values and messages ":category" ":variable" | > | 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 | "-runstep" "-logpro" "-m" "-rerun" "-days" "-rename-run" "-from" "-to" "-dest" "-source" "-time-stamp" ;; values and messages ":category" ":variable" |
︙ | ︙ | |||
448 449 450 451 452 453 454 | "-clean-cache" "-no-cache" "-cache-db" "-cp-eventtime-to-publishtime" "-use-db-cache" "-prepend-contour" | < | 450 451 452 453 454 455 456 457 458 459 460 461 462 463 | "-clean-cache" "-no-cache" "-cache-db" "-cp-eventtime-to-publishtime" "-use-db-cache" "-prepend-contour" ;; misc "-repl" "-lock" "-unlock" "-list-servers" "-kill-servers" "-run-wait" ;; wait on a run to complete (i.e. no RUNNING) |
︙ | ︙ | |||
493 494 495 496 497 498 499 500 501 502 503 504 505 506 | "-create-megatest-area" "-mark-incompletes" "-convert-to-norm" "-convert-to-old" "-import-megatest.db" "-sync-to-megatest.db" "-sync-brute-force" "-logging" "-v" ;; verbose 2, more than normal (normal is 1) "-q" ;; quiet 0, errors/warnings only "-diff-rep" | > | 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 | "-create-megatest-area" "-mark-incompletes" "-convert-to-norm" "-convert-to-old" "-import-megatest.db" "-sync-to-megatest.db" "-db2db" "-sync-brute-force" "-logging" "-v" ;; verbose 2, more than normal (normal is 1) "-q" ;; quiet 0, errors/warnings only "-diff-rep" |
︙ | ︙ | |||
2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 | (set! *didsomething* #t))) (if (args:get-arg "-sync-to") (let ((toppath (launch:setup))) (tasks:sync-to-postgres *configdat* (args:get-arg "-sync-to")) (set! *didsomething* #t))) (if (args:get-arg "-list-test-time") (let* ((toppath (launch:setup))) (task:get-test-times) (set! *didsomething* #t))) (if (args:get-arg "-list-run-time") (let* ((toppath (launch:setup))) | > > > > > > > > > > > > > > > > > > > > > > > > > > | 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 | (set! *didsomething* #t))) (if (args:get-arg "-sync-to") (let ((toppath (launch:setup))) (tasks:sync-to-postgres *configdat* (args:get-arg "-sync-to")) (set! *didsomething* #t))) ;; use with -from and -to ;; (if (args:get-arg "-db2db") (let* ((duh (launch:setup)) (src-db (args:get-arg "-from")) (dest-db (args:get-arg "-to")) (sync-period (args:get-arg "-period")) ;; NOT IMPLEMENTED YET (sync-timeout (args:get-arg "-timeout")) ;; NOT IMPLEMENTED YET (lockfile (conc dest-db".lock")) (keys (db:get-keys #f)) ) (if (and src-db dest-db) (begin (debug:print-info 0 *default-log-port* "Attempting to sync data from "src-db" to "dest-db"...") ;; (if (common:simple-file-lock lockfile) ;; (begin (if (not (file-exists? dest-db)) ;; use copy to get going (file-copy src-db dest-db)) (let ((res (dbmod:db-to-db-sync src-db dest-db 0 (dbfile:db-init-proc) keys))) ;; (common:simple-file-release-lock lockfile) (debug:print 0 *default-log-port* "Synced " res " records from "src-db" to "dest-db))) (debug:print 0 *default-log-port* "Skipping sync, there is a sync in progress.")) (set! *didsomething* #t)) (debug:print 0 *default-log-port* "Usage for -db2db; -to and -from must be specified")) (if (args:get-arg "-list-test-time") (let* ((toppath (launch:setup))) (task:get-test-times) (set! *didsomething* #t))) (if (args:get-arg "-list-run-time") (let* ((toppath (launch:setup))) |
︙ | ︙ |
Modified mt.scm from [9ff41cb92d] to [321551147d].
︙ | ︙ | |||
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | (use sqlite3 srfi-1 posix regex regex-case srfi-69 dot-locking (srfi 18) posix-extras directory-utils call-with-environment-variables) (import (prefix sqlite3 sqlite3:)) (declare (unit mt)) (declare (uses debugprint)) (declare (uses db)) (declare (uses common)) (declare (uses items)) (declare (uses runconfig)) (declare (uses tests)) (declare (uses server)) (declare (uses runs)) (declare (uses rmt)) (declare (uses rmtmod)) (import debugprint | > | > | 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | (use sqlite3 srfi-1 posix regex regex-case srfi-69 dot-locking (srfi 18) posix-extras directory-utils call-with-environment-variables) (import (prefix sqlite3 sqlite3:)) (declare (unit mt)) (declare (uses debugprint)) (declare (uses db)) (declare (uses dbmod)) (declare (uses common)) (declare (uses items)) (declare (uses runconfig)) (declare (uses tests)) (declare (uses server)) (declare (uses runs)) (declare (uses rmt)) (declare (uses rmtmod)) (import debugprint rmtmod dbmod) (include "common_records.scm") (include "key_records.scm") (include "db_records.scm") (include "run_records.scm") (include "test_records.scm") |
︙ | ︙ | |||
188 189 190 191 192 193 194 | (debug:print-info 0 *default-log-port* "TRIGGERED on " trigger ", running command " fullcmd " output at " (get-environment-variable "NBFAKE_LOG")) (process-run fullcmd) (if prev-nbfake-log (setenv "NBFAKE_LOG" prev-nbfake-log) (unsetenv "NBFAKE_LOG")) )) | | | | 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 | (debug:print-info 0 *default-log-port* "TRIGGERED on " trigger ", running command " fullcmd " output at " (get-environment-variable "NBFAKE_LOG")) (process-run fullcmd) (if prev-nbfake-log (setenv "NBFAKE_LOG" prev-nbfake-log) (unsetenv "NBFAKE_LOG")) )) (define (mt:process-triggers run-id test-id newstate newstatus) (if test-id (let* ((test-dat (rmt:get-test-info-by-id run-id test-id))) (if test-dat (let* ((test-rundir (db:test-get-rundir test-dat)) ;; ) ;; ) (test-name (db:test-get-testname test-dat)) (item-path (db:test-get-item-path test-dat)) (duration (db:test-get-run_duration test-dat)) (comment (db:test-get-comment test-dat)) (event-time (db:test-get-event_time test-dat)) |
︙ | ︙ | |||
260 261 262 263 264 265 266 | (define (mt:test-set-state-status-by-id run-id test-id newstate newstatus newcomment) (if (not (and run-id test-id)) (begin (debug:print-error 0 *default-log-port* "bad data handed to mt:test-set-state-status-by-id, run-id=" run-id ", test-id=" test-id ", newstate=" newstate) (print-call-chain (current-error-port)) #f) (begin | < < < < < < < < < < | 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 | (define (mt:test-set-state-status-by-id run-id test-id newstate newstatus newcomment) (if (not (and run-id test-id)) (begin (debug:print-error 0 *default-log-port* "bad data handed to mt:test-set-state-status-by-id, run-id=" run-id ", test-id=" test-id ", newstate=" newstate) (print-call-chain (current-error-port)) #f) (begin (rmt:set-state-status-and-roll-up-items run-id test-id #f newstate newstatus newcomment) #t))) (define (mt:test-set-state-status-by-id-unless-completed run-id test-id newstate newstatus newcomment) (let* ((test-vec (rmt:get-testinfo-state-status run-id test-id)) (state (vector-ref test-vec 3))) (if (equal? state "COMPLETED") |
︙ | ︙ | |||
292 293 294 295 296 297 298 | ;; (mt:process-triggers run-id test-id new-state new-status) #t);) ;;(mt:test-set-state-status-by-id run-id test-id new-state new-status new-comment))) (define (mt:test-set-state-status-by-testname-unless-completed run-id test-name item-path new-state new-status new-comment) (let ((test-id (rmt:get-test-id run-id test-name item-path))) (mt:test-set-state-status-by-id-unless-completed run-id test-id new-state new-status new-comment))) | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 | ;; (mt:process-triggers run-id test-id new-state new-status) #t);) ;;(mt:test-set-state-status-by-id run-id test-id new-state new-status new-comment))) (define (mt:test-set-state-status-by-testname-unless-completed run-id test-name item-path new-state new-status new-comment) (let ((test-id (rmt:get-test-id run-id test-name item-path))) (mt:test-set-state-status-by-id-unless-completed run-id test-id new-state new-status new-comment))) ;; state and status are extra hints not usually used in the calculation ;; (define (rmt:set-state-status-and-roll-up-items run-id test-name item-path state status comment) (assert (number? run-id) "FATAL: Run id required.") (rmt:client-side-set-state-status-and-roll-up run-id test-name item-path state status comment) ;; (rmtmod:send-receive 'set-state-status-and-roll-up-items run-id (list run-id test-name item-path state status comment)) ) (define (rmt:client-side-set-state-status-and-roll-up run-id test-name item-path state status comment) ;; establish info on incoming test followed by info on top level test ;; BBnote - for mode itemwait, linkage between upstream test & matching item status is propagated to run queue in db:prereqs-not-met (let* ((test-id (if (number? test-name) test-name (db:keep-trying-until-true rmt:get-test-id (list run-id test-name item-path) 10))) ;; (rmt:get-test-id run-id test-name item-path))) (testdat (rmt:get-test-info-by-id run-id test-id)) ;; (test-id (db:test-get-id testdat)) (test-name (if (number? test-name) (db:test-get-testname testdat) test-name)) (item-path (db:test-get-item-path testdat)) (tl-test-id (rmt:get-test-id run-id test-name "")) (tl-testdat (rmt:get-test-info-by-id run-id test-id)) (new-state-eh #f) (new-status-eh #f)) (if (member state '("LAUNCHED" "REMOTEHOSTSTART")) (rmt:general-call 'set-test-start-time run-id test-id)) (let* ((res (begin (rmt:test-set-state-status run-id test-id state status comment) ;; this call sets the item state/status (if (not (equal? item-path "")) ;; only roll up IF incoming test is an item (let* ((state-status-counts (rmt:get-all-state-status-counts-for-test run-id test-name item-path state status)) ;; item-path is used to exclude current state/status of THIS test (state-statuses (db:roll-up-rules state-status-counts state status)) (newstate (car state-statuses)) (newstatus (cadr state-statuses))) (set! new-state-eh newstate) (set! new-status-eh newstatus) (debug:print 4 *default-log-port* "BB> tl-test-id="tl-test-id" ; "test-name":"item-path " newstate="newstate" newstatus="newstatus" len(sscs)="(length state-status-counts) " state-status-counts: " (apply conc (map (lambda (x) (conc (with-output-to-string (lambda () (pp (dbr:counts->alist x)))) " | ")) state-status-counts))); end debug:print (if tl-test-id (rmt:test-set-state-status run-id tl-test-id newstate newstatus #f)) ;; we are still in the transaction - must access the db and not the dbstruct ))))) (if (and test-id state status (equal? status "AUTO")) (rmt:test-data-rollup run-id test-id status)) (if new-state-eh ;; moved from db:test-set-state-status (mt:process-triggers run-id test-id new-state-eh new-status-eh)) res))) ;; select end_time-now from ;; (select testname,item_path,event_time+run_duration as ;; end_time,strftime('%s','now') as now from tests where state in ;; ('RUNNING','REMOTEHOSTSTART','LAUNCHED')); ;; ;; NOT EASY TO MIGRATE TO db{file,mod} ;; (define (rmt:find-and-mark-incomplete-engine run-id ovr-deadtime cfg-deadtime test-stats-update-period) (let* ((incompleted '()) (oldlaunched '()) (toplevels '()) ;; The default running-deadtime is 720 seconds = 12 minutes. ;; "(running-deadtime-default (+ server-start-allowance (* 2 launch-monitor-period)))" = 200 + (2 * (200 + 30 + 30)) (deadtime-trim (or ovr-deadtime cfg-deadtime)) (server-start-allowance 200) (server-overloaded-budget 200) (launch-monitor-off-time (or test-stats-update-period 30)) (launch-monitor-on-time-budget 30) (launch-monitor-period (+ launch-monitor-off-time launch-monitor-on-time-budget server-overloaded-budget)) (remotehoststart-deadtime-default (+ server-start-allowance server-overloaded-budget 30)) (remotehoststart-deadtime (or deadtime-trim remotehoststart-deadtime-default)) (running-deadtime-default (+ server-start-allowance (* 2 launch-monitor-period))) (running-deadtime (or deadtime-trim running-deadtime-default))) ;; two minutes (30 seconds between updates, this leaves 3x grace period) (debug:print-info 4 *default-log-port* "running-deadtime = " running-deadtime) (debug:print-info 4 *default-log-port* "deadtime-trim = " deadtime-trim) (let* ((dat (rmt:get-toplevels-and-incompletes run-id running-deadtime remotehoststart-deadtime))) (set! oldlaunched (list-ref dat 1)) (set! toplevels (list-ref dat 2)) (set! incompleted (list-ref dat 0))) (debug:print-info 18 *default-log-port* "Found " (length oldlaunched) " old LAUNCHED items, " (length toplevels) " old LAUNCHED toplevel tests and " (length incompleted) " tests marked RUNNING but apparently dead.") ;; These are defunct tests, do not do all the overhead of set-state-status. Force them to INCOMPLETE. ;; ;; (db:delay-if-busy dbdat) (let* ((min-incompleted-ids (map car incompleted)) ;; do 'em all (all-ids (append min-incompleted-ids (map car oldlaunched)))) (if (> (length all-ids) 0) (begin ;; (launch:is-test-alive "localhost" 435) (debug:print 0 *default-log-port* "WARNING: Marking test(s); " (string-intersperse (map conc all-ids) ", ") " as DEAD") (for-each (lambda (test-id) (let* ((tinfo (rmt:get-test-info-by-id run-id test-id)) (run-dir (db:test-get-rundir tinfo)) (host (db:test-get-host tinfo)) (pid (db:test-get-process_id tinfo)) (result (rmt:get-status-from-final-status-file run-dir))) (if (and (list? result) (> (length result) 1) (equal? "PASS" (cadr result)) (equal? "COMPLETED" (car result))) (begin (debug:print 0 *default-log-port* "INFO: test " test-id " actually passed, so marking PASS not DEAD") (rmt:set-state-status-and-roll-up-items run-id test-id 'foo "COMPLETED" "PASS" "Test stopped responding but it has PASSED; marking it PASS in the DB.")) (let ((is-alive (and (not (eq? pid 0)) ;; 0 is default in re-used field "attemptnum" where pid stored. (commonmod:is-test-alive host pid)))) (if is-alive (debug:print 0 *default-log-port* "INFO: test " test-id " on host " host " has a process on pid " pid ", NOT setting to DEAD.") (begin (debug:print 0 *default-log-port* "INFO: test " test-id " final state/status is not COMPLETED/PASS. It is " result) (rmt:set-state-status-and-roll-up-items run-id test-id 'foo "COMPLETED" "DEAD" "Test stopped responding while in RUNNING or REMOTEHOSTSTART; presumed dead."))))))) ;; call end of eud of run detection for posthook - from merge, is it needed? ;; (launch:end-of-run-check run-id) all-ids) ))))) (define (mt:lazy-read-test-config test-name) (let ((tconf (hash-table-ref/default *testconfigs* test-name #f))) (if tconf tconf (let ((test-dirs (tests:get-tests-search-path *configdat*))) (let loop ((hed (car test-dirs)) (tal (cdr test-dirs))) |
︙ | ︙ |
Added nmsg-transport.scm version [adedc287f0].
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 | ;; Copyright 2006-2012, Matthew Welland. ;; This file is part of Megatest. ;; ;; Megatest is free software: you can redistribute it and/or modify ;; it under the terms of the GNU General Public License as published by ;; the Free Software Foundation, either version 3 of the License, or ;; (at your option) any later version. ;; ;; Megatest is distributed in the hope that it will be useful, ;; but WITHOUT ANY WARRANTY; without even the implied warranty of ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ;; GNU General Public License for more details. ;; ;; You should have received a copy of the GNU General Public License ;; along with Megatest. If not, see <http://www.gnu.org/licenses/>. (require-extension (srfi 18) extras tcp s11n) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) ;; (use nanomsg) (declare (unit nmsg-transport)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses server)) (include "common_records.scm") (include "db_records.scm") ;; Transition to pub --> sub with pull <-- push ;; ;; 1. client sends request to server via push to the pull port ;; 2. server puts request in queue or processes immediately as appropriate ;; 3. server puts responses from completed requests into pub port ;; ;; TODO ;; ;; Done Tested ;; [x] [ ] 1. Add columns pullport pubport to servers table ;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 ;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports ;; [x] [ ] 4. Add client compose of request ;; [x] [ ] - name of client: testname/itempath-test_id-hostname ;; [x] [ ] - name of request: callname, params ;; [x] [ ] - request key: f(clientname, callname, params) ;; [x] [ ] 5. Add processing of subscription hits ;; [x] [ ] - done when get key ;; [x] [ ] - return results ;; [x] [ ] 6. Add timeout processing ;; [x] [ ] - after 60 seconds ;; [ ] [ ] i. check server alive, connect to new if necessary ;; [ ] [ ] ii. resend request ;; [ ] [ ] 7. Turn self ping back on (define (nmsg-transport:make-server-url hostport #!key (bindall #f)) (if (not hostport) #f (conc "tcp://" (if bindall "*" (car hostport)) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) ;;====================================================================== ;; S E R V E R ;;====================================================================== (define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000)) (debug:print 2 *default-log-port* "Attempting to start the server ...") (let* ((start-port (portlogger:open-run-close portlogger:find-port)) (server-thread (make-thread (lambda () (nmsg-transport:try-start-server dbstruct run-id start-port server-id)) "server thread")) (tdbdat (tasks:open-db))) (thread-start! server-thread) (thread-sleep! 0.1) (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id)) (let ((interface (if (equal? hostn "-")(get-host-name) hostn))) (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access ;; (set! *inmemdb* dbstruct) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") (thread-start! (make-thread (lambda ()(nmsg-transport:keep-running server-id run-id)) "keep running")) (thread-join! server-thread)) (if (> retrynum 0) (begin (debug:print 0 *default-log-port* "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.") (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") (portlogger:open-run-close portlogger:set-failed start-port) (nmsg-transport:run dbstruct hostn run-id server-id)) (begin (debug:print-error 0 *default-log-port* "could not find an open port to start server on. Giving up") (exit 1)))))) (define (nmsg-transport:try-start-server dbstruct run-id portnum server-id) (let ((repsoc (nn-socket 'rep))) (nn-bind repsoc (conc "tcp://*:" portnum)) (let loop ((msg-in (nn-recv repsoc))) (let* ((dat (db:string->obj msg-in transport: 'nmsg))) (debug:print 0 *default-log-port* "server, received: " dat) (let ((result (api:execute-requests dbstruct dat))) (debug:print 0 *default-log-port* "server, sending: " result) (nn-send repsoc (db:obj->string result transport: 'nmsg))) (loop (nn-recv repsoc)))))) ;; all routes though here end in exit ... ;; (define (nmsg-transport:launch run-id) (let* ((tdbdat (tasks:open-db)) (dbstruct (db:setup run-id)) (hostn (or (args:get-arg "-server") "-"))) (set! *run-id* run-id) (set! *inmemdb* dbstruct) ;; with nbfake daemonize isn't really needed ;; ;; (if (args:get-arg "-daemonize") ;; (begin ;; (daemon:ize) ;; (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it ;; (begin ;; (current-error-port *alt-log-file*) ;; (current-output-port *alt-log-file*))))) (if (server:check-if-running run-id) (begin (debug:print-info 0 *default-log-port* "Server for run-id " run-id " already running") (exit 0))) (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) (remtries 4)) (if (not server-id) (if (> remtries 0) (begin (thread-sleep! 2) (if (not (server:check-if-running run-id)) (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) (- remtries 1)) (begin (debug:print-info 0 *default-log-port* "Another server took the slot, exiting") (exit 0)))) (begin ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch") )) ;; locked in a server id, try to start up (nmsg-transport:run dbstruct hostn run-id server-id)) (set! *didsomething* #t) (exit)))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== (define (nmsg-transport:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== ;; ping the server at host:port ;; return the open socket if successful (return-socket == #t) ;; expect the key expected-key returned in payload ;; send our-key or #f as payload ;; (define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f)) ;; send a random number along with pid and check that we get it back (let* ((host (if (or (not hostn) (equal? hostn "-")) ;; use localhost (get-host-name) hostn)) (req (or socket (let ((soc (nn-socket 'req))) (nn-connect soc (conc "tcp://" host ":" port)) soc))) (success #t) (dat (vector "ping" our-key)) (result (condition-case (nmsg-transport:client-api-send-receive-raw req dat timeout: timeout) ((timeout)(set! success #f) #f))) (key (if success (vector-ref result 1) #f))) (debug:print 0 *default-log-port* "success=" success ", key=" key ", expected-key=" expected-key ", equal? " (equal? key expected-key)) (if (and success (or (not expected-key) ;; just getting a reply is good enough then (equal? key expected-key))) (if return-socket req (begin (if (not socket)(nn-close req)) ;; don't want a side effect of closing socket if handed it #t)) (begin (if (not socket)(nn-close req)) ;; failed to ping, close socket as side effect #f)))) ;; send data to server, wait max of timeout seconds for a response. ;; return #( success/fail result ) ;; ;; for effiency it is easier to do the obj->string and string->obj here. ;; (define (nmsg-transport:client-api-send-receive-raw socreq indat #!key (enable-send #t)(timeout 25)) (let* ((success #f) (result #f) (keepwaiting #t) (dat (db:obj->string indat transport: 'nmsg)) (send-recv (make-thread (lambda () (nn-send socreq dat) (let* ((res (nn-recv socreq))) (set! success #t) (set! result (db:string->obj res transport: 'nmsg)))) "send-recv")) (timeout (make-thread (lambda () (let loop ((count 0)) (thread-sleep! 1) (debug:print-info 1 *default-log-port* "send-receive-raw, still waiting after " count " seconds...") (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate (loop (+ count 1)))) (if keepwaiting (begin (print "timeout waiting for ping") (thread-terminate! send-recv)))) "timeout"))) ;; replace with condition-case? (handle-exceptions exn (set! result "timeout") (thread-start! timeout) (thread-start! send-recv) (thread-join! send-recv) (if success (thread-terminate! timeout))) ;; raise timeout error if timed out (if success (if (and (vector? result) (vector-ref result 0)) ;; did it fail at the server? result ;; nope, all good (begin (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref result 2)) (debug:print 0 *default-log-port* " client call chain:") (print-call-chain (current-error-port)) (debug:print 0 *default-log-port* " server call chain:") (pp (vector-ref result 1) (current-error-port)) (signal (vector-ref result 0)))) (signal (make-composite-condition (make-property-condition 'timeout 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server")))))) ;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (nmsg-transport:keep-running server-id run-id) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (let* ((server-info (let loop () (let ((sdat #f)) (mutex-lock! *heartbeat-mutex*) (set! sdat *server-info*) (mutex-unlock! *heartbeat-mutex*) (if sdat (begin (debug:print-info 0 *default-log-port* "keep-running got sdat=" sdat) sdat) (begin (thread-sleep! 0.5) (loop)))))) (iface (car server-info)) (port (cadr server-info)) (last-access 0) (tdbdat (tasks:open-db)) (server-timeout (let ((tmo (configf:lookup *configdat* "server" "timeout"))) (if (and (string? tmo) (string->number tmo)) (* 60 60 (string->number tmo)) ;; (* 3 24 60 60) ;; default to three days (* 60 1) ;; default to one minute ;; (* 60 60 25) ;; default to 25 hours )))) (print "Keep-running got server pid " server-id ", using iface " iface " and port " port) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) (mutex-lock! *heartbeat-mutex*) (set! last-access *last-db-access*) (mutex-unlock! *heartbeat-mutex*) (db:sync-touched *inmemdb* run-id force-sync: #t) (if (and *server-run* (> (+ last-access server-timeout) (current-seconds))) (begin (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) (loop 0)) (begin (debug:print-info 0 *default-log-port* "Starting to shutdown the server.") (set! *time-to-exit* #t) (db:sync-touched *inmemdb* run-id force-sync: #t) (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running") (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting") (exit) )))))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== (define (nmsg-transport:client-connect iface portnum) (let* ((reqsoc (nmsg-transport:ping iface portnum return-socket: #t))) (vector iface portnum #f #f #f (current-seconds) reqsoc))) ;; returns result, there is no sucess/fail flag - handled via excpections ;; (define (nmsg-transport:client-api-send-receive run-id connection-info cmd param #!key (remtries 5)) ;; NB// In the html version of this routine there is a call to ;; tasks:kill-server-run-id when there is an exception (mutex-lock! *http-mutex*) (let* ((packet (vector cmd param)) (reqsoc (http-transport:server-dat-get-socket connection-info)) (res (nmsg-transport:client-api-send-receive-raw reqsoc packet))) ;; (status (vector-ref rawres 0)) ;; (result (vector-ref rawres 1))) (mutex-unlock! *http-mutex*) res)) ;; (vector status (if status (db:string->obj result transport: 'nmsg) result)))) ;;====================================================================== ;; J U N K ;;====================================================================== ;; DO NOT USE ;; (define (nmsg-transport:client-signal-handler signum) (handle-exceptions exn (debug:print 0 *default-log-port* " ... exiting ...") (let ((th1 (make-thread (lambda () (if (not *received-response*) (receive-message* *runremote*))) ;; flush out last call if applicable "eat response")) (th2 (make-thread (lambda () (debug:print-error 0 *default-log-port* "Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") (thread-sleep! 3) ;; give the flush three seconds to do it's stuff (debug:print 0 *default-log-port* " Done.") (exit 4)) "exit on ^C timer"))) (thread-start! th2) (thread-start! th1) (thread-join! th2)))) |
Added nn-transport.scm version [96f84cfada].
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 | ;;start a server, returns the connection ;; (define (start-nn-server portnum ) (let ((rep (nn-socket 'rep))) (handle-exceptions exn (let ((emsg ((condition-property-accessor 'exn 'message) exn))) (print "ERROR: Failed to start server \"" emsg "\"") (exit 1)) (nn-bind rep (conc "tcp://*:" portnum))) rep)) ;; open connection to server, send message, close connection ;; (define (open-send-close-nn host-port msg attrib #!key (timeout 3) ) ;; default timeout is 3 seconds (let ((req (nn-socket 'req)) (uri (conc "tcp://" host-port)) (res #f) (contacts (alist-ref 'contact attrib)) (mode (alist-ref 'mode attrib))) (handle-exceptions exn (let ((emsg ((condition-property-accessor 'exn 'message) exn))) ;; Send notification (print "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" ) (if (equal? mode "production") (begin (print " Sending email to contacts : " contacts ) (let ((email-body (mtut:stml->string (s:body (s:p (conc "We could not send messages to the server on " uri "." "Please check if the listner is running. It is possible that the host is overloaded due to which it may take too long to respond. \n Contact your system adminstrator if server load is high." (s:br)" Thank You ") ))))) (sendmail (string-join (string-split contacts ";" )) (conc "[Listner Error] Filed to connect to listner on " uri) email-body use_html: #t))) (print " mode : " mode " Not sending any emails" )) #f) (nn-connect req uri) (print "Connected to the server " ) (nn-send req msg) (print "Request Sent") (let* ((th1 (make-thread (lambda () (let ((resp (nn-recv req))) (nn-close req) (set! res (if (equal? resp "ok") #t #f)))) "recv thread")) (th2 (make-thread (lambda () (thread-sleep! timeout) (thread-terminate! th1)) "timer thread"))) (thread-start! th1) (thread-start! th2) (thread-join! th1) res)))) (define (open-send-receive-nn host-port msg attrib #!key (timeout 3) ) ;; default timeout is 3 seconds (let ((req (nn-socket 'req)) (uri (conc "tcp://" host-port)) (res #f) (contacts (alist-ref 'contact attrib)) (mode (alist-ref 'mode attrib))) (handle-exceptions exn (let ((emsg ((condition-property-accessor 'exn 'message) exn))) ;; Send notification (print "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" ) (if (equal? mode "production") (begin (print " Sending email to contacts : " contacts ) (let ((email-body (mtut:stml->string (s:body (s:p (conc "We could not send messages to the server on " uri "." "Please check if the listner is running. It is possible that the host is overloaded due to which it may take too long to respond. \n Contact your system adminstrator if server load is high." (s:br)" Thank You ") ))))) (sendmail (string-join (string-split contacts ";" )) (conc "[Listner Error] Filed to connect to listner on " uri) email-body use_html: #t))) (print " mode : " mode " Not sending any emails" )) #f) (nn-connect req uri) (print "Connected to the server " ) (nn-send req msg) (print "Request Sent") ;; receive code here ;;(print (nn-recv req)) (let* ((th1 (make-thread (lambda () (let ((resp (nn-recv req))) (nn-close req) (print resp) (set! res (if (equal? resp "ok") #t #f)))) "recv thread")) (th2 (make-thread (lambda () (thread-sleep! timeout) (thread-terminate! th1)) "timer thread"))) (thread-start! th1) (thread-start! th2) (thread-join! th1) res)))) ((tsend) (if (null? remargs) (print "ERROR: missing data to send to trigger listeners") (let* ((msg (car remargs)) (mtconfdat (simple-setup (args:get-arg "-start-dir"))) (mtconf (car mtconfdat)) (time-out (if (args:get-arg "-time-out") (string->number (args:get-arg "-time-out")) 5)) (listeners (configf:get-section mtconf "listeners")) (user-info (user-information (current-user-id))) (prev-seen (make-hash-table))) ;; catch duplicates (if user-info (begin (for-each (lambda (listener) (let ((host-port (car listener)) (attrib (val->alist (cadr listener)))) (if (and (equal? msg "time-to-die") (not (can-user-kill-listner user-info attrib))) (begin (debug:print-error 0 *default-log-port* "User " (car user-info) " is not allowed to send message '" msg"'") (exit 1))) (print "sending " msg " to " host-port ) (open-send-close-nn host-port msg attrib timeout: time-out ))) listeners)) (begin (debug:print-error 0 *default-log-port* "Could not Identify executing user. Will not send any message") (exit 1)))))) ((tquery) (if (null? remargs) (print "ERROR: missing data to send to trigger listeners") (let* ((msg (car remargs)) (mtconfdat (simple-setup (args:get-arg "-start-dir"))) (mtconf (car mtconfdat)) (time-out (if (args:get-arg "-time-out") (string->number (args:get-arg "-time-out")) 5)) (listeners (configf:get-section mtconf "listeners")) (user-info (user-information (current-user-id))) (prev-seen (make-hash-table))) ;; catch duplicates (if user-info (begin (for-each (lambda (listener) (let ((host-port (car listener)) (attrib (val->alist (cadr listener)))) (if (and (equal? msg "time-to-die") (not (can-user-kill-listner user-info attrib))) (begin (debug:print-error 0 *default-log-port* "User " (car user-info) " is not allowed to send message '" msg"'") (exit 1))) (print "sending " msg " to " host-port ) (open-send-receive-nn host-port msg attrib timeout: time-out ))) listeners)) (begin (debug:print-error 0 *default-log-port* "Could not Identify executing user. Will not send any message") (exit 1)))))) ((tquerylisten) (if (null? remargs) (print "ERROR: useage for tlisten is \"mtutil tlisten portnum\"") (let ((portnum (string->number (car remargs)))) (if (not portnum) (print "ERROR: the portnumber parameter must be a number, you gave: " (car remargs)) (begin (if (not (is-port-in-use portnum)) (let* ((rep (start-nn-server portnum)) (mtconfdat (simple-setup (args:get-arg "-start-dir"))) (mtconf (car mtconfdat)) (contact (configf:lookup mtconf "listener" "owner")) (script (configf:lookup mtconf "listener" "script"))) (print "Listening on port " portnum " for messages.") (set-signal-handler! signal/int (lambda (signum) (set! *time-to-exit* #t) (debug:print-error 0 *default-log-port* "Received signal " signum " sending email befor exiting !!") (let ((email-body (mtut:stml->string (s:body (s:p (conc "Received signal " signum ". Lister has been terminated on host " (get-environment-variable "HOST") ". ")))))) (sendmail contact "Listner has been terminated." email-body use_html: #t)) (exit))) (set-signal-handler! signal/term (lambda (signum) (set! *time-to-exit* #t) (debug:print-error 0 *default-log-port* "Received signal " signum " sending email befor exiting !!") (let ((email-body (mtut:stml->string (s:body (s:p (conc "Received signal " signum ". Lister has been terminated on host " (get-environment-variable "HOST") ". ")))))) (sendmail contact "Listner has been terminated." email-body use_html: #t)) (exit))) ;(set-signal-handler! signal/term special-signal-handler) (let loop ((instr (nn-recv rep))) ;;(nn-send rep "3.9") (with-input-from-pipe (conc "/usr/bin/uptime | cut -d':' -f4 | awk '{print $1}' | cut -d',' -f1") (lambda() (let loop ((inl (read-line))) (if (not (eof-object? inl)) (begin ;;(print "fdk73: " inl ":") ;;(set! current-list-ciaf (append! current-list-ciaf (list (string-substitute "\\s+$" "" inl)))) (nn-send rep inl) (loop(read-line))) )) ) ) ;;(print (isys "/usr/bin/uptime" foreach-stdout-thunk: foreach-stdout)) (let ((ctime (date->string (current-date)))) (if (equal? instr "time-to-die") (begin (debug:print 0 *default-log-port* ctime " received '" instr "'. Time to sucide." ) (let ((pid (current-process-id))) (debug:print 0 *default-log-port* "Killing current process (pid=" pid ")") (system (conc "kill " pid)))) (begin (debug:print 0 *default-log-port* ctime " received " instr ) ;(nn-send rep "ok") (if (not (equal? instr "ping")) (begin (debug:print 0 *default-log-port* ctime " running \"" script " " instr "\"") ;(system (conc script " '" instr "'")) (process-run script (list instr )) (debug:print 0 *default-log-port* ctime " done" )) (begin (if (not (equal? instr "load")) (print "Checking load") ) ) ) ))) (loop (nn-recv rep)))) (print "ERROR: Port " portnum " already in use. Try another port"))))))) ((tlisten) (if (null? remargs) (print "ERROR: useage for tlisten is \"mtutil tlisten portnum\"") (let ((portnum (string->number (car remargs)))) (if (not portnum) (print "ERROR: the portnumber parameter must be a number, you gave: " (car remargs)) (begin (if (not (is-port-in-use portnum)) (let* ((rep (start-nn-server portnum)) (mtconfdat (simple-setup (args:get-arg "-start-dir"))) (mtconf (car mtconfdat)) (contact (configf:lookup mtconf "listener" "owner")) (script (configf:lookup mtconf "listener" "script"))) (print "Listening on port " portnum " for messages.") (set-signal-handler! signal/int (lambda (signum) (set! *time-to-exit* #t) (debug:print-error 0 *default-log-port* "Received signal " signum " sending email befor exiting !!") (let ((email-body (mtut:stml->string (s:body (s:p (conc "Received signal " signum ". Lister has been terminated on host " (get-environment-variable "HOST") ". ")))))) (sendmail contact "Listner has been terminated." email-body use_html: #t)) (exit))) (set-signal-handler! signal/term (lambda (signum) (set! *time-to-exit* #t) (debug:print-error 0 *default-log-port* "Received signal " signum " sending email befor exiting !!") (let ((email-body (mtut:stml->string (s:body (s:p (conc "Received signal " signum ". Lister has been terminated on host " (get-environment-variable "HOST") ". ")))))) (sendmail contact "Listner has been terminated." email-body use_html: #t)) (exit))) ;; (set-signal-handler! signal/term special-signal-handler) (let loop ((instr (nn-recv rep))) (nn-send rep "ok") (let ((ctime (date->string (current-date)))) (if (equal? instr "time-to-die") (begin (debug:print 0 *default-log-port* ctime " received '" instr "'. Time to sucide." ) (let ((pid (current-process-id))) (debug:print 0 *default-log-port* "Killing current process (pid=" pid ")") (system (conc "kill " pid)))) (begin (debug:print 0 *default-log-port* ctime " received " instr ) ;(nn-send rep "ok") (if (not (equal? instr "ping")) (begin (debug:print 0 *default-log-port* ctime " running \"" script " " instr "\"") (system (conc script " '" instr "' &")) ;(process-run script (list instr )) (debug:print 0 *default-log-port* ctime " done" )) (begin (if (not (equal? instr "load")) (print "Checking load") ) ) ) ))) (loop (nn-recv rep)))) (print "ERROR: Port " portnum " already in use. Try another port"))))))) ;; from bard (define (start-nanomsg-server) (let* ((socket (socket-connect "inproc://my-server"))) (begin (set-socket-option socket 'linger 1) (set-socket-option socket 'sndbuf 1024) (set-socket-option socket 'rcvbuf 1024) (loop (let ((msg (receive-message socket))) (if msg (handle-message msg) (exit)))))) |
Modified rmt.scm from [0af3ea0170] to [504c22e294].
︙ | ︙ | |||
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 | (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'get-count-tests-running run-id (list run-id))) (define (rmt:get-count-tests-running-for-testname run-id testname) (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'get-count-tests-running-for-testname run-id (list run-id testname))) (define (rmt:get-count-tests-running-in-jobgroup run-id jobgroup) (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'get-count-tests-running-in-jobgroup run-id (list run-id jobgroup))) (define (rmt:set-state-status-and-roll-up-run run-id state status) (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'set-state-status-and-roll-up-run run-id (list run-id state status))) (define (rmt:update-pass-fail-counts run-id test-name) (assert (number? run-id) "FATAL: Run id required.") (rmt:general-call 'update-pass-fail-counts run-id test-name test-name test-name)) (define (rmt:top-test-set-per-pf-counts run-id test-name) (assert (number? run-id) "FATAL: Run id required.") | > > > | 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 | (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'get-count-tests-running run-id (list run-id))) (define (rmt:get-count-tests-running-for-testname run-id testname) (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'get-count-tests-running-for-testname run-id (list run-id testname))) (define (rmt:get-all-state-status-counts-for-test run-id test-name item-path item-state-in item-status-in) (rmt:send-receive 'get-all-state-status-counts-for-test run-id (list run-id test-name item-path item-state-in item-status-in))) (define (rmt:get-count-tests-running-in-jobgroup run-id jobgroup) (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'get-count-tests-running-in-jobgroup run-id (list run-id jobgroup))) (define (rmt:set-state-status-and-roll-up-run run-id state status) (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'set-state-status-and-roll-up-run run-id (list run-id state status))) ;; run on client version of set-state-status-and-roll-up-run (define (rmt:update-pass-fail-counts run-id test-name) (assert (number? run-id) "FATAL: Run id required.") (rmt:general-call 'update-pass-fail-counts run-id test-name test-name test-name)) (define (rmt:top-test-set-per-pf-counts run-id test-name) (assert (number? run-id) "FATAL: Run id required.") |
︙ | ︙ | |||
771 772 773 774 775 776 777 | (define (rmt:find-and-mark-incomplete run-id ovr-deadtime) (let* ((cfg-deadtime (configf:lookup-number *configdat* "setup" "deadtime")) (test-stats-update-period (configf:lookup-number *configdat* "setup" "test-stats-update-period"))) (rmt:find-and-mark-incomplete-engine run-id ovr-deadtime cfg-deadtime test-stats-update-period) ;;call end of eud of run detection for posthook (launch:end-of-run-check run-id))) | > > > > > > > > > > > > > > > > > > > > > > > > > > > | 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 | (define (rmt:find-and-mark-incomplete run-id ovr-deadtime) (let* ((cfg-deadtime (configf:lookup-number *configdat* "setup" "deadtime")) (test-stats-update-period (configf:lookup-number *configdat* "setup" "test-stats-update-period"))) (rmt:find-and-mark-incomplete-engine run-id ovr-deadtime cfg-deadtime test-stats-update-period) ;;call end of eud of run detection for posthook (launch:end-of-run-check run-id))) (define (rmt:get-test-id run-id testname itempath) (rmt:send-receive 'get-test-id run-id (list run-id testname itempath))) ;; set tests with state currstate and status currstatus to newstate and newstatus ;; use currstate = #f and or currstatus = #f to apply to any state or status respectively ;; WARNING: SQL injection risk. NB// See new but not yet used "faster" version below ;; ;; AND NOT (item_path='' AND testname in (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));"))) ;; (debug:print 0 *default-log-port* "QRY: " qry) ;; (db:delay-if-busy) ;; ;; NB// This call only operates on toplevel tests. Consider replacing it with more general call ;; (define (rmt:set-tests-state-status run-id testnames currstate currstatus newstate newstatus) (let ((test-ids '())) (for-each (lambda (testname) (let ((test-id (rmt:get-test-id run-id testname ""))) (rmt:set-state-status-by-state-status run-id testname currstate currstatus newstate newstatus) (if test-id (begin (set! test-ids (cons test-id test-ids)) (mt:process-triggers run-id test-id newstate newstatus))))) testnames) test-ids)) |
Modified rmtmod.scm from [c4f748fe17] to [0f731a3019].
︙ | ︙ | |||
149 150 151 152 153 154 155 | ;; (open-test-db test-path))) ;; WARNING: This currently bypasses the transaction wrapped writes system (define (rmt:test-set-state-status-by-id run-id test-id newstate newstatus newcomment) (assert (number? run-id) "FATAL: Run id required.") (rmtmod:send-receive 'test-set-state-status-by-id run-id (list run-id test-id newstate newstatus newcomment))) | | | | | 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 | ;; (open-test-db test-path))) ;; WARNING: This currently bypasses the transaction wrapped writes system (define (rmt:test-set-state-status-by-id run-id test-id newstate newstatus newcomment) (assert (number? run-id) "FATAL: Run id required.") (rmtmod:send-receive 'test-set-state-status-by-id run-id (list run-id test-id newstate newstatus newcomment))) ;; (define (rmt:set-tests-state-status run-id testnames currstate currstatus newstate newstatus) ;; (assert (number? run-id) "FATAL: Run id required.") ;; (rmtmod:send-receive 'set-tests-state-status run-id (list run-id testnames currstate currstatus newstate newstatus))) (define (rmt:get-tests-for-run run-id testpatt states statuses offset limit not-in sort-by sort-order qryvals last-update mode) (assert (number? run-id) "FATAL: Run id required.") ;; (if (number? run-id) (rmtmod:send-receive 'get-tests-for-run run-id (list run-id testpatt states statuses offset limit not-in sort-by sort-order qryvals last-update mode))) ;; (begin ;; (debug:print-error 0 *default-log-port* "rmt:get-tests-for-run called with bad run-id=" run-id) |
︙ | ︙ | |||
175 176 177 178 179 180 181 | (assert (number? run-id) "FATAL: Run id required.") (rmtmod:send-receive 'synchash-get run-id (list run-id proc synckey keynum params))) (define (rmt:get-tests-for-run-mindata run-id testpatt states status not-in) (assert (number? run-id) "FATAL: Run id required.") (rmtmod:send-receive 'get-tests-for-run-mindata run-id (list run-id testpatt states status not-in))) | < < < < < < < | < < < < < < < < < < < < < < < < < < < < < < < | < < | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | < | 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 | (assert (number? run-id) "FATAL: Run id required.") (rmtmod:send-receive 'synchash-get run-id (list run-id proc synckey keynum params))) (define (rmt:get-tests-for-run-mindata run-id testpatt states status not-in) (assert (number? run-id) "FATAL: Run id required.") (rmtmod:send-receive 'get-tests-for-run-mindata run-id (list run-id testpatt states status not-in))) ;;====================================================================== ;; Maintenance ;;====================================================================== (define (rmt:get-toplevels-and-incompletes run-id running-deadtime remotehoststart-deadtime) (rmtmod:send-receive 'get-toplevels-and-incompletes run-id (list run-id running-deadtime remotehoststart-deadtime))) (define (rmt:get-status-from-final-status-file run-dir) (let ((infile (conc run-dir "/.final-status"))) ;; first verify we are able to write the output file (if (not (file-read-access? infile)) (begin (debug:print 2 *default-log-port* "ERROR: cannot read " infile) (debug:print 2 *default-log-port* "ERROR: run-dir is " run-dir) #f ) (with-input-from-file infile read-lines) ))) (define (rmt:set-state-status-by-state-status run-id testname currstate currstatus newstate newstatus) (rmtmod:send-receive 'set-state-status-by-state-status run-id (list run-id testname currstate currstatus newstate newstatus))) ) |
Modified runs.scm from [03c1310750] to [db15d8bb8d].
︙ | ︙ | |||
812 813 814 815 816 817 818 | ;; NOTE: these are all parent tests, items are not expanded yet. (debug:print-info 4 *default-log-port* "test-records=" (hash-table->alist test-records)) (let ((reglen (configf:lookup *configdat* "setup" "runqueue"))) (if (> (length (hash-table-keys test-records)) 0) (let* ((keep-going #t) (run-queue-retries 5) (run-ids (rmt:get-all-run-ids))) | | | 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 | ;; NOTE: these are all parent tests, items are not expanded yet. (debug:print-info 4 *default-log-port* "test-records=" (hash-table->alist test-records)) (let ((reglen (configf:lookup *configdat* "setup" "runqueue"))) (if (> (length (hash-table-keys test-records)) 0) (let* ((keep-going #t) (run-queue-retries 5) (run-ids (rmt:get-all-run-ids))) #;(for-each (lambda (run-id) (if keep-going (handle-exceptions exn (debug:print 0 *default-log-port* "error in calling find-and-mark-incomplete for run-id " run-id ", exn=" exn) (rmt:find-and-mark-incomplete run-id #f)))) ;; ovr-deadtime))) ;; could be root of https://hsdes.intel.com/appstore/article/#/220546828/main -- Title: Megatest jobs show DEAD even though they are still running (1.64/27) run-ids) (runs:run-tests-queue run-id runname test-records keyvals flags test-patts required-tests |
︙ | ︙ |
Modified server.scm from [39953c681c] to [f67a195591].
︙ | ︙ | |||
735 736 737 738 739 740 741 | (define (server:expiration-timeout) (let* ((tmo (configf:lookup *configdat* "server" "timeout"))) (if (string? tmo) (let* ((num (string->number tmo))) (if num (* 3600 num) (common:hms-string->seconds tmo))) | | | 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 | (define (server:expiration-timeout) (let* ((tmo (configf:lookup *configdat* "server" "timeout"))) (if (string? tmo) (let* ((num (string->number tmo))) (if num (* 3600 num) (common:hms-string->seconds tmo))) 10 ;; this is the default ))) (define (server:get-best-guess-address hostname) (let ((res #f)) (for-each (lambda (adr) (if (not (eq? (u8vector-ref adr 0) 127)) |
︙ | ︙ |
Modified tcp-transportmod.scm from [9c6068b733] to [9362adaf8f].
︙ | ︙ | |||
105 106 107 108 109 110 111 | (last-access (current-seconds)) (servinf-file #f) (last-serv-start 0) ) ;; parameters ;; | | > > > > | 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | (last-access (current-seconds)) (servinf-file #f) (last-serv-start 0) ) ;; parameters ;; (define tt-server-timeout-param (make-parameter 10)) ;; make ttdat visible (define *server-info* #f) (define (tt:make-remote areapath) (make-tt areapath: areapath)) ;; 1 ... or #f ;; and check that dbfname matches. FIXME: the propagation of dbfname and run-id ;; might not make the best sense ;; (define (tt:valid-run-id run-id dbfname) (and (or (number? run-id) (not run-id)) (equal? (dbfile:run-id->dbfname run-id) dbfname))) (tcp-buffer-size 2048) ;; (max-connections 4096) ;;====================================================================== ;; Client side ;;====================================================================== ;; do all the busy work of finding and setting up conn for ;; connecting to a server ;; (define (tt:client-connect-to-server ttdat dbfname run-id testsuite) (assert (tt:valid-run-id run-id dbfname) "FATAL: invalid run-id "run-id) (let* ((conn (hash-table-ref/default (tt-conns ttdat) dbfname #f)) |
︙ | ︙ | |||
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 | conn) ((starting) (thread-sleep! 0.5) (tt:client-connect-to-server ttdat dbfname run-id testsuite)) (else (let* ((curr-secs (current-seconds))) ;; rm the (last server) would go here (if (> (- curr-secs (tt-last-serv-start ttdat)) 10) (begin (tt-last-serv-start-set! ttdat curr-secs) (server-start-proc))) ;; start server if 30 sec since last attempt (thread-sleep! 1) (tt:client-connect-to-server ttdat dbfname run-id testsuite))))))) (else ;; no good server found, if haven't started server in > 5 secs, start another (if (> (- (current-seconds) (tt-last-serv-start ttdat)) 5) ;; BUG - grow this number really do not want to swamp the machine with servers (begin (debug:print-info 0 *default-log-port* "No server found. Starting one for run-id "run-id" in dbfile "dbfname) (server-start-proc) (tt-last-serv-start-set! ttdat (current-seconds)))) (thread-sleep! 1) (tt:client-connect-to-server ttdat dbfname run-id testsuite))))))) (define (tt:ping host port server-id #!optional (tries-left 5)) (let* ((res (tt:send-receive-direct host port `(ping #f #f #f) ping-mode: #t)) ;; please send me your server-id (try-again (lambda () (if (> tries-left 0) (begin | > > | 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 | conn) ((starting) (thread-sleep! 0.5) (tt:client-connect-to-server ttdat dbfname run-id testsuite)) (else (let* ((curr-secs (current-seconds))) ;; rm the (last server) would go here (hash-table-delete! (tt-conns ttdat) dbfname) (if (> (- curr-secs (tt-last-serv-start ttdat)) 10) (begin (tt-last-serv-start-set! ttdat curr-secs) (server-start-proc))) ;; start server if 30 sec since last attempt (thread-sleep! 1) (tt:client-connect-to-server ttdat dbfname run-id testsuite))))))) (else ;; no good server found, if haven't started server in > 5 secs, start another (if (> (- (current-seconds) (tt-last-serv-start ttdat)) 5) ;; BUG - grow this number really do not want to swamp the machine with servers (begin (debug:print-info 0 *default-log-port* "No server found. Starting one for run-id "run-id" in dbfile "dbfname) (server-start-proc) (tt-last-serv-start-set! ttdat (current-seconds)))) (thread-sleep! 1) (hash-table-delete! (tt-conns ttdat) dbfname) (tt:client-connect-to-server ttdat dbfname run-id testsuite))))))) (define (tt:ping host port server-id #!optional (tries-left 5)) (let* ((res (tt:send-receive-direct host port `(ping #f #f #f) ping-mode: #t)) ;; please send me your server-id (try-again (lambda () (if (> tries-left 0) (begin |
︙ | ︙ |
Modified transport-mode.scm.template from [7b4174ac3b] to [317846bf53].
︙ | ︙ | |||
11 12 13 14 15 16 17 | ;; uncomment this block to test without tcp or inmem ;; (dbfile:sync-method 'none) ;; (dbfile:cache-method 'none) ;; (rmt:transport-mode 'nfs) ;; uncomment this block to test with tcp and inmem | | | | 11 12 13 14 15 16 17 18 19 20 21 22 | ;; uncomment this block to test without tcp or inmem ;; (dbfile:sync-method 'none) ;; (dbfile:cache-method 'none) ;; (rmt:transport-mode 'nfs) ;; uncomment this block to test with tcp and inmem (dbfile:sync-method 'original) ;; 'original) ;; attach) (dbfile:cache-method 'tmp) ;; 'inmem) (rmt:transport-mode 'tcp) |