Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -113,94 +113,32 @@ ;; ;; Configurations for server (tcp-buffer-size 2048) (max-connections 2048) +;; info about me as a server +;; (defstruct servdat (host #f) (port #f) (uuid #f) (dbfile #f) (api-url #f) (api-uri #f) (api-req #f) - (status 'starting)) + (status 'starting) + (trynum 0) ;; count the number of ports we've tried + ) (define (servdat->url sdat) (conc (servdat-host sdat)":"(servdat-port sdat))) -;; (include "db_records.scm") - -;;====================================================================== -;; return the handle struct for sending queries to a specific database -;; - initializes the connection object if this is the first access -;; - finds the "captain" and asks who to talk to for the given dbfname -;; - establishes the connection to the current dbowner -;; -#;(define (rmt:connect alldat dbfname dbtype) - (let* ((ulexdat (or (alldat-ulexdat alldat) - (rmt:setup-ulex alldat)))) - (ulex:connect ulexdat dbfname dbtype))) - -;; setup the remote calls -#;(define (rmt:setup-ulex alldat) - (let* ((udata (ulex:setup))) ;; establish connection to ulex - (alldat-ulexdat-set! alldat udata) - ;; register all needed procs - (ulex:register-handler udata 'ping cmod:get-full-version) ;; override ping with get-full-version - (ulex:register-handler udata 'login cmod:get-full-version) ;; force setup of the connection - (ulex:register-handler udata 'execute api:execute-requests) - udata)) - -;; set up a connection to the current owner of the dbfile associated with rid -;; then send the query to that dbfile owner and wait for a response. -;; -#;(define (rmt:send-receive alldat cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected - (let* (;; (alldat *alldat*) - (areapath (alldat-areapath alldat)) - (dbtype (if (or (not rid)(< rid 1)) ;; this is the criteria for "main.db" - "main" "runs")) - (dbfname (if (equal? dbtype "main") - "main.db" - (conc rid ".db"))) - (dbfile (conc areapath "/.db/" dbfname)) - (ulexconn (rmt:connect alldat dbfname dbtype)) ;; ulexconn is our new *runremote*, it is a dbowner struct < pdat lastrefresh > - (udata (alldat-ulexdat alldat))) - (ulex:remote-request udata ulexconn 'immediate dbfile 'execute rid params))) - ;; need to call this on the other side - ;; (api:execute-requests dbstruct-local (vector (symbol->string cmd) params)))) - - #;(with-input-from-string - (ulex:remote-request udata ulexconn 'immediate dbfile 'execute rid (with-output-to-string (lambda ()(serialize params)))) - (lambda ()(deserialize))) - -;; -;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! -;; ;; generate entries for ~/.megatestrc with the following ;; ;; grep define ../rmt.scm | grep rmt: |perl -pi -e 's/\(define\s+\((\S+)\W.*$/\1/'|sort -u -;;====================================================================== -;; S U P P O R T F U N C T I O N S -;;====================================================================== - -;; ;; if a server is either running or in the process of starting call client:setup -;; ;; else return #f to let the calling proc know that there is no server available -;; ;; -;; (define (rmt:get-connection-info areapath #!key (area-dat #f)) ;; TODO: push areapath down. -;; (let* ((runremote (or area-dat *runremote*)) -;; (cinfo (if (remote? runremote) -;; (remote-conndat runremote) -;; #f))) -;; (if cinfo -;; cinfo -;; (if (server:check-if-running areapath) -;; (client:setup areapath) -;; #f)))) - (defstruct rmt:remote (conns (make-hash-table)) ;; apath/dbname => rmt:conn ) (defstruct rmt:conn @@ -211,10 +149,14 @@ (ipaddr #f) (port #f) (srvpkt #f) (lastmsg 0) (expires 0)) + +;;====================================================================== +;; S U P P O R T F U N C T I O N S +;;====================================================================== ;; replaces *runremote* (define *rmt:remote* (make-rmt:remote)) ;; -> http://abc.com:900/ @@ -248,10 +190,12 @@ (viable-srvs (get-viable-servers all-srvpkts dbname))) (get-the-server apath viable-srvs))) ;; looks for a connection to main ;; connections for other servers happens by requesting from main +;; +;; TODO: This is unnecessarily re-creating the record in the hash table ;; (define (rmt:open-main-connection remote apath) (let* ((dbname (db:run-id->dbname #f)) (the-srv (rmt:find-main-server apath dbname)) (start-main-srv (lambda () @@ -1548,12 +1492,11 @@ ;; ;; NOTE: This is NOT called directly from clients as not all transports support a client running ;; in the same process as the server. ;; (define (server:ping host port server-id #!key (do-exit #f)) - (let* ((sdat (servdat-init #f host port server-id))) - (rmt:send-receive sdat 'ping '()))) + (server-ready? host port "nokey yet")) ;;====================================================================== ;; http-transportmod.scm contents moved here ;;====================================================================== @@ -1667,10 +1610,18 @@ (define (http-transport:try-start-server ipaddrstr portnum) (let ((config-hostname (configf:lookup *configdat* "server" "hostname")) (config-use-proxy (equal? (configf:lookup *configdat* "client" "use-http_proxy") "yes"))) (if (not config-use-proxy) (determine-proxy (constantly #f))) + ;; any error in following steps will result in a retry + (if *server-info* + (begin + (servdat-host-set! *server-info* ipaddrstr) + (servdat-port-set! *server-info* portnum) + (servdat-status-set! *server-info* 'trying-port) + (servdat-trynum-set! *server-info* (+ (servdat-trynum *server-info*) 1))) + (set! *server-info* (make-servdat host: ipaddrstr port: portnum))) (debug:print-info 0 *default-log-port* "http-transport:try-start-server time=" (seconds->time-string (current-seconds)) " ipaddrsstr=" ipaddrstr " portnum=" portnum " config-hostname=" config-hostname) @@ -1690,23 +1641,26 @@ ;; get_next_port goes here (http-transport:try-start-server ipaddrstr (portlogger:open-run-close portlogger:find-port))) (begin (print "ERROR: Tried and tried but could not start the server")))) - ;; any error in following steps will result in a retry - (set! *server-info* (make-servdat host: ipaddrstr port: portnum)) - (debug:print 0 *default-log-port* "INFO: Trying to start server on " ipaddrstr ":" portnum) - ;; This starts the spiffy server - ;; NEED WAY TO SET IP TO #f TO BIND ALL - ;; (start-server bind-address: ipaddrstr port: portnum) - (if config-hostname ;; this is a hint to bind directly - (start-server port: portnum bind-address: (if (equal? config-hostname "-") - ipaddrstr - config-hostname)) - (start-server port: portnum)) - (portlogger:open-run-close portlogger:set-port portnum "released") - (debug:print 1 *default-log-port* "INFO: server has been stopped")))) + ;; any error in following steps will result in a retry + (if *server-info* + (servdat-status-set! *server-info* 'starting) + (set! *server-info* (make-servdat host: ipaddrstr port: portnum))) + + (debug:print 0 *default-log-port* "INFO: Trying to start server on " ipaddrstr ":" portnum) + ;; This starts the spiffy server + ;; NEED WAY TO SET IP TO #f TO BIND ALL + ;; (start-server bind-address: ipaddrstr port: portnum) + (if config-hostname ;; this is a hint to bind directly + (start-server port: portnum bind-address: (if (equal? config-hostname "-") + ipaddrstr + config-hostname)) + (start-server port: portnum)) + (portlogger:open-run-close portlogger:set-port portnum "released") + (debug:print 1 *default-log-port* "INFO: server has been stopped")))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== @@ -1803,12 +1757,15 @@ (debug:print-error 0 *default-log-port* "call to http-transport:server-dat-update-last-access with non-vector!!")))) ;; initialize servdat for client side, setup needed parameters ;; pass in #f as sdat-in to create sdat ;; -(define (servdat-init sdat-in iface port uuid) +#;(define (servdat-init sdat-in iface port uuid) (let* ((sdat (or sdat-in (make-servdat)))) + + (assert #f "This is a bad idea.") + (if uuid (servdat-uuid-set! sdat uuid)) (servdat-host-set! sdat iface) (servdat-port-set! sdat port) (servdat-api-url-set! sdat (conc "http://" iface ":" port "/api")) (servdat-api-uri-set! sdat (uri-reference (servdat-api-url sdat))) @@ -1992,10 +1949,11 @@ (mutex-unlock! *heartbeat-mutex*) (if (and sdat (not changed) (> (- (current-seconds) start-time) 2)) (begin + (servdat-status-set! sdat 'iface-stable) (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server") ;; create a server pkt in *toppath*/.meta/srvpkts ;; TODO: ;; 1. change sdat to stuct @@ -2018,11 +1976,12 @@ ;; am I the best-srv, compare server-keys to know (if (equal? best-srv-key server-key) (if (get-lock-db sdat db-file) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id) (begin (debug:print 0 *default-log-port* "I'm the server!") - (servdat-dbfile-set! sdat db-file)) + (servdat-dbfile-set! sdat db-file) + (servdat-status-set! sdat 'db-locked)) (begin (debug:print 0 *default-log-port* "I'm not the server, exiting.") (bdat-time-to-exit-set! *bdat* #t) (thread-sleep! 0.2) (exit))) @@ -2056,41 +2015,45 @@ ,dbname))) (define (http-transport:wait-for-stable-interface #!optional (num-tries-allowed 100)) ;; wait until *server-info* stops changing (let* ((stime (current-seconds))) - (let loop ((sdat #f) ;; this is our copy of the *last* *server-info* + (let loop ((last-host #f) + (last-port #f) (tries 0)) - ;; first we verify port and interface, update *server-info* in need be. - (cond - ((> tries num-tries-allowed) - (debug:print 0 *default-log-port* "http-transport:keep-running, giving up after trying for several minutes.") - (exit 1)) - ((not *server-info*) - (thread-sleep! 0.25) - (loop *server-info* (+ tries 1))) - ((not sdat) - (debug:print 0 *default-log-port* "http-transport:keep-running, still no interface, tries="tries) - (thread-sleep! 0.25) - (loop *server-info* (+ tries 1))) - ((or (not (equal? (servdat-host sdat)(servdat-host *server-info*))) - (not (equal? (servdat-port sdat)(servdat-port *server-info*)))) - (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") - (thread-sleep! 0.25) - (loop *server-info* (+ tries 1))) - ((< (- (current-seconds) stime) 3) ;; keep up the looping until at least 3 seconds have passed - (thread-sleep! 1) - (loop *server-info* (+ tries 1))) - (else - (if (not *server-id*)(set! *server-id* (server:mk-signature))) - (servdat-status-set! *server-info* 'interface-alive) - (debug:print 0 *default-log-port* - "SERVER STARTED: " (servdat-host *server-info*) - ":" (servdat-port *server-info*) - " AT " (current-seconds) " server-id: " *server-id*) - (flush-output *default-log-port*) - #t))))) + (let* ((curr-host (and *server-info* (servdat-host *server-info*))) + (curr-port (and *server-info* (servdat-port *server-info*)))) + ;; first we verify port and interface, update *server-info* in need be. + (cond + ((> tries num-tries-allowed) + (debug:print 0 *default-log-port* "http-transport:keep-running, giving up after trying for several minutes.") + (exit 1)) + ((not *server-info*) + (thread-sleep! 0.25) + (loop curr-host curr-port (+ tries 1))) + ((or (not last-host)(not last-port)) + (debug:print 0 *default-log-port* "http-transport:keep-running, still no interface, tries="tries) + (thread-sleep! 0.25) + (loop curr-host curr-port (+ tries 1))) + ((or (not (equal? last-host curr-host)) + (not (equal? last-port curr-port))) + (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") + (thread-sleep! 0.25) + (loop curr-host curr-port (+ tries 1))) + ((< (- (current-seconds) stime) 3) ;; keep up the looping until at least 3 seconds have passed + (thread-sleep! 1) + (loop curr-host curr-port (+ tries 1))) + (else + (if (not *server-id*)(set! *server-id* (server:mk-signature))) + (servdat-status-set! *server-info* 'interface-stable) + (debug:print 0 *default-log-port* + "SERVER STARTED: " curr-host + ":" curr-port + " AT " (current-seconds) " server-id: " *server-id* + " with "(servdat-trynum *server-info*)" port changes") + (flush-output *default-log-port*) + #t)))))) ;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (http-transport:keep-running dbname) @@ -2103,48 +2066,56 @@ (pkts-dir (get-pkts-dir)) (server-key (server:mk-signature)) (is-main (equal? (args:get-arg "-db") ".db/main.db")) (last-access 0) (server-timeout (server:expiration-timeout))) - ;; exits if nothing found in 100 tries (switch to a duration would be good) - (http-transport:wait-for-stable-interface) - (if is-main (http-transport:wait-for-server pkts-dir dbname server-key)) + ;; main and run db servers have both got wait logic (could/should merge it) + (if is-main + (http-transport:wait-for-server pkts-dir dbname server-key) + (http-transport:wait-for-stable-interface)) ;; this is our forever loop (let* ((iface (servdat-host *server-info*)) (port (servdat-port *server-info*))) (let loop ((count 0) (bad-sync-count 0) (start-time (current-milliseconds))) - (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *server-info*) ", is-main="is-main) + + (if (not is-main) + (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *server-info*))) + ;; set up the database handle + (mutex-lock! *heartbeat-mutex*) (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate (let ((watchdog (bdat-watchdog *bdat*))) (debug:print 0 *default-log-port* "SERVER: dbprep") (db:setup dbname) ;; sets *dbstruct-db* as side effect - + (servdat-status-set! *server-info* 'db-opened) ;; IFF I'm not main, call into main and register self (if (not is-main) (let ((res (rmt:register-server *rmt:remote* *toppath* iface port server-key dbname))) - (if res ;; we are not the server! + (if res ;; we are the server (servdat-status-set! *server-info* 'have-interface-and-db) - (begin + (begin (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting.") (exit))))) (debug:print 0 *default-log-port* - "SERVER: running, megatest version: " - (common:get-full-version)) + "SERVER: running, db "dbname" opened, megatest version: " + (common:get-full-version)) + ;; start the watchdog (if watchdog (if (not (member (thread-state watchdog) '(ready running blocked sleeping dead))) (begin (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")") - (thread-start! watchdog))) + (thread-start! watchdog)) + (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")")) (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")) - (loop (+ count 1) bad-sync-count start-time))) + #;(loop (+ count 1) bad-sync-count start-time))) + (mutex-unlock! *heartbeat-mutex*) ;; when things go wrong we don't want to be doing the various ;; queries too often so we strive to run this stuff only every ;; four seconds or so. (let* ((sync-time (- (current-milliseconds) start-time)) @@ -2155,13 +2126,11 @@ (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1) bad-sync-count (current-milliseconds))) ;; Transfer *db-last-access* to last-access to use in checking that we are still alive - (mutex-lock! *heartbeat-mutex*) (set! last-access *db-last-access*) - (mutex-unlock! *heartbeat-mutex*) (if (common:low-noise-print 60 "dbstats") (begin (debug:print 0 *default-log-port* "Server stats:") (db:print-current-query-stats))) @@ -2263,11 +2232,11 @@ (set! *my-client-signature* sig) *my-client-signature*))) ;; run ping in separate process, safest way in some cases ;; -(define (server:ping-server ifaceport) +#;(define (server:ping-server ifaceport) (with-input-from-pipe (conc (common:get-megatest-exe) " -ping " ifaceport) (lambda () (let loop ((inl (read-line)) (res "NOREPLY"))