Index: TODO ================================================================== --- TODO +++ TODO @@ -1,4 +1,12 @@ -1. Confirm that branch transaction-for-sequential-writes content was added to trunk/development -2. Add a host chooser for ssh to launch-tests -3. Try making static executable +TODO +==== + +Migration to inmem db plus per run db +------------------------------------- + +. Re-work the dbstruct data structure? +.. Move main.db to global? +.. [ run-id.db inmemdb last-mod last-read last-sync inuse ] +. Re-work all queries to use run-id to dereference server +. Open main.db directly in calls to -runtests etc. No need to talk remote? Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -71,27 +71,27 @@ (realparams (cddr params))) (db:with-db dbstruct run-id #t ;; these are all for modifying the db (lambda (db) (db:general-call db stmtname realparams))))) ((sync-inmem->db) (db:sync-touched dbstruct force-sync: #t)) - ((kill-server) - (db:sync-tables (db:tbls *inmemdb*) *inmemdb* *db*) ;; (db:sync-to *inmemdb* *db*) - (let ((hostname (car *runremote*)) - (port (cadr *runremote*)) - (pid (if (null? params) #f (car params))) - (th1 (make-thread (lambda ()(thread-sleep! 3)(debug:print 0 "Server exiting!")(exit 0)) "Server exit thread"))) - (debug:print 0 "WARNING: Server on " hostname ":" port " going down by user request!") - (debug:print-info 1 "current pid=" (current-process-id)) - (open-run-close tasks:server-deregister tasks:open-db - hostname - port: port) - (set! *server-run* #f) - (thread-sleep! 3) - (if pid - (process-signal pid signal/kill) - (thread-start! th1)) - '(#t "exit process started"))) + ;; ((kill-server) + ;; (db:sync-tables (db:tbls *inmemdb*) *inmemdb* *db*) ;; (db:sync-to *inmemdb* *db*) + ;; (let ((hostname (car *runremote*)) + ;; (port (cadr *runremote*)) + ;; (pid (if (null? params) #f (car params))) + ;; (th1 (make-thread (lambda ()(thread-sleep! 
3)(debug:print 0 "Server exiting!")(exit 0)) "Server exit thread"))) + ;; (debug:print 0 "WARNING: Server on " hostname ":" port " going down by user request!") + ;; (debug:print-info 1 "current pid=" (current-process-id)) + ;; (open-run-close tasks:server-deregister tasks:open-db + ;; hostname + ;; port: port) + ;; (set! *server-run* #f) + ;; (thread-sleep! 3) + ;; (if pid + ;; (process-signal pid signal/kill) + ;; (thread-start! th1)) + ;; '(#t "exit process started"))) ((sdb-qry) (apply sdb:qry params)) ;; TESTMETA ((testmeta-get-record) (apply db:testmeta-get-record dbstruct params)) ((testmeta-add-record) (apply db:testmeta-add-record dbstruct params)) Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -50,43 +50,48 @@ ;; 1. We are a test manager and we received *transport-type* and *runremote* via cmdline ;; 2. We are a run tests, list runs or other interactive process and we must figure out ;; *transport-type* and *runremote* from the monitor.db ;; ;; client:setup -(define (client:setup #!key (numtries 3)) +(define (client:setup run-id #!key (numtries 3)) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) - (push-directory *toppath*) ;; This is probably NOT needed + ;; (push-directory *toppath*) ;; This is probably NOT needed ;; clients get the sdb:qry proc created here ;; (if (not sdb:qry) ;; (begin ;; (set! sdb:qry (make-sdb:qry (conc *toppath* "/db/strings.db"))) ;; we open the normalization helpers here ;; (sdb:qry 'setup #f))) - - (debug:print-info 11 "*transport-type* is " *transport-type* ", *runremote* is " *runremote*) - (let* ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (debug:print-info 11 "CLIENT SETUP, hostinfo=" hostinfo) - (set! 
*transport-type* (if hostinfo - (string->symbol (tasks:hostinfo-get-transport hostinfo)) - 'fs)) - (debug:print-info 11 "Using transport type of " *transport-type* (if hostinfo (conc " to connect to " hostinfo) "")) - (case *transport-type* - ;; ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db)))) - ((http) - (http-transport:client-connect (tasks:hostinfo-get-interface hostinfo) - (tasks:hostinfo-get-port hostinfo))) - ((zmq) - (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo) - (tasks:hostinfo-get-port hostinfo) - (tasks:hostinfo-get-pubport hostinfo))) - (else ;; default to fs - (debug:print 0 "ERROR: unrecognised transport type " *transport-type* " exiting now.") - (exit))) - (pop-directory))) + (let ((hostinfo (and run-id (hash-table-ref/default *runremote* run-id #f)))) + (debug:print-info 11 "for run-id=" run-id ", *transport-type* is " *transport-type*) + (if hostinfo + hostinfo ;; have hostinfo - just return it + (let* ((hostinfo (open-run-close tasks:get-server tasks:open-db run-id)) + (transport (if hostinfo + (string->symbol (tasks:hostinfo-get-transport hostinfo)) + 'http))) + (hash-table-set! *runremote* run-id hostinfo) + (debug:print-info 11 "CLIENT SETUP, hostinfo=" hostinfo) + (debug:print-info 11 "Using transport type of " transport (if hostinfo (conc " to connect to " hostinfo) "")) + (case transport ;; dispatch on the transport looked up for this run-id, not the global default + ;; ((fs)(if (not *megatest-db*)(set! 
*megatest-db* (open-db)))) + ((http) + ;; this saves the hostinfo in the *runremote* hash and returns it + (http-transport:client-connect run-id + (tasks:hostinfo-get-interface hostinfo) + (tasks:hostinfo-get-port hostinfo))) + ((zmq) + (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo) + (tasks:hostinfo-get-port hostinfo) + (tasks:hostinfo-get-pubport hostinfo))) + (else ;; default to fs + (debug:print 0 "ERROR: unrecognised transport type " *transport-type* " exiting now.") + (exit))))))) +;; (pop-directory))) ;; client:signal-handler (define (client:signal-handler signum) (handle-exceptions exn @@ -103,13 +108,16 @@ (thread-start! th2) (thread-start! th1) (thread-join! th2)))) ;; client:launch -(define (client:launch) +;; Need to set the signal handler somewhere other than here as this +;; routine will go away. +;; +(define (client:launch run-id) (set-signal-handler! signal/int client:signal-handler) - (if (client:setup) - (debug:print-info 2 "connected as client") - (begin - (debug:print 0 "ERROR: Failed to connect as client") - (exit)))) + (if (client:setup run-id) + (debug:print-info 2 "connected as client") + (begin + (debug:print 0 "ERROR: Failed to connect as client") + (exit)))) Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -46,11 +46,11 @@ ;; SERVER (define *my-client-signature* #f) (define *transport-type* 'fs) (define *megatest-db* #f) (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port -(define *runremote* #f) ;; if set up for server communication this will hold +(define *runremote* (make-hash-table)) ;; if set up for server communication this will hold (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) @@ -59,10 +59,11 @@ (define *received-response* 
#f) (define *default-numtries* 10) (define *server-run* #t) (define *db-write-access* #t) (define *inmemdb* #f) +(define *run-id* #f) (define *target* (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN (define *keys* (make-hash-table)) ;; cache the keys here (define *keyvals* (make-hash-table)) (define *toptest-paths* (make-hash-table)) ;; cache toptest path settings here Index: dashboard-tests.scm ================================================================== --- dashboard-tests.scm +++ dashboard-tests.scm @@ -140,11 +140,11 @@ ;;====================================================================== (define (run-info-panel db keydat testdat runname) (let* ((run-id (db:test-get-run_id testdat)) (rundat (db:get-run-info db run-id)) (header (db:get-header rundat)) - (event_time (db:get-value-by-header (db:get-row rundat) + (event_time (db:get-value-by-header (db:get-rows rundat) (db:get-header rundat) "event_time"))) (iup:frame #:title "Megatest Run Info" ; #:expand "YES" (iup:hbox ; #:expand "YES" @@ -462,11 +462,11 @@ (debug:print 2 "ERROR: No test data found for test " test-id ", exiting") (exit 1)) (let* (;; (run-id (if testdat (db:test-get-run_id testdat) #f)) (keydat (if testdat (db:get-key-val-pairs dbstruct run-id) #f)) (rundat (if testdat (db:get-run-info dbstruct run-id) #f)) - (runname (if testdat (db:get-value-by-header (db:get-row rundat) + (runname (if testdat (db:get-value-by-header (db:get-rows rundat) (db:get-header rundat) "runname") #f)) (tdb (tdb:open-test-db-by-test-id-local dbstruct run-id test-id)) ;; These next two are intentional bad values to ensure errors if they should not ;; get filled in properly. Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -170,11 +170,11 @@ (dbr:dbstruct-set-main! 
dbstruct db) db)))) ;; Make the dbstruct, setup up auxillary db's and call for main db at least once ;; -(define (db:setup #!key (local #f)) +(define (db:setup run-id #!key (local #f)) (let ((dbstruct (make-dbr:dbstruct path: *toppath* local: local))) (db:get-db dbstruct #f) ;; force one call to main ;; (if (not sdb:qry) ;; (begin ;; (set! sdb:qry (make-sdb:qry (conc *toppath* "/db/strings.db"))) ;; we open the normalization helpers here @@ -772,21 +772,25 @@ (db:get-db dbstruct #f) "SELECT fieldname FROM keys ORDER BY id DESC;"))) (set! *db-keys* res) res))) -;; +;; look up values in a header/data structure (define (db:get-value-by-header row header field) - (debug:print-info 4 "db:get-value-by-header row: " row " header: " header " field: " field) (if (null? header) #f (let loop ((hed (car header)) (tal (cdr header)) (n 0)) (if (equal? hed field) (vector-ref row n) (if (null? tal) #f (loop (car tal)(cdr tal)(+ n 1))))))) +;; Accessors for the header/data structure +;; get rows and header from +(define (db:get-header vec)(vector-ref vec 0)) +(define (db:get-rows vec)(vector-ref vec 1)) + ;;====================================================================== ;; R U N S ;;====================================================================== (define (db:get-run-name-from-id dbstruct run-id) @@ -975,11 +979,11 @@ ;; db:get-runs-by-patt ;; get runs by list of criteria ;; register a test run with the db ;; -;; Use: (db-get-value-by-header (db:get-header runinfo)(db:get-row runinfo)) +;; Use: (db:get-value-by-header (db:get-header runinfo)(db:get-rows runinfo)) ;; to extract info from the structure returned ;; (define (db:get-runs-by-patt dbstruct keys runnamepatt targpatt offset limit) ;; test-name) (let* ((tmp (runs:get-std-run-fields keys '("id" "runname" "state" "status" "owner" "event_time"))) (keystr (car tmp)) @@ -1013,11 +1017,11 @@ (db:get-db dbstruct #f) qry-str runnamepatt))) (vector header res))) -;; use (get-value-by-header (db:get-header 
runinfo)(db:get-row runinfo)) +;; use (get-value-by-header (db:get-header runinfo)(db:get-rows runinfo)) (define (db:get-run-info dbstruct run-id) ;;(if (hash-table-ref/default *run-info-cache* run-id #f) ;; (hash-table-ref *run-info-cache* run-id) (let* ((res (vector #f #f #f #f)) (keys (db:get-keys dbstruct)) Index: db_records.scm ================================================================== --- db_records.scm +++ db_records.scm @@ -109,14 +109,10 @@ (define-inline (db:test-set-state! vec val)(vector-set! vec 3 val)) (define-inline (db:test-set-status! vec val)(vector-set! vec 4 val)) (define-inline (db:test-set-run_duration! vec val)(vector-set! vec 12 val)) (define-inline (db:test-set-final_logf! vec val)(vector-set! vec 13 val)) -;; get rows and header from -(define-inline (db:get-header vec)(vector-ref vec 0)) -(define-inline (db:get-rows vec)(vector-ref vec 1)) - ;; make-vector-record "" db mintest id run_id testname state status event_time item_path ;; (define (make-db:mintest)(make-vector 7)) (define-inline (db:mintest-get-id vec) (vector-ref vec 0)) (define-inline (db:mintest-get-run_id vec) (vector-ref vec 1)) @@ -209,13 +205,10 @@ (define-inline (tdb:step-stable-set-start! vec val)(vector-set! vec 1 val)) (define-inline (tdb:step-stable-set-end! vec val)(vector-set! vec 2 val)) (define-inline (tdb:step-stable-set-status! vec val)(vector-set! vec 3 val)) (define-inline (tdb:step-stable-set-runtime! vec val)(vector-set! 
vec 4 val)) -;; use this one for db-get-run-info -(define-inline (db:get-row vec)(vector-ref vec 1)) - ;; The data structure for handing off requests via wire (define (make-cdb:packet)(make-vector 6)) (define-inline (cdb:packet-get-client-sig vec) (vector-ref vec 0)) (define-inline (cdb:packet-get-qtype vec) (vector-ref vec 1)) (define-inline (cdb:packet-get-immediate vec) (vector-ref vec 2)) Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -58,11 +58,11 @@ (string-intersperse (map number->string (u8vector->list (if res res (hostname->ip hostname)))) "."))) -(define (http-transport:run hostn) +(define (http-transport:run hostn run-id server-id) (debug:print 2 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") @@ -141,34 +141,31 @@ ((equal? (uri-path (request-uri (current-request))) '(/ "hey")) (send-response body: "hey there!\n" headers: '((content-type text/plain)))) (else (continue)))))))) - (http-transport:try-start-server ipaddrstr start-port))) + (http-transport:try-start-server ipaddrstr start-port server-id))) ;; This is recursively run by http-transport:run until sucessful ;; -(define (http-transport:try-start-server ipaddrstr portnum) +(define (http-transport:try-start-server ipaddrstr portnum server-id) (handle-exceptions exn (begin (print-error-message exn) (if (< portnum 9000) (begin (debug:print 0 "WARNING: failed to start on portnum: " portnum ", trying next port") (thread-sleep! 
0.1) - ;; (open-run-close tasks:remove-server-records tasks:open-db) - (open-run-close tasks:server-delete tasks:open-db ipaddrstr portnum) - (http-transport:try-start-server ipaddrstr (+ portnum 1))) + (http-transport:try-start-server ipaddrstr (+ portnum 1) server-id)) (print "ERROR: Tried and tried but could not start the server"))) ;; any error in following steps will result in a retry (set! *runremote* (list ipaddrstr portnum)) - ;; (open-run-close tasks:remove-server-records tasks:open-db) - (open-run-close tasks:server-register + (open-run-close tasks:server-set-interface-port tasks:open-db - (current-process-id) - ipaddrstr portnum 0 'startup 'http) + server-id + ipaddrstr portnum) (debug:print 1 "INFO: Trying to start server on " ipaddrstr ":" portnum) ;; This starts the spiffy server ;; NEED WAY TO SET IP TO #f TO BIND ALL (start-server bind-address: ipaddrstr port: portnum) (open-run-close tasks:server-delete tasks:open-db ipaddrstr portnum) @@ -373,61 +370,61 @@ (thread-join! th1) (thread-terminate! th2) (debug:print-info 11 "got res=" res) res))))) -(define (http-transport:client-connect iface port) +(define (http-transport:client-connect run-id iface port) (let* ((login-res #f) (uri-dat (make-request method: 'POST uri: (uri-reference (conc "http://" iface ":" port "/ctrl")))) (uri-api-dat (make-request method: 'POST uri: (uri-reference (conc "http://" iface ":" port "/api")))) (serverdat (list iface port uri-dat uri-api-dat))) - (set! *runremote* serverdat) ;; may or may not be good ... - (set! login-res (rmt:login)) + (hash-table-set! *runremote* run-id serverdat) ;; may or may not be good ... + (set! login-res (rmt:login run-id)) (if (and (list? login-res) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" port) - (set! *runremote* serverdat) + (hash-table-set! *runremote* run-id serverdat) serverdat) (begin (debug:print-info 0 "ERROR: Failed to login or connect to " iface ":" port) (exit 1))))) -;; (set! 
*runremote* #f) -;; (set! *transport-type* 'fs) -;; #f)))) - ;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; -(define (http-transport:keep-running) +(define (http-transport:keep-running server-id) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive - (let* ((server-info (let loop () + (let* ((server-info (let loop ((start-time (current-seconds)) + (changed #t) + (last-sdat "not this")) (let ((sdat #f)) (mutex-lock! *heartbeat-mutex*) (set! sdat *runremote*) (mutex-unlock! *heartbeat-mutex*) - (if sdat + (if (and (list? sdat) ;; *runremote* starts out as a hash table; wait until the server thread stores the (iface port) list + (not changed) + (> (- (current-seconds) start-time) 2)) sdat (begin (sleep 4) - (loop)))))) + (loop start-time + (not (equal? sdat last-sdat)) ;; changed is true only when this poll differs from the previous one + sdat)))))) (iface (car server-info)) (port (cadr server-info)) (last-access 0) (tdb (tasks:open-db)) - (spid ;;(open-run-close tasks:server-get-server-id tasks:open-db #f iface port #f)) - (tasks:server-get-server-id tdb #f iface port #f)) (server-timeout (let ((tmo (config-lookup *configdat* "server" "timeout"))) (if (and (string? tmo) (string->number tmo)) (* 60 60 (string->number tmo)) ;; default to three days (* 3 24 60 60))))) - (debug:print-info 2 "server-timeout: " server-timeout ", server pid: " spid " on " iface ":" port) + (tasks:server-set-state! tdb server-id "running") (let loop ((count 0)) ;; Use this opportunity to sync the inmemdb to db (let ((start-time (current-milliseconds)) (sync-time #f) (rem-time #f)) @@ -449,20 +446,19 @@ (mutex-lock! *heartbeat-mutex*) (set! sdat *runremote*) (mutex-unlock! *heartbeat-mutex*) (if (or (not (equal? sdat (list iface port))) - (not spid)) + (not server-id)) (begin (debug:print-info 0 "interface changed, refreshing iface and port info") (set! iface (car sdat)) - (set! port (cadr sdat)) - (set! spid (tasks:server-get-server-id tdb #f iface port #f)))) + (set! 
port (cadr sdat)))) ;; NOTE: Get rid of this mechanism! It really is not needed... ;; (open-run-close tasks:server-update-heartbeat tasks:open-db spid) - (tasks:server-update-heartbeat tdb spid) + (tasks:server-update-heartbeat tdb server-id) ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access ;; Transfer *last-db-access* to last-access to use in checking that we are still alive (mutex-lock! *heartbeat-mutex*) @@ -479,12 +475,12 @@ (begin (debug:print-info 0 "Starting to shutdown the server.") ;; need to delete only *my* server entry (future use) (set! *time-to-exit* #t) (if *inmemdb* (db:sync-touched *inmemdb* force-sync: #t)) - (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) - (thread-sleep! 1) + ( tasks:server-set-state! tdb server-id "shutting-down") + (thread-sleep! 5) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Number of cached writes " *number-of-writes*) (debug:print-info 0 "Average cached write time " (if (eq? *number-of-writes* 0) "n/a (no writes)" @@ -497,36 +493,42 @@ "n/a (no queries)" (/ *total-non-write-delay* *number-non-write-queries*)) " ms") (debug:print-info 0 "Server shutdown complete. Exiting") + (tasks:server-delete-record! tdb server-id) (exit)))))) ;; all routes though here end in exit ... -(define (http-transport:launch) +(define (http-transport:launch run-id) + (set! 
*run-id* run-id) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) (debug:print-info 2 "Starting the standalone server") (if (args:get-arg "-daemonize") (daemon:ize)) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (debug:print 11 "http-transport:launch hostinfo=" hostinfo) - ;; #(1 "143.182.207.24" 5736 -1 "http" 22771 "hostname") - (if hostinfo - (debug:print-info 2 "NOT starting new server, one is already running on " (vector-ref hostinfo 1) ":" (vector-ref hostinfo 2)) + (let ((server-id (open-run-close tasks:server-lock-slot tasks:open-db run-id))) + (if (not server-id) + (begin + (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") + (open-run-close tasks:server-delete-records-for-this-pid tasks:open-db)) (if *toppath* (let* ((th2 (make-thread (lambda () (http-transport:run (if (args:get-arg "-server") (args:get-arg "-server") - "-"))) "Server run")) - (th3 (make-thread http-transport:keep-running "Keep running"))) + "-") + run-id + server-id)) "Server run")) + (th3 (make-thread (lambda () + (http-transport:keep-running server-id)) + "Keep running"))) ;; Database connection - (set! *inmemdb* (db:setup)) + (set! *inmemdb* (db:setup run-id)) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) (thread-join! 
th2)) (debug:print 0 "ERROR: Failed to setup for megatest"))) Index: launch.scm ================================================================== --- launch.scm +++ launch.scm @@ -475,11 +475,11 @@ ;; ;; - [ - ] ;; (define (create-work-area run-id run-info keyvals test-id test-src-path disk-path testname itemdat) (let* ((item-path (item-list->path itemdat)) - (runname (db:get-value-by-header (db:get-row run-info) + (runname (db:get-value-by-header (db:get-rows run-info) (db:get-header run-info) "runname")) ;; convert back to db: from rdb: - this is always run at server end (target (string-intersperse (map cadr keyvals) "/")) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -30,10 +30,11 @@ ;; (declare (uses sdb)) ;; (declare (uses filedb)) (declare (uses tdb)) (declare (uses mt)) (declare (uses api)) +(declare (uses tasks)) ;; only used for debugging. (define *db* #f) ;; this is only for the repl, do not use in general!!!! (include "common_records.scm") (include "key_records.scm") @@ -201,10 +202,11 @@ "-gen-megatest-test" "-override-timeout" "-test-files" ;; -test-paths is for listing all "-load" ;; load and exectute a scheme file "-dumpmode" + "-run-id" ) (list "-h" "-version" "-force" "-xterm" @@ -343,13 +345,17 @@ ;; Server? Start up here. ;; (let ((tl (setup-for-run)) (transport (or (configf:lookup *configdat* "setup" "transport") - (args:get-arg "-transport" "http")))) - (debug:print 2 "Launching server using transport " transport) - (server:launch (string->symbol transport))) + (args:get-arg "-transport" "http"))) + (run-id (and (args:get-arg "-run-id") + (string->number (args:get-arg "-run-id"))))) + (debug:print 2 "Launching server using transport " transport " for run-id=" run-id) + (if run-id + (server:launch (string->symbol transport) run-id) + (debug:print 0 "ERROR: server requires run-id be specified with -run-id"))) ;; Not a server? 
This section will decide how to communicate ;; ;; Setup client for all expect listed here (if (null? (lset-intersection @@ -358,11 +364,12 @@ '("-list-servers" "-stop-server" "-show-cmdinfo" "-list-runs"))) (if (setup-for-run) - (begin + (let ((run-id (and (args:get-arg "-run-id") + (string->number (args:get-arg "-run-id"))))) ;; (set! *fdb* (filedb:open-db (conc *toppath* "/db/paths.db"))) ;; if not list or kill then start a client (if appropriate) (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test") (eq? (length (hash-table-keys args:arg-hash)) 0)) (debug:print-info 1 "Server connection not needed") @@ -383,14 +390,12 @@ "fs")))) (debug:print 2 "chosen-transport: " chosen-transport " have; config=" transport-from-config ", cmdln=" transport-from-cmdln ", cmdinfo=" transport-from-cmdinfo) (case chosen-transport ((http) (set! *transport-type 'http) - (server:ensure-running) - ;; Get rid of this - - (client:launch)) + (if run-id (server:ensure-running run-id)) + (client:launch run-id)) (else ;; (fs) (debug:print 0 "ERROR: Should NOT be getting here! fs transport is no longer supported") (set! *transport-type* 'fs) (set! 
*megatest-db* (make-dbr:dbstruct path: *toppath* local: #t)))))))))) Index: mt.scm ================================================================== --- mt.scm +++ mt.scm @@ -37,11 +37,11 @@ ;; runs:get-runs-by-patt ;; get runs by list of criteria ;; register a test run with the db ;; -;; Use: (db-get-value-by-header (db:get-header runinfo)(db:get-row runinfo)) +;; Use: (db-get-value-by-header (db:get-header runinfo)(db:get-rows runinfo)) ;; to extract info from the structure returned ;; (define (mt:get-runs-by-patt keys runnamepatt targpatt) (let loop ((runsdat (rmt:get-runs-by-patt keys runnamepatt targpatt 0 500)) (res '()) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -35,24 +35,24 @@ ;;====================================================================== ;; cmd is a symbol ;; vars is a json string encoding the parameters for the call ;; -(define (rmt:send-receive cmd params) +(define (rmt:send-receive cmd run-id params) (case *transport-type* ((fs-aint-here) (debug:print 0 "ERROR: Not yet (re)supported") (exit 1)) ((fs http) - (let* ((jparams (db:obj->string params)) ;; (rmt:dat->json-str params)) - (res (http-transport:client-api-send-receive *runremote* cmd jparams))) + (let* ((connection-info (client:setup run-id)) + (jparams (db:obj->string params)) ;; (rmt:dat->json-str params)) + (res (http-transport:client-api-send-receive connection-info cmd jparams))) (if res (db:string->obj res) ;; (rmt:json-str->dat res) (begin (debug:print 0 "ERROR: Bad value from http-transport:client-api-send-receive " res) - #f)) - )) + #f)))) (else (debug:print 0 "ERROR: Transport " *transport-type* " not yet (re)supported") (exit 1)))) ;; Wrap json library for strings (why the ports crap in the first place?) 
@@ -74,56 +74,58 @@ ;;====================================================================== ;; M I S C ;;====================================================================== -(define (rmt:login) - (rmt:send-receive 'login (list *toppath* megatest-version *my-client-signature*))) +(define (rmt:login run-id) + (rmt:send-receive 'login run-id (list *toppath* megatest-version *my-client-signature*))) -(define (rmt:kill-server) - (rmt:send-receive 'kill-server '())) +(define (rmt:kill-server run-id) + (rmt:send-receive 'kill-server run-id (list run-id))) ;; hand off a call to one of the db:queries statements ;; added run-id to make looking up the correct db possible ;; (define (rmt:general-call stmtname run-id . params) - (rmt:send-receive 'general-call (append (list stmtname run-id) params))) + (rmt:send-receive 'general-call run-id (append (list stmtname run-id) params))) -(define (rmt:sync-inmem->db) - (rmt:send-receive 'sync-inmem->db '())) +(define (rmt:sync-inmem->db run-id) + (rmt:send-receive 'sync-inmem->db run-id '())) -(define (rmt:sdb-qry qry val) +(define (rmt:sdb-qry qry val run-id) ;; add caching if qry is 'getid or 'getstr - (rmt:send-receive 'sdb-qry (list qry val))) + (rmt:send-receive 'sdb-qry run-id (list qry val))) ;;====================================================================== ;; K E Y S ;;====================================================================== +;; These should not require run-id but it is more consistent to have it. +;; run-id can theoretically be #f but how to handle that is not yet done. 
(define (rmt:get-key-val-pairs run-id) - (rmt:send-receive 'get-key-val-pairs (list run-id))) + (rmt:send-receive 'get-key-val-pairs run-id (list run-id))) -(define (rmt:get-keys) - (rmt:send-receive 'get-keys '())) +(define (rmt:get-keys run-id) + (rmt:send-receive 'get-keys run-id '())) ;;====================================================================== ;; T E S T S ;;====================================================================== (define (rmt:get-test-id run-id testname item-path) - (rmt:send-receive 'get-test-id (list run-id testname item-path))) + (rmt:send-receive 'get-test-id run-id (list run-id testname item-path))) (define (rmt:get-test-info-by-id run-id test-id) (if (and (number? run-id)(number? test-id)) - (rmt:send-receive 'get-test-info-by-id (list run-id test-id)) + (rmt:send-receive 'get-test-info-by-id run-id (list run-id test-id)) (begin (debug:print 0 "ERROR: Bad data handed to rmt:get-test-info-by-id run-id=" run-id ", test-id=" test-id) (print-call-chain) #f))) (define (rmt:test-get-rundir-from-test-id run-id test-id) - (rmt:send-receive 'test-get-rundir-from-test-id (list run-id test-id))) + (rmt:send-receive 'test-get-rundir-from-test-id run-id (list run-id test-id))) (define (rmt:open-test-db-by-test-id run-id test-id #!key (work-area #f)) (let* ((test-path (if (string? 
work-area) work-area (rmt:test-get-rundir-from-test-id run-id test-id)))) @@ -130,88 +132,93 @@ (debug:print 3 "TEST PATH: " test-path) (open-test-db test-path))) ;; WARNING: This currently bypasses the transaction wrapped writes system (define (rmt:test-set-state-status-by-id run-id test-id newstate newstatus newcomment) - (rmt:send-receive 'test-set-state-status-by-id (list run-id test-id newstate newstatus newcomment))) + (rmt:send-receive 'test-set-state-status-by-id run-id (list run-id test-id newstate newstatus newcomment))) (define (rmt:set-tests-state-status run-id testnames currstate currstatus newstate newstatus) - (rmt:send-receive 'set-tests-state-status (list run-id testnames currstate currstatus newstate newstatus))) + (rmt:send-receive 'set-tests-state-status run-id (list run-id testnames currstate currstatus newstate newstatus))) (define (rmt:get-tests-for-run run-id testpatt states statuses offset limit not-in sort-by sort-order qryvals) (if (number? run-id) - (rmt:send-receive 'get-tests-for-run (list run-id testpatt states statuses offset limit not-in sort-by sort-order qryvals)) + (rmt:send-receive 'get-tests-for-run run-id (list run-id testpatt states statuses offset limit not-in sort-by sort-order qryvals)) (begin (debug:print "ERROR: rmt:get-tests-for-run called with bad run-id=" run-id) (print-call-chain) '()))) (define (rmt:get-tests-for-runs-mindata run-ids testpatt states status not-in) - (rmt:send-receive 'get-tests-for-runs-mindata (list run-ids testpatt states status not-in))) + (apply append (map (lambda (run-id) + (rmt:send-receive 'get-tests-for-run-mindata run-id (list run-ids testpatt states status not-in))) + run-ids))) (define (rmt:delete-test-records run-id test-id) - (rmt:send-receive 'delete-test-records (list run-id test-id))) + (rmt:send-receive 'delete-test-records run-id (list run-id test-id))) (define (rmt:test-set-status-state run-id test-id status state msg) - (rmt:send-receive 'test-set-status-state (list run-id 
test-id status state msg))) + (rmt:send-receive 'test-set-status-state run-id (list run-id test-id status state msg))) (define (rmt:get-previous-test-run-record run-id test-name item-path) - (rmt:send-receive 'get-previous-test-run-record (list run-id test-name item-path))) + (rmt:send-receive 'get-previous-test-run-record run-id (list run-id test-name item-path))) (define (rmt:get-matching-previous-test-run-records run-id test-name item-path) - (rmt:send-receive 'get-matching-previous-test-run-records (list run-id test-name item-path))) + (rmt:send-receive 'get-matching-previous-test-run-records run-id (list run-id test-name item-path))) (define (rmt:test-get-logfile-info run-id test-name) - (rmt:send-receive 'test-get-logfile-info (list run-id test-name))) + (rmt:send-receive 'test-get-logfile-info run-id (list run-id test-name))) (define (rmt:test-get-records-for-index-file run-id test-name) - (rmt:send-receive 'test-get-records-for-index-file (list run-id test-name))) + (rmt:send-receive 'test-get-records-for-index-file run-id (list run-id test-name))) (define (rmt:get-testinfo-state-status run-id test-id) - (rmt:send-receive 'get-testinfo-state-status (list run-id test-id))) + (rmt:send-receive 'get-testinfo-state-status run-id (list run-id test-id))) (define (rmt:test-set-log! run-id test-id logf) (if (string? 
logf)(rmt:general-call 'test-set-log run-id logf test-id))) (define (rmt:test-get-paths-matching-keynames-target-new keynames target res testpatt statepatt statuspatt runname) - (rmt:send-receive 'test-get-paths-matching-keynames-target-new (list keynames target res testpatt statepatt statuspatt runname))) + (let ((run-ids (rmt:get-run-ids-matching keynames target res))) + (apply append (lambda (run-id) + (rmt:send-receive 'test-get-paths-matching-keynames-target-new (list keynames target res testpatt statepatt statuspatt runname))) + run-ids))) (define (rmt:get-prereqs-not-met run-id waitons ref-item-path #!key (mode 'normal)) - (rmt:send-receive 'get-prereqs-not-met (list run-id waitons ref-item-path mode))) + (rmt:send-receive 'get-prereqs-not-met run-id (list run-id waitons ref-item-path mode))) (define (rmt:get-count-tests-running-for-run-id run-id) - (rmt:send-receive 'get-count-tests-running-for-run-id (list run-id))) + (rmt:send-receive 'get-count-tests-running-for-run-id run-id (list run-id))) ;; Statistical queries (define (rmt:get-count-tests-running run-id) - (rmt:send-receive 'get-count-tests-running (list run-id))) + (rmt:send-receive 'get-count-tests-running run-id (list run-id))) (define (rmt:get-count-tests-running-in-jobgroup run-id jobgroup) - (rmt:send-receive 'get-count-tests-running-in-jobgroup (list run-id jobgroup))) + (rmt:send-receive 'get-count-tests-running-in-jobgroup run-id (list run-id jobgroup))) (define (rmt:roll-up-pass-fail-counts run-id test-name item-path status) - (rmt:send-receive 'roll-up-pass-fail-counts (list run-id test-name item-path status))) + (rmt:send-receive 'roll-up-pass-fail-counts run-id (list run-id test-name item-path status))) (define (rmt:update-pass-fail-counts run-id test-name) - (rmt:general-call 'update-fail-pass-counts (list run-id test-name run-id test-name run-id test-name))) + (rmt:general-call 'update-fail-pass-counts run-id (list run-id test-name run-id test-name run-id test-name))) 
;;====================================================================== ;; R U N S ;;====================================================================== (define (rmt:get-run-info run-id) - (rmt:send-receive 'get-run-info (list run-id))) + (rmt:send-receive 'get-run-info run-id (list run-id))) (define (rmt:register-run keyvals runname state status user) (rmt:send-receive 'register-run (list keyvals runname state status user))) (define (rmt:get-run-name-from-id run-id) - (rmt:send-receive 'get-run-name-from-id (list run-id))) + (rmt:send-receive 'get-run-name-from-id run-id (list run-id))) (define (rmt:delete-run run-id) - (rmt:send-receive 'delete-run (list run-id))) + (rmt:send-receive 'delete-run run-id (list run-id))) (define (rmt:delete-old-deleted-test-records) (rmt:send-receive 'delete-old-deleted-test-records '())) (define (rmt:get-runs runpatt count offset keypatts) @@ -237,30 +244,30 @@ ;; 2. Open the testdat.db file and do the query ;; If not given the work area ;; 1. Do a remote call to get the test path ;; 2. Continue as above ;; -(define (rmt:get-steps-for-test test-id) - (rmt:send-receive 'get-steps-data (list test-id))) +(define (rmt:get-steps-for-test run-id test-id) + (rmt:send-receive 'get-steps-data run-id (list test-id))) (define (rmt:teststep-set-status! run-id test-id teststep-name state-in status-in comment logfile) (let* ((state (items:check-valid-items "state" state-in)) (status (items:check-valid-items "status" status-in))) (if (or (not state)(not status)) (debug:print 3 "WARNING: Invalid " (if status "status" "state") " value \"" (if status state-in status-in) "\", update your validvalues section in megatest.config")) - (rmt:send-receive 'teststep-set-status! (list run-id test-id teststep-name state-in status-in comment logfile)))) + (rmt:send-receive 'teststep-set-status! 
run-id (list run-id test-id teststep-name state-in status-in comment logfile)))) -(define (rmt:get-steps-for-test test-id) - (rmt:send-receive 'get-steps-for-test (list test-id))) +(define (rmt:get-steps-for-test run-id test-id) + (rmt:send-receive 'get-steps-for-test run-id (list test-id))) ;;====================================================================== ;; T E S T D A T A ;;====================================================================== -(define (rmt:read-test-data test-id categorypatt #!key (work-area #f)) - (let ((tdb (rmt:open-test-db-by-test-id test-id work-area: work-area))) +(define (rmt:read-test-data run-id test-id categorypatt #!key (work-area #f)) + (let ((tdb (rmt:open-test-db-by-test-id run-id test-id work-area: work-area))) (if tdb (tdb:read-test-data tdb test-id categorypatt) '()))) (define (rmt:testmeta-add-record testname) @@ -271,9 +278,9 @@ (define (rmt:testmeta-update-field test-name fld val) (rmt:send-receive 'testmeta-update-field (list test-name fld val))) (define (rmt:test-data-rollup run-id test-id status) - (rmt:send-receive 'test-data-rollup (list run-id test-id status))) + (rmt:send-receive 'test-data-rollup run-id (list run-id test-id status))) (define (rmt:csv->test-data run-id test-id csvdata) - (rmt:send-receive 'csv->test-data (list run-id test-id csvdata))) + (rmt:send-receive 'csv->test-data run-id (list run-id test-id csvdata))) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -42,22 +42,22 @@ ;; Call this to start the actual server ;; ;; all routes though here end in exit ... -(define (server:launch transport) +(define (server:launch transport run-id) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) (debug:print-info 2 "Starting server using " transport " transport") (set! 
*transport-type* transport) (case transport ;; ((fs) (exit)) ;; there is no "fs" server transport - ((fs http) (http-transport:launch)) - ((zmq) (zmq-transport:launch)) + ((fs http) (http-transport:launch run-id)) + ((zmq) (zmq-transport:launch run-id)) (else (debug:print "WARNING: unrecognised transport " transport) (exit)))) ;;====================================================================== @@ -117,19 +117,19 @@ (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) (else (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) result))) -(define (server:ensure-running) - (let loop ((servers (open-run-close tasks:get-best-server tasks:open-db)) +(define (server:ensure-running run-id) + (let loop ((servers (open-run-close tasks:get-server tasks:open-db run-id)) (trycount 0)) (if (or (not servers) (null? servers)) (begin (if (even? trycount) ;; just do the server start every other time through this loop (every 8 seconds) (let ((cmdln (conc (if (getenv "MT_MEGATEST") (getenv "MT_MEGATEST") "megatest") - " -server - -daemonize"))) + " -server - -daemonize -run-id " run-id))) (debug:print 0 "INFO: Starting server (" cmdln ") as none running ...") ;; (server:launch (string->symbol (args:get-arg "-transport" "http")))) ;; no need to use fork, no need to do the list-servers trick. 
Just start the damn server, it will exit on it's own ;; if there is an existing server (system cmdln) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -64,12 +64,12 @@ priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, transport TEXT, - run_id INTEGER, - CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));") + run_id INTEGER);") +;; CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY, server_id INTEGER, pid INTEGER, hostname TEXT, cmdline TEXT, @@ -91,67 +91,86 @@ (define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3)) (define (tasks:hostinfo-get-transport vec) (vector-ref vec 4)) (define (tasks:hostinfo-get-pid vec) (vector-ref vec 5)) (define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6)) -;; state: 'live, 'shutting-down, 'dead -(define (tasks:server-register mdb pid interface port priority state transport #!key (pubport -1)) - (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state) +(define (tasks:server-lock-slot mdb run-id) + (let ((res '()) + (best #f)) + (tasks:server-clean-out-old-records-for-run-id mdb run-id) + (tasks:server-set-available mdb run-id) + (thread-sleep! 2) ;; Try removing this. It may not be needed. + (tasks:server-am-i-the-server? 
mdb run-id))) + +;; register that this server may come online (first to register goes through with the process) +(define (tasks:server-set-available mdb run-id) (sqlite3:execute mdb - "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport) - VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);" - pid (get-host-name) port pubport priority (conc state) - (common:version-signature) - interface - (conc transport)) - (vector - (tasks:server-get-server-id mdb (get-host-name) interface port pid) - interface - port - pubport - transport + "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id) + VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?, ?, ?);" + (current-process-id) ;; pid + (get-host-name) ;; hostname + -1 ;; port + -1 ;; pubport + (random 1000) ;; priority (used as a tiebreaker on get-available) + "available" ;; state + (common:version-signature) ;; mt_version + -1 ;; interface + "http" ;; transport + run-id )) -;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! -(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'delete)) - (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) - (if *db-write-access* - (if pid - (case action - ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)) - (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid))) - (if port - (case action - ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE (interface=? or hostname=?) AND port=?;" hostname hostname port)) - (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE (interface=? or hostname=?) 
AND port=?;" hostname hostname port))) - (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))) - -(define (tasks:server-deregister-self mdb hostname) - (tasks:server-deregister mdb hostname pid: (current-process-id))) - -;; need a simple call for robustly removing records given host and port -(define (tasks:server-delete mdb hostname port) - (tasks:server-deregister mdb hostname port: port action: 'delete)) - -(define (tasks:server-get-server-id mdb hostname iface port pid) - (debug:print-info 12 "tasks:server-get-server-id " mdb " " hostname " " iface " " port " " pid) - (let ((res #f)) +(define (tasks:server-clean-out-old-records-for-run-id mdb run-id) + (sqlite3:execute mdb "DELETE FROM servers WHERE state='available' AND (strftime('%s','now') - start_time) > 30 AND run_id=?;" run-id) + (sqlite3:execute mdb "DELETE FROM servers WHERE state='running' AND (strftime('%s','now') - heartbeat) > 10 AND run_id=?;" run-id)) + + +(define (tasks:server-set-state! mdb server-id state) + (sqlite3:execute mdb "UPDATE servers SET state=? WHERE id=?;" state server-id)) + +(define (tasks:server-delete-record! mdb server-id) + (sqlite3:execute mdb "DELETE FROM servers WHERE id=?;" server-id)) + +(define (tasks:server-delete-records-for-this-pid mdb) + (sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND pid=?;" (get-host-name) (current-process-id))) + +(define (tasks:server-set-interface-port mdb server-id interface port) + (sqlite3:execute mdb "UPDATE servers SET interface=?,port=? WHERE id=?;" interface port server-id)) + +(define (tasks:server-am-i-the-server? mdb run-id) + (let* ((all (tasks:server-get-servers-vying-for-run-id mdb run-id)) + (first (if (null? all) + (begin (debug:print 0 "ERROR: no servers listed, should be at least one by now.") + (sqlite3:finalize! 
mdb) + (exit 1)) + (car (db:get-rows all)))) + (header (db:get-header all)) + (id (db:get-value-by-header first header "id")) + (hostname (db:get-value-by-header first header "hostname")) + (pid (db:get-value-by-header first header "pid")) + (priority (db:get-value-by-header first header "priority"))) + (debug:print 0 "INFO: am-i-the-server got record " first) + ;; for now a basic check. add tiebreaking by priority later + (if (and (equal? hostname (get-host-name)) + (equal? pid (current-process-id))) + id + #f))) + +;; Use: (db:get-value-by-header (car (db:get-rows dat)) (db:get-header dat) "fieldname") +;; to extract info from the structure returned +;; +(define (tasks:server-get-servers-vying-for-run-id mdb run-id) + (let* ((header (list "id" "hostname" "pid" "interface" "port" "pubport" "state" "run_id" "priority" "start_time")) + (selstr (string-intersperse header ",")) + (res '())) (sqlite3:for-each-row - (lambda (id) - (set! res id)) + (lambda (a . b) + (set! res (cons (apply vector a b) res))) mdb - (cond - ((and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;") - ((and iface port) "SELECT id FROM servers WHERE interface=? AND port=?;") - ((and hostname port) "SELECT id FROM servers WHERE hostname=? AND port=?;") - (else - (begin - (debug:print 0 "ERROR: tasks:server-get-server-id needs (hostname and pid) OR (iface and port) OR (hostname and port)") - "SELECT id FROM servers WHERE pid=-999;"))) - (if hostname hostname iface)(if pid pid port)) - res)) + (conc "SELECT " selstr " FROM servers WHERE run_id=? ORDER BY start_time DESC;") + run-id) + (vector header res))) (define (tasks:server-update-heartbeat mdb server-id) (debug:print-info 1 "Heart beat update of server id=" server-id) (handle-exceptions exn @@ -171,90 +190,22 @@ (lambda (delta) (set! 
heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) (< heartbeat-delta 10))) -(define (tasks:client-register mdb pid hostname cmdline) - (sqlite3:execute - mdb - "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));") - (tasks:server-get-server-id mdb hostname #f #f pid) - pid hostname cmdline) - -(define (tasks:client-logout mdb pid hostname cmdline) - (sqlite3:execute - mdb - "UPDATE clients SET logout_time=strftime('%s','now') WHERE pid=? AND hostname=? AND cmdline=?;" - pid hostname cmdline)) - -(define (tasks:get-logged-in-clients mdb server-id) - (let ((res '())) - (sqlite3:for-each-row - (lambda (id server-id pid hostname cmdline login-time logout-time) - (set! res (cons (vector id server-id pid hostname cmdline login-time lougout-time) res))) - mdb - "SELECT id,server_id,pid,hostname,cmdline,login_time,logout_time FROM clients WHERE server_id=?;" - server-id))) - -(define (tasks:have-clients? mdb server-id) - (null? (tasks:get-logged-in-clients mdb server-id))) - -;; ping each server in the db and return first found that responds. -;; remove any others. will not necessarily remove all! -(define (tasks:get-best-server mdb) - (let ((res '()) - (best #f) - (transport (if (and *transport-type* - (not (eq? *transport-type* 'fs))) - (conc *transport-type*) - "%"))) +(define (tasks:get-server mdb run-id) + (let ((res #f) + (best #f)) (sqlite3:for-each-row (lambda (id interface port pubport transport pid hostname) - (set! res (cons (vector id interface port pubport transport pid hostname) res)) - ;;(debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) - ) + (set! res (vector id interface port pubport transport pid hostname))) mdb - "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers WHERE strftime('%s','now')-heartbeat < 10 - AND mt_version=? AND transport LIKE ? 
- ORDER BY start_time DESC LIMIT 1;" (common:version-signature) transport) - ;; for now we are keeping only one server registered in the db, return #f or first server found - (if (null? res) #f (car res)))) - -;; BUG: This logic is probably needed unless methodology changes completely... -;; -;; (if (null? res) #f -;; (let loop ((hed (car res)) -;; (tal (cdr res))) -;; ;; (print "hed=" hed ", tal=" tal) -;; (let* ((host (list-ref hed 0)) -;; (iface (list-ref hed 1)) -;; (port (list-ref hed 2)) -;; (pid (list-ref hed 4)) -;; (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port))) -;; (if alive -;; (begin -;; (debug:print-info 2 "Found an existing, alive, server " host ", " port ".") -;; (list host iface port)) -;; (begin -;; (debug:print-info 1 "Marking " host ":" port " as dead in server registry.") -;; (if port -;; (open-run-close tasks:server-deregister tasks:open-db host port: port) -;; (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) -;; (if (null? tal) -;; #f -;; (loop (car tal)(cdr tal)))))))))) - -(define (tasks:remove-server-records mdb) - (sqlite3:execute mdb "DELETE FROM servers;")) - -(define (tasks:mark-server hostname port pid state transport) - (if port - (open-run-close tasks:server-deregister tasks:open-db hostname port: port) - (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))) - + AND mt_version=? AND run_id=? AND state='running' + ORDER BY start_time DESC LIMIT 1;" (common:version-signature) run-id) + res)) (define (tasks:kill-server status hostname port pid transport) (debug:print-info 1 "Removing defunct server record for " hostname ":" port) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) @@ -287,22 +238,10 @@ (process-signal pid signal/term) ;; local machine, send sig term (thread-sleep! 
5) ;; give it five seconds to die peacefully then do a brutal kill (process-signal pid signal/kill)) (debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname)))))) - - -(define (tasks:get-all-servers mdb) - (let ((res '())) - (sqlite3:for-each-row - (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport) - (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport) res))) - mdb - "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;") - res)) - - ;;====================================================================== ;; Tasks and Task monitors ;;====================================================================== Index: tests/Makefile ================================================================== --- tests/Makefile +++ tests/Makefile @@ -1,18 +1,18 @@ # # run some tests -BINPATH=$(shell readlink -m $(PWD)/../bin) -MEGATEST=$(BINPATH)/megatest -DASHBOARD=$(BINPATH)/dashboard -PATH := $(BINPATH):$(PATH) -RUNNAME := $(shell date +w%V.%u.%H.%M) -IPADDR := "-" -# Set SERVER to "-server -" -SERVER = -DEBUG = 1 -LOGGING = +BINPATH = $(shell readlink -m $(PWD)/../bin) +MEGATEST = $(BINPATH)/megatest +DASHBOARD = $(BINPATH)/dashboard +PATH := $(BINPATH):$(PATH) +RUNNAME := $(shell date +w%V.%u.%H.%M) +IPADDR := "-" +RUNID := 1 +SERVER = +DEBUG = 1 +LOGGING = OS = $(shell grep ID /etc/*-release|cut -d= -f2) FS = $(shell df -T .|tail -1|awk '{print $$2}') VER = $(shell fsl info|grep checkout|awk '{print $$2}'|cut -c 1-5) @@ -22,11 +22,11 @@ all : test1 test2 test3 test4 test5 test6 test7 test8 test9 server : cd ..;make;make install - cd fullrun;../../bin/megatest -server - -debug 22 + cd fullrun;../../bin/megatest -server - -debug 22 -run-id $(RUNID) stopserver : cd ..;make && make install cd fullrun;$(MEGATEST) 
-stop-server 0 ADDED tests/watch-monitor.sh Index: tests/watch-monitor.sh ================================================================== --- /dev/null +++ tests/watch-monitor.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +sqlite3 fullrun/db/monitor.db << EOF +.header on +.mode column +select * from servers; +.q +EOF