Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -47,12 +47,14 @@ (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) -(define *time-to-exit* #f) +(define *server-info* #f) +(define *time-to-exit* #f) (define *received-response* #f) +(define *default-numtries* 2) (define *target* (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN (define *keys* (make-hash-table)) ;; cache the keys here (define *keyvals* (make-hash-table)) (define *toptest-paths* (make-hash-table)) ;; cache toptest path settings here Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -11,14 +11,14 @@ ;;====================================================================== ;; Database access ;;====================================================================== -(require-extension (srfi 18) extras tcp rpc) -(import (prefix rpc rpc:)) +(require-extension (srfi 18) extras tcp) ;; rpc) +;; (import (prefix rpc rpc:)) -(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n) +(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use zmq) (declare (unit db)) @@ -781,11 +781,11 @@ ;;(debug:print 0 "QRY: " qry) (sqlite3:execute db qry run-id newstate newstatus testname testname))) testnames)) (define (cdb:delete-tests-in-state zmqsocket run-id state) - (cdb:client-call zmqsocket 'delete-tests-in-state #t run-id state)) + (cdb:client-call zmqsocket 'delete-tests-in-state #t *default-numtries* run-id state)) ;; speed up for common cases with a little logic (define (db:test-set-state-status-by-id db test-id newstate newstatus newcomment) (cond ((and newstate newstatus newcomment) @@ -960,14 +960,14 @@ db "UPDATE tests SET comment=? WHERE id=?;" comment test-id)) (define (cdb:test-set-rundir! zmqsocket run-id test-name item-path rundir) - (cdb:client-call zmqsocket 'test-set-rundir #t rundir run-id test-name item-path)) + (cdb:client-call zmqsocket 'test-set-rundir #t *default-numtries* rundir run-id test-name item-path)) (define (cdb:test-set-rundir-by-test-id zmqsocket test-id rundir) - (cdb:client-call zmqsocket 'test-set-rundir-by-test-id #t rundir test-id)) + (cdb:client-call zmqsocket 'test-set-rundir-by-test-id #t *default-numtries* rundir test-id)) (define (db:test-get-rundir-from-test-id db test-id) (let ((res #f)) ;; (hash-table-ref/default *test-paths* test-id #f))) ;; (if res ;; res @@ -980,11 +980,11 @@ test-id) ;; (hash-table-set! *test-paths* test-id res) res)) ;; )) (define (cdb:test-set-log! zmqsocket test-id logf) - (if (string? logf)(cdb:client-call zmqsocket 'test-set-log #f logf test-id))) + (if (string? logf)(cdb:client-call zmqsocket 'test-set-log #f *default-numtries* logf test-id))) ;;====================================================================== ;; Misc. test related queries ;;====================================================================== @@ -1095,86 +1095,10 @@ ;; (let loop () ;; (thread-sleep! 10) ;; move save time around to minimize regular collisions? ;; (db:write-cached-data) ;; (loop))) -;; cdb:cached-access is called by the server loop to dispatch commands or queue up -;; db accesses -;; -;; params := qry-name cached? val1 val2 val3 ... -(define (cdb:cached-access params) - (debug:print-info 12 "cdb:cached-access params=" params) - (if (< (length params) 2) - "ERROR" - (let ((qry-name (car params)) - (cached? (cadr params)) - (remparam (list-tail params 2))) - (debug:print-info 12 "cdb:cached-access qry-name=" qry-name " params=" params) - (if (not cached?)(db:write-cached-data)) - ;; Any special calls are dispatched here. - ;; Remainder are put in the db queue - (case qry-name - ((login) ;; login checks that the megatest path and version matches - (if (< (length remparam) 3) ;; should get toppath, version and signature - '(#f "login failed due to missing params") ;; missing params - (let ((calling-path (car remparam)) - (calling-vers (cadr remparam)) - (client-key (caddr remparam))) - (if (and (equal? calling-path *toppath*) - (equal? megatest-version calling-vers)) - (begin - (hash-table-set! *logged-in-clients* client-key (current-seconds)) - '(#t "successful login")) ;; path matches - pass! Should vet the caller at this time ... - (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))) - ((logout) - (if (and (> (length remparam) 1) - (eq? *toppath* (car remparam)) - (hash-table-ref/default *logged-in-clients* (cadr remparam) #f)) - #t - #f)) - ((numclients) - (length (hash-table-keys *logged-in-clients*))) - ((flush) - (db:write-cached-data) - #t) - ((immediate) - (db:write-cached-data) - (if (not (null? remparam)) - (apply (car remparam) (cdr remparam)) - "ERROR")) - ((killserver) - ;; (db:write-cached-data) - (debug:print-info 0 "Remotely killed server on host " (get-host-name) " pid " (current-process-id)) - (set! *time-to-exit* #t) - #t) - ((set-verbosity) - (set! *verbosity* (caddr params)) - *verbosity*) - ((get-verbosity) - *verbosity*) - ((ping) - 'hi) - (else - (mutex-lock! *incoming-mutex*) - (set! *last-db-access* (current-seconds)) - (set! *incoming-data* (cons - (vector qry-name - (current-milliseconds) - remparam) - *incoming-data*)) - (mutex-unlock! *incoming-mutex*) - ;; NOTE: if cached? is #f then this call must be run immediately - ;; but first all calls in the queue are run first in the order - ;; of their time stamp - (if (and cached? *cache-on*) - (begin - (debug:print-info 12 "*cache-on* is " *cache-on* ", skipping cache write") - "CACHED") - (begin - (db:write-cached-data) - "WRITTEN"))))))) - (define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj)))) (define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize)))) (define (cdb:use-non-blocking-mode proc) (set! *client-non-blocking-mode* #t) @@ -1181,72 +1105,98 @@ (let ((res (proc))) (set! *client-non-blocking-mode* #f) res)) ;; params = 'target cached remparams -(define (cdb:client-call zmq-socket . params) - (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params) - (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params)))) - (res #f)) - ;; (signal-mask! signal/int) - (set! *received-response* #f) - (send-message zmq-socket zdat) - ;; (signal-unmask! signal/int) - (set! res (db:string->obj (if *client-non-blocking-mode* - (receive-message* zmq-socket) - (receive-message zmq-socket)))) - (set! *received-response* #t) - (debug:print-info 11 "zmq-socket " (car params) " res=" res) - res)) +;; +;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime +;; +(define (cdb:client-call zmq-sockets qtype immediate numretries . params) + (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params) + (let* ((push-socket (vector-ref zmq-sockets 0)) + (sub-socket (vector-ref zmq-sockets 1)) + (client-sig (server:get-client-signature)) + (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) + (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) + (res #f) + (send-receive (lambda () + (debug:print-info 11 "sending message") + (send-message push-socket zdat) + (debug:print-info 11 "message sent") + (let ((rmsg receive-message*)) ;; (if *client-non-blocking-mode* receive-message* receive-message))) + ;; get the sender info + ;; this should match (server:get-client-signature) + ;; we will need to process "all" messages here some day + (rmsg sub-socket) + ;; now get the actual message + (set! res (db:string->obj (rmsg sub-socket)))))) + (timeout (lambda () + (thread-sleep! 60) + (if (not res) + (if (> numretries 0) + (begin + (debug:print 0 "WARNING: no reply to query " params ", trying again") + (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)) + (begin + (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") + (exit 5))))))) + (debug:print-info 11 "Starting threads") + (let ((th1 (make-thread send-receive "send receive")) + (th2 (make-thread timeout "timeout"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + (debug:print-info 11 "cdb:client-call returning res=" res) + res))) (define (cdb:set-verbosity zmq-socket val) - (cdb:client-call zmq-socket 'set-verbosity #f val)) + (cdb:client-call zmq-socket 'set-verbosity #f *default-numtries* val)) -(define (cdb:login zmq-socket keyval signature) - (cdb:client-call zmq-socket 'login #t keyval megatest-version signature)) +(define (cdb:login zmq-sockets keyval signature) + (cdb:client-call zmq-sockets 'login #t *default-numtries* keyval megatest-version signature)) (define (cdb:logout zmq-socket keyval signature) - (cdb:client-call zmq-socket 'logout #t keyval signature)) + (cdb:client-call zmq-socket 'logout #t *default-numtries* keyval signature)) (define (cdb:num-clients zmq-socket) - (cdb:client-call zmq-socket 'numclients #t)) + (cdb:client-call zmq-socket 'numclients #t *default-numtries*)) (define (cdb:test-set-status-state zmqsocket test-id status state msg) (if msg - (cdb:client-call zmqsocket 'state-status-msg #t state status msg test-id) - (cdb:client-call zmqsocket 'state-status #t state status test-id))) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) + (cdb:client-call zmqsocket 'state-status-msg #t *default-numtries* state status msg test-id) + (cdb:client-call zmqsocket 'state-status #t *default-numtries* state status test-id))) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) (define (cdb:test-rollup-test_data-pass-fail zmqsocket test-id) - (cdb:client-call zmqsocket 'test_data-pf-rollup #t test-id test-id test-id test-id)) + (cdb:client-call zmqsocket 'test_data-pf-rollup #t *default-numtries* test-id test-id test-id test-id)) (define (cdb:pass-fail-counts zmqsocket test-id fail-count pass-count) - (cdb:client-call zmqsocket 'pass-fail-counts #t fail-count pass-count test-id)) + (cdb:client-call zmqsocket 'pass-fail-counts #t *default-numtries* fail-count pass-count test-id)) (define (cdb:tests-register-test zmqsocket run-id test-name item-path) (let ((item-paths (if (equal? item-path "") (list item-path) (list item-path "")))) - (cdb:client-call zmqsocket 'register-test #t run-id test-name item-path))) + (cdb:client-call zmqsocket 'register-test #t *default-numtries* run-id test-name item-path))) (define (cdb:flush-queue zmqsocket) - (cdb:client-call zmqsocket 'flush #f)) + (cdb:client-call zmqsocket 'flush #f *default-numtries*)) (define (cdb:kill-server zmqsocket) - (cdb:client-call zmqsocket 'killserver #f)) + (cdb:client-call zmqsocket 'killserver #f *default-numtries*)) (define (cdb:roll-up-pass-fail-counts zmqsocket run-id test-name item-path status) - (cdb:client-call zmqsocket 'immediate #f open-run-close db:roll-up-pass-fail-counts #f run-id test-name item-path status)) + (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:roll-up-pass-fail-counts #f run-id test-name item-path status)) (define (cdb:get-test-info zmqsocket run-id test-name item-path) - (cdb:client-call zmqsocket 'immediate #f open-run-close db:get-test-info #f run-id test-name item-path)) + (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:get-test-info #f run-id test-name item-path)) (define (cdb:get-test-info-by-id zmqsocket test-id) - (cdb:client-call zmqsocket 'immediate #f open-run-close db:get-test-info-by-id #f test-id)) + (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:get-test-info-by-id #f test-id)) ;; db should be db open proc or #f (define (cdb:remote-run proc db . params) - (apply cdb:client-call *runremote* 'immediate #f open-run-close proc #f params)) + (apply cdb:client-call *runremote* 'immediate #f *default-numtries* open-run-close proc #f params)) (define (db:test-get-logfile-info db run-id test-name) (let ((res #f)) (sqlite3:for-each-row (lambda (path final_logf) @@ -1276,41 +1226,42 @@ END WHERE id=?;") '(test-set-log "UPDATE tests SET final_logf=? WHERE id=?;") '(test-set-rundir-by-test-id "UPDATE tests SET rundir=? WHERE id=?") '(test-set-rundir "UPDATE tests SET rundir=? WHERE run_id=? AND testname=? AND item_path=?;") '(delete-tests-in-state "DELETE FROM tests WHERE state=? AND run_id=?;") - '(tests:test-set-toplog "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';") + '(tests:test-set-toplog "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';") )) ;; do not run these as part of the transaction (define db:special-queries '(rollup-tests-pass-fail - db:roll-up-pass-fail-counts)) + db:roll-up-pass-fail-counts + login + immediate)) ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; -(define (db:write-cached-data) +(define (db:process-queue pubsock indata) (open-run-close (lambda (db . junkparams) (let ((queries (make-hash-table)) - (data #f)) - (mutex-lock! *incoming-mutex*) - (set! data (sort *incoming-data* (lambda (a b)(< (vector-ref a 1)(vector-ref b 1))))) - (set! *incoming-data* '()) - (mutex-unlock! *incoming-mutex*) + (data (sort indata (lambda (a b) + (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b)))))) (if (> (length data) 0) (debug:print-info 4 "Writing cached data " data)) ;; prepare the needed statements, do each only once (for-each (lambda (request-item) - (let ((stmt-key (vector-ref request-item 0))) + (let ((stmt-key (cdb:packet-get-qtype request-item))) + (debug:print-info 11 "stmt-key=" stmt-key ", request-item=" request-item) (if (not (hash-table-ref/default queries stmt-key #f)) (let ((stmt (alist-ref stmt-key db:queries))) + (debug:print-info 11 "stmt-key=" stmt-key ", stmt=" stmt) (if stmt (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt))) (if (procedure? stmt-key) (hash-table-set! queries stmt-key #f) (debug:print 0 "ERROR: Missing query spec for " stmt-key "!"))))))) @@ -1318,24 +1269,54 @@ ;; outer loop to handle special queries that cannot be handled in the ;; transaction. (let outerloop ((special-qry #f) (stmts data)) + (debug:print-info 11 "special-qry=" special-qry ", stmts=" stmts) (if special-qry ;; handle a query that cannot be part of the grouped queries - (let* ((stmt-key (vector-ref special-qry 0)) - (qry (hash-table-ref queries stmt-key)) - (params (vector-ref special-qry 2))) - (if (string? qry) - (apply sqlite3:execute db qry params) - (if (procedure? stmt-key) - (begin - ;; we are being handed a procedure so call it - (debug:print-info 11 "Running (apply " stmt-key " " db " " params ")") - (apply stmt-key db params)) - (debug:print 0 "ERROR: Unrecognised queued call " qry " " params))) + (let* ((stmt-key (cdb:packet-get-qtype special-qry)) + (return-address (cdb:packet-get-client-sig special-qry)) + (qry (hash-table-ref/default queries stmt-key #f)) + (params (cdb:packet-get-params special-qry))) + (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", qry=" qry ", params=" params) + (cond + ;; Special queries + ((string? qry) + (apply sqlite3:execute db qry params) + (server:reply pubsock return-address #t)) + ;; ((and (not (null? params)) + ;; (procedure? (car params))) + ;; (let ((proc (car params)) + ;; (remparams (cdr params))) + ;; ;; we are being handed a procedure so call it + ;; (debug:print-info 11 "Running (apply " proc " " db " " remparams ")") + ;; (server:reply pubsock return-address (apply proc db remparams)))) + + (else + (case stmt-key + ((immediate) + (let ((proc (car params)) + (remparams (cdr params))) + ;; we are being handed a procedure so call it + (debug:print-info 11 "Running (apply " proc " " remparams ")") + (server:reply pubsock return-address (apply proc remparams)))) + ((login) + (if (< (length params) 3) ;; should get toppath, version and signature + '(#f "login failed due to missing params") ;; missing params + (let ((calling-path (car params)) + (calling-vers (cadr params)) + (client-key (caddr params))) + (if (and (equal? calling-path *toppath*) + (equal? megatest-version calling-vers)) + (begin + (hash-table-set! *logged-in-clients* client-key (current-seconds)) + (server:reply pubsock return-address '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... + (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))) + (else + (debug:print 0 "ERROR: Unrecognised queued call " qry " " params))))) (if (not (null? stmts)) (outerloop #f stmts))) ;; handle normal queries (let ((rem (sqlite3:with-transaction @@ -1344,20 +1325,22 @@ (debug:print-info 11 "flushing " stmts " to db") (if (null? stmts) stmts (let innerloop ((hed (car stmts)) (tal (cdr stmts))) - (let ((params (vector-ref hed 2)) - (stmt-key (vector-ref hed 0))) - (if (or (procedure? stmt-key) + (let ((params (cdb:packet-get-params hed)) + (return-address (cdb:packet-get-client-sig hed)) + (stmt-key (cdb:packet-get-qtype hed))) + (if (or (not (hash-table-ref/default queries stmt-key #f)) (member stmt-key db:special-queries)) (begin (debug:print-info 11 "Handling special statement " stmt-key) (cons hed tal)) (begin (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) + (server:reply pubsock return-address #t) (if (not (null? tal)) (innerloop (car tal)(cdr tal)) '())) )))))))) (if (not (null? rem)) Index: db_records.scm ================================================================== --- db_records.scm +++ db_records.scm @@ -118,5 +118,19 @@ (define-inline (db:step-stable-set-runtime! vec val)(vector-set! vec 4 val)) ;; use this one for db-get-run-info (define-inline (db:get-row vec)(vector-ref vec 1)) +;; The data structure for handing off requests via wire +(define (make-cdb:packet)(make-vector 6)) +(define-inline (cdb:packet-get-client-sig vec) (vector-ref vec 0)) +(define-inline (cdb:packet-get-qtype vec) (vector-ref vec 1)) +(define-inline (cdb:packet-get-immediate vec) (vector-ref vec 2)) +(define-inline (cdb:packet-get-query-sig vec) (vector-ref vec 3)) +(define-inline (cdb:packet-get-params vec) (vector-ref vec 4)) +(define-inline (cdb:packet-get-qtime vec) (vector-ref vec 5)) +(define-inline (cdb:packet-set-client-sig! vec val)(vector-set! vec 0 val)) +(define-inline (cdb:packet-set-qtype! vec val)(vector-set! vec 1 val)) +(define-inline (cdb:packet-set-immediate! vec val)(vector-set! vec 2 val)) +(define-inline (cdb:packet-set-query-sig! vec val)(vector-set! vec 3 val)) +(define-inline (cdb:packet-set-params! vec val)(vector-set! vec 4 val)) +(define-inline (cdb:packet-set-qtime! vec val)(vector-set! vec 5 val)) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -97,12 +97,11 @@ -env2file fname : write the environment to fname.csh and fname.sh -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname - -listservers : list the servers - -killserver host:port|pid : kill server specified by host:port or pid + -list-servers : list the servers -repl : start a repl (useful for extending megatest) Spreadsheet generation -extract-ods fname.ods : extract an open document spreadsheet from the database -pathmod path : insert path, i.e. path/runame/itempath/logfile.html @@ -121,10 +120,11 @@ Called as " (string-intersperse (argv) " ") " Built from " megatest-fossil-hash )) ;; -gui : start a gui interface ;; -config fname : override the runconfig file with fname +;; -kill-server host:port|pid : kill server specified by host:port or pid ;; process args (define remargs (args:get-args (argv) (list "-runtests" ;; run a specific test @@ -157,11 +157,11 @@ ":expected" ":tol" ":units" ;; misc "-server" - "-killserver" + "-kill-server" "-port" "-extract-ods" "-pathmod" "-env2file" "-setvars" @@ -184,11 +184,11 @@ ;; misc "-archive" "-repl" "-lock" "-unlock" - "-listservers" + "-list-servers" ;; queries "-test-paths" ;; get path(s) to a test, ordered by youngest first "-runall" ;; run all tests "-remove-runs" @@ -270,52 +270,47 @@ (if (args:get-arg "-server") (begin (debug:print 1 "Launching server...") (server:launch))) -(if (or (args:get-arg "-listservers") - (args:get-arg "-killserver")) +(if (args:get-arg "-list-servers") + ;; (args:get-arg "-kill-server")) (let ((tl (setup-for-run))) (if tl (let ((servers (open-run-close tasks:get-all-servers tasks:open-db)) - (fmtstr "~5a~8a~8a~20a~20a~10a~20a~10a~10a\n") + (fmtstr "~5a~8a~8a~20a~20a~10a~10a~10a~10a\n") (servers-to-kill '())) - (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "Port" "Time" "Priority" "State") - (format #t fmtstr "==" "=====" "===" "====" "=========" "====" "====" "========" "=====") + (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State") + (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====") (for-each (lambda (server) - (let* ((killinfo (args:get-arg "-killserver")) - (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f)) - (kpid (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f)) + (let* (;; (killinfo (args:get-arg "-kill-server")) + ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f)) + ;; (kpid (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f)) (id (vector-ref server 0)) (pid (vector-ref server 1)) (hostname (vector-ref server 2)) (interface (vector-ref server 3)) - (port (vector-ref server 4)) - (start-time (vector-ref server 5)) - (priority (vector-ref server 6)) - (state (vector-ref server 7)) - (mt-ver (vector-ref server 8)) - (status (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port)) + (pullport (vector-ref server 4)) + (pubport (vector-ref server 5)) + (start-time (vector-ref server 6)) + (priority (vector-ref server 7)) + (state (vector-ref server 8)) + (mt-ver (vector-ref server 9)) + (last-update (vector-ref server 10)) ;; (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port)) (killed #f) - (zmq-socket (if status (server:client-connect hostname port) #f))) + (status (< last-update 20))) + ;; (zmq-sockets (if status (server:client-connect hostname port) #f))) ;; no need to login as status of #t indicates we are connecting to correct ;; server - (if (not status) ;; no point in keeping dead records in the db - (open-run-close tasks:server-deregister tasks:open-db hostname port: port pid: pid)) - - (if (and khost-port ;; kill by host/port - (equal? hostname (car khost-port)) - (equal? port (string->number (cadr khost-port)))) - (tasks:kill-server status hostname port pid)) - - (if (and kpid - (equal? hostname (get-host-name)) - (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!! - (tasks:kill-server status hostname #f pid)) - - (format #t fmtstr id mt-ver pid hostname interface port start-time priority + (if (equal? state "dead") + (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day. + (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete)) + (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds + (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid))) + + (format #t fmtstr id mt-ver pid hostname interface pullport pubport last-update (if status "alive" "dead")))) servers) (debug:print-info 1 "Done with listservers") (set! *didsomething* #t) (exit) ;; must do, would have to add checks to many/all calls below @@ -323,11 +318,11 @@ (exit))) ;; if not list or kill then start a client (if appropriate) (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test") (eq? (length (hash-table-keys args:arg-hash)) 0)) (debug:print-info 1 "Server connection not needed") - + (server:client-launch))) ;;====================================================================== ;; Remove old run(s) ;;====================================================================== Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -6,12 +6,12 @@ ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. -(require-extension (srfi 18) extras tcp rpc s11n) -(import (prefix rpc rpc:)) +(require-extension (srfi 18) extras tcp s11n) +;; (import (prefix rpc rpc:)) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use zmq) @@ -23,67 +23,107 @@ (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (include "common_records.scm") (include "db_records.scm") + +;; Transition to pub --> sub with pull <-- push +;; +;; 1. client sends request to server via push to the pull port +;; 2. server puts request in queue or processes immediately as appropriate +;; 3. server puts responses from completed requests into pub port +;; +;; TODO +;; +;; Done Tested +;; [x] [ ] 1. Add columns pullport pubport to servers table +;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 +;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports +;; [x] [ ] 4. Add client compose of request +;; [x] [ ] - name of client: testname/itempath-test_id-hostname +;; [x] [ ] - name of request: callname, params +;; [x] [ ] - request key: f(clientname, callname, params) +;; [ ] [ ] 5. Add processing of subscription hits +;; [ ] [ ] - done when get key +;; [ ] [ ] - return results +;; [ ] [ ] 6. Add timeout processing +;; [ ] [ ] - after 60 seconds +;; [ ] [ ] i. check server alive, connect to new if necessary +;; [ ] [ ] ii. resend request +;; [ ] [ ] 7. Turn self ping back on (define (server:make-server-url hostport) (if (not hostport) #f (conc "tcp://" (car hostport) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) -(define (server:self-ping iface port) - (let ((zsocket (server:client-connect iface port))) - (let loop () - (thread-sleep! 2) - (cdb:client-call zsocket 'ping #t) - (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!") - (mutex-lock! *heartbeat-mutex*) - (set! *server-loop-heart-beat* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*) - (loop)))) - +;; (define (server:self-ping server-info) +;; ;; server-info: server-id interface pullport pubport +;; (let ((iface (list-ref server-info 1)) +;; (pullport (list-ref server-info 2)) +;; (pubport (list-ref server-info 3))) +;; (server:client-connect iface pullport pubport) +;; (let loop () +;; (thread-sleep! 2) +;; (cdb:client-call *runremote* 'ping #t) +;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *server-loop-heart-beat* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*) +;; (loop)))) + +(define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) +(define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) +(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0)) +(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0)) + (define (server:run hostn) (debug:print 0 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) - (let* ((zmq-socket #f) - (zmq-socket-dat #f) - (iface (if (string=? "-" hostn) - "*" ;; (get-host-name) - hostn)) - (hostname (get-host-name)) - (ipaddrstr (let ((ipstr (if (string=? "-" hostn) - (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") - #f))) - (if ipstr ipstr hostname))) - (actual-port #f)) - ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0)) - (set! zmq-socket-dat (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port") - (string->number (args:get-arg "-port")) - (+ 5000 (random 1001))) - 0)) - (set! zmq-socket (cadr zmq-socket-dat)) - (set! actual-port (caddr zmq-socket-dat)) + (let* ((zmq-sdat1 #f) + (zmq-sdat2 #f) + (pull-socket #f) + (pub-socket #f) + (p1 #f) + (p2 #f) + (zmq-sockets-dat #f) + (iface (if (string=? "-" hostn) + "*" ;; (get-host-name) + hostn)) + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostname))) + (last-run 0)) + (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port") + (string->number (args:get-arg "-port")) + (+ 5000 (random 1001))))) + + (set! zmq-sdat1 (car zmq-sockets-dat)) + (set! pull-socket (cadr zmq-sdat1)) ;; (iface s port) + (set! p1 (caddr zmq-sdat1)) + + (set! zmq-sdat2 (cadr zmq-sockets-dat)) + (set! pub-socket (cadr zmq-sdat2)) + (set! p2 (caddr zmq-sdat2)) + (set! *cache-on* #t) - ;; (set! th1 (make-thread (lambda () - ;; (server:self-ping ipaddrstr actual-port)))) - ;; (thread-start! th1) - ;; what to do when we quit ;; (on-exit (lambda () (if (and *toppath* *server-info*) (begin - (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr)) + (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr p1 p2)) (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) (mutex-lock! *incoming-mutex*) (set! queue-len (length *incoming-data*)) @@ -93,91 +133,80 @@ (debug:print-info 0 "Queue not flushed, waiting ...") (loop)))))))) ;; The heavy lifting ;; - (let loop () - ;; ;; Ugly yuk. - ;; (mutex-lock! *incoming-mutex*) - ;; (set! *server-loop-heart-beat* (list 'waiting (current-seconds))) - ;; (mutex-unlock! *incoming-mutex*) - (let* ((rawmsg (receive-message* zmq-socket)) - (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) - (res #f)) - ;;; Ugly yuk. - ;; (mutex-lock! *incoming-mutex*) - ;; (set! *server-loop-heart-beat* (list 'working (current-seconds))) - ;; (mutex-unlock! *incoming-mutex*) - (debug:print-info 12 "server=> received params=" params) - (set! res (cdb:cached-access params)) - (debug:print-info 12 "server=> processed res=" res) - (send-message zmq-socket (db:obj->string res)) - (if (not *time-to-exit*) - (loop) + ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime + ;; + (let loop ((queue-lst '())) + (let* ((rawmsg (receive-message* pull-socket)) + (packet (db:string->obj rawmsg))) + (debug:print-info 12 "server=> received packet=" packet) + (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin - (open-run-close tasks:server-deregister-self tasks:open-db #f) - (db:write-cached-data) - (exit) - )))) - (thread-join! th1))) + (db:process-queue pub-socket (cons packet queue-lst)) + (loop '())) + (loop (cons packet queue-lst))))))) +(define (server:reply pubsock target result) + (debug:print-info 11 "server:reply target=" target ", result=" result) + (send-message pubsock target send-more: #t) + (send-message pubsock (db:obj->string result))) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) ;; if none running or if > 20 seconds since ;; server last used then start shutdown - (let loop ((count 0)) - (thread-sleep! 4) ;; no need to do this very often - (db:write-cached-data) - ;; (print "Server running, count is " count) - (if (< count 1) ;; 3x3 = 9 secs aprox - (loop (+ count 1)) - (let (;; (numrunning (open-run-close db:get-count-tests-running #f)) - (server-loop-heartbeat #f) - (server-info #f) - (pulse 0)) - ;; BUG add a wait on server alive here!! - ;; ;; Ugly yuk. - (mutex-lock! *heartbeat-mutex*) - (set! server-loop-heartbeat *server-loop-heart-beat*) - (set! server-info *server-info*) - (mutex-unlock! *heartbeat-mutex*) - ;; The logic here is that if the server loop gets stuck blocked in working - ;; we don't want to update our heartbeat - (set! pulse (- (current-seconds) server-loop-heartbeat)) - (debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago") - (if (> pulse 15) ;; must stay less than 10 seconds - (begin - (open-run-close tasks:server-deregister tasks:open-db (cadr server-info) port: (caddr server-info)) - (debug:print 0 "ERROR: Heartbeat failed, committing servercide") - (exit)) - (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))) - ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access - (if (> (+ *last-db-access* - ;; (* 48 60 60) ;; 48 hrs - ;; 60 ;; one minute - (* 60 60) ;; one hour - ) - (current-seconds)) - (begin - ;; (debug:print-info 2 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) - (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*)) - (loop 0)) - (begin - (debug:print-info 0 "Starting to shutdown the server.") - ;; need to delete only *my* server entry (future use) - (set! *time-to-exit* #t) - (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) - (thread-sleep! 1) - (debug:print-info 0 "Max cached queries was " *max-cache-size*) - (debug:print-info 0 "Server shutdown complete. Exiting") - (exit))))))) - -(define (server:find-free-port-and-open iface s port #!key (trynum 50)) - (let ((s (if s s (make-socket 'rep))) + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (cadr server-info)) + (pullport (caddr server-info)) + (pubport (cadddr server-info)) ;; id interface pullport pubport) + ;; (zmq-sockets (server:client-connect iface pullport pubport)) + ) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1)))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (if (> (+ *last-db-access* + ;; (* 48 60 60) ;; 48 hrs + ;; 60 ;; one minute + (* 60 60) ;; one hour + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit)))))) + +(define (server:find-free-port-and-open iface s port stype #!key (trynum 50)) + (let ((s (if s s (make-socket stype))) (p (if (number? port) port 5555)) (old-handler (current-exception-handler))) (handle-exceptions exn (begin @@ -186,20 +215,28 @@ ;; (old-handler) ;; (print-call-chain) (if (> trynum 0) (server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1)) (debug:print-info 0 "Tried ports up to " p - " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))) + " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")) + (exit)) ;; To exit or not? That is the question. (let ((zmq-url (conc "tcp://" iface ":" p))) - (print "Trying to start server on " zmq-url) + (debug:print 0 "Trying to start server on " zmq-url) (bind-socket s zmq-url) - (set! *runremote* #f) - (debug:print 0 "Server started on " zmq-url) - (mutex-lock! *heartbeat-mutex*) - (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live)) - (mutex-unlock! *heartbeat-mutex*) (list iface s port))))) + +(define (server:setup-ports ipaddrstr startport) + (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pull)) + (p1 (caddr s1)) + (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub)) + (p2 (caddr s2))) + (set! *runremote* #f) + (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) + (mutex-lock! *heartbeat-mutex*) + (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live)) + (mutex-unlock! *heartbeat-mutex*) + (list s1 s2))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () @@ -211,32 +248,57 @@ (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) ;; -(define (server:client-connect iface port #!key (context #f)) - (debug:print-info 3 "client-connect " iface ":" port) +(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) + (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions) (let ((connect-ok #f) (zmq-socket (if context - (make-socket 'req context) - (make-socket 'req))) + (make-socket type context) + (make-socket type))) (conurl (server:make-server-url (list iface port)))) (if (socket? zmq-socket) (begin + ;; first apply subscriptions + (for-each (lambda (subscription) + (debug:print 2 "Subscribing to " subscription) + (socket-option-set! zmq-socket 'subscribe subscription)) + subscriptions) (connect-socket zmq-socket conurl) zmq-socket) - #f))) + (begin + (debug:print 0 "ERROR: Failed to open socket to " conurl) + #f)))) - -(define (server:client-login zmq-socket) - (cdb:login zmq-socket *toppath* (server:get-client-signature))) +(define (server:client-login zmq-sockets) + (cdb:login zmq-sockets *toppath* (server:get-client-signature))) (define (server:client-logout zmq-socket) (let ((ok (and (socket? zmq-socket) (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) ;; (close-socket zmq-socket) ok)) + +(define (server:client-connect iface pullport pubport) + (let* ((push-socket (server:client-socket-connect iface pullport type: 'push)) + (sub-socket (server:client-socket-connect iface pubport + type: 'sub + subscriptions: (list (server:get-client-signature) "all"))) + (zmq-sockets (vector push-socket sub-socket)) + (login-res #f)) + (set! login-res (server:client-login zmq-sockets)) + (if (and (not (null? login-res)) + (car login-res)) + (begin + (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") + (set! *runremote* zmq-sockets) + zmq-sockets) + (begin + (debug:print-info 2 "Failed to login or connect to " conurl) + (set! *runremote* #f) + #f)))) ;; Do all the connection work, start a server if not already running (define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) @@ -243,38 +305,27 @@ (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo - (let ((host (car hostinfo)) - (iface (cadr hostinfo)) - (port (caddr hostinfo))) + (let ((host (list-ref hostinfo 0)) + (iface (list-ref hostinfo 1)) + (pullport (list-ref hostinfo 2)) + (pubport (list-ref hostinfo 3))) (debug:print-info 2 "Setting up to connect to " hostinfo) - (handle-exceptions - exn - (begin - ;; something went wrong in connecting to the server. In this scenario it is ok - ;; to try again - (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) - (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) - (debug:print 0 " perhaps jobs killed with -9? Removing server records") - (open-run-close tasks:server-deregister tasks:open-db host port: port) - (server:client-setup (- numtries 1)) - #f) - (let* ((zmq-socket (server:client-connect iface port)) - (login-res (server:client-login zmq-socket)) - (connect-ok (if (null? login-res) #f (car login-res))) - (conurl (server:make-server-url (list iface port)))) - (if connect-ok - (begin - (debug:print-info 2 "Logged in and connected to " conurl) - (set! *runremote* zmq-socket) - #t) - (begin - (debug:print-info 2 "Failed to login or connect to " conurl) - (set! *runremote* #f) - #f))))) + ;; (handle-exceptions + ;; exn + ;; (begin + ;; ;; something went wrong in connecting to the server. In this scenario it is ok + ;; ;; to try again + ;; (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) + ;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) + ;; (debug:print 0 " perhaps jobs killed with -9? Removing server records") + ;; (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) + ;; (server:client-setup (- numtries 1)) + ;; #f) + (server:client-connect iface pullport pubport)) ;; ) (if (> numtries 0) (let ((exe (car (argv)))) (debug:print-info 1 "No server available, attempting to start one...") (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) (sleep 5) ;; give server time to start @@ -293,32 +344,36 @@ (debug:print-info 1 "Starting the standalone server") (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* - (let* ((th1 (make-thread (lambda () - (let ((server-info #f)) - ;; wait for the server to be online and available - (let loop () - (debug:print-info 1 "Waiting for the server to come online before starting heartbeat") - (thread-sleep! 2) - (mutex-lock! *heartbeat-mutex*) - (set! server-info *server-info* ) - (mutex-unlock! *heartbeat-mutex*) - (if (not server-info)(loop))) - (debug:print 1 "Server alive, starting self-ping") - (server:self-ping (cadr server-info)(caddr server-info)))) "Self ping")) + (let* (;; (th1 (make-thread (lambda () + ;; (let ((server-info #f)) + ;; ;; wait for the server to be online and available + ;; (let loop () + ;; (debug:print-info 1 "Waiting for the server to come online before starting heartbeat") + ;; (thread-sleep! 2) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! server-info *server-info* ) + ;; (mutex-unlock! *heartbeat-mutex*) + ;; (if (not server-info)(loop))) + ;; (debug:print 1 "Server alive, starting self-ping") + ;; (server:self-ping server-info) + ;; )) + ;; "Self ping")) (th2 (make-thread (lambda () (server:run (args:get-arg "-server"))) "Server run")) - (th3 (make-thread (lambda () - (server:keep-running)) "Keep running"))) + (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) + ) (set! *client-non-blocking-mode* #t) - (thread-start! th1) + ;; (thread-start! th1) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) - (thread-join! th3)) + ;; (thread-join! th3) + (thread-join! th2) + ) (debug:print 0 "ERROR: Failed to setup for megatest"))) (exit))) (define (server:client-signal-handler signum) (handle-exceptions @@ -327,11 +382,11 @@ (let ((th1 (make-thread (lambda () (if (not *received-response*) (receive-message* *runremote*))) ;; flush out last call if applicable "eat response")) (th2 (make-thread (lambda () - (debug:print 0 "ERROR: Received ^C, attempting clean exit.") + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") (thread-sleep! 3) ;; give the flush three seconds to do it's stuff (debug:print 0 " Done.") (exit 4)) "exit on ^C timer"))) (thread-start! th2) @@ -345,10 +400,11 @@ (begin (debug:print 0 "ERROR: Failed to connect as client") (exit)))) ;; ping a server and return number of clients or #f (if no response) +;; NOT IN USE! (define (server:ping host port #!key (secs 10)(return-socket #f)) (cdb:use-non-blocking-mode (lambda () (let* ((res #f) (th1 (make-thread Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -22,11 +22,19 @@ ;; Tasks db ;;====================================================================== (define (tasks:open-db) (let* ((dbpath (conc *toppath* "/monitor.db")) - (exists (file-exists? dbpath)) + (exists (if (file-exists? dbpath) + ;; BUGGISHNESS: Remove this code in six months. Today is 11/13/2012 + (if (< (file-change-time dbpath) 1352851396.0) + (begin + (debug:print 0 "NOTE: removing old db file " dbpath) + (delete-file dbpath) + #f) + #t) + #f)) (mdb (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath)) (handler (make-busy-timeout 36000))) (sqlite3:set-busy-handler! mdb handler) (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;")) (if (not exists) @@ -52,17 +60,18 @@ CONSTRAINT monitors_constraint UNIQUE (pid,hostname));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, - port INTEGER, + pullport INTEGER, + pubport INTEGER, start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, - CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));") + CONSTRAINT servers_constraint UNIQUE (pid,hostname,pullport,pubport));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY, server_id INTEGER, pid INTEGER, hostname TEXT, cmdline TEXT, @@ -76,54 +85,58 @@ ;;====================================================================== ;; Server and client management ;;====================================================================== ;; state: 'live, 'shutting-down, 'dead -(define (tasks:server-register mdb pid interface port priority state) +(define (tasks:server-register mdb pid interface pullport pubport priority state) (sqlite3:execute mdb - "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);" - pid (get-host-name) port priority (conc state) megatest-version interface) + "INSERT OR REPLACE INTO servers (pid,hostname,pullport,pubport,start_time,priority,state,mt_version,heartbeat,interface) + VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" + pid (get-host-name) pullport pubport priority (conc state) megatest-version interface) (list - (tasks:server-get-server-id mdb (get-host-name) port pid) + (tasks:server-get-server-id mdb (get-host-name) pullport pid) interface - port)) + pullport + pubport)) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! -(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)) - (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) +(define (tasks:server-deregister mdb hostname #!key (pullport #f)(pid #f)(action 'markdead)) + (debug:print-info 11 "server-deregister " hostname ", pullport " pullport ", pid " pid) (if pid - ;; (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid) - (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid) - (if port - ;; (sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port) - (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port) + (case action + ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)) + (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid))) + (if pullport + (case action + ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND pullport=?;" hostname port)) + (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport))) (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) -(define (tasks:server-get-server-id mdb hostname port pid) +(define (tasks:server-get-server-id mdb hostname pullport pid) (let ((res #f)) (sqlite3:for-each-row (lambda (id) (set! res id)) mdb (if (and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;" - "SELECT id FROM servers WHERE hostname=? AND port=?;") - hostname (if pid pid port)) + "SELECT id FROM servers WHERE hostname=? AND pullport=?;") + hostname (if pid pid pullport)) res)) (define (tasks:server-update-heartbeat mdb server-id) (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)) ;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds -(define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f)) +(define (tasks:server-alive? mdb server-id #!key (hostname #f)(pullport #f)(pid #f)) (let* ((server-id (if server-id server-id - (tasks:server-get-server-id mdb hostname port pid))) + (tasks:server-get-server-id mdb hostname pullport pid))) (heartbeat-delta 99e9)) (sqlite3:for-each-row (lambda (delta) (set! heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) @@ -158,36 +171,47 @@ ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb) (let ((res '()) (best #f)) (sqlite3:for-each-row - (lambda (id hostname interface port pid) - (set! res (cons (list hostname interface port pid) res)) - (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) + (lambda (id hostname interface pullport pubport pid) + (set! res (cons (list hostname interface pullport pubport pid) res)) + (debug:print-info 2 "Found existing server " hostname ":" pullport " registered in db")) mdb - "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) - ;; (print "res=" res) + "SELECT id,hostname,interface,pullport,pubport,pid FROM servers + WHERE strftime('%s','now')-heartbeat < 10 + AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) (if (null? res) #f (let loop ((hed (car res)) (tal (cdr res))) ;; (print "hed=" hed ", tal=" tal) - (let* ((host (car hed)) - (iface (cadr hed)) - (port (caddr hed)) - (pid (cadddr hed)) - (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port))) + (let* ((host (list-ref hed 0)) + (iface (list-ref hed 1)) + (pullport (list-ref hed 2)) + (pubport (list-ref hed 3)) + (pid (list-ref hed 4)) + (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host pullport: pullport))) (if alive (begin - (debug:print-info 2 "Found an existing, alive, server " host ":" port ".") - (list host iface port)) + (debug:print-info 2 "Found an existing, alive, server " host ", " pullport " and " pubport ".") + (list host iface pullport pubport)) (begin - (debug:print-info 1 "Removing " host ":" port " from server registry as it appears to be dead") - (tasks:kill-server #f host port pid) + (debug:print-info 1 "Marking " host ":" pullport " as dead in server registry.") + (if pullport + (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) + (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) (if (null? tal) #f (loop (car tal)(cdr tal)))))))))) +(define (tasks:mark-server hostname pullport pid state) + (if port + (open-run-close tasks:server-deregister tasks:open-db hostname port: port) + (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))) + + +;; NOTE: NOT PORTED TO WORK WITH pullport/pubport (define (tasks:kill-server status hostname port pid) (debug:print-info 1 "Removing defunct server record for " hostname ":" port) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)) @@ -212,18 +236,20 @@ (debug:print-info 1 "Sending signal/term to " pid " on " hostname) (process-signal pid signal/term) ;; local machine, send sig term (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill (process-signal pid signal/kill)) (debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname)))))) + + (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row - (lambda (id pid hostname interface port start-time priority state mt-version) - (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res))) + (lambda (id pid hostname interface pullport pubport start-time priority state mt-version last-update) + (set! res (cons (vector id pid hostname interface pullport pubport start-time priority state mt-version last-update) res))) mdb - "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;") + "SELECT id,pid,hostname,interface,pullport,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors ADDED testzmq/mockupclient.scm Index: testzmq/mockupclient.scm ================================================================== --- /dev/null +++ testzmq/mockupclient.scm @@ -0,0 +1,35 @@ +(use zmq posix) + +(define cname "Bob") +(define runtime 10) +(let ((args (argv))) + (if (< (length args) 3) + (begin + (print "Usage: mockupclient clientname runtime") + (exit)) + (begin + (set! cname (cadr args)) + (set! runtime (string->number (caddr args)))))) + +;; (define start-delay (/ (random 100) 9)) +;; (define runtime (+ 1 (/ (random 200) 2))) + +(print "Starting client " cname " with runtime " runtime) + +(include "mockupclientlib.scm") + +(set! endtime (+ (current-seconds) runtime)) + +(let loop () + (let ((x (random 15)) + (varname (list-ref (list "hello" "goodbye" "saluton" "kiaorana")(random 4)))) + (case x + ;; ((1)(dbaccess cname 'sync "nodat" #f)) + ((2 3 4 5)(dbaccess cname 'set varname (random 999))) + ((6 7 8 9 10)(print cname ": Get \"" varname "\" " (dbaccess cname 'get varname #f))) + (else + (thread-sleep! 0.011))) + (if (< (current-seconds) endtime) + (loop)))) + +(print "Client " cname " all done!!") ADDED testzmq/mockupserver.scm Index: testzmq/mockupserver.scm ================================================================== --- /dev/null +++ testzmq/mockupserver.scm @@ -0,0 +1,138 @@ +;; pub/sub with envelope address +;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon +;; as a client disconnects. Also a remaining client may receive tons of +;; messages afterward. + +(use zmq srfi-18 sqlite3) + +(define pub (make-socket 'pub)) +(define pull (make-socket 'pull)) +(define cname "server") +(define total-db-accesses 0) +(define start-time (current-seconds)) + +(bind-socket pub "tcp://*:5563") +(bind-socket pull "tcp://*:5564") + +(define (open-db) + (let* ((dbpath "mockup.db") + (dbexists (file-exists? dbpath)) + (db (open-database dbpath)) ;; (never-give-up-open-db dbpath)) + (handler (make-busy-timeout 10))) + (set-busy-handler! db handler) + (if (not dbexists) + (for-each + (lambda (stmt) + (execute db stmt)) + (list + "PRAGMA SYNCHRONOUS=0;" + "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);" + "CREATE TABLE vars (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));"))) + db)) + +(define cid-cache (make-hash-table)) + +(define (get-client-id db cname) + (let ((cid (hash-table-ref/default cid-cache cname #f))) + (if cid + cid + (begin + (execute db "INSERT OR REPLACE INTO clients (name) VALUES(?);" cname) + (for-each-row + (lambda (id) + (set! cid id)) + db + "SELECT id FROM clients WHERE name=?;" cname) + (hash-table-set! cid-cache cname cid) + (set! total-db-accesses (+ total-db-accesses 2)) + cid)))) + +(define (count-client db cname) + (let ((cid (get-client-id db cname))) + (execute db "UPDATE clients SET num_accesses=num_accesses+1 WHERE id=?;" cid) + (set! total-db-accesses (+ total-db-accesses 1)) + )) + +(define db (open-db)) +;; (define queuelst '()) +;; (define mx1 (make-mutex)) + +(define max-queue-len 0) + +(define (process-queue queuelst) + (let ((queuelen (length queuelst))) + (if (> queuelen max-queue-len) + (set! max-queue-len queuelen)) + (for-each + (lambda (item) + (let ((cname (vector-ref item 1)) + (clcmd (vector-ref item 2)) + (cdata (vector-ref item 3))) + (send-message pub cname send-more: #t) + (send-message pub (case clcmd + ((sync) + (conc queuelen)) + ((set) + (set! total-db-accesses (+ total-db-accesses 1)) + (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata)) + "ok") + ((get) + (set! total-db-accesses (+ total-db-accesses 1)) + (let ((res "noval")) + (for-each-row + (lambda (val) + (set! res val)) + db + "SELECT val FROM vars WHERE var=?;" cdata) + res)) + (else (conc "unk cmd: " clcmd)))))) + queuelst))) + +(define th1 (make-thread + (lambda () + (let ((last-run 0)) ;; current-seconds when run last + (let loop ((queuelst '())) + (let* ((indat (receive-message* pull)) + (parts (string-split indat ":")) + (cname (car parts)) ;; client name + (clcmd (string->symbol (cadr parts))) ;; client cmd + (cdata (caddr parts)) ;; client data + (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue + (count-client db cname) + (case clcmd + ((sync) ;; just process the queue + (print "Got sync from " cname) + (process-queue (cons svect queuelst)) + (loop '())) + ((get) + (process-queue (cons svect queuelst)) + (loop '())) + (else + (loop (cons svect queuelst)))))))) + "server thread")) + +(include "mockupclientlib.scm") + +;; send a sync to the pull port +(define th2 (make-thread + (lambda () + (let ((last-action-time (current-seconds))) + (let loop () + (thread-sleep! 5) + (let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f))) + (last-action-delta #f)) + (if (> queuelen 1)(set! last-action-time (current-seconds))) + (set! last-action-delta (- (current-seconds) last-action-time)) + (print "Server: Got queuelen=" queuelen ", last-action-delta=" last-action-delta) + (if (< last-action-delta 60) + (loop) + (print "Server exiting, 25 seconds since last access")))))) + "sync thread")) + +(thread-start! th1) +(thread-start! th2) +(thread-join! th2) + +(let* ((run-time (- (current-seconds) start-time)) + (queries/second (/ total-db-accesses run-time))) + (print "Server exited! Total db accesses=" total-db-accesses " in " run-time " seconds for " queries/second " queries/second with max queue length of: " max-queue-len)) ADDED testzmq/testmockup.sh Index: testzmq/testmockup.sh ================================================================== --- /dev/null +++ testzmq/testmockup.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +rm -f mockup.db + +echo Compiling mockupserver.scm and mockupclient.scm +csc mockupserver.scm +csc mockupclient.scm + +echo Starting server +./mockupserver & + +sleep 1 + +echo Starting clients +for i in a b c d e f g h i j k l m n o p q s t u v w x y z; + do + for k in a b; + do + for j in 0 1 2 3 4 5 6 7 8 9; + do + waittime=`random 0 60` + runtime=`random 5 120` + echo "Starting client $i$k$j with waittime $waittime and runtime $runtime" + (sleep $waittime;./mockupclient $i$k$j $runtime) & + done + done +done + +wait +echo testmockup.sh script done +# echo "Waiting for 5 seconds then killing all mockupserver and mockupclient processes" +# sleep 30 +# killall -v mockupserver mockupclient Index: utils/installall.sh ================================================================== --- utils/installall.sh +++ utils/installall.sh @@ -1,6 +1,6 @@ -#!/bin/bash +#! /bin/env bash set -x # Copyright 2007-2010, Matthew Welland. # @@ -11,11 +11,11 @@ # implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR # PURPOSE. echo You may need to do the following first: echo sudo apt-get install libreadline-dev -echo sudo apt-get install libwebkitgtk-dev +echo sudo apt-get install libwebkitgtk-dev echo sudo apt-get install libmotif3 -OR- set KTYPE=26g4 echo KTYPE can be 26, 26g4, or 32 echo KTYPE=$KTYPE echo You are using PREFIX=$PREFIX echo You are using proxy="$proxy" @@ -268,11 +268,12 @@ tar xfz ${ZEROMQ}.tar.gz cd ${ZEROMQ} ln -s $PREFIX/include/uuid src # LDFLAGS=-L$PREFIX/lib ./configure --prefix=$PREFIX - ./configure --enable-static --disable-shared --prefix=$PREFIX --with-uuid=$PREFIX LDFLAGS="-L$PREFIX/lib" CPPFLAGS="-fPIC -I$PREFIX/include" LIBS="-lgcc" + ./configure --enable-static --prefix=$PREFIX --with-uuid=$PREFIX LDFLAGS="-L$PREFIX/lib" CPPFLAGS="-fPIC -I$PREFIX/include" LIBS="-lgcc" + # --disable-shared CPPFLAGS="-fPIC # LDFLAGS="-L/usr/lib64 -L$PREFIX/lib" ./configure --enable-static --prefix=$PREFIX make make install CSC_OPTIONS="-I$PREFIX/include -L$CSCLIBS" chicken-install $PROX zmq # CSC_OPTIONS="-I$PREFIX/include -L$CSCLIBS" chicken-install $PROX -deploy -prefix $DEPLOYTARG zmq