Overview
Comment: | wip |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v2.001 |
Files: | files | file ages | folders |
SHA1: | c74886dad6ef99621fc8e3fa3a659a7f |
User & Date: | matt on 2021-12-18 14:33:22 |
Other Links: | branch diff | manifest | tags |
Context
2021-12-18
| ||
14:47 | fixed bad call check-in: 7d2dde810d user: matt tags: v2.001 | |
14:33 | wip check-in: c74886dad6 user: matt tags: v2.001 | |
05:45 | WIP - doesn't compile check-in: 9607b06dff user: matt tags: v2.001 | |
Changes
Modified rmtmod.scm from [208d4b493b] to [4f98cc6cc5].
︙ | ︙ | |||
186 187 188 189 190 191 192 | ;; is it not expired? then return it ;; ;; else setup a connection ;; ;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception ;; (define (rmt:get-conn remdat apath dbname) | | | | 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 | ;; is it not expired? then return it ;; ;; else setup a connection ;; ;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception ;; (define (rmt:get-conn remdat apath dbname) (let* ((fullname (db:dbname->path apath dbname)) (conn (hash-table-ref/default (remotedat-conns remdat) fullname #f))) (if (and conn (< (current-seconds) (conndat-expires conn))) conn #f))) (define (rmt:find-main-server apath dbname) (let* ((pktsdir (get-pkts-dir apath)) |
︙ | ︙ | |||
209 210 211 212 213 214 215 | ;; ;; connections for other servers happens by requesting from main ;; ;; TODO: This is unnecessarily re-creating the record in the hash table ;; (define (rmt:open-main-connection remdat apath) (let* ((fullpath (db:dbname->path apath "/.db/main.db")) | > | > < < | | > > | | 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 | ;; ;; connections for other servers happens by requesting from main ;; ;; TODO: This is unnecessarily re-creating the record in the hash table ;; (define (rmt:open-main-connection remdat apath) (let* ((fullpath (db:dbname->path apath "/.db/main.db")) (conns (remotedat-conns remdat)) (conn (hash-table-ref/default conns fullpath #f))) ;; TODO - create call for this (if (and conn ;; conn is NOT a socket, just saying ... (< (current-seconds) (conndat-expires conn))) conn ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died ;; Below we will find or create and connect to main (let* ((dbname (db:run-id->dbname #f)) (the-srv (rmt:find-main-server apath dbname)) (start-main-srv (lambda () ;; call IF there is no the-srv found (api:run-server-process apath dbname) (thread-sleep! 
4) (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries ))) (if (not the-srv) ;; have server, try connecting to it (start-main-srv) (let* ((srv-addr (server-address the-srv)) ;; need serv (ipaddr (alist-ref 'ipaddr the-srv)) (port (alist-ref 'port the-srv)) (srvkey (alist-ref 'servkey the-srv)) (fullpath (db:dbname->path apath dbname)) (new-the-srv (make-conndat apath: apath dbname: dbname fullname: fullpath hostport: srv-addr socket: (open-nn-connection srv-addr) ipaddr: ipaddr port: port srvpkt: the-srv srvkey: srvkey ;; generated by rmt:get-signature on the server side lastmsg: (current-seconds) expires: (+ (current-seconds) 60) ;; this needs to be gathered during the ping ))) (hash-table-set! conns fullpath new-the-srv))))))) ;; NB// remdat is a remotedat struct ;; (define (rmt:general-open-connection remdat apath dbname #!key (num-tries 5)) (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db") (let* ((mdbname (db:run-id->dbname #f)) (fullname (db:dbname->path apath dbname)) (conns (remotedat-conns remdat)) (mconn (rmt:get-conn remdat apath mdbname))) (if (and mconn (not (debug:print-logger))) (if (equal? dbname ".db/main.db") (debug:print-info 0 *default-log-port* "Not turning on logging to main, I am main!") (begin (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.") (debug:print-logger rmt:log-to-main)))) |
︙ | ︙ | |||
285 286 287 288 289 290 291 | ;; "192.168.0.9" ;; "/home/matt/data/megatest/tests/simplerun" ;; ".db/1.db") (match res ((host port servkey pid ipaddr apath dbname) (debug:print-info 0 *default-log-port* "got "res) | | | | 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 | ;; "192.168.0.9" ;; "/home/matt/data/megatest/tests/simplerun" ;; ".db/1.db") (match res ((host port servkey pid ipaddr apath dbname) (debug:print-info 0 *default-log-port* "got "res) (hash-table-set! conns fullname (make-conndat apath: apath dbname: dbname hostport: (conc host":"port) ipaddr: ipaddr port: port srvkey: servkey |
︙ | ︙ | |||
316 317 318 319 320 321 322 | ;; (define *localmode* #t) (define *localmode* #f) (define *dbstruct* (make-dbr:dbstruct)) ;; Defaults to current area ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) | | | > | | | | 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 | ;; (define *localmode* #t) (define *localmode* #f) (define *dbstruct* (make-dbr:dbstruct)) ;; Defaults to current area ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; (if (not *remotedat*)(set! *remotedat* (make-remotedat))) (let* ((apath *toppath*) (remdat *remotedat*) (conns (remotedat-conns remdat)) ;; just checking that remdat is a remotedat (dbname (db:run-id->dbname rid))) (if *localmode* (let* ((dbdat (dbr:dbstruct-get-dbdat *dbstruct* dbname)) (indat `((cmd . ,cmd)(params . ,params)))) (api:process-request *dbstruct* indat) ;; (api:process-request dbdat indat) ) (begin (if rid (rmt:general-open-connection remdat apath dbname) ;; was conns (rmt:open-main-connection remdat apath)) (rmt:send-receive-real remdat apath dbname cmd params))))) #;(define (rmt:send-receive-setup conn) (if (not (conndat-inport conn)) (let-values (((i o) (tcp-connect (conndat-ipaddr conn) (conndat-port conn)))) (conndat-inport-set! conn i) (conndat-outport-set! conn o)))) |
︙ | ︙ | |||
425 426 427 428 429 430 431 | (define (rmt:kill-server run-id) (rmt:send-receive 'kill-server run-id (list run-id))) (define (rmt:start-server run-id) (rmt:send-receive 'start-server 0 (list run-id))) (define (rmt:get-server-info apath dbname) | | | 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 | (define (rmt:kill-server run-id) (rmt:send-receive 'kill-server run-id (list run-id))) (define (rmt:start-server run-id) (rmt:send-receive 'start-server 0 (list run-id))) (define (rmt:get-server-info apath dbname) (rmt:send-receive 'get-server-info #f (list apath dbname))) ;;====================================================================== ;; M I S C ;;====================================================================== (define (rmt:login run-id) (rmt:send-receive 'login run-id (list *toppath* megatest-version *my-signature*))) |
︙ | ︙ | |||
1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 | (define (rmt:server-shutdown) (let ((dbfile (servdat-dbfile *server-info*))) (debug:print-info 0 *default-log-port* "dbfile is "dbfile) (if dbfile (let* ((am-server (args:get-arg "-server")) (dbfile (args:get-arg "-db")) (apath *toppath*) (dbdat (db:get-dbdat *dbstruct-db* apath dbfile)) (db (dbr:dbdat-db dbdat)) (inmem (dbr:dbdat-db dbdat)) ) ;; do a final sync here (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds)) (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t) | > | 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 | (define (rmt:server-shutdown) (let ((dbfile (servdat-dbfile *server-info*))) (debug:print-info 0 *default-log-port* "dbfile is "dbfile) (if dbfile (let* ((am-server (args:get-arg "-server")) (dbfile (args:get-arg "-db")) (apath *toppath*) (remdat *remotedat*) ;; foundation for future fix (dbdat (db:get-dbdat *dbstruct-db* apath dbfile)) (db (dbr:dbdat-db dbdat)) (inmem (dbr:dbdat-db dbdat)) ) ;; do a final sync here (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds)) (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t) |
︙ | ︙ | |||
1529 1530 1531 1532 1533 1534 1535 | (lambda (dbh dbfile) (db:release-lock dbh dbfile)))) (let* ((sdat *server-info*) ;; we have a run-id server (host (servdat-host sdat)) (port (servdat-port sdat)) (uuid (servdat-uuid sdat))) (if (not (string-match ".db/main.db" (args:get-arg "-db"))) | | | | 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 | (lambda (dbh dbfile) (db:release-lock dbh dbfile)))) (let* ((sdat *server-info*) ;; we have a run-id server (host (servdat-host sdat)) (port (servdat-port sdat)) (uuid (servdat-uuid sdat))) (if (not (string-match ".db/main.db" (args:get-arg "-db"))) (let* ((res (rmt:deregister-server remdat *toppath* (servdat-host *server-info*) ;; iface (servdat-port *server-info*) (servdat-uuid *server-info*) dbfile ;; (current-process-id) ))) (debug:print-info 0 *default-log-port* "deregistered-server, res="res))) (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid) ))))))) (define (std-exit-procedure) |
︙ | ︙ | |||
1907 1908 1909 1910 1911 1912 1913 | all-pkt-files))) (define (server-address srv-pkt) (conc (alist-ref 'host srv-pkt) ":" (alist-ref 'port srv-pkt))) (define (server-ready? host port key) ;; server-address is host:port | < < < < < | 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 | all-pkt-files))) (define (server-address srv-pkt) (conc (alist-ref 'host srv-pkt) ":" (alist-ref 'port srv-pkt))) (define (server-ready? host port key) ;; server-address is host:port (let* ((data (sexpr->string `((cmd . ping) (key . ,key) (params . ())))) (res (open-send-receive-nn (conc host ":" port) data))) (string->sexpr res))) ;; (let ((res (with-input-from-port i |
︙ | ︙ | |||
2030 2031 2032 2033 2034 2035 2036 | ;;====================================================================== ;; END NEW SERVER METHOD ;;====================================================================== ;; if .db/main.db check the pkts ;; | | | 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 | ;;====================================================================== ;; END NEW SERVER METHOD ;;====================================================================== ;; if .db/main.db check the pkts ;; (define (rmt:wait-for-server pkts-dir db-file server-key) (let* ((sdat *server-info*)) (let loop ((start-time (current-seconds)) (changed #t) (last-sdat "not this")) (begin ;; let ((sdat #f)) (thread-sleep! 0.01) (debug:print-info 0 *default-log-port* "Waiting for server alive signature") |
︙ | ︙ | |||
2079 2080 2081 2082 2083 2084 2085 | (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile) (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv) ;; am I the best-srv, compare server-keys to know (if i-am-srv (if (get-lock-db sdat db-file (servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id) (begin | | | | > > > | 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 | (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile) (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv) ;; am I the best-srv, compare server-keys to know (if i-am-srv (if (get-lock-db sdat db-file (servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id) (begin (debug:print-info 0 *default-log-port* "I'm the server!") (servdat-dbfile-set! sdat db-file) (servdat-status-set! sdat 'db-locked)) (begin (debug:print-info 0 *default-log-port* "I'm not the server, exiting.") (bdat-time-to-exit-set! *bdat* #t) (delete-pkt) (thread-sleep! 0.2) (exit))) (begin (debug:print-info 0 *default-log-port* "Keys do not match "best-srv-key", "server-key", exiting.") (bdat-time-to-exit-set! *bdat* #t) (delete-pkt) (thread-sleep! 
0.2) (exit))) sdat)) (begin ;; sdat not yet contains server info (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat) (sleep 4) (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes (begin (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server") (exit)) (loop start-time (equal? sdat last-sdat) sdat)))))))) (define (rmt:register-server remdat apath iface port server-key dbname) (remotedat-conns remdat) ;; just checking types (rmt:open-main-connection remdat apath) ;; we need a channel to main.db (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath (db:run-id->dbname #f) 'register-server `(,iface ,port ,server-key ,(current-process-id) ,iface ,apath ,dbname))) (define (rmt:get-count-servers remdat apath) (remotedat-conns remdat) ;; just checking types (rmt:open-main-connection remdat apath) ;; we need a channel to main.db (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath (db:run-id->dbname #f) 'get-count-servers `(,apath ))) (define (rmt:deregister-server remdat apath iface port server-key dbname) (remotedat-conns remdat) ;; just checking types (rmt:open-main-connection remdat apath) ;; we need a channel to main.db (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath (db:run-id->dbname #f) 'deregister-server `(,iface ,port ,server-key ,(current-process-id) |
︙ | ︙ | |||
2186 2187 2188 2189 2190 2191 2192 | ;; (define (rmt:keep-running dbname) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server") | > | | 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 | ;; (define (rmt:keep-running dbname) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server") (let* ((remdat *remotedat*) (server-start-time (current-seconds)) (pkts-dir (get-pkts-dir)) (server-key (rmt:get-signature)) ;; This servers key (is-main (equal? (args:get-arg "-db") ".db/main.db")) (last-access 0) (server-timeout (server:expiration-timeout))) ;; main and run db servers have both got wait logic (could/should merge it) (if is-main |
︙ | ︙ | |||
2216 2217 2218 2219 2220 2221 2222 | (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate (let ((watchdog (bdat-watchdog *bdat*))) (debug:print 0 *default-log-port* "SERVER: dbprep") (db:setup dbname) ;; sets *dbstruct-db* as side effect (servdat-status-set! *server-info* 'db-opened) ;; IFF I'm not main, call into main and register self (if (not is-main) | | | | 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 | (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate (let ((watchdog (bdat-watchdog *bdat*))) (debug:print 0 *default-log-port* "SERVER: dbprep") (db:setup dbname) ;; sets *dbstruct-db* as side effect (servdat-status-set! *server-info* 'db-opened) ;; IFF I'm not main, call into main and register self (if (not is-main) (let ((res (rmt:register-server remdat *toppath* iface port server-key dbname))) (if res ;; we are the server (servdat-status-set! *server-info* 'have-interface-and-db) (let* ((serv-info (rmt:get-server-info *toppath* dbname))) (match serv-info ((host port servkey pid ipaddr apath dbpath) (if (not (server-ready? host port servkey)) (begin (debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.") (rmt:deregister-server remdat apath host port dbpath) ;; servkey pid ipaddr apath dbpath) (loop (+ count 1) bad-sync-count start-time)))) (else (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info) (exit))))))) (debug:print 0 *default-log-port* "SERVER: running, db "dbname" opened, megatest version: " (common:get-full-version)) |
︙ | ︙ | |||
2281 2282 2283 2284 2285 2286 2287 | (db:print-current-query-stats))) (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) (cond ((and *server-run* (> (+ last-access server-timeout) (current-seconds)) (if is-main | | | 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 | (db:print-current-query-stats))) (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) (cond ((and *server-run* (> (+ last-access server-timeout) (current-seconds)) (if is-main (> (rmt:get-count-servers remdat *toppath*) 1) #t)) (if (common:low-noise-print 120 "server continuing") (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) (loop 0 bad-sync-count (current-milliseconds))) (else (set! *unclean-shutdown* #f) (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) |
︙ | ︙ | |||
2376 2377 2378 2379 2380 2381 2382 | (print "ERROR: Failed to start server \"" emsg "\"") (exit 1)) (nng-dial #;nn-bind rep (conc "tcp://*:" portnum))) rep)) (define (open-nn-connection host-port) | | | 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 | (print "ERROR: Failed to start server \"" emsg "\"") (exit 1)) (nng-dial #;nn-bind rep (conc "tcp://*:" portnum))) rep)) (define (open-nn-connection host-port) (let ((req (make-req-socket)) (uri (conc "tcp://" host-port))) (socket-set! req 'nng/recvtimeo 2000) (nng-dial req uri) req)) (define (send-receive-nn req msg) (nng-send req msg) |
︙ | ︙ | |||
2425 2426 2427 2428 2429 2430 2431 | ;; (thread-terminate! th1)) ;; "timer thread"))) ;; (thread-start! th1) ;; (thread-start! th2) ;; (thread-join! th1) ;; res)))) ;; | | | | | < < < | | | | | | | < | < < < | | | | | | | | | | | | | | | 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 | ;; (thread-terminate! th1)) ;; "timer thread"))) ;; (thread-start! th1) ;; (thread-start! th2) ;; (thread-join! th1) ;; res)))) ;; (define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds (let ((req (make-req-socket)) (uri (conc "tcp://" host-port)) (res #f)) (handle-exceptions exn (let ((emsg ((condition-property-accessor 'exn 'message) exn))) ;; Send notification (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn) #f) (nng-dial req uri) (nng-send req msg) (let* ((th1 (make-thread (lambda () (let ((resp (nng-recv req))) (nng-close! req) ;; (print resp) (set! res resp))) "recv thread")) (th2 (make-thread (lambda () (thread-sleep! timeout) (thread-terminate! th1)) "timer thread"))) (thread-start! th1) (thread-start! th2) (thread-join! th1) res)))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== ;; run ping in separate process, safest way in some cases ;; |
︙ | ︙ |