Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -46,17 +46,21 @@ (define (client:connect iface port) (case (server:get-transport) ((rpc) (rpc:client-connect iface port)) ((http) (http:client-connect iface port)) ((zmq) (zmq:client-connect iface port)) - (else (rpc:client-connect iface port)))) + (else ;; unsupported transport: log the value this case dispatched on and exit with a failure status + (debug:print 0 *default-log-port* "ERROR: transport " (server:get-transport) " not supported (5)") + (exit 1)))) (define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0)) (case (server:get-transport) ((rpc) (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)) ;;(client:setup-rpc run-id)) ((http)(client:setup-http run-id remaining-tries: remaining-tries failed-connects: failed-connects)) - (else (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)))) ;; (client:setup-rpc run-id)))) + (else ;; unsupported transport: log the value this case dispatched on and exit with a failure status + (debug:print 0 *default-log-port* "ERROR: transport " (server:get-transport) " not supported (6)") + (exit 1)))) ;; (client:setup-rpc run-id)))) ;; (define (client:login-no-auto-setup server-info run-id) ;; (case (server:get-transport) ;; ((rpc) (rpc:login-no-auto-client-setup server-info run-id)) ;; ((http) (rmt:login-no-auto-client-setup server-info run-id)) @@ -165,22 +169,12 @@ (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) (if server-dat (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) - (start-res (case *transport-type* - ((http)(http-transport:client-connect iface port)) - ;;((nmsg)(nmsg-transport:client-connect hostname port)) - )) - (ping-res (case *transport-type* - ((http)(rmt:login-no-auto-client-setup start-res)) - ;; ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id))) - ;; (if 
logininfo - ;; (car (vector-ref logininfo 1)) - ;; #f))) - - ))) + (start-res (http-transport:client-connect iface port)) + (ping-res (rmt:login-no-auto-client-setup start-res))) (if (and start-res ping-res) (begin (remote-conndat-set! *runremote* start-res) ;; (hash-table-set! *runremote* run-id start-res) (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) @@ -209,11 +203,11 @@ (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. (client:setup run-id remaining-tries: (- remaining-tries 1))))))))) -;; keep this as a function to ease future +;; keep this as a function to ease future ;; this is unused, not porting for rpc -BB (define (client:start run-id server-info) (http-transport:client-connect (tasks:hostinfo-get-interface server-info) (tasks:hostinfo-get-port server-info))) ;; ;; client:signal-handler Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -18,10 +18,12 @@ (import (prefix base64 base64:)) (declare (unit common)) (include "common_records.scm") +(include "thunk-utils.scm") + ;; (require-library margs) ;; (include "margs.scm") ;; (define old-exit exit) @@ -60,13 +62,24 @@ (if (not cxt) (set! cxt (let ((x (make-cxt)))(hash-table-set! *contexts* toppath x) x))) (let ((cxt-mutex (cxt-mutex cxt))) (mutex-unlock! *context-mutex*) (mutex-lock! cxt-mutex) - (let ((res (proc cxt))) - (mutex-unlock! cxt-mutex) - res)))) + ;; here we guard proc with exception handler so + ;; no matter how proc succeeds or fails, + ;; the cxt-mutex will be unlocked afterward. + (let* ((EXCEPTION-SYMBOL (gensym)) ;; use a generated symbol + (guarded-proc ;; to avoid collision + (lambda args + (let* ((res (condition-case + (apply proc args) + [x () (cons EXCEPTION-SYMBOL x)]))) + (mutex-unlock! cxt-mutex) + (if (and (pair? 
res) (eq? (car res) EXCEPTION-SYMBOL)) ;; compare against the gensym tag bound as EXCEPTION-SYMBOL above + (abort (cdr res)) + res))))) + (guarded-proc cxt))))) (define *db-keys* #f) (define *configinfo* #f) ;; raw results from setup, includes toppath and table from megatest.config (define *runconfigdat* #f) ;; run configs data Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -6,17 +6,18 @@ ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. ;;====================================================================== - +;; (use format typed-records) ;; RADT => purpose of json format?? (declare (unit rmt)) (declare (uses api)) (declare (uses tdb)) (declare (uses http-transport)) +(declare (uses rpc-transport)) ;;(declare (uses nmsg-transport)) (include "common_records.scm") ;; ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! @@ -141,20 +142,24 @@ (dat (case (remote-transport *runremote*) ((http) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away (http-transport:client-api-send-receive 0 conninfo cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail" (print-call-chain))))) + ((rpc) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away + (rpc-transport:client-api-send-receive 0 conninfo cmd params) + ((commfail)(vector #f "communications fail")) + ((exn)(vector #f "other fail" (print-call-chain))))) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported") (exit)))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time (print "case 9. 
conninfo=" conninfo " dat=" dat) (if success (case (remote-transport *runremote*) - ((http) res) + ((http rpc) res) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown") (exit 1))) (begin (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum) @@ -257,11 +262,18 @@ (define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) (let* ((run-id (if run-id run-id 0)) (res (handle-exceptions exn #f - (http-transport:client-api-send-receive run-id connection-info cmd params)))) + (case (remote-transport *runremote*) + ((http) (http-transport:client-api-send-receive run-id connection-info cmd params)) + ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) + (else + (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (2)") + (exit)) + + ))) (if (and res (vector-ref res 0)) (vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!! #f))) ;; ;; Wrap json library for strings (why the ports crap in the first place?) @@ -301,11 +313,16 @@ ;; This login does no retries under the hood - it acts a bit like a ping. ;; Deprecated for nmsg-transport. 
;; (define (rmt:login-no-auto-client-setup connection-info) (case *transport-type* ;; run-id of 0 is just a placeholder - ((http)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*))) + ((http rpc)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*))) + (else ;; unsupported transport: report *transport-type* (the value dispatched on) and exit with a failure status + (debug:print 0 *default-log-port* "ERROR: transport " *transport-type* " not supported (3)") + (exit 1)) + + ;;((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*))) )) ;; hand off a call to one of the db:queries statements ;; added run-id to make looking up the correct db possible Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -1,7 +1,7 @@ -;; Copyright 2006-2012, Matthew Welland. +;; Copyright 2006-2016, Matthew Welland. ;; ;; This program is made available under the GNU GPL version 2.0 or ;; greater. See the accompanying file COPYING for details. ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the @@ -21,208 +21,583 @@ (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. 
(include "common_records.scm") (include "db_records.scm") + +(define *heartbeat-mutex* (make-mutex)) +(define *server-loop-heart-beat* (current-seconds)) + ;; procstr is the name of the procedure to be called as a string -(define (rpc-transport:autoremote procstr params) - (handle-exceptions - exn - (begin - (debug:print 1 *default-log-port* "Remote failed for " proc " " params) - (apply (eval (string->symbol procstr)) params)) - ;; (if *runremote* - ;; (apply (eval (string->symbol (conc "remote:" procstr))) params) - (apply (eval (string->symbol procstr)) params))) +(define (rpc-transport:autoremote procstr params) ;; may be unused, I think api-exec deprecates this one. + (let* ((procsym (if (symbol? procstr) + procstr + (string->symbol (->string procstr)))) + (res + (begin + (apply (eval procsym) params)))) ;; NOTE(review): evals a caller-supplied symbol; this procedure is rpc-published in rpc-transport:run, so any client can name any procedure -- confirm this is intended + res)) + + +;; rpc receiver +(define (rpc-transport:api-exec cmd params) + (let* ( (resdat (api:execute-requests *inmemdb* (vector cmd params))) ;; #( flag result ) + (flag (vector-ref resdat 0)) ;; NOTE(review): flag is bound but never read below; only res is returned + (res (vector-ref resdat 1))) + + (mutex-lock! *heartbeat-mutex*) + + (set! *last-db-access* (current-seconds)) ;; bump *last-db-access*; this will renew keep-running thread's lease on life for another (server:get-timeout) seconds + ;;(BB> "in api-exec; last-db-access updated to "*last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + + res)) + + + ;; (handle-exceptions + ;; exn + ;; (begin + ;; (debug:print 0 *default-log-port* "Remote failed for " proc " " params " exn="exn) + ;; (apply (eval (string->symbol procstr)) params)) + ;; ;; (if *runremote* + ;; ;; (apply (eval (string->symbol (conc "remote:" procstr))) params) + ;; (apply (eval (string->symbol procstr)) params))) + +;; retry an operation (depends on srfi-18) +;; ================== +;; idea here is to avoid spending time on coding retrying something. Trying to be generic here. +;; +;; Exception handling: +;; ------------------- +;; if evaluating the thunk results in exception, it will be retried. 
+;; on last try, if final-failure-returns-actual is true, the exception will be re-thrown to caller. +;; +;; look at options below #!key to see how to configure behavior +;; +;; +(define (retry-thunk + the-thunk + #!key ;;;; options below + (accept-result? (lambda (x) x)) ;; retry if predicate applied to thunk's result is false + (retries 4) ;; how many tries + (failure-value #f) ;; return this on final failure, unless following option is enabled: + (final-failure-returns-actual #f) ;; on failure, on the last try, just return the result, not failure-value + + (retry-delay 0.1) ;; delay between tries + (back-off-factor 1) ;; multiply retry-delay by this factor on retry + (random-delay 0.1) ;; add a random portion of this value to wait + + (chatty #f) ;; print status as we go, for debugging. + ) + + (when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-")) + (let* ((guarded-thunk ;; we are guarding the thunk against exceptions. We will record whether result of evaluation is an exception or a regular result. + (lambda () + (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision + (res + (condition-case + (the-thunk) ;; this is what we are guarding the execution of + [x () (cons EXCEPTION x)] + ))) + (cond + ((and (pair? res) (eq? (car res) EXCEPTION)) + (if chatty + (print " - the-thunk threw exception >"(cdr res)"<")) + (cons 'exception (cdr res))) + (else + (if chatty + (print " - the-thunk returned result >"res"<")) + (cons 'regular-result res))))))) + + (let loop ((guarded-res (guarded-thunk)) + (retries-left retries) + (fail-wait retry-delay)) + (if chatty (print " ==========")) + (let* ((wait-time (+ fail-wait (+ (* fail-wait back-off-factor) + (* random-delay + (/ (random 1024) 1024) )))) + (res-type (car guarded-res)) + (res-value (cdr guarded-res))) + (cond + ((and (eq? res-type 'regular-result) (accept-result? res-value)) + (if chatty (print " + return result that satisfied accept-result? 
>"res-value"<")) + res-value) + + ((> retries-left 0) + (if chatty (print " - sleep "wait-time)) + (thread-sleep! wait-time) + (if chatty (print " + retry ["retries-left" tries left]")) + (loop (guarded-thunk) + (sub1 retries-left) + wait-time)) + + ((eq? res-type 'regular-result) + (if final-failure-returns-actual + (begin + (if chatty (print " + last try failed- return the result >"res-value"<")) + res-value) + (begin + (if chatty (print " + last try failed- return canned failure value >"failure-value"<")) + failure-value))) + + (else ;; no retries left; result was not accepted and res-type can only be 'exception + (if final-failure-returns-actual + (begin + (if chatty (print " + last try failed with exception- re-throw it >"res-value"<")) + (abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function + (begin + (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<")) + failure-value)))))))) + + +(define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f)) + (on-exit (lambda () #t)) ;; turn off on-exit stuff + ;;(tcp-close rpc:listener) ;; gotta exit nicely + ;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped") + + + ;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast! + ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released") + + (set! *time-to-exit* #t) + (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) + + + (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete") + + + ;;(BB> "Before (exit) (from-on-exit="from-on-exit")") + (unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. 
+ ;;(BB> "After") + ;; strace reveals endless: + ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 105880}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 109880}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 201886}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 205886}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 297892}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 301892}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 393898}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 397898}, ru_stime={0, 60003}, ...}) = 0 + ;; make a post to chicken-users w/ http://paste.call-cc.org/paste?id=60a4b66a29ccf7d11359ea866db642c970735978 + (if from-on-exit + ;; avoid above condition! End current process externally since 1 in 20 (exit)'s result in hung, 100% cpu zombies. (see above) + (system (conc "kill -9 "(current-process-id)))) + ) + ;; all routes though here end in exit ... ;; ;; start_server? ;; (define (rpc-transport:launch run-id) - (let* ((tdbdat (tasks:open-db))) - (BB> "rpc-transport:launch fired for run-id="run-id) - (set! *run-id* run-id) - (if (args:get-arg "-daemonize") - (daemon:ize)) - (if (server:check-if-running run-id) - (begin - (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") - (exit 0))) - (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) - (remtries 4)) - (if (not server-id) - (if (> remtries 0) - (begin - (thread-sleep! 
2) - (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) - (- remtries 1))) - (begin - ;; since we didn't get the server lock we are going to clean up and bail out - (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") - (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " rpc-transport:launch"))) - (begin - (rpc-transport:run (if (args:get-arg "-server")(args:get-arg "-server") "-") run-id server-id) - (exit)))))) + (set! *run-id* run-id) + + ;; ;; send to background if requested + ;; (when (args:get-arg "-daemonize") + ;; (daemon:ize) + ;; (when *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it + ;; (current-error-port *alt-log-file*) + ;; (current-output-port *alt-log-file*))) + + ;; double check we dont alrady have a running server for this run-id + (when (server:check-if-running run-id) + (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") + (exit 0)) + + ;; let's get a server-id for this server + ;; if at first we do not suceed, try 3 more times. + (let ((server-id (retry-thunk + (lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc)) + chatty: #f + retries: 4))) + (when (not server-id) ;; dang we couldn't get a server-id. + ;; since we didn't get the server lock we are going to clean up and bail out + (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") + (tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch") + (exit 1)) + + ;; we got a server-id (and a corresponding entry in servers table in globally shared mdb) + ;; all systems go. Proceed to setup rpc server. 
+ (rpc-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-") + run-id + server-id) + (exit))) + +(define *rpc-listener-port* #f) +(define *rpc-listener-port-bind-timestamp* #f) + +(define *on-exit-flag #f) + +(define (rpc-transport:server-dat-get-iface vec) (vector-ref vec 0)) +(define (rpc-transport:server-dat-get-port vec) (vector-ref vec 1)) +(define (rpc-transport:server-dat-get-last-access vec) (vector-ref vec 5)) +(define (rpc-transport:server-dat-get-transport vec) (vector-ref vec 6)) +(define (rpc-transport:server-dat-update-last-access vec) + (if (vector? vec) + (vector-set! vec 5 (current-seconds)) + (begin + (print-call-chain (current-error-port)) + (debug:print-error 0 *default-log-port* "call to rpc-transport:server-dat-update-last-access with non-vector!!")))) + + +(define *api-exec-ht* (make-hash-table)) +(define *api-exec-mutex* (make-mutex)) +;; let's see if caching the rpc stub curbs thread-profusion on server side +(define (rpc-transport:get-api-exec iface port) + (mutex-lock! *api-exec-mutex*) + (let* ((lu (hash-table-ref/default *api-exec-ht* (cons iface port) #f))) + (if lu + (begin + (mutex-unlock! *api-exec-mutex*) + lu) + (let ((res (rpc:procedure 'api-exec iface port))) + (hash-table-set! *api-exec-ht* (cons iface port) res) + (mutex-unlock! *api-exec-mutex*) + res)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; this client-side procedure makes rpc call to server and returns result +;; +(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3)) + ;;(BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries) + (if (not (vector? serverdat)) + (begin + (BB> "WHAT?? 
for run-id="run-id", serverdat="serverdat) + (print-call-chain) + (exit 1))) + (let* ((iface (rpc-transport:server-dat-get-iface serverdat)) + (port (rpc-transport:server-dat-get-port serverdat)) + (res #f) + (api-exec (rpc-transport:get-api-exec iface port)) ;; chached by host/port. may need to clear... + (send-receive (lambda () + (tcp-buffer-size 0) + (set! res (retry-thunk + (lambda () + (condition-case + ;;(vector #t (run-remote cmd params)) + (vector 'success (api-exec cmd params)) + [x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)] + [x () (vector 'other-fail (conc "other fail ["(->string x)"]") x)])) ;; conc keeps the message in slot 1 and the condition in slot 2, matching the comms-fail clause + chatty: #f + accept-result?: (lambda(x) + (and (vector? x) (vector-ref x 0))) ;; NOTE(review): slot 0 is always a symbol here, so every vector is accepted and the retries below never run -- confirm whether only 'success should be accepted + retries: 8 + back-off-factor: 1.5 + random-delay: 0.2 ;; retry-thunk declares random-delay; it has no random-wait keyword + retry-delay: 0.1 + final-failure-returns-actual: #t)) + ;;(BB> "HEY res="res) + res + )) + (th1 (make-thread send-receive "send-receive")) + (time-out-reached #f) + (time-out (lambda () + (thread-sleep! 45) + (set! time-out-reached #t) + (thread-terminate! th1) + #f)) + + (th2 (make-thread time-out "time out"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + (thread-terminate! th2) + ;;(BB> "alt got res="res) + (debug:print-info 11 *default-log-port* "got res=" res) + (if (vector? 
res) + (case (vector-ref res 0) + ((success) (vector #t (vector-ref res 1))) + ( + (comms-fail other-fail) + ;;(comms-fail) + (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request >>"res"<<") + ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) + (vector #f (vector-ref res 1))) + (else + (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1)) + (debug:print 0 *default-log-port* " client call chain:") + (print-call-chain (current-error-port)) + (debug:print 0 *default-log-port* " server call chain:") + (pp (vector-ref res 1) (current-error-port)) + (signal (vector-ref res 2)))) + (signal (make-composite-condition + (make-property-condition + 'timeout + 'message "rpc-transport:client-api-send-receive timed out talking to server")))))) + + (define (rpc-transport:run hostn run-id server-id) (debug:print 2 *default-log-port* "Attempting to start the rpc server ...") ;; (trace rpc:publish-procedure!) - (rpc:publish-procedure! 'server:login server:login) - (rpc:publish-procedure! 'testing (lambda () "Just testing")) + ;;====================================================================== + ;; start of publish-procedure section + ;;====================================================================== + (rpc:publish-procedure! 'server:login server:login) ;; this allows client to validate it is the same megatest instance as the server. No security here, just making sure we're in the right room. + (rpc:publish-procedure! + 'testing + (lambda () + "Just testing")) + + ;; procedure to receive arbitrary API request from client's rpc:send-receive/rpc-transport:client-api-send-receive + (rpc:publish-procedure! 'rpc-transport:autoremote rpc-transport:autoremote) + ;; can use this to run most anything at the remote + (rpc:publish-procedure! 
'api-exec rpc-transport:api-exec) + + + ;;====================================================================== + ;; end of publish-procedure section + ;;====================================================================== + (let* ((db #f) - (hostname (get-host-name)) - (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (hostname (let ((res (get-host-name))) res)) + (server-start-time (current-seconds)) + (server-timeout (server:get-timeout)) + (ipaddrstr (let* ((ipstr (if (string=? "-" hostn) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") (server:get-best-guess-address hostname) - #f))) - (if ipstr ipstr hostn))) ;; hostname))) - (start-port (open-run-close tasks:server-get-next-port tasks:open-db)) + #f)) + (res (if ipstr ipstr hostn))) + res)) ;; hostname))) + (start-port (let ((res (portlogger:open-run-close portlogger:find-port))) ;; BB> TODO: remove portlogger! + res)) (link-tree-path (configf:lookup *configdat* "setup" "linktree")) - (rpc:listener (rpc-transport:find-free-port-and-open (rpc:default-server-port))) + + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + ;; rpc:listener is the tcp-listen result from inside the find-free-port-and-open complex. + ;; It is our handle on the listening tcp port + ;; We will attach this to our rpc server with rpc:make-server in thread th1 . + (rpc:listener (rpc-transport:find-free-port-and-open start-port)) (th1 (make-thread (lambda () - ((rpc:make-server rpc:listener) #t)) + ((rpc:make-server rpc:listener) #t) ) "rpc:server")) - ;; (cute (rpc:make-server rpc:listener) "rpc:server") - ;; 'rpc:server)) - (hostname (if (string=? "-" hostn) + + + (hostname (if (string=? "-" hostn) (get-host-name) hostn)) (ipaddrstr (if (string=? 
"-" hostn) (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f)) - (portnum (rpc:default-server-port)) - (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)) - (tdb (tasks:open-db))) + (portnum (let ((res (rpc:default-server-port))) res)) + (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum))) + + + ;; BB> TODO: remove portlogger! + ;; if rpc found it needed a different port than portlogger provided, keep portlogger in the loop. + ;; (when (not (equal? start-port portnum)) + ;; (BB> "portlogger proffered "start-port" but rpc grabbed "portnum) + ;; (portlogger:open-run-close portlogger:set-port start-port "released") + ;; (portlogger:open-run-close portlogger:take-port portnum)) + + (tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum) + + ;;============================================================ + ;; activate thread th1 to attach opened tcp port to rpc server + ;;============================================================= (thread-start! th1) - (set! db *dbstruct-db*) - (open-run-close tasks:server-set-interface-port - tasks:open-db - server-id - ipaddrstr portnum) + (set! db *inmemdb*) + (debug:print 0 *default-log-port* "Server started on " host:port) - - ;; (trace rpc:publish-procedure!) - ;; (rpc:publish-procedure! 'server:login server:login) - ;; (rpc:publish-procedure! 'testing (lambda () "Just testing")) - - ;;====================================================================== - ;; ;; end of publish-procedure section - ;;====================================================================== - ;; + + ;; (thread-sleep! 5) + + (if (retry-thunk (lambda () + (rpc-transport:self-test run-id ipaddrstr portnum))) + (debug:print 0 *default-log-port* "INFO: rpc self test passed!") + (begin + (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. 
On: " host:port) + (exit))) + (on-exit (lambda () - (open-run-close tasks:server-set-state! tasks:open-db server-id "stopped"))) - - (set! *rpc:listener* rpc:listener) - (tasks:server-set-state! tdb server-id "running") - (set! *dbstruct-db* (db:setup run-id)) - ;; if none running or if > 20 seconds since - ;; server last used then start shutdown - (let loop ((count 0)) - (thread-sleep! 5) ;; no need to do this very often - (let ((numrunning -1)) ;; (db:get-count-tests-running db))) - (if (or (> numrunning 0) - (> (+ *db-last-access* 60)(current-seconds))) - (begin - (debug:print-info 0 *default-log-port* "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *db-last-access*)) - (loop (+ 1 count))) - (begin - (debug:print-info 0 *default-log-port* "Starting to shutdown the server side") - (open-run-close tasks:server-delete-record tasks:open-db server-id " rpc-transport:try-start-server stop") - (thread-sleep! 10) - (debug:print-info 0 *default-log-port* "Max cached queries was " *max-cache-size*) - (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting") - )))))) - -(define (rpc-transport:find-free-port-and-open port) + (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) + + ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch + (if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers + (begin ;; i am not the server, another server snuck in and beat this one to the punch + (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision")) + + (begin ;; i am the server + ;; setup the in-memory db + (set! *inmemdb* (db:setup run-id)) + (db:get-db *inmemdb* run-id) + + ;; let's make it official + (set! 
*rpc:listener* rpc:listener) + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry + + + + ;; this let loop will hold open this thread until we want the server to shut down. + ;; if no requests received within the last 20 seconds : + ;; database hasnt changed in ?? + ;; + + ;; begin new loop + ;; keep-running loop: polls last-db-access to see if we have timed out. + (let loop ((count 0) + (bad-sync-count 0)) + + ;; Use this opportunity to sync the inmemdb to db + (let ((start-time (current-milliseconds)) + (sync-time #f) + (rem-time #f)) + ;; inmemdb is a dbstruct + (condition-case + (db:sync-touched *inmemdb* *run-id* force-sync: #t) + ((sync-failed)(cond + ((> bad-sync-count 10) ;; time to give up + (rpc-transport:server-shutdown server-id rpc:listener)) + (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop + (thread-sleep! 5) + (loop count (+ bad-sync-count 1))))) + ((exn) + (debug:print-error 0 *default-log-port* "error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server") + (rpc-transport:server-shutdown server-id rpc:listener))) + (set! sync-time (- (current-milliseconds) start-time)) + (set! rem-time (quotient (- 4000 sync-time) 1000)) + (debug:print 4 *default-log-port* "SYNC: time= " sync-time ", rem-time=" rem-time) + + (if (and (<= rem-time 4) + (> rem-time 0)) + (thread-sleep! rem-time) + (thread-sleep! 4))) ;; fallback for if the math is changed ... + + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1) bad-sync-count)) + + ;; BB: don't see how this is possible with RPC + ;; ;; Check that iface and port have not changed (can happen if server port collides) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! sdat *server-info*) + ;; (mutex-unlock! *heartbeat-mutex*) + + ;; (if (or (not (equal? 
sdat (list iface port))) + ;; (not server-id)) + ;; (begin + ;; (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info") + ;; (set! iface (car sdat)) + ;; (set! port (cadr sdat)))) + + ;; Transfer *last-db-access* to last-access to use in checking that we are still alive + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + + ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout) + ;; + ;; no_traffic, no running tests, if server 0, no running servers + ;; + ;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out) + ;; + (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600)) + (adjusted-timeout (if (> hrs-since-start 1) + (- server-timeout (inexact->exact (round (* hrs-since-start 60)))) ;; subtract 60 seconds per hour + server-timeout))) + (if (common:low-noise-print 120 "server timeout") + (debug:print-info 0 *default-log-port* "Adjusted server timeout: " adjusted-timeout)) + (if (and *server-run* + (> (+ last-access server-timeout) + (current-seconds))) + (begin + (if (common:low-noise-print 120 "server continuing") + (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) + ;; + ;; Consider implementing some smarts here to re-insert the record or kill self is + ;; the db indicates so + ;; + (if (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id) + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running")) + ;; + (loop 0 bad-sync-count)) + (begin + ;;(BB> "SERVER SHUTDOWN CALLED! 
last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout) + (rpc-transport:server-shutdown server-id rpc:listener))))) + ;; end new loop + )))) + + +(define (rpc-transport:find-free-port-and-open port #!key ) (handle-exceptions exn - (begin + (begin (print "Failed to bind to port " (rpc:default-server-port) ", trying next port") - (rpc-transport:find-free-port-and-open (+ port 1))) + (rpc-transport:find-free-port-and-open (add1 port))) (rpc:default-server-port port) + (set! *rpc-listener-port* port) ;; a bit paranoid about rpc:default-server-port parameter not changing across threads (as params are wont to do). keeping this global in my back pocket in case this causes problems + (set! *rpc-listener-port-bind-timestamp* (current-milliseconds)) ;; may want to test how long it has been since the last bind attempt happened... (tcp-read-timeout 240000) - (tcp-listen (rpc:default-server-port) 10000))) - + (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it. + (tcp-listen (rpc:default-server-port) 10000) + )) + (define (rpc-transport:ping run-id host port) (handle-exceptions exn (begin - (print "SERVER_NOT_FOUND") + (print "SERVER_NOT_FOUND exn="exn) (exit 1)) (let ((login-res ((rpc:procedure 'server:login host port) *toppath*))) - (if (and (list? login-res) - (car login-res)) + (if login-res (begin (print "LOGIN_OK") (exit 0)) (begin (print "LOGIN_FAILED") (exit 1)))))) -(define (rpc-transport:client-setup run-id #!key (remtries 10)) - (if *runremote* - (begin - (debug:print-error 0 *default-log-port* "Attempt to connect to server but already connected") - #f) - (let* ((host-info (hash-table-ref/default *runremote* run-id #f))) ;; (open-run-close db:get-var #f "SERVER")) - (if host-info - (let ((iface (car host-info)) - (port (cadr host-info)) - (ping-res ((rpc:procedure 'server:login host port) *toppath*))) - (if ping-res - (let ((server-dat (list iface port #f #f #f))) - (hash-table-set! 
*runremote* run-id server-dat) - server-dat) - (begin - (server:try-running run-id) - (thread-sleep! 2) - (rpc-transport:client-setup run-id (- remtries 1))))) - (let* ((server-db-info (open-run-close tasks:get-server tasks:open-db run-id))) - (debug:print-info 0 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) - (if server-db-info - (let* ((iface (tasks:hostinfo-get-interface server-db-info)) - (port (tasks:hostinfo-get-port server-db-info)) - (server-dat (list iface port #f #f #f)) - (ping-res ((rpc:procedure 'server:login host port) *toppath*))) - (if start-res - (begin - (hash-table-set! *runremote* run-id server-dat) - server-dat) - (begin - (server:try-running run-id) - (thread-sleep! 2) - (rpc-transport:client-setup run-id (- remtries 1))))) - (begin - (server:try-running run-id) - (thread-sleep! 2) - (rpc-transport:client-setup run-id (- remtries 1))))))))) -;; -;; (port (if (and hostinfo (> (length hostdat) 1))(cadr hostdat) #f))) -;; (if (and port -;; (string->number port)) -;; (let ((portn (string->number port))) -;; (debug:print-info 2 *default-log-port* "Setting up to connect to host " host ":" port) -;; (handle-exceptions -;; exn -;; (begin -;; (debug:print-error 0 *default-log-port* "Failed to open a connection to the server at host: " host " port: " port) -;; (debug:print 0 *default-log-port* " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) -;; ;; (open-run-close -;; ;; (lambda (db . param) -;; ;; (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER'")) -;; ;; #f) -;; (set! *runremote* #f)) -;; (if (and (not (args:get-arg "-server")) ;; no point in the server using the server using the server -;; ((rpc:procedure 'server:login host portn) *toppath*)) -;; (begin -;; (debug:print-info 2 *default-log-port* "Logged in and connected to " host ":" port) -;; (set! 
*runremote* (vector host portn))) -;; (begin -;; (debug:print-info 2 *default-log-port* "Failed to login or connect to " host ":" port) -;; (set! *runremote* #f))))) -;; (debug:print-info 2 *default-log-port* "no server available"))))) - +(define (rpc-transport:self-test run-id host port) + (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it. + (let* ((testing-res ((rpc:procedure 'testing host port))) + (login-res ((rpc:procedure 'server:login host port) *toppath*)) + (res (and login-res (equal? testing-res "Just testing")))) + + (if login-res + (begin + ;;(BB> "Self test PASS. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) + #t) + (begin + ;;(BB> "Self test fail. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) + + #f)) + res)) + +(define (rpc-transport:client-setup run-id server-dat #!key (remtries 10)) + ;;(BB> "entered rpc-transport:client-setup with run-id="run-id" and server-dat="server-dat" and retries="remtries) + (tcp-buffer-size 0) + (debug:print-info 0 *default-log-port* "rpc-transport:client-setup run-id="run-id" server-dat=" server-dat ", remaining-tries=" remtries) + (let* ((iface (tasks:hostinfo-get-interface server-dat)) + (hostname (tasks:hostinfo-get-hostname server-dat)) + (port (tasks:hostinfo-get-port server-dat)) + (runremote-server-dat (vector iface port #f #f #f (current-seconds) 'rpc)) ;; http version := (vector iface port api-uri api-url api-req (current-seconds) 'http ) + (ping-res (retry-thunk (lambda () ;; make 3 attempts to ping. + ((rpc:procedure 'server:login iface port) *toppath*)) + chatty: #f + retries: 3))) + ;; we got here from rmt:get-connection-info on the condition that *runremote* has no entry for run-id... + (if ping-res + (begin + (debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat) + (rmt:set-cinfo run-id runremote-server-dat) ;; (hash-table-set! 
*runremote* run-id runremote-server-dat) ;; side-effect - *runremote* cache init fpr rmt:* + runremote-server-dat) + (begin ;; login failed but have a server record, clean out the record and try again + (debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat) + (tasks:kill-server-run-id run-id) + (tasks:server-force-clean-run-record (db:delay-if-busy (tasks:open-db)) run-id iface port + " rpc-transport:client-setup (server-dat = #t)") + (if (> remtries 2) + (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little + (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time + (server:try-running run-id) + (thread-sleep! 5) ;; give server a little time to start up + (client:setup run-id remaining-tries: (sub1 remtries)))))) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -226,11 +226,11 @@ ;; (define (server:check-if-running areapath) (let* ((dotserver (server:read-dotserver areapath))) ;; tdbdat (tasks:open-db))) (if dotserver (let* ((res (case *transport-type* - ((http)(server:ping-server dotserver)) + ((http rpc)(server:ping-server dotserver)) ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server) ))) (if res dotserver #f)) @@ -259,11 +259,17 @@ (begin (debug:print 0 *default-log-port* "ERROR: bad host:port") (if do-exit (exit 1))) (let* ((iface (car host-port)) (port (cadr host-port)) - (server-dat (http-transport:client-connect iface port)) + (server-dat + (case (remote-transport *runremote*) + ((http) (http-transport:client-connect iface port)) + ((rpc) (rpc-transport:client-connect iface port)) + (else + (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (4)") + (exit)))) (login-res (rmt:login-no-auto-client-setup server-dat))) (if (and (list? 
login-res) (car login-res)) (begin (print "LOGIN_OK") ADDED thunk-utils.scm Index: thunk-utils.scm ================================================================== --- /dev/null +++ thunk-utils.scm @@ -0,0 +1,121 @@ +(use srfi-18) + + +;; wrap a proc with a mutex so that two threads may not call proc simultaneously. +;; will catch exceptions to ensure mutex is unlocked even if exception is thrown. +;; will generate a unique mutex for proc unless one is specified with canned-mutex: option +;; +;; example 1: (define thread-safe-+ (make-synchronized-proc +)) +;; example 2: (define thread-safe-plus +;; (make-synchronized-proc +;; (lambda (x y) +;; (+ x y)))) + +(define (make-synchronized-proc proc + #!key (canned-mutex #f)) + (let* ((guard-mutex (if canned-mutex canned-mutex (make-mutex))) + (guarded-proc ;; we are guarding the thunk against exceptions. We will record whether result of evaluation is an exception or a regular result. + (lambda args + (mutex-lock! guard-mutex) + (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision with a proc that returns a pair having the first element be our flag. gensym guarantees the symbol is unique. + (res + (condition-case + (apply proc args) ;; this is what we are guarding the execution of + [x () (cons EXCEPTION x)] + ))) + (mutex-unlock! guard-mutex) + (cond + ((and (pair? res) (eq? (car res) EXCEPTION)) + (raise (cdr res))) + (else + res)))))) + guarded-proc)) + + +;; retry an operation (depends on srfi-18) +;; ================== +;; idea here is to avoid spending time on coding retrying something. Trying to be generic here. +;; +;; Exception handling: +;; ------------------- +;; if evaluating the thunk results in exception, it will be retried. +;; on last try, if final-failure-returns-actual is true, the exception will be re-thrown to caller. +;; +;; look at options below #!key to see how to configure behavior +;; +;; + +(define (retry-thunk + the-thunk + #!key ;;;; options below + (accept-result? 
(lambda (x) x)) ;; retry if predicate applied to thunk's result is false + (retries 4) ;; how many tries + (failure-value #f) ;; return this on final failure, unless following option is enabled: + (final-failure-returns-actual #f) ;; on failure, on the last try, just return the result, not failure-value + + (retry-delay 0.1) ;; delay between tries + (back-off-factor 1) ;; multiply retry-delay by this factor on retry + (random-delay 0.1) ;; add a random portion of this value to wait + + (chatty #f) ;; print status as we go, for debugging. + ) + + (when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-")) + (let* ((guarded-thunk ;; we are guarding the thunk against exceptions. We will record whether result of evaluation is an exception or a regular result. + (lambda () + (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision + (res + (condition-case + (the-thunk) ;; this is what we are guarding the execution of + [x () (cons EXCEPTION x)] + ))) + (cond + ((and (pair? res) (eq? (car res) EXCEPTION)) + (if chatty + (print " - the-thunk threw exception >"(cdr res)"<")) + (cons 'exception (cdr res))) + (else + (if chatty + (print " - the-thunk returned result >"res"<")) + (cons 'regular-result res))))))) + + (let loop ((guarded-res (guarded-thunk)) + (retries-left retries) + (fail-wait retry-delay)) + (if chatty (print " ==========")) + (let* ((wait-time (+ fail-wait (+ (* fail-wait back-off-factor) + (* random-delay + (/ (random 1024) 1024) )))) + (res-type (car guarded-res)) + (res-value (cdr guarded-res))) + (cond + ((and (eq? res-type 'regular-result) (accept-result? res-value)) + (if chatty (print " + return result that satisfied accept-result? >"res-value"<")) + res-value) + + ((> retries-left 0) + (if chatty (print " - sleep "wait-time)) + (thread-sleep! wait-time) + (if chatty (print " + retry ["retries-left" tries left]")) + (loop (guarded-thunk) + (sub1 retries-left) + wait-time)) + + ((eq? 
res-type 'regular-result) + (if final-failure-returns-actual + (begin + (if chatty (print " + last try failed- return the result >"res-value"<")) + res-value) + (begin + (if chatty (print " + last try failed- return canned failure value >"failure-value"<")) + failure-value))) + + (else ;; no retries left; result was not accepted and res-type can only be 'exception + (if final-failure-returns-actual + (begin + (if chatty (print " + last try failed with exception- re-throw it >"res-value"<")) + (abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function + (begin + (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<")) + failure-value)))))))) +