Megatest

tcp-transportmod.scm at [8a443df8a9]
Login

File tcp-transportmod.scm artifact c233cd9055 part of check-in 8a443df8a9


;;======================================================================
;; Copyright 2017, Matthew Welland.
;; 
;; This file is part of Megatest.
;; 
;;     Megatest is free software: you can redistribute it and/or modify
;;     it under the terms of the GNU General Public License as published by
;;     the Free Software Foundation, either version 3 of the License, or
;;     (at your option) any later version.
;; 
;;     Megatest is distributed in the hope that it will be useful,
;;     but WITHOUT ANY WARRANTY; without even the implied warranty of
;;     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;;     GNU General Public License for more details.
;; 
;;     You should have received a copy of the GNU General Public License
;;     along with Megatest.  If not, see <http://www.gnu.org/licenses/>.

;;======================================================================

(declare (unit tcp-transportmod))
(declare (uses debugprint))
(declare (uses commonmod))
(declare (uses dbfile))
(declare (uses dbmod))

(use address-info)

(module tcp-transportmod
	*
	
  (import scheme
	  (prefix sqlite3 sqlite3:)
	  chicken
	  data-structures

	  address-info
	  directory-utils
	  extras
	  files
	  hostinfo
	  matchable
	  md5
	  message-digest
	  ports
	  posix
	  regex
	  regex-case
	  s11n
	  srfi-1
	  srfi-18
	  srfi-4
	  srfi-69
	  stack
	  typed-records
	  tcp-server
	  tcp
	  
	  debugprint
	  commonmod
	  dbfile
	  dbmod
	)

;;======================================================================
;; client
;;======================================================================

;; (define keep-age-param (make-parameter 10)) ;; qif file age, if over move to attic

(defstruct tt-conn
  host
  port
  host-port
  dbfname
  server-id
  server-start
  pid
)

(defstruct tt
  ;; client related
  (conns (make-hash-table)) ;; dbfname -> conn

  ;; server related
  (areapath     #f)
  (host         #f)
  (port         #f)
  (conn         #f)
  (cleanup-proc #f)
  (handler      #f) ;; receives data and responds
  (socket       #f)
  (thread       #f)
  (host-port    #f)
  (cmd-thread   #f)
  )

(define (tt:make-remote areapath)
  (make-tt area: areapath))

;; do all the busy work of finding and setting up conn for
;; connecting to a server
;; 
(define (tt:client-connect-to-server ttdat dbfname run-id)
  (let* ((conn (hash-table-ref/default (tt-conns ttdat) dbfname #f)))
    (if conn
	conn ;; we are already connected to the server
	(let* ((sdat (tt:get-current-server-info ttdat dbfname run-id)))
	  (match sdat
	    ((host port start-time server-id pid)
	     (let ((conn (make-tt-conn
			  host: host
			  port: port
			  dbfname: dbfname
			  server-id: server-id
			  server-start: start-time
			  pid: pid)))
	       (hash-table-set! (tt-conns ttdat) dbfname conn)
	       conn))
	    (else
	     (tt:server-process-run
	      (tt-areapath ttdat)
	      (dbfile:testsuite-name)
	      (common:find-local-megatest)
	      run-id)
	     (thread-sleep! 1)
	     (tt:client-connect-to-server ttdat dbfname run-id)))))))

;; client side handler
;;
(define (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)
  ;; NOTE: areapath is passed in and in tt struct. We'll use passed in value for now.
  (let* ((conn (tt:client-connect-to-server ttdat dbfname run-id))) ;; (hash-table-ref/default (tt-conns ttdat) dbfname #f)))
    (if conn
	;; have connection, call the server
	(let* ((res (tt:send-receive ttdat conn cmd run-id params)))
	  (cond
	   ((member res '(busy starting))
	    (thread-sleep! 1)
	    (tt:handler  ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe))
	   (else
	    res)))
	(begin
	  (thread-sleep! 1) ;; give it a rest and try again
	  (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)))))

	;; no conn yet, find and or start and find a server
;; 	(let* ((server (tt:find-server ttdat dbfname)))
;; 	  (if server
;; 	      (let* ((conn (tt:client-connect-to-server server)))
;; 		(hash-table-set! (tt-conns ttdat) dbfname conn)
;; 		(tt:handler  ttdat cmd run-id params attemptnum area-dat areapath readonly-mode
;; 			     dbfname testsuite mtexe))
;; 	      ;; no server, try to start a server process
;; 	      (begin
;; 		(tt:server-process-run areapath testsuite mtexe run-id) ;;  #!key (profile-mode "")) 
;; 		(thread-sleep! 1)
;; 		(tt:handler  ttdat cmd run-id params attemptnum area-dat areapath
;; 			     readonly-mode dbfname testsuite mtexe)))))))

(define (tt:bid-for-servership run-id)
  #f)

(define (tt:get-current-server-info ttdat dbfname run-id)
  (let* ((sfiles (tt:find-server ttdat dbfname)))
    (case (length sfiles)
      ((0) #f) ;; no server around
      ((1) (tt:server-get-info (car sfiles)))
      (else #f) ;; we'll want to wait until extra servers have exited
      )))

(define (tt:send-receive ttdat conn cmd run-id params)
  (let* ((host-port (conc (tt-conn-host conn)":"(tt-conn-port conn)))
	 (dat       (list cmd run-id params)))
    (let-values (((inp oup)(tcp-connect host-port)))
      (let ((res (if (and inp oup)
		     (begin
		       (serialize dat oup)
		       (close-output-port oup)
		       (deserialize inp))
		     (begin
		       (debug:print 0 *default-log-port* "ERROR: send called but no receiver has been setup. Please call setup first!")
		       #f))))
	(close-input-port inp)
	;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
	res))))

;;======================================================================
;; server
;;======================================================================

(define (tt:sync-dbs ttdat)
  #f)

;; start the listener and start responding to requests
;;
;; NOTE: organise by dbfname, not run-id so we don't need
;;       to pull in more modules
;;
(define (tt:start-server areapath run-id dbfname handler)
  ;; is there already a server for this dbfile? Then exit.
  (let* ((ttdat  (make-tt areapath: areapath))
	 (servers (tt:find-server ttdat dbfname)))
    (tt-handler-set! ttdat handler)
    (if (null? servers)
	(let* ((dbstruct (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc))))
	  (tt:start-tcp-server ttdat) ;; start the tcp-server which applies handler to incoming data
	  (tt:keep-running ttdat dbfname handler))
	(begin
	  (debug:print 0 *default-log-port* "INFO: found server(s) already running for db "dbfname", "(string-intersperse servers ",")" Exiting.")
	  (exit)))))

(define (tt:keep-running ttdat dbfile)
  ;; verfiy conn for ready
  ;; listener socket has been started by this stage
  (debug:print 0 *default-log-port* "INFO: Got here!!!!"))

;; ;; given an already set up uconn start the cmd-loop
;; ;;
;; (define (tt:cmd-loop ttdat)
;;   (let* ((serv-listener (-socket uconn))
;; 	 (listener      (lambda ()
;; 			  (let loop ((state 'start))
;; 			    (let-values (((inp oup)(tcp-accept serv-listener)))
;; 			      ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; 			      (let* ((rdat  (deserialize inp)) ;; '(my-host-port qrykey cmd params)
;; 				     (resp  (ulex-handler uconn rdat)))
;; 				(serialize resp oup)
;; 				(close-input-port inp)
;; 				(close-output-port oup)
;; 				;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; 				)
;; 			      (loop state))))))
;;     ;; start N of them
;;     (let loop ((thnum   0)
;; 	       (threads '()))
;;       (if (< thnum 100)
;; 	  (let* ((th (make-thread listener (conc "listener" thnum))))
;; 	    (thread-start! th)
;; 	    (loop (+ thnum 1)
;; 		  (cons th threads)))
;; 	  (map thread-join! threads)))))
;; 
;; 
;; 
;; (define (wait-and-close uconn)
;;   (thread-join! (udat-cmd-thread uconn))
;;   (tcp-close (udat-socket uconn)))
;; 
;; 

(define (tt:shutdown-server ttdat)
  (let* ((cleanproc (tt-cleanup-proc ttdat)))
    (if cleanproc (cleanproc))
    (tcp-close (tt-socket ttdat)) ;; close up ports here
    ))

;; (define (wait-and-close uconn)
;;   (thread-join! (tt-cmd-thread uconn))
;;   (tcp-close (tt-socket uconn)))

;; return servid
;; side-effects:
;;   ttdat-cleanup-proc is populated with function to remove the serverinfo file
(define (tt:create-server-registration-file ttdat dbfname)
  (let* ((areapath (tt-areapath ttdat))
	 (servdir  (tt:get-servinfo-dir areapath))
	 (conn     (hash-table-ref/default (tt-conns ttdat) dbfname #f)))
    (assert conn "FATAL: tt:create-server-registration-file called with no conn, dbfname="dbfname)
    (let* ((host    (tt-conn-host conn))
	   (port    (tt-conn-port conn))
	   (servinf (conc servdir"/"host":"port"-"(current-process-id)":"dbfname))
	   (serv-id (tt:mk-signature areapath))
	   (clean-proc (lambda ()
			 (delete-file* servinf))))
      (tt-cleanup-proc-set! ttdat clean-proc)
      (with-output-to-file servinf
	(lambda ()
	  (print "SERVER STARTED: "host":"port" AT "(current-seconds)" server-id: "serv-id" pid: "(current-process-id)" dbfname: "dbfname)))
      serv-id)))

;; find valid server
;; get servers listed, last part of name must match :<dbfname>
;; if more than one, wait one second and look again
;; future: ping oldest, if alive remove other :<dbfname> files
;;
(define (tt:find-server ttdat dbfname)
  (let* ((areapath (tt-areapath ttdat))
	 (servdir  (tt:get-servinfo-dir areapath))
	 (sfiles   (glob (conc servdir"/*:"dbfname))))
    sfiles))

;; given a path to a server info file return: host port startseconds server-id
;; example of what it's looking for in the log file:
;;     SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4 
;;
(define (tt:server-get-info logf)
  (let ((server-rx    (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+)")) ;; SERVER STARTED: host:port AT timesecs server id
        (dbprep-rx    (regexp "^SERVER: dbprep"))
        (dbprep-found 0)
	(bad-dat      (list #f #f #f #f #f)))
    (handle-exceptions
     exn
     (begin
       ;; WARNING: this is potentially dangerous to blanket ignore the errors
       (if (file-exists? logf)
	   (debug:print-info 2 *default-log-port* "Unable to get server info from "logf", exn=" exn))
       bad-dat) ;; no idea what went wrong, call it a bad server
     (with-input-from-file
	 logf
       (lambda ()
	 (let loop ((inl  (read-line))
		    (lnum 0))
	   (if (not (eof-object? inl))
	       (let ((mlst (string-match server-rx inl))
		     (dbprep (string-match dbprep-rx inl)))
		 (if dbprep (set! dbprep-found 1))
		 (if (not mlst)
		     (if (< lnum 500) ;; give up if more than 500 lines of server log read
			 (loop (read-line)(+ lnum 1))
			 (begin 
                           (debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf )
                           bad-dat))
		     (match mlst
			    ((_ host port start server-id pid)
			     (list host
				   (string->number port)
				   (string->number start)
				   server-id
				   (string->number pid)))
			    (else
			     (debug:print 0 *default-log-port* "ERROR: did not recognise SERVER line info "mlst)
			     bad-dat))))
	       (begin 
		 (if dbprep-found
		     (begin
		       (debug:print-info 2 *default-log-port* "Server is in dbprep at " (common:human-time))
		       (thread-sleep! 0.5)) ;; was 25 sec but that blocked things from starting?
		     (debug:print-info 0 *default-log-port* "Unable to get server info from " logf " at " (seconds->time-string (current-seconds))))
		 bad-dat))))))))

;; Given an area path,  start a server process    ### NOTE ### > file 2>&1 
;; if the target-host is set 
;; try running on that host
;;   incidental: rotate logs in logs/ dir.
;;
(define  (tt:server-process-run areapath testsuite mtexe run-id #!key (profile-mode "")) ;; areapath is *toppath* for a given testsuite area
  (let* ((logfile   (conc areapath "/logs/server.log")) ;; -" curr-pid "-" target-host ".log"))
	 (cmdln     (conc
		     mtexe
		     " -server - ";; (or target-host "-")
		     " -m testsuite:" testsuite
		     " -run-id " (or run-id "main")
		     " -db "  (dbmod:run-id->dbfname run-id)
		     " " profile-mode
		     ))) ;; (conc " >> " logfile " 2>&1 &")))))
    ;; we want the remote server to start in *toppath* so push there
    (push-directory areapath)
    (debug:print 0 *default-log-port* "INFO: Trying to start server in tcp mode (" cmdln ") ...")
    (debug:print 0 *default-log-port* "INFO: starting server at " (common:human-time))
    (system (conc "nbfake " cmdln))
    (pop-directory)))

;;======================================================================
;; tcp connection stuff
;;======================================================================

;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;;  if udata-in is #f create the record
;;  if there is already a serv-listener return the udata
;;
(define (setup-listener uconn #!optional (port 4242))
  (assert (tt? uconn) "FATAL: setup-listener called with wrong struct "uconn)
  (handle-exceptions
   exn
   (if (< port 65535)
       (setup-listener uconn (+ port 1))
       #f)
   (connect-listener uconn port)))

;; find a port and start tcp-server
;;
(define (tt:start-tcp-server ttdat)
  (setup-listener ttdat)
  (let* ((socket   (tt-socket ttdat))
	 (handler  (tt-handler    ttdat)))
    ((make-tcp-server socket handler)
     #t ;; yes, send error messages to std-err
     )))

(define (connect-listener uconn port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
	 (addr  (tt:get-best-guess-address (get-host-name)))) ;; (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
    (tt-port-set!      uconn port)
    (tt-host-port-set! uconn (conc addr":"port))
    (tt-socket-set!    uconn tlsn)
    uconn))



;;======================================================================
;; utils
;;======================================================================

;; Generate a unique signature for this server
(define (tt:mk-signature areapath)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list areapath
                                          (current-process-id)
					  (argv)))))))


(define (tt:get-best-guess-address hostname)
  (let ((res #f))
    (for-each 
     (lambda (adr)
       (if (not (eq? (u8vector-ref adr 0) 127))
	   (set! res adr)))
     ;; NOTE: This can fail when there is no mention of the host in /etc/hosts. FIXME
     (vector->list (hostinfo-addresses (hostname->hostinfo hostname))))
    (string-intersperse 
     (map number->string
	  (u8vector->list
	   (if res res (hostname->ip hostname)))) ".")))

(define (tt:get-servinfo-dir areapath)
  (let* ((spath (conc areapath"/.servinfo")))
    (if (not (file-exists? spath))
	(create-directory spath #t))
    spath))

;;======================================================================
;; network utilities
;;======================================================================

;; NOTE: Look at address-info egg as alternative to some of this

(define (rate-ip ipaddr)
  (regex-case ipaddr
    ( "^127\\..*" _ 0 )
    ( "^(10\\.0|192\\.168)\\..*" _ 1 )
    ( else 2 ) ))

;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
  (> (rate-ip a) (rate-ip b)))

(define (get-my-best-address)
  (let ((all-my-addresses (get-all-ips)))
    (cond
     ((null? all-my-addresses)
      (get-host-name))                                          ;; no interfaces?
     ((eq? (length all-my-addresses) 1)
      (car all-my-addresses))                      ;; only one to choose from, just go with it
     (else
      (car (sort all-my-addresses ip-pref-less?))))))

(define (get-all-ips-sorted)
  (sort (get-all-ips) ip-pref-less?))

(define (get-all-ips)
  (map address-info-host
       (filter (lambda (x)
		 (equal? (address-info-type x) "tcp"))
	       (address-infos (get-host-name)))))

)