;;======================================================================
;; Copyright 2017, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit tcp-transportmod))
(declare (uses debugprint))
(declare (uses commonmod))
(declare (uses dbfile))
(declare (uses dbmod))
(declare (uses portlogger))
(use address-info tcp)
(module tcp-transportmod
*
(import scheme)
(cond-expand
(chicken-4
(import (prefix sqlite3 sqlite3:)
chicken
extras
hostinfo
ports
posix
files
data-structures
directory-utils
tcp
))
(chicken-5
(import chicken.base
chicken.condition
chicken.file
chicken.pathname
chicken.process-context.posix
chicken.process
chicken.sort
chicken.string
chicken.time
chicken.tcp
chicken.random
chicken.file.posix
chicken.pretty-print
chicken.io
chicken.port
chicken.process-context
system-information)
(define unsetenv unset-environment-variable!)
))
(import address-info
directory-utils
matchable
md5
message-digest
regex
regex-case
s11n
srfi-1
srfi-18
srfi-4
srfi-69
stack
typed-records
tcp-server
debugprint
commonmod
dbfile
dbmod
portlogger
)
;;======================================================================
;; client
;;======================================================================
;; (define keep-age-param (make-parameter 10)) ;; qif file age, if over move to attic
;; Used ONLY for client
;;
(defstruct tt-conn
host
port
host-port
dbfname
server-id
server-start
servinf-file
pid
)
;; Used for BOTH clients and servers
(defstruct tt
;; client related
(conns (make-hash-table)) ;; dbfname -> conn
;; server related
(state 'starting)
(areapath #f)
(host #f)
(port #f)
(conn #f)
(cleanup-proc #f)
(handler #f) ;; receives data and responds
(socket #f)
(thread #f)
(host-port #f)
(cmd-thread #f)
(ro-mode #f)
(ro-mode-checked #f)
(last-access (current-seconds))
(servinf-file #f)
(last-serv-start 0)
)
;; parameters
;;
(define tt-server-timeout-param (make-parameter 600))
;; make ttdat visible
(define *server-info* #f)
(define *server-run* #t)
(define (tt:make-remote areapath)
(make-tt areapath: areapath))
;; 1 ... or #f
;; and check that dbfname matches. FIXME: the propagation of dbfname and run-id
;; might not make the best sense
;;
(define (tt:valid-run-id run-id dbfname)
(and (or (number? run-id)
(not run-id))
(equal? (dbfile:run-id->dbfname run-id) dbfname)))
(tcp-buffer-size 2048)
;; (max-connections 4096)
(define (tt:get-conn ttdat dbfname)
(hash-table-ref/default (tt-conns ttdat) dbfname #f))
;; do all the busy work of finding and setting up conn for
;; connecting to a server
;; This function, `tt:client-connect-to-server`, is designed to manage connections between a client and a server within a testing framework.
;; The function takes four arguments:
;; 1. `ttdat`: a data structure that holds information about the testing environment or connections.
;; 2. `dbfname`: The name of the database file that the client wants to connect to.
;; 3. `run-id`: An identifier for the current run of the test suite.
;; 4. `testsuite`: The test suite that is being run.
;;
;; Here's a step-by-step explanation of what the function does:
;;
;; 1. It first asserts that the `run-id` is valid for the given `dbfname` using the `tt:valid-run-id` function. If the `run-id` is not valid, it raises a fatal error.
;; 2. It prints debug information indicating that the function `tt:client-connect-to-server` has been called with the given `dbfname`.
;; 3. It attempts to retrieve an existing connection to the server from a hash table (`tt-conns`) using the `dbfname` as the key. If a connection already exists, it prints debug information and returns the existing connection.
;; 4. If no existing connection is found, it retrieves the current server information from the servinfo file, using the `tt:get-current-server-info` function.
;; 5. It uses pattern matching to destructure the server information into variables (`host`, `port`, `start-time`, `server-id`, `pid`, `dbfname2`, `servinffile`). It then asserts that the `dbfname` from the server info matches the one provided to the function.
;; 6. It constructs a connection object (`conn`) with the server information.
;; 7. It attempts to ping the server using `tt:timed-ping` to verify that the server is running and can be communicated with.
;; 8. Depending on the result of the ping:
;; - If the server is running (`running`), it prints debug information, saves the connection in the hash table, and returns the connection.
;; - If the server is starting (`starting`), it sleeps for 2 seconds and then recursively calls itself to retry the connection.
;; - If the server is neither running nor starting, it checks if it's been more than 10 seconds since the last server start attempt. If so, it attempts to start the server using `server-start-proc` and then sleeps for 1 second before retrying the connection.
;; 9. If no server information is found (`else` case), it checks if it's been more than 3 seconds since the last server start attempt. If so, it starts a new server using `server-start-proc`, updates the last server start time, and sleeps for 4 seconds.
;; 10. It then sleeps for 1 second and prints debug information before recursively calling itself to retry the connection.
;;
;; The function uses recursion to keep trying to connect to the server, with various sleep intervals to prevent overwhelming the system with connection attempts or server starts.
;; It also uses a hash table to cache connections and avoid reconnecting to a server if a connection already exists.
;; The function is designed to handle different server states and ensure that a server is running and available before returning a valid connection to the caller.
;;
(define (tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc)
(assert (tt:valid-run-id run-id dbfname) "FATAL: invalid run-id "run-id)
(debug:print-info 2 *default-log-port* "tt:client-connect-to-server " dbfname)
(let* ((conn (tt:get-conn ttdat dbfname))
(server-start-proc (or server-start-proc
(lambda ()
(assert (equal? dbfname "main.db") ;; only main.db is started here
"FATAL: called server-start-proc for db other than main.db")
(tt:server-process-run
(tt-areapath ttdat)
testsuite ;; (dbfile:testsuite-name)
(common:find-local-megatest)
run-id)))))
(if conn
(begin
(debug:print-info 2 *default-log-port* "already connected to a server for " dbfname)
conn) ;; we are already connected to the server
;; no conn
;; find server with lowest number of threads running (i.e. lowest load)
;;
(let* ((sdats (tt:get-server-info-sorted ttdat dbfname))
(sdat (if (null? sdats)
#f
;; choose server with lowest threads count
(car (sort sdats
(lambda (a b)
(let* ((load-a (tt:get-server-threads a))
(load-b (tt:get-server-threads b)))
(< load-a load-b))))))))
;; (let ((indx (max (random (- (length sdats) 1)) 0)))
;; (list-ref sdats indx)))))
;; (debug:print-info 1 *default-log-port* "found sdat " sdat" from sdats: "sdats)
(match sdat
((host port start-time server-id pid dbfname2 servinffile)
(assert (equal? dbfname dbfname2) "FATAL: read server info from wrong file.")
(debug:print-info 2 *default-log-port* "no conn - in match servinffile:" servinffile)
(let* ((host-port (conc host":"port))
(conn (make-tt-conn
host: host
port: port
host-port: host-port
dbfname: dbfname
servinf-file: servinffile
server-id: server-id
server-start: start-time
pid: pid)))
;; verify we can talk to this server
(let* ((result (tt:timed-ping host port server-id))
(ping-res (car result))
(ping (cdr result)))
(debug:print-info 2 *default-log-port* "host " host " port " port " ping time: " ping " result " ping-res)
(case ping-res
((running)
(debug:print-info 2 *default-log-port* "Setting conn = " conn " in hash table")
(hash-table-set! (tt-conns ttdat) dbfname conn) ;;; is this ok to save before validating that the connection is good?
conn)
((starting)
(thread-sleep! 0.5)
(debug:print-info 0 *default-log-port* "server for " dbfname " is in starting state, retrying connect")
(tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc))
(else
(let* ((curr-secs (current-seconds)))
;; rm the (last server) would go here
(if (> (- curr-secs (tt-last-serv-start ttdat)) 10)
(begin
(debug:print-info 0 *default-log-port* "Unreachable server at "
host":"port" with servinfo file "servinffile", removing it")
(if (file-exists? servinffile)
(handle-exceptions
exn
#f
(delete-file servinffile)))
(tt-last-serv-start-set! ttdat curr-secs)
(debug:print-info 0 *default-log-port* "Starting a new server on " (get-host-name))
(server-start-proc))) ;; start server if 10 sec since last attempt
(thread-sleep! 1)
(debug:print-info 0 *default-log-port* "Retrying connect")
(tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc)))))))
(else ;; no good server found, if haven't started server in > 5 secs, start another
(if (> (- (current-seconds) (tt-last-serv-start ttdat)) 3) ;; BUG - grow this number really do not want to swamp the machine with servers
(begin
(debug:print-info 0 *default-log-port* "Starting server for "dbfname " on " (get-host-name))
(server-start-proc)
(tt-last-serv-start-set! ttdat (current-seconds))
(thread-sleep! 6)
))
(thread-sleep! 1)
(debug:print-info 0 *default-log-port* "Connect to server from " (get-host-name) " for " dbfname)
(tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc)))))))
;; returns ( result . ping_time )
(define (tt:timed-ping host port server-id)
(let* ((start-time (current-milliseconds))
(result (tt:ping host port server-id)))
(cons result (- (current-milliseconds) start-time))))
;; host:port => ( meta . when-updated)
(define *server-load* (make-hash-table))
(define (tt:save-server-meta host port meta)
(hash-table-set! *server-load* (conc host":"port) (cons meta (current-seconds))))
(define (tt:get-server-threads dat)
(let* ((host (car dat))
(port (cadr dat))
(dat (tt:get-server-meta host port #t)))
;; (debug:print 0 *default-log-port* "host: "host" port: "port" dat: "dat)
(if (list? dat)
(or (alist-ref 'sload dat) 99998)
99999))) ;; absurd number means don't use this one
;; lazy get, does not auto-refresh meta, this might be a problem
;;
(define (tt:get-server-meta host port #!optional (do-ping #f))
(let* ((get-meta (lambda ()
(let* ((dat (hash-table-ref/default *server-load* (conc host":"port) #f)))
(if dat (car dat) #f))))
(meta (get-meta)))
(if (and (not meta)
do-ping)
(begin
(tt:timed-ping host port #f)
(get-meta))
meta)))
(define (tt:wait-on-server-load run-id ttdat)
(if ttdat ;; if no server yet just pass on through
(let* ((dbfname (dbmod:run-id->dbfname run-id))
(get-lowest-thread-load
(lambda ()
(let* ((sdats (tt:get-server-info-sorted ttdat dbfname)))
(car (map tt:get-server-threads sdats))))))
(if ttdat
(let loop ((count 0))
(let* ((lowestload (get-lowest-thread-load)))
(if (> lowestload 5) ;; load is pretty high
(begin
(debug:print 0 *default-log-port* "Servers appear overloaded with "lowestload" threads, waiting...")
(thread-sleep! 1)
(if (< count 10)
(loop (+ count 1)))))))
(debug:print 0 *default-log-port* "Can't wait on server load, *ttdat* not set")))))
(define (tt:ping host port server-id #!optional (tries-left 5))
(let* ((res (tt:send-receive-direct host port `(ping #f #f #f) ping-mode: #t)) ;; please send me your server-id
(try-again (lambda ()
(if (> tries-left 0)
(begin
(thread-sleep! 1)
(tt:ping host port server-id (- tries-left 1)))
#f))))
;;
;; need two threads, one a 5 second timer
;;
(match res
((status errmsg result meta)
(tt:save-server-meta host port meta)
(if (equal? result server-id)
(let* ((server-state (alist-ref 'sstate meta)))
;; (debug:print 0 *default-log-port* "Ping to "host":"port" successful.")
(or server-state 'unk)) ;; then we are good
(begin
(if server-id
(debug:print 0 *default-log-port* "WARNING: server-id does not match, expected: "server-id", got: "result))
#f)))
(else
;; (debug:print 0 *default-log-port* "res not in form (status errmsg result meta), got: "res)
(try-again)))))
;; client side handler
;;
;;(tt:handler #<tt> get-keys #f () 2 #f "/home/matt/data/megatest/ext-tests" #f "main.db" "ext-tests" "/home/matt/data/megatest/bin/.22.04/../megatest")
;;g
(define (tt:handler ttdat cmd run-id params attemptnum readonly-mode dbfname testsuite mtexe server-start-proc)
;; connect-to-server will start a server if needed.
(let* ((areapath (tt-areapath ttdat))
(conn (tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc))) ;; looks up conn keyed by dbfname
(if conn
;; have connection, call the server
(let* ((res (tt:send-receive ttdat conn cmd run-id params)))
;; res is (status errmsg result meta)
(match res
((status errmsg result meta)
(if (list? meta)
(let* ((delay-wait (alist-ref 'delay-wait meta)))
(if (and (number? delay-wait)
(> delay-wait 0))
(begin
(debug:print 0 *default-log-port* "Server is loaded, delaying "delay-wait" seconds")
(thread-sleep! delay-wait)))))
(case status
((busy) ;; result will be how long the server wants you to delay
(let* ((raw-dly (if (number? result) result 0.1))
(dly (+ raw-dly (/ attemptnum 10)))) ;; (* raw-dly (/ attemptnum 2))))
(debug:print 0 *default-log-port* "WARNING: server for "dbfname" is busy, cmd is "cmd", will try again in "dly" seconds. This is attempt "(- attemptnum 1))
(debug:print 0 *default-log-port* errmsg)
(thread-sleep! dly)
(tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc)))
((loaded)
(debug:print 0 *default-log-port* "WARNING: server for "dbfname" is loaded, slowing queries.")
(tt:backoff-incr (tt-conn-host conn)(tt-conn-port conn))
;; this would be a good place to force reconnection and connect to a different server
result) ;; (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe))
(else
result)))
(else ;; did not receive properly formated result
(if (not res) ;; tt:send-receive telling us that communication failed
(let* ((host (tt-conn-host conn))
(port (tt-conn-port conn))
;; (dbfname (tt-conn-port conn)) ;; 192.168.0.127:4242-726924:4.db
(pid (tt-conn-pid conn))
;;(servinf (tt-conn-servinf-file conn)))
(servinf (tt-servinf-file ttdat))) ;; (conc areapath"/.servinfo/"host":"port"-"pid":"dbfname))) ;; TODO, use (server:get-servinfo-dir areapath)
(hash-table-set! (tt-conns ttdat) dbfname #f) ;; clear out the conn for this dbfname to force finding new server
(if (and servinf (file-exists? servinf))
(begin
(if (< attemptnum 10)
(begin
(thread-sleep! 0.5)
(tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))
(begin
(debug:print 0 *default-log-port* "INFO: no response from server "host":"port" for "dbfname)
(if (and (file-exists? servinf)
(> (- (current-seconds)(file-modification-time servinf)) 60))
(begin
(debug:print 0 *default-log-port* "INFO: "servinf" file seems old and no ping response, removing it.")
(handle-exceptions
exn
#f
(delete-file* servinf))
(tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))
(begin
;; start server - addressed in client-connect-to-server
;; delay - addressed in client-connect-to-server
;; try again
(thread-sleep! 0.25) ;; dunno, I think this needs to be here
(tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))
))))
(begin ;; no server file, delay and try again
(debug:print 2 *default-log-port* "INFO: connection to server "host":"port" broken for "dbfname", no servinf file. Server exited? ")
(thread-sleep! 0.5)
(tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))))
(begin ;; this case is where res is malformed. Probably should abort
(assert #f "FATAL: tt:handler received bad data "res)
;; (debug:print 0 *default-log-port* "INFO: got corrupt data from server "host":"port", "res", for "dbfname", will try again.")
;; (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe)
)))))
(begin
(thread-sleep! 1) ;; no conn yet set up, give it a rest and try again
(tt:handler ttdat cmd run-id params attemptnum readonly-mode dbfname testsuite mtexe server-start-proc)))))
;; gets server info and appends path to server file
;; sorts by age, --oldest-- now newest first
;;
;; move the ping here?
;;
;; returns list of (host port startseconds server-id servinfofile)
;;
(define (tt:get-server-info-sorted ttdat dbfname)
(let* ((areapath (tt-areapath ttdat))
(sfiles (tt:find-server areapath dbfname))
(sdats (filter car (map tt:server-get-info sfiles))) ;; first element is #f if the file disappeared while being read
(sorted (sort sdats (lambda (a b)
(let* ((starta (list-ref a 2))
(startb (list-ref b 2)))
(if (eq? starta startb)
(string>? (list-ref a 3)(list-ref b 3)) ;; if servers started at same time look at server-id
(> starta startb))))))
(count 0))
(for-each
(lambda (rec)
(if (or (> (length sorted) 1)
(common:low-noise-print 120 "server info sorted"))
(debug:print 2 *default-log-port* "SERVER #"count": "(string-intersperse (map conc sorted) ", ")))
(set! count (+ count 1)))
sorted)
sorted))
(define (tt:send-receive ttdat conn cmd run-id params)
(let* ((host-port (tt-conn-host-port conn)) ;; (conc (tt-conn-host conn)":"(tt-conn-port conn)))
(host (tt-conn-host conn))
(port (tt-conn-port conn))
(dat (list cmd run-id params #f))) ;; no meta data yet
(tt:send-receive-direct host port dat)))
(defstruct tt:backoff
(last-ioerr (current-seconds))
(last-adj-t (current-seconds))
(wait-delay 0.1))
(define *tt:backoff-smoothing* (make-hash-table)) ;; host:port => lastaccess backoffdelay )
(define (tt:backoff-incr host port) ;; call if tcp fails i/o net
(let* ((host-port (conc host":"port))
(bkoff (hash-table-ref/default *tt:backoff-smoothing* host-port #f)))
(if bkoff
(begin
(tt:backoff-last-ioerr-set! bkoff (current-seconds))
(tt:backoff-wait-delay-set! bkoff (+ (tt:backoff-wait-delay bkoff) 0.1)))
(hash-table-set! *tt:backoff-smoothing* host-port (make-tt:backoff)))))
(define (tt:backoff-decr-and-wait host port)
(let* ((host-port (conc host":"port))
(bkoff (hash-table-ref/default *tt:backoff-smoothing* host-port #f)))
(if bkoff
(let* ((wait-delay (tt:backoff-wait-delay bkoff))
(last-ioerr (tt:backoff-last-ioerr bkoff))
(last-adj-t (tt:backoff-last-adj-t bkoff))
(delta (- (current-seconds) last-adj-t))
(adj (* delta 0.001)) ;; it takes 100 seconds to recover from hitting an io err
(new-wait (if (> wait-delay 0)
(if (> adj wait-delay)
0
(- wait-delay adj))
0)))
(if (> new-wait 0)
(begin
(if (common:low-noise-print 10 "delay wait message")
(debug:print-info 0 *default-log-port* "Server on host " host " loaded, DelayWait: "new-wait))
(tt:backoff-wait-delay-set! bkoff new-wait)
(tt:backoff-last-adj-t-set! bkoff (current-seconds))
(thread-sleep! new-wait))
(hash-table-delete! *tt:backoff-smoothing* host-port))))))
(define (tt:send-receive-direct host port dat #!key (ping-mode #f)(tries-remaining 25))
(assert (number? port) "FATAL: tt:send-receive-direct called with a port that is not a number "port)
(tt:backoff-decr-and-wait host port)
(let* ((retry (lambda ()
(tt:send-receive-direct host port dat tries-remaining: (- tries-remaining 1))))
(full-err-print (lambda (exn msg)
(if (condition? exn)
(begin
(pp (condition->list exn) *default-log-port*)
(pp dat *default-log-port*)
(debug:print 0 *default-log-port* msg
", error: " ((condition-property-accessor 'exn 'message) exn)
", arguments: " ((condition-property-accessor 'exn 'arguments) exn)
", location: " ((condition-property-accessor 'exn 'location) exn)
))
(debug:print 0 *default-log-port* msg "(note: exn="exn", is not a condition object.")))))
(condition-case
(let-values (((inp oup)(tcp-connect host port)))
(let ((res (if (and inp oup)
(begin
(serialize dat oup)
(close-output-port oup)
(deserialize inp))
)))
(close-input-port inp)
(match res
((result exn-result stdout-result)
(if exn-result
(full-err-print exn-result "ERROR: Server side exception detected"))
(if stdout-result
(debug:print 0 *default-log-port* "ERROR: Output detected on stdout on server side execution => "stdout-result))
result)
(else
(debug:print 0 *default-log-port* "ERROR: server returned non-standard output: "res)
#f))))
(exn (io-error)
(full-err-print exn "ERROR: i/o error")
(tt:backoff-incr host port)
#f)
(exn (i/o net)
(if ping-mode
#f
(cond
((> tries-remaining 4) ;; server likely defunct
(tt:backoff-incr host port)
#f)
((>= tries-remaining 0)
(let* ((backoff-delay (max (* (- 26 tries-remaining) 0.1) 1.0)))
(debug:print 0 *default-log-port* "WARNING: TCP overload, trying again in "backoff-delay"s.")
(thread-sleep! backoff-delay)
(tt:backoff-incr host port)
(retry))
;; (assert #f "FATAL: Too many retries in tt:send-receive-direct")
)
(else #f))))
(exn ()
(full-err-print exn "Unhandled exception from client side.")
#f))))
;;======================================================================
;; server
;;======================================================================
(define (tt:sync-dbs ttdat)
#f)
(define *server-start-requests* '())
;; start the listener and start responding to requests
;;
;; NOTE: organise by dbfname, not run-id so we don't need
;; to pull in more modules
;;
;; This is the routine called in megatest.scm to start a server. NOTE: sequence is different for main.db vs. X.db
;;
;; Server viability is checked in keep-running. Blindly start and run here.
;;
(define (tt:start-server areapath run-id dbfname-in handler keys)
(assert areapath "FATAL: areapath not provided for tt:start-server")
(let* ((ttdat (make-tt areapath: areapath))
(dbfname (or dbfname-in (dbmod:run-id->dbfname run-id))))
(set! *server-info* ttdat)
(let* ((dbstruct (dbmod:open-dbmoddb areapath run-id dbfname (dbfile:db-init-proc) keys)))
(tt-handler-set! ttdat (handler dbstruct))
(let* ((servinf-created #f)
(tcp-thread (make-thread
(lambda ()
;; NOTE: tt-port and tt-host are set in connect-listener which is called under tt:start-tcp-server
(tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data
"tcp-server-thread"))
(run-thread (make-thread
(lambda ()
(tt:keep-running ttdat dbfname dbstruct)))))
(thread-start! tcp-thread)
(let* ((areapath (tt-areapath ttdat))
(nosyncdbpath (conc areapath"/.mtdb"))
(servers ;; (tt:find-server areapath dbfname)))
(tt:get-server-info-sorted ttdat dbfname)) ;; (host port startseconds server-id servinfofile)
(good-srvrs
;; contact servers via ping, if no response remove the .servinfo file
(let loop ((servrs servers)
(prime-host #f)
(result '()))
(if (null? servrs)
(reverse result)
(let* ((servdat (car servrs)))
(match servdat
((host port startseconds server-id pid dbfilename servinfofile)
(debug:print-info 0 *default-log-port* "Good servinfo file: " servdat)
(let* ((ping-res (tt:timed-ping host port server-id))
(good-ping (match ping-res
((result . ping-time)
(not result)) ;; we couldn't reach the server or it was not a megatest server
(else #f))) ;; the ping failed completely?
(same-host (or (not prime-host) ;; i.e. this is the first host
(equal? prime-host host)))
(keep-srv (and good-ping same-host)))
(if keep-srv
(loop (cdr servrs)
host
(cons servdat result))
(let* ((modtime (file-modification-time servinfofile)))
;; if the .servinfo hasn't been touched in five min
;; we can be pretty sure the server is truly dead
(if (> (- (current-seconds) modtime) 360)
(handle-exceptions
exn
(debug:print-info 0 *default-log-port*
"Error removing server info file: "servinfofile", "
(condition->list exn))
(delete-file* servinfofile))
(loop (cdr servrs) prime-host result))))))
(else
;; can't delete it as we don't have a filename. NOTE: Should never get here.
(debug:print-info 0 *default-log-port* "ERROR: bad servinfo record \""servdat"\"")
(loop (cdr servrs) prime-host result)) ;; drop
)))))
(home-host (if (null? good-srvrs)
#f
(caar good-srvrs))))
;; by here we have a trustworthy list of servers and we have removed the .servinfo file for any unresponsive servers
;; and the list is in good-srvrs
;;
(cond
((not home-host) ;; no servers yet, go ahead and start
(debug:print-info 0 *default-log-port* "No servers yet, starting on "(get-host-name)))
((> (length good-srvrs) 3) ;; don't need more, just exit
(debug:print-info 0 *default-log-port* "Have "(length good-srvrs)", no need for more, exiting.")
(exit))
((not (equal? home-host (get-host-name))) ;; there is a home-host and we are not on it
(debug:print-info 0 *default-log-port* "Prime main server is on host "home-host", but we are on host "(get-host-name)", exiting.")
(exit))
(else
(debug:print-info 0 *default-log-port* "Starting on host "(get-host-name)", along with "(length good-srvrs)" other servers.")))
;; this didn't seem to work, is port not available yet?
(let loop ((count 0))
(if (tt-port ttdat)
(begin
(procinf-port-set! *procinf* (tt-port ttdat))
(procinf-dbname-set! *procinf* dbfname)
(dbfile:with-no-sync-db
nosyncdbpath
(lambda (nsdb)
(dbfile:insert-or-update-process nsdb *procinf*))))
(if (< count 10)
(begin
(thread-sleep! 0.25)
(loop (+ count 1)))
(begin
(debug:print 0 *default-log-port* "ERROR: (tt-port ttdat) no port set! Exiting.")
(exit)))))
;; create a servinfo file start keep-running
;; On WSL there seems to be a race condition where the .servinfo file
;; is not created fast enough
(debug:print 0 *default-log-port* "Creating servinfo file for " dbfname)
(tt:create-server-registration-file ttdat dbfname)
(procinf-status-set! *procinf* "running")
(tt-state-set! ttdat 'running)
(dbfile:with-no-sync-db
nosyncdbpath
(lambda (nsdb)
(dbfile:insert-or-update-process nsdb *procinf*)))
(thread-start! run-thread)
(thread-join! run-thread) ;; run thread will exit on timeout or other conditions
;; (tcp-close (tt-socket ttdat)) ;; close up ports here
;; replace with call to (dbfile:set-process-done nsdb host pid reason)
(procinf-status-set! *procinf* "done")
(procinf-end-set! *procinf* (current-seconds))
;; either convert this to use set-process-done or get rid of set-process-done
(dbfile:with-no-sync-db
nosyncdbpath
(lambda (nsdb)
(dbfile:insert-or-update-process nsdb *procinf*)))
(debug:print 0 *default-log-port* "Exiting now.")
(exit))))))
(define (tt:keep-running ttdat dbfname dbstruct)
(thread-sleep! 1)
;; at this point the server is running and responding to calls, we just monitor
;; for db calls and exit if there are none.
;; if I am not in the first 3 servers, exit
(let* ((start-time (current-seconds)))
(let loop ()
(let* ((servers (tt:get-server-info-sorted ttdat dbfname))
(home-host (if (null? servers)
#f
(caar servers)))
(my-index (list-index (lambda (x)
(equal? (list-ref x 6)
(tt-servinf-file ttdat)))
servers))
(ok (cond
((not my-index)
(debug:print 0 *default-log-port* "WARNING: Apparently I don't exist.")
#f) ;; keep trying or give up?
((not *server-run*)
(debug:print 0 *default-log-port* "WARNING: received a stop server from client by remote request.")
#f)
((null? servers)
(debug:print 0 *default-log-port* "WARNING: no servinfo files found, this cannot be.")
#f) ;; not ok
((> my-index 3)
(debug:print 0 *default-log-port* "WARNING: there are more than three servers ahead of me, I'm not needed, exiting.")
#f) ;; not ok to not be in first three
((eq? (tt-state ttdat) 'running) #t) ;; we are good to keep going
((> (- (current-seconds) start-time) 30)
(debug:print 0 *default-log-port* "WARNING: over 30 seconds and not yet in runnning mode. Exiting.")
#f)
(else #t))))
(if ok
(tt-last-access-set! ttdat *db-last-access*) ;; bit silly, just use db-last-access
(begin
(debug:print 0 *default-log-port* "Exiting immediately")
(tt:shutdown-server ttdat)
(exit)))
(let* ((last-update (dbr:dbstruct-last-update dbstruct))
(curr-secs (current-seconds)))
(if (and (eq? (tt-state ttdat) 'running)
(> (- curr-secs last-update) 5)) ;; every 5 seconds update the db?
(let* ((sinfo-file (tt-servinf-file ttdat)))
;; (debug:print 0 *default-log-port* "INFO: touching "sinfo-file)
(set! (file-modification-time sinfo-file) (current-seconds))
((dbr:dbstruct-sync-proc dbstruct) last-update)
(dbr:dbstruct-last-update-set! dbstruct curr-secs))))
(if (< (- (current-seconds) (tt-last-access ttdat)) (tt-server-timeout-param))
;; process any requests to start a new server due to load on this one
(let* ((requests *server-start-requests*))
(set! *server-start-requests* '())
(if (> (length requests) 0)
(debug:print-info 0 *default-log-port* "Processing "(length requests)" server start requests"))
(for-each (lambda (proc)
(proc)
(thread-sleep! 1))
requests)
(thread-sleep! 5)
(loop)))))
(tt:shutdown-server ttdat)
(debug:print 0 *default-log-port* "INFO: Server timed out, exiting from tt:keep-running.")))
(define (tt:shutdown-server ttdat)
(let* ((host (tt-host ttdat))
(port (tt-port ttdat))
(sinf (tt-servinf-file ttdat)))
(tt-state-set! ttdat 'shutdown)
(portlogger:open-run-close portlogger:set-port port "released")
(if (file-exists? sinf)
(delete-file* sinf))
))
;; return servid
;; side-effects:
;; ttdat-cleanup-proc is populated with function to remove the serverinfo file
(define (tt:create-server-registration-file ttdat dbfname)
(let* ((areapath (tt-areapath ttdat))
(servdir (tt:get-servinfo-dir areapath))
(host (tt-host ttdat))
(port (tt-port ttdat))
(servinf (conc servdir"/"host":"port"-"(current-process-id)":"dbfname))
(serv-id (tt:mk-signature areapath)))
(assert (and host port) "FATAL: tt:create-server-registration-file called with no conn, dbfname="dbfname)
(tt-servinf-file-set! ttdat servinf)
(with-output-to-file servinf
(lambda ()
(print "SERVER STARTED: "host":"port" AT "(current-seconds)" server-id: "serv-id" pid: "(current-process-id)" dbfname: "dbfname)))
(let loop ((count 0))
(if (not (file-exists? servinf))
(begin
(debug:print 0 *default-log-port* "WARNING: file "servinf" was created but it doesn't show up on disk! We'll try again.")
(thread-sleep! 1)
(if (> count 10)
(debug:print 0 *default-log-port* "WARNING: file "servinf" was not created.")
(loop (+ count 1))))))
serv-id))
;; find valid server
;; get servers listed, last part of name must match :<dbfname>
;; if more than one, wait one second and look again
;;
;; NOTE: this only gets the servinfo data, no network activity here
;; i.e. no ping etc.
;;
(define (tt:find-server areapath dbfname)
(let* ((servdir (tt:get-servinfo-dir areapath))
(sfiles (glob (conc servdir"/*:"dbfname)))
(goodfiles '()))
;; filter the files here by looking in processes table (if we are not main.db)
;; and or look at the time stamp on the servinfo file, a running server will
;; touch the file every minute (again, this will only apply for main.db)
(for-each (lambda (fname)
(let* ((age (- (current-seconds)(file-modification-time fname))))
(if (> age (tt-server-timeout-param)) ;; can't trust it if over server timeout old.
(begin
(debug:print 0 *default-log-port* "WARNING: removing stale servinfo file "fname", it is "age" seconds old")
(handle-exceptions
exn
(debug:print 0 *default-log-port* "WARNING: error attempting to remove stale servinfo file "fname)
(delete-file fname))) ;;
(set! goodfiles (cons fname goodfiles)))))
sfiles)
goodfiles))
;; given a path to a server info file return: host port startseconds server-id pid dbfname logf
;; example of what it's looking for in the file:
;; SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4
;;
(define (tt:server-get-info logf)
(let ((server-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+) dbfname: (\\S+)")) ;; SERVER STARTED: host:port AT timesecs server id
(bad-dat (list #f #f #f #f #f #f logf)))
(let ((fdat (handle-exceptions
exn
(begin
;; BUG, TODO: add err checking, for now blanket ignore the errors?
(debug:print-info 0 *default-log-port* "Unable to get server info from "logf
", exn="(condition->list exn))
'()) ;; no idea what went wrong, call it a bad server, return empty list
(with-input-from-file logf read-lines))))
(if (null? fdat) ;; bad data, return bad-dat
bad-dat
(let loop ((inl (car fdat))
(tail (cdr fdat))
(lnum 0))
(let ((mlst (string-match server-rx inl)))
(if (not mlst)
(if (> lnum 500) ;; give up if more than 500 lines of server log read
bad-dat
(if (null? tail)
bad-dat
(loop (car tail)(cdr tail)(+ lnum 1))))
(match mlst ;; have a not null list
((_ host port start server-id pid dbfname)
(list host
(string->number port)
(string->number start)
server-id
(string->number pid)
dbfname
logf))
(else
(debug:print 0 *default-log-port* "ERROR: did not recognise SERVER line info "mlst)
bad-dat)))))))))
(define *last-server-start* (make-hash-table))
(define (tt:too-recent-server-start dbfname)
(let* ((last-run-time (hash-table-ref/default *last-server-start* dbfname #f)))
(and last-run-time
(< (- (current-seconds) last-run-time) 5))))
(define *last-server-start-request-time* 0)
;; Given an area path, start a server process ### NOTE ### > file 2>&1
;; if the target-host is set
;; try running on that host
;; incidental: rotate logs in logs/ dir.
;;
(define (tt:server-process-run areapath testsuite mtexe run-id #!key (profile-mode "")) ;; areapath is *toppath* for a given testsuite area
(assert areapath "FATAL: tt:server-process-run called without areapath defined.")
(assert testsuite "FATAL: tt:server-process-run called without testsuite defined.")
(assert mtexe "FATAL: tt:server-process-run called without mtexe defined.")
;; mtest -server - -m testsuite:ext-tests -db 6.db
(let* ((dbfname (dbmod:run-id->dbfname run-id)))
(if (or (< (- (current-seconds) *last-server-start-request-time*) 5) ;; attempted start less than 5 sec ago
(tt:too-recent-server-start dbfname))
#f
(let* ((load (get-normalized-cpu-load))
(srvrs (tt:find-server areapath dbfname))
(trying (length srvrs))
(nrun (number-of-processes-running (conc "mtest.*server.*"testsuite".*"dbfname))))
(set! *last-server-start-request-time* (current-seconds))
(cond
((> load 4.0)
(debug:print 0 *default-log-port* "Normalized load "load" on " (get-host-name) " is over the limit of 4.0. Not starting a server. Please reduce the load on "(get-host-name)" by killing some processes")
(thread-sleep! 1) ;; I'm not convinced that a delay here is helpful. -mrw-
#f)
((> nrun 100)
(debug:print 0 *default-log-port* nrun" servers running on " (get-host-name) ", not starting another.")
(thread-sleep! 1)
#f)
((> trying 3)
(debug:print 0 *default-log-port* trying" servers registered in .servinfo dir. not starting another.")
(thread-sleep! 1)
#f)
(else
(if (not (file-exists? (conc areapath"/logs")))
(create-directory (conc areapath"/logs") #t))
(let* ((logfile (conc areapath "/logs/server-"dbfname"-"(current-process-id)".log")) ;; -" curr-pid "-" target-host ".log"))
(cmdln (conc
mtexe
" -startdir "areapath
" -server - ";; (or target-host "-")
" -m testsuite:"testsuite
" -db "dbfname ;; (dbmod:run-id->dbfname run-id)
" " profile-mode
#;(conc " >> " logfile " 2>&1 &"))))
;; we want the remote server to start in *toppath* so push there
;; (push-directory areapath) ;; use cd in the command line instead
(debug:print 2 *default-log-port* "INFO: Trying to start server in tcp mode (" cmdln ") at "(common:human-time)" for "areapath)
;; (debug:print 0 *default-log-port* "INFO: starting server at " (common:human-time))
(setenv "NBFAKE_QUIET" "yes") ;; BUG: change to with-environment-variable ...
(setenv "NBFAKE_LOG" logfile)
(system (conc "cd "areapath" ; nbfake " cmdln))
(unsetenv "NBFAKE_QUIET")
(unsetenv "NBFAKE_LOG")
;; (system cmdln)
(hash-table-set! *last-server-start* dbfname (current-seconds))
;; ;; use below to go back to nbfake - nbfake does cause trouble ...
;; (setenv "NBFAKE_QUIET" "yes") ;; BUG: change to with-environment-variable ...
;; (setenv "NBFAKE_LOG" logfile)
;; (system (conc "cd "areapath" ; nbfake " cmdln))
;; (unsetenv "NBFAKE_QUIET")
;; (unsetenv "NBFAKE_LOG")
;;(pop-directory)
#t)))))))
;;======================================================================
;; tcp connection stuff
;;======================================================================
;; find a port and start tcp-server. This only starts the tcp portion of
;; the server, look at (tt:start-server ...) above for the entry point
;; for the entire server system
;;
(define (tt:start-tcp-server ttdat)
(setup-listener-portlogger ttdat) ;; set up tcp-listener
(let* ((socket (tt-socket ttdat))
(handler (tt-handler ttdat)) ;; the handler comes from our client setting a handler function
(handler-proc (lambda ()
(let* ((indat (deserialize)) ;; could use: (thread-terminate! (current-thread))
(result #f)
(exn-result #f)
(stdout-result (with-output-to-string
(lambda ()
(let ((res (handle-exceptions
exn
(let* ((errdat (condition->list exn)))
(set! exn-result errdat)
(debug:print 0 *default-log-port* "ERROR: handler exception, these are bad, will exit in five seconds.")
(pp errdat *default-log-port*)
;; these are always bad, set up an exit thread
(thread-start! (make-thread (lambda ()
(thread-sleep! 5)
(exit))))
#f)
(handler indat) ;; this is the proc being called by the remote client
)))
(set! result res)))))
(full-result (list result exn-result (if (equal? stdout-result "") #f stdout-result))))
(handle-exceptions
exn
(begin
(debug:print 0 *default-log-port* "Serialization failure. full-result="full-result)
(thread-start! (make-thread (lambda ()
(thread-sleep! 5)
(exit))))) ;; (serialize '(#f #f #f)) ;; doesn't work - the first call to serialize caused failure
(serialize full-result))))))
((make-tcp-server socket handler-proc)
#f ;; yes, send error messages to std-err
)))
;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;; if udata-in is #f create the record
;; if there is already a serv-listener return the udata
;;
;; (define (setup-listener uconn #!optional (port 4242))
;; (assert (tt? uconn) "FATAL: setup-listener called with wrong struct "uconn)
;; (handle-exceptions
;; exn
;; (if (< port 65535)
;; (begin
;; (thread-sleep! 0.25)
;; (setup-listener uconn (+ port 1)))
;; #f)
;; (connect-listener uconn port)))
(define (setup-listener-portlogger uconn)
(let ((port (portlogger:open-run-close portlogger:find-port)))
(assert (tt? uconn) "FATAL: setup-listener called with wrong struct "uconn)
(debug:print 2 *default-log-port* "setup-listener-portlogger got port " port)
(handle-exceptions
exn
(if (< port 65535)
(begin
(debug:print 0 *default-log-port* "setup-listener-portlogger: exception finding port. Retrying")
(portlogger:open-run-close portlogger:set-failed port)
(thread-sleep! 0.25)
(setup-listener-portlogger uconn))
(begin
(assert #t "setup-listener-portlogger: could not get a port")
#f
)
)
(debug:print 2 *default-log-port* "setup-listener-portlogger: got port " port)
(connect-listener uconn port))))
(define (connect-listener uconn port)
;; (tcp-listener-socket LISTENER)(socket-name so)
;; sockaddr-address, sockaddr-port, sockaddr->string
(let* ((tlsn (tcp-listen port 10000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
(addr (tt:get-best-guess-address (get-host-name)))) ;; (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
(tt-port-set! uconn port)
(tt-host-set! uconn addr)
(tt-host-port-set! uconn (conc addr":"port))
(tt-socket-set! uconn tlsn)
uconn))
;;======================================================================
;; utils
;;======================================================================
;; Generate a unique signature for this server
(define (tt:mk-signature areapath)
(message-digest-string (md5-primitive)
(with-output-to-string
(lambda ()
(write (list areapath
(current-process-id)
(argv)))))))
(define (tt:get-best-guess-address hostname)
(let ((res #f))
(for-each
(lambda (adr)
(if (not (eq? (u8vector-ref adr 0) 127))
(set! res adr)))
;; NOTE: This can fail when there is no mention of the host in /etc/hosts. FIXME
(vector->list (hostinfo-addresses (hostname->hostinfo hostname))))
(string-intersperse
(map number->string
(u8vector->list
(if res res (hostname->ip hostname)))) ".")))
(define (tt:get-servinfo-dir areapath)
(let* ((spath (conc areapath"/.servinfo")))
(if (not (file-exists? spath))
(create-directory spath #t))
spath))
;;======================================================================
;; network utilities
;;======================================================================
;; NOTE: Look at address-info egg as alternative to some of this
(define (rate-ip ipaddr)
(regex-case ipaddr
( "^127\\..*" _ 0 )
( "^(10\\.0|192\\.168)\\..*" _ 1 )
( else 2 ) ))
;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
(> (rate-ip a) (rate-ip b)))
(define (get-my-best-address)
(let ((all-my-addresses (get-all-ips)))
(cond
((null? all-my-addresses)
(get-host-name)) ;; no interfaces?
((eq? (length all-my-addresses) 1)
(car all-my-addresses)) ;; only one to choose from, just go with it
(else
(car (sort all-my-addresses ip-pref-less?))))))
(define (get-all-ips-sorted)
(sort (get-all-ips) ip-pref-less?))
(define (get-all-ips)
(map address-info-host
(filter (lambda (x)
(equal? (address-info-type x) "tcp"))
(address-infos (get-host-name)))))
;;======================================================================
;; Other Utils
;;======================================================================
;; 1.db => (10 . 9) ;; (total . hits)
(define *journal-stats* (make-hash-table))
;; monte-carlo-esque random sampling of journal files
;; for all the files:
;; if .journal
;; update stats +1 +1
;; update stats +1 0
;;
(define (tt:write-load-tracking dbdir)
(let* ((cs (current-seconds))
(key (inexact->exact (quotient cs 10))))
(directory-fold
dbdir
(lambda (res fname)
(cons fname res))
'())))
)