Overview
Comment: | did some cleanup |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | rpc-transport |
Files: | files | file ages | folders |
SHA1: | acd56658eb23e45ad1a5281c15582e45 |
User & Date: | bjbarcla on 2016-11-16 16:06:03 |
Other Links: | branch diff | manifest | tags |
Context
2016-11-16
| ||
16:19 | caught up to v1.62 Closed-Leaf check-in: 4e3d7aed7d user: bjbarcla tags: rpc-transport | |
16:06 | did some cleanup check-in: acd56658eb user: bjbarcla tags: rpc-transport | |
14:45 | deadlock msg check-in: a66741d98b user: bjbarcla tags: rpc-transport | |
Changes
Modified client.scm from [65a69d122a] to [437091e816].
︙ | ︙ | |||
69 70 71 72 73 74 75 | (begin (let ((num-available (tasks:bb-num-in-available-state run-id))) (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. (client:setup run-id remaining-tries: (- remaining-tries 1)))))) | | | | 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | (begin (let ((num-available (tasks:bb-num-in-available-state run-id))) (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. (client:setup run-id remaining-tries: (- remaining-tries 1)))))) ((http) (client:setup-http run-id server-dat remaining-tries)) ((rpc) (rpc-transport:client-setup run-id server-dat remtries: remaining-tries)) (else (debug:print-error 0 *default-log-port* "(6) Transport [" transport "] specified for run-id [" run-id "] is not implemented in client:setup. Cannot proceed.") (exit 1))))) ;; client:setup-http |
︙ | ︙ |
Modified common_records.scm from [c5826de33b] to [6bf211fc41].
︙ | ︙ | |||
115 116 117 118 119 120 121 | (let* ((this-loc (vector-ref frame 0)) (this-func (cadr (string-split this-loc " ")))) (if (equal? this-func "BB>") (set! location this-loc)))) stack) (let ((dp-args (append (list 0 *default-log-port* location" " ) in-args))) (apply debug:print dp-args)))) | < < < < < | 115 116 117 118 119 120 121 122 123 124 125 126 127 128 | (let* ((this-loc (vector-ref frame 0)) (this-func (cadr (string-split this-loc " ")))) (if (equal? this-func "BB>") (set! location this-loc)))) stack) (let ((dp-args (append (list 0 *default-log-port* location" " ) in-args))) (apply debug:print dp-args)))) (define (debug:print-error n e . params) ;; normal print (if (debug:debug-mode n) (with-output-to-port (or e (current-error-port)) (lambda () (if *logging* |
︙ | ︙ |
Modified http-transport.scm from [e7487d8749] to [eb8d6f211b].
︙ | ︙ | |||
337 338 339 340 341 342 343 | (conc "http://" (http-transport:server-dat-get-iface vec) ":" (http-transport:server-dat-get-port vec)) #f)) (define (http-transport:server-dat-update-last-access vec) | | | 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 | (conc "http://" (http-transport:server-dat-get-iface vec) ":" (http-transport:server-dat-get-port vec)) #f)) (define (http-transport:server-dat-update-last-access vec) ;;(BB> "entered http-transport:server-dat-update-last-access vec="vec) (if (vector? vec) (vector-set! vec 5 (current-seconds)) (begin (print-call-chain (current-error-port)) (debug:print-error 0 *default-log-port* "call to http-transport:server-dat-update-last-access with non-vector!!")))) ;; |
︙ | ︙ |
Modified megatest.scm from [debfa33c05] to [20fa3aaa0f].
︙ | ︙ | |||
731 732 733 734 735 736 737 | ;;====================================================================== ;; Start the server - can be done in conjunction with -runall or -runtests (one day...) ;; we start the server if not running else start the client thread ;;====================================================================== (if (args:get-arg "-server") | | > | 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 | ;;====================================================================== ;; Start the server - can be done in conjunction with -runall or -runtests (one day...) ;; we start the server if not running else start the client thread ;;====================================================================== (if (args:get-arg "-server") ;; Server? Start up here. ;; (let* ((tl (launch:setup)) (run-id (and (args:get-arg "-run-id") (string->number (args:get-arg "-run-id"))))) (BB> "megatest -server called; starting server") (if run-id (begin (server:launch run-id (->string *transport-type*)) (set! *didsomething* #t)) (debug:print-error 0 *default-log-port* "server requires run-id be specified with -run-id"))) ;; Not a server? This section will decide how to communicate |
︙ | ︙ |
Modified rmt.scm from [8fafeb1080] to [ce1a8dad0d].
︙ | ︙ | |||
124 125 126 127 128 129 130 | (when (eq? (modulo attemptnum 5) 0) (debug:print-error 0 *default-log-port* "rmt:send-receive did not succeed after "(sub1 attemptnum)" tries. Aborting. (cmd="cmd" rid="rid" param="params) (exit 1)) (mutex-lock! *rmt:srmutex*) ;; deadlock is here! ;; expire connections | | | | 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | (when (eq? (modulo attemptnum 5) 0) (debug:print-error 0 *default-log-port* "rmt:send-receive did not succeed after "(sub1 attemptnum)" tries. Aborting. (cmd="cmd" rid="rid" param="params) (exit 1)) (mutex-lock! *rmt:srmutex*) ;; deadlock is here! ;; expire connections (let ((expire-time (- (current-seconds) (server:get-timeout) 60))) ;; don't forget the 60 second margin (for-each (lambda (run-id) (let ((connection (rmt:get-cinfo run-id))) (if (and (vector? connection) (< (http-transport:server-dat-get-last-access connection) expire-time)) ;; BB> BBTODO: make this generic, not http transport specific. (begin (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses") (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info-start-server-if-none run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) ;;(BB> "in rmt:send-receive; run-id="run-id";;connection-info="connection-info) (if connection-info ;; use the server if have connection info (let* ((transport-type (rmt:run-id->transport-type run-id)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Here, we make request to remote server ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; |
︙ | ︙ | |||
163 164 165 166 167 168 169 | "] specified for run-id [" run-id "] is not implemented in rmt:send-receive. Cannot proceed." (symbol? transport-type)) (vector #f (conc "transport ["transport-type"] unimplemented")))))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) | | | 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 | "] specified for run-id [" run-id "] is not implemented in rmt:send-receive. Cannot proceed." (symbol? transport-type)) (vector #f (conc "transport ["transport-type"] unimplemented")))))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) ;;(BB> "in rmt:send-receive; transport-type="transport-type" success="success" connection-info="connection-info" res="res " dat="dat) (if (and success (vector? connection-info)) (http-transport:server-dat-update-last-access connection-info)) ;; BB> BBTODO: make this generic, not http transport specific. (if success (begin (mutex-unlock! *rmt:srmutex*) ;; (mutex-unlock! *send-receive-mutex*) (case transport-type |
︙ | ︙ |
Modified rpc-transport.scm from [f8b4c106d1] to [7b17dd43f3].
︙ | ︙ | |||
44 45 46 47 48 49 50 | (let* ( (resdat (api:execute-requests *inmemdb* (vector cmd params))) ;; #( flag result ) (flag (vector-ref resdat 0)) (res (vector-ref resdat 1))) (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) ;; bump *last-db-access*; this will renew keep-running thread's lease on life for another (server:get-timeout) seconds | | | 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | (let* ( (resdat (api:execute-requests *inmemdb* (vector cmd params))) ;; #( flag result ) (flag (vector-ref resdat 0)) (res (vector-ref resdat 1))) (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) ;; bump *last-db-access*; this will renew keep-running thread's lease on life for another (server:get-timeout) seconds ;;(BB> "in api-exec; last-db-access updated to "*last-db-access*) (mutex-unlock! *heartbeat-mutex*) res)) ;; (handle-exceptions ;; exn |
︙ | ︙ | |||
158 159 160 161 162 163 164 | ;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast! ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released") (set! *time-to-exit* #t) (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) (tasks:bb-server-delete-record server-id " rpc-transport:keep-running complete") | | | | 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | ;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast! ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released") (set! *time-to-exit* #t) (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) (tasks:bb-server-delete-record server-id " rpc-transport:keep-running complete") ;;(BB> "Before (exit) (from-on-exit="from-on-exit")") (unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. ;;(BB> "After") ;; strace reveals endless: ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 105880}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 109880}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 201886}, ru_stime={0, 60003}, ...}) = 0 |
︙ | ︙ | |||
253 254 255 256 257 258 259 | (hash-table-set! *api-exec-ht* (cons iface port) res) res)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; this client-side procedure makes rpc call to server and returns result ;; (define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3)) | | | | | | 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 | (hash-table-set! *api-exec-ht* (cons iface port) res) res)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; this client-side procedure makes rpc call to server and returns result ;; (define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3)) ;;(BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries) (if (not (vector? serverdat)) (begin (BB> "WHAT?? for run-id="run-id", serverdat="serverdat) (print-call-chain) (exit 1))) (let* ((iface (rpc-transport:server-dat-get-iface serverdat)) (port (rpc-transport:server-dat-get-port serverdat)) (res #f) (api-exec (rpc-transport:get-api-exec iface port)) ;; chached by host/port. may need to clear... (send-receive (lambda () (tcp-buffer-size 0) (set! res (retry-thunk (lambda () (condition-case ;;(vector #t (run-remote cmd params)) (vector 'success (api-exec cmd params)) [x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)] [x () (vector 'other-fail "other fail ["(->string x)"]" x)])) chatty: #f accept-result?: (lambda(x) (and (vector? x) (vector-ref x 0))) retries: 4 back-off-factor: 1.5 random-wait: 0.2 retry-delay: 0.1 final-failure-returns-actual: #t)) ;;(BB> "HEY res="res) res )) (th1 (make-thread send-receive "send-receive")) (time-out-reached #f) (time-out (lambda () (thread-sleep! 45) (set! 
time-out-reached #t) (thread-terminate! th1) #f)) (th2 (make-thread time-out "time out"))) (thread-start! th1) (thread-start! th2) (thread-join! th1) (thread-terminate! th2) ;;(BB> "alt got res="res) (debug:print-info 11 *default-log-port* "got res=" res) (if (vector? res) (case (vector-ref res 0) ((success) (vector #t (vector-ref res 1))) ((comms-fail) (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request >>"res"<<") ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) |
︙ | ︙ | |||
396 397 398 399 400 401 402 | ;;============================================================= (thread-start! th1) (set! db *inmemdb*) (debug:print 0 *default-log-port* "Server started on " host:port) | | > > | | 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 | ;;============================================================= (thread-start! th1) (set! db *inmemdb*) (debug:print 0 *default-log-port* "Server started on " host:port) ;;(thread-sleep! 5) (if (retry-thunk (lambda () (rpc-transport:self-test run-id ipaddrstr portnum))) (debug:print 0 *default-log-port* "INFO: rpc self test passed!") (begin (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port) (exit))) (on-exit (lambda () (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) |
︙ | ︙ | |||
477 478 479 480 481 482 483 | ;; (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info") ;; (set! iface (car sdat)) ;; (set! port (cadr sdat)))) ;; Transfer *last-db-access* to last-access to use in checking that we are still alive (mutex-lock! *heartbeat-mutex*) (set! last-access *last-db-access*) | < | 479 480 481 482 483 484 485 486 487 488 489 490 491 492 | ;; (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info") ;; (set! iface (car sdat)) ;; (set! port (cadr sdat)))) ;; Transfer *last-db-access* to last-access to use in checking that we are still alive (mutex-lock! *heartbeat-mutex*) (set! last-access *last-db-access*) (mutex-unlock! *heartbeat-mutex*) ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout) ;; ;; no_traffic, no running tests, if server 0, no running servers ;; ;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out) |
︙ | ︙ | |||
507 508 509 510 511 512 513 | ;; the db indicates so ;; (if (tasks:bb-server-am-i-the-server? run-id) (tasks:bb-server-set-state! server-id "running")) ;; (loop 0 bad-sync-count)) (begin | | | 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 | ;; the db indicates so ;; (if (tasks:bb-server-am-i-the-server? run-id) (tasks:bb-server-set-state! server-id "running")) ;; (loop 0 bad-sync-count)) (begin ;;(BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout) (rpc-transport:server-shutdown server-id rpc:listener))))) ;; end new loop )))) (define (rpc-transport:find-free-port-and-open port #!key ) (handle-exceptions |
︙ | ︙ | |||
550 551 552 553 554 555 556 | (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it. (let* ((testing-res ((rpc:procedure 'testing host port))) (login-res ((rpc:procedure 'server:login host port) *toppath*)) (res (and login-res (equal? testing-res "Just testing")))) (if login-res (begin | | | | | | 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 | (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it. (let* ((testing-res ((rpc:procedure 'testing host port))) (login-res ((rpc:procedure 'server:login host port) *toppath*)) (res (and login-res (equal? testing-res "Just testing")))) (if login-res (begin ;;(BB> "Self test PASS. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) #t) (begin ;;(BB> "Self test fail. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) #f)) res)) (define (rpc-transport:client-setup run-id server-dat #!key (remtries 10)) ;;(BB> "entered rpc-transport:client-setup with run-id="run-id" and server-dat="server-dat" and retries="remtries) (tcp-buffer-size 0) (debug:print-info 0 *default-log-port* "rpc-transport:client-setup run-id="run-id" server-dat=" server-dat ", remaining-tries=" remtries) (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) (runremote-server-dat (vector iface port #f #f #f (current-seconds) 'rpc)) ;; http version := (vector iface port api-uri api-url api-req (current-seconds) 'http ) (ping-res (retry-thunk (lambda () ;; make 3 attempts to ping. ((rpc:procedure 'server:login iface port) *toppath*)) chatty: #f retries: 3))) ;; we got here from rmt:get-connection-info on the condition that *runremote* has no entry for run-id... 
(if ping-res (begin (debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat) (rmt:set-cinfo run-id runremote-server-dat) ;; (hash-table-set! *runremote* run-id runremote-server-dat) ;; side-effect - *runremote* cache init fpr rmt:* runremote-server-dat) |
︙ | ︙ |
Modified server.scm from [6f87686628] to [301314345c].
︙ | ︙ | |||
278 279 280 281 282 283 284 | (define (server:get-timeout) (let ((tmo (configf:lookup *configdat* "server" "timeout"))) (if (and (string? tmo) (string->number tmo)) (* 60 60 (string->number tmo)) ;; (* 3 24 60 60) ;; default to three days | | | 278 279 280 281 282 283 284 285 286 287 288 | (define (server:get-timeout) (let ((tmo (configf:lookup *configdat* "server" "timeout"))) (if (and (string? tmo) (string->number tmo)) (* 60 60 (string->number tmo)) ;; (* 3 24 60 60) ;; default to three days (* 60 3) ;; default to three minutes ;; (* 60 60 25) ;; default to 25 hours ))) |