Index: rmtmod.scm
==================================================================
--- rmtmod.scm
+++ rmtmod.scm
@@ -16,10 +16,14 @@
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
+;; generate entries for ~/.megatestrc with the following
+;;
+;; grep define ../rmt.scm | grep rmt: |perl -pi -e 's/\(define\s+\((\S+)\W.*$/\1/'|sort -u
+
(declare (unit rmtmod))
(declare (uses apimod))
(declare (uses commonmod))
(declare (uses configfmod))
@@ -101,69 +105,47 @@
tasksmod
ulex
)
-(defstruct alldat
- (areapath #f)
- (ulexdat #f)
- )
-
-
-;; (require-extension (srfi 18) extras tcp s11n)
-;;
-;;
-;; (use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest posix-extras)
-;;
-;; (use spiffy uri-common intarweb http-client spiffy-request-vars intarweb spiffy-directory-listing)
-;;
;; Configurations for server
;; (tcp-buffer-size 2048)
;; (max-connections 2048)
-;; info about me as a server
+;; info about me as a listener and my connections to db servers
+;; stored (for now) in *db-serv-info*
;;
(defstruct servdat
(host #f)
(port #f)
(uuid #f)
- (rep #f)
(dbfile #f)
- (api-url #f)
- (api-uri #f)
- (api-req #f)
(uconn #f) ;; this is the listener
(mode #f)
(status 'starting)
(trynum 0) ;; count the number of ports we've tried
+ (conns (make-hash-table)) ;; apath/dbname => conndat
)
+;; NOTE(review): starts out as a servdat record, so `(if *db-serv-info* ...)` tests are always true unless it is set! to #f somewhere not shown here -- confirm
+(define *db-serv-info* (make-servdat))
(define (servdat->url sdat)
(conc (servdat-host sdat)":"(servdat-port sdat)))
-;; generate entries for ~/.megatestrc with the following
+;; db servers contact info
;;
-;; grep define ../rmt.scm | grep rmt: |perl -pi -e 's/\(define\s+\((\S+)\W.*$/\1/'|sort -u
-
-(defstruct remotedat
- (conns (make-hash-table)) ;; apath/dbname => conndat
- )
-
(defstruct conndat
(apath #f)
(dbname #f)
(fullname #f)
(hostport #f)
(ipaddr #f)
(port #f)
- (socket #f)
(srvpkt #f)
(srvkey #f)
(lastmsg 0)
- (expires 0)
- (inport #f)
- (outport #f))
+ (expires 0))
(define *srvpktspec*
`((server (host . h)
(port . p)
(servkey . k)
@@ -173,18 +155,10 @@
;;======================================================================
;; S U P P O R T F U N C T I O N S
;;======================================================================
-;; replaces *runremote*
-(define *remotedat* (make-remotedat))
-
-;; -> http://abc.com:900/
-;;
-(define (conndat->uri conn entrypoint)
- (conc "http://"(conndat-ipaddr conn)":"(conndat-port conn)"/"entrypoint))
-
;; set up the api proc, seems like there should be a better place for this?
;;
;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE
;;
(define api-proc (make-parameter conc))
@@ -197,11 +171,11 @@
;;
;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
;;
(define (rmt:get-conn remdat apath dbname)
(let* ((fullname (db:dbname->path apath dbname))
- (conn (hash-table-ref/default (remotedat-conns remdat) fullname #f)))
+ (conn (hash-table-ref/default (servdat-conns remdat) fullname #f)))
(if (and conn
(< (current-seconds) (conndat-expires conn)))
conn
#f)))
@@ -223,34 +197,32 @@
;;
;; TODO: This is unnecessarily re-creating the record in the hash table
;;
(define (rmt:open-main-connection remdat apath)
(let* ((fullpath (db:dbname->path apath "/.db/main.db"))
- (conns (remotedat-conns remdat))
+ (conns (servdat-conns remdat))
(conn (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
- (myconn (if *server-info*
- (servdat-uconn *server-info*)
+ (myconn (if *db-serv-info*
+ (servdat-uconn *db-serv-info*)
(let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
(thread-start! th1)
(let loop ((count 0))
(assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
- (if (not *server-info*)
+ (if (not *db-serv-info*)
(begin
(thread-sleep! 1)
(loop (+ count 1)))
(begin
- (servdat-mode-set! *server-info* 'non-db)
- (servdat-uconn *server-info*))))))))
+ (servdat-mode-set! *db-serv-info* 'non-db)
+ (servdat-uconn *db-serv-info*))))))))
(cond
((and conn ;; conn is NOT a socket, just saying ...
(< (current-seconds) (conndat-expires conn)))
#t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
((and conn
(>= (current-seconds)(conndat-expires conn)))
(debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
- #;(if (conndat-socket conn)
- (nng-close! (conndat-socket conn))) ;; TODO - close the ulex server here?
(hash-table-set! conns fullpath #f) ;; clean up
(rmt:open-main-connection remdat apath))
(else
;; Below we will find or create and connect to main
(let* ((dbname (db:run-id->dbname #f))
@@ -287,18 +259,18 @@
expires: (+ (current-seconds) 60) ;; this needs to be gathered during the ping
)))
(hash-table-set! conns fullpath new-the-srv)))
#t)))))
-;; NB// remdat is a remotedat struct
+;; NB// sinfo is a servdat struct
;;
-(define (rmt:general-open-connection remdat apath dbname #!key (num-tries 5))
+(define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
(assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
(let* ((mdbname (db:run-id->dbname #f))
(fullname (db:dbname->path apath dbname))
- (conns (remotedat-conns remdat))
- (mconn (rmt:get-conn remdat apath mdbname)))
+ (conns (servdat-conns sinfo))
+ (mconn (rmt:get-conn sinfo apath mdbname)))
(if (and mconn
(not (debug:print-logger)))
(begin
(debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.")
(debug:print-logger rmt:log-to-main)))
@@ -306,22 +278,21 @@
((or (not mconn) ;; no channel open to main?
(< (conndat-expires mconn)(+ (current-seconds) 2))) ;; restablish connection if less than 2 seconds on the lease
(if mconn ;; previously opened - clean up NB// consolidate this with the similar code in open main above
(begin
(debug:print-info 0 *default-log-port* "Clearing out connection to main that has expired.")
- ;; (nng-close! (conndat-socket mconn)) ;; TODO - close the ulex server/listener here?
(hash-table-set! conns fullname #f)))
- (rmt:open-main-connection remdat apath)
- (rmt:general-open-connection remdat apath mdbname))
- ((not (rmt:get-conn remdat apath dbname)) ;; no channel open to dbname?
- (let* ((res (rmt:send-receive-real remdat apath mdbname 'get-server `(,apath ,dbname))))
+ (rmt:open-main-connection sinfo apath)
+ (rmt:general-open-connection sinfo apath mdbname))
+ ((not (rmt:get-conn sinfo apath dbname)) ;; no channel open to dbname?
+ (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname))))
(case res
((server-started)
(if (> num-tries 0)
(begin
(thread-sleep! 2)
- (rmt:general-open-connection remdat apath dbname num-tries: (- num-tries 1)))
+ (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
(begin
(debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname)
(exit 1))))
(else
(if (list? res) ;; server has been registered and the info was returned. pass it on.
@@ -365,47 +336,35 @@
;; Defaults to current area
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
;; (if (not *remotedat*)(set! *remotedat* (make-remotedat)))
(let* ((apath *toppath*)
- (remdat *remotedat*)
- (conns (remotedat-conns remdat)) ;; just checking that remdat is a remotedat
+ (sinfo *db-serv-info*)
(dbname (db:run-id->dbname rid)))
(if *localmode*
(let* ((dbdat (dbr:dbstruct-get-dbdat *dbstruct* dbname))
(indat `((cmd . ,cmd)(params . ,params))))
(api:execute-requests *dbstruct* cmd params)
;; (api:process-request *dbstruct* indat)
;; (api:process-request dbdat indat)
)
(begin
- (rmt:open-main-connection remdat apath)
- (if rid (rmt:general-open-connection remdat apath dbname))
- (rmt:send-receive-real remdat apath dbname cmd params)))))
-
-#;(define (rmt:send-receive-setup conn)
- (if (not (conndat-inport conn))
- (let-values (((i o) (tcp-connect (conndat-ipaddr conn)
- (conndat-port conn))))
- (conndat-inport-set! conn i)
- (conndat-outport-set! conn o))))
-
+ (rmt:open-main-connection sinfo apath)
+ (if rid (rmt:general-open-connection sinfo apath dbname))
+ (rmt:send-receive-real sinfo apath dbname cmd params)))))
+
;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
;; sometime in the future
;;
-(define (rmt:send-receive-real remdat apath dbname cmd params)
- (let* ((conn (rmt:get-conn remdat apath dbname)))
+(define (rmt:send-receive-real sinfo apath dbname cmd params)
+ (let* ((conn (rmt:get-conn sinfo apath dbname)))
(assert conn "FATAL: rmt:send-receive-real called without the needed channels opened")
- (assert (conndat-socket conn) "FATAL: rmt:send-receive-real called without the channel socket opened.")
- (let* ((soc (conndat-socket conn))
- (key #f)
- (host (conndat-ipaddr conn))
- (port (conndat-port conn))
+ (let* ((key #f)
(payload `((cmd . ,cmd)
(key . ,(conndat-srvkey conn))
(params . ,params)))
- (res (send-receive soc payload)))
+ (res (send-receive conn cmd payload)))
(if (member res '("#")) ;; TODO - fix this in string->sexpr
#f
(string->sexpr res)))))
;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
@@ -412,18 +371,10 @@
;; sometime in the future.
;;
;; Purpose - call the main.db server and request a server be started
;; for the given area path and dbname
;;
-;; (define (rmt:send-receive-server-start remdat apath dbname)
-;; (let* ((conn (rmt:get-conn remdat apath dbname)))
-;; (assert conn "FATAL: Unable to connect to db "apath"/"dbname)
-;; #;(let* ((res (with-input-from-request
-;; (conndat->uri conn "api")
-;; `((params . (,apath ,dbname)))
-;; read-string)))
-;; (string->sexpr res))))
(define (rmt:print-db-stats)
(let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f"
(debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
(debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
@@ -475,11 +426,11 @@
(rmt:send-receive 'kill-server #f (list run-id)))
(define (rmt:start-server run-id)
(rmt:send-receive 'start-server #f (list run-id)))
-(define (rmt:get-server-info apath dbname)
+(define (rmt:server-info apath dbname)
(rmt:send-receive 'get-server-info #f (list apath dbname)))
;;======================================================================
;; M I S C
;;======================================================================
@@ -1543,17 +1494,17 @@
(not (equal? (substring (->string megatest-version) 0 4)
(substring (conc (common:get-last-run-version)) 0 4))))
;; host and port are used to ensure we are remove proper records
(define (rmt:server-shutdown host port)
- (let ((dbfile (servdat-dbfile *server-info*)))
+ (let ((dbfile (servdat-dbfile *db-serv-info*)))
(debug:print-info 0 *default-log-port* "dbfile is "dbfile)
(if dbfile
(let* ((am-server (args:get-arg "-server"))
(dbfile (args:get-arg "-db"))
(apath *toppath*)
- (remdat *remotedat*)) ;; foundation for future fix
+ #;(sinfo *remotedat*)) ;; foundation for future fix
(if *dbstruct-db*
(let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
(db (dbr:dbdat-db dbdat))
(inmem (dbr:dbdat-db dbdat)) ;; WRONG
)
@@ -1572,24 +1523,24 @@
(debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do."))
(if (not am-server)
(debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!")
(if (string-match ".*/main.db$" dbfile)
(let ((pkt-file (conc (get-pkts-dir *toppath*)
- "/" (servdat-uuid *server-info*)
+ "/" (servdat-uuid *db-serv-info*)
".pkt")))
(debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
(delete-file* pkt-file)
(debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port)
(db:with-lock-db
- (servdat-dbfile *server-info*)
+ (servdat-dbfile *db-serv-info*)
(lambda (dbh dbfile)
(db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove
- (let* ((sdat *server-info*) ;; we have a run-id server
+ (let* ((sdat *db-serv-info*) ;; we have a run-id server
(host (servdat-host sdat))
(port (servdat-port sdat))
(uuid (servdat-uuid sdat))
- (res (rmt:deregister-server remdat *toppath* host port uuid dbfile)))
+ (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile)))
(debug:print-info 0 *default-log-port* "deregistered-server, res="res)
(debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
)))))))
(define (std-exit-procedure)
@@ -1605,13 +1556,13 @@
(if (and no-hurry (debug:debug-mode 18))
(rmt:print-db-stats))
(let ((th1 (make-thread
(lambda () ;; thread for cleaning up, give it five seconds
(let* ((start-time (current-seconds)))
- (if *server-info*
- (let* ((host (servdat-host *server-info*))
- (port (servdat-port *server-info*)))
+ (if *db-serv-info*
+ (let* ((host (servdat-host *db-serv-info*))
+ (port (servdat-port *db-serv-info*)))
(debug:print-info 0 *default-log-port* "Shutting down server/responder.")
;;
;; TODO - add flushing/waiting on the work queue
;;
(rmt:server-shutdown host port)
@@ -1660,15 +1611,14 @@
;; NOTE: This is NOT called directly from clients as not all transports support a client running
;; in the same process as the server.
;;
;; conn is a conndat record
;;
-(define (server:ping conn #!key (do-exit #f))
- (let* ((req (conndat-socket conn))
- (srvkey (conndat-srvkey conn))
- (msg (sexpr->string '(ping ,srvkey))))
- (send-receive req msg))) ;; (server-ready? host port server-id))
+(define (server:ping uconn #!key (do-exit #f))
+ (let* ((srvkey (conndat-srvkey uconn))
+ (msg (sexpr->string `(ping ,srvkey)))) ;; quasiquote so ,srvkey is evaluated; a plain quote serialises the literal (ping (unquote srvkey))
+ (send-receive uconn 'ping msg))) ;; NOTE(review): server-ready? calls send-receive as (uconn host-port cmd data); confirm this 3-arg form
;;======================================================================
;; http-transportmod.scm contents moved here
;;======================================================================
@@ -1684,34 +1634,33 @@
(define (http-get-function fnkey)
(hash-table-ref/default *http-functions* fnkey (lambda () "nothing here yet")))
;; Main entry point to start a server. was start-server
(define (rmt:run hostn)
- ;; (assert (not *server-info*) "FATAL: rmt:run called but *server-info* has already been initialized")
;; ;; Configurations for server
;; (tcp-buffer-size 2048)
;; (max-connections 2048)
(debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
- (if *server-info*
- (let* ((uconn (servdat-uconn *server-info*)))
+ (if *db-serv-info*
+ (let* ((uconn (servdat-uconn *db-serv-info*)))
(wait-and-close uconn))
(let* ((port (portlogger:open-run-close portlogger:find-port))
(handler-proc (lambda (rem-host-port qrykey cmd params) ;;
(api:execute-requests *dbstruct-db* cmd params))))
;; (api:process-request *dbstuct-db*
- (set! *server-info* (make-servdat host: hostn port: port))
+ (set! *db-serv-info* (make-servdat host: hostn port: port))
(let* ((uconn (run-listener handler-proc port))
(rport (udat-port uconn))) ;; the real port
- (servdat-host-set! *server-info* hostn)
- (servdat-port-set! *server-info* rport)
- (servdat-uconn-set! *server-info* uconn)
+ (servdat-host-set! *db-serv-info* hostn)
+ (servdat-port-set! *db-serv-info* rport)
+ (servdat-uconn-set! *db-serv-info* uconn)
(wait-and-close uconn)
(db:print-current-query-stats)
)))
- (let* ((host (servdat-host *server-info*))
- (port (servdat-port *server-info*))
- (mode (or (servdat-mode *server-info*)
+ (let* ((host (servdat-host *db-serv-info*))
+ (port (servdat-port *db-serv-info*))
+ (mode (or (servdat-mode *db-serv-info*)
"non-db")))
;; server exit stuff here
;; (rmt:server-shutdown host port) - always do in on-exit
;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
(debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting")
@@ -1781,11 +1730,11 @@
;; res => list then already locked, check server is responsive
;; => #t then sucessfully got the lock
;; => #f reserved for future use as to indicate something went wrong
(match res
((owner_pid owner_host owner_port event_time)
- (if (server-ready? uconn owner_host owner_port "abc")
+ (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
#f ;; locked by someone else
(begin ;; locked by someone dead and gone
(debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
(db:steal-lock-db dbh dbfile port))))
(#t #t) ;; placeholder so that we don't touch res if it is #t
@@ -1835,15 +1784,15 @@
(define (server-address srv-pkt)
(conc (alist-ref 'host srv-pkt) ":"
(alist-ref 'port srv-pkt)))
-(define (server-ready? uconn host port key) ;; server-address is host:port
+(define (server-ready? uconn host-port key) ;; host-port is a "host:port" string, e.g. (conc host ":" port)
(let* ((data (sexpr->string `((cmd . ping)
(key . ,key)
(params . ()))))
- (res (send-receive uconn (conc host ":" port) 'ping data)))
+ (res (send-receive uconn host-port 'ping data)))
(if res
(string->sexpr res)
res)))
; from the pkts return servers associated with dbpath
@@ -1863,13 +1812,14 @@
(define (remove-pkts-if-not-alive uconn serv-pkts)
(filter (lambda (pkt)
(let* ((host (alist-ref 'host pkt))
(port (alist-ref 'port pkt))
+ (host-port (conc host":"port))
(key (alist-ref 'servkey pkt))
(pktz (alist-ref 'Z pkt))
- (res (server-ready? uconn host port key)))
+ (res (server-ready? uconn host-port key)))
(if res
res
(let* ((pktsdir (get-pkts-dir *toppath*))
(pktpath (conc pktsdir"/"pktz".pkt")))
(debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath)
@@ -1884,14 +1834,15 @@
(if (null? tail)
#f
(let* ((spkt (car tail))
(host (alist-ref 'ipaddr spkt))
(port (alist-ref 'port spkt))
+ (host-port (conc host":"port))
(dbpth (alist-ref 'dbpath spkt))
(srvkey (alist-ref 'Z spkt)) ;; (alist-ref 'srvkey spkt))
(addr (server-address spkt)))
- (if (server-ready? uconn host port srvkey)
+ (if (server-ready? uconn host-port srvkey)
spkt
(loop (cdr tail)))))))
;; am I the "first" in line server? I.e. my D card is smallest
;; use Z card as tie breaker
@@ -1928,19 +1879,19 @@
;;======================================================================
;; if .db/main.db check the pkts
;;
(define (rmt:wait-for-server pkts-dir db-file server-key)
- (let* ((sdat *server-info*))
+ (let* ((sdat *db-serv-info*))
(let loop ((start-time (current-seconds))
(changed #t)
(last-sdat "not this"))
(begin ;; let ((sdat #f))
(thread-sleep! 0.01)
(debug:print-info 0 *default-log-port* "Waiting for server alive signature")
(mutex-lock! *heartbeat-mutex*)
- (set! sdat *server-info*)
+ (set! sdat *db-serv-info*)
(mutex-unlock! *heartbeat-mutex*)
(if (and sdat
(not changed)
(> (- (current-seconds) start-time) 2))
(let* ((uconn (servdat-uconn sdat)))
@@ -1967,11 +1918,11 @@
(best-srv (get-best-candidate alive db-file))
(best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))
(i-am-srv (equal? best-srv-key server-key))
(delete-pkt (lambda ()
(let* ((pktfile (conc (get-pkts-dir *toppath*)
- "/" (servdat-uuid *server-info*)
+ "/" (servdat-uuid *db-serv-info*)
".pkt")))
(debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile)
(delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit
(debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv)
;; am I the best-srv, compare server-keys to know
@@ -2004,37 +1955,36 @@
(exit))
(loop start-time
(equal? sdat last-sdat)
sdat))))))))
-(define (rmt:register-server remdat apath iface port server-key dbname)
- (remotedat-conns remdat) ;; just checking types
- (rmt:open-main-connection remdat apath) ;; we need a channel to main.db
- (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath
+(define (rmt:register-server sinfo apath iface port server-key dbname)
+ (servdat-conns sinfo) ;; just checking types
+ (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+ (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
(db:run-id->dbname #f)
'register-server `(,iface
,port
,server-key
,(current-process-id)
,iface
,apath
,dbname)))
-(define (rmt:get-count-servers remdat apath)
- (remotedat-conns remdat) ;; just checking types
- (rmt:open-main-connection remdat apath) ;; we need a channel to main.db
- (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath
+(define (rmt:get-count-servers sinfo apath)
+ (servdat-conns sinfo) ;; just checking types
+ (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+ (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
(db:run-id->dbname #f)
'get-count-servers `(,apath)))
(define (rmt:get-servers-info apath)
(rmt:send-receive 'get-servers-info #f `(,apath)))
-(define (rmt:deregister-server remdat apath iface port server-key dbname)
- (remotedat-conns remdat) ;; just checking types
- (rmt:open-main-connection remdat apath) ;; we need a channel to main.db
- (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath
+(define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
+ (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
+ (rmt:send-receive-real db-serv-info apath ;; params: host port servkey pid ipaddr dbpath
(db:run-id->dbname #f)
'deregister-server `(,iface
,port
,server-key
,(current-process-id)
@@ -2041,23 +1991,23 @@
,iface
,apath
,dbname)))
(define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100))
- ;; wait until *server-info* stops changing
+ ;; wait until *db-serv-info* stops changing
(let* ((stime (current-seconds)))
(let loop ((last-host #f)
(last-port #f)
(tries 0))
- (let* ((curr-host (and *server-info* (servdat-host *server-info*)))
- (curr-port (and *server-info* (servdat-port *server-info*))))
- ;; first we verify port and interface, update *server-info* in need be.
+ (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*)))
+ (curr-port (and *db-serv-info* (servdat-port *db-serv-info*))))
+ ;; first we verify port and interface, update *db-serv-info* if need be.
(cond
((> tries num-tries-allowed)
(debug:print 0 *default-log-port* "rmt:keep-running, giving up after trying for several minutes.")
(exit 1))
- ((not *server-info*)
+ ((not *db-serv-info*)
(thread-sleep! 0.25)
(loop curr-host curr-port (+ tries 1)))
((or (not last-host)(not last-port))
(debug:print 0 *default-log-port* "rmt:keep-running, still no interface, tries="tries)
(thread-sleep! 0.25)
@@ -2070,16 +2020,16 @@
((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 3 seconds have passed
(thread-sleep! 0.5)
(loop curr-host curr-port (+ tries 1)))
(else
(rmt:get-signature) ;; sets *my-signature* as side effect
- (servdat-status-set! *server-info* 'interface-stable)
+ (servdat-status-set! *db-serv-info* 'interface-stable)
(debug:print 0 *default-log-port*
"SERVER STARTED: " curr-host
":" curr-port
" AT " (current-seconds) " server signature: " *my-signature*
- " with "(servdat-trynum *server-info*)" port changes")
+ " with "(servdat-trynum *db-serv-info*)" port changes")
(flush-output *default-log-port*)
#t))))))
;; run rmt:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
@@ -2088,11 +2038,11 @@
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; This thread waits for the server to come alive
(debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")
- (let* ((remdat *remotedat*)
+ (let* ((sinfo *db-serv-info*)
(server-start-time (current-seconds))
(pkts-dir (get-pkts-dir))
(server-key (rmt:get-signature)) ;; This servers key
(is-main (equal? (args:get-arg "-db") ".db/main.db"))
(last-access 0)
@@ -2104,48 +2054,48 @@
;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit
(exit)))
(timed-out? (lambda ()
(<= (+ last-access server-timeout)
(current-seconds)))))
- (servdat-dbfile-set! *server-info* (args:get-arg "-db"))
+ (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db"))
;; main and run db servers have both got wait logic (could/should merge it)
(if is-main
(rmt:wait-for-server pkts-dir dbname server-key)
(rmt:wait-for-stable-interface))
;; this is our forever loop
- (let* ((iface (servdat-host *server-info*))
- (port (servdat-port *server-info*))
- (uconn (servdat-uconn *server-info*)))
+ (let* ((iface (servdat-host *db-serv-info*))
+ (port (servdat-port *db-serv-info*))
+ (uconn (servdat-uconn *db-serv-info*)))
(let loop ((count 0)
(bad-sync-count 0)
(start-time (current-milliseconds)))
(if (and (not is-main)
(common:low-noise-print 60 "servdat-status"))
- (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *server-info*)))
+ (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*)))
;; set up the database handle
(mutex-lock! *heartbeat-mutex*)
(if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate
(let ((watchdog (bdat-watchdog *bdat*)))
(debug:print 0 *default-log-port* "SERVER: dbprep")
(db:setup dbname) ;; sets *dbstruct-db* as side effect
- (servdat-status-set! *server-info* 'db-opened)
+ (servdat-status-set! *db-serv-info* 'db-opened)
;; IFF I'm not main, call into main and register self
(if (not is-main)
- (let ((res (rmt:register-server remdat
+ (let ((res (rmt:register-server sinfo
*toppath* iface port
server-key dbname)))
(if res ;; we are the server
- (servdat-status-set! *server-info* 'have-interface-and-db)
+ (servdat-status-set! *db-serv-info* 'have-interface-and-db)
;; now check that the db locker is alive, clear it out if not
- (let* ((serv-info (rmt:get-server-info *toppath* dbname)))
+ (let* ((serv-info (rmt:server-info *toppath* dbname)))
(match serv-info
((host port servkey pid ipaddr apath dbpath)
- (if (not (server-ready? uconn host port servkey))
+ (if (not (server-ready? uconn (conc host":"port) servkey))
(begin
(debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.")
- (rmt:deregister-server remdat apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath)
+ (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath)
(loop (+ count 1) bad-sync-count start-time))))
(else
(debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info)
(exit)))))))
@@ -2201,11 +2151,11 @@
(debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
(shutdown-server-sequence (get-host-name) port))
((and *server-run*
(or (not (timed-out?))
(if is-main ;; do not exit if there are other servers (keep main open until all others gone)
- (> (rmt:get-count-servers remdat *toppath*) 1)
+ (> (rmt:get-count-servers sinfo *toppath*) 1)
#f)))
(if (common:low-noise-print 120 "server continuing")
(debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
(loop 0 bad-sync-count (current-milliseconds)))
(else
Index: tests/unittests/basicserver.scm
==================================================================
--- tests/unittests/basicserver.scm
+++ tests/unittests/basicserver.scm
@@ -21,22 +21,24 @@
;; Run like this:
;;
;; ./rununittest.sh server 1;(cd simplerun;megatest -stop-server 0)
(import rmtmod trace http-client apimod dbmod
- launchmod srfi-69)
+ launchmod srfi-69 ulex system-information)
(trace-call-sites #t)
(trace
+ get-the-server
;; db:get-dbdat
;; rmt:find-main-server
;; rmt:send-receive-real
;; rmt:send-receive
;; sexpr->string
- ;; server-ready?
+ server-ready?
;; rmt:register-server
- ;; rmt:open-main-connection
+ api:run-server-process
+ rmt:open-main-connection
;; rmt:general-open-connection
;; rmt:get-conny
;; common:watchdog
;; rmt:find-main-server
;; get-all-server-pkts
@@ -45,20 +47,31 @@
;; api:run-server-process
;; rmt:run
;; rmt:try-start-server
)
-(test #f #t (remotedat? (let ((r (make-remotedat)))
- (set! *remotedat* r)
- r)))
-(test #f #f (rmt:get-conn *remotedat* *toppath* ".db/main.db"))
-(test #f #f (rmt:find-main-server *toppath* ".db/main.db"))
-(test #f #t (rmt:open-main-connection *remotedat* *toppath*))
-(pp (hash-table->alist (remotedat-conns *remotedat*)))
-(test #f #t (conndat? (rmt:get-conn *remotedat* *toppath* ".db/main.db")))
-
-(define *main* (rmt:get-conn *remotedat* *toppath* ".db/main.db"))
+(test #f #t (servdat? (let ((s (make-servdat)))
+ (set! *servdat* s)
+ s)))
+(test #f #f (rmt:get-conn *servdat* *toppath* ".db/main.db"))
+(test #f #f (rmt:find-main-server *servdat* *toppath* ".db/main.db"))
+(define th1 (make-thread (lambda ()
+ (rmt:run (get-host-name)))
+ "rmt:run thread"))
+(thread-start! th1)
+(thread-sleep! 0.5) ;; give things some time to get going
+(test #f #t (ulex-listener? *server-info*))
+(test #f #t (string? (udat-host-port *server-info*)))
+(exit)
+(test #f #t (server-ready? *server-info* (udat-host-port *server-info*)))
+
+(test #f #t (rmt:open-main-connection *servdat* *toppath*))
+;; (pp (hash-table->alist (remotedat-conns *servdat*)))
+(test #f #t (conndat? (rmt:get-conn *servdat* *toppath* ".db/main.db")))
+(exit)
+
+(define *main* (rmt:get-conn *servdat* *toppath* ".db/main.db"))
;; (for-each (lambda (tdat)
;; (test #f tdat (loop-test (rmt:conn-ipaddr *main*)
;; (rmt:conn-port *main*) tdat)))
;; (list 'a
@@ -68,13 +81,13 @@
(define *db* (db:setup ".db/main.db"))
;; these let me cut and paste from source easily
(define apath *toppath*)
(define dbname ".db/2.db")
-(define remote *remotedat*)
+(define remote *servdat*)
(define keyvals '(("SYSTEM" "a")("RELEASE" "b")))
(test #f '() (string->sexpr "()"))
(test #f 'server-started (api:execute-requests *db* 'get-server (list *toppath* ".db/2.db")))
(set! *dbstruct-db* #f)
(exit)
Index: ulex/ulex.scm
==================================================================
--- ulex/ulex.scm
+++ ulex/ulex.scm
@@ -1,8 +1,8 @@
;; ulex: Distributed sqlite3 db
;;;
-;; Copyright (C) 2018 Matt Welland
+;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
@@ -40,10 +40,12 @@
set-work-handler ;; (set-work-handler proc)
wait-and-close ;; (wait-and-close uconn)
+ ulex-listener?
+
;; needed to get the interface:port that was automatically found
udat-port
udat-host-port
;; for testing only
@@ -112,10 +114,16 @@
;; )
;;======================================================================
;; listener
;;======================================================================
+
+;; Predicate: true when uconn satisfies udat?, i.e. it is a udat record
+;; (the struct used for a ulex listener); no other state is checked.
+(define (ulex-listener? uconn)
+ (udat? uconn))
+
;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;; if udata-in is #f create the record