Index: client.scm
==================================================================
--- client.scm
+++ client.scm
@@ -46,17 +46,23 @@
(define (client:connect iface port)
(case (server:get-transport)
((rpc) (rpc:client-connect iface port))
((http) (http:client-connect iface port))
((zmq) (zmq:client-connect iface port))
- (else (rpc:client-connect iface port))))
+ (else ;; unknown transport: report the value this case dispatched on and exit with a failure status
+ (debug:print 0 *default-log-port* "ERROR: transport " (server:get-transport) " not supported (5)")
+ (exit 1))))
(define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0))
(case (server:get-transport)
- ((rpc) (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)) ;;(client:setup-rpc run-id))
+ ((rpc) (let ((res (client:setup-rpc run-id remaining-tries: remaining-tries failed-connects: failed-connects))) ;; forward failed-connects as the http branch does
+ (remote-conndat-set! *runremote* res) ;; store the rpc connection data on *runremote*
+ res))
((http)(client:setup-http run-id remaining-tries: remaining-tries failed-connects: failed-connects))
- (else (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)))) ;; (client:setup-rpc run-id))))
+ (else ;; unknown transport: report the value this case dispatched on and exit with a failure status
+ (debug:print 0 *default-log-port* "ERROR: transport " (server:get-transport) " not supported (6)")
+ (exit 1))))
;; (define (client:login-no-auto-setup server-info run-id)
;; (case (server:get-transport)
;; ((rpc) (rpc:login-no-auto-client-setup server-info run-id))
;; ((http) (rmt:login-no-auto-client-setup server-info run-id))
@@ -152,10 +158,30 @@
;;
;; client:setup
;;
;; lookup_server, need to remove *runremote* stuff
;;
+
+(define (client:setup-rpc run-id #!key (remaining-tries 10) (failed-connects 0))
+  ;; Connect to the rpc server recorded for run-id; if none is recorded, try to start one and retry through client:setup.
+  (debug:print-info 2 *default-log-port* "client:setup-rpc remaining-tries=" remaining-tries)
+  (let* ((server-dat    (tasks:get-server (db:delay-if-busy (tasks:open-db)) run-id))
+         (num-available (tasks:num-in-available-state (db:delay-if-busy (tasks:open-db)) run-id)))
+    (cond
+     ((<= remaining-tries 0)
+      (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id)
+      (exit 1))
+     (server-dat
+      (debug:print-info 4 *default-log-port* "client:setup-rpc server-dat=" server-dat ", remaining-tries=" remaining-tries)
+      (rpc-transport:client-setup run-id server-dat remaining-tries: remaining-tries))
+     (else
+      (if (< num-available 2) (server:try-running run-id))
+      ;; give server a little time to start up, randomize a little to avoid start storms; (max 1 ...) keeps random's argument positive when remaining-tries >= 20
+      (thread-sleep! (+ 2 (random (max 1 (- 20 remaining-tries)))))
+      (client:setup run-id remaining-tries: (- remaining-tries 1))))))
+
+
(define (client:setup-http run-id #!key (remaining-tries 10) (failed-connects 0))
(debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries)
(let* ((tdbdat (tasks:open-db)))
(if (<= remaining-tries 0)
(begin
@@ -165,22 +191,12 @@
(debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
(if server-dat
(let* ((iface (tasks:hostinfo-get-interface server-dat))
(hostname (tasks:hostinfo-get-hostname server-dat))
(port (tasks:hostinfo-get-port server-dat))
- (start-res (case *transport-type*
- ((http)(http-transport:client-connect iface port))
- ;;((nmsg)(nmsg-transport:client-connect hostname port))
- ))
- (ping-res (case *transport-type*
- ((http)(rmt:login-no-auto-client-setup start-res))
- ;; ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id)))
- ;; (if logininfo
- ;; (car (vector-ref logininfo 1))
- ;; #f)))
-
- )))
+ (start-res (http-transport:client-connect iface port))
+ (ping-res (rmt:login-no-auto-client-setup start-res)))
(if (and start-res
ping-res)
(begin
(remote-conndat-set! *runremote* start-res) ;; (hash-table-set! *runremote* run-id start-res)
(debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res))
@@ -209,11 +225,11 @@
(if (< num-available 2)
(server:try-running run-id))
(thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms.
(client:setup run-id remaining-tries: (- remaining-tries 1)))))))))
-;; keep this as a function to ease future
+;; keep this as a function to ease future ;; this is unused, not porting for rpc -BB
(define (client:start run-id server-info)
(http-transport:client-connect (tasks:hostinfo-get-interface server-info)
(tasks:hostinfo-get-port server-info)))
;; ;; client:signal-handler
Index: common.scm
==================================================================
--- common.scm
+++ common.scm
@@ -18,10 +18,12 @@
(import (prefix base64 base64:))
(declare (unit common))
(include "common_records.scm")
+(include "thunk-utils.scm")
+
;; (require-library margs)
;; (include "margs.scm")
;; (define old-exit exit)
@@ -60,13 +62,24 @@
(if (not cxt)
(set! cxt (let ((x (make-cxt)))(hash-table-set! *contexts* toppath x) x)))
(let ((cxt-mutex (cxt-mutex cxt)))
(mutex-unlock! *context-mutex*)
(mutex-lock! cxt-mutex)
- (let ((res (proc cxt)))
- (mutex-unlock! cxt-mutex)
- res))))
+      ;; here we guard proc with exception handler so
+      ;; no matter how proc succeeds or fails,
+      ;; the cxt-mutex will be unlocked afterward.
+      (let* ((EXCEPTION-SYMBOL (gensym)) ;; use a generated symbol
+             (guarded-proc               ;; to avoid collision
+              (lambda args
+                (let* ((res (condition-case
+                             (apply proc args)
+                             [x () (cons EXCEPTION-SYMBOL x)]))) ;; tag a caught condition so it can be told apart from a normal result
+                  (mutex-unlock! cxt-mutex)
+                  (if (and (pair? res) (eq? (car res) EXCEPTION-SYMBOL)) ;; must test against the gensym bound above
+                      (abort (cdr res)) ;; re-raise the original condition now that the mutex is released
+                      res)))))
+        (guarded-proc cxt)))))
(define *db-keys* #f)
(define *configinfo* #f) ;; raw results from setup, includes toppath and table from megatest.config
(define *runconfigdat* #f) ;; run configs data
Index: http-transport.scm
==================================================================
--- http-transport.scm
+++ http-transport.scm
@@ -84,11 +84,11 @@
((equal? (uri-path (request-uri (current-request)))
'(/ "api"))
(send-response body: (api:process-request *dbstruct-db* $) ;; the $ is the request vars proc
headers: '((content-type text/plain)))
(mutex-lock! *heartbeat-mutex*)
- (set! *db-last-access* (current-seconds))
+ (set! *last-db-access* (current-seconds))
(mutex-unlock! *heartbeat-mutex*))
((equal? (uri-path (request-uri (current-request)))
'(/ ""))
(send-response body: (http-transport:main-page)))
((equal? (uri-path (request-uri (current-request)))
@@ -335,11 +335,11 @@
;;
(define (http-transport:client-connect iface port)
(let* ((api-url (conc "http://" iface ":" port "/api"))
(api-uri (uri-reference (conc "http://" iface ":" port "/api")))
(api-req (make-request method: 'POST uri: api-uri))
- (server-dat (vector iface port api-uri api-url api-req (current-seconds))))
+ (server-dat (vector iface port api-uri api-url api-req (current-seconds) 'http)))
server-dat))
;; run http-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
;;
@@ -426,13 +426,13 @@
(begin
(debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info")
(set! iface (car sdat))
(set! port (cadr sdat))))
- ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
+ ;; Transfer *last-db-access* to last-access to use in checking that we are still alive
(mutex-lock! *heartbeat-mutex*)
- (set! last-access *db-last-access*)
+ (set! last-access *last-db-access*)
(mutex-unlock! *heartbeat-mutex*)
;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout)
;;
;; no_traffic, no running tests, if server 0, no running servers
@@ -540,17 +540,17 @@
(begin
(debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
(exit 0))
(begin ;; ok, no server detected, clean out any lingering records
(tasks:server-force-clean-running-records-for-run-id (db:delay-if-busy tdbdat) run-id "notresponding")))
- (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
+ (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http))
(remtries 4))
(if (not server-id)
(if (> remtries 0)
(begin
(thread-sleep! 2)
- (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)
+ (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http)
(- remtries 1)))
(begin
;; since we didn't get the server lock we are going to clean up and bail out
(debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
(tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
@@ -638,11 +638,11 @@
"
Average non-cached time | " (if (eq? *number-non-write-queries* 0)
"n/a (no queries)"
(/ *total-non-write-delay*
*number-non-write-queries*))
" ms |
"
- "Last access | " (seconds->time-string *db-last-access*) " |
"
+ "Last access | " (seconds->time-string *last-db-access*) " |
"
"")))
(mutex-unlock! *heartbeat-mutex*)
res))
(define (http-transport:runs linkpath)
Index: megatest-version.scm
==================================================================
--- megatest-version.scm
+++ megatest-version.scm
@@ -1,7 +1,7 @@
;; Always use two or four digit decimal
;; 1.01, 1.02...1.10,1.11,1.1101 ... 1.99,2.00..
(declare (unit megatest-version))
-(define megatest-version 1.6208)
+(define megatest-version 1.6301)
Index: rmt.scm
==================================================================
--- rmt.scm
+++ rmt.scm
@@ -6,17 +6,18 @@
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
;;======================================================================
-
+;;
(use format typed-records) ;; RADT => purpose of json format??
(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
+(declare (uses rpc-transport))
;;(declare (uses nmsg-transport))
(include "common_records.scm")
;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
@@ -150,20 +151,24 @@
(dat (case (remote-transport *runremote*)
((http) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away
(http-transport:client-api-send-receive 0 conninfo cmd params)
((commfail)(vector #f "communications fail"))
((exn)(vector #f "other fail" (print-call-chain)))))
+ ((rpc) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away
+ (rpc-transport:client-api-send-receive 0 conninfo cmd params)
+ ((commfail)(vector #f "communications fail"))
+ ((exn)(vector #f "other fail" (print-call-chain)))))
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported")
(exit))))
(success (if (vector? dat) (vector-ref dat 0) #f))
(res (if (vector? dat) (vector-ref dat 1) #f)))
(if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat)
(if success
(case (remote-transport *runremote*)
- ((http) res)
+ ((http rpc) res)
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown")
(exit 1)))
(begin
(debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum)
@@ -266,11 +271,18 @@
(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
(let* ((run-id (if run-id run-id 0))
(res (handle-exceptions
exn
#f
- (http-transport:client-api-send-receive run-id connection-info cmd params))))
+ (case (remote-transport *runremote*) ;; pick the send/receive implementation for the active transport
+ ((http) (http-transport:client-api-send-receive run-id connection-info cmd params))
+ ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params))
+ (else
+ (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (2)")
+ (exit 1)) ;; an unsupported transport is an error, so exit with a failure status
+ ;; any exception raised inside this case is turned into #f by the enclosing handle-exceptions
+ ))))
(if (and res (vector-ref res 0))
(vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!!
#f)))
;; ;; Wrap json library for strings (why the ports crap in the first place?)
@@ -310,11 +322,16 @@
;; This login does no retries under the hood - it acts a bit like a ping.
;; Deprecated for nmsg-transport.
;;
(define (rmt:login-no-auto-client-setup connection-info)
(case *transport-type* ;; run-id of 0 is just a placeholder
- ((http)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*)))
+ ((http rpc)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*)))
+ (else
+ (debug:print 0 *default-log-port* "ERROR: transport " *transport-type* " not supported (3)")
+ (exit 1))
+ ;; the message above reports *transport-type* because that is the value this case dispatches on;
+ ;; exit status is 1 because an unsupported transport is an error
;;((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*)))
))
;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible
Index: rpc-transport.scm
==================================================================
--- rpc-transport.scm
+++ rpc-transport.scm
@@ -1,7 +1,7 @@
-;; Copyright 2006-2012, Matthew Welland.
+;; Copyright 2006-2016, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
@@ -21,208 +21,622 @@
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(include "common_records.scm")
(include "db_records.scm")
+
+(define *heartbeat-mutex* (make-mutex))
+(define *server-loop-heart-beat* (current-seconds))
+
;; procstr is the name of the procedure to be called as a string
-(define (rpc-transport:autoremote procstr params)
- (handle-exceptions
- exn
- (begin
- (debug:print 1 *default-log-port* "Remote failed for " proc " " params)
- (apply (eval (string->symbol procstr)) params))
- ;; (if *runremote*
- ;; (apply (eval (string->symbol (conc "remote:" procstr))) params)
- (apply (eval (string->symbol procstr)) params)))
+(define (rpc-transport:autoremote procstr params) ;; may be unused, I think api-exec deprecates this one.
+  ;; procstr names the procedure to call: a symbol is used as is, anything else goes through ->string and string->symbol.
+  ;; The named procedure is looked up with eval and applied to the list params; its result is returned.
+  ;; NOTE(review): this evals a caller-supplied name -- confirm that only trusted callers can reach it.
+  (let ((procsym (if (symbol? procstr)
+                     procstr
+                     (string->symbol (->string procstr)))))
+    (apply (eval procsym) params)))
+
+
+;; rpc receiver
+(define (rpc-transport:api-exec cmd params)
+  ;; rpc receiver: runs one api request against *inmemdb* and returns the result part of the reply.
+  ;; api:execute-requests replies with #( flag result ); only the result slot is used here.
+  (let* ((reply  (api:execute-requests *inmemdb* (vector cmd params)))
+         (result (vector-ref reply 1)))
+    (mutex-lock! *heartbeat-mutex*)
+    ;; bump *last-db-access*; this will renew keep-running thread's lease on life for another (server:get-timeout) seconds
+    (set! *last-db-access* (current-seconds))
+    ;;(BB> "in api-exec; last-db-access updated to "*last-db-access*)
+    (mutex-unlock! *heartbeat-mutex*)
+    result))
+
+
+
+;; retry an operation (depends on srfi-18)
+;; ==================
+;; The idea is to avoid re-coding retry logic at every call site, so this is kept generic.
+;;
+;; Exception handling:
+;; -------------------
+;; If evaluating the thunk raises an exception, the thunk is retried.
+;; On the last try, if final-failure-returns-actual is true, the exception is re-thrown to the caller;
+;; otherwise failure-value is returned.
+;;
+;; See the options after #!key below to configure the behavior.
+;;
+(define (retry-thunk
+ the-thunk
+ #!key ;;;; options below
+ (accept-result? (lambda (x) x)) ;; retry if predicate applied to thunk's result is false
+ (retries 4) ;; how many retries after the first attempt (the thunk runs at most retries+1 times)
+ (failure-value #f) ;; return this on final failure, unless following option is enabled:
+ (final-failure-returns-actual #f) ;; on failure, on the last try, just return the result (or re-throw the exception), not failure-value
+
+ (retry-delay 0.1) ;; initial value of fail-wait, the base of the first wait between tries
+ (back-off-factor 1) ;; each wait is fail-wait plus fail-wait times this factor plus the random portion (see wait-time)
+ (random-delay 0.1) ;; add a random portion of this value to wait
+
+ (chatty #f) ;; print status as we go, for debugging.
+ )
+
+ (when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-"))
+ (let* ((guarded-thunk ;; we are guarding the thunk against exceptions. We will record whether result of evaluation is an exception or a regular result.
+ (lambda ()
+ (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision
+ (res
+ (condition-case
+ (the-thunk) ;; this is what we are guarding the execution of
+ [x () (cons EXCEPTION x)]
+ )))
+ (cond
+ ((and (pair? res) (eq? (car res) EXCEPTION))
+ (if chatty
+ (print " - the-thunk threw exception >"(cdr res)"<"))
+ (cons 'exception (cdr res)))
+ (else
+ (if chatty
+ (print " - the-thunk returned result >"res"<"))
+ (cons 'regular-result res)))))))
+
+ (let loop ((guarded-res (guarded-thunk))
+ (retries-left retries)
+ (fail-wait retry-delay))
+ (if chatty (print " =========="))
+ (let* ((wait-time (+ fail-wait (+ (* fail-wait back-off-factor) ;; this wait-time is passed on as the next iteration's fail-wait
+ (* random-delay
+ (/ (random 1024) 1024) ))))
+ (res-type (car guarded-res))
+ (res-value (cdr guarded-res)))
+ (cond
+ ((and (eq? res-type 'regular-result) (accept-result? res-value))
+ (if chatty (print " + return result that satisfied accept-result? >"res-value"<"))
+ res-value)
+
+ ((> retries-left 0)
+ (if chatty (print " - sleep "wait-time))
+ (thread-sleep! wait-time)
+ (if chatty (print " + retry ["retries-left" tries left]"))
+ (loop (guarded-thunk)
+ (sub1 retries-left)
+ wait-time))
+
+ ((eq? res-type 'regular-result)
+ (if final-failure-returns-actual
+ (begin
+ (if chatty (print " + last try failed- return the result >"res-value"<"))
+ res-value)
+ (begin
+ (if chatty (print " + last try failed- return canned failure value >"failure-value"<"))
+ failure-value)))
+
+ (else ;; no retries left; result was not accepted and res-type can only be 'exception
+ (if final-failure-returns-actual
+ (begin
+ (if chatty (print " + last try failed with exception- re-throw it >"res-value"<"))
+ (abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function
+ (begin
+ (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
+ failure-value))))))))
+
+
+(define (rpc-transport:server-shutdown server-id rpc:listener ) ;;#!key (from-on-exit #f))
+ ;;(on-exit (lambda () #t)) ;; turn off on-exit stuff
+ ;;(tcp-close rpc:listener) ;; gotta exit nicely
+ ;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped")
+ ;; Shut this server down: set *time-to-exit*, delete the server record for server-id, then kill -9 our own pid.
+ ;; rpc:listener is only referenced by the commented-out tcp-close above.
+ ;; TODO: (low) the following is extraordinarily slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast!
+ ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released")
+
+ (set! *time-to-exit* #t)
+ ;;(if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t))
+
+
+ (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete")
+
+
+ ;;(BB> "Before (exit) (from-on-exit="from-on-exit")")
+ ;;(unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu.
+ ;;(BB> "After")
+ ;; strace reveals endless:
+ ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0
+ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0
+ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0
+ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 105880}, ru_stime={0, 60003}, ...}) = 0
+ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 109880}, ru_stime={0, 60003}, ...}) = 0
+ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 201886}, ru_stime={0, 60003}, ...}) = 0
+ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 205886}, ru_stime={0, 60003}, ...}) = 0
+ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 297892}, ru_stime={0, 60003}, ...}) = 0
+ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 301892}, ru_stime={0, 60003}, ...}) = 0
+ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 393898}, ru_stime={0, 60003}, ...}) = 0
+ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 397898}, ru_stime={0, 60003}, ...}) = 0
+ ;; make a post to chicken-users w/ http://paste.call-cc.org/paste?id=60a4b66a29ccf7d11359ea866db642c970735978
+
+
+ ;; (if from-on-exit
+ ;; ;; avoid above condition! End current process externally since 1 in 20 (exit)'s result in hung, 100% cpu zombies. (see above)
+ ;; the process ends itself with kill -9 on its own pid instead of calling (exit)
+ (system (conc "kill -9 "(current-process-id)))
+ )
+
;; all routes though here end in exit ...
;;
;; start_server?
;;
(define (rpc-transport:launch run-id)
- (let* ((tdbdat (tasks:open-db)))
- (BB> "rpc-transport:launch fired for run-id="run-id)
- (set! *run-id* run-id)
- (if (args:get-arg "-daemonize")
- (daemon:ize))
- (if (server:check-if-running run-id)
- (begin
- (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
- (exit 0)))
- (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
- (remtries 4))
- (if (not server-id)
- (if (> remtries 0)
- (begin
- (thread-sleep! 2)
- (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)
- (- remtries 1)))
- (begin
- ;; since we didn't get the server lock we are going to clean up and bail out
- (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
- (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " rpc-transport:launch")))
- (begin
- (rpc-transport:run (if (args:get-arg "-server")(args:get-arg "-server") "-") run-id server-id)
- (exit))))))
+ (set! *run-id* run-id)
+ ;; Start an rpc server for run-id: exit if one is already running, prune stale server records, take a server slot, then call rpc-transport:run.
+ ;; ;; send to background if requested
+ ;; (when (args:get-arg "-daemonize")
+ ;; (daemon:ize)
+ ;; (when *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
+ ;; (current-error-port *alt-log-file*)
+ ;; (current-output-port *alt-log-file*)))
+
+ ;; double check we don't already have a running server for this run-id
+ (when (server:check-if-running run-id)
+ (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
+ (exit 0))
+
+
+ ;; clean up dead servers (duped in megatest.scm in -list-servers processing; may want to consolidate into proc)
+ (for-each
+ (lambda (server)
+ (let* ((id (vector-ref server 0))
+ (pid (vector-ref server 1))
+ (hostname (vector-ref server 2))
+ (interface (vector-ref server 3))
+ (pullport (vector-ref server 4))
+ (pubport (vector-ref server 5))
+ (start-time (vector-ref server 6))
+ (priority (vector-ref server 7))
+ (state (vector-ref server 8))
+ (mt-ver (vector-ref server 9))
+ (last-update (vector-ref server 10))
+ (transport (vector-ref server 11))
+ (killed #f)
+ (status (< last-update 20))) ;; NOTE(review): only pid, hostname, pullport, state and last-update are used by the live code below
+
+ (if (equal? state "dead")
+ (if (> last-update (* 25 60 60)) ;; keep records around for slightly over a day.
+ (tasks:server-deregister (db:delay-if-busy (tasks:open-db)) hostname pullport: pullport pid: pid action: 'delete))
+ (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds
+ (tasks:server-deregister (db:delay-if-busy (tasks:open-db)) hostname pullport: pullport pid: pid)))
+ ;;(format #t fmtstr id mt-ver pid hostname (conc interface ":" pullport) pubport last-update
+ ;; (if status "alive" "dead") transport)
+ ;; (if (or (equal? id sid)
+ ;; (equal? sid 0)) ;; kill all/any
+ ;; (begin
+ ;; (debug:print-info 0 *default-log-port* "Attempting to kill "kill-switch" server with pid " pid)
+ ;; (tasks:kill-server hostname pid kill-switch: kill-switch)))
+
+ )
+
+ )
+ (tasks:get-all-servers (db:delay-if-busy (tasks:open-db))))
+
+ ;; let's get a server-id for this server
+ ;; if at first we do not succeed, retry-thunk retries up to 4 more times (retries: 4).
+ (let ((server-id (retry-thunk
+ (lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc))
+ chatty: #f
+ final-failure-returns-actual: #t
+ retries: 4)))
+ (when (not server-id) ;; dang we couldn't get a server-id.
+ ;; since we didn't get the server lock we are going to clean up and bail out
+ (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
+ (tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch")
+ (exit 1))
+
+ ;; we got a server-id (and a corresponding entry in servers table in globally shared mdb)
+ ;; all systems go. Proceed to setup rpc server.
+ (rpc-transport:run
+ (if (args:get-arg "-server")
+ (args:get-arg "-server")
+ "-")
+ run-id
+ server-id)
+ (exit)))
+
+(define *rpc-listener-port* #f)
+(define *rpc-listener-port-bind-timestamp* #f)
+
+(define *on-exit-flag #f)
+
+(define (rpc-transport:server-dat-get-iface vec) (vector-ref vec 0))
+(define (rpc-transport:server-dat-get-port vec) (vector-ref vec 1))
+(define (rpc-transport:server-dat-get-last-access vec) (vector-ref vec 5))
+(define (rpc-transport:server-dat-get-transport vec) (vector-ref vec 6))
+(define (rpc-transport:server-dat-update-last-access vec)
+  ;; Stamp slot 5 of vec (the slot rpc-transport:server-dat-get-last-access reads) with the current time.
+  (cond
+   ((vector? vec) (vector-set! vec 5 (current-seconds)))
+   (else (print-call-chain (current-error-port)) ;; not a vector: show the call chain, then log the error
+         (debug:print-error 0 *default-log-port* "call to rpc-transport:server-dat-update-last-access with non-vector!!"))))
+
+
+(define *api-exec-ht* (make-hash-table))
+(define *api-exec-mutex* (make-mutex))
+;; let's see if caching the rpc stub curbs thread-profusion on server side
+(define (rpc-transport:get-api-exec iface port)
+  ;; Return the rpc stub for 'api-exec at iface/port, creating it and caching it in *api-exec-ht* on first use.
+  ;; *api-exec-mutex* is held across the lookup and the insert.
+  (mutex-lock! *api-exec-mutex*)
+  (let* ((key  (cons iface port))
+         (stub (or (hash-table-ref/default *api-exec-ht* key #f)
+                   (let ((fresh (rpc:procedure 'api-exec iface port)))
+                     (hash-table-set! *api-exec-ht* key fresh)
+                     fresh))))
+    (mutex-unlock! *api-exec-mutex*)
+    stub))
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;; this client-side procedure makes rpc call to server and returns result
+;;
+(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
+  ;; Client side: call the server's api-exec stub with cmd and params; returns #(#t result) or #(#f message).
+  ;; The call runs in thread th1 while th2 terminates th1 after 45 seconds. numretries is not used in this body.
+  ;;(BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries)
+  (if (not (vector? serverdat))
+      (begin
+        (BB> "WHAT?? for run-id="run-id", serverdat="serverdat)
+        (print-call-chain)
+        (exit 1)))
+  (let* ((iface (rpc-transport:server-dat-get-iface serverdat))
+         (port  (rpc-transport:server-dat-get-port serverdat))
+         (res   #f)
+         (api-exec (rpc-transport:get-api-exec iface port)) ;; cached by host/port. may need to clear...
+         (send-receive (lambda ()
+                         (tcp-buffer-size 0)
+                         (set! res (retry-thunk
+                                    (lambda ()
+                                      (condition-case
+                                       ;;(vector #t (run-remote cmd params))
+                                       (vector 'success (api-exec cmd params))
+                                       [x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)]
+                                       [x () (vector 'other-fail (conc "other fail ["(->string x)"]") x)])) ;; conc keeps slot 1 = message, slot 2 = condition
+                                    chatty: #f
+                                    ;; NOTE(review): slot 0 is always a symbol (true), so every vector is accepted and no retry happens -- confirm intent
+                                    accept-result?: (lambda(x)
+                                                      (and (vector? x) (vector-ref x 0)))
+                                    retries: 8
+                                    back-off-factor: 1.5
+                                    random-delay: 0.2 ;; retry-thunk's keyword is random-delay
+                                    retry-delay: 0.1
+                                    final-failure-returns-actual: #t))
+                         ;;(BB> "HEY res="res)
+                         res))
+         (th1 (make-thread send-receive "send-receive"))
+         (time-out-reached #f)
+         (time-out (lambda ()
+                     (thread-sleep! 45)
+                     (set! time-out-reached #t)
+                     (thread-terminate! th1)
+                     #f))
+         (th2 (make-thread time-out "time out")))
+    (thread-start! th1)
+    (thread-start! th2)
+    ;; NOTE(review): joining a thread that th2 has terminated may itself raise, bypassing the timeout branch below -- confirm
+    (thread-join! th1)
+    (thread-terminate! th2)
+    ;;(BB> "alt got res="res)
+    (debug:print-info 11 *default-log-port* "got res=" res)
+    (if (vector? res)
+        (case (vector-ref res 0)
+          ((success) (vector #t (vector-ref res 1)))
+          ((comms-fail other-fail)
+           (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request >>"res"<<")
+           ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
+           (vector #f (vector-ref res 1)))
+          (else
+           (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1))
+           (debug:print 0 *default-log-port* " client call chain:")
+           (print-call-chain (current-error-port))
+           (debug:print 0 *default-log-port* " server call chain:")
+           (pp (vector-ref res 1) (current-error-port))
+           (signal (vector-ref res 2))))
+        (signal (make-composite-condition
+                 (make-property-condition
+                  'timeout
+                  'message "rpc-transport:client-api-send-receive timed out talking to server"))))))
+
+
(define (rpc-transport:run hostn run-id server-id)
(debug:print 2 *default-log-port* "Attempting to start the rpc server ...")
;; (trace rpc:publish-procedure!)
- (rpc:publish-procedure! 'server:login server:login)
- (rpc:publish-procedure! 'testing (lambda () "Just testing"))
+ ;;======================================================================
+ ;; start of publish-procedure section
+ ;;======================================================================
+ (rpc:publish-procedure! 'server:login server:login) ;; this allows client to validate it is the same megatest instance as the server. No security here, just making sure we're in the right room.
+ (rpc:publish-procedure!
+ 'testing
+ (lambda ()
+ "Just testing"))
+
+ ;; procedure to receive arbitrary API request from client's rpc:send-receive/rpc-transport:client-api-send-receive
+ (rpc:publish-procedure! 'rpc-transport:autoremote rpc-transport:autoremote)
+ ;; can use this to run most anything at the remote
+ (rpc:publish-procedure! 'api-exec rpc-transport:api-exec)
+
+
+ ;;======================================================================
+ ;; end of publish-procedure section
+ ;;======================================================================
+
(let* ((db #f)
- (hostname (get-host-name))
- (ipaddrstr (let ((ipstr (if (string=? "-" hostn)
+ (hostname (let ((res (get-host-name))) res))
+ (server-start-time (current-seconds))
+ (server-timeout (server:get-timeout))
+ (ipaddrstr (let* ((ipstr (if (string=? "-" hostn)
;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
(server:get-best-guess-address hostname)
- #f)))
- (if ipstr ipstr hostn))) ;; hostname)))
- (start-port (open-run-close tasks:server-get-next-port tasks:open-db))
+ #f))
+ (res (if ipstr ipstr hostn)))
+ res)) ;; hostname)))
+ (start-port (let ((res (portlogger:open-run-close portlogger:find-port))) ;; BB> TODO: remove portlogger!
+ res))
(link-tree-path (configf:lookup *configdat* "setup" "linktree"))
- (rpc:listener (rpc-transport:find-free-port-and-open (rpc:default-server-port)))
+
+ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+ ;; rpc:listener is the tcp-listen result from inside the find-free-port-and-open complex.
+ ;; It is our handle on the listening tcp port
+ ;; We will attach this to our rpc server with rpc:make-server in thread th1 .
+ (rpc:listener (rpc-transport:find-free-port-and-open start-port))
(th1 (make-thread
(lambda ()
- ((rpc:make-server rpc:listener) #t))
+ ((rpc:make-server rpc:listener) #t) )
"rpc:server"))
- ;; (cute (rpc:make-server rpc:listener) "rpc:server")
- ;; 'rpc:server))
- (hostname (if (string=? "-" hostn)
+
+
+ (hostname (if (string=? "-" hostn)
(get-host-name)
hostn))
(ipaddrstr (if (string=? "-" hostn)
(server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
#f))
- (portnum (rpc:default-server-port))
- (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum))
- (tdb (tasks:open-db)))
+ (portnum (let ((res (rpc:default-server-port))) res))
+ (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)))
+
+
+ (tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum)
+
+ ;;============================================================
+ ;; activate thread th1 to attach opened tcp port to rpc server
+ ;;=============================================================
(thread-start! th1)
- (set! db *dbstruct-db*)
- (open-run-close tasks:server-set-interface-port
- tasks:open-db
- server-id
- ipaddrstr portnum)
+ (set! db *inmemdb*)
+
(debug:print 0 *default-log-port* "Server started on " host:port)
-
- ;; (trace rpc:publish-procedure!)
- ;; (rpc:publish-procedure! 'server:login server:login)
- ;; (rpc:publish-procedure! 'testing (lambda () "Just testing"))
-
- ;;======================================================================
- ;; ;; end of publish-procedure section
- ;;======================================================================
- ;;
- (on-exit (lambda ()
- (open-run-close tasks:server-set-state! tasks:open-db server-id "stopped")))
-
- (set! *rpc:listener* rpc:listener)
- (tasks:server-set-state! tdb server-id "running")
- (set! *dbstruct-db* (db:setup run-id))
- ;; if none running or if > 20 seconds since
- ;; server last used then start shutdown
- (let loop ((count 0))
- (thread-sleep! 5) ;; no need to do this very often
- (let ((numrunning -1)) ;; (db:get-count-tests-running db)))
- (if (or (> numrunning 0)
- (> (+ *db-last-access* 60)(current-seconds)))
- (begin
- (debug:print-info 0 *default-log-port* "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *db-last-access*))
- (loop (+ 1 count)))
- (begin
- (debug:print-info 0 *default-log-port* "Starting to shutdown the server side")
- (open-run-close tasks:server-delete-record tasks:open-db server-id " rpc-transport:try-start-server stop")
- (thread-sleep! 10)
- (debug:print-info 0 *default-log-port* "Max cached queries was " *max-cache-size*)
- (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting")
- ))))))
-
-(define (rpc-transport:find-free-port-and-open port)
+
+ ;; (thread-sleep! 5)
+
+ (if (retry-thunk (lambda ()
+ (rpc-transport:self-test run-id ipaddrstr portnum))
+ final-failure-returns-actual: #t
+ )
+ (debug:print 0 *default-log-port* "INFO: rpc self test passed!")
+ (begin
+ (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port)
+ (BB> 1)
+ (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "dead")
+ (BB> 2)
+ (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
+ (BB> 3)
+ (rpc-transport:server-shutdown server-id rpc:listener)
+ (exit)))
+ (mutex-lock! *heartbeat-mutex*)
+ (set! *last-db-access* (current-seconds))
+ (mutex-unlock! *heartbeat-mutex*)
+
+ ;;(on-exit (lambda ()
+ ;; (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t)))
+
+ ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch
+ (if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers
+ (begin ;; i am not the server, another server snuck in and beat this one to the punch
+ (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
+ (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision"))
+
+ (begin ;; i am the server
+ ;; setup the in-memory db
+ (set! *inmemdb* (db:setup run-id))
+ (db:get-db *inmemdb* run-id)
+
+ ;; let's make it official
+ (set! *rpc:listener* rpc:listener)
+ (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry
+
+
+
+ ;; this let loop will hold open this thread until we want the server to shut down.
+ ;; if no requests received within the last 20 seconds :
+ ;; database hasnt changed in ??
+ ;;
+
+ ;; begin new loop
+ ;; keep-running loop: polls last-db-access to see if we have timed out.
+ (let loop ((count 0)
+ (bad-sync-count 0))
+
+ ;; Use this opportunity to sync the inmemdb to db
+ (let ((start-time (current-milliseconds))
+ (sync-time #f)
+ (rem-time #f))
+ ;; inmemdb is a dbstruct
+ (condition-case
+ (db:sync-touched *inmemdb* *run-id* force-sync: #t)
+ ((sync-failed)(cond
+ ((> bad-sync-count 10) ;; time to give up
+ (rpc-transport:server-shutdown server-id rpc:listener))
+ (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop
+ (thread-sleep! 5)
+ (loop count (+ bad-sync-count 1)))))
+ ((exn)
+ (debug:print-error 0 *default-log-port* "error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server")
+ (rpc-transport:server-shutdown server-id rpc:listener)))
+ (set! sync-time (- (current-milliseconds) start-time))
+ (set! rem-time (quotient (- 4000 sync-time) 1000))
+ (debug:print 4 *default-log-port* "SYNC: time= " sync-time ", rem-time=" rem-time)
+
+ (if (and (<= rem-time 4)
+ (> rem-time 0))
+ (thread-sleep! rem-time)
+ (thread-sleep! 4))) ;; fallback for if the math is changed ...
+
+ (if (< count 1) ;; 3x3 = 9 secs aprox
+ (loop (+ count 1) bad-sync-count))
+
+ ;; BB: don't see how this is possible with RPC
+ ;; ;; Check that iface and port have not changed (can happen if server port collides)
+ ;; (mutex-lock! *heartbeat-mutex*)
+ ;; (set! sdat *server-info*)
+ ;; (mutex-unlock! *heartbeat-mutex*)
+
+ ;; (if (or (not (equal? sdat (list iface port)))
+ ;; (not server-id))
+ ;; (begin
+ ;; (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info")
+ ;; (set! iface (car sdat))
+ ;; (set! port (cadr sdat))))
+
+ ;; Transfer *last-db-access* to last-access to use in checking that we are still alive
+ (mutex-lock! *heartbeat-mutex*)
+ (set! last-access *last-db-access*)
+ (mutex-unlock! *heartbeat-mutex*)
+
+ ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout)
+ ;;
+ ;; no_traffic, no running tests, if server 0, no running servers
+ ;;
+ ;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out)
+ ;;
+ (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))
+ (adjusted-timeout (if (> hrs-since-start 1)
+ (- server-timeout (inexact->exact (round (* hrs-since-start 60)))) ;; subtract 60 seconds per hour
+ server-timeout)))
+ (if (common:low-noise-print 120 "server timeout")
+ (debug:print-info 0 *default-log-port* "Adjusted server timeout: " adjusted-timeout))
+ (if (and *server-run*
+ (> (+ last-access server-timeout)
+ (current-seconds)))
+ (begin
+ (if (common:low-noise-print 120 "server continuing")
+ (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
+ ;;
+ ;; Consider implementing some smarts here to re-insert the record or kill self is
+ ;; the db indicates so
+ ;;
+ (if (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)
+ (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running"))
+ ;;
+ (loop 0 bad-sync-count))
+ (begin
+ ;;(BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout)
+ (rpc-transport:server-shutdown server-id rpc:listener)))))
+ ;; end new loop
+ ))))
+
+
+(define (rpc-transport:find-free-port-and-open port #!key )
(handle-exceptions
exn
- (begin
+ (begin
(print "Failed to bind to port " (rpc:default-server-port) ", trying next port")
- (rpc-transport:find-free-port-and-open (+ port 1)))
+ (rpc-transport:find-free-port-and-open (add1 port))) ;; NOTE(review): any exception in the body, not only a failed bind, retries on port+1 and there is no upper limit
(rpc:default-server-port port)
+ (set! *rpc-listener-port* port) ;; port is also kept in this global in case the rpc:default-server-port parameter does not carry across threads (as params are wont to do)
+ (set! *rpc-listener-port-bind-timestamp* (current-milliseconds)) ;; time of this bind attempt, kept in case someone wants to test how long ago the last attempt happened
(tcp-read-timeout 240000)
- (tcp-listen (rpc:default-server-port) 10000)))
-
+ (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it.
+ (tcp-listen (rpc:default-server-port) 10000) ;; last form of the body: the tcp-listen result is what the caller gets back
+ ))
+
(define (rpc-transport:ping run-id host port)
(handle-exceptions
exn
(begin
- (print "SERVER_NOT_FOUND")
+ (print "SERVER_NOT_FOUND exn="exn) ;; the remote call raised: print the condition along with the marker, then exit with status 1
(exit 1))
(let ((login-res ((rpc:procedure 'server:login host port) *toppath*)))
- (if (and (list? login-res)
- (car login-res))
+ (if login-res ;; any non-#f reply from server:login counts as a successful login
(begin
(print "LOGIN_OK")
(exit 0))
(begin
(print "LOGIN_FAILED")
(exit 1))))))
-(define (rpc-transport:client-setup run-id #!key (remtries 10))
- (if *runremote*
- (begin
- (debug:print-error 0 *default-log-port* "Attempt to connect to server but already connected")
- #f)
- (let* ((host-info (hash-table-ref/default *runremote* run-id #f))) ;; (open-run-close db:get-var #f "SERVER"))
- (if host-info
- (let ((iface (car host-info))
- (port (cadr host-info))
- (ping-res ((rpc:procedure 'server:login host port) *toppath*)))
- (if ping-res
- (let ((server-dat (list iface port #f #f #f)))
- (hash-table-set! *runremote* run-id server-dat)
- server-dat)
- (begin
- (server:try-running run-id)
- (thread-sleep! 2)
- (rpc-transport:client-setup run-id (- remtries 1)))))
- (let* ((server-db-info (open-run-close tasks:get-server tasks:open-db run-id)))
- (debug:print-info 0 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
- (if server-db-info
- (let* ((iface (tasks:hostinfo-get-interface server-db-info))
- (port (tasks:hostinfo-get-port server-db-info))
- (server-dat (list iface port #f #f #f))
- (ping-res ((rpc:procedure 'server:login host port) *toppath*)))
- (if start-res
- (begin
- (hash-table-set! *runremote* run-id server-dat)
- server-dat)
- (begin
- (server:try-running run-id)
- (thread-sleep! 2)
- (rpc-transport:client-setup run-id (- remtries 1)))))
- (begin
- (server:try-running run-id)
- (thread-sleep! 2)
- (rpc-transport:client-setup run-id (- remtries 1)))))))))
-;;
-;; (port (if (and hostinfo (> (length hostdat) 1))(cadr hostdat) #f)))
-;; (if (and port
-;; (string->number port))
-;; (let ((portn (string->number port)))
-;; (debug:print-info 2 *default-log-port* "Setting up to connect to host " host ":" port)
-;; (handle-exceptions
-;; exn
-;; (begin
-;; (debug:print-error 0 *default-log-port* "Failed to open a connection to the server at host: " host " port: " port)
-;; (debug:print 0 *default-log-port* " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
-;; ;; (open-run-close
-;; ;; (lambda (db . param)
-;; ;; (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER'"))
-;; ;; #f)
-;; (set! *runremote* #f))
-;; (if (and (not (args:get-arg "-server")) ;; no point in the server using the server using the server
-;; ((rpc:procedure 'server:login host portn) *toppath*))
-;; (begin
-;; (debug:print-info 2 *default-log-port* "Logged in and connected to " host ":" port)
-;; (set! *runremote* (vector host portn)))
-;; (begin
-;; (debug:print-info 2 *default-log-port* "Failed to login or connect to " host ":" port)
-;; (set! *runremote* #f)))))
-;; (debug:print-info 2 *default-log-port* "no server available")))))
-
+;; Check that the rpc listener at host:port answers remote calls.
+;; Calls the remote procedures 'testing and 'server:login (with *toppath*) and
+;; returns true only when the login result is true and 'testing answered
+;; "Just testing".  Aborts when host or port is #f.  run-id is not used here.
+(define (rpc-transport:self-test run-id host port)
+  (if (not host)
+      (abort "host not set."))
+  (if (not port)
+      (abort "port not set."))
+  (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it.
+  (let* ((testing-res ((rpc:procedure 'testing host port)))
+         (login-res   ((rpc:procedure 'server:login host port) *toppath*))
+         (res         (and login-res (equal? testing-res "Just testing"))))
+    ;; Log according to the combined verdict (res) rather than login-res alone,
+    ;; so that the PASS/fail message always agrees with the value returned.
+    (if res
+        (BB> "Self test PASS. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*)
+        (BB> "Self test fail. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*))
+    res))
+
+;; Log in to the rpc server described by server-dat; on success return the connection vector.
+;; On failure: clean out the server record, call server:try-running and re-enter client:setup.
+(define (rpc-transport:client-setup run-id server-dat #!key (remaining-tries 10))
+  (tcp-buffer-size 0)
+  (debug:print-info 0 *default-log-port* "rpc-transport:client-setup run-id="run-id" server-dat=" server-dat ", remaining-tries=" remaining-tries)
+  (let* ((iface     (tasks:hostinfo-get-interface server-dat))
+         (hostname  (tasks:hostinfo-get-hostname server-dat)) ;; looked up but not used below
+         (port      (tasks:hostinfo-get-port server-dat))
+         ;; http version := (vector iface port api-uri api-url api-req (current-seconds) 'http )
+         (conn-vec  (vector iface port #f #f #f (current-seconds) 'rpc))
+         (try-login (lambda () ((rpc:procedure 'server:login iface port) *toppath*)))
+         (logged-in (retry-thunk try-login chatty: #f retries: 3))) ;; first attempt plus up to 3 retries
+    ;; we got here from rmt:get-connection-info on the condition that *runremote* has no entry for run-id...
+    (cond
+     (logged-in
+      (debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat)
+      conn-vec)
+     (else ;; login failed but have a server record, clean out the record and try again
+      (debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat)
+      (tasks:kill-server-run-id run-id)
+      (tasks:server-force-clean-run-record (db:delay-if-busy (tasks:open-db)) run-id iface port
+                                           " rpc-transport:client-setup (server-dat = #t)")
+      (thread-sleep! (if (> remaining-tries 2)
+                         (+ 1 (random 5))      ;; spread out the starts a little
+                         (+ 15 (random 20))))  ;; it isn't going well. give it plenty of time
+      (server:try-running run-id)
+      (thread-sleep! 5) ;; give server a little time to start up
+      (client:setup run-id remaining-tries: (sub1 remaining-tries))))))
Index: server.scm
==================================================================
--- server.scm
+++ server.scm
@@ -46,20 +46,24 @@
;; all routes though here end in exit ...
;;
;; start_server
;;
-(define (server:launch run-id transport-type)
- (BB> "server:launch fired for run-id="run-id" transport-type="transport-type)
- (case transport-type
- ((http)(http-transport:launch run-id))
- ;;((nmsg)(nmsg-transport:launch run-id))
- ((rpc) (rpc-transport:launch run-id))
- (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type))))
-;; (else (debug:print-error 0 *default-log-port* "No known transport set, transport=" transport ", using rpc")
-;; (rpc-transport:launch run-id)))))
-
+;; Start the server for run-id using the launcher that matches the transport.
+;; transport-type-raw may be a string or a symbol; a string is converted to a symbol first.
+(define (server:launch run-id transport-type-raw)
+  (let ((transport-type (if (string? transport-type-raw)
+                            (string->symbol transport-type-raw)
+                            transport-type-raw)))
+    (BB> "server:launch fired for run-id="run-id" transport-type="transport-type)
+    (case transport-type
+      ((http) (http-transport:launch run-id))
+      ;;((nmsg)(nmsg-transport:launch run-id))
+      ((rpc) (rpc-transport:launch run-id))
+      ;; any other transport is only reported; nothing is launched
+      (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type)))))
+
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; Get the transport
@@ -226,11 +230,11 @@
;;
(define (server:check-if-running areapath)
(let* ((dotserver (server:read-dotserver areapath))) ;; tdbdat (tasks:open-db)))
(if dotserver
(let* ((res (case *transport-type*
- ((http)(server:ping-server dotserver))
+ ((http rpc)(server:ping-server dotserver))
;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server)
)))
(if res
dotserver
#f))
@@ -265,11 +269,17 @@
(debug:print 0 *default-log-port* "ERROR: bad host:port"))
(if do-exit (exit 1))
#f)
(let* ((iface (car host-port))
(port (cadr host-port))
- (server-dat (http-transport:client-connect iface port))
+ (server-dat
+ (case (remote-transport *runremote*)
+ ((http) (http-transport:client-connect iface port))
+ ((rpc) (rpc-transport:client-connect iface port))
+ (else
+ (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (4)")
+ (exit))))
(login-res (rmt:login-no-auto-client-setup server-dat)))
(if (and (list? login-res)
(car login-res))
(begin
(print "LOGIN_OK")
Index: tasks.scm
==================================================================
--- tasks.scm
+++ tasks.scm
@@ -170,21 +170,21 @@
(define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3))
(define (tasks:hostinfo-get-transport vec) (vector-ref vec 4))
(define (tasks:hostinfo-get-pid vec) (vector-ref vec 5))
(define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6))
-(define (tasks:server-lock-slot mdb run-id)
+(define (tasks:server-lock-slot mdb run-id transport-type) ;; transport-type must be a symbol: tasks:server-set-available applies symbol->string to it
(tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot")
(if (< (tasks:num-in-available-state mdb run-id) 4)
(begin
- (tasks:server-set-available mdb run-id)
+ (tasks:server-set-available mdb run-id transport-type) ;; insert an "available" row into servers for this process and run-id
(thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed.
(tasks:server-am-i-the-server? mdb run-id))
#f))
;; register that this server may come online (first to register goes though with the process)
-(define (tasks:server-set-available mdb run-id)
+(define (tasks:server-set-available mdb run-id transport-type)
(sqlite3:execute
mdb
"INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id)
VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);"
(current-process-id) ;; pid
@@ -194,11 +194,11 @@
(random 1000) ;; priority (used a tiebreaker on get-available)
"available" ;; state
(common:version-signature) ;; mt_version
-1 ;; interface
;; (conc (server:get-transport)) ;; transport
- (conc *transport-type*) ;; transport
+ (symbol->string transport-type) ;; transport
run-id
))
(define (tasks:num-in-available-state mdb run-id)
(let ((res 0))
ADDED thunk-utils.scm
Index: thunk-utils.scm
==================================================================
--- /dev/null
+++ thunk-utils.scm
@@ -0,0 +1,121 @@
+(use srfi-18)
+
+
+;; Build a serialized version of proc: the returned procedure takes the same
+;; arguments as proc but holds a mutex for the duration of each call, so two
+;; threads can never be inside proc at the same time.
+;; The mutex is released even when proc signals an exception; the exception is
+;; then re-raised to the caller.  A fresh mutex is made for each wrapped proc
+;; unless one is supplied with the canned-mutex: option.
+;;
+;; example 1: (define thread-safe-+ (make-synchronized-proc +))
+;; example 2: (define thread-safe-plus
+;;              (make-synchronized-proc
+;;               (lambda (x y)
+;;                 (+ x y))))
+
+(define (make-synchronized-proc proc #!key (canned-mutex #f))
+  (let ((lock (or canned-mutex (make-mutex))))
+    (lambda args
+      (mutex-lock! lock)
+      ;; Run proc under condition-case so control always comes back here.  A caught
+      ;; condition is paired with a fresh gensym tag: a normal return value, even
+      ;; one that happens to be a pair, can never be mistaken for it.
+      (let* ((exn-tag (gensym))
+             (outcome (condition-case
+                       (apply proc args)
+                       (caught () (cons exn-tag caught)))))
+        (mutex-unlock! lock)
+        ;; lock is released; now either re-raise what proc signalled or return its value
+        (if (and (pair? outcome) (eq? (car outcome) exn-tag))
+            (raise (cdr outcome))
+            outcome)))))
+
+
+;; retry an operation (depends on srfi-18)
+;; ==================
+;; Calls the-thunk and returns its result as soon as accept-result? holds for it.
+;; A rejected result or an exception leads to a sleep followed by another call,
+;; up to `retries` more times (so the-thunk runs at most retries+1 times).
+;;
+;; Exception handling:
+;; -------------------
+;; if evaluating the thunk results in exception, it will be retried.
+;; on last try, if final-failure-returns-actual is true, the exception will be re-thrown to caller.
+;;
+;; look at options below #!key to see how to configure behavior
+
+(define (retry-thunk
+ the-thunk
+ #!key ;;;; options below
+ (accept-result? (lambda (x) x)) ;; retry if predicate applied to thunk's result is false
+ (retries 4) ;; how many retries after the first attempt (the-thunk is called at most retries+1 times)
+ (failure-value #f) ;; return this on final failure, unless following option is enabled:
+ (final-failure-returns-actual #f) ;; on failure, on the last try, just return the result (or re-raise the exception), not failure-value
+
+ (retry-delay 0.1) ;; starting value for the wait computation (see wait-time in the loop below)
+ (back-off-factor 1) ;; each wait adds (previous wait * this factor) to the previous wait, so 1 roughly doubles the wait per retry
+ (random-delay 0.1) ;; add a random portion of this value to wait
+
+ (chatty #f) ;; print status as we go, for debugging.
+ )
+
+ (when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-"))
+ (let* ((guarded-thunk ;; we are guarding the thunk against exceptions. We will record whether result of evaluation is an exception or a regular result.
+ (lambda ()
+ (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision
+ (res
+ (condition-case
+ (the-thunk) ;; this is what we are guarding the execution of
+ [x () (cons EXCEPTION x)]
+ )))
+ (cond
+ ((and (pair? res) (eq? (car res) EXCEPTION))
+ (if chatty
+ (print " - the-thunk threw exception >"(cdr res)"<"))
+ (cons 'exception (cdr res)))
+ (else
+ (if chatty
+ (print " - the-thunk returned result >"res"<"))
+ (cons 'regular-result res)))))))
+
+ (let loop ((guarded-res (guarded-thunk))
+ (retries-left retries)
+ (fail-wait retry-delay))
+ (if chatty (print " =========="))
+ (let* ((wait-time (+ fail-wait (+ (* fail-wait back-off-factor) ;; previous wait + previous wait * factor + random part
+ (* random-delay
+ (/ (random 1024) 1024) ))))
+ (res-type (car guarded-res))
+ (res-value (cdr guarded-res)))
+ (cond
+ ((and (eq? res-type 'regular-result) (accept-result? res-value))
+ (if chatty (print " + return result that satisfied accept-result? >"res-value"<"))
+ res-value)
+
+ ((> retries-left 0)
+ (if chatty (print " - sleep "wait-time))
+ (thread-sleep! wait-time)
+ (if chatty (print " + retry ["retries-left" tries left]"))
+ (loop (guarded-thunk)
+ (sub1 retries-left)
+ wait-time)) ;; the wait just used (random part included) becomes fail-wait for the next round
+
+ ((eq? res-type 'regular-result)
+ (if final-failure-returns-actual
+ (begin
+ (if chatty (print " + last try failed- return the result >"res-value"<"))
+ res-value)
+ (begin
+ (if chatty (print " + last try failed- return canned failure value >"failure-value"<"))
+ failure-value)))
+
+ (else ;; no retries left; result was not accepted and res-type can only be 'exception
+ (if final-failure-returns-actual
+ (begin
+ (if chatty (print " + last try failed with exception- re-throw it >"res-value"<"))
+ (abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function
+ (begin
+ (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
+ failure-value))))))))
+