Index: api.scm
==================================================================
--- api.scm
+++ api.scm
@@ -244,11 +244,20 @@
          (ttdat *server-info*)
          (server-state (tt-state ttdat))
          (maxthreads 20) ;; make this a parameter?
          (status (cond
                   ((> numthreads maxthreads)
-                   'busy)
+                   (let* ((testsuite (common:get-testsuite-name))
+                          (mtexe (common:find-local-megatest))
+                          (proc (lambda ()
+                                  ;; we are overloaded, try to start another server
+                                  (debug:print 0 *default-log-port* "Too many threads running, starting another server")
+                                  (tt:server-process-run *toppath* testsuite mtexe run-id))))
+                     (if (null? *server-start-requests*) (set! *server-start-requests* (list proc)))) ;; queue at most one pending start request; each queued proc costs tt:keep-running a 1s sleep
+                   ;; 'busy
+                   'loaded ;; not ideal since the client will not backoff
+                   )
                   (else 'ok)))
          (errmsg (case status
                    ((busy) (conc "Server overloaded, "numthreads" threads in flight, current cmd: " cmd "\n current threads: " (api:get-threads)))
                    ((loaded) (conc "Server loaded, "numthreads" threads in flight"))
                    (else #f)))
@@ -264,11 +273,11 @@
                     ;; (thread-sleep! 0.5))
                     (normal-proc cmd run-id params))
                    (else
                     (normal-proc cmd run-id params))))
          (meta (case cmd
-                 ((ping) `((sstate . ,server-state)))
+                 ((ping) `((sstate . ,server-state)(sload . ,numthreads)))
                  (else `((wait . ,delay-wait)))))
          (payload (list status errmsg result meta)))
     ;; (cmd run-id params meta)
     (db:add-stats cmd run-id params (- (current-milliseconds) start-t))
     payload))
@@ -277,12 +286,10 @@
     ;; (set! *api-process-request-count* (- *api-process-request-count* 1))
     ;; (serialize payload)
 
     (api:unregister-thread (current-thread))
     result)))
-
-
 
 (define *api-halt-writes* #f)
 
 (define (api:dispatch-request dbstruct cmd run-id params)
   (if (not *no-sync-db*)
Index: common.scm
==================================================================
--- common.scm
+++ common.scm
@@ -401,20 +401,18 @@
 
       (not (equal? (common:get-last-run-version)
                    (common:version-signature))))
 
 ;; From 1.70 to 1.81, db's are compatible.
-
+;;
+;; BUG: This logic is almost certainly not quite correct.
+;;
 (define (common:api-changed?)
-  (let* (
-    (megatest-major-version (substring (->string megatest-version) 0 4))
-    (run-major-version (substring (conc (common:get-last-run-version)) 0 4))
-   )
-   (and (not (or (equal? megatest-major-version "1.80") (equal? megatest-major-version "1.81")))
-     (not (equal? megatest-major-version run-major-version)))
-  )
-)
+  (let* ((megatest-major-version (substring (->string megatest-version) 0 4))
+         (run-major-version (substring (conc (common:get-last-run-version)) 0 4)))
+    (and (not (member megatest-major-version '("1.81" "1.80")))
+         (not (equal? megatest-major-version run-major-version)))))
 
 ;;======================================================================
 ;; Move me elsewhere ...
 ;; RADT => Why do we meed the version check here, this is called only if version misma
 ;;
Index: rmtmod.scm
==================================================================
--- rmtmod.scm
+++ rmtmod.scm
@@ -18,12 +18,17 @@
 ;;======================================================================
 
 (declare (unit rmtmod))
 (declare (uses debugprint))
+;; (declare (uses debugprint.import))
 (declare (uses commonmod))
+;; (declare (uses commonmod.import))
 (declare (uses dbfile)) ;; needed for records
+(declare (uses dbmod))
+;; (declare (uses tcp-transportmod))
+;; (declare (uses tcp-transportmod.import))
 
 ;; (declare (uses apimod))
 ;; (declare (uses apimod.import))
 ;; (declare (uses ulex))
 
 
@@ -33,10 +38,14 @@
         *
 
 (import scheme chicken data-structures extras matchable srfi-69)
 (import (prefix sqlite3 sqlite3:) posix typed-records srfi-18)
 (import commonmod dbfile debugprint) ;; (prefix commonmod cmod:))
+(import dbmod
+        ;; tcp-transportmod
+        )
+
 
 ;; (import apimod)
 ;; (import (prefix ulex ulex:))
 
 (include "db_records.scm")
@@ -305,7 +314,25 @@
              ;; call end of eud of run detection for posthook - from merge, is it needed?
;; (launch:end-of-run-check run-id) all-ids) ))))) +;;====================================================================== +;; Misc +;;====================================================================== + +;; (define (rmtmod:wait-on-server-load run-id ttdat) +;; (let* ((dbfname (dbmod:run-id->dbfname run-id)) +;; (get-lowest-thread-load +;; (lambda () +;; (let* ((sdats (tt:get-server-info-sorted ttdat dbfname))) +;; (car (map tt:get-server-threads sdats)))))) +;; (if ttdat +;; (let loop () +;; (if (> (get-lowest-thread-load) 5) ;; load is pretty high +;; (begin +;; (debug:print 0 *default-log-port* "Servers appear overloaded, waiting...") +;; (thread-sleep! 1) +;; (loop)))) +;; (debug:print 0 *default-log-port* "Can't wait on server load, *ttdat* not set")))) ) Index: runs.scm ================================================================== --- runs.scm +++ runs.scm @@ -29,10 +29,11 @@ (declare (uses mt)) (declare (uses archive)) (declare (uses mtargs)) (declare (uses rmtmod)) (declare (uses dbfile)) +(declare (uses tcp-transportmod)) (use (prefix sqlite3 sqlite3:) srfi-1 posix regex regex-case srfi-69 (srfi 18) posix-extras directory-utils pathname-expand typed-records format sxml-serializer sxml-modifications matchable) @@ -48,10 +49,11 @@ (import commonmod debugprint rmtmod dbfile + tcp-transportmod (prefix mtargs args:)) ;; use this struct to facilitate refactoring ;; @@ -1191,13 +1193,17 @@ (common:wait-for-normalized-load maxload "Waiting for load to drop before starting more tests" #f)) ;; jobtools maxhomehostload is intended to prevent overloading on the homehost which can cause database corruption issues (if maxhomehostload (common:wait-for-homehost-load maxhomehostload - (conc "Waiting for homehost load to drop below normalized value of " maxhomehostload)))))) + (conc "Waiting for homehost load to drop below normalized value of " maxhomehostload))) + + ;; lastly lets check the servers are not overloaded by looking at threads + 
(tt:wait-on-server-load run-id *ttdat*) + + ))) - (if (and (not (null? prereqs-not-met)) (runs:lownoise (conc "waiting on tests " prereqs-not-met hed) 60)) (debug:print-info 2 *default-log-port* "waiting on tests; " (string-intersperse (runs:mixed-list-testname-and-testrec->list-of-strings prereqs-not-met) ", "))) Index: tcp-transportmod.scm ================================================================== --- tcp-transportmod.scm +++ tcp-transportmod.scm @@ -205,16 +205,27 @@ (begin (debug:print-info 2 *default-log-port* "already connected to a server for " dbfname) conn) ;; we are already connected to the server ;; no conn + + ;; find server with lowest number of threads running (i.e. lowest load) + ;; (let* ((sdats (tt:get-server-info-sorted ttdat dbfname)) (sdat (if (null? sdats) #f - (car sdats)))) - (debug:print-info 2 *default-log-port* "found sdat " sdat) - (match sdat + ;; choose server with lowest threads count + (car (sort sdats + (lambda (a b) + (let* ((load-a (tt:get-server-threads a)) + (load-b (tt:get-server-threads b))) + (< load-a load-b)))))))) + + ;; (let ((indx (max (random (- (length sdats) 1)) 0))) + ;; (list-ref sdats indx))))) + ;; (debug:print-info 1 *default-log-port* "found sdat " sdat" from sdats: "sdats) + (match sdat ((host port start-time server-id pid dbfname2 servinffile) (assert (equal? dbfname dbfname2) "FATAL: read server info from wrong file.") (debug:print-info 2 *default-log-port* "no conn - in match servinffile:" servinffile) (let* ((host-port (conc host":"port)) (conn (make-tt-conn @@ -274,11 +285,57 @@ ;; returns ( result . ping_time ) (define (tt:timed-ping host port server-id) (let* ((start-time (current-milliseconds)) (result (tt:ping host port server-id))) (cons result (- (current-milliseconds) start-time)))) - + +;; host:port => ( meta . when-updated) +(define *server-load* (make-hash-table)) + +(define (tt:save-server-meta host port meta) + (hash-table-set! 
*server-load* (conc host":"port) (cons meta (current-seconds)))) + +(define (tt:get-server-threads dat) + (let* ((host (car dat)) + (port (cadr dat)) + (dat (tt:get-server-meta host port #t))) + ;; (debug:print 0 *default-log-port* "host: "host" port: "port" dat: "dat) + (if (list? dat) + (or (alist-ref 'sload dat) 99998) + 99999))) ;; absurd number means don't use this one + +;; lazy get, does not auto-refresh meta, this might be a problem +;; +(define (tt:get-server-meta host port #!optional (do-ping #f)) + (let* ((get-meta (lambda () + (let* ((dat (hash-table-ref/default *server-load* (conc host":"port) #f))) + (if dat (car dat) #f)))) + (meta (get-meta))) + (if (and (not meta) + do-ping) + (begin + (tt:timed-ping host port #f) + (get-meta)) + meta))) + +(define (tt:wait-on-server-load run-id ttdat) + (if ttdat ;; if no server yet just pass on through + (let* ((dbfname (dbmod:run-id->dbfname run-id)) + (get-lowest-thread-load + (lambda () + (let* ((sdats (tt:get-server-info-sorted ttdat dbfname))) + (car (map tt:get-server-threads sdats)))))) + (if ttdat + (let loop ((count 0)) + (let* ((lowestload (get-lowest-thread-load))) + (if (> lowestload 5) ;; load is pretty high + (begin + (debug:print 0 *default-log-port* "Servers appear overloaded with "lowestload" threads, waiting...") + (thread-sleep! 1) + (if (< count 10) + (loop (+ count 1))))))) + (debug:print 0 *default-log-port* "Can't wait on server load, *ttdat* not set"))))) (define (tt:ping host port server-id #!optional (tries-left 5)) (let* ((res (tt:send-receive-direct host port `(ping #f #f #f) ping-mode: #t)) ;; please send me your server-id (try-again (lambda () (if (> tries-left 0) @@ -289,25 +346,27 @@ ;; ;; need two threads, one a 5 second timer ;; (match res ((status errmsg result meta) + (tt:save-server-meta host port meta) (if (equal? 
result server-id) (let* ((server-state (alist-ref 'sstate meta))) ;; (debug:print 0 *default-log-port* "Ping to "host":"port" successful.") (or server-state 'unk)) ;; then we are good (begin - (debug:print 0 *default-log-port* "WARNING: server-id does not match, expected: "server-id", got: "result) + (if server-id + (debug:print 0 *default-log-port* "WARNING: server-id does not match, expected: "server-id", got: "result)) #f))) (else ;; (debug:print 0 *default-log-port* "res not in form (status errmsg result meta), got: "res) (try-again))))) ;; client side handler ;; ;;(tt:handler # get-keys #f () 2 #f "/home/matt/data/megatest/ext-tests" #f "main.db" "ext-tests" "/home/matt/data/megatest/bin/.22.04/../megatest") -;; +;;g (define (tt:handler ttdat cmd run-id params attemptnum readonly-mode dbfname testsuite mtexe server-start-proc) ;; connect-to-server will start a server if needed. (let* ((areapath (tt-areapath ttdat)) (conn (tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc))) ;; looks up conn keyed by dbfname (if conn @@ -332,10 +391,13 @@ (thread-sleep! dly) (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))) ((loaded) (debug:print 0 *default-log-port* "WARNING: server for "dbfname" is loaded, slowing queries.") (tt:backoff-incr (tt-conn-host conn)(tt-conn-port conn)) + + ;; this would be a good place to force reconnection and connect to a different server + result) ;; (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe)) (else result))) (else ;; did not receive properly formated result (if (not res) ;; tt:send-receive telling us that communication failed @@ -382,11 +444,13 @@ (begin (thread-sleep! 
1) ;; no conn yet set up, give it a rest and try again (tt:handler ttdat cmd run-id params attemptnum readonly-mode dbfname testsuite mtexe server-start-proc))))) ;; gets server info and appends path to server file -;; sorts by age, oldest first +;; sorts by age, --oldest-- now newest first +;; +;; move the ping here? ;; ;; returns list of (host port startseconds server-id servinfofile) ;; (define (tt:get-server-info-sorted ttdat dbfname) (let* ((areapath (tt-areapath ttdat)) @@ -395,11 +459,11 @@ (sorted (sort sdats (lambda (a b) (let* ((starta (list-ref a 2)) (startb (list-ref b 2))) (if (eq? starta startb) (string>? (list-ref a 3)(list-ref b 3)) ;; if servers started at same time look at server-id - (< starta startb)))))) + (> starta startb)))))) (count 0)) (for-each (lambda (rec) (if (or (> (length sorted) 1) (common:low-noise-print 120 "server info sorted")) @@ -518,10 +582,12 @@ ;; server ;;====================================================================== (define (tt:sync-dbs ttdat) #f) + +(define *server-start-requests* '()) ;; start the listener and start responding to requests ;; ;; NOTE: organise by dbfname, not run-id so we don't need ;; to pull in more modules @@ -573,32 +639,36 @@ (keep-srv (and good-ping same-host))) (if keep-srv (loop (cdr servrs) host (cons servdat result)) - (begin - ;; (debug:print-info 0 *default-log-port* "good-ping: " good-ping " same-host: " same-host "keep-srv: " keep-srv) - (handle-exceptions - exn - (debug:print-info 0 *default-log-port* "Error removing server info file: "servinfofile", " - (condition->list exn)) - (delete-file* servinfofile)) - (loop (cdr servrs) prime-host result))))) + (let* ((modtime (file-modification-time servinfofile))) + ;; if the .servinfo hasn't been touched in five min + ;; we can be pretty sure the server is truly dead + (if (> (- (current-seconds) modtime) 360) + (handle-exceptions + exn + (debug:print-info 0 *default-log-port* + "Error removing server info file: "servinfofile", " + 
(condition->list exn)) + (delete-file* servinfofile)) + (loop (cdr servrs) prime-host result)))))) (else - ;; can't delete it as we don't have a filename. NOTE: Should really never get here. + ;; can't delete it as we don't have a filename. NOTE: Should never get here. (debug:print-info 0 *default-log-port* "ERROR: bad servinfo record \""servdat"\"") (loop (cdr servrs) prime-host result)) ;; drop ))))) (home-host (if (null? good-srvrs) #f (caar good-srvrs)))) ;; by here we have a trustworthy list of servers and we have removed the .servinfo file for any unresponsive servers ;; and the list is in good-srvrs + ;; (cond ((not home-host) ;; no servers yet, go ahead and start (debug:print-info 0 *default-log-port* "No servers yet, starting on "(get-host-name))) - ((> (length good-srvrs) 2) ;; don't need more, just exit + ((> (length good-srvrs) 3) ;; don't need more, just exit (debug:print-info 0 *default-log-port* "Have "(length good-srvrs)", no need for more, exiting.") (exit)) ((not (equal? home-host (get-host-name))) ;; there is a home-host and we are not on it (debug:print-info 0 *default-log-port* "Prime main server is on host "home-host", but we are on host "(get-host-name)", exiting.") (exit)) @@ -622,10 +692,12 @@ (begin (debug:print 0 *default-log-port* "ERROR: (tt-port ttdat) no port set! Exiting.") (exit))))) ;; create a servinfo file start keep-running + ;; On WSL there seems to be a race condition where the .servinfo file + ;; is not created fast enough (debug:print 0 *default-log-port* "Creating servinfo file for " dbfname) (tt:create-server-registration-file ttdat dbfname) (procinf-status-set! *procinf* "running") (tt-state-set! ttdat 'running) (dbfile:with-no-sync-db @@ -648,10 +720,12 @@ (dbfile:insert-or-update-process nsdb *procinf*))) (debug:print 0 *default-log-port* "Exiting now.") (exit)))))) (define (tt:keep-running ttdat dbfname dbstruct) + + (thread-sleep! 
1) ;; at this point the server is running and responding to calls, we just monitor ;; for db calls and exit if there are none. ;; if I am not in the first 3 servers, exit @@ -664,18 +738,21 @@ (my-index (list-index (lambda (x) (equal? (list-ref x 6) (tt-servinf-file ttdat))) servers)) (ok (cond + ((not my-index) + (debug:print 0 *default-log-port* "WARNING: Apparently I don't exist.") + #f) ;; keep trying or give up? ((not *server-run*) (debug:print 0 *default-log-port* "WARNING: received a stop server from client by remote request.") #f) ((null? servers) (debug:print 0 *default-log-port* "WARNING: no servinfo files found, this cannot be.") #f) ;; not ok - ((> my-index 2) - (debug:print 0 *default-log-port* "WARNING: there are more than two servers ahead of me, I'm not needed, exiting.") + ((> my-index 3) + (debug:print 0 *default-log-port* "WARNING: there are more than three servers ahead of me, I'm not needed, exiting.") #f) ;; not ok to not be in first three ((eq? (tt-state ttdat) 'running) #t) ;; we are good to keep going ((> (- (current-seconds) start-time) 30) (debug:print 0 *default-log-port* "WARNING: over 30 seconds and not yet in runnning mode. Exiting.") #f) @@ -696,11 +773,19 @@ (set! (file-modification-time sinfo-file) (current-seconds)) ((dbr:dbstruct-sync-proc dbstruct) last-update) (dbr:dbstruct-last-update-set! dbstruct curr-secs)))) (if (< (- (current-seconds) (tt-last-access ttdat)) (tt-server-timeout-param)) - (begin + ;; process any requests to start a new server due to load on this one + (let* ((requests *server-start-requests*)) + (set! *server-start-requests* '()) + (if (> (length requests) 0) + (debug:print-info 0 *default-log-port* "Processing "(length requests)" server start requests")) + (for-each (lambda (proc) + (proc) + (thread-sleep! 1)) + requests) (thread-sleep! 
5) (loop))))) (tt:shutdown-server ttdat) (debug:print 0 *default-log-port* "INFO: Server timed out, exiting from tt:keep-running."))) @@ -728,16 +813,26 @@ (assert (and host port) "FATAL: tt:create-server-registration-file called with no conn, dbfname="dbfname) (tt-servinf-file-set! ttdat servinf) (with-output-to-file servinf (lambda () (print "SERVER STARTED: "host":"port" AT "(current-seconds)" server-id: "serv-id" pid: "(current-process-id)" dbfname: "dbfname))) - serv-id)) + (let loop ((count 0)) + (if (not (file-exists? servinf)) + (begin + (debug:print 0 *default-log-port* "WARNING: file "servinf" was created but it doesn't show up on disk! We'll try again.") + (thread-sleep! 1) + (if (> count 10) + (debug:print 0 *default-log-port* "WARNING: file "servinf" was not created.") + (loop (+ count 1)))))) + serv-id)) ;; find valid server ;; get servers listed, last part of name must match : ;; if more than one, wait one second and look again -;; future: ping oldest, if alive remove other : files +;; +;; NOTE: this only gets the servinfo data, no network activity here +;; i.e. no ping etc. ;; (define (tt:find-server areapath dbfname) (let* ((servdir (tt:get-servinfo-dir areapath)) (sfiles (glob (conc servdir"/*:"dbfname))) (goodfiles '())) @@ -802,10 +897,12 @@ (define (tt:too-recent-server-start dbfname) (let* ((last-run-time (hash-table-ref/default *last-server-start* dbfname #f))) (and last-run-time (< (- (current-seconds) last-run-time) 5)))) + +(define *last-server-start-request-time* 0) ;; Given an area path, start a server process ### NOTE ### > file 2>&1 ;; if the target-host is set ;; try running on that host ;; incidental: rotate logs in logs/ dir. 
@@ -814,26 +911,28 @@ (assert areapath "FATAL: tt:server-process-run called without areapath defined.") (assert testsuite "FATAL: tt:server-process-run called without testsuite defined.") (assert mtexe "FATAL: tt:server-process-run called without mtexe defined.") ;; mtest -server - -m testsuite:ext-tests -db 6.db (let* ((dbfname (dbmod:run-id->dbfname run-id))) - (if (tt:too-recent-server-start dbfname) + (if (or (< (- (current-seconds) *last-server-start-request-time*) 5) ;; attempted start less than 5 sec ago + (tt:too-recent-server-start dbfname)) #f (let* ((load (get-normalized-cpu-load)) (srvrs (tt:find-server areapath dbfname)) (trying (length srvrs)) (nrun (number-of-processes-running (conc "mtest.*server.*"testsuite".*"dbfname)))) + (set! *last-server-start-request-time* (current-seconds)) (cond - ((> load 2.0) - (debug:print 0 *default-log-port* "Normalized load "load" on " (get-host-name) " is over the limit of 2.0. Not starting a server. Please reduce the load on "(get-host-name)" by killing some processes") - (thread-sleep! 1) + ((> load 4.0) + (debug:print 0 *default-log-port* "Normalized load "load" on " (get-host-name) " is over the limit of 4.0. Not starting a server. Please reduce the load on "(get-host-name)" by killing some processes") + (thread-sleep! 1) ;; I'm not convinced that a delay here is helpful. -mrw- #f) ((> nrun 100) (debug:print 0 *default-log-port* nrun" servers running on " (get-host-name) ", not starting another.") (thread-sleep! 1) #f) - ((> trying 2) + ((> trying 3) (debug:print 0 *default-log-port* trying" servers registered in .servinfo dir. not starting another.") (thread-sleep! 1) #f) (else (if (not (file-exists? (conc areapath"/logs")))