Overview
Comment: | Corrected part of broken sync |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | try-nanomsg |
Files: | files | file ages | folders |
SHA1: |
befb5004e8aa75e3fe01cbb6e8ebf2d5 |
User & Date: | matt on 2014-11-26 23:49:37 |
Other Links: | branch diff | manifest | tags |
Context
2014-11-28
| ||
16:01 | Switched to using exceptions to pass back client/server communication issues check-in: b8180b058c user: matt tags: try-nanomsg | |
2014-11-26
| ||
23:49 | Corrected part of broken sync check-in: befb5004e8 user: matt tags: try-nanomsg | |
23:09 | nanomsg transport fixed. check-in: 675b89e392 user: matt tags: try-nanomsg | |
Changes
Modified api.scm from [52a89446cf] to [8d21d552d9].
︙ | ︙ | |||
122 123 124 125 126 127 128 | ((login) (apply db:login dbstruct params)) ((general-call) (let ((stmtname (car params)) (run-id (cadr params)) (realparams (cddr params))) (db:with-db dbstruct run-id #t ;; these are all for modifying the db (lambda (db) (db:general-call db stmtname realparams))))) | > | | 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | ((login) (apply db:login dbstruct params)) ((general-call) (let ((stmtname (car params)) (run-id (cadr params)) (realparams (cddr params))) (db:with-db dbstruct run-id #t ;; these are all for modifying the db (lambda (db) (db:general-call db stmtname realparams))))) ((sync-inmem->db) (let ((run-id (car params))) (db:sync-touched dbstruct run-id force-sync: #t))) ((sdb-qry) (apply sdb:qry params)) ((ping) (current-process-id)) ;; TESTMETA ((testmeta-get-record) (apply db:testmeta-get-record dbstruct params)) ((testmeta-add-record) (apply db:testmeta-add-record dbstruct params)) ((testmeta-update-field) (apply db:testmeta-update-field dbstruct params))))) |
︙ | ︙ |
Modified db.scm from [75e6e604f1] to [3d1cef4541].
︙ | ︙ | |||
323 324 325 326 327 328 329 | (if (or (not (number? mtime)) (not (number? stime)) (> mtime stime) force-sync) (begin (db:delay-if-busy rundb) (db:delay-if-busy olddb) | < | > | 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 | (if (or (not (number? mtime)) (not (number? stime)) (> mtime stime) force-sync) (begin (db:delay-if-busy rundb) (db:delay-if-busy olddb) (dbr:dbstruct-set-stime! dbstruct (current-milliseconds)) (let ((num-synced (db:sync-tables db:sync-tests-only inmem refdb rundb olddb))) (mutex-unlock! *http-mutex*) num-synced) (begin (mutex-unlock! *http-mutex*) 0)))))) (define (db:close-main dbstruct) |
︙ | ︙ |
Modified nmsg-transport.scm from [a65dcd51b0] to [fe6d9123a8].
︙ | ︙ | |||
76 77 78 79 80 81 82 | (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access (set! *inmemdb* dbstruct) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") (thread-start! (make-thread | | | 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access (set! *inmemdb* dbstruct) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") (thread-start! (make-thread (lambda ()(nmsg-transport:keep-running server-id run-id)) "keep running")) (thread-join! server-thread)) (if (> retrynum 0) (begin (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.") (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") (portlogger:open-run-close portlogger:set-failed start-port) |
︙ | ︙ | |||
239 240 241 242 243 244 245 | (thread-join! send-recv) (if success (thread-terminate! timeout))) (vector success result))) ;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; | | | 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 | (thread-join! send-recv) (if success (thread-terminate! timeout))) (vector success result))) ;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (nmsg-transport:keep-running server-id run-id) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (let* ((server-info (let loop () (let ((sdat #f)) (mutex-lock! *heartbeat-mutex*) (set! sdat *server-info*) |
︙ | ︙ | |||
276 277 278 279 280 281 282 | (thread-sleep! 4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) | < > | 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 | (thread-sleep! 4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) (mutex-lock! *heartbeat-mutex*) (set! last-access *last-db-access*) (mutex-unlock! *heartbeat-mutex*) (db:sync-touched *inmemdb* run-id force-sync: #t) (if (and *server-run* (> (+ last-access server-timeout) (current-seconds))) (begin (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) (loop 0)) (begin |
︙ | ︙ |
Modified rmt.scm from [47078702f7] to [dbecc93316].
︙ | ︙ | |||
62 63 64 65 66 67 68 | cinfo ;; NB// can cache the answer for server running for 10 seconds ... ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id) (client:setup run-id) #f)))) | | | 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | cinfo ;; NB// can cache the answer for server running for 10 seconds ... ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id) (client:setup run-id) #f)))) (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected ;; clean out old connections (mutex-lock! *db-multi-sync-mutex*) ;; (let ((expire-time (- (current-seconds) 60))) ;; (for-each ;; (lambda (run-id) ;; (let ((connection (hash-table-ref/default *runremote* run-id #f))) ;; (if (and connection |
︙ | ︙ | |||
99 100 101 102 103 104 105 | ((http)(db:string->obj res)) ((nmsg)(vector-ref res 1))) (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.") ;; (case *transport-type* ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection | > | | < > | 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | ((http)(db:string->obj res)) ((nmsg)(vector-ref res 1))) (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.") ;; (case *transport-type* ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection (if (eq? (modulo attemptnum 5) 0) (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) ;; no longer killing the server in http-transport:client-api-send-receive ;; may kill it here but what are the criteria? ;; start with three calls then kill server ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) ;; (thread-sleep! 2) (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))) ;; no connection info? try to start a server (if (and (< attemptnum 15) (tasks:need-server run-id)) (begin (hash-table-delete! *runremote* run-id) (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) (client:setup run-id) (thread-sleep! (random 5)) ;; give some time to settle and minimize collison? 
(rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) (begin (debug:print 0 "ERROR: Communication failed!") (exit) ;; (rmt:open-qry-close-locally cmd run-id params)))) |
︙ | ︙ |