Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -16,10 +16,14 @@ ;; You should have received a copy of the GNU General Public License ;; along with Megatest. If not, see . ;;====================================================================== +;; generate entries for ~/.megatestrc with the following +;; +;; grep define ../rmt.scm | grep rmt: |perl -pi -e 's/\(define\s+\((\S+)\W.*$/\1/'|sort -u + (declare (unit rmtmod)) (declare (uses apimod)) (declare (uses commonmod)) (declare (uses configfmod)) @@ -101,69 +105,47 @@ tasksmod ulex ) -(defstruct alldat - (areapath #f) - (ulexdat #f) - ) - - -;; (require-extension (srfi 18) extras tcp s11n) -;; -;; -;; (use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest posix-extras) -;; -;; (use spiffy uri-common intarweb http-client spiffy-request-vars intarweb spiffy-directory-listing) -;; ;; Configurations for server ;; (tcp-buffer-size 2048) ;; (max-connections 2048) -;; info about me as a server +;; info about me as a listener and my connections to db servers +;; stored (for now) in *db-serv-info* ;; (defstruct servdat (host #f) (port #f) (uuid #f) - (rep #f) (dbfile #f) - (api-url #f) - (api-uri #f) - (api-req #f) (uconn #f) ;; this is the listener (mode #f) (status 'starting) (trynum 0) ;; count the number of ports we've tried + (conns (make-hash-table)) ;; apath/dbname => conndat ) + +(define *db-serv-info* (make-servdat)) (define (servdat->url sdat) (conc (servdat-host sdat)":"(servdat-port sdat))) -;; generate entries for ~/.megatestrc with the following +;; db servers contact info ;; -;; grep define ../rmt.scm | grep rmt: |perl -pi -e 's/\(define\s+\((\S+)\W.*$/\1/'|sort -u - -(defstruct remotedat - (conns (make-hash-table)) ;; apath/dbname => conndat - ) - (defstruct conndat (apath #f) (dbname #f) (fullname #f) (hostport #f) (ipaddr #f) (port #f) - (socket #f) (srvpkt #f) (srvkey #f) (lastmsg 0) - (expires 0) - (inport #f) - (outport #f)) + (expires 0)) (define *srvpktspec* `((server (host . h) (port . p) (servkey . k) @@ -173,18 +155,10 @@ ;;====================================================================== ;; S U P P O R T F U N C T I O N S ;;====================================================================== -;; replaces *runremote* -(define *remotedat* (make-remotedat)) - -;; -> http://abc.com:900/ -;; -(define (conndat->uri conn entrypoint) - (conc "http://"(conndat-ipaddr conn)":"(conndat-port conn)"/"entrypoint)) - ;; set up the api proc, seems like there should be a better place for this? ;; ;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE ;; (define api-proc (make-parameter conc)) @@ -197,11 +171,11 @@ ;; ;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception ;; (define (rmt:get-conn remdat apath dbname) (let* ((fullname (db:dbname->path apath dbname)) - (conn (hash-table-ref/default (remotedat-conns remdat) fullname #f))) + (conn (hash-table-ref/default (servdat-conns remdat) fullname #f))) (if (and conn (< (current-seconds) (conndat-expires conn))) conn #f))) @@ -223,34 +197,32 @@ ;; ;; TODO: This is unnecessarily re-creating the record in the hash table ;; (define (rmt:open-main-connection remdat apath) (let* ((fullpath (db:dbname->path apath "/.db/main.db")) - (conns (remotedat-conns remdat)) + (conns (servdat-conns remdat)) (conn (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this - (myconn (if *server-info* - (servdat-uconn *server-info*) + (myconn (if *db-serv-info* + (servdat-uconn *db-serv-info*) (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server"))) (thread-start! th1) (let loop ((count 0)) (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection") - (if (not *server-info*) + (if (not *db-serv-info*) (begin (thread-sleep! 1) (loop (+ count 1))) (begin - (servdat-mode-set! *server-info* 'non-db) - (servdat-uconn *server-info*)))))))) + (servdat-mode-set! *db-serv-info* 'non-db) + (servdat-uconn *db-serv-info*)))))))) (cond ((and conn ;; conn is NOT a socket, just saying ... (< (current-seconds) (conndat-expires conn))) #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died ((and conn (>= (current-seconds)(conndat-expires conn))) (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.") - #;(if (conndat-socket conn) - (nng-close! (conndat-socket conn))) ;; TODO - close the ulex server here? (hash-table-set! conns fullpath #f) ;; clean up (rmt:open-main-connection remdat apath)) (else ;; Below we will find or create and connect to main (let* ((dbname (db:run-id->dbname #f)) @@ -287,18 +259,18 @@ expires: (+ (current-seconds) 60) ;; this needs to be gathered during the ping ))) (hash-table-set! conns fullpath new-the-srv))) #t))))) -;; NB// remdat is a remotedat struct +;; NB// sinfo is a servdat struct ;; -(define (rmt:general-open-connection remdat apath dbname #!key (num-tries 5)) +(define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5)) (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db") (let* ((mdbname (db:run-id->dbname #f)) (fullname (db:dbname->path apath dbname)) - (conns (remotedat-conns remdat)) - (mconn (rmt:get-conn remdat apath mdbname))) + (conns (servdat-conns sinfo)) + (mconn (rmt:get-conn sinfo apath mdbname))) (if (and mconn (not (debug:print-logger))) (begin (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.") (debug:print-logger rmt:log-to-main))) @@ -306,22 +278,21 @@ ((or (not mconn) ;; no channel open to main? (< (conndat-expires mconn)(+ (current-seconds) 2))) ;; restablish connection if less than 2 seconds on the lease (if mconn ;; previously opened - clean up NB// consolidate this with the similar code in open main above (begin (debug:print-info 0 *default-log-port* "Clearing out connection to main that has expired.") - ;; (nng-close! (conndat-socket mconn)) ;; TODO - close the ulex server/listener here? (hash-table-set! conns fullname #f))) - (rmt:open-main-connection remdat apath) - (rmt:general-open-connection remdat apath mdbname)) - ((not (rmt:get-conn remdat apath dbname)) ;; no channel open to dbname? - (let* ((res (rmt:send-receive-real remdat apath mdbname 'get-server `(,apath ,dbname)))) + (rmt:open-main-connection sinfo apath) + (rmt:general-open-connection sinfo apath mdbname)) + ((not (rmt:get-conn sinfo apath dbname)) ;; no channel open to dbname? + (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname)))) (case res ((server-started) (if (> num-tries 0) (begin (thread-sleep! 2) - (rmt:general-open-connection remdat apath dbname num-tries: (- num-tries 1))) + (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1))) (begin (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname) (exit 1)))) (else (if (list? res) ;; server has been registered and the info was returned. pass it on. @@ -365,47 +336,35 @@ ;; Defaults to current area ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; (if (not *remotedat*)(set! *remotedat* (make-remotedat))) (let* ((apath *toppath*) - (remdat *remotedat*) - (conns (remotedat-conns remdat)) ;; just checking that remdat is a remotedat + (sinfo *db-serv-info*) (dbname (db:run-id->dbname rid))) (if *localmode* (let* ((dbdat (dbr:dbstruct-get-dbdat *dbstruct* dbname)) (indat `((cmd . ,cmd)(params . ,params)))) (api:execute-requests *dbstruct* cmd params) ;; (api:process-request *dbstruct* indat) ;; (api:process-request dbdat indat) ) (begin - (rmt:open-main-connection remdat apath) - (if rid (rmt:general-open-connection remdat apath dbname)) - (rmt:send-receive-real remdat apath dbname cmd params))))) - -#;(define (rmt:send-receive-setup conn) - (if (not (conndat-inport conn)) - (let-values (((i o) (tcp-connect (conndat-ipaddr conn) - (conndat-port conn)))) - (conndat-inport-set! conn i) - (conndat-outport-set! conn o)))) - + (rmt:open-main-connection sinfo apath) + (if rid (rmt:general-open-connection sinfo apath dbname)) + (rmt:send-receive-real sinfo apath dbname cmd params))))) + ;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed ;; sometime in the future ;; -(define (rmt:send-receive-real remdat apath dbname cmd params) - (let* ((conn (rmt:get-conn remdat apath dbname))) +(define (rmt:send-receive-real sinfo apath dbname cmd params) + (let* ((conn (rmt:get-conn sinfo apath dbname))) (assert conn "FATAL: rmt:send-receive-real called without the needed channels opened") - (assert (conndat-socket conn) "FATAL: rmt:send-receive-real called without the channel socket opened.") - (let* ((soc (conndat-socket conn)) - (key #f) - (host (conndat-ipaddr conn)) - (port (conndat-port conn)) + (let* ((key #f) (payload `((cmd . ,cmd) (key . ,(conndat-srvkey conn)) (params . ,params))) - (res (send-receive soc payload))) + (res (send-receive conn cmd payload))) (if (member res '("#")) ;; TODO - fix this in string->sexpr #f (string->sexpr res))))) ;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed @@ -412,18 +371,10 @@ ;; sometime in the future. ;; ;; Purpose - call the main.db server and request a server be started ;; for the given area path and dbname ;; -;; (define (rmt:send-receive-server-start remdat apath dbname) -;; (let* ((conn (rmt:get-conn remdat apath dbname))) -;; (assert conn "FATAL: Unable to connect to db "apath"/"dbname) -;; #;(let* ((res (with-input-from-request -;; (conndat->uri conn "api") -;; `((params . (,apath ,dbname))) -;; read-string))) -;; (string->sexpr res)))) (define (rmt:print-db-stats) (let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f" (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================") (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg")) @@ -475,11 +426,11 @@ (rmt:send-receive 'kill-server #f (list run-id))) (define (rmt:start-server run-id) (rmt:send-receive 'start-server #f (list run-id))) -(define (rmt:get-server-info apath dbname) +(define (rmt:server-info apath dbname) (rmt:send-receive 'get-server-info #f (list apath dbname))) ;;====================================================================== ;; M I S C ;;====================================================================== @@ -1543,17 +1494,17 @@ (not (equal? (substring (->string megatest-version) 0 4) (substring (conc (common:get-last-run-version)) 0 4)))) ;; host and port are used to ensure we are remove proper records (define (rmt:server-shutdown host port) - (let ((dbfile (servdat-dbfile *server-info*))) + (let ((dbfile (servdat-dbfile *db-serv-info*))) (debug:print-info 0 *default-log-port* "dbfile is "dbfile) (if dbfile (let* ((am-server (args:get-arg "-server")) (dbfile (args:get-arg "-db")) (apath *toppath*) - (remdat *remotedat*)) ;; foundation for future fix + #;(sinfo *remotedat*)) ;; foundation for future fix (if *dbstruct-db* (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile)) (db (dbr:dbdat-db dbdat)) (inmem (dbr:dbdat-db dbdat)) ;; WRONG ) @@ -1572,24 +1523,24 @@ (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do.")) (if (not am-server) (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!") (if (string-match ".*/main.db$" dbfile) (let ((pkt-file (conc (get-pkts-dir *toppath*) - "/" (servdat-uuid *server-info*) + "/" (servdat-uuid *db-serv-info*) ".pkt"))) (debug:print-info 0 *default-log-port* "removing pkt "pkt-file) (delete-file* pkt-file) (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port) (db:with-lock-db - (servdat-dbfile *server-info*) + (servdat-dbfile *db-serv-info*) (lambda (dbh dbfile) (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove - (let* ((sdat *server-info*) ;; we have a run-id server + (let* ((sdat *db-serv-info*) ;; we have a run-id server (host (servdat-host sdat)) (port (servdat-port sdat)) (uuid (servdat-uuid sdat)) - (res (rmt:deregister-server remdat *toppath* host port uuid dbfile))) + (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile))) (debug:print-info 0 *default-log-port* "deregistered-server, res="res) (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid) ))))))) (define (std-exit-procedure) @@ -1605,13 +1556,13 @@ (if (and no-hurry (debug:debug-mode 18)) (rmt:print-db-stats)) (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds (let* ((start-time (current-seconds))) - (if *server-info* - (let* ((host (servdat-host *server-info*)) - (port (servdat-port *server-info*))) + (if *db-serv-info* + (let* ((host (servdat-host *db-serv-info*)) + (port (servdat-port *db-serv-info*))) (debug:print-info 0 *default-log-port* "Shutting down server/responder.") ;; ;; TODO - add flushing/waiting on the work queue ;; (rmt:server-shutdown host port) @@ -1660,15 +1611,14 @@ ;; NOTE: This is NOT called directly from clients as not all transports support a client running ;; in the same process as the server. ;; ;; conn is a conndat record ;; -(define (server:ping conn #!key (do-exit #f)) - (let* ((req (conndat-socket conn)) - (srvkey (conndat-srvkey conn)) +(define (server:ping uconn #!key (do-exit #f)) + (let* ((srvkey (conndat-srvkey uconn)) (msg (sexpr->string '(ping ,srvkey)))) - (send-receive req msg))) ;; (server-ready? host port server-id)) + (send-receive uconn 'ping msg))) ;; (server-ready? host port server-id)) ;;====================================================================== ;; http-transportmod.scm contents moved here ;;====================================================================== @@ -1684,34 +1634,33 @@ (define (http-get-function fnkey) (hash-table-ref/default *http-functions* fnkey (lambda () "nothing here yet"))) ;; Main entry point to start a server. was start-server (define (rmt:run hostn) - ;; (assert (not *server-info*) "FATAL: rmt:run called but *server-info* has already been initialized") ;; ;; Configurations for server ;; (tcp-buffer-size 2048) ;; (max-connections 2048) (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...") - (if *server-info* - (let* ((uconn (servdat-uconn *server-info*))) + (if *db-serv-info* + (let* ((uconn (servdat-uconn *db-serv-info*))) (wait-and-close uconn)) (let* ((port (portlogger:open-run-close portlogger:find-port)) (handler-proc (lambda (rem-host-port qrykey cmd params) ;; (api:execute-requests *dbstruct-db* cmd params)))) ;; (api:process-request *dbstuct-db* - (set! *server-info* (make-servdat host: hostn port: port)) + (set! *db-serv-info* (make-servdat host: hostn port: port)) (let* ((uconn (run-listener handler-proc port)) (rport (udat-port uconn))) ;; the real port - (servdat-host-set! *server-info* hostn) - (servdat-port-set! *server-info* rport) - (servdat-uconn-set! *server-info* uconn) + (servdat-host-set! *db-serv-info* hostn) + (servdat-port-set! *db-serv-info* rport) + (servdat-uconn-set! *db-serv-info* uconn) (wait-and-close uconn) (db:print-current-query-stats) ))) - (let* ((host (servdat-host *server-info*)) - (port (servdat-port *server-info*)) - (mode (or (servdat-mode *server-info*) + (let* ((host (servdat-host *db-serv-info*)) + (port (servdat-port *db-serv-info*)) + (mode (or (servdat-mode *db-serv-info*) "non-db"))) ;; server exit stuff here ;; (rmt:server-shutdown host port) - always do in on-exit ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting") @@ -1781,11 +1730,11 @@ ;; res => list then already locked, check server is responsive ;; => #t then sucessfully got the lock ;; => #f reserved for future use as to indicate something went wrong (match res ((owner_pid owner_host owner_port event_time) - (if (server-ready? uconn owner_host owner_port "abc") + (if (server-ready? uconn (conc owner_host":"owner_port) "abc") #f ;; locked by someone else (begin ;; locked by someone dead and gone (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.") (db:steal-lock-db dbh dbfile port)))) (#t #t) ;; placeholder so that we don't touch res if it is #t @@ -1835,15 +1784,15 @@ (define (server-address srv-pkt) (conc (alist-ref 'host srv-pkt) ":" (alist-ref 'port srv-pkt))) -(define (server-ready? uconn host port key) ;; server-address is host:port +(define (server-ready? uconn host-port key) ;; server-address is host:port (let* ((data (sexpr->string `((cmd . ping) (key . ,key) (params . ())))) - (res (send-receive uconn (conc host ":" port) 'ping data))) + (res (send-receive uconn host-port 'ping data))) (if res (string->sexpr res) res))) ; from the pkts return servers associated with dbpath @@ -1863,13 +1812,14 @@ (define (remove-pkts-if-not-alive uconn serv-pkts) (filter (lambda (pkt) (let* ((host (alist-ref 'host pkt)) (port (alist-ref 'port pkt)) + (host-port (conc host":"port)) (key (alist-ref 'servkey pkt)) (pktz (alist-ref 'Z pkt)) - (res (server-ready? uconn host port key))) + (res (server-ready? uconn host-port key))) (if res res (let* ((pktsdir (get-pkts-dir *toppath*)) (pktpath (conc pktsdir"/"pktz".pkt"))) (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath) @@ -1884,14 +1834,15 @@ (if (null? tail) #f (let* ((spkt (car tail)) (host (alist-ref 'ipaddr spkt)) (port (alist-ref 'port spkt)) + (host-port (conc host":"port)) (dbpth (alist-ref 'dbpath spkt)) (srvkey (alist-ref 'Z spkt)) ;; (alist-ref 'srvkey spkt)) (addr (server-address spkt))) - (if (server-ready? uconn host port srvkey) + (if (server-ready? uconn host-port srvkey) spkt (loop (cdr tail))))))) ;; am I the "first" in line server? I.e. my D card is smallest ;; use Z card as tie breaker @@ -1928,19 +1879,19 @@ ;;====================================================================== ;; if .db/main.db check the pkts ;; (define (rmt:wait-for-server pkts-dir db-file server-key) - (let* ((sdat *server-info*)) + (let* ((sdat *db-serv-info*)) (let loop ((start-time (current-seconds)) (changed #t) (last-sdat "not this")) (begin ;; let ((sdat #f)) (thread-sleep! 0.01) (debug:print-info 0 *default-log-port* "Waiting for server alive signature") (mutex-lock! *heartbeat-mutex*) - (set! sdat *server-info*) + (set! sdat *db-serv-info*) (mutex-unlock! *heartbeat-mutex*) (if (and sdat (not changed) (> (- (current-seconds) start-time) 2)) (let* ((uconn (servdat-uconn sdat))) @@ -1967,11 +1918,11 @@ (best-srv (get-best-candidate alive db-file)) (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f)) (i-am-srv (equal? best-srv-key server-key)) (delete-pkt (lambda () (let* ((pktfile (conc (get-pkts-dir *toppath*) - "/" (servdat-uuid *server-info*) + "/" (servdat-uuid *db-serv-info*) ".pkt"))) (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile) (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv) ;; am I the best-srv, compare server-keys to know @@ -2004,37 +1955,36 @@ (exit)) (loop start-time (equal? sdat last-sdat) sdat)))))))) -(define (rmt:register-server remdat apath iface port server-key dbname) - (remotedat-conns remdat) ;; just checking types - (rmt:open-main-connection remdat apath) ;; we need a channel to main.db - (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath +(define (rmt:register-server sinfo apath iface port server-key dbname) + (servdat-conns sinfo) ;; just checking types + (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db + (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath (db:run-id->dbname #f) 'register-server `(,iface ,port ,server-key ,(current-process-id) ,iface ,apath ,dbname))) -(define (rmt:get-count-servers remdat apath) - (remotedat-conns remdat) ;; just checking types - (rmt:open-main-connection remdat apath) ;; we need a channel to main.db - (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath +(define (rmt:get-count-servers sinfo apath) + (servdat-conns sinfo) ;; just checking types + (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db + (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath (db:run-id->dbname #f) 'get-count-servers `(,apath))) (define (rmt:get-servers-info apath) (rmt:send-receive 'get-servers-info #f `(,apath))) -(define (rmt:deregister-server remdat apath iface port server-key dbname) - (remotedat-conns remdat) ;; just checking types - (rmt:open-main-connection remdat apath) ;; we need a channel to main.db - (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath +(define (rmt:deregister-server db-serv-info apath iface port server-key dbname) + (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db + (rmt:send-receive-real db-serv-info apath ;; params: host port servkey pid ipaddr dbpath (db:run-id->dbname #f) 'deregister-server `(,iface ,port ,server-key ,(current-process-id) @@ -2041,23 +1991,23 @@ ,iface ,apath ,dbname))) (define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100)) - ;; wait until *server-info* stops changing + ;; wait until *db-serv-info* stops changing (let* ((stime (current-seconds))) (let loop ((last-host #f) (last-port #f) (tries 0)) - (let* ((curr-host (and *server-info* (servdat-host *server-info*))) - (curr-port (and *server-info* (servdat-port *server-info*)))) - ;; first we verify port and interface, update *server-info* in need be. + (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*))) + (curr-port (and *db-serv-info* (servdat-port *db-serv-info*)))) + ;; first we verify port and interface, update *db-serv-info* in need be. (cond ((> tries num-tries-allowed) (debug:print 0 *default-log-port* "rmt:keep-running, giving up after trying for several minutes.") (exit 1)) - ((not *server-info*) + ((not *db-serv-info*) (thread-sleep! 0.25) (loop curr-host curr-port (+ tries 1))) ((or (not last-host)(not last-port)) (debug:print 0 *default-log-port* "rmt:keep-running, still no interface, tries="tries) (thread-sleep! 0.25) @@ -2070,16 +2020,16 @@ ((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 3 seconds have passed (thread-sleep! 0.5) (loop curr-host curr-port (+ tries 1))) (else (rmt:get-signature) ;; sets *my-signature* as side effect - (servdat-status-set! *server-info* 'interface-stable) + (servdat-status-set! *db-serv-info* 'interface-stable) (debug:print 0 *default-log-port* "SERVER STARTED: " curr-host ":" curr-port " AT " (current-seconds) " server signature: " *my-signature* - " with "(servdat-trynum *server-info*)" port changes") + " with "(servdat-trynum *db-serv-info*)" port changes") (flush-output *default-log-port*) #t)))))) ;; run rmt:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. @@ -2088,11 +2038,11 @@ ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server") - (let* ((remdat *remotedat*) + (let* ((sinfo *db-serv-info*) (server-start-time (current-seconds)) (pkts-dir (get-pkts-dir)) (server-key (rmt:get-signature)) ;; This servers key (is-main (equal? (args:get-arg "-db") ".db/main.db")) (last-access 0) @@ -2104,48 +2054,48 @@ ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit (exit))) (timed-out? (lambda () (<= (+ last-access server-timeout) (current-seconds))))) - (servdat-dbfile-set! *server-info* (args:get-arg "-db")) + (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db")) ;; main and run db servers have both got wait logic (could/should merge it) (if is-main (rmt:wait-for-server pkts-dir dbname server-key) (rmt:wait-for-stable-interface)) ;; this is our forever loop - (let* ((iface (servdat-host *server-info*)) - (port (servdat-port *server-info*)) - (uconn (servdat-uconn *server-info*))) + (let* ((iface (servdat-host *db-serv-info*)) + (port (servdat-port *db-serv-info*)) + (uconn (servdat-uconn *db-serv-info*))) (let loop ((count 0) (bad-sync-count 0) (start-time (current-milliseconds))) (if (and (not is-main) (common:low-noise-print 60 "servdat-status")) - (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *server-info*))) + (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*))) ;; set up the database handle (mutex-lock! *heartbeat-mutex*) (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate (let ((watchdog (bdat-watchdog *bdat*))) (debug:print 0 *default-log-port* "SERVER: dbprep") (db:setup dbname) ;; sets *dbstruct-db* as side effect - (servdat-status-set! *server-info* 'db-opened) + (servdat-status-set! *db-serv-info* 'db-opened) ;; IFF I'm not main, call into main and register self (if (not is-main) - (let ((res (rmt:register-server remdat + (let ((res (rmt:register-server sinfo *toppath* iface port server-key dbname))) (if res ;; we are the server - (servdat-status-set! *server-info* 'have-interface-and-db) + (servdat-status-set! *db-serv-info* 'have-interface-and-db) ;; now check that the db locker is alive, clear it out if not - (let* ((serv-info (rmt:get-server-info *toppath* dbname))) + (let* ((serv-info (rmt:server-info *toppath* dbname))) (match serv-info ((host port servkey pid ipaddr apath dbpath) - (if (not (server-ready? uconn host port servkey)) + (if (not (server-ready? uconn (conc host":"port) servkey)) (begin (debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.") - (rmt:deregister-server remdat apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath) + (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath) (loop (+ count 1) bad-sync-count start-time)))) (else (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info) (exit))))))) @@ -2201,11 +2151,11 @@ (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) (shutdown-server-sequence (get-host-name) port)) ((and *server-run* (or (not (timed-out?)) (if is-main ;; do not exit if there are other servers (keep main open until all others gone) - (> (rmt:get-count-servers remdat *toppath*) 1) + (> (rmt:get-count-servers sinfo *toppath*) 1) #f))) (if (common:low-noise-print 120 "server continuing") (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) (loop 0 bad-sync-count (current-milliseconds))) (else Index: tests/unittests/basicserver.scm ================================================================== --- tests/unittests/basicserver.scm +++ tests/unittests/basicserver.scm @@ -21,22 +21,24 @@ ;; Run like this: ;; ;; ./rununittest.sh server 1;(cd simplerun;megatest -stop-server 0) (import rmtmod trace http-client apimod dbmod - launchmod srfi-69) + launchmod srfi-69 ulex system-information) (trace-call-sites #t) (trace + get-the-server ;; db:get-dbdat ;; rmt:find-main-server ;; rmt:send-receive-real ;; rmt:send-receive ;; sexpr->string - ;; server-ready? + server-ready? ;; rmt:register-server - ;; rmt:open-main-connection + api:run-server-process + rmt:open-main-connection ;; rmt:general-open-connection ;; rmt:get-conny ;; common:watchdog ;; rmt:find-main-server ;; get-all-server-pkts @@ -45,20 +47,31 @@ ;; api:run-server-process ;; rmt:run ;; rmt:try-start-server ) -(test #f #t (remotedat? (let ((r (make-remotedat))) - (set! *remotedat* r) - r))) -(test #f #f (rmt:get-conn *remotedat* *toppath* ".db/main.db")) -(test #f #f (rmt:find-main-server *toppath* ".db/main.db")) -(test #f #t (rmt:open-main-connection *remotedat* *toppath*)) -(pp (hash-table->alist (remotedat-conns *remotedat*))) -(test #f #t (conndat? (rmt:get-conn *remotedat* *toppath* ".db/main.db"))) - -(define *main* (rmt:get-conn *remotedat* *toppath* ".db/main.db")) +(test #f #t (servdat? (let ((s (make-servdat))) + (set! *servdat* s) + s))) +(test #f #f (rmt:get-conn *servdat* *toppath* ".db/main.db")) +(test #f #f (rmt:find-main-server *servdat* *toppath* ".db/main.db")) +(define th1 (make-thread (lambda () + (rmt:run (get-host-name))) + "rmt:run thread")) +(thread-start! th1) +(thread-sleep! 0.5) ;; give things some time to get going +(test #f #t (ulex-listener? *server-info*)) +(test #f #t (string? (udat-host-port *server-info*))) +(exit) +(test #f #t (server-ready? *server-info* (udat-host-port *server-info*))) + +(test #f #t (rmt:open-main-connection *servdat* *toppath*)) +;; (pp (hash-table->alist (remotedat-conns *servdat*))) +(test #f #t (conndat? (rmt:get-conn *servdat* *toppath* ".db/main.db"))) +(exit) + +(define *main* (rmt:get-conn *servdat* *toppath* ".db/main.db")) ;; (for-each (lambda (tdat) ;; (test #f tdat (loop-test (rmt:conn-ipaddr *main*) ;; (rmt:conn-port *main*) tdat))) ;; (list 'a @@ -68,13 +81,13 @@ (define *db* (db:setup ".db/main.db")) ;; these let me cut and paste from source easily (define apath *toppath*) (define dbname ".db/2.db") -(define remote *remotedat*) +(define remote *servdat*) (define keyvals '(("SYSTEM" "a")("RELEASE" "b"))) (test #f '() (string->sexpr "()")) (test #f 'server-started (api:execute-requests *db* 'get-server (list *toppath* ".db/2.db"))) (set! *dbstruct-db* #f) (exit) Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -1,8 +1,8 @@ ;; ulex: Distributed sqlite3 db ;;; -;; Copyright (C) 2018 Matt Welland +;; Copyright (C) 2018-2021 Matt Welland ;; Redistribution and use in source and binary forms, with or without ;; modification, is permitted. ;; ;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS ;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED @@ -40,10 +40,12 @@ set-work-handler ;; (set-work-handler proc) wait-and-close ;; (wait-and-close uconn) + ulex-listener? + ;; needed to get the interface:port that was automatically found udat-port udat-host-port ;; for testing only @@ -112,10 +114,16 @@ ;; ) ;;====================================================================== ;; listener ;;====================================================================== + +;; is uconn a ulex connector (listener) +;; +(define (ulex-listener? uconn) + (udat? uconn)) + ;; create a tcp listener and return a populated udat struct with ;; my port, address, hostname, pid etc. ;; return #f if fail to find a port to allocate. ;; ;; if udata-in is #f create the record