;;======================================================================
;; Copyright 2017, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit tcp-transportmod))
(declare (uses debugprint))
(declare (uses commonmod))
(declare (uses dbfile))
(declare (uses dbmod))
(declare (uses portlogger))
(declare (uses mtmod))
(use address-info tcp)
(module tcp-transportmod
(
make-tt
tt:get-server-info-sorted
tt:ping
tt:find-server
tt:start-server
tt:get-servinfo-dir
tt-server-timeout-param
tt:mk-signature
tt-state
tt:server-process-run
tt:make-remote
tt-ro-mode-checked-set!
tt-ro-mode-set!
tt-ro-mode
tt-ro-mode-checked
tt:handler
tt:get-conn
)
(import scheme)
(cond-expand
(chicken-4
(import (prefix sqlite3 sqlite3:)
chicken
extras
hostinfo
ports
posix
files
data-structures
tcp
))
(chicken-5
(import chicken.base
chicken.condition
chicken.file
chicken.pathname
chicken.process-context.posix
chicken.process
chicken.sort
chicken.string
chicken.time
chicken.tcp
chicken.random
chicken.file.posix
chicken.pretty-print
chicken.io
chicken.port
chicken.process-context
system-information)
(define unsetenv unset-environment-variable!)
))
(import address-info
directory-utils
matchable
md5
message-digest
regex
regex-case
s11n
srfi-1
srfi-18
srfi-4
srfi-69
stack
typed-records
tcp-server
debugprint
commonmod
dbfile
dbmod
mtmod
portlogger
)
;;======================================================================
;; client
;;======================================================================
;; (define keep-age-param (make-parameter 10)) ;; qif file age, if over move to attic
;; Used ONLY for client
;;
(defstruct tt-conn
host
port
host-port
dbfname
server-id
server-start
servinf-file
pid
)
;; Used for BOTH clients and servers
(defstruct tt
;; client related
(conns (make-hash-table)) ;; dbfname -> conn
;; server related
(state 'starting)
(areapath #f)
(host #f)
(port #f)
(conn #f)
(cleanup-proc #f)
(handler #f) ;; receives data and responds
(socket #f)
(thread #f)
(host-port #f)
(cmd-thread #f)
(ro-mode #f)
(ro-mode-checked #f)
(last-access (current-seconds))
(servinf-file #f)
(last-serv-start 0)
)
;; parameters
;;
(define tt-server-timeout-param (make-parameter 600))
;; make ttdat visible
;; (define *server-info* #f) ;; get this from commonmod
(define *server-run* #t)
(define (tt:make-remote areapath)
(make-tt areapath: areapath))
;; 1 ... or #f
;; and check that dbfname matches. FIXME: the propagation of dbfname and run-id
;; might not make the best sense
;;
(define (tt:valid-run-id run-id dbfname)
(and (or (number? run-id)
(not run-id))
(equal? (dbfile:run-id->dbfname run-id) dbfname)))
(tcp-buffer-size 2048)
;; (max-connections 4096)
(define (tt:get-conn ttdat dbfname)
(hash-table-ref/default (tt-conns ttdat) dbfname #f))
;; do all the busy work of finding and setting up conn for
;; connecting to a server
;;
(define (tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc)
(assert (tt:valid-run-id run-id dbfname) "FATAL: invalid run-id "run-id)
(debug:print-info 2 *default-log-port* "tt:client-connect-to-server " dbfname " " run-id)
(let* ((conn (tt:get-conn ttdat dbfname))
(server-start-proc (or server-start-proc
(lambda ()
(assert (equal? dbfname "main.db") ;; only main.db is started here
"FATAL: called server-start-proc for db other than main.db")
(tt:server-process-run
(tt-areapath ttdat)
testsuite ;; (dbfile:testsuite-name)
(common:find-local-megatest)
run-id)))))
(if conn
(begin
(debug:print-info 2 *default-log-port* "already connected to a server")
conn) ;; we are already connected to the server
;; no conn
(let* ((sdats (tt:get-server-info-sorted ttdat dbfname))
(sdat (if (null? sdats)
#f
(car sdats))))
(debug:print-info 2 *default-log-port* "found sdat " sdat)
(match sdat
((host port start-time server-id pid dbfname2 servinffile)
(assert (equal? dbfname dbfname2) "FATAL: read server info from wrong file.")
(debug:print-info 2 *default-log-port* "no conn - in match servinffile:" servinffile)
(let* ((host-port (conc host":"port))
(conn (make-tt-conn
host: host
port: port
host-port: host-port
dbfname: dbfname
servinf-file: servinffile
server-id: server-id
server-start: start-time
pid: pid)))
;; verify we can talk to this server
(let* ((result (tt:timed-ping host port server-id))
(ping-res (car result))
(ping (cdr result)))
(debug:print-info 2 *default-log-port* "host " host " port " port " ping time: " ping " result " ping-res)
(case ping-res
((running)
(debug:print-info 2 *default-log-port* "Setting conn = " conn " in hash table")
(hash-table-set! (tt-conns ttdat) dbfname conn) ;;; is this ok to save before validating that the connection is good?
conn)
((starting)
(thread-sleep! 0.5)
(debug:print-info 0 *default-log-port* "server for " dbfname " is in starting state, retrying connect")
(tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc))
(else
(let* ((curr-secs (current-seconds)))
;; rm the (last server) would go here
(if (> (- curr-secs (tt-last-serv-start ttdat)) 10)
(begin
(debug:print-info 0 *default-log-port* "Unreachable server at "
host":"port" with servinfo file "servinffile", removing it")
(if (file-exists? servinffile)
(handle-exceptions
exn
#f
(delete-file servinffile)))
(tt-last-serv-start-set! ttdat curr-secs)
(debug:print-info 0 *default-log-port* "Starting a new server on " (get-host-name))
(server-start-proc))) ;; start server if 10 sec since last attempt
(thread-sleep! 1)
(debug:print-info 0 *default-log-port* "Retrying connect")
(tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc)))))))
(else ;; no good server found, if haven't started server in > 5 secs, start another
(if (> (- (current-seconds) (tt-last-serv-start ttdat)) 3) ;; BUG - grow this number really do not want to swamp the machine with servers
(begin
(debug:print-info 0 *default-log-port* "Starting server for "dbfname " on " (get-host-name))
(server-start-proc)
(tt-last-serv-start-set! ttdat (current-seconds))
(thread-sleep! 6)
))
(thread-sleep! 1)
(debug:print-info 0 *default-log-port* "Connect to server from " (get-host-name) " for " dbfname)
(tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc)))))))
;; returns ( result . ping_time )
(define (tt:timed-ping host port server-id)
(let* ((start-time (current-milliseconds))
(result (tt:ping host port server-id)))
(cons result (- (current-milliseconds) start-time))))
(define (tt:ping host port server-id #!optional (tries-left 5))
(let* ((res (tt:send-receive-direct host port `(ping #f #f #f) ping-mode: #t)) ;; please send me your server-id
(try-again (lambda ()
(if (> tries-left 0)
(begin
(thread-sleep! 1)
(tt:ping host port server-id (- tries-left 1)))
#f))))
;;
;; need two threads, one a 5 second timer
;;
(match res
((status errmsg result meta)
(if (equal? result server-id)
(let* ((server-state (alist-ref 'sstate meta)))
;; (debug:print 0 *default-log-port* "Ping to "host":"port" successful.")
(or server-state 'unk)) ;; then we are good
(begin
(debug:print 0 *default-log-port* "WARNING: server-id does not match, expected: "server-id", got: "result)
#f)))
(else
;; (debug:print 0 *default-log-port* "res not in form (status errmsg result meta), got: "res)
(try-again)))))
;; client side handler
;;
;;(tt:handler #<tt> get-keys #f () 2 #f "/home/matt/data/megatest/ext-tests" #f "main.db" "ext-tests" "/home/matt/data/megatest/bin/.22.04/../megatest")
;;
(define (tt:handler ttdat cmd run-id params attemptnum readonly-mode dbfname testsuite mtexe server-start-proc)
;; connect-to-server will start a server if needed.
(let* ((areapath (tt-areapath ttdat))
(conn (tt:client-connect-to-server ttdat dbfname run-id testsuite server-start-proc))) ;; looks up conn keyed by dbfname
(if conn
;; have connection, call the server
(let* ((res (tt:send-receive ttdat conn cmd run-id params)))
;; res is (status errmsg result meta)
(match res
((status errmsg result meta)
(if (list? meta)
(let* ((delay-wait (alist-ref 'delay-wait meta)))
(if (and (number? delay-wait)
(> delay-wait 0))
(begin
(debug:print 0 *default-log-port* "Server is loaded, delaying "delay-wait" seconds")
(thread-sleep! delay-wait)))))
(case status
((busy) ;; result will be how long the server wants you to delay
(let* ((raw-dly (if (number? result) result 0.1))
(dly (+ raw-dly (/ attemptnum 10)))) ;; (* raw-dly (/ attemptnum 2))))
(debug:print 0 *default-log-port* "WARNING: server for "dbfname" is busy, cmd is "cmd", will try again in "dly" seconds. This is attempt "(- attemptnum 1))
(thread-sleep! dly)
(tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc)))
((loaded)
(debug:print 0 *default-log-port* "WARNING: server for "dbfname" is loaded, slowing queries.")
(tt:backoff-incr (tt-conn-host conn)(tt-conn-port conn))
result) ;; (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe))
(else
result)))
(else ;; did not receive properly formated result
(if (not res) ;; tt:send-receive telling us that communication failed
(let* ((host (tt-conn-host conn))
(port (tt-conn-port conn))
;; (dbfname (tt-conn-port conn)) ;; 192.168.0.127:4242-726924:4.db
(pid (tt-conn-pid conn))
;;(servinf (tt-conn-servinf-file conn)))
(servinf (tt-servinf-file ttdat))) ;; (conc areapath"/.servinfo/"host":"port"-"pid":"dbfname))) ;; TODO, use (server:get-servinfo-dir areapath)
(hash-table-set! (tt-conns ttdat) dbfname #f) ;; clear out the conn for this dbfname to force finding new server
(if (and servinf (file-exists? servinf))
(begin
(if (< attemptnum 10)
(begin
(thread-sleep! 0.5)
(tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))
(begin
(debug:print 0 *default-log-port* "INFO: no response from server "host":"port" for "dbfname)
(if (and (file-exists? servinf)
(> (- (current-seconds)(file-modification-time servinf)) 60))
(begin
(debug:print 0 *default-log-port* "INFO: "servinf" file seems old and no ping response, removing it.")
(handle-exceptions
exn
#f
(delete-file* servinf))
(tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))
(begin
;; start server - addressed in client-connect-to-server
;; delay - addressed in client-connect-to-server
;; try again
(thread-sleep! 0.25) ;; dunno, I think this needs to be here
(tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))
))))
(begin ;; no server file, delay and try again
(debug:print 2 *default-log-port* "INFO: connection to server "host":"port" broken for "dbfname", no servinf file. Server exited? ")
(thread-sleep! 0.5)
(tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))))
(begin ;; this case is where res is malformed. Probably should abort
(assert #f "FATAL: tt:handler received bad data "res)
;; (debug:print 0 *default-log-port* "INFO: got corrupt data from server "host":"port", "res", for "dbfname", will try again.")
;; (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe)
)))))
(begin
(thread-sleep! 1) ;; no conn yet set up, give it a rest and try again
(tt:handler ttdat cmd run-id params attemptnum readonly-mode dbfname testsuite mtexe server-start-proc)))))
;; gets server info and appends path to server file
;; sorts by age, oldest first
;;
;; returns list of (host port startseconds server-id servinfofile)
;;
(define (tt:get-server-info-sorted ttdat dbfname)
(let* ((areapath (tt-areapath ttdat))
(sfiles (tt:find-server areapath dbfname))
(sdats (filter car (map tt:server-get-info sfiles))) ;; first element is #f if the file disappeared while being read
(sorted (sort sdats (lambda (a b)
(let* ((starta (list-ref a 2))
(startb (list-ref b 2)))
(if (eq? starta startb)
(string>? (list-ref a 3)(list-ref b 3)) ;; if servers started at same time look at server-id
(< starta startb))))))
(count 0))
(for-each
(lambda (rec)
(if (or (> (length sorted) 1)
(common:low-noise-print 120 "server info sorted"))
(debug:print 2 *default-log-port* "SERVER #"count": "(string-intersperse (map conc sorted) ", ")))
(set! count (+ count 1)))
sorted)
sorted))
(define (tt:send-receive ttdat conn cmd run-id params)
(let* ((host-port (tt-conn-host-port conn)) ;; (conc (tt-conn-host conn)":"(tt-conn-port conn)))
(host (tt-conn-host conn))
(port (tt-conn-port conn))
(dat (list cmd run-id params #f))) ;; no meta data yet
(tt:send-receive-direct host port dat)))
(defstruct tt:backoff
(last-ioerr (current-seconds))
(last-adj-t (current-seconds))
(wait-delay 0.1))
(define *tt:backoff-smoothing* (make-hash-table)) ;; host:port => lastaccess backoffdelay )
(define (tt:backoff-incr host port) ;; call if tcp fails i/o net
(let* ((host-port (conc host":"port))
(bkoff (hash-table-ref/default *tt:backoff-smoothing* host-port #f)))
(if bkoff
(begin
(tt:backoff-last-ioerr-set! bkoff (current-seconds))
(tt:backoff-wait-delay-set! bkoff (+ (tt:backoff-wait-delay bkoff) 0.1)))
(hash-table-set! *tt:backoff-smoothing* host-port (make-tt:backoff)))))
(define (tt:backoff-decr-and-wait host port)
(let* ((host-port (conc host":"port))
(bkoff (hash-table-ref/default *tt:backoff-smoothing* host-port #f)))
(if bkoff
(let* ((wait-delay (tt:backoff-wait-delay bkoff))
(last-ioerr (tt:backoff-last-ioerr bkoff))
(last-adj-t (tt:backoff-last-adj-t bkoff))
(delta (- (current-seconds) last-adj-t))
(adj (* delta 0.001)) ;; it takes 100 seconds to recover from hitting an io err
(new-wait (if (> wait-delay 0)
(if (> adj wait-delay)
0
(- wait-delay adj))
0)))
(if (> new-wait 0)
(begin
(if (common:low-noise-print 10 "delay wait message")
(debug:print-info 0 *default-log-port* "Server on host " host " loaded, DelayWait: "new-wait))
(tt:backoff-wait-delay-set! bkoff new-wait)
(tt:backoff-last-adj-t-set! bkoff (current-seconds))
(thread-sleep! new-wait))
(hash-table-delete! *tt:backoff-smoothing* host-port))))))
(define (tt:send-receive-direct host port dat #!key (ping-mode #f)(tries-remaining 25))
(assert (number? port) "FATAL: tt:send-receive-direct called with a port that is not a number "port)
(tt:backoff-decr-and-wait host port)
(let* ((retry (lambda ()
(tt:send-receive-direct host port dat tries-remaining: (- tries-remaining 1))))
(full-err-print (lambda (exn msg)
(if (condition? exn)
(begin
(pp (condition->list exn) *default-log-port*)
(pp dat *default-log-port*)
(debug:print 0 *default-log-port* msg
", error: " ((condition-property-accessor 'exn 'message) exn)
", arguments: " ((condition-property-accessor 'exn 'arguments) exn)
", location: " ((condition-property-accessor 'exn 'location) exn)
))
(debug:print 0 *default-log-port* msg "(note: exn="exn", is not a condition object.")))))
(condition-case
(let-values (((inp oup)(tcp-connect host port)))
(let ((res (if (and inp oup)
(begin
(serialize dat oup)
(close-output-port oup)
(deserialize inp))
)))
(close-input-port inp)
(match res
((result exn-result stdout-result)
(if exn-result
(full-err-print exn-result "ERROR: Server side exception detected"))
(if stdout-result
(debug:print 0 *default-log-port* "ERROR: Output detected on stdout on server side execution => "stdout-result))
result)
(else
(debug:print 0 *default-log-port* "ERROR: server returned non-standard output: "res)
#f))))
(exn (io-error)
(full-err-print exn "ERROR: i/o error")
(tt:backoff-incr host port)
#f)
(exn (i/o net)
(if ping-mode
#f
(cond
((> tries-remaining 4) ;; server likely defunct
(tt:backoff-incr host port)
#f)
((>= tries-remaining 0)
(let* ((backoff-delay (max (* (- 26 tries-remaining) 0.1) 1.0)))
(debug:print 0 *default-log-port* "WARNING: TCP overload, trying again in "backoff-delay"s.")
(thread-sleep! backoff-delay)
(tt:backoff-incr host port)
(retry))
;; (assert #f "FATAL: Too many retries in tt:send-receive-direct")
)
(else #f))))
(exn ()
(full-err-print exn "Unhandled exception from client side.")
#f))))
;;======================================================================
;; server
;;======================================================================
(define (tt:sync-dbs ttdat)
#f)
;; start the listener and start responding to requests
;;
;; NOTE: organise by dbfname, not run-id so we don't need
;; to pull in more modules
;;
;; This is the routine called in megatest.scm to start a server. NOTE: sequence is different for main.db vs. X.db
;;
;; Server viability is checked in keep-running. Blindly start and run here.
;;
(define (tt:start-server areapath run-id dbfname-in handler keys)
(assert areapath "FATAL: areapath not provided for tt:start-server")
(let* ((ttdat (make-tt areapath: areapath))
(dbfname (or dbfname-in (dbmod:run-id->dbfname run-id))))
(set! *server-info* ttdat)
(let* ((dbstruct (dbmod:open-dbmoddb areapath run-id dbfname (dbfile:db-init-proc) keys)))
(tt-handler-set! ttdat (handler dbstruct))
(let* ((servinf-created #f)
(tcp-thread (make-thread
(lambda ()
;; NOTE: tt-port and tt-host are set in connect-listener which is called under tt:start-tcp-server
(tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data
"tcp-server-thread"))
(run-thread (make-thread
(lambda ()
(tt:keep-running ttdat dbfname dbstruct)))))
(thread-start! tcp-thread)
(let* ((areapath (tt-areapath ttdat))
(nosyncdbpath (conc areapath"/.mtdb"))
(servers ;; (tt:find-server areapath dbfname)))
(tt:get-server-info-sorted ttdat dbfname)) ;; (host port startseconds server-id servinfofile)
(good-srvrs
;; contact servers via ping, if no response remove the .servinfo file
(let loop ((servrs servers)
(prime-host #f)
(result '()))
(if (null? servrs)
(reverse result)
(let* ((servdat (car servrs)))
(match servdat
((host port startseconds server-id servinfofile)
(let* ((ping-res (tt:timed-ping host port server-id))
(good-ping (match ping-res
((result . ping-time)
(not result)) ;; we couldn't reach the server or it was not a megatest server
(else #f))) ;; the ping failed completely?
(same-host (or (not prime-host) ;; i.e. this is the first host
(equal? prime-host host)))
(keep-srv (and good-ping same-host)))
(if keep-srv
(loop (cdr servrs)
host
(cons servdat result))
(begin
(handle-exceptions
exn
(debug:print-info 0 *default-log-port* "Error removing server info file: "servinfofile", "
(condition->list exn))
(delete-file* servinfofile))
(loop (cdr servrs) prime-host result)))))
(else
;; can't delete it as we don't have a filename. NOTE: Should really never get here.
(debug:print-info 0 *default-log-port* "ERROR: bad servinfo record \""servdat"\"")
(loop (cdr servrs) prime-host result)) ;; drop
)))))
(home-host (if (null? good-srvrs)
#f
(caar good-srvrs))))
;; by here we have a trustworthy list of servers and we have removed the .servinfo file for any unresponsive servers
;; and the list is in good-srvrs
(cond
((not home-host) ;; no servers yet, go ahead and start
(debug:print-info 0 *default-log-port* "No servers yet, starting on "(get-host-name)))
((> (length good-srvrs) 2) ;; don't need more, just exit
(debug:print-info 0 *default-log-port* "Have "(length good-srvrs)", no need for more, exiting.")
(exit))
((not (equal? home-host (get-host-name))) ;; there is a home-host and we are not on it
(debug:print-info 0 *default-log-port* "Prime main server is on host "home-host", but we are on host "(get-host-name)", exiting.")
(exit))
(else
(debug:print-info 0 *default-log-port* "Starting on host "(get-host-name)", along with "(length good-srvrs)" other servers.")))
;; this didn't seem to work, is port not available yet?
(let loop ((count 0))
(if (tt-port ttdat)
(begin
(procinf-port-set! *procinf* (tt-port ttdat))
(procinf-dbname-set! *procinf* dbfname)
(dbfile:with-no-sync-db
nosyncdbpath
(lambda (nsdb)
(dbfile:insert-or-update-process nsdb *procinf*))))
(if (< count 10)
(begin
(thread-sleep! 0.25)
(loop (+ count 1)))
(begin
(debug:print 0 *default-log-port* "ERROR: (tt-port ttdat) no port set! Exiting.")
(exit)))))
;; create a servinfo file start keep-running
(debug:print 0 *default-log-port* "Creating servinfo file for " dbfname)
(tt:create-server-registration-file ttdat dbfname)
(procinf-status-set! *procinf* "running")
(tt-state-set! ttdat 'running)
(dbfile:with-no-sync-db
nosyncdbpath
(lambda (nsdb)
(dbfile:insert-or-update-process nsdb *procinf*)))
(thread-start! run-thread)
(thread-join! run-thread) ;; run thread will exit on timeout or other conditions
;; (tcp-close (tt-socket ttdat)) ;; close up ports here
;; replace with call to (dbfile:set-process-done nsdb host pid reason)
(procinf-status-set! *procinf* "done")
(procinf-end-set! *procinf* (current-seconds))
;; either convert this to use set-process-done or get rid of set-process-done
(dbfile:with-no-sync-db
nosyncdbpath
(lambda (nsdb)
(dbfile:insert-or-update-process nsdb *procinf*)))
(debug:print 0 *default-log-port* "Exiting now.")
(exit))))))
(define (tt:keep-running ttdat dbfname dbstruct)
;; at this point the server is running and responding to calls, we just monitor
;; for db calls and exit if there are none.
;; if I am not in the first 3 servers, exit
(let* ((start-time (current-seconds)))
(let loop ()
(let* ((servers (tt:get-server-info-sorted ttdat dbfname))
(home-host (if (null? servers)
#f
(caar servers)))
(my-index (list-index (lambda (x)
(equal? (list-ref x 6)
(tt-servinf-file ttdat)))
servers))
(ok (cond
((not (number? my-index))
(debug:print 0 *default-log-port* "ERROR: bad server data in "servers", might be due to host misconfiguration such as bad IP address in /etc/hosts.")
#f)
((not *server-run*)
(debug:print 0 *default-log-port* "WARNING: received a stop server from client by remote request.")
#f)
((null? servers)
(debug:print 0 *default-log-port* "WARNING: no servinfo files found, this cannot be.")
#f) ;; not ok
((> my-index 2)
(debug:print 0 *default-log-port* "WARNING: there are more than two servers ahead of me, I'm not needed, exiting.")
#f) ;; not ok to not be in first three
((eq? (tt-state ttdat) 'running) #t) ;; we are good to keep going
((> (- (current-seconds) start-time) 30)
(debug:print 0 *default-log-port* "WARNING: over 30 seconds and not yet in runnning mode. Exiting.")
#f)
(else #t))))
(if ok
(tt-last-access-set! ttdat *db-last-access*) ;; bit silly, just use db-last-access
(begin
(debug:print 0 *default-log-port* "Exiting immediately")
(tt:shutdown-server ttdat)
(exit)))
(let* ((last-update (dbr:dbstruct-last-update dbstruct))
(curr-secs (current-seconds)))
(if (and (eq? (tt-state ttdat) 'running)
(> (- curr-secs last-update) 5)) ;; every 5 seconds update the db?
(let* ((sinfo-file (tt-servinf-file ttdat)))
;; (debug:print 0 *default-log-port* "INFO: touching "sinfo-file)
(set! (file-modification-time sinfo-file) (current-seconds))
((dbr:dbstruct-sync-proc dbstruct) last-update)
(dbr:dbstruct-last-update-set! dbstruct curr-secs))))
(if (< (- (current-seconds) (tt-last-access ttdat)) (tt-server-timeout-param))
(begin
(thread-sleep! 5)
(loop)))))
(tt:shutdown-server ttdat)
(debug:print 0 *default-log-port* "INFO: Server timed out, exiting from tt:keep-running.")))
(define (tt:shutdown-server ttdat)
(let* ((host (tt-host ttdat))
(port (tt-port ttdat))
(sinf (tt-servinf-file ttdat)))
(tt-state-set! ttdat 'shutdown)
(portlogger:open-run-close portlogger:set-port port "released")
(if (file-exists? sinf)
(delete-file* sinf))
))
;; return servid
;; side-effects:
;; ttdat-cleanup-proc is populated with function to remove the serverinfo file
(define (tt:create-server-registration-file ttdat dbfname)
(let* ((areapath (tt-areapath ttdat))
(servdir (tt:get-servinfo-dir areapath))
(host (tt-host ttdat))
(port (tt-port ttdat))
(servinf (conc servdir"/"host":"port"-"(current-process-id)":"dbfname))
(serv-id (tt:mk-signature areapath)))
(assert (and host port) "FATAL: tt:create-server-registration-file called with no conn, dbfname="dbfname)
(tt-servinf-file-set! ttdat servinf)
(with-output-to-file servinf
(lambda ()
(print "SERVER STARTED: "host":"port" AT "(current-seconds)" server-id: "serv-id" pid: "(current-process-id)" dbfname: "dbfname)))
serv-id))
;; find valid server
;; get servers listed, last part of name must match :<dbfname>
;; if more than one, wait one second and look again
;; future: ping oldest, if alive remove other :<dbfname> files
;;
(define (tt:find-server areapath dbfname)
(let* ((servdir (tt:get-servinfo-dir areapath))
(sfiles (glob (conc servdir"/*:"dbfname)))
(goodfiles '()))
;; filter the files here by looking in processes table (if we are not main.db)
;; and or look at the time stamp on the servinfo file, a running server will
;; touch the file every minute (again, this will only apply for main.db)
(for-each (lambda (fname)
(let* ((age (- (current-seconds)(file-modification-time fname))))
(if (> age 200) ;; can't trust it if over 200 seconds old
(begin
(debug:print 0 *default-log-port* "WARNING: removing stale servinfo file "fname", it is "age" seconds old")
(handle-exceptions
exn
(debug:print 0 *default-log-port* "WARNING: error attempting to remove stale servinfo file "fname)
(delete-file fname))) ;;
(set! goodfiles (cons fname goodfiles)))))
sfiles)
goodfiles))
;; given a path to a server info file return: host port startseconds server-id pid dbfname logf
;; example of what it's looking for in the log file:
;; SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4
;;
(define (tt:server-get-info logf)
(let ((server-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+) dbfname: (\\S+)")) ;; SERVER STARTED: host:port AT timesecs server id
(dbprep-rx (regexp "^SERVER: dbprep"))
(dbprep-found 0)
(bad-dat (list #f #f #f #f #f #f logf)))
(let ((fdat (handle-exceptions
exn
(begin
;; BUG, TODO: add err checking, for now blanket ignore the errors?
(debug:print-info 0 *default-log-port* "Unable to get server info from "logf
", exn="(condition->list exn))
'()) ;; no idea what went wrong, call it a bad server, return empty list
(with-input-from-file logf read-lines))))
(if (null? fdat) ;; bad data, return bad-dat
bad-dat
(let loop ((inl (car fdat))
(tail (cdr fdat))
(lnum 0))
(let ((mlst (string-match server-rx inl))
(dbprep (string-match dbprep-rx inl)))
(if dbprep (set! dbprep-found 1))
(if (not mlst)
(if (> lnum 500) ;; give up if more than 500 lines of server log read
bad-dat
(if (null? tail)
bad-dat
(loop (car tail)(cdr tail)(+ lnum 1))))
(match mlst ;; have a not null list
((_ host port start server-id pid dbfname)
(list host
(string->number port)
(string->number start)
server-id
(string->number pid)
dbfname
logf))
(else
(debug:print 0 *default-log-port* "ERROR: did not recognise SERVER line info "mlst)
bad-dat)))))))))
(define *last-server-start* (make-hash-table))
(define (tt:too-recent-server-start dbfname)
(let* ((last-run-time (hash-table-ref/default *last-server-start* dbfname #f)))
(and last-run-time
(< (- (current-seconds) last-run-time) 5))))
;; Given an area path, start a server process ### NOTE ### > file 2>&1
;; if the target-host is set
;; try running on that host
;; incidental: rotate logs in logs/ dir.
;;
(define (tt:server-process-run areapath testsuite mtexe run-id #!key (profile-mode "")) ;; areapath is *toppath* for a given testsuite area
(assert areapath "FATAL: tt:server-process-run called without areapath defined.")
(assert testsuite "FATAL: tt:server-process-run called without testsuite defined.")
(assert mtexe "FATAL: tt:server-process-run called without mtexe defined.")
;; mtest -server - -m testsuite:ext-tests -db 6.db
(let* ((dbfname (dbmod:run-id->dbfname run-id)))
(if (tt:too-recent-server-start dbfname)
#f
(let* ((load (get-normalized-cpu-load))
(srvrs (tt:find-server areapath dbfname))
(trying (length srvrs))
(nrun (number-of-processes-running (conc "mtest.*server.*"testsuite".*"dbfname))))
(cond
((> load 2.0)
(debug:print 0 *default-log-port* "Normalized load "load" on " (get-host-name) " is over the limit of 2.0. Not starting a server. Please reduce the load on "(get-host-name)" by killing some processes")
(thread-sleep! 1)
#f)
((> nrun 100)
(debug:print 0 *default-log-port* nrun" servers running on " (get-host-name) ", not starting another.")
(thread-sleep! 1)
#f)
((> trying 2)
(debug:print 0 *default-log-port* trying" servers registered in .servinfo dir. not starting another.")
(thread-sleep! 1)
#f)
(else
(if (not (file-exists? (conc areapath"/logs")))
(create-directory (conc areapath"/logs") #t))
(let* ((logfile (conc areapath "/logs/server-"dbfname"-"(current-process-id)".log")) ;; -" curr-pid "-" target-host ".log"))
(cmdln (conc
mtexe
" -startdir "areapath
" -server - ";; (or target-host "-")
" -m testsuite:"testsuite
" -db "dbfname ;; (dbmod:run-id->dbfname run-id)
" " profile-mode
#;(conc " >> " logfile " 2>&1 &"))))
;; we want the remote server to start in *toppath* so push there
;; (push-directory areapath) ;; use cd in the command line instead
(debug:print 2 *default-log-port* "INFO: Trying to start server in tcp mode (" cmdln ") at "(common:human-time)" for "areapath)
;; (debug:print 0 *default-log-port* "INFO: starting server at " (common:human-time))
(setenv "NBFAKE_QUIET" "yes") ;; BUG: change to with-environment-variable ...
(setenv "NBFAKE_LOG" logfile)
(system (conc "cd "areapath" ; nbfake " cmdln))
(unsetenv "NBFAKE_QUIET")
(unsetenv "NBFAKE_LOG")
;; (system cmdln)
(hash-table-set! *last-server-start* dbfname (current-seconds))
;; ;; use below to go back to nbfake - nbfake does cause trouble ...
;; (setenv "NBFAKE_QUIET" "yes") ;; BUG: change to with-environment-variable ...
;; (setenv "NBFAKE_LOG" logfile)
;; (system (conc "cd "areapath" ; nbfake " cmdln))
;; (unsetenv "NBFAKE_QUIET")
;; (unsetenv "NBFAKE_LOG")
;;(pop-directory)
#t)))))))
;;======================================================================
;; tcp connection stuff
;;======================================================================
;; find a port and start tcp-server. This only starts the tcp portion of
;; the server, look at (tt:start-server ...) above for the entry point
;; for the entire server system
;;
(define (tt:start-tcp-server ttdat)
(setup-listener-portlogger ttdat) ;; set up tcp-listener
(let* ((socket (tt-socket ttdat))
(handler (tt-handler ttdat)) ;; the handler comes from our client setting a handler function
(handler-proc (lambda ()
(let* ((indat (deserialize)) ;; could use: (thread-terminate! (current-thread))
(result #f)
(exn-result #f)
(stdout-result (with-output-to-string
(lambda ()
(let ((res (handle-exceptions
exn
(let* ((errdat (condition->list exn)))
(set! exn-result errdat)
(debug:print 0 *default-log-port* "ERROR: handler exception, these are bad, will exit in five seconds.")
(pp errdat *default-log-port*)
;; these are always bad, set up an exit thread
(thread-start! (make-thread (lambda ()
(thread-sleep! 5)
(exit))))
#f)
(handler indat) ;; this is the proc being called by the remote client
)))
(set! result res)))))
(full-result (list result exn-result (if (equal? stdout-result "") #f stdout-result))))
(handle-exceptions
exn
(begin
(debug:print 0 *default-log-port* "Serialization failure. full-result="full-result)
(thread-start! (make-thread (lambda ()
(thread-sleep! 5)
(exit))))) ;; (serialize '(#f #f #f)) ;; doesn't work - the first call to serialize caused failure
(serialize full-result))))))
((make-tcp-server socket handler-proc)
#f ;; yes, send error messages to std-err
)))
;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;; if udata-in is #f create the record
;; if there is already a serv-listener return the udata
;;
;; (define (setup-listener uconn #!optional (port 4242))
;; (assert (tt? uconn) "FATAL: setup-listener called with wrong struct "uconn)
;; (handle-exceptions
;; exn
;; (if (< port 65535)
;; (begin
;; (thread-sleep! 0.25)
;; (setup-listener uconn (+ port 1)))
;; #f)
;; (connect-listener uconn port)))
(define (setup-listener-portlogger uconn)
(let ((port (portlogger:open-run-close portlogger:find-port)))
(assert (tt? uconn) "FATAL: setup-listener called with wrong struct "uconn)
(debug:print 2 *default-log-port* "setup-listener-portlogger got port " port)
(handle-exceptions
exn
(if (< port 65535)
(begin
(portlogger:open-run-close portlogger:set-failed port)
(thread-sleep! 0.25)
(setup-listener-portlogger uconn))
(begin
(debug:print 0 *default-log-port* "setup-listener-portlogger: could not get a port")
#f
)
)
(connect-listener uconn port))))
(define (connect-listener uconn port)
;; (tcp-listener-socket LISTENER)(socket-name so)
;; sockaddr-address, sockaddr-port, sockaddr->string
(let* ((tlsn (tcp-listen port 10000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
(addr (tt:get-best-guess-address (get-host-name)))) ;; (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
(tt-port-set! uconn port)
(tt-host-set! uconn addr)
(tt-host-port-set! uconn (conc addr":"port))
(tt-socket-set! uconn tlsn)
uconn))
;;======================================================================
;; utils
;;======================================================================
;; Generate a unique signature for this server
(define (tt:mk-signature areapath)
(message-digest-string (md5-primitive)
(with-output-to-string
(lambda ()
(write (list areapath
(current-process-id)
(argv)))))))
(define (tt:get-best-guess-address hostname)
(let ((res #f))
(for-each
(lambda (adr)
(if (not (eq? (u8vector-ref adr 0) 127))
(set! res adr)))
;; NOTE: This can fail when there is no mention of the host in /etc/hosts. FIXME
(vector->list (hostinfo-addresses (hostname->hostinfo hostname))))
(string-intersperse
(map number->string
(u8vector->list
(if res res (hostname->ip hostname)))) ".")))
(define (tt:get-servinfo-dir areapath)
(let* ((spath (conc areapath"/.servinfo")))
(if (not (file-exists? spath))
(create-directory spath #t))
spath))
;;======================================================================
;; network utilities
;;======================================================================
;; NOTE: Look at address-info egg as alternative to some of this
(define (rate-ip ipaddr)
(regex-case ipaddr
( "^127\\..*" _ 0 )
( "^(10\\.0|192\\.168)\\..*" _ 1 )
( else 2 ) ))
;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
(> (rate-ip a) (rate-ip b)))
(define (get-my-best-address)
(let ((all-my-addresses (get-all-ips)))
(cond
((null? all-my-addresses)
(get-host-name)) ;; no interfaces?
((eq? (length all-my-addresses) 1)
(car all-my-addresses)) ;; only one to choose from, just go with it
(else
(car (sort all-my-addresses ip-pref-less?))))))
(define (get-all-ips-sorted)
(sort (get-all-ips) ip-pref-less?))
(define (get-all-ips)
(map address-info-host
(filter (lambda (x)
(equal? (address-info-type x) "tcp"))
(address-infos (get-host-name)))))
)