Overview
Comment: | tested; found bugs; fixed bugs |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | rpc-transport |
Files: | files | file ages | folders |
SHA1: | 1387b44afba9023e01379b0e87f51745 |
User & Date: | bjbarcla on 2016-11-01 02:45:48 |
Other Links: | branch diff | manifest | tags |
Context
2016-11-01
| ||
02:46 | picked nit check-in: e9f1cb9e4b user: bjbarcla tags: rpc-transport | |
02:45 | tested; found bugs; fixed bugs check-in: 1387b44afb user: bjbarcla tags: rpc-transport | |
00:56 | did a little tidying up check-in: febd54396c user: bjbarcla tags: rpc-transport | |
Changes
Modified client.scm from [2c1ce58891] to [69eebee107].
︙ | ︙ | |||
48 49 50 51 52 53 54 | ;; (define (client:connect iface port) ;; (case (server:get-transport) ;; ((rpc) (rpc:client-connect iface port)) ;; ((http) (http:client-connect iface port)) ;; ((zmq) (zmq:client-connect iface port)) ;; (else (rpc:client-connect iface port)))) | | | > | | | | > > > > > | | 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | ;; (define (client:connect iface port) ;; (case (server:get-transport) ;; ((rpc) (rpc:client-connect iface port)) ;; ((http) (http:client-connect iface port)) ;; ((zmq) (zmq:client-connect iface port)) ;; (else (rpc:client-connect iface port)))) (define (client:setup run-id #!key (remaining-tries 10)) (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries) (let* ((server-dat (tasks:bb-get-server-info run-id)) (transport (if server-dat (string->symbol (tasks:hostinfo-get-transport server-dat)) 'noserver))) ;;(BB> "transport >"transport"< string? transport >"(string? transport)"< symbol? transport >"(symbol? transport)"<") (case transport ((noserver) ;; no server registered (if (<= remaining-tries 0) (begin (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id) (exit 1)) (begin (let ((num-available (tasks:bb-num-in-available-state run-id))) (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. (client:setup run-id remaining-tries: (- remaining-tries 1)))))) ((http)(client:setup-http run-id server-dat remaining-tries)) ;; ((rpc) (rpc-transport:client-setup run-id)) ;;(client:setup-rpc run-id)) rpc not implemented; want to see a failure here for now. 
(else (debug:print-error 0 *default-log-port* "Transport [" transport "] specified for run-id [" run-id "] is not implemented in client:setup. Cannot proceed.") (exit 1))))) ;; client:setup-http ;; ;; For http transport, robustly ensure an advertised-running server is actually working and responding, and ;; establish tcp connection to server. For servers marked running but not responding, kill them and clear from mdb ;; (define (client:setup-http run-id server-dat remaining-tries) (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) (start-res (http-transport:client-connect iface port)) (ping-res (rmt:login-no-auto-client-setup start-res run-id))) (if (and start-res ping-res) (begin (hash-table-set! *runremote* run-id start-res) ;; side-effect - *runremote* cache init fpr rmt:* (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) start-res) (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 *default-log-port* "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) (http-transport:close-connections run-id) (hash-table-delete! *runremote* run-id) (tasks:kill-server-run-id run-id) |
︙ | ︙ |
Modified http-transport.scm from [36a3ef7f7d] to [f8800e432b].
︙ | ︙ | |||
321 322 323 324 325 326 327 | (define (make-http-transport:server-dat)(make-vector 6)) (define (http-transport:server-dat-get-iface vec) (vector-ref vec 0)) (define (http-transport:server-dat-get-port vec) (vector-ref vec 1)) (define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) (define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3)) (define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4)) (define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5)) | | | 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 | (define (make-http-transport:server-dat)(make-vector 6)) (define (http-transport:server-dat-get-iface vec) (vector-ref vec 0)) (define (http-transport:server-dat-get-port vec) (vector-ref vec 1)) (define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) (define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3)) (define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4)) (define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5)) (define (http-transport:server-dat-get-transport vec) (vector-ref vec 6)) (define (http-transport:server-dat-make-url vec) (if (and (http-transport:server-dat-get-iface vec) (http-transport:server-dat-get-port vec)) (conc "http://" (http-transport:server-dat-get-iface vec) ":" |
︙ | ︙ | |||
346 347 348 349 350 351 352 | ;; ;; connect ;; (define (http-transport:client-connect iface port) (let* ((api-url (conc "http://" iface ":" port "/api")) (api-uri (uri-reference (conc "http://" iface ":" port "/api"))) (api-req (make-request method: 'POST uri: api-uri)) | | | 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 | ;; ;; connect ;; (define (http-transport:client-connect iface port) (let* ((api-url (conc "http://" iface ":" port "/api")) (api-uri (uri-reference (conc "http://" iface ":" port "/api"))) (api-req (make-request method: 'POST uri: api-uri)) (server-dat (vector iface port api-uri api-url api-req (current-seconds) 'http))) server-dat)) ;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (http-transport:keep-running server-id run-id) ;; if none running or if > 20 seconds since |
︙ | ︙ |
Modified rmt.scm from [ef5e4d35bf] to [f6f47d3907].
︙ | ︙ | |||
96 97 98 99 100 101 102 | ;; (mutex-unlock! *db-multi-sync-mutex*) ;; (mutex-lock! *send-receive-mutex*) (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info | | > < < < | | > > > | 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | ;; (mutex-unlock! *db-multi-sync-mutex*) ;; (mutex-lock! *send-receive-mutex*) (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info (let* ((transport-type (vector-ref connection-info 6)) (dat (case transport-type ;; BB: replaced *transport-type* global with run-id specific transport-type, item 6 in server-info vector which was populated by *-transport:client-connect with >> (vector iface port api-uri api-url api-req (current-seconds) 'http ) << ((http)(condition-case (http-transport:client-api-send-receive run-id connection-info cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail")))) ;;((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) ;; BB: let us error out for now (else (debug:print-error 0 *default-log-port* "Transport [" transport "] specified for run-id [" run-id "] is not implemented in rmt:send-receive. Cannot proceed.") (exit 1)))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) (if success (begin ;; (mutex-unlock! *send-receive-mutex*) (case *transport-type* |
︙ | ︙ |
Modified tasks.scm from [b6453842af] to [c4bf88d4c8].
︙ | ︙ | |||
329 330 331 332 333 334 335 | run-id) (vector header res))) ;; BB> bb opinion - want to push responsibility into api (encapsulation), like waiting if db is busy and finding the db handle in the first place. why should the caller need to be concerned?? If my opinion carries, we'll remove the bb- and make other needful adjustments. (define (bb-mdb-inserter mdb-expecting-proc mdbless-args) (let ((mdb (db:delay-if-busy (tasks:open-db)))) | | | 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 | run-id) (vector header res))) ;; BB> bb opinion - want to push responsibility into api (encapsulation), like waiting if db is busy and finding the db handle in the first place. why should the caller need to be concerned?? If my opinion carries, we'll remove the bb- and make other needful adjustments. (define (bb-mdb-inserter mdb-expecting-proc mdbless-args) (let ((mdb (db:delay-if-busy (tasks:open-db)))) (apply mdb-expecting-proc (cons mdb mdbless-args)))) (define (tasks:bb-get-server-info . args) (bb-mdb-inserter tasks:get-server-info args)) (define (tasks:bb-num-in-available-state . args) (bb-mdb-inserter tasks:num-in-available-state args)) |
︙ | ︙ |