;;======================================================================
;; Copyright 2017, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit tcp-transportmod))
(declare (uses debugprint))
(declare (uses commonmod))
(declare (uses dbfile))
(declare (uses dbmod))
(use address-info)
(module tcp-transportmod
*
(import scheme
(prefix sqlite3 sqlite3:)
chicken
data-structures
address-info
directory-utils
extras
files
hostinfo
matchable
md5
message-digest
ports
posix
regex
regex-case
s11n
srfi-1
srfi-18
srfi-4
srfi-69
stack
typed-records
tcp-server
tcp
debugprint
commonmod
dbfile
dbmod
)
;;======================================================================
;; client
;;======================================================================
;; (define keep-age-param (make-parameter 10)) ;; qif file age, if over move to attic
;; Used ONLY for client
;;
;; Record describing one verified-or-candidate server connection as seen
;; from a client. In this file it is built in tt:client-connect-to-server
;; from the list returned by tt:get-current-server-info.
(defstruct tt-conn
host ;; server host/address as read from the server info file
port ;; server port (a number; tt:send-receive-direct asserts this)
host-port ;; "host:port" string built with conc
dbfname ;; db file name the server was looked up by
server-id ;; id the server is expected to answer a ping with
server-start ;; start time read from the server info file
pid ;; server process id read from the server info file
)
;; Used for BOTH clients and servers
;; State record shared by the client and server code paths in this
;; module. Fields not mentioned as set/read below are not touched
;; anywhere in this file.
(defstruct tt
;; client related
(conns (make-hash-table)) ;; dbfname -> tt-conn (tt:handler stores #f here when a connection breaks)
;; server related
(areapath #f) ;; testsuite area path; supplied to make-tt by tt:make-remote and tt:start-server
(host #f) ;; address string set by connect-listener
(port #f) ;; listening port set by connect-listener; tt:keep-running polls it to detect startup
(conn #f)
(cleanup-proc #f) ;; thunk set by tt:create-server-registration-file; deletes the server info file
(handler #f) ;; receives data and responds; set in tt:start-server, handed to make-tcp-server
(socket #f) ;; tcp listener set by connect-listener
(thread #f)
(host-port #f) ;; "addr:port" string set by connect-listener
(cmd-thread #f)
(ro-mode #f)
(ro-mode-checked #f)
(last-access (current-seconds)) ;; read by tt:keep-running for the idle timeout; presumably refreshed by code outside this file -- TODO confirm
)
;; Build a fresh tt record for the given area path; every other field
;; keeps the default declared in the tt defstruct.
(define tt:make-remote
  (lambda (areapath)
    (make-tt areapath: areapath)))
;; do all the busy work of finding and setting up conn for
;; connecting to a server
;;
;; Return a tt-conn for the server handling dbfname, starting a server
;; process when none can be found or reached.
;;
;;   ttdat   - tt record; its conns table caches one conn per dbfname
;;   dbfname - db file name used to look up the server info file
;;   run-id  - passed through when a server process has to be started
;;
;; A conn is cached in (tt-conns ttdat) only AFTER the server answered a
;; ping with the expected server-id. Previously the conn was cached
;; before the ping, so after a failed ping the retry found the cached
;; (dead) conn and returned it without ever verifying it.
;;
;; NOTE: there is no retry limit; this keeps starting a server and
;; retrying (one second apart) until a server answers the ping.
;;
(define (tt:client-connect-to-server ttdat dbfname run-id)
  (let ((cached (hash-table-ref/default (tt-conns ttdat) dbfname #f))
        (start-server-and-retry
         (lambda ()
           ;; rm the (last server) would go here
           (tt:server-process-run
            (tt-areapath ttdat)
            (dbfile:testsuite-name)
            (common:find-local-megatest)
            run-id)
           (thread-sleep! 1)
           (tt:client-connect-to-server ttdat dbfname run-id))))
    (if cached
        cached ;; we are already connected to the server
        (match (tt:get-current-server-info ttdat dbfname run-id)
          ((host port start-time server-id pid dbfname2)
           (assert (equal? dbfname dbfname2) "FATAL: read server info from wrong file.")
           ;; verify we can talk to this server before remembering it
           (if (tt:ping host port server-id)
               (let ((conn (make-tt-conn
                            host: host
                            port: port
                            host-port: (conc host":"port)
                            dbfname: dbfname
                            server-id: server-id
                            server-start: start-time
                            pid: pid)))
                 (hash-table-set! (tt-conns ttdat) dbfname conn)
                 conn)
               (start-server-and-retry)))
          (else
           ;; no usable server info was found
           (start-server-and-retry))))))
;; Send a ping request to host:port and compare the id in the reply
;; with server-id. Returns #t on a match; #f on a mismatch or when the
;; reply is not a four element list (for example #f after a failed
;; connection).
(define (tt:ping host port server-id)
  ;; the ping request means: please send me your server-id
  ;;
  ;; need two threads, one a 5 second timer
  ;;
  (let ((reply (tt:send-receive-direct host port `(ping #f #f #f))))
    (match reply
      ((_ _ result _) ;; (status errmsg result meta)
       (cond
        ((equal? result server-id) #t) ;; then we are good
        (else
         (debug:print 0 *default-log-port* "WARNING: server-id does not match, expected: "server-id", got: "result)
         #f)))
      (_
       (debug:print 0 *default-log-port* "res not in form (status errmsg resutl meta), got: "reply)
       #f))))
;; client side handler
;;
;; Send cmd/run-id/params to the server for dbfname and return the
;; result field of its reply.
;;
;; Retries (with the same arguments) while the server reports busy or
;; loaded, and reconnects when the exchange returned #f. There is no
;; retry limit. Any other malformed reply trips an assert.
;;
;; attemptnum, area-dat, areapath, readonly-mode, testsuite and mtexe
;; are accepted but not used in this body.
;;
(define (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)
  ;; NOTE: areapath is passed in and in tt struct. We'll use passed in value for now.
  (let retry ()
    (let ((conn (tt:client-connect-to-server ttdat dbfname run-id)))
      (if (not conn)
          (begin
            (thread-sleep! 1) ;; give it a rest and try again
            (retry))
          ;; have connection, call the server
          (let ((res (tt:send-receive ttdat conn cmd run-id params)))
            (match res
              ((status _ result _) ;; res is (status errmsg result meta)
               (case status
                 ((busy)
                  (debug:print 0 *default-log-port* "WARNING: server is overloaded, will try again in few seconds.")
                  (thread-sleep! 2)
                  (retry))
                 ((loaded)
                  (debug:print 0 *default-log-port* "WARNING: server is loaded, will try again in a second.")
                  (thread-sleep! 1)
                  (retry))
                 (else result)))
              (_
               (if res
                   (assert #f "FATAL: tt:handler received bad data "res)
                   (begin ;; server likely died
                     ;; forget the cached conn so the next pass looks the server up again
                     (hash-table-set! (tt-conns ttdat) dbfname #f)
                     (debug:print 0 *default-log-port* "INFO: connection to server broken, reconnecting.")
                     (retry))))))))))
;; no conn yet, find and or start and find a server
;; (let* ((server (tt:find-server ttdat dbfname)))
;; (if server
;; (let* ((conn (tt:client-connect-to-server server)))
;; (hash-table-set! (tt-conns ttdat) dbfname conn)
;; (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode
;; dbfname testsuite mtexe))
;; ;; no server, try to start a server process
;; (begin
;; (tt:server-process-run areapath testsuite mtexe run-id) ;; #!key (profile-mode ""))
;; (thread-sleep! 1)
;; (tt:handler ttdat cmd run-id params attemptnum area-dat areapath
;; readonly-mode dbfname testsuite mtexe)))))))
;; Placeholder: nothing is implemented here yet; the result is always
;; #f regardless of run-id.
(define tt:bid-for-servership
  (lambda (run-id)
    #f))
;; Return the info list (as produced by tt:server-get-info) of the
;; registered server for dbfname with the smallest start time, or #f
;; when no readable server info file exists. run-id is not used here.
(define (tt:get-current-server-info ttdat dbfname run-id)
  (assert (tt-areapath ttdat) "FATAL: areapath not set in ttdat.")
  (let* ((start-time   (lambda (sdat) (list-ref sdat 2)))
         (info-files   (tt:find-server (tt-areapath ttdat) dbfname))
         ;; first element is #f if the file disappeared while being read
         (candidates   (filter car (map tt:server-get-info info-files)))
         (oldest-first (sort candidates
                             (lambda (a b)
                               (< (start-time a) (start-time b))))))
    ;; we'll want to wait until extra servers have exited
    (and (pair? oldest-first)
         (car oldest-first))))
;; Wrap cmd, run-id and params into a request list and send it to the
;; server described by conn. Returns whatever tt:send-receive-direct
;; returns. ttdat is not used here.
(define (tt:send-receive ttdat conn cmd run-id params)
  (tt:send-receive-direct
   (tt-conn-host conn)
   (tt-conn-port conn)
   (list cmd run-id params #f))) ;; fourth slot is meta; no meta data yet
;; Open a tcp connection to host:port, send dat serialized and return
;; the deserialized reply.
;;
;;   host - host name or address handed to tcp-connect
;;   port - port number (asserted)
;;   dat  - datum handed to serialize
;;
;; Returns the reply, or #f if connecting, sending or receiving raised
;; an exception.
;;
;; Both ports are closed on the error path as well. Previously an
;; exception raised by serialize or deserialize skipped the close calls
;; and left the connection open.
;;
(define (tt:send-receive-direct host port dat)
  (assert (number? port) "FATAL: tt:send-receive-direct called with port not a number "port)
  (handle-exceptions
   exn
   #f ;; could not connect. Add condition-case or better handling here
   (let-values (((inp oup) (tcp-connect host port)))
     (let ((close-quietly (lambda (close-proc prt)
                            ;; a failing close must not hide the real outcome
                            (handle-exceptions exn #f (close-proc prt)))))
       (handle-exceptions
        exn
        (begin
          (close-quietly close-output-port oup)
          (close-quietly close-input-port inp)
          #f)
        (serialize dat oup)
        (close-output-port oup)
        (let ((res (deserialize inp)))
          (close-input-port inp)
          res))))))
;;======================================================================
;; server
;;======================================================================
;; Placeholder: nothing is implemented here yet; the result is always
;; #f regardless of ttdat.
(define tt:sync-dbs
  (lambda (ttdat)
    #f))
;; start the listener and start responding to requests
;;
;; NOTE: organise by dbfname, not run-id so we don't need
;; to pull in more modules
;;
;; This is the routine called in megatest.scm to start a server
;;
;; Run a server for dbfname in this process and exit when it is done.
;;
;;   areapath - testsuite area path (asserted to be set)
;;   run-id   - passed to dbmod:open-dbmoddb
;;   dbfname  - db file name; used to detect an already registered server
;;   handler  - called with the opened dbstruct; its result is stored as
;;              the request handler for the tcp server
;;   keys     - passed to dbmod:open-dbmoddb
;;
;; Never returns: both branches end in (exit).
;;
(define (tt:start-server areapath run-id dbfname handler keys)
  (assert areapath "FATAL: areapath not provided for tt:start-server")
  (let* ((ttdat   (make-tt areapath: areapath))
         (running (tt:find-server areapath dbfname))) ;; should use tt:get-current-server-info instead
    (cond
     ;; is there already a server for this dbfile? Then exit.
     ((not (null? running))
      (debug:print 0 *default-log-port* "INFO: found server(s) already running for db "dbfname", "(string-intersperse running ",")" Exiting.")
      (exit))
     (else
      (tt-handler-set! ttdat
                       (handler (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc) keys)))
      (let ((listener-th (make-thread
                          ;; start the tcp-server which applies handler to incoming data
                          (lambda () (tt:start-tcp-server ttdat))
                          "tcp-server-thread"))
            (watchdog-th (make-thread
                          (lambda () (tt:keep-running ttdat dbfname)))))
        (thread-start! listener-th)
        (thread-start! watchdog-th)
        (thread-join! watchdog-th) ;; watchdog thread will exit on timeout or other conditions
        ;;
        ;; set a flag here to tell the listener thread to stop running
        ;;
        ;; (thread-join! listener-th) ;; can't wait
        ;;
        ;; remove the servinfo file
        ;;
        ;; close the database, remove lock in on-disk db
        ;;
        ;; close the listener ports
        ;;
        (exit))))))
;; Watchdog for a running server:
;;   1. wait (about a minute at most) for the listener to record its
;;      port in ttdat, exiting the process with status 1 if it never does
;;   2. write the server registration file
;;   3. return once last-access is 60 or more seconds old, after calling
;;      the cleanup proc if one is set
(define (tt:keep-running ttdat dbfname)
  ;; verify conn for ready
  ;; listener socket has been started by this stage
  (thread-sleep! 1)
  (let wait-for-port ((tries 0))
    (cond
     ((> tries 60)
      (debug:print 0 *default-log-port* "FATAL: Could not start a tcp server, giving up.")
      (exit 1))
     ((not (tt-port ttdat)) ;; no connection yet
      (thread-sleep! 1)
      (wait-for-port (+ tries 1)))))
  (tt:create-server-registration-file ttdat dbfname)
  ;; now start watching the last-access, if it hasn't been touched
  ;; in 60 seconds or more we fall through to the cleanup below
  (let idle-watch ()
    (when (< (- (current-seconds) (tt-last-access ttdat)) 60)
      (thread-sleep! 2)
      (idle-watch)))
  (let ((cleanup (tt-cleanup-proc ttdat)))
    (if cleanup (cleanup)))
  (debug:print 0 *default-log-port* "INFO: Server timed out, exiting."))
;; ;; given an already set up uconn start the cmd-loop
;; ;;
;; (define (tt:cmd-loop ttdat)
;; (let* ((serv-listener (-socket uconn))
;; (listener (lambda ()
;; (let loop ((state 'start))
;; (let-values (((inp oup)(tcp-accept serv-listener)))
;; ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
;; (resp (ulex-handler uconn rdat)))
;; (serialize resp oup)
;; (close-input-port inp)
;; (close-output-port oup)
;; ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; )
;; (loop state))))))
;; ;; start N of them
;; (let loop ((thnum 0)
;; (threads '()))
;; (if (< thnum 100)
;; (let* ((th (make-thread listener (conc "listener" thnum))))
;; (thread-start! th)
;; (loop (+ thnum 1)
;; (cons th threads)))
;; (map thread-join! threads)))))
;;
;;
;;
;; (define (wait-and-close uconn)
;; (thread-join! (udat-cmd-thread uconn))
;; (tcp-close (udat-socket uconn)))
;;
;;
;; Run the cleanup proc stored in ttdat (if any), then close the tcp
;; listener stored in its socket field.
(define (tt:shutdown-server ttdat)
  (let ((cleanup (tt-cleanup-proc ttdat)))
    (when cleanup (cleanup)))
  (tcp-close (tt-socket ttdat))) ;; close up ports here
;; (define (wait-and-close uconn)
;; (thread-join! (tt-cmd-thread uconn))
;; (tcp-close (tt-socket uconn)))
;; return servid
;; side-effects:
;; ttdat-cleanup-proc is populated with function to remove the serverinfo file
;; Write the server info file
;;   <servinfo-dir>/<host>:<port>-<pid>:<dbfname>
;; containing the single "SERVER STARTED: ..." line that
;; tt:server-get-info parses, and return the generated server id.
;;
;; Side effect: the cleanup-proc field of ttdat is set to a thunk that
;; deletes this file.
;;
(define (tt:create-server-registration-file ttdat dbfname)
  (let* ((areapath (tt-areapath ttdat))
         (servdir  (tt:get-servinfo-dir areapath))
         (host     (tt-host ttdat))
         (port     (tt-port ttdat))
         (pid      (current-process-id))
         (infofile (conc servdir"/"host":"port"-"pid":"dbfname))
         (serv-id  (tt:mk-signature areapath)))
    (assert (and host port) "FATAL: tt:create-server-registration-file called with no conn, dbfname="dbfname)
    (tt-cleanup-proc-set! ttdat
                          (lambda ()
                            (delete-file* infofile)))
    (with-output-to-file infofile
      (lambda ()
        (print "SERVER STARTED: "host":"port" AT "(current-seconds)" server-id: "serv-id" pid: "pid" dbfname: "dbfname)))
    serv-id))
;; find valid server
;; get servers listed, last part of name must match :<dbfname>
;; if more than one, wait one second and look again
;; future: ping oldest, if alive remove other :<dbfname> files
;;
;; List the server info files in the area's servinfo directory whose
;; name ends in :<dbfname>. Returns the (possibly empty) list of paths
;; produced by glob.
(define (tt:find-server areapath dbfname)
  (glob (conc (tt:get-servinfo-dir areapath) "/*:" dbfname)))
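;; given a path to a server info file return the list:
;;   (host port startseconds server-id pid dbfname)
;; or a list of six #f values if no matching line could be read.
;; example of the line it's looking for in the file:
;; SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4 pid: 12345 dbfname: main.db
;;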
;; Parse a server info file.
;;
;;   logf - path of the file to read
;;
;; Returns (host port start server-id pid dbfname) taken from the first
;; "SERVER STARTED: ..." line, with port, start and pid converted to
;; numbers. Returns a list of six #f values when:
;;   - reading the file raised an exception (e.g. it disappeared)
;;   - no matching line was found before end of file
;;   - more than 500 lines were read without a match
;;
;; dbprep-found is a real boolean. It used to be 0/1, and since 0 is
;; true in Scheme the end-of-file branch always reported "in dbprep" and
;; slept, and the "Unable to get server info" message was unreachable.
;;
(define (tt:server-get-info logf)
  (let ((server-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+) dbfname: (\\S+)")) ;; SERVER STARTED: host:port AT timesecs server id
        (dbprep-rx (regexp "^SERVER: dbprep"))
        (dbprep-found #f)
        (bad-dat (list #f #f #f #f #f #f)))
    (handle-exceptions
     exn
     (begin
       ;; WARNING: this is potentially dangerous to blanket ignore the errors
       (if (file-exists? logf)
           (debug:print-info 2 *default-log-port* "Unable to get server info from "logf", exn=" exn))
       bad-dat) ;; no idea what went wrong, call it a bad server
     (with-input-from-file
      logf
      (lambda ()
        (let loop ((inl (read-line))
                   (lnum 0))
          (if (eof-object? inl)
              ;; ran out of lines without finding the SERVER STARTED line
              (begin
                (if dbprep-found
                    (begin
                      (debug:print-info 2 *default-log-port* "Server is in dbprep at " (common:human-time))
                      (thread-sleep! 0.5)) ;; was 25 sec but that blocked things from starting?
                    (debug:print-info 0 *default-log-port* "Unable to get server info from " logf " at " (seconds->time-string (current-seconds))))
                bad-dat)
              (let ((mlst (string-match server-rx inl)))
                (if (string-match dbprep-rx inl) (set! dbprep-found #t))
                (cond
                 (mlst
                  (match mlst
                    ((_ host port start server-id pid dbfname)
                     (list host
                           (string->number port)
                           (string->number start)
                           server-id
                           (string->number pid)
                           dbfname))
                    (_
                     (debug:print 0 *default-log-port* "ERROR: did not recognise SERVER line info "mlst)
                     bad-dat)))
                 ((< lnum 500) ;; give up if more than 500 lines of server log read
                  (loop (read-line) (+ lnum 1)))
                 (else
                  (debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf )
                  bad-dat))))))))))
;; Given an area path, start a server process ### NOTE ### > file 2>&1
;; if the target-host is set
;; try running on that host
;; incidental: rotate logs in logs/ dir.
;;
;; Build the server command line from mtexe, testsuite, run-id and
;; profile-mode and run it through "nbfake" via system, with areapath as
;; the working directory for the duration of the call. Returns the
;; result of pop-directory.
(define (tt:server-process-run areapath testsuite mtexe run-id #!key (profile-mode "")) ;; areapath is *toppath* for a given testsuite area
  (let ((cmdln (conc mtexe
                     " -server - " ;; (or target-host "-")
                     " -m testsuite:" testsuite
                     " -run-id " (or run-id "main")
                     " -db " (dbmod:run-id->dbfname run-id)
                     " " profile-mode)))
    ;; we want the remote server to start in *toppath* so push there
    (push-directory areapath)
    (debug:print 0 *default-log-port* "INFO: Trying to start server in tcp mode (" cmdln ") ...")
    (debug:print 0 *default-log-port* "INFO: starting server at " (common:human-time))
    (system (conc "nbfake " cmdln))
    (pop-directory)))
;;======================================================================
;; tcp connection stuff
;;======================================================================
;; find a port and start tcp-server. This only starts the tcp portion of
;; the server, look at (tt:start-server ...) above for the entry point
;; for the entire server system
;;
;; Set up the listener for ttdat, then build a tcp server from the
;; listener socket and the handler stored in ttdat and run it.
(define (tt:start-tcp-server ttdat)
  (setup-listener ttdat)
  (let ((serve (make-tcp-server (tt-socket ttdat) (tt-handler ttdat))))
    (serve #t))) ;; yes, send error messages to std-err
;; create a tcp listener and return the tt struct (uconn) populated with
;; the port, address, host:port string and listener socket.
;; return #f if fail to find a port to allocate.
;;
;; uconn must already be a tt record (asserted); ports are tried one at
;; a time, starting at `port` and counting up, until one can be used.
;;
;; Try to open a listener on port; on any exception try port+1, and so
;; on. Returns what connect-listener returns on success, or #f once the
;; failing port is 65535 or above.
(define (setup-listener uconn #!optional (port 4242))
  (assert (tt? uconn) "FATAL: setup-listener called with wrong struct "uconn)
  (handle-exceptions
   exn
   ;; this port did not work out: move on to the next one while any are left
   (and (< port 65535)
        (setup-listener uconn (+ port 1)))
   (connect-listener uconn port)))
;; Open a tcp listener on port and record port, address, "addr:port"
;; and the listener in uconn. Returns uconn.
;;
;; The address is resolved BEFORE the listener is opened. The caller
;; (setup-listener) treats every exception as "try the next port", so
;; with the old order a failing address lookup left one bound listener
;; behind for every port that was tried.
;;
(define (connect-listener uconn port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((addr (tt:get-best-guess-address (get-host-name))) ;; (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
         (tlsn (tcp-listen port 1000 #f))) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
    (tt-port-set! uconn port)
    (tt-host-set! uconn addr)
    (tt-host-port-set! uconn (conc addr":"port))
    (tt-socket-set! uconn tlsn)
    uconn))
;;======================================================================
;; utils
;;======================================================================
;; Generate a unique signature for this server
;; Return the md5 digest string of the written form of
;; (areapath pid argv) for the current process.
(define (tt:mk-signature areapath)
  (let ((identity (list areapath
                        (current-process-id)
                        (argv))))
    (message-digest-string
     (md5-primitive)
     (with-output-to-string
       (lambda ()
         (write identity))))))
;; Return a dotted address string for hostname: the last address listed
;; for the host whose first octet is not 127, or, when there is none,
;; whatever hostname->ip returns.
(define (tt:get-best-guess-address hostname)
  (let* ((listed
          ;; NOTE: This can fail when there is no mention of the host in /etc/hosts. FIXME
          (vector->list (hostinfo-addresses (hostname->hostinfo hostname))))
         ;; walk the list front to back; a later non-127 address replaces an earlier one
         (picked (fold (lambda (adr best)
                         (if (eq? (u8vector-ref adr 0) 127)
                             best
                             adr))
                       #f
                       listed))
         (octets (u8vector->list (or picked (hostname->ip hostname)))))
    (string-intersperse (map number->string octets) ".")))
;; Return <areapath>/.servinfo, creating the directory (with parents)
;; first if nothing exists at that path.
(define (tt:get-servinfo-dir areapath)
  (let ((spath (conc areapath"/.servinfo")))
    (unless (file-exists? spath)
      (create-directory spath #t))
    spath))
;;======================================================================
;; network utilities
;;======================================================================
;; NOTE: Look at address-info egg as alternative to some of this
;; Give an address string a preference score:
;;   0 - starts with "127."
;;   1 - starts with "10.0." or "192.168."
;;   2 - anything else
;; ip-pref-less? sorts on this score, highest first.
(define (rate-ip ipaddr)
(regex-case ipaddr
( "^127\\..*" _ 0 )
( "^(10\\.0|192\\.168)\\..*" _ 1 )
( else 2 ) ))
;; Change this to bias for addresses with a reasonable broadcast value?
;;
;; Sort predicate: true when a has a strictly higher rate-ip score than
;; b, so sorting with it puts the highest scored addresses first.
(define (ip-pref-less? a b)
  (let ((score-a (rate-ip a))
        (score-b (rate-ip b)))
    (> score-a score-b)))
;; Return the preferred address from get-all-ips: the host name when
;; the list is empty, the only entry when there is one, otherwise the
;; first entry after sorting with ip-pref-less?.
(define (get-my-best-address)
  (let ((addrs (get-all-ips)))
    (case (length addrs)
      ((0) (get-host-name)) ;; no interfaces?
      ((1) (car addrs))     ;; only one to choose from, just go with it
      (else (car (sort addrs ip-pref-less?))))))
;; Return the addresses from get-all-ips ordered by ip-pref-less?.
(define (get-all-ips-sorted)
  (let ((addrs (get-all-ips)))
    (sort addrs ip-pref-less?)))
;; Return the host part of every address-info record for this host
;; whose type compares equal? to the string "tcp".
;;
;; NOTE(review): if address-info-type returns a symbol rather than a
;; string this comparison never matches and the result is always the
;; empty list -- confirm against the address-info egg.
(define (get-all-ips)
  (let ((tcp-info? (lambda (ainfo)
                     (equal? (address-info-type ainfo) "tcp"))))
    (map address-info-host
         (filter tcp-info?
                 (address-infos (get-host-name))))))
)