Patch summary: server:reply now sends (vector success/fail query-sig result) and cdb:client-call matches replies on query-sig, resending the request after each 20 s without a reply, up to numretries times, then exiting with status 5.
Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -50,11 +50,11 @@ (define *client-non-blocking-mode* #f) (define *server-id* #f) (define *server-info* #f) (define *time-to-exit* #f) (define *received-response* #f) -(define *default-numtries* 2) +(define *default-numtries* 5) (define *target* (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN (define *keys* (make-hash-table)) ;; cache the keys here (define *keyvals* (make-hash-table)) (define *toptest-paths* (make-hash-table)) ;; cache toptest path settings here Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1120,27 +1120,35 @@ (res #f) (send-receive (lambda () (debug:print-info 11 "sending message") (send-message push-socket zdat) (debug:print-info 11 "message sent") - (let ((rmsg receive-message*)) ;; (if *client-non-blocking-mode* receive-message* receive-message))) + (let loop () ;; get the sender info ;; this should match (server:get-client-signature) ;; we will need to process "all" messages here some day - (rmsg sub-socket) + (receive-message* sub-socket) ;; now get the actual message - (set! res (db:string->obj (rmsg sub-socket)))))) + (let ((myres (db:string->obj (receive-message* sub-socket)))) + (if (equal? query-sig (vector-ref myres 1)) + (set! res (vector-ref myres 2)) + (loop)))))) (timeout (lambda () - (thread-sleep! 120) - (if (not res) - (if (> numretries 0) - (begin - (debug:print 0 "WARNING: no reply to query " params ", trying again") - (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)) - (begin - (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") - (exit 5))))))) + (let loop ((n numretries)) ;; n counts the resends still allowed before giving up + (thread-sleep! 
20) + (if (not res) ;; NOTE(review): a genuine #f reply also looks like no reply here - confirm the caller stops this thread once th1 finishes + (if (> n 0) ;; test the countdown n; numretries never changes, so testing it never reached the exit branch + (begin + (debug:print 0 "WARNING: no reply to query " params ", trying resend") + (debug:print-info 11 "re-sending message") + (send-message push-socket zdat) + (debug:print-info 11 "message re-sent") + (loop (- n 1))) + ;; (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)) + (begin + (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") + (exit 5)))))))) (debug:print-info 11 "Starting threads") (let ((th1 (make-thread send-receive "send receive")) (th2 (make-thread timeout "timeout"))) (thread-start! th1) (thread-start! th2) @@ -1230,15 +1238,16 @@ '(delete-tests-in-state "DELETE FROM tests WHERE state=? AND run_id=?;") '(tests:test-set-toplog "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';") )) ;; do not run these as part of the transaction -(define db:special-queries '(;; rollup-tests-pass-fail - ;; db:roll-up-pass-fail-counts +(define db:special-queries '(rollup-tests-pass-fail + db:roll-up-pass-fail-counts login immediate flush + sync set-verbosity killserver)) ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) @@ -1278,19 +1287,20 @@ (debug:print-info 11 "special-qry=" special-qry ", stmts=" stmts) (if special-qry ;; handle a query that cannot be part of the grouped queries (let* ((stmt-key (cdb:packet-get-qtype special-qry)) + (qry-sig (cdb:packet-get-query-sig special-qry)) (return-address (cdb:packet-get-client-sig special-qry)) (qry (hash-table-ref/default queries stmt-key #f)) (params (cdb:packet-get-params special-qry))) (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", qry=" qry ", params=" params) (cond ;; Special queries ((string? qry) (apply sqlite3:execute db qry params) - (server:reply pubsock return-address #t)) + (server:reply pubsock return-address qry-sig #t #t)) ;; ((and (not (null? 
params)) ;; (procedure? (car params))) ;; (let ((proc (car params)) ;; (remparams (cdr params))) ;; ;; we are being handed a procedure so call it @@ -1302,11 +1312,11 @@ ((immediate) (let ((proc (car params)) (remparams (cdr params))) ;; we are being handed a procedure so call it (debug:print-info 11 "Running (apply " proc " " remparams ")") - (server:reply pubsock return-address (apply proc remparams)))) + (server:reply pubsock return-address qry-sig #t (apply proc remparams)))) ((login) (if (< (length params) 3) ;; should get toppath, version and signature '(#f "login failed due to missing params") ;; missing params (let ((calling-path (car params)) (calling-vers (cadr params)) @@ -1313,27 +1323,27 @@ (client-key (caddr params))) (if (and (equal? calling-path *toppath*) (equal? megatest-version calling-vers)) (begin (hash-table-set! *logged-in-clients* client-key (current-seconds)) - (server:reply pubsock return-address '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... + (server:reply pubsock return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))) - ((flush) - (server:reply pubsock return-address '(#t "sucessful flush"))) + ((flush sync) + (server:reply pubsock return-address qry-sig #t (length data))) ((set-verbosity) (set! *verbosity* (car params)) - (server:reply pubsock return-address '(#t *verbosity*))) + (server:reply pubsock return-address qry-sig #t (list #t *verbosity*))) ;; list, not a quoted literal, so the new value is sent rather than the symbol *verbosity* ((killserver) (debug:print 0 "WARNING: Server going down in 15 seconds by user request!") (open-run-close tasks:server-deregister tasks:open-db (cadr *server-info*) pullport: (caddr *server-info*)) (thread-start! (make-thread (lambda ()(thread-sleep! 
15)(exit)))) - (server:reply pubsock return-address '(#t "exit process started"))) + (server:reply pubsock return-address qry-sig #t '(#t "exit process started"))) (else (debug:print 0 "ERROR: Unrecognised queued call " qry " " params) - (server:reply pubsock return-address #t))))) + (server:reply pubsock return-address qry-sig #f #t))))) (if (not (null? stmts)) (outerloop #f stmts))) ;; handle normal queries (let ((rem (sqlite3:with-transaction @@ -1344,20 +1354,21 @@ stmts (let innerloop ((hed (car stmts)) (tal (cdr stmts))) (let ((params (cdb:packet-get-params hed)) (return-address (cdb:packet-get-client-sig hed)) + (qry-sig (cdb:packet-get-query-sig hed)) (stmt-key (cdb:packet-get-qtype hed))) (if (or (not (hash-table-ref/default queries stmt-key #f)) (member stmt-key db:special-queries)) (begin (debug:print-info 11 "Handling special statement " stmt-key) (cons hed tal)) (begin (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) - (server:reply pubsock return-address #t) + (server:reply pubsock return-address qry-sig #t #t) (if (not (null? tal)) (innerloop (car tal)(cdr tal)) '())) )))))))) (if (not (null? rem)) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -118,12 +118,11 @@ ;; what to do when we quit ;; (on-exit (lambda () (if (and *toppath* *server-info*) - (begin - (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr p1 p2)) + (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) (mutex-lock! *incoming-mutex*) (set! 
queue-len (length *incoming-data*)) @@ -145,14 +144,14 @@ (begin (db:process-queue pub-socket (cons packet queue-lst)) (loop '())) (loop (cons packet queue-lst))))))) -(define (server:reply pubsock target result) +(define (server:reply pubsock target query-sig success/fail result) ;; sends target (send-more), then (vector success/fail query-sig result); clients match on slot 1 and read slot 2 (debug:print-info 11 "server:reply target=" target ", result=" result) (send-message pubsock target send-more: #t) - (send-message pubsock (db:obj->string result))) + (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) @@ -169,41 +168,42 @@ (sleep 4) (loop)))))) (iface (cadr server-info)) (pullport (caddr server-info)) (pubport (cadddr server-info)) ;; id interface pullport pubport) - ;; (zmq-sockets (server:client-connect iface pullport pubport)) + (zmq-sockets (server:client-connect iface pullport pubport)) ) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often - ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1)))) - ;; (print "Server running, count is " count) - (if (< count 1) ;; 3x3 = 9 secs aprox - (loop (+ count 1))) - - ;; NOTE: Get rid of this mechanism! It really is not needed... - (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) - - ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access - (if (> (+ *last-db-access* - ;; (* 48 60 60) ;; 48 hrs - ;; 60 ;; one minute - (* 60 60) ;; one hour - ) - (current-seconds)) - (begin - (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*)) - (loop 0)) - (begin - (debug:print-info 0 "Starting to shutdown the server.") - ;; need to delete only *my* server entry (future use) - (set! *time-to-exit* #t) - (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) - (thread-sleep! 
1) - (debug:print-info 0 "Max cached queries was " *max-cache-size*) - (debug:print-info 0 "Server shutdown complete. Exiting") - (exit)))))) + ;; NB// sync currently does NOT return queue-length + (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (if (> (+ *last-db-access* + ;; (* 48 60 60) ;; 48 hrs + ;; 60 ;; one minute + (* 60 60) ;; one hour + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) (define (server:find-free-port-and-open iface s port stype #!key (trynum 50)) (let ((s (if s s (make-socket stype))) (p (if (number? port) port 5555)) (old-handler (current-exception-handler))) Index: tests/tests.scm ================================================================== --- tests/tests.scm +++ tests/tests.scm @@ -159,11 +159,11 @@ (test "write env files" "nada.csh" (begin (save-environment-as-files "nada") (and (file-exists? "nada.sh") (file-exists? "nada.csh")))) -(test #f #t (cdb:client-call *runremote* 'immediate #f 1 (lambda ()(display "Got here eh!?") #t))) +(test #f #t (cdb:client-call *runremote* 'immediate #t 1 (lambda ()(display "Got here eh!?") #t))) ;; (set! 
*verbosity* 20) (test #f *verbosity* (cadr (cdb:set-verbosity *runremote* *verbosity*))) (test #f #f (cdb:roll-up-pass-fail-counts *runremote* 1 "test1" "" "PASS")) ;; (set! *verbosity* 1) @@ -311,11 +311,11 @@ ;; (exit) (test #f "myrun" (cdb:remote-run db:get-run-name-from-id #f 1)) -(test #f "dunno" (cdb:remote-run db:roll-up-pass-fail-counts #f 1 "nada" "" "PASS")) +(test #f #f (cdb:remote-run db:roll-up-pass-fail-counts #f 1 "nada" "" "PASS")) ;;====================================================================== ;; R E M O T E C A L L S ;;====================================================================== @@ -325,10 +325,11 @@ (print "Intensive: params=" params) (cdb:tests-register-test *runremote* 1 (conc "test" (random 20)) "") (apply cdb:test-set-status-state *runremote* test-id params) (cdb:pass-fail-counts *runremote* test-id (random 100) (random 100)) (cdb:test-rollup-test_data-pass-fail *runremote* test-id) + (cdb:roll-up-pass-fail-counts *runremote* 1 "test1" "" (cadr params)) (thread-sleep! 0.01)) ;; cache ordering granularity is at the second level. Should really be at the ms level '(("COMPLETED" "PASS" #f) ("NOT_STARTED" "FAIL" "Just testing") ("NOT_STARTED" "FAIL" "Just testing") ("NOT_STARTED" "FAIL" "Just testing")