Megatest

rpc-transport.scm at [e5d7dac0b8]
Login

File rpc-transport.scm artifact 1a12c353bd part of check-in e5d7dac0b8



;; Copyright 2006-2016, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp s11n rpc)
(import (prefix rpc rpc:))

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(declare (unit rpc-transport))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.

(include "common_records.scm")
(include "db_records.scm")

(define *heartbeat-mutex* (make-mutex))
(define *server-loop-heart-beat* (current-seconds))


;; procstr is the name of the procedure to be called as a string
(define (rpc-transport:autoremote procstr params)  ;; may be unused, I think api-exec deprecates this one.
  (let* ((procsym (if (symbol? procstr)
                     procstr
                     (string->symbol (->string procstr))))
        (res
         (begin
           (apply (eval procsym) params))))
    res))


;; rpc receiver
(define (rpc-transport:api-exec cmd params)
  (let* ( (resdat  (api:execute-requests *dbstruct-db* (vector cmd params))) ;; #( flag result )
          (flag    (vector-ref resdat 0))
          (res     (vector-ref resdat 1)))

    (mutex-lock! *heartbeat-mutex*)

    (set! *last-db-access* (current-seconds)) ;; bump *last-db-access*; this will renew keep-running thread's lease on life for another (server:get-timeout) seconds
    ;;(BB> "in api-exec; last-db-access updated to "*last-db-access*)
    (mutex-unlock! *heartbeat-mutex*)

    res))


;; retry an operation (depends on srfi-18)
;; ==================
;; idea here is to avoid spending time on coding retrying something.  Trying to be generic here.
;;
;; Exception handling:
;; -------------------
;; if evaluating the thunk results in exception, it will be retried.
;; on last try, if final-failure-returns-actual is true, the exception will be re-thrown to caller.
;;
;; look at options below #!key to see how to configure behavior
;;
;;
(define (retry-thunk
         the-thunk
         #!key ;;;; options below
         (accept-result?   (lambda (x) x)) ;; retry if predicate applied to thunk's result is false 
         (retries                       4) ;; how many tries
         (failure-value                #f) ;; return this on final failure, unless following option is enabled:
         (final-failure-returns-actual #f) ;; on failure, on the last try, just return the result, not failure-value

         (retry-delay                 0.1) ;; delay between tries
         (back-off-factor               1) ;; multiply retry-delay by this factor on retry
         (random-delay                0.1) ;; add a random portion of this value to wait

         (chatty                       #f) ;; print status as we go, for debugging.
         )
  
  (when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-"))
  (let* ((guarded-thunk ;; we are guarding the thunk against exceptions.  We will record whether result of evaluation is an exception or a regular result.
          (lambda ()
           (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision
                  (res
                   (condition-case
                    (the-thunk) ;; this is what we are guarding the execution of
                    [x () (cons EXCEPTION x)]
                    )))
             (cond
              ((and (pair? res) (eq? (car res) EXCEPTION))
               (if chatty
                   (print " - the-thunk threw exception >"(cdr res)"<"))
               (cons 'exception (cdr res)))
               (else
                (if chatty
                    (print " - the-thunk returned result >"res"<"))
                (cons 'regular-result res)))))))
    
    (let loop ((guarded-res (guarded-thunk))
               (retries-left retries)
               (fail-wait retry-delay))
      (if chatty (print "   =========="))
      (let* ((wait-time (+ fail-wait (+ (* fail-wait back-off-factor)
                                        (* random-delay
                                           (/ (random 1024) 1024) ))))
             (res-type (car guarded-res))
             (res-value (cdr guarded-res)))
        (cond
         ((and (eq? res-type 'regular-result) (accept-result? res-value))
                   (if chatty (print " + return result that satisfied accept-result? >"res-value"<"))
                   res-value)

         ((> retries-left 0)
          (if chatty (print " - sleep "wait-time))
          (thread-sleep! wait-time)
          (if chatty (print " + retry ["retries-left" tries left]"))
          (loop (guarded-thunk)
                (sub1 retries-left)
                wait-time))
         
         ((eq? res-type 'regular-result)
          (if final-failure-returns-actual
              (begin
                (if chatty (print " + last try failed- return the result >"res-value"<"))
                res-value)
              (begin
                (if chatty (print " + last try failed- return canned failure value >"failure-value"<"))
              failure-value)))
         
         (else ;; no retries left; result was not accepted and res-type can only be 'exception
          (if final-failure-returns-actual 
              (begin
                (if chatty (print " + last try failed with exception- re-throw it >"res-value"<"))
                (abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function
              (begin
                (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
                failure-value))))))))


(define (rpc-transport:server-shutdown server-id rpc:listener ) ;;#!key (from-on-exit #f))
  ;;(on-exit (lambda () #t)) ;; turn off on-exit stuff
  ;;(tcp-close rpc:listener) ;; gotta exit nicely
  ;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped")


  ;; TODO: (low) the following is extraordinaritly slow.  Maybe we don't even need portlogger for rpc anyway??  the exception-based failover when ports are taken is fast!
  ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released")
  
  (set! *time-to-exit* #t)
  ;;(if *dbstruct-db* (db:sync-touched *dbstruct-db* *run-id* force-sync: #t))
  
  (server:remove-dotserver-file *toppath* "anyhost:anyport" force: #t)
  (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete")
  

  ;;(BB> "Before (exit) (from-on-exit="from-on-exit")")
  ;;(unless from-on-exit (exit))  ;; sometimes we hang (around) here with 100% cpu.
  ;;(BB> "After")
  ;; strace reveals endless:
  ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 105880}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 109880}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 201886}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 205886}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 297892}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 301892}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 393898}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 397898}, ru_stime={0, 60003}, ...}) = 0
  ;; make a post to chicken-users w/ http://paste.call-cc.org/paste?id=60a4b66a29ccf7d11359ea866db642c970735978


  ;; (if from-on-exit
  ;;     ;; avoid above condition!  End current process externally  since 1 in 20 (exit)'s result in hung, 100% cpu zombies. (see above)
  
  (system (conc  "kill -9  "(current-process-id)))
  )


;; all routes though here end in exit ...
;;
;; start_server? 
;;
(define (rpc-transport:launch run-id)
  (set! *run-id*   run-id)

  ;; ;; send to background if requested
  ;; (when (args:get-arg "-daemonize")
  ;;     (daemon:ize)
  ;;     (when *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
  ;;       (current-error-port *alt-log-file*)
  ;;       (current-output-port *alt-log-file*)))

  ;; double check we dont alrady have a running server for this run-id
  (when (and (server:read-dotserver *toppath*)
             (server:check-if-running run-id))
    (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
    (exit 0))

  ;; did not find server running, let's clean up the table of dead servers
  (tasks:server-force-clean-running-records-for-run-id  (db:delay-if-busy (tasks:open-db)) run-id "notresponding")  

  (server:dotserver-starting)

  


  
  ;; let's get a server-id for this server
  ;;   if at first we do not suceed, try 3 more times.
  (let ((server-id (retry-thunk
                    (lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc))
                    chatty: #f
                    final-failure-returns-actual: #t
                    retries: 4)))
    (when (not server-id) ;; dang we couldn't get a server-id.
      ;; since we didn't get the server lock we are going to clean up and bail out


      (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
      (tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch")
      (server:dotserver-starting-remove)
      (exit 1))

    ;; we got a server-id (and a corresponding entry in servers table in globally shared mdb)
    ;; all systems go.  Proceed to setup rpc server.  
    (rpc-transport:run
     (if (args:get-arg "-server")
         (args:get-arg "-server")
         "-")
     run-id
     server-id)
    (exit)))

(define *rpc-listener-port* #f)
(define *rpc-listener-port-bind-timestamp* #f)

(define *on-exit-flag #f)

(define (rpc-transport:server-dat-get-iface         vec)    (vector-ref  vec 0))
(define (rpc-transport:server-dat-get-port          vec)    (vector-ref  vec 1))
(define (rpc-transport:server-dat-get-last-access   vec)    (vector-ref  vec 5))
(define (rpc-transport:server-dat-get-transport     vec)    (vector-ref  vec 6))
(define (rpc-transport:server-dat-update-last-access vec)
  (if (vector? vec)
      (vector-set! vec 5 (current-seconds))
      (begin
	(print-call-chain (current-error-port))
	(debug:print-error 0 *default-log-port* "call to rpc-transport:server-dat-update-last-access with non-vector!!"))))


(define *api-exec-ht* (make-hash-table))
(define *api-exec-mutex* (make-mutex))
;; let's see if caching the rpc stub curbs thread-profusion on server side
(define (rpc-transport:get-api-exec iface port)
  (mutex-lock! *api-exec-mutex*)
  (let* ((lu (hash-table-ref/default *api-exec-ht* (cons iface  port) #f)))
    (if lu
        (begin
          (mutex-unlock! *api-exec-mutex*)
          lu)
        (let ((res (rpc:procedure 'api-exec iface port)))
          (hash-table-set! *api-exec-ht* (cons iface port) res)
          (mutex-unlock! *api-exec-mutex*)
          res))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; this client-side procedure makes rpc call to server and returns result
;;
(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
  ;;(BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries)
  (if (not (vector? serverdat))
      (begin
        (BB> "WHAT?? for run-id="run-id", serverdat="serverdat)
        (print-call-chain)
        (exit 1)))
  (let* ((iface (rpc-transport:server-dat-get-iface serverdat))
         (port  (rpc-transport:server-dat-get-port serverdat))
         (res #f)
         (api-exec (rpc-transport:get-api-exec iface port))  ;; chached by host/port. may need to clear...
         (send-receive (lambda ()
                         (tcp-buffer-size 0)
                         (set! res (retry-thunk
                                    (lambda ()
                                      (condition-case
                                       ;;(vector #t (run-remote cmd params))
                                       (vector 'success (api-exec cmd params))
                                       [x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)]
                                       [x () (vector 'other-fail "other fail ["(->string x)"]" x)]))
                                    chatty: #f
                                    accept-result?: (lambda(x)
                                                      (and (vector? x) (vector-ref x 0)))
                                    retries: 8
                                    back-off-factor: 1.5
                                    random-wait: 0.2
                                    retry-delay: 0.1
                                    final-failure-returns-actual: #t))
                         ;;(BB> "HEY res="res)
                         res
                         ))
         (th1 (make-thread send-receive "send-receive"))
         (time-out-reached #f)
         (time-out     (lambda ()
			      (thread-sleep! 45)
                              (set! time-out-reached #t)
                              (thread-terminate! th1)
			      #f))

         (th2 (make-thread time-out     "time out")))
	 (thread-start! th1)
	 (thread-start! th2)
	 (thread-join! th1)
	 (thread-terminate! th2)
         ;;(BB> "alt got res="res)
	 (debug:print-info 11 *default-log-port* "got res=" res)
	 (if (vector? res)
             (case (vector-ref res 0)
               ((success) (vector #t (vector-ref res 1)))
               (
                (comms-fail other-fail)
                ;;(comms-fail) 
                (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request >>"res"<<")
                ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
                (vector #f (vector-ref res 1)))
               (else
                (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1))
                (debug:print 0 *default-log-port* " client call chain:")
                (print-call-chain (current-error-port))
                (debug:print 0 *default-log-port* " server call chain:")
                (pp (vector-ref res 1) (current-error-port))
                (signal (vector-ref res 2))))
             (signal (make-composite-condition
                      (make-property-condition 
             	       'timeout
             	       'message "nmsg-transport:client-api-send-receive-raw timed out talking to server"))))))
        


(define (rpc-transport:run hostn run-id server-id)
  (debug:print 2 *default-log-port* "Attempting to start the rpc server ...")
   ;; (trace rpc:publish-procedure!)

  ;;======================================================================
  ;;	  start of publish-procedure section
  ;;======================================================================
  (rpc:publish-procedure! 'server:login server:login) ;; this allows client to validate it is the same megatest instance as the server.  No security here, just making sure we're in the right room.
  (rpc:publish-procedure!
   'testing
   (lambda ()
     "Just testing"))

  ;; procedure to receive arbitrary API request from client's rpc:send-receive/rpc-transport:client-api-send-receive 
  (rpc:publish-procedure! 'rpc-transport:autoremote rpc-transport:autoremote)
  ;; can use this to run most anything at the remote
  (rpc:publish-procedure! 'api-exec rpc-transport:api-exec)
  
  
  ;;======================================================================
  ;;	  end of publish-procedure section
  ;;======================================================================


  (let* ((db              #f)
	 (hostname        (let ((res (get-host-name)))  res))
         (server-start-time (current-seconds))
         (server-timeout (server:get-timeout))
	 (ipaddrstr       (let* ((ipstr (if (string=? "-" hostn)
					   ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   (server:get-best-guess-address hostname)
					   #f))
                                 (res (if ipstr ipstr hostn)))
                            res)) ;; hostname))) 
	 (start-port      (let ((res (portlogger:open-run-close portlogger:find-port)))      ;; BB> TODO: remove portlogger!
                            res))
	 (link-tree-path  (configf:lookup *configdat* "setup" "linktree"))

         ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
         ;; rpc:listener is the tcp-listen result from inside the find-free-port-and-open complex.
         ;;   It is our handle on the listening tcp port
         ;;   We will attach this to our rpc server with rpc:make-server in thread th1 .
	 (rpc:listener    (rpc-transport:find-free-port-and-open start-port)) 
	 (th1             (make-thread
			   (lambda ()
                             ;;(BB> "BEFORE rpc:make-server")
			     ((rpc:make-server rpc:listener) #t)
                             ;;(BB> "BEFORE rpc:make-server")
                             )
			   "rpc:server"))


         (hostname        (if (string=? "-" hostn)
			      (get-host-name) 
			      hostn))
	 (ipaddrstr       (if (string=? "-" hostn)
			      (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
                              (string-intersperse 
                               (map number->string
                                    (u8vector->list
                                      (hostname->ip hostn))) ".")
                              ))
	 (portnum         (let ((res (rpc:default-server-port)))  res))
	 (host:port       (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)))

    (when (not (equal? ipaddrstr (server:get-best-guess-address (get-host-name))))
        
      (debug:print 0 *default-log-port* "Error: This host "(ip->string (hostname->ip (get-host-name)))" ("(get-host-name)") is not the homehost "ipaddrstr" ("(ip->hostname (string->ip ipaddrstr))"; Cannot proceed.")
      (server:dotserver-starting-remove)
      (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
      (exit))
    
    (tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum)

    ;;============================================================
    ;;  activate thread th1 to attach opened tcp port to rpc server
    ;;=============================================================
    (thread-start! th1)
    (set! db *dbstruct-db*)

    (debug:print 0 *default-log-port* "Server started on " host:port)
    ;;(BB> "before SELF-TEST")
    (if (retry-thunk (lambda ()
                       (rpc-transport:self-test run-id ipaddrstr portnum))
                     final-failure-returns-actual: #t ;; TODO: remove this line
                     )
        (debug:print 0 *default-log-port* "INFO: rpc self test passed!")
        (begin
          (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test.  Shutting down.  On: " host:port)
          (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "dead")
          (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
          (rpc-transport:server-shutdown server-id rpc:listener)
          (server:dotserver-starting-remove)
          (exit)))




    ;;(on-exit (lambda ()
    ;;           (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t)))
    
    ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch
    (if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers
        (begin ;; i am not the server, another server snuck in and beat this one to the punch
          (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
          (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision")
          (server:dotserver-starting-remove))

        (begin ;; i am the server
          ;; setup the in-memory db
          (set! *dbstruct-db*  (db:setup run-id))
          (db:get-db *dbstruct-db* run-id)

          ;; at this point, satisfied server has started
          ;; let's make it official
          (server:write-dotserver *toppath* (conc ipaddrstr ":" portnum))
          (mutex-lock! *heartbeat-mutex*)
          (set! *last-db-access* (current-seconds))
          (mutex-unlock! *heartbeat-mutex*)
          (set! *rpc:listener* rpc:listener) 
          (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry

          
          
          ;; this let loop will hold open this thread until we want the server to shut down.
          ;;   if no requests received within the last 20 seconds :
          ;;   database hasnt changed in ??
          ;;


          
          ;; keep-running loop: polls last-db-access to see if we have timed out.  keep running if not. 
          (let loop ((count          0)
                     (bad-sync-count 0))
            (BB> "keep running: count = "count)
            ;; Use this opportunity to sync the inmemdb to db

            (let ((start-time (current-milliseconds))
                  (sync-time  #f)
                  (rem-time   #f))

              ;; following is now done in common:watchdog, commenting out.  sync-time will now be 0; can live with that.
              ;; ;; inmemddb is a dbstruct
              ;; (condition-case
              ;;  (db:sync-touched *dbstruct-db* *run-id* force-sync: #t)
              ;;  ((sync-failed)(cond
              ;;                 ((> bad-sync-count 10) ;; time to give up
              ;;                  (rpc-transport:server-shutdown server-id rpc:listener))
              ;;                 (else ;; (> bad-sync-count 0)  ;; we've had a fail or two, delay and loop
              ;;                  (thread-sleep! 5)
              ;;                  (loop count (+ bad-sync-count 1)))))
              ;;  ((exn)
              ;;   (debug:print-error 0 *default-log-port* "error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server ")
              ;;   (rpc-transport:server-shutdown server-id rpc:listener)))
              
              (set! sync-time  (- (current-milliseconds) start-time))
              (set! rem-time (quotient (- 4000 sync-time) 1000))
              (debug:print 4 *default-log-port* "SYNC: time= " sync-time ", rem-time=" rem-time)
              
              (if (and (<= rem-time 4)
                       (> rem-time 0))
                  (thread-sleep! rem-time)
                  (thread-sleep! 4))) ;; fallback for if the math is changed ...
            
            (if (< count 1) ;; 3x3 = 9 secs aprox
                (loop (+ count 1) bad-sync-count))

            ;; BB: don't see how this is possible with RPC
            ;; ;; Check that iface and port have not changed (can happen if server port collides)
            ;; (mutex-lock! *heartbeat-mutex*)
            ;; (set! sdat *server-info*)
            ;; (mutex-unlock! *heartbeat-mutex*)
            
            ;; (if (or (not (equal? sdat (list iface port)))
            ;;         (not server-id))
            ;;     (begin 
            ;;       (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info")
            ;;       (set! iface (car sdat))
            ;;       (set! port  (cadr sdat))))
            
            ;; Transfer *last-db-access* to last-access to use in checking that we are still alive
            (mutex-lock! *heartbeat-mutex*)
            (set! last-access *last-db-access*)
            (mutex-unlock! *heartbeat-mutex*)
            
            ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout)
            ;;
            ;; no_traffic, no running tests, if server 0, no running servers
            ;;
            ;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out)
            ;;
            (let* ((hrs-since-start  (/ (- (current-seconds) server-start-time) 3600))
                   (adjusted-timeout (if (> hrs-since-start 1)
                                         (- server-timeout (inexact->exact (round (* hrs-since-start 60))))  ;; subtract 60 seconds per hour
                                         server-timeout)))
              (if (common:low-noise-print 120 "server timeout")
                  (debug:print-info 0 *default-log-port* "Adjusted server timeout: " adjusted-timeout))
              (if (and *server-run*
                       (> (+ last-access server-timeout)
                          (current-seconds)))
                  (begin
                    (if (common:low-noise-print 120 "server continuing")
                        (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
                    ;;
                    ;; Consider implementing some smarts here to re-insert the record or kill self is
                    ;; the db indicates so
                    ;;
                    (if (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)
                        (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running"))
                    ;;
                    (loop 0 bad-sync-count))
                  (begin
                    ;;(BB> "SERVER SHUTDOWN CALLED!  last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout)
                    
                    (rpc-transport:server-shutdown server-id rpc:listener)))))
          ;; end new loop
          ))))


(define (rpc-transport:find-free-port-and-open port #!key )
  (handle-exceptions
   exn
   (begin
     (print "Failed to bind to port " (rpc:default-server-port) ", trying next port")
     (rpc-transport:find-free-port-and-open (add1 port)))
   (rpc:default-server-port port)
   (set! *rpc-listener-port* port) ;; a bit paranoid about rpc:default-server-port parameter not changing across threads (as params are wont to do).  keeping this global in my back pocket in case this causes problems
   (set! *rpc-listener-port-bind-timestamp* (current-milliseconds)) ;; may want to test how long it has been since the last bind attempt happened...
   (tcp-read-timeout 240000)
   (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it.
   (tcp-listen (rpc:default-server-port) 10000)
   ))
  
(define (rpc-transport:ping run-id host port)
  (handle-exceptions
   exn
   (begin
     (print "SERVER_NOT_FOUND exn="exn)
     (exit 1))
   (let ((login-res ((rpc:procedure 'server:login host port) *toppath*)))
     (if login-res
	 (begin
	   (print "LOGIN_OK")
	   (exit 0))
	 (begin
	   (print "LOGIN_FAILED")
	   (exit 1))))))

(define (rpc-transport:self-test run-id host port)
  (if (not host)
      (abort "host not set."))
  (if (not port)
      (abort "port not set."))
  (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it.
  (let* ((testing-res ((rpc:procedure 'testing host port)))
         (login-res ((rpc:procedure 'server:login host port) *toppath*))
         (res (and login-res (equal? testing-res "Just testing"))))
    
    (if login-res
        (begin
          ;;(BB> "Self test PASS.  login-res="login-res" testing-res="testing-res" *toppath*="*toppath*)
          #t)
        (begin
          (BB> "Self test fail.  login-res="login-res" testing-res="testing-res" *toppath*="*toppath*)
           
          #f))
    res))

(define (rpc-transport:client-setup run-id server-dat #!key (remaining-tries 10))
  ;;(BB> "entered rpc-transport:client-setup with run-id="run-id" and server-dat="server-dat" and retries="remaining-tries)
  (tcp-buffer-size 0)
  (debug:print-info 0 *default-log-port* "rpc-transport:client-setup run-id="run-id" server-dat=" server-dat ", remaining-tries=" remaining-tries)
  (let* ((iface     (tasks:hostinfo-get-interface server-dat))
         (hostname  (tasks:hostinfo-get-hostname  server-dat))
         (port      (tasks:hostinfo-get-port      server-dat))
         (runremote-server-dat (vector iface port #f #f #f (current-seconds) 'rpc)) ;; http version := (vector iface port api-uri api-url api-req (current-seconds) 'http  )
         (ping-res (retry-thunk (lambda ()  ;; make 3 attempts to ping.
                                  ((rpc:procedure 'server:login iface port) *toppath*))
                                chatty: #f
                                retries: 3)))
    ;; we got here from rmt:get-connection-info on the condition that *runremote* has no entry for run-id...
    (if ping-res
        (begin
          (debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat)
          runremote-server-dat)
        (begin ;; login failed but have a server record, clean out the record and try again
          (debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat)
          (tasks:kill-server-run-id run-id)
          (tasks:server-force-clean-run-record  (db:delay-if-busy (tasks:open-db)) run-id iface port
                                                   " rpc-transport:client-setup (server-dat = #t)")
          (if (> remaining-tries 2)
              (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little
              (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time
          (server:try-running run-id)
          (thread-sleep! 5)   ;; give server a little time to start up
          (client:setup run-id remaining-tries: (sub1 remaining-tries))))))