;;======================================================================
;; Copyright 2017, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit tcp-transportmod))
(declare (uses debugprint))
(declare (uses commonmod))
(declare (uses dbfile))
(declare (uses dbmod))
(declare (uses portlogger))
(use address-info tcp)
(module tcp-transportmod
*
(import scheme
(prefix sqlite3 sqlite3:)
chicken
data-structures
address-info
directory-utils
extras
files
hostinfo
matchable
md5
message-digest
ports
posix
regex
regex-case
s11n
srfi-1
srfi-18
srfi-4
srfi-69
stack
typed-records
tcp-server
tcp
debugprint
commonmod
dbfile
dbmod
portlogger
)
;;======================================================================
;; client
;;======================================================================
;; (define keep-age-param (make-parameter 10)) ;; qif file age, if over move to attic
;; Used ONLY for client
;;
;; Client-side record describing one server connection. Instances are built in
;; tt:client-connect-to-server from a parsed servinfo record and cached in
;; (tt-conns ttdat) under the dbfname.
(defstruct tt-conn
host ;; server host, as read from the servinfo file
port ;; server tcp port (number)
host-port ;; "host:port" string
dbfname ;; db file name this server handles
server-id ;; id read from the servinfo file; compared against ping replies
server-start ;; server start time read from the servinfo file
servinf-file ;; path of the servinfo file this record was read from
pid ;; server process id read from the servinfo file
)
;; Used for BOTH clients and servers
;; Transport state record. Clients use the conns table and last-serv-start;
;; servers use the remaining fields.
(defstruct tt
;; client related
(conns (make-hash-table)) ;; dbfname -> conn
;; server related
(state 'starting) ;; set to 'running by tt:keep-running, 'shutdown by tt:shutdown-server
(areapath #f) ;; area path; required by tt:get-current-server-info and tt:start-server
(host #f) ;; set by connect-listener
(port #f) ;; set by connect-listener; #f until the listener is up
(conn #f)
(cleanup-proc #f) ;; set by tt:create-server-registration-file; deletes the servinfo file
(handler #f) ;; receives data and responds
(socket #f) ;; listener handed to make-tcp-server; closed by tt:shutdown-server
(thread #f)
(host-port #f) ;; "host:port" string, set by connect-listener
(cmd-thread #f)
(ro-mode #f)
(ro-mode-checked #f)
(last-access (current-seconds)) ;; refreshed from *db-last-access* in tt:keep-running
(servinf-file #f) ;; path of this server's registration file
(last-serv-start 0) ;; client side: time of the last server start attempt
)
;; parameters
;;
;; Seconds since last access after which tt:keep-running stops looping and
;; lets the server exit.
(define tt-server-timeout-param (make-parameter 600))
;; make ttdat visible
;; Set to the server's tt struct by tt:keep-running; #f until then.
(define *server-info* #f)
;; Build a fresh tt record for the area at `areapath`; every other field keeps
;; its defstruct default.
(define tt:make-remote
  (lambda (areapath)
    (make-tt areapath: areapath)))
;; run-id must be a number (1 ...) or #f,
;; and dbfname must match the db file name derived from run-id.
;; FIXME: the propagation of dbfname and run-id might not make the best sense
;;
;; #t when run-id is a number or #f AND the db file name derived from it
;; (dbfile:run-id->dbfname) equals dbfname; #f otherwise.
(define (tt:valid-run-id run-id dbfname)
  (if (or (not run-id)
          (number? run-id))
      (equal? dbfname (dbfile:run-id->dbfname run-id))
      #f))
;; Set the tcp unit's tcp-buffer-size parameter to 2048 at module load time.
(tcp-buffer-size 2048)
;; (max-connections 4096)
;; do all the busy work of finding and setting up conn for
;; connecting to a server
;; This function, `tt:client-connect-to-server`, is designed to manage connections between a client and a server within a testing framework.
;; The function takes four arguments:
;; 1. `ttdat`: a data structure that holds information about the testing environment or connections.
;; 2. `dbfname`: The name of the database file that the client wants to connect to.
;; 3. `run-id`: An identifier for the current run of the test suite.
;; 4. `testsuite`: The test suite that is being run.
;;
;; Here's a step-by-step explanation of what the function does:
;;
;; 1. It first asserts that the `run-id` is valid for the given `dbfname` using the `tt:valid-run-id` function. If the `run-id` is not valid, it raises a fatal error.
;; 2. It prints debug information indicating that the function `tt:client-connect-to-server` has been called with the given `dbfname`.
;; 3. It attempts to retrieve an existing connection to the server from a hash table (`tt-conns`) using the `dbfname` as the key. If a connection already exists, it prints debug information and returns the existing connection.
;; 4. If no existing connection is found, it retrieves the current server information from the servinfo file, using the `tt:get-current-server-info` function.
;; 5. It uses pattern matching to destructure the server information into variables (`host`, `port`, `start-time`, `server-id`, `pid`, `dbfname2`, `servinffile`). It then asserts that the `dbfname` from the server info matches the one provided to the function.
;; 6. It constructs a connection object (`conn`) with the server information.
;; 7. It attempts to ping the server using `tt:timed-ping` to verify that the server is running and can be communicated with.
;; 8. Depending on the result of the ping:
;; - If the server is running (`running`), it prints debug information, saves the connection in the hash table, and returns the connection.
;; - If the server is starting (`starting`), it sleeps for 2 seconds and then recursively calls itself to retry the connection.
;; - If the server is neither running nor starting, it checks if it's been more than 10 seconds since the last server start attempt. If so, it attempts to start the server using `server-start-proc` and then sleeps for 1 second before retrying the connection.
;; 9. If no server information is found (`else` case), it checks if it's been more than 3 seconds since the last server start attempt. If so, it starts a new server using `server-start-proc`, updates the last server start time, and sleeps for 4 seconds.
;; 10. It then sleeps for 1 second and prints debug information before recursively calling itself to retry the connection.
;;
;; The function uses recursion to keep trying to connect to the server, with various sleep intervals to prevent overwhelming the system with connection attempts or server starts.
;; It also uses a hash table to cache connections and avoid reconnecting to a server if a connection already exists.
;; The function is designed to handle different server states and ensure that a server is running and available before returning a valid connection to the caller.
;;
;; Return a tt-conn for the server handling dbfname.
;;
;;   ttdat     - tt struct; its conns table caches connections by dbfname
;;   dbfname   - db file name the caller wants a server for
;;   run-id    - number or #f; must agree with dbfname (asserted)
;;   testsuite - testsuite name, passed on to tt:server-process-run
;;
;; Every path either returns a conn or sleeps and calls itself again, so this
;; only returns once a server has answered a ping with state 'running.
(define (tt:client-connect-to-server ttdat dbfname run-id testsuite)
(assert (tt:valid-run-id run-id dbfname) "FATAL: invalid run-id "run-id)
(debug:print-info 2 *default-log-port* "tt:client-connect-to-server " dbfname)
(let* ((conn (hash-table-ref/default (tt-conns ttdat) dbfname #f))
;; thunk that launches a new server process for this area/run-id
(server-start-proc (lambda ()
(tt:server-process-run
(tt-areapath ttdat)
testsuite ;; (dbfile:testsuite-name)
(common:find-local-megatest)
run-id))))
(if conn
(begin
(debug:print-info 2 *default-log-port* "already connected to a server for " dbfname)
conn) ;; we are already connected to the server
;; sdat is the oldest servinfo record for dbfname, or #f when there is none
(let* ((sdat (tt:get-current-server-info ttdat dbfname)))
(match sdat
((host port start-time server-id pid dbfname2 servinffile)
(assert (equal? dbfname dbfname2) "FATAL: read server info from wrong file.")
(debug:print-info 2 *default-log-port* "no conn - in match servinffile:" servinffile)
;; NOTE: this conn shadows the outer (cached) conn, which is #f on this path
(let* ((host-port (conc host":"port))
(conn (make-tt-conn
host: host
port: port
host-port: host-port
dbfname: dbfname
servinf-file: servinffile
server-id: server-id
server-start: start-time
pid: pid)))
;; verify we can talk to this server
;; result is (ping-result . elapsed-milliseconds)
(let* ((result (tt:timed-ping host port server-id))
(ping-res (car result))
(ping (cdr result)))
(debug:print-info 2 *default-log-port* "host " host " port " port " ping time: " ping " result " ping-res)
(case ping-res
((running)
(debug:print-info 2 *default-log-port* "Setting conn = " conn " in hash table")
(hash-table-set! (tt-conns ttdat) dbfname conn) ;;; is this ok to save before validating that the connection is good?
conn)
((starting)
(thread-sleep! 2)
(debug:print-info 0 *default-log-port* "server for " dbfname " is in starting state, retrying connect")
(tt:client-connect-to-server ttdat dbfname run-id testsuite))
;; any other ping result, including #f (no usable reply) and 'unk
(else
(let* ((curr-secs (current-seconds)))
;; rm the (last server) would go here
(if (> (- curr-secs (tt-last-serv-start ttdat)) 10)
(begin
(tt-last-serv-start-set! ttdat curr-secs)
(server-start-proc))) ;; start server if 10 sec since last attempt
(thread-sleep! 1)
(debug:print-info 2 *default-log-port* "server ping result was neither running nor starting. Retrying connect")
(tt:client-connect-to-server ttdat dbfname run-id testsuite)))))))
(else ;; no good server found, if haven't started server in > 3 secs, start another
(if (> (- (current-seconds) (tt-last-serv-start ttdat)) 3) ;; BUG - grow this number really do not want to swamp the machine with servers
(begin
(debug:print-info 0 *default-log-port* "Starting server for "dbfname)
(server-start-proc)
(tt-last-serv-start-set! ttdat (current-seconds))
(thread-sleep! 4)
))
(thread-sleep! 1)
(debug:print-info 0 *default-log-port* "Connect to server for " dbfname)
(tt:client-connect-to-server ttdat dbfname run-id testsuite)))))))
;; Ping the server at host:port and time the call.
;; Returns (ping-result . elapsed-milliseconds), where ping-result is whatever
;; tt:ping returned.
(define (tt:timed-ping host port server-id)
  (let ((t0 (current-milliseconds)))
    (let ((answer (tt:ping host port server-id)))
      (cons answer
            (- (current-milliseconds) t0)))))
;; Ask the server at host:port for its server-id and compare it with the
;; expected server-id.
;;
;; Returns:
;;   - the server state from the reply meta data ('sstate), or 'unk when the
;;     meta data has none, if the ids match
;;   - #f if the ids differ, or if no well-formed reply was received after
;;     tries-left further attempts (one second apart)
;;
(define (tt:ping host port server-id #!optional (tries-left 5))
  (let ((reply (tt:send-receive-direct host port `(ping #f #f #f) ping-mode: #t))) ;; please send me your server-id
    ;;
    ;; need two threads, one a 5 second timer
    ;;
    (match reply
      ((status errmsg result meta)
       (cond
        ((equal? result server-id)
         ;; ids agree, report the state the server advertises
         (or (alist-ref 'sstate meta) 'unk))
        (else
         (debug:print 0 *default-log-port* "WARNING: server-id does not match, expected: "server-id", got: "result)
         #f)))
      (else
       ;; reply not in form (status errmsg result meta): wait and try again
       (if (> tries-left 0)
           (begin
             (thread-sleep! 1)
             (tt:ping host port server-id (- tries-left 1)))
           #f)))))
;; client side handler
;;
;;(tt:handler #<tt> get-keys #f () 2 #f "/home/matt/data/megatest/ext-tests" #f "main.db" "ext-tests" "/home/matt/data/megatest/bin/.22.04/../megatest")
;;
;; Send cmd/run-id/params to the server for dbfname and return the `result`
;; element of its (status errmsg result meta) reply.
;;
;;   attemptnum - retry counter, incremented on each retry after a busy reply
;;                or a failed send
;;   area-dat, areapath, readonly-mode, mtexe - not used in this body other
;;                than being passed along on retries
;;
;; Retries by calling itself; asserts (aborts) when the reply is neither a
;; 4-element list nor #f.
(define (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)
(debug:print 2 *default-log-port* "tt:handler cmd: " cmd " run-id: " run-id " attemptnum: " attemptnum)
;; NOTE: areapath is passed in and in tt struct. We'll use passed in value for now.
;; connect-to-server will start a server if needed.
(let* ((conn (tt:client-connect-to-server ttdat dbfname run-id testsuite))) ;; (hash-table-ref/default (tt-conns ttdat) dbfname #f)))
(if conn
;; have connection, call the server
(let* ((res (tt:send-receive ttdat conn cmd run-id params)))
;; res is (status errmsg result meta)
; (debug:print 0 *default-log-port* "conn:" conn " res: " res)
(match res
((status errmsg result meta)
;; the server can ask the client to slow down via a 'delay-wait entry in meta
(if (list? meta)
(let* ((delay-wait (alist-ref 'delay-wait meta)))
(if (and (number? delay-wait)
(> delay-wait 0))
(begin
(debug:print 0 *default-log-port* "Server is loaded, delaying "delay-wait" seconds")
(thread-sleep! delay-wait)))))
(case status
((busy) ;; result will be how long the server wants you to delay
(let* ((dly (if (number? result) result 0.1)))
(debug:print 0 *default-log-port* "WARNING: server for "dbfname" is busy, will try again in "dly" seconds.")
(thread-sleep! dly)
(tt:handler ttdat cmd run-id params (+ attemptnum 1) area-dat areapath readonly-mode dbfname testsuite mtexe)))
((loaded)
(debug:print 0 *default-log-port* "WARNING: server for "dbfname" is loaded, slowing queries.")
(tt:backoff-incr (tt-conn-host conn)(tt-conn-port conn))
result) ;; (tt:handler ttdat cmd run-id params (+ attemptnum 1) area-dat areapath readonly-mode dbfname testsuite mtexe))
(else
result)))
(else ;; did not receive properly formated result
(if (not res) ;; tt:send-receive telling us that communication failed
(let* ((host (tt-conn-host conn))
(port (tt-conn-port conn))
;; (dbfname (tt-conn-port conn)) ;; 192.168.0.127:4242-726924:4.db
(pid (tt-conn-pid conn))
;;(servinf (tt-conn-servinf-file conn)))
;; NOTE(review): in the code visible here (tt-servinf-file ttdat) is only set by
;; tt:create-server-registration-file (server side). Confirm it is populated for
;; clients; if it is #f this always takes the "no server file" branch below.
(servinf (tt-servinf-file ttdat))) ;; (conc areapath"/.servinfo/"host":"port"-"pid":"dbfname))) ;; TODO, use (server:get-servinfo-dir areapath)
;; drop the cached conn so the next attempt looks the server up again
(hash-table-set! (tt-conns ttdat) dbfname #f)
(if (and servinf (file-exists? servinf))
(begin
(if (< attemptnum 10)
(begin
(thread-sleep! 0.5)
(tt:handler ttdat cmd run-id params (+ attemptnum 1) area-dat areapath readonly-mode dbfname testsuite mtexe))
(begin
(debug:print 0 *default-log-port* "INFO: no response from server "host":"port" for "dbfname)
;; servinfo file not modified for over 60 seconds: treat the server as dead
(if (and (file-exists? servinf)
(> (- (current-seconds)(file-modification-time servinf)) 60))
(begin
(debug:print 0 *default-log-port* "INFO: "servinf" file seems old and no ping response, removing it.")
(handle-exceptions
exn
#f
(delete-file* servinf))
(tt:handler ttdat cmd run-id params (+ attemptnum 1) area-dat areapath readonly-mode dbfname testsuite mtexe))
(begin
;; start server - addressed in client-connect-to-server
;; delay - addressed in client-connect-to-server
;; try again
(thread-sleep! 0.25) ;; dunno, I think this needs to be here
(tt:handler ttdat cmd run-id params (+ attemptnum 1) area-dat areapath readonly-mode dbfname testsuite mtexe))
))))
(begin ;; no server file, delay and try again
(debug:print 2 *default-log-port* "INFO: connection to server "host":"port" broken for "dbfname", no servinf file. Server exited? ")
(thread-sleep! 0.5)
(tt:handler ttdat cmd run-id params (+ attemptnum 1) area-dat areapath readonly-mode dbfname testsuite mtexe))))
(begin ;; this case is where res is malformed. Probably should abort
(assert #f "FATAL: tt:handler received bad data "res)
;; (debug:print 0 *default-log-port* "INFO: got corrupt data from server "host":"port", "res", for "dbfname", will try again.")
;; (tt:handler ttdat cmd run-id params (+ attemptnum 1) area-dat areapath readonly-mode dbfname testsuite mtexe)
)))))
(begin
(thread-sleep! 1) ;; no conn yet set up, give it a rest and try again
(tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)))))
;; Placeholder: ignores run-id and always returns #f.
(define (tt:bid-for-servership run-id)
#f)
;; gets server info and appends path to server file
;; sorts by age, oldest first
;;
;; returns list of (host port startseconds server-id pid dbfname servinfofile)
;;
;; Collect the servinfo records for dbfname in the area of ttdat and return
;; them sorted by start time, oldest first.
;;
;; Each record is the list produced by tt:server-get-info:
;;   (host port startseconds server-id pid dbfname servinfofile)
;; Records whose first element is #f (file vanished or could not be parsed)
;; are dropped. Servers with equal start times are ordered by server-id
;; (string>?).
;;
(define (tt:get-server-info-sorted ttdat dbfname)
  (let* ((areapath (tt-areapath ttdat))
         (sfiles   (tt:find-server areapath dbfname))
         (sdats    (filter car (map tt:server-get-info sfiles))) ;; first element is #f if the file disappeared while being read
         (sorted   (sort sdats
                         (lambda (a b)
                           (let ((starta (list-ref a 2))
                                 (startb (list-ref b 2)))
                             ;; start times are parsed with string->number and may be
                             ;; flonums (e.g. 1616502350.0); eq? is not reliable on
                             ;; those, so compare by value with equal?
                             (if (equal? starta startb)
                                 (string>? (list-ref a 3) (list-ref b 3)) ;; if servers started at same time look at server-id
                                 (< starta startb))))))
         (count 0))
    (for-each
     (lambda (rec)
       (if (or (> (length sorted) 1)
               (common:low-noise-print 120 "server info sorted"))
           ;; print this record only (was: the whole sorted list on every line)
           (debug:print 2 *default-log-port* "SERVER #"count": "(string-intersperse (map conc rec) ", ")))
       (set! count (+ count 1)))
     sorted)
    sorted))
;; Return the servinfo record of the oldest server registered for dbfname, or
;; #f when no usable record exists. Asserts that ttdat has an areapath.
(define (tt:get-current-server-info ttdat dbfname)
  (assert (tt-areapath ttdat) "FATAL: areapath not set in ttdat.")
  ;;
  ;; TODO - replace most of below with tt;get-server-info-sorted
  ;;
  (let* ((candidates (filter car ;; first element is #f if the file disappeared while being read
                             (map tt:server-get-info
                                  (tt:find-server (tt-areapath ttdat) dbfname))))
         (by-age     (sort candidates
                           (lambda (a b)
                             (< (list-ref a 2) (list-ref b 2))))))
    ;; #f here means: we'll want to wait until extra servers have exited
    (and (pair? by-age)
         (car by-age))))
;; Send (cmd run-id params #f) to the server described by conn and return
;; whatever tt:send-receive-direct returns (#f on communication failure).
;; ttdat is accepted for interface compatibility but not used here.
(define (tt:send-receive ttdat conn cmd run-id params)
  (tt:send-receive-direct (tt-conn-host conn)
                          (tt-conn-port conn)
                          (list cmd run-id params #f))) ;; no meta data yet
;; Per-server backoff state, stored in *tt:backoff-smoothing*.
(defstruct tt:backoff
(last-ioerr (current-seconds)) ;; time of the most recent tt:backoff-incr for this server
(last-adj-t (current-seconds)) ;; time wait-delay was last reduced by tt:backoff-decr-and-wait
(wait-delay 0.1)) ;; seconds to sleep before the next request; grows by 0.1 per tt:backoff-incr
;; Maps "host:port" strings to tt:backoff records.
(define *tt:backoff-smoothing* (make-hash-table)) ;; host:port => lastaccess backoffdelay )
;; Record trouble talking to host:port (call if tcp fails i/o net).
;; An existing backoff record gets its last-ioerr stamped with the current
;; time and its wait-delay raised by 0.1; otherwise a fresh record is stored.
(define (tt:backoff-incr host port)
  (let* ((key   (conc host":"port))
         (entry (hash-table-ref/default *tt:backoff-smoothing* key #f)))
    (cond
     (entry
      (tt:backoff-last-ioerr-set! entry (current-seconds))
      (tt:backoff-wait-delay-set! entry (+ (tt:backoff-wait-delay entry) 0.1)))
     (else
      (hash-table-set! *tt:backoff-smoothing* key (make-tt:backoff))))))
;; If a backoff record exists for host:port, shrink its wait-delay by 0.001
;; for every second since the last adjustment. While the reduced delay is
;; still positive, store it and sleep that long; once it reaches zero the
;; record is removed. Does nothing when there is no record.
(define (tt:backoff-decr-and-wait host port)
  (let* ((key   (conc host":"port))
         (entry (hash-table-ref/default *tt:backoff-smoothing* key #f)))
    (if entry
        (let* ((current   (tt:backoff-wait-delay entry))
               (elapsed   (- (current-seconds) (tt:backoff-last-adj-t entry)))
               (decay     (* elapsed 0.001)) ;; it takes 100 seconds to recover from hitting an io err
               (remaining (cond
                           ((not (> current 0)) 0)
                           ((> decay current)   0)
                           (else                (- current decay)))))
          (cond
           ((> remaining 0)
            (if (common:low-noise-print 10 "delay wait message")
                (debug:print-info 0 *default-log-port* "Server loaded, DelayWait: "remaining))
            (tt:backoff-wait-delay-set! entry remaining)
            (tt:backoff-last-adj-t-set! entry (current-seconds))
            (thread-sleep! remaining))
           (else
            (hash-table-delete! *tt:backoff-smoothing* key)))))))
;; Open a tcp connection to host:port, serialize dat to it and read back one
;; reply.
;;
;;   dat             - datum to send
;;   ping-mode       - when true, an (i/o net) error returns #f without retry
;;   tries-remaining - retry budget for (i/o net) errors
;;
;; The reply is expected to be (result exn-result stdout-result); `result` is
;; returned and the other two are logged when non-#f. Returns #f on any
;; exception or when the reply has a different shape. Asserts that port is a
;; number. Sleeps first if a backoff is active for host:port.
(define (tt:send-receive-direct host port dat #!key (ping-mode #f)(tries-remaining 25))
(assert (number? port) "FATAL: tt:send-receive-direct called with port not a number "port)
(tt:backoff-decr-and-wait host port)
;; retry does not forward ping-mode; it is only called when ping-mode is #f
(let* ((retry (lambda ()
(tt:send-receive-direct host port dat tries-remaining: (- tries-remaining 1))))
;; log msg together with the details of exn and the datum being sent
(full-err-print (lambda (exn msg)
(if (condition? exn)
(begin
(pp (condition->list exn) *default-log-port*)
(pp dat *default-log-port*)
(debug:print 0 *default-log-port* msg
", error: " ((condition-property-accessor 'exn 'message) exn)
", arguments: " ((condition-property-accessor 'exn 'arguments) exn)
", location: " ((condition-property-accessor 'exn 'location) exn)
))
(debug:print 0 *default-log-port* msg "(note: exn="exn", is not a condition object.")))))
(condition-case
;; NOTE(review): if serialize or deserialize raises, control jumps to the
;; handlers below and the close calls here are skipped - confirm whether the
;; ports get closed some other way.
(let-values (((inp oup)(tcp-connect host port)))
(let ((res (if (and inp oup)
(begin
(serialize dat oup)
(close-output-port oup)
(deserialize inp))
)))
(close-input-port inp)
(match res
((result exn-result stdout-result)
;; exn-result is passed to full-err-print; when it is not a condition object
;; the "not a condition object" message is printed
(if exn-result
(full-err-print exn-result "ERROR: Server side exception detected"))
(if stdout-result
(debug:print 0 *default-log-port* "ERROR: Output detected on stdout on server side execution => "stdout-result))
result)
(else
(debug:print 0 *default-log-port* "ERROR: server returned non-standard output: "res)
#f))))
(exn (io-error)
(full-err-print exn "ERROR: i/o error")
(tt:backoff-incr host port)
#f)
(exn (i/o net)
(if ping-mode
#f
;; NOTE(review): with the default tries-remaining of 25 the first clause
;; matches immediately, so the retry clause is only reached when the caller
;; passes tries-remaining <= 4. Confirm this is intended.
(cond
((> tries-remaining 4) ;; server likely defunct
(tt:backoff-incr host port)
#f)
((>= tries-remaining 0)
(let* ((backoff-delay (max (* (- 26 tries-remaining) 0.1) 1.0)))
(debug:print 0 *default-log-port* "WARNING: TCP overload, trying again in "backoff-delay"s.")
(thread-sleep! backoff-delay)
(tt:backoff-incr host port)
(retry))
;; (assert #f "FATAL: Too many retries in tt:send-receive-direct")
)
(else #f))))
(exn ()
(full-err-print exn "Unhandled exception from client side.")
#f))))
;;======================================================================
;; server
;;======================================================================
;; Placeholder: ignores ttdat and always returns #f.
(define (tt:sync-dbs ttdat)
#f)
;; start the listener and start responding to requests
;;
;; NOTE: organise by dbfname, not run-id so we don't need
;; to pull in more modules
;;
;; This is the routine called in megatest.scm to start a server
;;
;; Server viability is checked in keep-running. Blindly start and run here.
;;
;; Server entry point: open the db, start the tcp listener thread and the
;; keep-running thread, record this process in the no-sync db, wait for the
;; keep-running thread to finish, then exit the process.
;;
;;   areapath   - area path (asserted non-#f)
;;   run-id     - used to derive the db file name when dbfname-in is #f
;;   dbfname-in - db file name, or #f
;;   handler    - called with the opened dbstruct; its return value is stored
;;                as the request handler of the tt struct
;;   keys       - passed through to dbmod:open-dbmoddb
;;
;; Calls (exit) on every path, so it does not return to the caller.
(define (tt:start-server areapath run-id dbfname-in handler keys)
(assert areapath "FATAL: areapath not provided for tt:start-server")
;; is there already a server for this dbfile? Then exit.
(debug:print 2 *default-log-port* "tt:start-server: " dbfname-in)
(let* ((ttdat (make-tt areapath: areapath))
(dbfname (or dbfname-in (dbmod:run-id->dbfname run-id)))
(servers (tt:find-server areapath dbfname))) ;; should use tt:get-current-server-info instead
(debug:print 0 *default-log-port* "Found " (length servers) " already running for " dbfname)
(if (> (length servers) 0)
(begin
(debug:print 0 *default-log-port* "INFO: found server(s) already running for db "dbfname", "(string-intersperse servers ",")" Exiting.")
(exit))
(let* ((dbstruct (dbmod:open-dbmoddb areapath run-id dbfname (dbfile:db-init-proc) keys)))
(tt-handler-set! ttdat (handler dbstruct))
(let* ((tcp-thread (make-thread
(lambda ()
(tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data
"tcp-server-thread"))
(run-thread (make-thread
(lambda ()
(tt:keep-running ttdat dbfname dbstruct)))))
(thread-start! tcp-thread)
(thread-start! run-thread)
(let* ((areapath (tt-areapath ttdat))
(nosyncdbpath (conc areapath"/.mtdb")))
;; this didn't seem to work, is port not available yet?
;; poll up to 6 times, 0.5 seconds apart, for the listener port to be set
(let loop ((count 0))
(if (tt-port ttdat)
(begin
(procinf-port-set! *procinf* (tt-port ttdat))
(procinf-dbname-set! *procinf* dbfname)
(dbfile:with-no-sync-db
nosyncdbpath
(lambda (nsdb)
(dbfile:insert-or-update-process nsdb *procinf*))))
(if (< count 5)
(begin
(thread-sleep! 0.5)
(loop (+ count 1)))
(debug:print 0 *default-log-port* "ERROR: (tt-port ttdat) no port set!"))))
(thread-join! run-thread) ;; run thread will exit on timeout or other conditions
;; replace with call to (dbfile:set-process-done nsdb host pid reason)
(procinf-status-set! *procinf* "done")
(procinf-end-set! *procinf* (current-seconds))
(dbfile:with-no-sync-db
nosyncdbpath
(lambda (nsdb)
(dbfile:insert-or-update-process nsdb *procinf*)))
(debug:print 0 *default-log-port* "Exiting now.")
(exit)))))))
;; Main server watchdog loop, run in its own thread by tt:start-server.
;;
;; Waits for the listener port, writes the registration (servinfo) file, then
;; loops:
;;   - checks that this server's servinfo file is first in
;;     tt:get-server-info-sorted and takes the db lock in the no-sync db
;;   - if another server is first and answers a ping, cleans up and exits
;;   - while in state 'running, touches the servinfo file and calls the
;;     dbstruct sync proc when more than 3 seconds have passed since the last
;;     update
;; Returns (after cleanup) once last-access is older than
;; (tt-server-timeout-param) seconds.
(define (tt:keep-running ttdat dbfname dbstruct)
;; verfiy conn for ready
;; listener socket has been started by this stage
;; wait for a port before creating the registration file
;;
(let* ((db-locked-in #f)
(areapath (tt-areapath ttdat))
(nosyncdbpath (conc areapath"/.mtdb"))
;; run the registered cleanup proc (if any) and remove the lock entry for dbfname
(cleanup (lambda ()
(if (tt-cleanup-proc ttdat)
((tt-cleanup-proc ttdat)))
(dbfile:with-no-sync-db nosyncdbpath
(lambda (db)
(let* ((dbtmpname (dbr:dbstruct-dbtmpname dbstruct)))
(debug:print-info 0 *default-log-port* "keep-running: removing lock for file "dbtmpname)
(db:no-sync-del! db dbfname)
))))))
(set! *server-info* ttdat)
;; poll every 0.25 seconds for the port; exit with status 1 after more than 240 polls
(let loop ((count 0))
(if (> count 240)
(begin
(debug:print 0 *default-log-port* "FATAL: Could not start a tcp server, giving up.")
(exit 1))
(if (not (tt-port ttdat)) ;; no connection yet
(begin
(thread-sleep! 0.25)
(loop (+ count 1))))))
(tt:create-server-registration-file ttdat dbfname)
;; now start watching the last-access, if it hasn't been touched
;; in over (tt-server-timeout-param) seconds we exit
(thread-sleep! 0.05) ;; any real need for delay here?
(let loop ()
(let* ((servers (tt:get-server-info-sorted ttdat dbfname))
(ok (cond
((null? servers) #f) ;; not ok
((equal? (list-ref (car servers) 6) ;; compare the servinfofile
(tt-servinf-file ttdat))
;; we are first in the list: take the db lock once, then remember that we hold it
(let* ((res (if db-locked-in
#t
(let* ((lock-result ;; this is the primary lock - need to double verify that got it
(dbfile:with-no-sync-db
nosyncdbpath
(lambda (db)
(db:no-sync-lock-and-check db dbfname
(tt-servinf-file ttdat)
;; (dbr:dbstruct-dbtmpname dbstruct)
))))
(success (car lock-result)))
(if success
(begin
(tt-state-set! ttdat 'running)
(debug:print 0 *default-log-port* "Got server lock for " dbfname)
(set! db-locked-in #t)
#t)
(begin
(debug:print 0 *default-log-port* "Failed to get server lock for "dbfname)
#f))))))
(if (and res (common:low-noise-print 120 "top server message"))
(debug:print-info 0 *default-log-port* "Keep running, I'm the top server for "
dbfname" on "(tt-host ttdat)":"(tt-port ttdat)))
res))
(else
;; wrong servinfo file
(debug:print-info 0 *default-log-port* "I'm not the lead server: "servers)
(let* ((leadsrv (car servers)))
(match leadsrv
((host port startseconds server-id pid dbfname servinfofile)
(let* ((result (tt:timed-ping host port server-id))
(res (car result))
(ping (cdr result)))
(debug:print-info 0 *default-log-port* "Ping to "host":"port", with server-id "server-id
", and file "servinfofile" returned "res)
(if res
#f ;; not the server, but all good, want to exit
(if (and (file-exists? servinfofile)
(> (- (current-seconds)(file-modification-time servinfofile)) 30))
(begin
;; can't ping and file has not been modified for over 30 seconds, go ahead and try to remove it
(debug:print-info 0 *default-log-port* "Removing apparently dead server info file: "servinfofile)
(handle-exceptions
exn
(debug:print-info 0 *default-log-port* "Error removing server info file: "servinfofile)
(delete-file* servinfofile)
)
#t) ;; not the server but the server is not reachable
(begin
(debug:print 0 *default-log-port* "I'm not the server but could not ping "host":"port", will try again.")
(thread-sleep! 1) ;; just because
#t)))))
(else ;; should never get here
(debug:print 0 *default-log-port* "BAD SERVER RECORD: "leadsrv)
(assert #f "Bad server record "leadsrv))))))))
(if ok
(tt-last-access-set! ttdat *db-last-access*) ;; bit silly, just use db-last-access
(begin
(debug:print 0 *default-log-port* "Exiting immediately")
(cleanup)
(exit)))
(let* ((last-update (dbr:dbstruct-last-update dbstruct))
(curr-secs (current-seconds)))
(if (and (eq? (tt-state ttdat) 'running)
(> (- curr-secs last-update) 3)) ;; every 3-4 seconds update the db?
(begin
;; touch the servinfo file so other processes see it as recently modified
(set! (file-modification-time (tt-servinf-file ttdat)) (current-seconds))
((dbr:dbstruct-sync-proc dbstruct) last-update)
(dbr:dbstruct-last-update-set! dbstruct curr-secs))))
;; keep looping (every 5 seconds) while the last access is within the timeout
(if (< (- (current-seconds) (tt-last-access ttdat)) (tt-server-timeout-param))
(begin
(thread-sleep! 5)
(loop)))))
(cleanup)
(debug:print 0 *default-log-port* "INFO: Server timed out, exiting from tt:keep-running.")))
;; ;; given an already set up uconn start the cmd-loop
;; ;;
;; (define (tt:cmd-loop ttdat)
;; (let* ((serv-listener (-socket uconn))
;; (listener (lambda ()
;; (let loop ((state 'start))
;; (let-values (((inp oup)(tcp-accept serv-listener)))
;; ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
;; (resp (ulex-handler uconn rdat)))
;; (serialize resp oup)
;; (close-input-port inp)
;; (close-output-port oup)
;; ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; )
;; (loop state))))))
;; ;; start N of them
;; (let loop ((thnum 0)
;; (threads '()))
;; (if (< thnum 100)
;; (let* ((th (make-thread listener (conc "listener" thnum))))
;; (thread-start! th)
;; (loop (+ thnum 1)
;; (cons th threads)))
;; (map thread-join! threads)))))
;;
;;
;;
;; (define (wait-and-close uconn)
;; (thread-join! (udat-cmd-thread uconn))
;; (tcp-close (udat-socket uconn)))
;;
;;
;; Shut the server side of ttdat down: mark the state 'shutdown, record the
;; port as "released" with the portlogger, run the cleanup proc when one is
;; registered, and close the listener socket.
(define (tt:shutdown-server ttdat)
  (let ((cleanup  (tt-cleanup-proc ttdat))
        (my-port  (tt-port ttdat)))
    (tt-state-set! ttdat 'shutdown)
    (portlogger:open-run-close portlogger:set-port my-port "released")
    (if cleanup
        (cleanup))
    ;; close up ports here
    (tcp-close (tt-socket ttdat))))
;; (define (wait-and-close uconn)
;; (thread-join! (tt-cmd-thread uconn))
;; (tcp-close (tt-socket uconn)))
;; return servid
;; side-effects:
;; ttdat-cleanup-proc is populated with function to remove the serverinfo file
;; Write this server's registration (servinfo) file,
;;   <servinfo-dir>/<host>:<port>-<pid>:<dbfname>
;; containing a single "SERVER STARTED: ..." line, and return the server id
;; obtained from tt:mk-signature.
;;
;; Side effects on ttdat: cleanup-proc is set to a thunk that deletes the
;; file, and servinf-file is set to its path. Asserts that host and port are
;; already set.
(define (tt:create-server-registration-file ttdat dbfname)
  (let* ((areapath  (tt-areapath ttdat))
         (infodir   (tt:get-servinfo-dir areapath))
         (host      (tt-host ttdat))
         (port      (tt-port ttdat))
         (mypid     (current-process-id))
         (infofile  (conc infodir"/"host":"port"-"mypid":"dbfname))
         (signature (tt:mk-signature areapath)))
    (assert (and host port) "FATAL: tt:create-server-registration-file called with no conn, dbfname="dbfname)
    (tt-cleanup-proc-set! ttdat (lambda ()
                                  (delete-file* infofile)))
    (tt-servinf-file-set! ttdat infofile)
    (with-output-to-file infofile
      (lambda ()
        (print "SERVER STARTED: "host":"port" AT "(current-seconds)" server-id: "signature" pid: "mypid" dbfname: "dbfname)))
    signature))
;; find valid server
;; get servers listed, last part of name must match :<dbfname>
;; if more than one, wait one second and look again
;; future: ping oldest, if alive remove other :<dbfname> files
;;
;; Return the paths of the servinfo files for dbfname in the area's servinfo
;; directory whose recorded server process still exists.
;;
;; For each file matching "*:<dbfname>" the host and pid are read with
;; tt:server-get-info and "ssh <host> ps <pid>" is run; a zero exit status
;; keeps the file in the result, anything else deletes the file.
;;
;; Files that cannot be parsed (host or pid is #f) are skipped and left on
;; disk. Such a file may be a registration another server is still writing;
;; checking it with ssh would fail and delete it from under that server.
;;
;; NOTE(review): host comes from the file contents and is interpolated into a
;; shell command unquoted.
;;
(define (tt:find-server areapath dbfname)
  (let* ((servdir    (tt:get-servinfo-dir areapath))
         (sfiles     (glob (conc servdir"/*:"dbfname)))
         (good-files '()))
    (for-each
     (lambda (sfile)
       (let* ((sinfo (tt:server-get-info sfile))
              (host  (list-ref sinfo 0))
              (pid   (list-ref sinfo 4)))
         (if (and host pid)
             (let ((status (system (conc "ssh " host " ps " pid " > /dev/null"))))
               (if (= status 0)
                   (set! good-files (cons sfile good-files))
                   (delete-file* sfile)))
             (debug:print-info 2 *default-log-port* "tt:find-server: no server info readable from " sfile ", skipping it"))))
     sfiles)
    (debug:print-info 2 *default-log-port* "tt:find-server: good-files: " good-files " sfiles: " sfiles)
    good-files))
;; given a path to a server info file return: (host port startseconds server-id pid dbfname logf)
;; example of the line it's looking for in the file:
;; SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4 pid: 12345 dbfname: main.db
;;
;; Parse the "SERVER STARTED: ..." line out of the file logf.
;;
;; Returns (host port startseconds server-id pid dbfname logf) with port,
;; startseconds and pid converted by string->number. When the file cannot be
;; read, is empty, or no matching line is found (scanning stops once lnum
;; exceeds 500), every field except logf is #f.
;;
(define (tt:server-get-info logf)
  (let* ((line-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+) dbfname: (\\S+)")) ;; SERVER STARTED: host:port AT timesecs server id
         (no-info (list #f #f #f #f #f #f logf))
         (lines   (handle-exceptions
                   exn
                   (begin
                     ;; WARNING: this is potentially dangerous to blanket ignore the errors
                     (debug:print-info 0 *default-log-port* "Unable to get server info from "logf", exn="(condition->list exn))
                     '()) ;; no idea what went wrong, call it a bad server, return empty list
                   (with-input-from-file logf read-lines))))
    (let scan ((remaining lines)
               (lnum      0))
      (if (null? remaining)
          no-info
          (let ((fields (string-match line-rx (car remaining))))
            (cond
             (fields
              (match fields
                ((_ host port start server-id pid dbfname)
                 (list host
                       (string->number port)
                       (string->number start)
                       server-id
                       (string->number pid)
                       dbfname
                       logf))
                (else
                 (debug:print 0 *default-log-port* "ERROR: did not recognise SERVER line info "fields)
                 no-info)))
             ((> lnum 500) ;; give up if more than 500 lines of server log read
              no-info)
             (else
              (scan (cdr remaining) (+ lnum 1)))))))))
;; Given an area path, start a server process ### NOTE ### > file 2>&1
;; if the target-host is set
;; try running on that host
;; incidental: rotate logs in logs/ dir.
;;
;; Launch a background megatest server process for the db of run-id in the
;; area areapath (areapath is *toppath* for a given testsuite area), unless
;; one of these limits is hit, in which case it logs, sleeps 1 second and
;; starts nothing:
;;   - normalized cpu load over 2.0
;;   - more than 100 matching server processes already running
;;   - more than 4 servers registered for this db
;;
;; The command's output is appended to <areapath>/logs/server-<dbfname>-<pid>.log,
;; where <pid> is the pid of this (calling) process; the logs directory is
;; created if missing. profile-mode is inserted verbatim into the command line.
;;
(define (tt:server-process-run areapath testsuite mtexe run-id #!key (profile-mode ""))
  (assert areapath "FATAL: tt:server-process-run called without areapath defined.")
  (assert testsuite "FATAL: tt:server-process-run called without testsuite defined.")
  (assert mtexe "FATAL: tt:server-process-run called without mtexe defined.")
  ;; mtest -server - -m testsuite:ext-tests -db 6.db
  (let* ((dbfname    (dbmod:run-id->dbfname run-id))
         (cpu-load   (get-normalized-cpu-load))
         (registered (length (tt:find-server areapath dbfname)))
         (running    (number-of-processes-running (conc "mtest.*server.*"testsuite".*"dbfname))))
    (cond
     ((> cpu-load 2.0)
      (debug:print 0 *default-log-port* "Normalized load "cpu-load" on " (get-host-name) " is over the limit of 2.0. Not starting a server.")
      (thread-sleep! 1))
     ((> running 100)
      (debug:print 0 *default-log-port* running" servers running on " (get-host-name) ", not starting another.")
      (thread-sleep! 1))
     ((> registered 4)
      (debug:print 0 *default-log-port* registered" servers registered in .servinfo dir. not starting another.")
      (thread-sleep! 1))
     (else
      (let ((logdir (conc areapath"/logs")))
        (if (not (file-exists? logdir))
            (create-directory logdir #t))
        (let* ((logfile (conc logdir "/server-"dbfname"-"(current-process-id)".log"))
               (cmdln   (conc mtexe
                              " -startdir "areapath
                              " -server - "
                              " -m testsuite:"testsuite
                              " -db "dbfname
                              " " profile-mode
                              " >> " logfile " 2>&1 &")))
          ;; the server is told where to start via -startdir rather than by
          ;; changing directory here
          (debug:print 2 *default-log-port* "INFO: Trying to start server in tcp mode (" cmdln ") at "(common:human-time)" for "areapath)
          (system cmdln)))))))
;;======================================================================
;; tcp connection stuff
;;======================================================================
;; find a port and start tcp-server. This only starts the tcp portion of
;; the server, look at (tt:start-server ...) above for the entry point
;; for the entire server system
;;
;; Set up the listener for ttdat and run a tcp server on its socket.
;;
;; For each request handler-proc:
;;   1. reads one datum with (deserialize)
;;   2. applies the handler stored in ttdat to it, capturing anything the
;;      handler prints to standard output
;;   3. replies with (serialize (list result exn-result stdout-or-#f))
;; If the handler raises, or the reply cannot be serialized, the problem is
;; logged and a thread is started that exits the process after five seconds.
(define (tt:start-tcp-server ttdat)
(setup-listener-portlogger ttdat) ;; set up tcp-listener
(let* ((socket (tt-socket ttdat))
(handler (tt-handler ttdat)) ;; the handler comes from our client setting a handler function
(handler-proc (lambda ()
;; NOTE(review): deserialize/serialize are called without a port here;
;; presumably make-tcp-server binds the current ports to the connection - confirm
(let* ((indat (deserialize)) ;; could use: (thread-terminate! (current-thread))
(result #f)
(exn-result #f)
;; run the handler with standard output captured into a string
(stdout-result (with-output-to-string
(lambda ()
(let ((res (handle-exceptions
exn
(let* ((errdat (condition->list exn)))
(set! exn-result errdat)
(debug:print 0 *default-log-port* "ERROR: handler exception, these are bad, will exit in five seconds.")
(pp errdat *default-log-port*)
;; these are always bad, set up an exit thread
(thread-start! (make-thread (lambda ()
(thread-sleep! 5)
(exit))))
#f)
(handler indat) ;; this is the proc being called by the remote client
)))
(set! result res)))))
;; an empty captured string is reported as #f
(full-result (list result exn-result (if (equal? stdout-result "") #f stdout-result))))
(handle-exceptions
exn
(begin
(debug:print 0 *default-log-port* "Serialization failure. full-result="full-result)
(thread-start! (make-thread (lambda ()
(thread-sleep! 5)
(exit))))) ;; (serialize '(#f #f #f)) ;; doesn't work - the first call to serialize caused failure
(serialize full-result))))))
;; make-tcp-server returns a procedure; it is called here with #f
((make-tcp-server socket handler-proc)
#f ;; yes, send error messages to std-err
)))
;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; Returns #f if no port can be allocated.
;;
;; if udata-in is #f create the record
;; if there is already a serv-listener return the udata
;;
;; (define (setup-listener uconn #!optional (port 4242))
;; (assert (tt? uconn) "FATAL: setup-listener called with wrong struct "uconn)
;; (handle-exceptions
;; exn
;; (if (< port 65535)
;; (begin
;; (thread-sleep! 0.25)
;; (setup-listener uconn (+ port 1)))
;; #f)
;; (connect-listener uconn port)))
;; Get a port from portlogger and start a tcp listener on it.
;;
;; uconn must be a tt struct (checked with tt?). On success the value of
;; (connect-listener uconn port) is returned.
;;
;; If connect-listener raises and the port is below 65535, the port is marked
;; as failed in portlogger and, after a 0.25 second pause, the whole procedure
;; is retried. Otherwise an error is logged and #f is returned.
;;
;; NOTE(review): assumes portlogger:find-port always yields a number; a
;; non-numeric result would make the (< port 65535) test itself raise -- confirm.
;;
(define (setup-listener-portlogger uconn)
  ;; validate the argument before a port is requested, so a bad call does not
  ;; consume a port from portlogger
  (assert (tt? uconn) "FATAL: setup-listener called with wrong struct "uconn)
  (let ((port (portlogger:open-run-close portlogger:find-port)))
    (handle-exceptions
     exn
     (if (< port 65535)
         (begin
           (portlogger:open-run-close portlogger:set-failed port)
           (thread-sleep! 0.25)
           (setup-listener-portlogger uconn))
         (begin
           ;; no usable port: report it (an always-true assert cannot) and
           ;; signal the failure to the caller with #f
           (debug:print 0 *default-log-port* "ERROR: setup-listener-portlogger: could not get a port")
           #f))
     (connect-listener uconn port))))
;; Open a tcp listener on port and record the details in uconn.
;;
;; Sets the port, host, host-port ("addr:port") and socket fields of uconn,
;; where the address comes from tt:get-best-guess-address applied to
;; (get-host-name). Returns uconn.
;;
(define (connect-listener uconn port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((listener      (tcp-listen port 10000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
         (my-addr       (tt:get-best-guess-address (get-host-name)))
         (host-port-str (conc my-addr":"port)))
    (tt-socket-set!    uconn listener)
    (tt-host-set!      uconn my-addr)
    (tt-port-set!      uconn port)
    (tt-host-port-set! uconn host-port-str)
    uconn))
;;======================================================================
;; utils
;;======================================================================
;; Generate a unique signature for this server
;; Return the md5 digest string of the written form of the list
;; (areapath pid argv), where pid is (current-process-id) and argv is (argv).
;;
(define (tt:mk-signature areapath)
  (let* ((ident      (list areapath
                           (current-process-id)
                           (argv)))
         (ident-text (with-output-to-string
                       (lambda ()
                         (write ident)))))
    (message-digest-string (md5-primitive) ident-text)))
;; Return an address for hostname as a string of its bytes joined with "."
;;
;; The addresses listed in the hostinfo record for hostname are scanned and
;; the last one whose first byte is not 127 is chosen. If there is no such
;; address the result of (hostname->ip hostname) is used instead.
;;
;; The hostinfo lookup is not assumed to succeed (e.g. when the host is not
;; mentioned in /etc/hosts): a false result is treated as "no candidates" and
;; the hostname->ip fallback is tried. If that also gives nothing an error
;; naming the host is raised.
;;
(define (tt:get-best-guess-address hostname)
  (let* ((hinfo      (hostname->hostinfo hostname))
         (candidates (if hinfo
                         (vector->list (hostinfo-addresses hinfo))
                         '()))
         (usable     (filter (lambda (adr)
                               (not (= (u8vector-ref adr 0) 127)))
                             candidates))
         ;; keep the historical choice: the last non-127 address wins
         (best       (if (null? usable)
                         (hostname->ip hostname)
                         (last usable))))
    (if (not best)
        (error "tt:get-best-guess-address: unable to resolve an address for host" hostname))
    (string-intersperse
     (map number->string (u8vector->list best))
     ".")))
;; Return the path areapath/.servinfo, creating it (with parents) first when
;; nothing exists at that path.
;;
(define (tt:get-servinfo-dir areapath)
  (let ((servinfo-path (conc areapath"/.servinfo")))
    (unless (file-exists? servinfo-path)
      (create-directory servinfo-path #t))
    servinfo-path))
;;======================================================================
;; network utilities
;;======================================================================
;; NOTE: Look at address-info egg as alternative to some of this
;; Rate an ip address string for preference ordering:
;;   0 - the address matches "^127\\..*"
;;   1 - the address matches "^(10\\.0|192\\.168)\\..*"
;;   2 - anything else
;; NOTE(review): presumably regex-case tries the clauses top to bottom and
;; the `_` means the match result is not bound -- confirm against regex-case.
(define (rate-ip ipaddr)
(regex-case ipaddr
( "^127\\..*" _ 0 )
( "^(10\\.0|192\\.168)\\..*" _ 1 )
( else 2 ) ))
;; Change this to bias for addresses with a reasonable broadcast value?
;;
;; Ordering predicate for sort: true when a has a strictly higher rate-ip
;; rating than b, so higher rated addresses come first.
(define (ip-pref-less? a b)
  (let ((rating-a (rate-ip a))
        (rating-b (rate-ip b)))
    (< rating-b rating-a)))
;; Pick one address for this host from (get-all-ips):
;;   - no addresses: fall back to (get-host-name)
;;   - exactly one:  use it
;;   - several:      the first after sorting with ip-pref-less?
;;
(define (get-my-best-address)
  (let ((candidates (get-all-ips)))
    (if (null? candidates)
        (get-host-name) ;; no interfaces?
        (if (null? (cdr candidates))
            (car candidates) ;; only one to choose from, just go with it
            (car (sort candidates ip-pref-less?))))))
;; Return the result of (get-all-ips) sorted with ip-pref-less?.
(define (get-all-ips-sorted)
  (let ((unsorted (get-all-ips)))
    (sort unsorted ip-pref-less?)))
;; Return the host field of every tcp entry that address-infos reports for
;; (get-host-name).
;;
;; The entry type is accepted both as the symbol tcp and as the string "tcp".
;; NOTE(review): the address-info egg is understood to report the type as a
;; symbol, in which case comparing only against the string "tcp" would never
;; match and this list would always be empty -- confirm against the egg docs.
;;
(define (get-all-ips)
  (let ((tcp-entry? (lambda (ainfo)
                      (let ((kind (address-info-type ainfo)))
                        (or (eq? kind 'tcp)
                            (equal? kind "tcp"))))))
    (map address-info-host
         (filter tcp-entry?
                 (address-infos (get-host-name))))))
)