Overview
Comment: | grafted rpc code |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.62-rpc |
Files: | files | file ages | folders |
SHA1: |
59b73e1fc2a22f73f027649261f13d84 |
User & Date: | bjbarcla on 2016-12-02 14:35:20 |
Other Links: | branch diff | manifest | tags |
Context
2016-12-02
| ||
14:39 | grafted rpc code check-in: cc303d5f43 user: bjbarcla tags: v1.62-rpc | |
14:35 | grafted rpc code check-in: 59b73e1fc2 user: bjbarcla tags: v1.62-rpc | |
12:46 | ketchup check-in: c33db3cfa5 user: bjbarcla tags: v1.62-rpc | |
Changes
Modified client.scm from [50265f350f] to [2b55a1807a].
︙ | ︙ | |||
44 45 46 47 48 49 50 | ok)) (define (client:connect iface port) (case (server:get-transport) ((rpc) (rpc:client-connect iface port)) ((http) (http:client-connect iface port)) ((zmq) (zmq:client-connect iface port)) | | > > > > | | 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | ok)) (define (client:connect iface port) (case (server:get-transport) ((rpc) (rpc:client-connect iface port)) ((http) (http:client-connect iface port)) ((zmq) (zmq:client-connect iface port)) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (5)") (exit)))) (define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0)) (case (server:get-transport) ((rpc) (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)) ;;(client:setup-rpc run-id)) ((http)(client:setup-http run-id remaining-tries: remaining-tries failed-connects: failed-connects)) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (6)") (exit)))) ;; (client:setup-rpc run-id)))) ;; (define (client:login-no-auto-setup server-info run-id) ;; (case (server:get-transport) ;; ((rpc) (rpc:login-no-auto-client-setup server-info run-id)) ;; ((http) (rmt:login-no-auto-client-setup server-info run-id)) ;; (else (rpc:login-no-auto-client-setup server-info run-id)))) ;; |
︙ | ︙ | |||
163 164 165 166 167 168 169 | (exit 1)) (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) (if server-dat (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) | < | < < < | < < < < < < | 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | (exit 1)) (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) (if server-dat (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) (start-res (http-transport:client-connect iface port)) (ping-res (rmt:login-no-auto-client-setup start-res))) (if (and start-res ping-res) (begin (remote-conndat-set! *runremote* start-res) ;; (hash-table-set! *runremote* run-id start-res) (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) start-res) (begin ;; login failed but have a server record, clean out the record and try again |
︙ | ︙ | |||
207 208 209 210 211 212 213 | (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. (client:setup run-id remaining-tries: (- remaining-tries 1))))))))) | | | 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 | (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. (client:setup run-id remaining-tries: (- remaining-tries 1))))))))) ;; keep this as a function to ease future ;; this is unused, not porting for rpc -BB (define (client:start run-id server-info) (http-transport:client-connect (tasks:hostinfo-get-interface server-info) (tasks:hostinfo-get-port server-info))) ;; ;; client:signal-handler ;; (define (client:signal-handler signum) ;; (signal-mask! signum) |
︙ | ︙ |
Modified rmt.scm from [3618123224] to [7979b112a4].
︙ | ︙ | |||
11 12 13 14 15 16 17 18 19 20 21 22 23 24 | ;; (use format typed-records) ;; RADT => purpose of json format?? (declare (unit rmt)) (declare (uses api)) (declare (uses tdb)) (declare (uses http-transport)) ;;(declare (uses nmsg-transport)) (include "common_records.scm") ;; ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! ;; | > | 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | ;; (use format typed-records) ;; RADT => purpose of json format?? (declare (unit rmt)) (declare (uses api)) (declare (uses tdb)) (declare (uses http-transport)) (declare (uses rpc-transport)) ;;(declare (uses nmsg-transport)) (include "common_records.scm") ;; ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! ;; |
︙ | ︙ | |||
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 | (print "case 9") (let* ((conninfo (remote-conndat *runremote*)) (dat (case (remote-transport *runremote*) ((http) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away (http-transport:client-api-send-receive 0 conninfo cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail" (print-call-chain))))) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported") (exit)))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time (print "case 9. conninfo=" conninfo " dat=" dat) (if success (case (remote-transport *runremote*) | > > > > | | 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 | (print "case 9") (let* ((conninfo (remote-conndat *runremote*)) (dat (case (remote-transport *runremote*) ((http) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away (http-transport:client-api-send-receive 0 conninfo cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail" (print-call-chain))))) ((rpc) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away (rpc-transport:client-api-send-receive 0 conninfo cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail" (print-call-chain))))) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported") (exit)))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time (print "case 9. conninfo=" conninfo " dat=" dat) (if success (case (remote-transport *runremote*) ((http rpc) res) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown") (exit 1))) (begin (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum) (remote-conndat-set! *runremote* #f) (remote-server-url-set! *runremote* #f) |
︙ | ︙ | |||
281 282 283 284 285 286 287 | res)) (define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) (let* ((run-id (if run-id run-id 0)) (res (handle-exceptions exn #f | > | > > > > > > | 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 | res)) (define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) (let* ((run-id (if run-id run-id 0)) (res (handle-exceptions exn #f (case (remote-transport *runremote*) ((http) (http-transport:client-api-send-receive run-id connection-info cmd params)) ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (2)") (exit)) ))) (if (and res (vector-ref res 0)) (vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!! #f))) ;; ;; Wrap json library for strings (why the ports crap in the first place?) ;; (define (rmt:dat->json-str dat) ;; (with-output-to-string |
︙ | ︙ | |||
325 326 327 328 329 330 331 | (rmt:send-receive 'login run-id (list *toppath* megatest-version *my-client-signature*))) ;; This login does no retries under the hood - it acts a bit like a ping. ;; Deprecated for nmsg-transport. ;; (define (rmt:login-no-auto-client-setup connection-info) (case *transport-type* ;; run-id of 0 is just a placeholder | | > > > > > | 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 | (rmt:send-receive 'login run-id (list *toppath* megatest-version *my-client-signature*))) ;; This login does no retries under the hood - it acts a bit like a ping. ;; Deprecated for nmsg-transport. ;; (define (rmt:login-no-auto-client-setup connection-info) (case *transport-type* ;; run-id of 0 is just a placeholder ((http rpc)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*))) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (3)") (exit)) ;;((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*))) )) ;; hand off a call to one of the db:queries statements ;; added run-id to make looking up the correct db possible ;; (define (rmt:general-call stmtname run-id . params) |
︙ | ︙ |
Modified rpc-transport.scm from [1a3df6f92b] to [261c47f7d8].
︙ | ︙ | |||
149 150 151 152 153 154 155 | (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<")) failure-value)))))))) (define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f)) (on-exit (lambda () #t)) ;; turn off on-exit stuff ;;(tcp-close rpc:listener) ;; gotta exit nicely | | > > | > > | 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 | (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<")) failure-value)))))))) (define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f)) (on-exit (lambda () #t)) ;; turn off on-exit stuff ;;(tcp-close rpc:listener) ;; gotta exit nicely ;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped") ;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast! ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released") (set! *time-to-exit* #t) (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete") ;;(BB> "Before (exit) (from-on-exit="from-on-exit")") (unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. ;;(BB> "After") ;; strace reveals endless: ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0 |
︙ | ︙ | |||
202 203 204 205 206 207 208 | (when (server:check-if-running run-id) (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") (exit 0)) ;; let's get a server-id for this server ;; if at first we do not suceed, try 3 more times. (let ((server-id (retry-thunk | | | | 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 | (when (server:check-if-running run-id) (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") (exit 0)) ;; let's get a server-id for this server ;; if at first we do not suceed, try 3 more times. (let ((server-id (retry-thunk (lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc)) chatty: #f retries: 4))) (when (not server-id) ;; dang we couldn't get a server-id. ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch") (exit 1)) ;; we got a server-id (and a corresponding entry in servers table in globally shared mdb) ;; all systems go. Proceed to setup rpc server. (rpc-transport:run (if (args:get-arg "-server") (args:get-arg "-server") |
︙ | ︙ | |||
391 392 393 394 395 396 397 | ;; BB> TODO: remove portlogger! ;; if rpc found it needed a different port than portlogger provided, keep portlogger in the loop. ;; (when (not (equal? start-port portnum)) ;; (BB> "portlogger proffered "start-port" but rpc grabbed "portnum) ;; (portlogger:open-run-close portlogger:set-port start-port "released") ;; (portlogger:open-run-close portlogger:take-port portnum)) | | | 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 | ;; BB> TODO: remove portlogger! ;; if rpc found it needed a different port than portlogger provided, keep portlogger in the loop. ;; (when (not (equal? start-port portnum)) ;; (BB> "portlogger proffered "start-port" but rpc grabbed "portnum) ;; (portlogger:open-run-close portlogger:set-port start-port "released") ;; (portlogger:open-run-close portlogger:take-port portnum)) (tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum) ;;============================================================ ;; activate thread th1 to attach opened tcp port to rpc server ;;============================================================= (thread-start! th1) (set! db *inmemdb*) |
︙ | ︙ | |||
414 415 416 417 418 419 420 | (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port) (exit))) (on-exit (lambda () (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch | | | | | 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 | (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port) (exit))) (on-exit (lambda () (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch (if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers (begin ;; i am not the server, another server snuck in and beat this one to the punch (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision")) (begin ;; i am the server ;; setup the in-memory db (set! *inmemdb* (db:setup run-id)) (db:get-db *inmemdb* run-id) ;; let's make it official (set! *rpc:listener* rpc:listener) (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry ;; this let loop will hold open this thread until we want the server to shut down. ;; if no requests received within the last 20 seconds : ;; database hasnt changed in ?? ;; |
︙ | ︙ | |||
508 509 510 511 512 513 514 | (begin (if (common:low-noise-print 120 "server continuing") (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) ;; ;; Consider implementing some smarts here to re-insert the record or kill self is ;; the db indicates so ;; | | | | 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 | (begin (if (common:low-noise-print 120 "server continuing") (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) ;; ;; Consider implementing some smarts here to re-insert the record or kill self is ;; the db indicates so ;; (if (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id) (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running")) ;; (loop 0 bad-sync-count)) (begin ;;(BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout) (rpc-transport:server-shutdown server-id rpc:listener))))) ;; end new loop )))) |
︙ | ︙ | |||
585 586 587 588 589 590 591 | (begin (debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat) (rmt:set-cinfo run-id runremote-server-dat) ;; (hash-table-set! *runremote* run-id runremote-server-dat) ;; side-effect - *runremote* cache init fpr rmt:* runremote-server-dat) (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat) (tasks:kill-server-run-id run-id) | | | 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 | (begin (debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat) (rmt:set-cinfo run-id runremote-server-dat) ;; (hash-table-set! *runremote* run-id runremote-server-dat) ;; side-effect - *runremote* cache init fpr rmt:* runremote-server-dat) (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat) (tasks:kill-server-run-id run-id) (tasks:server-force-clean-run-record (db:delay-if-busy (tasks:open-db)) run-id iface port " rpc-transport:client-setup (server-dat = #t)") (if (> remtries 2) (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time (server:try-running run-id) (thread-sleep! 5) ;; give server a little time to start up (client:setup run-id remaining-tries: (sub1 remtries)))))) |
Modified server.scm from [9384560fe7] to [43afd79d02].
︙ | ︙ | |||
224 225 226 227 228 229 230 | ;; no longer care if multiple servers are started by accident. older servers will drop off in time. ;; (define (server:check-if-running areapath) (let* ((dotserver (server:read-dotserver areapath))) ;; tdbdat (tasks:open-db))) (if dotserver (let* ((res (case *transport-type* | | | 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 | ;; no longer care if multiple servers are started by accident. older servers will drop off in time. ;; (define (server:check-if-running areapath) (let* ((dotserver (server:read-dotserver areapath))) ;; tdbdat (tasks:open-db))) (if dotserver (let* ((res (case *transport-type* ((http rpc)(server:ping-server dotserver)) ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server) ))) (if res dotserver #f)) #f))) |
︙ | ︙ | |||
257 258 259 260 261 262 263 | ;; (print "host-port=" host-port) (if (not host-port) (begin (debug:print 0 *default-log-port* "ERROR: bad host:port") (if do-exit (exit 1))) (let* ((iface (car host-port)) (port (cadr host-port)) | > > | > > > > | 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 | ;; (print "host-port=" host-port) (if (not host-port) (begin (debug:print 0 *default-log-port* "ERROR: bad host:port") (if do-exit (exit 1))) (let* ((iface (car host-port)) (port (cadr host-port)) (server-dat (case (remote-transport *runremote*) ((http) (http-transport:client-connect iface port)) ((rpc) (rpc-transport:client-connect iface port)) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (4)") (exit)))) (login-res (rmt:login-no-auto-client-setup server-dat))) (if (and (list? login-res) (car login-res)) (begin (print "LOGIN_OK") (if do-exit (exit 0))) (begin |
︙ | ︙ |