Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -35,14 +35,18 @@ # module source files MSRCFILES = dbfile.scm debugprint.scm mtargs.scm commonmod.scm dbmod.scm \ tcp-transportmod.scm rmtmod.scm portlogger.scm transport-mode.scm : transport-mode.scm.template - cp transport-mode.scm.template transport-mode.scm + @if [[ -e transport-mode.scm ]];then \ + echo "WARNING: transport-mode.scm.template is newer than transport-mode.scm"; else \ + cp transport-mode.scm.template transport-mode.scm; fi dashboard-transport-mode.scm : dashboard-transport-mode.scm.template - cp dashboard-transport-mode.scm.template dashboard-transport-mode.scm + @if [[ -e dashboard-transport-mode.scm ]];then \ + echo "WARNING: dashboard-transport-mode.scm.template is newer than dashboard-transport-mode.scm"; else \ + cp dashboard-transport-mode.scm.template dashboard-transport-mode.scm; fi megatest.scm : transport-mode.scm dashboard.scm : dashboard-transport-mode.scm # dbmod.import.o is just a hack here Index: TODO ================================================================== --- TODO +++ TODO @@ -57,5 +57,29 @@ .. [ run-id.db inmemdb last-mod last-read last-sync inuse ] . Re-work all queries to use run-id to dereference server . Open main.db directly in calls to -runtests etc. No need to talk remote? . remove common:faux-lock +db:get-test-info-by-id +db:get-test-state-status-by-id +db:get-test-info - do a get id by name/item-path + cache the id- + use test id plus run id to get from cache + +need to do db:get-test-info-db +look at html gen for items - rollup needs deduplication nonoverlap + +;; cache write these with transaction +db:teststep-set-status! +db:test-set-top-process-id + +;; called a lot, maybe from rollup? +db:get-all-state-status-counts-for-test + +;; load to move from server to client +tests:summarize-items ;; appears to be on client +tests:summarize-tests + +;; converting rmt:set-tests-state-status + +1. db:get-test-id needs rmt equiv +2. Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -319,11 +319,12 @@ ((set-state-status-and-roll-up-run) (apply db:set-state-status-and-roll-up-run dbstruct params)) ((top-test-set-per-pf-counts) (apply db:top-test-set-per-pf-counts dbstruct params)) ((test-set-archive-block-id) (apply db:test-set-archive-block-id dbstruct params)) ((insert-test) (db:insert-test dbstruct run-id params)) - + ((set-state-status-by-state-status) (apply db:set-state-status-by-state-status dbstruct params)) + ;; RUNS ((register-run) (apply db:register-run dbstruct params)) ((set-tests-state-status) (apply db:set-tests-state-status dbstruct params)) ((delete-run) (apply db:delete-run dbstruct params)) ((lock/unlock-run) (apply db:lock/unlock-run dbstruct params)) @@ -397,10 +398,11 @@ ((get-test-state-status-by-id) (apply db:get-test-state-status-by-id dbstruct params)) ((test-get-rundir-from-test-id) (apply db:test-get-rundir-from-test-id dbstruct params)) ((get-count-tests-running-for-testname) (apply db:get-count-tests-running-for-testname dbstruct params)) ((get-count-tests-running) (apply db:get-count-tests-running dbstruct params)) ((get-count-tests-running-in-jobgroup) (apply db:get-count-tests-running-in-jobgroup dbstruct params)) + ((get-all-state-status-counts-for-test) (apply db:get-all-state-status-counts-for-test dbstruct params)) ;; ((delete-test-step-records) (apply db:delete-test-step-records dbstruct params)) ;; ((get-previous-test-run-record) (apply db:get-previous-test-run-record dbstruct params)) ((get-matching-previous-test-run-records)(apply db:get-matching-previous-test-run-records dbstruct params)) ((test-get-logfile-info) (apply db:test-get-logfile-info dbstruct params)) ((test-get-records-for-index-file) (apply db:test-get-records-for-index-file dbstruct params)) Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -170,11 +170,10 @@ ;; (define *db-multi-sync-mutex* (make-mutex)) ;; protect access to *db-sync-in-progress*, *db-last-sync* ;; task db (define *task-db* #f) ;; (vector db path-to-db) (define *db-access-allowed* #t) ;; flag to allow access ;; (define *db-access-mutex* (make-mutex)) ;; moved to dbfile -;; (define *db-transaction-mutex* (make-mutex)) (define *db-cache-path* #f) ;; (define *db-with-db-mutex* (make-mutex)) (define *db-api-call-time* (make-hash-table)) ;; hash of command => (list of times) ;; SERVER Index: dashboard.scm ================================================================== --- dashboard.scm +++ dashboard.scm @@ -227,10 +227,12 @@ (define (dboard:common-set-tabdat! commondat tabnum tabdat) (hash-table-set! (dboard:commondat-tabdats commondat) tabnum tabdat)) + +(define *updaters-running* #f) ;; gets and calls updater list based on curr-tab-num ;; (define (dboard:common-run-curr-updaters commondat #!key (tab-num #f)) ;; (sync-db-to-tmp (dboard:common-get-tabdat commondat tab-num: tab-num)) ;; no longer applies @@ -241,16 +243,51 @@ (if (dboard:common-get-tabdat commondat tab-num: tab-num) ;; only update if there is a tabdat (let* ((tnum (or tab-num (dboard:commondat-curr-tab-num commondat))) (updaters (hash-table-ref/default (dboard:commondat-updaters commondat) tnum '()))) - (debug:print 4 *default-log-port* "Found these updaters: " updaters " for tab-num: " tnum) - (for-each ;; perform the function calls for the complete updaters list - (lambda (updater) - ;; (debug:print 3 *default-log-port* "Running " updater) - (updater)) - updaters)))) + (if *updaters-running* + (debug:print 0 *default-log-port* "updaters still running.") + (let* ((th1 (make-thread (lambda () + (debug:print 4 *default-log-port* "Found these updaters: " updaters " for tab-num: " tnum) + (for-each ;; perform the function calls for the complete updaters list + (lambda (updater) + (let ((start-ms (current-milliseconds))) + (debug:print 0 *default-log-port* "Running updater for tnum "tnum", "updater) + (updater) + (debug:print 0 *default-log-port* "Running updater for tnum "tnum" took "(- (current-milliseconds) + start-ms)"ms") + )) + updaters)) + "updaters thread")) + (th2 (make-thread (lambda () + (let loop () + (case (thread-state th1) + ((terminated) + (debug:print 0 *default-log-port* "th1 terminated, all done for now.")) + ((running) + (debug:print 0 *default-log-port* "th1 running, suspending now.") + (thread-suspend! th1) + (thread-sleep! 0.1) + (loop)) + ((sleeping) + (debug:print 0 *default-log-port* "th1 sleeping, resuming now.") + (thread-resume! th1) + (thread-sleep! 0.9) + (loop)) + ((dead) + (debug:print 0 *default-log-port* "th1 "(thread-state th1)", what's next?")) + (else + (debug:print 0 *default-log-port* "th1 "(thread-state th1)", what's next?") + (thread-sleep! 0.5) + (loop)))))))) + (set! *updaters-running* #t) + (thread-start! th1) + (thread-sleep! 0.1) + (thread-start! th2) + (thread-join! th1) + (set! *updaters-running* #f)))))) ;; if tab-num passed in then use it, otherwise look in commondat at curr-tab-num ;; adds the updater passed in the updaters list at that hashkey ;; (define (dboard:commondat-add-updater commondat updater #!key (tab-num #f)) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -2274,45 +2274,10 @@ (lambda () (sqlite3:execute db "DELETE FROM test_steps WHERE test_id IN (SELECT id FROM tests WHERE state='DELETED' AND event_time tries 0) - (begin - (thread-sleep! 1) - (db:keep-trying-until-true proc params (- tries 1))) - (begin - ;; (debug:print-info 0 *default-log-port* "proc never returned true, params="params) - (print"db:keep-trying-until-true proc never returned true, proc = " proc " params =" params " tries = " tries) - #f))))) - (define (db:get-test-info dbstruct run-id test-name item-path) - (db:with-db - dbstruct - run-id - #f - (lambda (dbdat db) - (db:get-test-info-db db run-id test-name item-path)))) + (let* ((test-id (db:get-test-id dbstruct run-id test-name item-path))) + (db:get-test-info-by-id dbstruct run-id test-id))) +;; (db:with-db +;; dbstruct +;; run-id +;; #f +;; (lambda (dbdat db) +;; (db:get-test-info-db db run-id test-name item-path)))) (define (db:get-test-info-db db run-id test-name item-path) (let ((res #f)) (sqlite3:for-each-row (lambda (a . b) @@ -3099,25 +3077,10 @@ (print-call-chain (current-error-port)) msg))) ;; crude reply for when things go awry ((zmq nmsg)(with-input-from-string msg (lambda ()(deserialize)))) (else msg))) ;; rpc -;; ; This is to be the big daddy call NOPE: Replaced by db:set-state-status-and-roll-up-items -;; ; -;; define (db:test-set-state-status dbstruct run-id test-id state status msg) -;; (let ((dbdat (db:get-subdb dbstruct run-id))) -;; (if (member state '("LAUNCHED" "REMOTEHOSTSTART")) -;; (db:general-call dbdat 'set-test-start-time (list test-id))) -;; ;; (if msg -;; ;; (db:general-call dbdat 'state-status-msg (list state status msg test-id)) -;; ;; (db:general-call dbdat 'state-status (list state status test-id))) -;; (db:set-state-status-and-roll-up-items dbstruct run-id test-id #f state status msg) -;; ;; process the test_data table -;; (if (and test-id state status (equal? status "AUTO")) -;; (db:test-data-rollup dbstruct run-id test-id status)) -;; (mt:process-triggers dbstruct run-id test-id state status))) - ;; state is the priority rollup of all states ;; status is the priority rollup of all completed statesfu ;; ;; if test-name is an integer work off that as test-id instead of test-name test-path ;; @@ -3152,17 +3115,18 @@ db (lambda () ;; NB// Pass the db so it is part fo the transaction (db:test-set-state-status-db db run-id test-id state status comment) ;; this call sets the item state/status (if (not (equal? item-path "")) ;; only roll up IF incoming test is an item - (let* ((state-status-counts (db:get-all-state-status-counts-for-test db run-id test-name item-path state status)) ;; item-path is used to exclude current state/status of THIS test + (let* ((state-status-counts (db:get-all-state-status-counts-for-test-db db run-id test-name item-path state status)) ;; item-path is used to exclude current state/status of THIS test (state-statuses (db:roll-up-rules state-status-counts state status)) (newstate (car state-statuses)) (newstatus (cadr state-statuses))) (set! new-state-eh newstate) (set! new-status-eh newstatus) - (debug:print 4 *default-log-port* "BB> tl-test-id="tl-test-id" ; "test-name":"item-path" newstate="newstate" newstatus="newstatus" len(sscs)="(length state-status-counts) " state-status-counts: " + (debug:print 4 *default-log-port* "BB> tl-test-id="tl-test-id" ; "test-name":"item-path + " newstate="newstate" newstatus="newstatus" len(sscs)="(length state-status-counts) " state-status-counts: " (apply conc (map (lambda (x) (conc (with-output-to-string (lambda () (pp (dbr:counts->alist x)))) " | ")) state-status-counts))); end debug:print @@ -3265,15 +3229,20 @@ (db:with-db dbstruct #f #f (lambda (dbdat db) (db:get-all-state-status-counts-for-run-db dbdat db run-id)))) -;; BBnote: db:get-all-state-status-counts-for-test returns dbr:counts object aggregating state and status of items of a given test, *not including rollup state/status* +(define (db:get-all-state-status-counts-for-test dbstruct run-id test-name item-path item-state-in item-status-in) + (db:with-db + dbstruct run-id #f + (lambda (dbdat db) + (db:get-all-state-status-counts-for-test-db db run-id test-name item-path item-state-in item-status-in)))) + ;; BBnote: db:get-all-state-status-counts-for-test returns dbr:counts object aggregating state and status of items of a given test, *not including rollup state/status* ;; ;; NOTE: This is called within a transaction ;; -(define (db:get-all-state-status-counts-for-test db run-id test-name item-path item-state-in item-status-in) +(define (db:get-all-state-status-counts-for-test-db db run-id test-name item-path item-state-in item-status-in) (let* ((test-info (db:get-test-info-db db run-id test-name item-path)) (item-state (or item-state-in (db:test-get-state test-info))) (item-status (or item-status-in (db:test-get-status test-info))) (other-items-count-recs (sqlite3:map-row (lambda (state status count) Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -29,12 +29,14 @@ (import scheme chicken data-structures extras + files (prefix sqlite3 sqlite3:) + matchable posix typed-records srfi-1 srfi-18 srfi-69 @@ -155,10 +157,26 @@ (handler (sqlite3:make-busy-timeout 136000))) (sqlite3:set-busy-handler! db handler) (if write-access (init-proc db)) db)))) + +;; try every second until tries times proc +;; +(define (db:keep-trying-until-true proc params tries) + (let* ((res (apply proc params))) + (if res + res + (if (> tries 0) + (begin + (thread-sleep! 1) + (db:keep-trying-until-true proc params (- tries 1))) + (begin + ;; (debug:print-info 0 *default-log-port* "proc never returned true, params="params) + (print"db:keep-trying-until-true proc never returned true, proc = " proc " params =" params " tries = " tries) + #f))))) + (define *sync-in-progress* #f) ;; Open the inmem db and the on-disk db ;; populate the inmem db with data @@ -207,12 +225,13 @@ (if *sync-in-progress* (debug:print 3 *default-log-port* "WARNING: overlapping calls to sync to disk") (begin (mutex-lock! *db-with-db-mutex*) ;; this mutex is used when overloaded or during a query that modifies the db (set! *sync-in-progress* #t) - (dbmod:sync-gasket tables last-update inmem db - dbfullname syncdir) + #;(dbmod:sync-gasket tables last-update inmem db + dbfullname syncdir) + (system (conc "megatest -db2db -from "tmpdb" -to "dbfname"&")) (mutex-unlock! *db-with-db-mutex*) (thread-sleep! 0.5) ;; ensure at least 1/2 second down time between sync calls (set! *sync-in-progress* #f))))) ;; (dbmod:sync-tables tables #f db inmem) ;; (if db @@ -226,16 +245,17 @@ ;; direction: 'fromdest 'todest ;; (define (dbmod:sync-gasket tables last-update inmem dbh dbfname direction) (assert (sqlite3:database? inmem) "FATAL: sync-gasket: inmem is not a db") - (assert (sqlite3:database? inmem) "FATAL: sync-gasket: dbh is not a db") + (assert (sqlite3:database? dbh) "FATAL: sync-gasket: dbh is not a db") + (debug:print-info 0 *default-log-port* "Db sync using "(dbfile:sync-method)" method") (case (dbfile:sync-method) ((none) #f) ((attach) (dbmod:attach-sync tables inmem dbfname direction)) - ((newsync) + ((newsync) ;; DON'T USE THIS ONE. IT IS BORKED (dbmod:new-sync tables inmem dbh dbfname direction)) (else (case direction ((todisk) (dbmod:sync-tables tables last-update inmem dbh) @@ -278,140 +298,244 @@ ;; Use (db:sync-all-tables-list keys) to get the tbls input ;; (define (dbmod:sync-tables tbls last-update fromdb todb) (assert (sqlite3:database? fromdb) "FATAL: dbmod:sync-tables called with fromdb not a database" fromdb) (assert (sqlite3:database? todb) "FATAL: dbmod:sync-tables called with fromdb not a database" todb) - (let ((stmts (make-hash-table)) ;; table-field => stmt + (let ((specials '(("keys" . "fieldname") + ("meta" . "var"))) + (stmts (make-hash-table)) ;; table-field => stmt (all-stmts '()) ;; ( ( stmt1 value1 ) ( stml2 value2 )) (numrecs (make-hash-table)) (start-time (current-milliseconds)) (tot-count 0)) (for-each ;; table (lambda (tabledat) - (let* ((tablename (car tabledat)) - (fields (cdr tabledat)) - (has-last-update (member "last_update" fields)) - (use-last-update (dbmod:calc-use-last-update has-last-update fields last-update)) - (last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for - (if (number? last-update) - last-update - (cdr last-update)) - #f)) - (last-update-field (if use-last-update - (if (number? last-update) - "last_update" - (car last-update)) - #f)) - (num-fields (length fields)) - (field->num (make-hash-table)) - (num->field (apply vector (map car fields))) ;; BBHERE - (full-sel (conc "SELECT " (string-intersperse (map car fields) ",") - " FROM " tablename (if use-last-update ;; apply last-update criteria - (conc " WHERE " last-update-field " >= " last-update-value) - "") - ";")) - (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) " - " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );")) - (fromdat '()) - (fromdats '()) - (totrecords 0) - (batch-len 100) ;; (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100"))) - (todat (make-hash-table)) - (count 0) - (field-names (map car fields))) - - ;; set up the field->num table - (for-each - (lambda (field) - (hash-table-set! field->num field count) - (set! count (+ count 1))) - fields) - - ;; read the source table - ;; store a list of all rows in the table in fromdat, up to batch-len. - ;; Then add fromdat to the fromdats list, clear fromdat and repeat. - (sqlite3:for-each-row - (lambda (a . b) - (set! fromdat (cons (apply vector a b) fromdat)) - (if (> (length fromdat) batch-len) - (begin - (set! fromdats (cons fromdat fromdats)) - (set! fromdat '()) - (set! totrecords (+ totrecords 1))))) - fromdb - full-sel) - - ;; Count less than batch-len as a record - (if (> (length fromdat) 0) - (set! totrecords (+ totrecords 1))) - - ;; tack on remaining records in fromdat - (if (not (null? fromdat)) - (set! fromdats (cons fromdat fromdats))) - - (sqlite3:for-each-row - (lambda (a . b) - (hash-table-set! todat a (apply vector a b))) - todb - full-sel) - - ;; first pass implementation, just insert all changed rows - - (let* ((db todb) - (drp-trigger (if (member "last_update" field-names) - (db:drop-trigger db tablename) - #f)) - (has-last-update (member "last_update" field-names)) - (is-trigger-dropped (if has-last-update - (db:is-trigger-dropped db tablename) - #f)) - (stmth (sqlite3:prepare db full-ins)) - (changed-rows 0)) - (for-each - (lambda (fromdat-lst) - (mutex-lock! *db-transaction-mutex*) - (sqlite3:with-transaction - db - (lambda () - (for-each ;; - (lambda (fromrow) - (let* ((a (vector-ref fromrow 0)) - (curr (hash-table-ref/default todat a #f)) - (same #t)) - (let loop ((i 0)) - (if (or (not curr) - (not (equal? (vector-ref fromrow i)(vector-ref curr i)))) - (set! same #f)) - (if (and same - (< i (- num-fields 1))) - (loop (+ i 1)))) - (if (not same) - (begin - (apply sqlite3:execute stmth (vector->list fromrow)) - (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))) - (set! changed-rows (+ changed-rows 1)))))) - fromdat-lst))) - (mutex-unlock! *db-transaction-mutex*)) - fromdats) - - (sqlite3:finalize! stmth) - (if (member "last_update" field-names) - (db:create-trigger db tablename))) - )) + (let* ((count (match tabledat + ((tablename . fields) + (debug:print-info 0 *default-log-port* "Syncing table "tablename) + (dbmod:sync-table tablename fields fromdb todb (alist-ref tablename specials equal?))) + (else + (debug:print-warn 0 *default-log-port* "Bad tabledat entry: "tabledat) + 0)))) + (set! tot-count (+ tot-count count)))) tbls) - (let* ((runtime (- (current-milliseconds) start-time)) - (should-print (or ;; (debug:debug-mode 12) - (common:low-noise-print 120 "db sync") - (> runtime 500)))) ;; low and high sync times treated as separate. - (for-each - (lambda (dat) - (let ((tblname (car dat)) - (count (cdr dat))) - (set! tot-count (+ tot-count count)))) - (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b)))))) + (debug:print-info 0 *default-log-port* "dbmod:sync-tables completed in "(- (current-milliseconds) start-time)"ms") tot-count)) +(define (dbmod:sync-table tablename fields from-db to-db keyfield) + (let* ((field-names (map car fields)) + (has-last-update (member "last_update" field-names)) + (fields-sans-lu (filter (lambda (x) + (not (member x '("id" "last_update")))) + field-names)) + (get-ids (lambda (db) + (sqlite3:fold-row (lambda (res id) + (cons id res)) + '() + db + (conc "SELECT id FROM "tablename";")))) + (get-val (lambda (db fieldname id) + (let* ((res #f) + (sql (conc "SELECT "fieldname" FROM "tablename" WHERE id=?;"))) + (sqlite3:for-each-row + (lambda (val) + (set! res val)) + db + sql + id) + ;; (debug:print-info 0 *default-log-port* "get-val "db" "fieldname" "id", sql="sql", res="res) + res))) + (get-row (lambda (db id) + (let* ((res #f)) + (sqlite3:for-each-row + (lambda tuple + (set! res tuple)) + db + (conc "SELECT " (string-intersperse fields-sans-lu ",") + " FROM "tablename" WHERE id=?;") + id) + res))) + (ins-row (lambda (db id row) + (let* ((qry (conc "INSERT INTO "tablename" (id," + (string-intersperse fields-sans-lu ",") + ") VALUES ("id"," + (string-intersperse + (make-list (length fields-sans-lu) "?") + ",") + ");"))) + ;; (debug:print-info 0 *default-log-port* "qry="qry) + (apply sqlite3:execute db + qry + row)))) + (num-inserts 0) + (num-updates 0) + ) + ;; (debug:print-info 0 *default-log-port* "field-names: "field-names", fields-sans-lu: "fields-sans-lu) + ;; (sqlite3:with-transaction + ;; from-db + ;; (lambda () + (let* ((from-ids (get-ids from-db))) + ;; (debug:print-info 0 *default-log-port* "Table "tablename", has "(length from-ids)" records.") + ;; (sqlite3:with-transaction + ;; to-db + ;; (lambda () + (let* ((to-ids (get-ids to-db))) + ;; (debug:print 0 *default-log-port* "to-ids="to-ids) + (for-each ;; from-id + (lambda (from-id) + (if (member from-id to-ids) + (for-each ;; case where record exists, do one by one the fields if different + (lambda (fieldname) + (let* ((from-val (get-val from-db fieldname from-id)) + (dest-val (get-val to-db fieldname from-id))) + #;(debug:print 0 *default-log-port* + "fieldname="fieldname + ", from-id="from-id + ", from-val="from-val + ", dest-val="dest-val + ) + (if (not (equal? from-val dest-val)) + (begin + (sqlite3:execute to-db (conc "UPDATE "tablename" SET "fieldname"=? WHERE id=?;") + from-val + from-id) + (set! num-updates (+ num-updates 1)))))) + fields-sans-lu) + (let ((row (get-row from-db from-id))) ;; need to insert the row + ;; (debug:print 0 *default-log-port* "row="row) + (set! num-inserts (+ num-inserts 1)) + (ins-row to-db from-id row)))) + from-ids)));; )))) + (+ num-inserts num-updates))) + +;; (for-each ;; table +;; (lambda (tabledat) +;; (let* ((tablename (car tabledat)) +;; (fields (cdr tabledat)) +;; (has-last-update (member "last_update" fields)) +;; (use-last-update (dbmod:calc-use-last-update has-last-update fields last-update)) +;; (last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for +;; (if (number? last-update) +;; last-update +;; (cdr last-update)) +;; #f)) +;; (last-update-field (if use-last-update +;; (if (number? last-update) +;; "last_update" +;; (car last-update)) +;; #f)) +;; (num-fields (length fields)) +;; (field->num (make-hash-table)) +;; (num->field (apply vector (map car fields))) ;; BBHERE +;; (full-sel (conc "SELECT " (string-intersperse (map car fields) ",") +;; " FROM " tablename (if use-last-update ;; apply last-update criteria +;; (conc " WHERE " last-update-field " >= " last-update-value) +;; "") +;; ";")) +;; (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) " +;; " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );")) +;; (fromdat '()) +;; (fromdats '()) +;; (totrecords 0) +;; (batch-len 10000000) ;; (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100"))) +;; (todat (make-hash-table)) +;; (count 0) +;; (field-names (map car fields))) +;; +;; (debug:print-info 0 *default-log-port* "Syncing table "tablename) +;; +;; ;; set up the field->num table +;; (for-each +;; (lambda (field) +;; (hash-table-set! field->num field count) +;; (set! count (+ count 1))) +;; fields) +;; +;; ;; read the source table +;; ;; store a list of all rows in the table in fromdat, up to batch-len. +;; ;; Then add fromdat to the fromdats list, clear fromdat and repeat. +;; (sqlite3:for-each-row +;; (lambda (a . b) +;; (set! fromdat (cons (apply vector a b) fromdat)) +;; (if (> (length fromdat) batch-len) +;; (begin +;; (set! fromdats (cons fromdat fromdats)) +;; (set! fromdat '()) +;; (set! totrecords (+ totrecords 1))))) +;; fromdb +;; full-sel) +;; +;; (debug:print-info 0 *default-log-port* "Have "totrecords" records to update.") +;; ;; Count less than batch-len as a record +;; (if (> (length fromdat) 0) +;; (set! totrecords (+ totrecords 1))) +;; +;; ;; tack on remaining records in fromdat +;; (if (not (null? fromdat)) +;; (set! fromdats (cons fromdat fromdats))) +;; +;; (sqlite3:for-each-row +;; (lambda (a . b) +;; (hash-table-set! todat a (apply vector a b))) +;; todb +;; full-sel) +;; +;; ;; first pass implementation, just insert all changed rows +;; +;; (let* ((db todb) +;; (has-last-update (member "last_update" field-names)) +;; (drp-trigger (if has-last-update +;; (db:drop-trigger db tablename) +;; #f)) +;; (is-trigger-dropped (if has-last-update +;; (db:is-trigger-dropped db tablename) +;; #f)) +;; (stmth (sqlite3:prepare db full-ins)) +;; (changed-rows 0)) +;; (for-each +;; (lambda (fromdat-lst) +;; (mutex-lock! *db-transaction-mutex*) +;; (sqlite3:with-transaction +;; db +;; (lambda () +;; (for-each ;; +;; (lambda (fromrow) +;; (let* ((a (vector-ref fromrow 0)) +;; (curr (hash-table-ref/default todat a #f)) +;; (same #t)) +;; (let loop ((i 0)) +;; (if (or (not curr) +;; (not (equal? (vector-ref fromrow i)(vector-ref curr i)))) +;; (set! same #f)) +;; (if (and same +;; (< i (- num-fields 1))) +;; (loop (+ i 1)))) +;; (if (not same) +;; (begin +;; (apply sqlite3:execute stmth (vector->list fromrow)) +;; (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))) +;; (set! changed-rows (+ changed-rows 1)))))) +;; fromdat-lst))) +;; (mutex-unlock! *db-transaction-mutex*)) +;; fromdats) +;; +;; (sqlite3:finalize! stmth) +;; (if (member "last_update" field-names) +;; (db:create-trigger db tablename))) +;; )) +;; tbls) +;; (let* ((runtime (- (current-milliseconds) start-time)) +;; (should-print (or ;; (debug:debug-mode 12) +;; (common:low-noise-print 120 "db sync") +;; (> runtime 500)))) ;; low and high sync times treated as separate. +;; (for-each +;; (lambda (dat) +;; (let ((tblname (car dat)) +;; (count (cdr dat))) +;; (set! tot-count (+ tot-count count)))) +;; (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b)))))) + (define (has-last-update dbh tablename) (let* ((has-last #f)) (sqlite3:for-each-row (lambda (name) (if (equal? name "last_update") @@ -433,11 +557,12 @@ ) (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile) (if (not (sqlite3:auto-committing? dbh)) (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.") (let* ((table-names (map car tables)) - (dest-exists (file-exists? destdbfile))) + (dest-exists (file-exists? destdbfile)) + (start-time (current-milliseconds))) (assert dest-exists "FATAL: sync called with non-existant file, "destdbfile) ;; attach the destdbfile ;; for each table ;; insert into dest. select * from src.
where last_update>last_update ;; done @@ -470,15 +595,15 @@ ;; (begin (mutex-lock! *db-transaction-mutex*) (sqlite3:with-transaction dbh (lambda () - (debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using "stmt1) + (debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using INSERT OR UPDATE") (sqlite3:execute dbh stmt1) ;; get all new rows - #;(if (member "last_update" fields) - (sqlite3:execute dbh stmt8)) ;; get all updated rows + (if (member "last_update" fields) + (sqlite3:execute dbh stmt8)) ;; get all updated rows ;; (sqlite3:execute dbh stmt5) ;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up ;; (sqlite3:execute dbh stmt6) )) (debug:print 0 *default-log-port* "Synced table "table @@ -485,11 +610,13 @@ " in "(- (current-milliseconds) start-ms)"ms") ;; ) (mutex-unlock! *db-transaction-mutex*))) ;; (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight.")))) table-names) - (sqlite3:execute dbh "DETACH auxdb;")))) + (sqlite3:execute dbh "DETACH auxdb;") + (debug:print-info 0 *default-log-port* "Total sync time: "(- (current-milliseconds) start-time)"ms") + -1))) ;; FAILED ATTEMPTS ;; (if (not (has-last-update dbh table)) ;; (sqlite3:execute dbh (conc "ALTER TABLE "table" ADD COLUMN last_update INTEGER;"))) @@ -749,7 +876,45 @@ " 1 day since event_time marked") (set! oldlaunched (cons (list test-id run-dir uname testname item-path run-id) oldlaunched))))) stmth3 run-id)))) (list incompleted oldlaunched toplevels))) + +(define (db:set-state-status-by-state-status dbstruct run-id testname currstate currstatus newstate newstatus) + + + ;; clear caches needed + + + (let* ((qry (conc "UPDATE tests SET state=?,status=? WHERE " + (if currstate (conc "state='" currstate "' AND ") "") + (if currstatus (conc "status='" currstatus "' AND ") "") + " run_id=? AND testname LIKE ?;"))) + (db:with-db + dbstruct + run-id + #t + (lambda (dbdat db) + (sqlite3:execute db qry + (or newstate currstate "NOT_STARTED") + (or newstatus currstate "UNKNOWN") + run-id testname))))) +;;====================================================================== +;; db to db sync +;;====================================================================== + +(define (dbmod:db-to-db-sync src-db dest-db last-update init-proc keys) + (if (and (file-exists? src-db) + (file-read-access? src-db)) + (let* ((d-wr (or (and (file-exists? dest-db) + (file-write-access? dest-db)) ;; exists and writable + (let* ((dirname (or (pathname-directory dest-db) + "."))) + (if dirname + (file-exists? dirname) + (file-write-access? dirname))))) + (tables (db:sync-all-tables-list keys)) + (sdb (dbmod:safely-open-db src-db init-proc #t)) + (ddb (dbmod:safely-open-db dest-db init-proc d-wr))) + (dbmod:sync-gasket tables last-update sdb ddb dest-db 'todest)))) ) Index: launch.scm ================================================================== --- launch.scm +++ launch.scm @@ -741,11 +741,12 @@ ) ;; for automated creation of the rollup html file this is a good place... (if (not (equal? item-path "")) - (tests:summarize-items run-id test-id test-name #f)) + (tests:summarize-items run-id test-id test-name #f)) + ;; BUG was this meant to be the antecnt of the if above? (tests:summarize-test run-id test-id) ;; don't force - just update if no ;; Leave a .final-status file for the top level test (tests:save-final-status run-id test-id) (rmt:update-run-stats run-id (rmt:get-raw-run-stats run-id))) ;; end of let* @@ -1155,16 +1156,20 @@ (let ((cfname (args:get-arg "-append-config"))) (if (and cfname (file-read-access? cfname)) (read-config cfname *configdat* #t))) ;; values are added to the hash, no need to do anything special. ;; have config at this time, this is a good place to set params based on config file settings - (let* ((dbmode (configf:lookup *configdat* "setup" "dbcache-mode"))) + (let* ((dbmode (configf:lookup *configdat* "setup" "dbcache-mode")) + (syncmode (configf:lookup *configdat* "setup" "sync-mode"))) (if dbmode (begin (debug:print-info 0 *default-log-port* "Overriding dbmode to "dbmode) - (dbcache-mode (string->symbol dbmode))))) - + (dbcache-mode (string->symbol dbmode)))) + (if syncmode + (begin + (debug:print-info 0 *default-log-port* "Overriding syncmode to "syncmode) + (dbfile:sync-method (string->symbol syncmode))))) *toppath*))) (define (get-best-disk confdat testconfig) (let* ((disks (or (and testconfig (hash-table-ref/default testconfig "disks" #f)) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -284,10 +284,11 @@ -list-test-time : list time requered to complete each test in a run. It following following arguments -runname -target -dumpmode -syscheck : do some very basic checks; write access and space in tmp, home, runs, links and is $DISPLAY valid -list-waivers : dump waivers for specified target, runname, testpatt to stdout + -db2db : sync db to db, use -from and -to to specify the databases Diff report -diff-rep : generate diff report (must include -src-target, -src-runname, -target, -runname and either -diff-email or -diff-html) -src-target @@ -346,10 +347,11 @@ "-m" "-rerun" "-days" "-rename-run" + "-from" "-to" "-dest" "-source" "-time-stamp" ;; values and messages @@ -450,11 +452,10 @@ "-cache-db" "-cp-eventtime-to-publishtime" "-use-db-cache" "-prepend-contour" - ;; misc "-repl" "-lock" "-unlock" "-list-servers" @@ -495,10 +496,11 @@ "-convert-to-norm" "-convert-to-old" "-import-megatest.db" "-sync-to-megatest.db" + "-db2db" "-sync-brute-force" "-logging" "-v" ;; verbose 2, more than normal (normal is 1) "-q" ;; quiet 0, errors/warnings only @@ -2560,10 +2562,36 @@ (if (args:get-arg "-sync-to") (let ((toppath (launch:setup))) (tasks:sync-to-postgres *configdat* (args:get-arg "-sync-to")) (set! *didsomething* #t))) +;; use with -from and -to +;; +(if (args:get-arg "-db2db") + (let* ((duh (launch:setup)) + (src-db (args:get-arg "-from")) + (dest-db (args:get-arg "-to")) + (sync-period (args:get-arg "-period")) ;; NOT IMPLEMENTED YET + (sync-timeout (args:get-arg "-timeout")) ;; NOT IMPLEMENTED YET + (lockfile (conc dest-db".lock")) + (keys (db:get-keys #f)) + ) + + (if (and src-db dest-db) + (begin + (debug:print-info 0 *default-log-port* "Attempting to sync data from "src-db" to "dest-db"...") + ;; (if (common:simple-file-lock lockfile) + ;; (begin + (if (not (file-exists? dest-db)) ;; use copy to get going + (file-copy src-db dest-db)) + (let ((res (dbmod:db-to-db-sync src-db dest-db 0 (dbfile:db-init-proc) keys))) + ;; (common:simple-file-release-lock lockfile) + (debug:print 0 *default-log-port* "Synced " res " records from "src-db" to "dest-db))) + (debug:print 0 *default-log-port* "Skipping sync, there is a sync in progress.")) + (set! *didsomething* #t)) + (debug:print 0 *default-log-port* "Usage for -db2db; -to and -from must be specified")) + (if (args:get-arg "-list-test-time") (let* ((toppath (launch:setup))) (task:get-test-times) (set! *didsomething* #t))) Index: mt.scm ================================================================== --- mt.scm +++ mt.scm @@ -21,10 +21,11 @@ (import (prefix sqlite3 sqlite3:)) (declare (unit mt)) (declare (uses debugprint)) (declare (uses db)) +(declare (uses dbmod)) (declare (uses common)) (declare (uses items)) (declare (uses runconfig)) (declare (uses tests)) (declare (uses server)) @@ -31,11 +32,12 @@ (declare (uses runs)) (declare (uses rmt)) (declare (uses rmtmod)) (import debugprint - rmtmod) + rmtmod + dbmod) (include "common_records.scm") (include "key_records.scm") (include "db_records.scm") (include "run_records.scm") @@ -190,13 +192,13 @@ (if prev-nbfake-log (setenv "NBFAKE_LOG" prev-nbfake-log) (unsetenv "NBFAKE_LOG")) )) -(define (mt:process-triggers dbstruct run-id test-id newstate newstatus) +(define (mt:process-triggers run-id test-id newstate newstatus) (if test-id - (let* ((test-dat (db:get-test-info-by-id dbstruct run-id test-id))) + (let* ((test-dat (rmt:get-test-info-by-id run-id test-id))) (if test-dat (let* ((test-rundir (db:test-get-rundir test-dat)) ;; ) ;; ) (test-name (db:test-get-testname test-dat)) (item-path (db:test-get-item-path test-dat)) (duration (db:test-get-run_duration test-dat)) @@ -262,21 +264,11 @@ (begin (debug:print-error 0 *default-log-port* "bad data handed to mt:test-set-state-status-by-id, run-id=" run-id ", test-id=" test-id ", newstate=" newstate) (print-call-chain (current-error-port)) #f) (begin - ;; cond - ;; ((and newstate newstatus newcomment) - ;; (rmt:general-call 'state-status-msg run-id newstate newstatus newcomment test-id)) - ;; ((and newstate newstatus) - ;; (rmt:general-call 'state-status run-id newstate newstatus test-id)) - ;; (else - ;; (if newstate (rmt:general-call 'set-test-state run-id newstate test-id)) - ;; (if newstatus (rmt:general-call 'set-test-status run-id newstatus test-id)) - ;; (if newcomment (rmt:general-call 'set-test-comment run-id newcomment test-id)))) (rmt:set-state-status-and-roll-up-items run-id test-id #f newstate newstatus newcomment) - ;; (mt:process-triggers run-id test-id newstate newstatus) #t))) (define (mt:test-set-state-status-by-id-unless-completed run-id test-id newstate newstatus newcomment) (let* ((test-vec (rmt:get-testinfo-state-status run-id test-id)) @@ -294,11 +286,141 @@ ;;(mt:test-set-state-status-by-id run-id test-id new-state new-status new-comment))) (define (mt:test-set-state-status-by-testname-unless-completed run-id test-name item-path new-state new-status new-comment) (let ((test-id (rmt:get-test-id run-id test-name item-path))) (mt:test-set-state-status-by-id-unless-completed run-id test-id new-state new-status new-comment))) - + +;; state and status are extra hints not usually used in the calculation +;; +(define (rmt:set-state-status-and-roll-up-items run-id test-name item-path state status comment) + (assert (number? run-id) "FATAL: Run id required.") + (rmt:client-side-set-state-status-and-roll-up run-id test-name item-path state status comment) + ;; (rmtmod:send-receive 'set-state-status-and-roll-up-items run-id (list run-id test-name item-path state status comment)) + ) + +(define (rmt:client-side-set-state-status-and-roll-up run-id test-name item-path state status comment) + ;; establish info on incoming test followed by info on top level test + ;; BBnote - for mode itemwait, linkage between upstream test & matching item status is propagated to run queue in db:prereqs-not-met + (let* ((test-id (if (number? test-name) + test-name + (db:keep-trying-until-true + rmt:get-test-id + (list run-id test-name item-path) + 10))) + ;; (rmt:get-test-id run-id test-name item-path))) + (testdat (rmt:get-test-info-by-id run-id test-id)) + ;; (test-id (db:test-get-id testdat)) + (test-name (if (number? test-name) + (db:test-get-testname testdat) + test-name)) + (item-path (db:test-get-item-path testdat)) + (tl-test-id (rmt:get-test-id run-id test-name "")) + (tl-testdat (rmt:get-test-info-by-id run-id test-id)) + (new-state-eh #f) + (new-status-eh #f)) + (if (member state '("LAUNCHED" "REMOTEHOSTSTART")) + (rmt:general-call 'set-test-start-time run-id test-id)) + (let* ((res (begin + (rmt:test-set-state-status run-id test-id state status comment) ;; this call sets the item state/status + (if (not (equal? item-path "")) ;; only roll up IF incoming test is an item + (let* ((state-status-counts (rmt:get-all-state-status-counts-for-test run-id test-name item-path state status)) ;; item-path is used to exclude current state/status of THIS test + (state-statuses (db:roll-up-rules state-status-counts state status)) + (newstate (car state-statuses)) + (newstatus (cadr state-statuses))) + (set! new-state-eh newstate) + (set! new-status-eh newstatus) + (debug:print 4 *default-log-port* "BB> tl-test-id="tl-test-id" ; "test-name":"item-path + " newstate="newstate" newstatus="newstatus" len(sscs)="(length state-status-counts) " state-status-counts: " + (apply conc + (map (lambda (x) + (conc + (with-output-to-string (lambda () (pp (dbr:counts->alist x)))) " | ")) + state-status-counts))); end debug:print + (if tl-test-id + (rmt:test-set-state-status run-id tl-test-id newstate newstatus #f)) ;; we are still in the transaction - must access the db and not the dbstruct + ))))) + (if (and test-id state status (equal? status "AUTO")) + (rmt:test-data-rollup run-id test-id status)) + (if new-state-eh ;; moved from db:test-set-state-status + (mt:process-triggers run-id test-id new-state-eh new-status-eh)) + res))) + +;; select end_time-now from +;; (select testname,item_path,event_time+run_duration as +;; end_time,strftime('%s','now') as now from tests where state in +;; ('RUNNING','REMOTEHOSTSTART','LAUNCHED')); +;; +;; NOT EASY TO MIGRATE TO db{file,mod} +;; +(define (rmt:find-and-mark-incomplete-engine run-id ovr-deadtime cfg-deadtime test-stats-update-period) + (let* ((incompleted '()) + (oldlaunched '()) + (toplevels '()) + ;; The default running-deadtime is 720 seconds = 12 minutes. + ;; "(running-deadtime-default (+ server-start-allowance (* 2 launch-monitor-period)))" = 200 + (2 * (200 + 30 + 30)) + (deadtime-trim (or ovr-deadtime cfg-deadtime)) + (server-start-allowance 200) + (server-overloaded-budget 200) + (launch-monitor-off-time (or test-stats-update-period 30)) + (launch-monitor-on-time-budget 30) + (launch-monitor-period (+ launch-monitor-off-time launch-monitor-on-time-budget server-overloaded-budget)) + (remotehoststart-deadtime-default (+ server-start-allowance server-overloaded-budget 30)) + (remotehoststart-deadtime (or deadtime-trim remotehoststart-deadtime-default)) + (running-deadtime-default (+ server-start-allowance (* 2 launch-monitor-period))) + (running-deadtime (or deadtime-trim running-deadtime-default))) ;; two minutes (30 seconds between updates, this leaves 3x grace period) + + (debug:print-info 4 *default-log-port* "running-deadtime = " running-deadtime) + (debug:print-info 4 *default-log-port* "deadtime-trim = " deadtime-trim) + + (let* ((dat (rmt:get-toplevels-and-incompletes run-id running-deadtime remotehoststart-deadtime))) + (set! oldlaunched (list-ref dat 1)) + (set! toplevels (list-ref dat 2)) + (set! incompleted (list-ref dat 0))) + + (debug:print-info 18 *default-log-port* "Found " (length oldlaunched) " old LAUNCHED items, " + (length toplevels) " old LAUNCHED toplevel tests and " + (length incompleted) " tests marked RUNNING but apparently dead.") + + ;; These are defunct tests, do not do all the overhead of set-state-status. Force them to INCOMPLETE. + ;; + ;; (db:delay-if-busy dbdat) + (let* ((min-incompleted-ids (map car incompleted)) ;; do 'em all + (all-ids (append min-incompleted-ids (map car oldlaunched)))) + (if (> (length all-ids) 0) + (begin + ;; (launch:is-test-alive "localhost" 435) + (debug:print 0 *default-log-port* "WARNING: Marking test(s); " (string-intersperse (map conc all-ids) ", ") + " as DEAD") + (for-each + (lambda (test-id) + (let* ((tinfo (rmt:get-test-info-by-id run-id test-id)) + (run-dir (db:test-get-rundir tinfo)) + (host (db:test-get-host tinfo)) + (pid (db:test-get-process_id tinfo)) + (result (rmt:get-status-from-final-status-file run-dir))) + (if (and (list? result) (> (length result) 1) (equal? "PASS" (cadr result)) (equal? "COMPLETED" (car result))) + (begin + (debug:print 0 *default-log-port* "INFO: test " test-id " actually passed, so marking PASS not DEAD") + (rmt:set-state-status-and-roll-up-items + run-id test-id 'foo "COMPLETED" "PASS" + "Test stopped responding but it has PASSED; marking it PASS in the DB.")) + (let ((is-alive (and (not (eq? pid 0)) ;; 0 is default in re-used field "attemptnum" where pid stored. + (commonmod:is-test-alive host pid)))) + (if is-alive + (debug:print 0 *default-log-port* "INFO: test " test-id " on host " host + " has a process on pid " pid ", NOT setting to DEAD.") + (begin + (debug:print 0 *default-log-port* "INFO: test " test-id + " final state/status is not COMPLETED/PASS. It is " result) + (rmt:set-state-status-and-roll-up-items + run-id test-id 'foo "COMPLETED" "DEAD" + "Test stopped responding while in RUNNING or REMOTEHOSTSTART; presumed dead."))))))) + ;; call end of eud of run detection for posthook - from merge, is it needed? + ;; (launch:end-of-run-check run-id) + all-ids) + ))))) + (define (mt:lazy-read-test-config test-name) (let ((tconf (hash-table-ref/default *testconfigs* test-name #f))) (if tconf tconf (let ((test-dirs (tests:get-tests-search-path *configdat*))) ADDED nmsg-transport.scm Index: nmsg-transport.scm ================================================================== --- /dev/null +++ nmsg-transport.scm @@ -0,0 +1,367 @@ + +;; Copyright 2006-2012, Matthew Welland. + +;; This file is part of Megatest. +;; +;; Megatest is free software: you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation, either version 3 of the License, or +;; (at your option) any later version. +;; +;; Megatest is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU General Public License for more details. +;; +;; You should have received a copy of the GNU General Public License +;; along with Megatest. If not, see . + + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +;; (use nanomsg) + +(declare (unit nmsg-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses server)) + +(include "common_records.scm") +(include "db_records.scm") + +;; Transition to pub --> sub with pull <-- push +;; +;; 1. client sends request to server via push to the pull port +;; 2. server puts request in queue or processes immediately as appropriate +;; 3. server puts responses from completed requests into pub port +;; +;; TODO +;; +;; Done Tested +;; [x] [ ] 1. Add columns pullport pubport to servers table +;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 +;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports +;; [x] [ ] 4. Add client compose of request +;; [x] [ ] - name of client: testname/itempath-test_id-hostname +;; [x] [ ] - name of request: callname, params +;; [x] [ ] - request key: f(clientname, callname, params) +;; [x] [ ] 5. Add processing of subscription hits +;; [x] [ ] - done when get key +;; [x] [ ] - return results +;; [x] [ ] 6. Add timeout processing +;; [x] [ ] - after 60 seconds +;; [ ] [ ] i. check server alive, connect to new if necessary +;; [ ] [ ] ii. resend request +;; [ ] [ ] 7. Turn self ping back on + +(define (nmsg-transport:make-server-url hostport #!key (bindall #f)) + (if (not hostport) + #f + (conc "tcp://" (if bindall "*" (car hostport)) ":" (cadr hostport)))) + +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +(define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000)) + (debug:print 2 *default-log-port* "Attempting to start the server ...") + (let* ((start-port (portlogger:open-run-close portlogger:find-port)) + (server-thread (make-thread (lambda () + (nmsg-transport:try-start-server dbstruct run-id start-port server-id)) + "server thread")) + (tdbdat (tasks:open-db))) + (thread-start! server-thread) + (thread-sleep! 0.1) + (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id)) + (let ((interface (if (equal? hostn "-")(get-host-name) hostn))) + (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port) + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") + (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running + (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access + ;; (set! *inmemdb* dbstruct) + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") + (thread-start! (make-thread + (lambda ()(nmsg-transport:keep-running server-id run-id)) + "keep running")) + (thread-join! server-thread)) + (if (> retrynum 0) + (begin + (debug:print 0 *default-log-port* "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.") + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") + (portlogger:open-run-close portlogger:set-failed start-port) + (nmsg-transport:run dbstruct hostn run-id server-id)) + (begin + (debug:print-error 0 *default-log-port* "could not find an open port to start server on. Giving up") + (exit 1)))))) + +(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id) + (let ((repsoc (nn-socket 'rep))) + (nn-bind repsoc (conc "tcp://*:" portnum)) + (let loop ((msg-in (nn-recv repsoc))) + (let* ((dat (db:string->obj msg-in transport: 'nmsg))) + (debug:print 0 *default-log-port* "server, received: " dat) + (let ((result (api:execute-requests dbstruct dat))) + (debug:print 0 *default-log-port* "server, sending: " result) + (nn-send repsoc (db:obj->string result transport: 'nmsg))) + (loop (nn-recv repsoc)))))) + +;; all routes though here end in exit ... +;; +(define (nmsg-transport:launch run-id) + (let* ((tdbdat (tasks:open-db)) + (dbstruct (db:setup run-id)) + (hostn (or (args:get-arg "-server") "-"))) + (set! *run-id* run-id) + (set! *inmemdb* dbstruct) + ;; with nbfake daemonize isn't really needed + ;; + ;; (if (args:get-arg "-daemonize") + ;; (begin + ;; (daemon:ize) + ;; (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it + ;; (begin + ;; (current-error-port *alt-log-file*) + ;; (current-output-port *alt-log-file*))))) + (if (server:check-if-running run-id) + (begin + (debug:print-info 0 *default-log-port* "Server for run-id " run-id " already running") + (exit 0))) + (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) + (remtries 4)) + (if (not server-id) + (if (> remtries 0) + (begin + (thread-sleep! 2) + (if (not (server:check-if-running run-id)) + (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) + (- remtries 1)) + (begin + (debug:print-info 0 *default-log-port* "Another server took the slot, exiting") + (exit 0)))) + (begin + ;; since we didn't get the server lock we are going to clean up and bail out + (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") + (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch") + )) + ;; locked in a server id, try to start up + (nmsg-transport:run dbstruct hostn run-id server-id)) + (set! *didsomething* #t) + (exit)))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +(define (nmsg-transport:mk-signature) + (message-digest-string (md5-primitive) + (with-output-to-string + (lambda () + (write (list (current-directory) + (argv))))))) + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +;; ping the server at host:port +;; return the open socket if successful (return-socket == #t) +;; expect the key expected-key returned in payload +;; send our-key or #f as payload +;; +(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f)) + ;; send a random number along with pid and check that we get it back + (let* ((host (if (or (not hostn) + (equal? hostn "-")) ;; use localhost + (get-host-name) + hostn)) + (req (or socket + (let ((soc (nn-socket 'req))) + (nn-connect soc (conc "tcp://" host ":" port)) + soc))) + (success #t) + (dat (vector "ping" our-key)) + (result (condition-case + (nmsg-transport:client-api-send-receive-raw req dat timeout: timeout) + ((timeout)(set! success #f) #f))) + (key (if success + (vector-ref result 1) + #f))) + (debug:print 0 *default-log-port* "success=" success ", key=" key ", expected-key=" expected-key ", equal? " (equal? key expected-key)) + (if (and success + (or (not expected-key) ;; just getting a reply is good enough then + (equal? key expected-key))) + (if return-socket + req + (begin + (if (not socket)(nn-close req)) ;; don't want a side effect of closing socket if handed it + #t)) + (begin + (if (not socket)(nn-close req)) ;; failed to ping, close socket as side effect + #f)))) + +;; send data to server, wait max of timeout seconds for a response. +;; return #( success/fail result ) +;; +;; for effiency it is easier to do the obj->string and string->obj here. +;; +(define (nmsg-transport:client-api-send-receive-raw socreq indat #!key (enable-send #t)(timeout 25)) + (let* ((success #f) + (result #f) + (keepwaiting #t) + (dat (db:obj->string indat transport: 'nmsg)) + (send-recv (make-thread + (lambda () + (nn-send socreq dat) + (let* ((res (nn-recv socreq))) + (set! success #t) + (set! result (db:string->obj res transport: 'nmsg)))) + "send-recv")) + (timeout (make-thread + (lambda () + (let loop ((count 0)) + (thread-sleep! 1) + (debug:print-info 1 *default-log-port* "send-receive-raw, still waiting after " count " seconds...") + (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate + (loop (+ count 1)))) + (if keepwaiting + (begin + (print "timeout waiting for ping") + (thread-terminate! send-recv)))) + "timeout"))) + ;; replace with condition-case? + (handle-exceptions + exn + (set! result "timeout") + (thread-start! timeout) + (thread-start! send-recv) + (thread-join! send-recv) + (if success (thread-terminate! timeout))) + ;; raise timeout error if timed out + (if success + (if (and (vector? result) + (vector-ref result 0)) ;; did it fail at the server? + result ;; nope, all good + (begin + (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref result 2)) + (debug:print 0 *default-log-port* " client call chain:") + (print-call-chain (current-error-port)) + (debug:print 0 *default-log-port* " server call chain:") + (pp (vector-ref result 1) (current-error-port)) + (signal (vector-ref result 0)))) + (signal (make-composite-condition + (make-property-condition 'timeout 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server")))))) + +;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (nmsg-transport:keep-running server-id run-id) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat + (begin + (debug:print-info 0 *default-log-port* "keep-running got sdat=" sdat) + sdat) + (begin + (thread-sleep! 0.5) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdbdat (tasks:open-db)) + (server-timeout (let ((tmo (configf:lookup *configdat* "server" "timeout"))) + (if (and (string? tmo) + (string->number tmo)) + (* 60 60 (string->number tmo)) + ;; (* 3 24 60 60) ;; default to three days + (* 60 1) ;; default to one minute + ;; (* 60 60 25) ;; default to 25 hours + )))) + (print "Keep-running got server pid " server-id ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (db:sync-touched *inmemdb* run-id force-sync: #t) + (if (and *server-run* + (> (+ last-access server-timeout) + (current-seconds))) + (begin + (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 *default-log-port* "Starting to shutdown the server.") + (set! *time-to-exit* #t) + (db:sync-touched *inmemdb* run-id force-sync: #t) + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running") + (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting") + (exit) + )))))) + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +(define (nmsg-transport:client-connect iface portnum) + (let* ((reqsoc (nmsg-transport:ping iface portnum return-socket: #t))) + (vector iface portnum #f #f #f (current-seconds) reqsoc))) + +;; returns result, there is no sucess/fail flag - handled via excpections +;; +(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param #!key (remtries 5)) + ;; NB// In the html version of this routine there is a call to + ;; tasks:kill-server-run-id when there is an exception + (mutex-lock! *http-mutex*) + (let* ((packet (vector cmd param)) + (reqsoc (http-transport:server-dat-get-socket connection-info)) + (res (nmsg-transport:client-api-send-receive-raw reqsoc packet))) +;; (status (vector-ref rawres 0)) +;; (result (vector-ref rawres 1))) + (mutex-unlock! *http-mutex*) + res)) ;; (vector status (if status (db:string->obj result transport: 'nmsg) result)))) + +;;====================================================================== +;; J U N K +;;====================================================================== + +;; DO NOT USE +;; +(define (nmsg-transport:client-signal-handler signum) + (handle-exceptions + exn + (debug:print 0 *default-log-port* " ... exiting ...") + (let ((th1 (make-thread (lambda () + (if (not *received-response*) + (receive-message* *runremote*))) ;; flush out last call if applicable + "eat response")) + (th2 (make-thread (lambda () + (debug:print-error 0 *default-log-port* "Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep! 3) ;; give the flush three seconds to do it's stuff + (debug:print 0 *default-log-port* " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) + ADDED nn-transport.scm Index: nn-transport.scm ================================================================== --- /dev/null +++ nn-transport.scm @@ -0,0 +1,315 @@ +;;start a server, returns the connection +;; +(define (start-nn-server portnum ) + (let ((rep (nn-socket 'rep))) + (handle-exceptions + exn + (let ((emsg ((condition-property-accessor 'exn 'message) exn))) + (print "ERROR: Failed to start server \"" emsg "\"") + (exit 1)) + + (nn-bind rep (conc "tcp://*:" portnum))) + rep)) +;; open connection to server, send message, close connection +;; +(define (open-send-close-nn host-port msg attrib #!key (timeout 3) ) ;; default timeout is 3 seconds + (let ((req (nn-socket 'req)) + (uri (conc "tcp://" host-port)) + (res #f) + (contacts (alist-ref 'contact attrib)) + (mode (alist-ref 'mode attrib))) + (handle-exceptions + exn + (let ((emsg ((condition-property-accessor 'exn 'message) exn))) + ;; Send notification + (print "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" ) + (if (equal? mode "production") + (begin + (print " Sending email to contacts : " contacts ) + (let ((email-body (mtut:stml->string (s:body + (s:p (conc "We could not send messages to the server on " uri "." "Please check if the listner is running. It is possible that the host is overloaded due to which it may take too long to respond. \n Contact your system adminstrator if server load is high." (s:br)" Thank You ") ))))) + (sendmail (string-join (string-split contacts ";" )) (conc "[Listner Error] Filed to connect to listner on " uri) email-body use_html: #t))) + (print " mode : " mode " Not sending any emails" )) + #f) + (nn-connect req uri) + (print "Connected to the server " ) + (nn-send req msg) + (print "Request Sent") + (let* ((th1 (make-thread (lambda () + (let ((resp (nn-recv req))) + (nn-close req) + (set! res (if (equal? resp "ok") + #t + #f)))) + "recv thread")) + (th2 (make-thread (lambda () + (thread-sleep! timeout) + (thread-terminate! th1)) + "timer thread"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + res)))) + +(define (open-send-receive-nn host-port msg attrib #!key (timeout 3) ) ;; default timeout is 3 seconds + (let ((req (nn-socket 'req)) + (uri (conc "tcp://" host-port)) + (res #f) + (contacts (alist-ref 'contact attrib)) + (mode (alist-ref 'mode attrib))) + (handle-exceptions + exn + (let ((emsg ((condition-property-accessor 'exn 'message) exn))) + ;; Send notification + (print "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" ) + (if (equal? mode "production") + (begin + (print " Sending email to contacts : " contacts ) + (let ((email-body (mtut:stml->string (s:body + (s:p (conc "We could not send messages to the server on " uri "." "Please check if the listner is running. It is possible that the host is overloaded due to which it may take too long to respond. \n Contact your system adminstrator if server load is high." (s:br)" Thank You ") ))))) + (sendmail (string-join (string-split contacts ";" )) (conc "[Listner Error] Filed to connect to listner on " uri) email-body use_html: #t))) + (print " mode : " mode " Not sending any emails" )) + #f) + (nn-connect req uri) + (print "Connected to the server " ) + (nn-send req msg) + (print "Request Sent") + ;; receive code here + ;;(print (nn-recv req)) + (let* ((th1 (make-thread (lambda () + (let ((resp (nn-recv req))) + (nn-close req) + (print resp) + (set! res (if (equal? resp "ok") + #t + #f)))) + "recv thread")) + (th2 (make-thread (lambda () + (thread-sleep! timeout) + (thread-terminate! th1)) + "timer thread"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + res)))) + + + ((tsend) + (if (null? remargs) + (print "ERROR: missing data to send to trigger listeners") + (let* ((msg (car remargs)) + (mtconfdat (simple-setup (args:get-arg "-start-dir"))) + (mtconf (car mtconfdat)) + (time-out (if (args:get-arg "-time-out") + (string->number (args:get-arg "-time-out")) + 5)) + (listeners (configf:get-section mtconf "listeners")) + (user-info (user-information (current-user-id))) + (prev-seen (make-hash-table))) ;; catch duplicates + (if user-info + (begin + (for-each + (lambda (listener) + (let ((host-port (car listener)) + (attrib (val->alist (cadr listener)))) + (if (and (equal? msg "time-to-die") (not (can-user-kill-listner user-info attrib))) + (begin + (debug:print-error 0 *default-log-port* "User " (car user-info) " is not allowed to send message '" msg"'") + (exit 1))) + (print "sending " msg " to " host-port ) + (open-send-close-nn host-port msg attrib timeout: time-out ))) + listeners)) + (begin + (debug:print-error 0 *default-log-port* "Could not Identify executing user. Will not send any message") + (exit 1)))))) + ((tquery) + (if (null? remargs) + (print "ERROR: missing data to send to trigger listeners") + (let* ((msg (car remargs)) + (mtconfdat (simple-setup (args:get-arg "-start-dir"))) + (mtconf (car mtconfdat)) + (time-out (if (args:get-arg "-time-out") + (string->number (args:get-arg "-time-out")) + 5)) + (listeners (configf:get-section mtconf "listeners")) + (user-info (user-information (current-user-id))) + (prev-seen (make-hash-table))) ;; catch duplicates + (if user-info + (begin + (for-each + (lambda (listener) + (let ((host-port (car listener)) + (attrib (val->alist (cadr listener)))) + (if (and (equal? msg "time-to-die") (not (can-user-kill-listner user-info attrib))) + (begin + (debug:print-error 0 *default-log-port* "User " (car user-info) " is not allowed to send message '" msg"'") + (exit 1))) + (print "sending " msg " to " host-port ) + (open-send-receive-nn host-port msg attrib timeout: time-out ))) + listeners)) + (begin + (debug:print-error 0 *default-log-port* "Could not Identify executing user. Will not send any message") + (exit 1)))))) + + ((tquerylisten) + (if (null? remargs) + (print "ERROR: useage for tlisten is \"mtutil tlisten portnum\"") + (let ((portnum (string->number (car remargs)))) + + (if (not portnum) + (print "ERROR: the portnumber parameter must be a number, you gave: " (car remargs)) + (begin + (if (not (is-port-in-use portnum)) + (let* ((rep (start-nn-server portnum)) + (mtconfdat (simple-setup (args:get-arg "-start-dir"))) + (mtconf (car mtconfdat)) + (contact (configf:lookup mtconf "listener" "owner")) + (script (configf:lookup mtconf "listener" "script"))) + (print "Listening on port " portnum " for messages.") + (set-signal-handler! signal/int (lambda (signum) + (set! *time-to-exit* #t) + (debug:print-error 0 *default-log-port* "Received signal " signum " sending email befor exiting !!") + (let ((email-body (mtut:stml->string (s:body + (s:p (conc "Received signal " signum ". Lister has been terminated on host " (get-environment-variable "HOST") ". ")))))) + (sendmail contact "Listner has been terminated." email-body use_html: #t)) + (exit))) + (set-signal-handler! signal/term (lambda (signum) + (set! *time-to-exit* #t) + (debug:print-error 0 *default-log-port* "Received signal " signum " sending email befor exiting !!") + (let ((email-body (mtut:stml->string (s:body + (s:p (conc "Received signal " signum ". Lister has been terminated on host " (get-environment-variable "HOST") ". ")))))) + (sendmail contact "Listner has been terminated." email-body use_html: #t)) + (exit))) + + ;(set-signal-handler! signal/term special-signal-handler) + + (let loop ((instr (nn-recv rep))) + ;;(nn-send rep "3.9") + (with-input-from-pipe (conc "/usr/bin/uptime | cut -d':' -f4 | awk '{print $1}' | cut -d',' -f1") + (lambda() + (let loop ((inl (read-line))) + (if (not (eof-object? inl)) + (begin + ;;(print "fdk73: " inl ":") + ;;(set! current-list-ciaf (append! current-list-ciaf (list (string-substitute "\\s+$" "" inl)))) + (nn-send rep inl) + (loop(read-line))) + )) + + ) + ) + ;;(print (isys "/usr/bin/uptime" foreach-stdout-thunk: foreach-stdout)) + (let ((ctime (date->string (current-date)))) + (if (equal? instr "time-to-die") + (begin + (debug:print 0 *default-log-port* ctime " received '" instr "'. Time to sucide." ) + (let ((pid (current-process-id))) + (debug:print 0 *default-log-port* "Killing current process (pid=" pid ")") + (system (conc "kill " pid)))) + (begin + (debug:print 0 *default-log-port* ctime " received " instr ) + ;(nn-send rep "ok") + (if (not (equal? instr "ping")) + (begin + (debug:print 0 *default-log-port* ctime " running \"" script " " instr "\"") + ;(system (conc script " '" instr "'")) + (process-run script (list instr )) + (debug:print 0 *default-log-port* ctime " done" )) + (begin + (if (not (equal? instr "load")) + (print "Checking load") + + ) + ) + + ) + + ))) + (loop (nn-recv rep)))) + (print "ERROR: Port " portnum " already in use. Try another port"))))))) + + ((tlisten) + (if (null? remargs) + (print "ERROR: useage for tlisten is \"mtutil tlisten portnum\"") + (let ((portnum (string->number (car remargs)))) + + (if (not portnum) + (print "ERROR: the portnumber parameter must be a number, you gave: " (car remargs)) + (begin + (if (not (is-port-in-use portnum)) + (let* ((rep (start-nn-server portnum)) + (mtconfdat (simple-setup (args:get-arg "-start-dir"))) + (mtconf (car mtconfdat)) + (contact (configf:lookup mtconf "listener" "owner")) + (script (configf:lookup mtconf "listener" "script"))) + (print "Listening on port " portnum " for messages.") + (set-signal-handler! signal/int + (lambda (signum) + (set! *time-to-exit* #t) + (debug:print-error 0 *default-log-port* "Received signal " signum + " sending email befor exiting !!") + (let ((email-body (mtut:stml->string + (s:body + (s:p (conc "Received signal " signum + ". Lister has been terminated on host " + (get-environment-variable "HOST") ". ")))))) + (sendmail contact "Listner has been terminated." email-body use_html: #t)) + (exit))) + (set-signal-handler! signal/term (lambda (signum) + (set! *time-to-exit* #t) + (debug:print-error 0 *default-log-port* "Received signal " + signum " sending email befor exiting !!") + (let ((email-body (mtut:stml->string + (s:body + (s:p (conc "Received signal " signum + ". Lister has been terminated on host " + (get-environment-variable "HOST") ". ")))))) + (sendmail contact "Listner has been terminated." email-body use_html: #t)) + (exit))) + + ;; (set-signal-handler! signal/term special-signal-handler) + + (let loop ((instr (nn-recv rep))) + (nn-send rep "ok") + (let ((ctime (date->string (current-date)))) + (if (equal? instr "time-to-die") + (begin + (debug:print 0 *default-log-port* ctime " received '" instr "'. Time to sucide." ) + (let ((pid (current-process-id))) + (debug:print 0 *default-log-port* "Killing current process (pid=" pid ")") + (system (conc "kill " pid)))) + (begin + (debug:print 0 *default-log-port* ctime " received " instr ) + ;(nn-send rep "ok") + (if (not (equal? instr "ping")) + (begin + (debug:print 0 *default-log-port* ctime " running \"" script " " instr "\"") + (system (conc script " '" instr "' &")) + ;(process-run script (list instr )) + (debug:print 0 *default-log-port* ctime " done" )) + (begin + (if (not (equal? instr "load")) + (print "Checking load") + + ) + ) + + ) + + ))) + (loop (nn-recv rep)))) + (print "ERROR: Port " portnum " already in use. Try another port"))))))) + +;; from bard + +(define (start-nanomsg-server) + (let* ((socket (socket-connect "inproc://my-server"))) + (begin + (set-socket-option socket 'linger 1) + (set-socket-option socket 'sndbuf 1024) + (set-socket-option socket 'rcvbuf 1024) + (loop + (let ((msg (receive-message socket))) + (if msg + (handle-message msg) + (exit)))))) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -435,18 +435,21 @@ (define (rmt:get-count-tests-running-for-testname run-id testname) (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'get-count-tests-running-for-testname run-id (list run-id testname))) +(define (rmt:get-all-state-status-counts-for-test run-id test-name item-path item-state-in item-status-in) + (rmt:send-receive 'get-all-state-status-counts-for-test run-id (list run-id test-name item-path item-state-in item-status-in))) (define (rmt:get-count-tests-running-in-jobgroup run-id jobgroup) (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'get-count-tests-running-in-jobgroup run-id (list run-id jobgroup))) (define (rmt:set-state-status-and-roll-up-run run-id state status) (assert (number? run-id) "FATAL: Run id required.") (rmt:send-receive 'set-state-status-and-roll-up-run run-id (list run-id state status))) +;; run on client version of set-state-status-and-roll-up-run (define (rmt:update-pass-fail-counts run-id test-name) (assert (number? run-id) "FATAL: Run id required.") (rmt:general-call 'update-pass-fail-counts run-id test-name test-name test-name)) @@ -773,5 +776,32 @@ (let* ((cfg-deadtime (configf:lookup-number *configdat* "setup" "deadtime")) (test-stats-update-period (configf:lookup-number *configdat* "setup" "test-stats-update-period"))) (rmt:find-and-mark-incomplete-engine run-id ovr-deadtime cfg-deadtime test-stats-update-period) ;;call end of eud of run detection for posthook (launch:end-of-run-check run-id))) + +(define (rmt:get-test-id run-id testname itempath) + (rmt:send-receive 'get-test-id run-id (list run-id testname itempath))) + +;; set tests with state currstate and status currstatus to newstate and newstatus +;; use currstate = #f and or currstatus = #f to apply to any state or status respectively +;; WARNING: SQL injection risk. NB// See new but not yet used "faster" version below +;; +;; AND NOT (item_path='' AND testname in (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));"))) +;; (debug:print 0 *default-log-port* "QRY: " qry) +;; (db:delay-if-busy) +;; +;; NB// This call only operates on toplevel tests. Consider replacing it with more general call +;; +(define (rmt:set-tests-state-status run-id testnames currstate currstatus newstate newstatus) + (let ((test-ids '())) + (for-each + (lambda (testname) + (let ((test-id (rmt:get-test-id run-id testname ""))) + (rmt:set-state-status-by-state-status run-id testname currstate currstatus newstate newstatus) + (if test-id + (begin + (set! test-ids (cons test-id test-ids)) + (mt:process-triggers run-id test-id newstate newstatus))))) + testnames) + test-ids)) + Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -151,13 +151,13 @@ ;; WARNING: This currently bypasses the transaction wrapped writes system (define (rmt:test-set-state-status-by-id run-id test-id newstate newstatus newcomment) (assert (number? run-id) "FATAL: Run id required.") (rmtmod:send-receive 'test-set-state-status-by-id run-id (list run-id test-id newstate newstatus newcomment))) -(define (rmt:set-tests-state-status run-id testnames currstate currstatus newstate newstatus) - (assert (number? run-id) "FATAL: Run id required.") - (rmtmod:send-receive 'set-tests-state-status run-id (list run-id testnames currstate currstatus newstate newstatus))) +;; (define (rmt:set-tests-state-status run-id testnames currstate currstatus newstate newstatus) +;; (assert (number? run-id) "FATAL: Run id required.") +;; (rmtmod:send-receive 'set-tests-state-status run-id (list run-id testnames currstate currstatus newstate newstatus))) (define (rmt:get-tests-for-run run-id testpatt states statuses offset limit not-in sort-by sort-order qryvals last-update mode) (assert (number? run-id) "FATAL: Run id required.") ;; (if (number? run-id) (rmtmod:send-receive 'get-tests-for-run run-id (list run-id testpatt states statuses offset limit not-in sort-by sort-order qryvals last-update mode))) @@ -177,17 +177,10 @@ (define (rmt:get-tests-for-run-mindata run-id testpatt states status not-in) (assert (number? run-id) "FATAL: Run id required.") (rmtmod:send-receive 'get-tests-for-run-mindata run-id (list run-id testpatt states status not-in))) -;; state and status are extra hints not usually used in the calculation -;; -(define (rmt:set-state-status-and-roll-up-items run-id test-name item-path state status comment) - (assert (number? run-id) "FATAL: Run id required.") - (rmtmod:send-receive 'set-state-status-and-roll-up-items run-id (list run-id test-name item-path state status comment))) - - ;;====================================================================== ;; Maintenance ;;====================================================================== @@ -203,83 +196,12 @@ (debug:print 2 *default-log-port* "ERROR: run-dir is " run-dir) #f ) (with-input-from-file infile read-lines) ))) - -;; select end_time-now from -;; (select testname,item_path,event_time+run_duration as -;; end_time,strftime('%s','now') as now from tests where state in -;; ('RUNNING','REMOTEHOSTSTART','LAUNCHED')); -;; -;; NOT EASY TO MIGRATE TO db{file,mod} -;; -(define (rmt:find-and-mark-incomplete-engine run-id ovr-deadtime cfg-deadtime test-stats-update-period) - (let* ((incompleted '()) - (oldlaunched '()) - (toplevels '()) - ;; The default running-deadtime is 720 seconds = 12 minutes. - ;; "(running-deadtime-default (+ server-start-allowance (* 2 launch-monitor-period)))" = 200 + (2 * (200 + 30 + 30)) - (deadtime-trim (or ovr-deadtime cfg-deadtime)) - (server-start-allowance 200) - (server-overloaded-budget 200) - (launch-monitor-off-time (or test-stats-update-period 30)) - (launch-monitor-on-time-budget 30) - (launch-monitor-period (+ launch-monitor-off-time launch-monitor-on-time-budget server-overloaded-budget)) - (remotehoststart-deadtime-default (+ server-start-allowance server-overloaded-budget 30)) - (remotehoststart-deadtime (or deadtime-trim remotehoststart-deadtime-default)) - (running-deadtime-default (+ server-start-allowance (* 2 launch-monitor-period))) - (running-deadtime (or deadtime-trim running-deadtime-default))) ;; two minutes (30 seconds between updates, this leaves 3x grace period) - - (debug:print-info 4 *default-log-port* "running-deadtime = " running-deadtime) - (debug:print-info 4 *default-log-port* "deadtime-trim = " deadtime-trim) - - (let* ((dat (rmt:get-toplevels-and-incompletes run-id running-deadtime remotehoststart-deadtime))) - (set! oldlaunched (list-ref dat 1)) - (set! toplevels (list-ref dat 2)) - (set! incompleted (list-ref dat 0))) - - (debug:print-info 18 *default-log-port* "Found " (length oldlaunched) " old LAUNCHED items, " - (length toplevels) " old LAUNCHED toplevel tests and " - (length incompleted) " tests marked RUNNING but apparently dead.") - - ;; These are defunct tests, do not do all the overhead of set-state-status. Force them to INCOMPLETE. - ;; - ;; (db:delay-if-busy dbdat) - (let* ((min-incompleted-ids (map car incompleted)) ;; do 'em all - (all-ids (append min-incompleted-ids (map car oldlaunched)))) - (if (> (length all-ids) 0) - (begin - ;; (launch:is-test-alive "localhost" 435) - (debug:print 0 *default-log-port* "WARNING: Marking test(s); " (string-intersperse (map conc all-ids) ", ") - " as DEAD") - (for-each - (lambda (test-id) - (let* ((tinfo (rmt:get-test-info-by-id run-id test-id)) - (run-dir (db:test-get-rundir tinfo)) - (host (db:test-get-host tinfo)) - (pid (db:test-get-process_id tinfo)) - (result (rmt:get-status-from-final-status-file run-dir))) - (if (and (list? result) (> (length result) 1) (equal? "PASS" (cadr result)) (equal? "COMPLETED" (car result))) - (begin - (debug:print 0 *default-log-port* "INFO: test " test-id " actually passed, so marking PASS not DEAD") - (rmt:set-state-status-and-roll-up-items - run-id test-id 'foo "COMPLETED" "PASS" - "Test stopped responding but it has PASSED; marking it PASS in the DB.")) - (let ((is-alive (and (not (eq? pid 0)) ;; 0 is default in re-used field "attemptnum" where pid stored. - (commonmod:is-test-alive host pid)))) - (if is-alive - (debug:print 0 *default-log-port* "INFO: test " test-id " on host " host - " has a process on pid " pid ", NOT setting to DEAD.") - (begin - (debug:print 0 *default-log-port* "INFO: test " test-id - " final state/status is not COMPLETED/PASS. It is " result) - (rmt:set-state-status-and-roll-up-items - run-id test-id 'foo "COMPLETED" "DEAD" - "Test stopped responding while in RUNNING or REMOTEHOSTSTART; presumed dead."))))))) - ;; call end of eud of run detection for posthook - from merge, is it needed? - ;; (launch:end-of-run-check run-id) - all-ids) - ))))) - + +(define (rmt:set-state-status-by-state-status run-id testname currstate currstatus newstate newstatus) + (rmtmod:send-receive 'set-state-status-by-state-status run-id (list run-id testname currstate currstatus newstate newstatus))) + + ) Index: runs.scm ================================================================== --- runs.scm +++ runs.scm @@ -814,11 +814,11 @@ (let ((reglen (configf:lookup *configdat* "setup" "runqueue"))) (if (> (length (hash-table-keys test-records)) 0) (let* ((keep-going #t) (run-queue-retries 5) (run-ids (rmt:get-all-run-ids))) - (for-each (lambda (run-id) + #;(for-each (lambda (run-id) (if keep-going (handle-exceptions exn (debug:print 0 *default-log-port* "error in calling find-and-mark-incomplete for run-id " run-id ", exn=" exn) (rmt:find-and-mark-incomplete run-id #f)))) ;; ovr-deadtime))) ;; could be root of https://hsdes.intel.com/appstore/article/#/220546828/main -- Title: Megatest jobs show DEAD even though they are still running (1.64/27) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -737,11 +737,11 @@ (if (string? tmo) (let* ((num (string->number tmo))) (if num (* 3600 num) (common:hms-string->seconds tmo))) - 600 ;; this is the default + 10 ;; this is the default ))) (define (server:get-best-guess-address hostname) (let ((res #f)) (for-each Index: tcp-transportmod.scm ================================================================== --- tcp-transportmod.scm +++ tcp-transportmod.scm @@ -107,11 +107,11 @@ (last-serv-start 0) ) ;; parameters ;; -(define tt-server-timeout-param (make-parameter 600)) +(define tt-server-timeout-param (make-parameter 10)) ;; make ttdat visible (define *server-info* #f) (define (tt:make-remote areapath) @@ -126,10 +126,14 @@ (not run-id)) (equal? (dbfile:run-id->dbfname run-id) dbfname))) (tcp-buffer-size 2048) ;; (max-connections 4096) + +;;====================================================================== +;; Client side +;;====================================================================== ;; do all the busy work of finding and setting up conn for ;; connecting to a server ;; (define (tt:client-connect-to-server ttdat dbfname run-id testsuite) @@ -171,10 +175,11 @@ (thread-sleep! 0.5) (tt:client-connect-to-server ttdat dbfname run-id testsuite)) (else (let* ((curr-secs (current-seconds))) ;; rm the (last server) would go here + (hash-table-delete! (tt-conns ttdat) dbfname) (if (> (- curr-secs (tt-last-serv-start ttdat)) 10) (begin (tt-last-serv-start-set! ttdat curr-secs) (server-start-proc))) ;; start server if 30 sec since last attempt (thread-sleep! 1) @@ -184,10 +189,11 @@ (begin (debug:print-info 0 *default-log-port* "No server found. Starting one for run-id "run-id" in dbfile "dbfname) (server-start-proc) (tt-last-serv-start-set! ttdat (current-seconds)))) (thread-sleep! 1) + (hash-table-delete! (tt-conns ttdat) dbfname) (tt:client-connect-to-server ttdat dbfname run-id testsuite))))))) (define (tt:ping host port server-id #!optional (tries-left 5)) (let* ((res (tt:send-receive-direct host port `(ping #f #f #f) ping-mode: #t)) ;; please send me your server-id (try-again (lambda () Index: transport-mode.scm.template ================================================================== --- transport-mode.scm.template +++ transport-mode.scm.template @@ -13,10 +13,10 @@ ;; (dbfile:sync-method 'none) ;; (dbfile:cache-method 'none) ;; (rmt:transport-mode 'nfs) ;; uncomment this block to test with tcp and inmem -(dbfile:sync-method 'original) ;; attach) -(dbfile:cache-method 'inmem) +(dbfile:sync-method 'original) ;; 'original) ;; attach) +(dbfile:cache-method 'tmp) ;; 'inmem) (rmt:transport-mode 'tcp)