Index: db.scm
==================================================================
--- db.scm
+++ db.scm
@@ -17,10 +17,13 @@
 ;; (import (prefix rpc rpc:))
 
 (use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64)
 (import (prefix sqlite3 sqlite3:))
 (import (prefix base64 base64:))
+
+;; NOTE: try to remove this dependency; it pulls zmq into db.scm
+(use zmq)
 
 (declare (unit db))
 (declare (uses common))
 (declare (uses keys))
 (declare (uses ods))
@@ -1105,22 +1108,33 @@
 ;;   (let loop ()
 ;;     (thread-sleep! 10) ;; move save time around to minimize regular collisions?
 ;;     (db:write-cached-data)
 ;;     (loop)))
 
+;; Serialize obj for *transport-type*: fs/other = obj as-is, http = s11n + base64 with "=" -> "_", zmq = plain s11n string.
 (define (db:obj->string obj)
-  (string-substitute
-   (regexp "=") "_"
-   (base64:base64-encode (with-output-to-string (lambda ()(serialize obj))))
-   #t))
+  (case *transport-type*
+    ((fs) obj) ;; fs: no encoding, the object is returned unchanged
+	((http) ;; http: serialize, base64-encode, then replace "=" with "_"
+     (string-substitute
+       (regexp "=") "_"
+         (base64:base64-encode (with-output-to-string (lambda ()(serialize obj))))
+        #t))
+    ((zmq)(with-output-to-string (lambda ()(serialize obj)))) ;; zmq: serialized string, no base64 step
+    (else obj)))
 
 (define (db:string->obj msg)
-  (with-input-from-string 
-      (base64:base64-decode
-       (string-substitute 
-	(regexp "_") "=" msg #t))
-    (lambda ()(deserialize))))
+  (case *transport-type* ;; inverse of db:obj->string for each transport type
+   ((fs) msg)
+   ((http) ;; undo the "_" substitution, base64-decode, then deserialize
+    (with-input-from-string 
+       (base64:base64-decode
+         (string-substitute 
+	   (regexp "_") "=" msg #t))
+       (lambda ()(deserialize))))
+   ((zmq)(with-input-from-string msg (lambda ()(deserialize)))) ;; zmq: msg is a plain serialized string
+   (else msg)))
 
 (define (cdb:use-non-blocking-mode proc)
   (set! *client-non-blocking-mode* #t)
   (let ((res (proc)))
     (set! *client-non-blocking-mode* #f)
@@ -1150,12 +1164,60 @@
        (let* ((res  #f)
 	      (rawdat      (server:client-send-receive serverdat zdat))
 	      (tmp         #f))
 	 (debug:print-info 11 "Sent " zdat ", received " rawdat)
 	 (set! tmp (db:string->obj rawdat))
-	 (vector-ref tmp 2)
-	 )))))
+	 (vector-ref tmp 2))))
+    ((zmq) ;; push the request, then read the sub socket until a reply carries our query-sig
+     (handle-exceptions
+      exn
+      (begin
+	(thread-sleep! 5) 
+	(if (> numretries 0)(apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)))
+      (let* ((push-socket (vector-ref zmq-sockets 0))
+	     (sub-socket  (vector-ref zmq-sockets 1))
+	     (client-sig  (server:get-client-signature))
+	     (query-sig   (message-digest-string (md5-primitive) (conc qtype immediate params)))
+	     (zdat        (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
+	     (res  #f)
+	     (send-receive (lambda ()
+			     (debug:print-info 11 "sending message")
+			     (send-message push-socket zdat)
+			     (debug:print-info 11 "message sent")
+			     (let loop ()
+			       ;; first frame: the topic/sender info, read and discarded here
+			       ;; this should match (server:get-client-signature)
+			       ;; we will need to process "all" messages here some day
+			       (receive-message* sub-socket)
+			       ;; second frame: encoded reply, index 1 = query-sig, index 2 = result
+			       (let ((myres (db:string->obj (receive-message* sub-socket))))
+				 (if (equal? query-sig (vector-ref myres 1))
+				     (set! res (vector-ref myres 2))
+				     (loop))))))
+	     (timeout (lambda ()
+			(let loop ((n numretries)) ;; n = resends still allowed
+			  (thread-sleep! 15)
+			  (if (not res)
+			      (if (> n 0) ;; test the countdown, not the constant numretries, so we eventually give up
+				  (begin
+				    (debug:print 2 "WARNING: no reply to query " params ", trying resend")
+				    (debug:print-info 11 "re-sending message")
+				    (send-message push-socket zdat)
+				    (debug:print-info 11 "message re-sent")
+				    (loop (- n 1)))
+				  ;; resends exhausted: give up and exit
+				  (begin
+				    (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
+				    (exit 5))))))))
+	(debug:print-info 11 "Starting threads")
+	(let ((th1 (make-thread send-receive "send receive"))
+	      (th2 (make-thread timeout      "timeout")))
+	  (thread-start! th1)
+	  (thread-start! th2)
+	  (thread-join!  th1) (thread-terminate! th2) ;; stop the watchdog so it cannot resend or (exit 5) after we return
+	  (debug:print-info 11 "cdb:client-call returning res=" res)
+	  res))))))
   
 (define (cdb:set-verbosity serverdat val)
   (cdb:client-call serverdat 'set-verbosity #f *default-numtries* val))
 
 (define (cdb:login serverdat keyval signature)

Index: tasks.scm
==================================================================
--- tasks.scm
+++ tasks.scm
@@ -61,10 +61,11 @@
 	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
                                   pid INTEGER,
                                   interface TEXT,
                                   hostname TEXT,
                                   port INTEGER,
+                                  pubport INTEGER,
                                   start_time TIMESTAMP,
                                   priority INTEGER,
                                   state TEXT,
                                   mt_version TEXT,
                                   heartbeat TIMESTAMP,
@@ -84,21 +85,22 @@
 ;;======================================================================
 ;; Server and client management
 ;;======================================================================
 
 ;; state: 'live, 'shutting-down, 'dead
-(define (tasks:server-register mdb pid interface port priority state)
+(define (tasks:server-register mdb pid interface port priority state #!key (pubport -1)) ;; returns (server-id interface port pubport)
   (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state)
   (sqlite3:execute 
    mdb 
-   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface)
+   "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface)
-                             VALUES(?,  ?,       ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);"
+                             VALUES(?,  ?,       ?,       ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);"
-   pid (get-host-name) port priority (conc state) megatest-version interface)
+   pid (get-host-name) port pubport priority (conc state) megatest-version interface) ;; 8 values for the 8 ? placeholders above
   (list 
    (tasks:server-get-server-id mdb (get-host-name) interface port pid)
    interface
    port
+   pubport
    ))
 
 ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
 (define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead))
   (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
@@ -253,14 +255,14 @@
 
 
 (define (tasks:get-all-servers mdb)
   (let ((res '()))
     (sqlite3:for-each-row
-     (lambda (id pid hostname interface port start-time priority state mt-version last-update)
-       (set! res (cons (vector id pid hostname interface port start-time priority state mt-version last-update) res)))
+     (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update) ;; NOTE(review): pubport is now vector slot 5, so start-time and later fields shifted by one slot; check callers that index this vector
+       (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update) res)))
      mdb
-     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;")
+     "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;")
     res))
        
 
 ;;======================================================================
 ;; Tasks and Task monitors

Index: zmq-transport.scm
==================================================================
--- zmq-transport.scm
+++ zmq-transport.scm
@@ -11,13 +11,11 @@
 (require-extension (srfi 18) extras tcp s11n)
 
 (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
 (import (prefix sqlite3 sqlite3:))
 
-(use spiffy uri-common intarweb http-client spiffy-request-vars)
-
-(tcp-buffer-size 2048)
+(use zmq)
 
 (declare (unit server))
 
 (declare (uses common))
 (declare (uses db))
@@ -24,131 +22,186 @@
 (declare (uses tests))
 (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
 
 (include "common_records.scm")
 (include "db_records.scm")
+
+;; Transition to pub --> sub with pull <-- push
+;;
+;;   1. client sends request to server via push to the pull port
+;;   2. server puts request in queue or processes immediately as appropriate
+;;   3. server puts responses from completed requests into pub port 
+;;
+;; TODO
+;;
+;; Done Tested
+;; [x]  [ ]    1. Add columns pullport pubport to servers table
+;; [x]  [ ]    2. Add rm of monitor.db if older than 11/12/2012 
+;; [x]  [ ]    3. Add create of pullport and pubport with finding of available ports
+;; [x]  [ ]    4. Add client compose of request
+;; [x]  [ ]        - name of client: testname/itempath-test_id-hostname 
+;; [x]  [ ]        - name of request: callname, params
+;; [x]  [ ]        - request key: f(clientname, callname, params)
+;; [x]  [ ]    5. Add processing of subscription hits
+;; [x]  [ ]        - done when get key 
+;; [x]  [ ]        - return results
+;; [x]  [ ]    6. Add timeout processing
+;; [x]  [ ]        - after 60 seconds
+;; [ ]  [ ]            i. check server alive, connect to new if necessary
+;; [ ]  [ ]           ii. resend request
+;; [ ]  [ ]    7. Turn self ping back on
 
 (define (server:make-server-url hostport)
   (if (not hostport)
       #f
-      (conc "http://" (car hostport) ":" (cadr hostport))))
+      (conc "tcp://" (car hostport) ":" (cadr hostport)))) ;; zmq endpoint "tcp://host:port"; hostport is a (host port) list
 
 (define  *server-loop-heart-beat* (current-seconds))
 (define *heartbeat-mutex* (make-mutex))
 
 ;;======================================================================
 ;; S E R V E R
 ;;======================================================================
 
-;; Call this to start the actual server
-;;
-
-(define *db:process-queue-mutex* (make-mutex))
+(define-inline (zmqsock:get-pub  dat)(vector-ref dat 0))     ;; slot 0: pub socket
+(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))     ;; slot 1: pull socket
+(define-inline (zmqsock:set-pub! dat s)(vector-set! dat 0 s))  ;; vector-set! takes (vector index value)
+(define-inline (zmqsock:set-pull! dat s)(vector-set! dat 1 s)) ;; same slot that zmqsock:get-pull reads
 
 (define (server:run hostn)
   (debug:print 2 "Attempting to start the server ...")
   (if (not *toppath*)
       (if (not (setup-for-run))
 	  (begin
 	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
 	    (exit))))
-  (let* (;; (iface           (if (string=? "-" hostn)
-	 ;;        	      #f ;; (get-host-name) 
-	 ;;        	      hostn))
-	 (db              #f) ;;        (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily
+  (let* ((zmq-sdat1       #f)
+	 (zmq-sdat2       #f)
+	 (pull-socket     #f)
+	 (pub-socket      #f)
+	 (p1              #f)
+	 (p2              #f)
+	 (zmq-sockets-dat #f)
+	 (iface           (if (string=? "-" hostn) ;; NOTE(review): iface is not used below
+			      "*" ;; (get-host-name) 
+			      hostn))
 	 (hostname        (get-host-name))
 	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
 					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
 					   #f)))
-			    (if ipstr ipstr hostn))) ;; hostname)))
-	 (start-port    (if (args:get-arg "-port")
+			    (if ipstr ipstr hostname))) ;; NOTE(review): a non-"-" hostn falls back to hostname, not hostn; confirm intended
+	 (last-run       0)) ;; NOTE(review): unused
+    (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port") ;; -port if given, else a random port in 5000..6000
 			    (string->number (args:get-arg "-port"))
-			    (+ 5000 (random 1001))))
-	 (link-tree-path (config-lookup *configdat* "setup" "linktree")))
+							    (+ 5000 (random 1001)))))
+
+    (set! zmq-sdat1    (car   zmq-sockets-dat)) ;; first entry holds the pull socket
+    (set! pull-socket  (cadr  zmq-sdat1)) ;; (iface s  port)
+    (set! p1           (caddr zmq-sdat1)) ;; NOTE(review): p1/p2 are not read later in this function
+    
+    (set! zmq-sdat2    (cadr  zmq-sockets-dat)) ;; second entry holds the pub socket
+    (set! pub-socket   (cadr  zmq-sdat2))
+    (set! p2           (caddr zmq-sdat2))
+
     (set! *cache-on* #t)
-    (root-path     (if link-tree-path 
-		       link-tree-path
-		       (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!
+
+    ;; what to do when we quit
+    ;;
+;;     (on-exit (lambda ()
+;; 	       (if (and *toppath* *server-info*)
+;; 		   (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
+;; 		   (let loop () 
+;; 		     (let ((queue-len 0))
+;; 		       (thread-sleep! (random 5))
+;; 		       (mutex-lock! *incoming-mutex*)
+;; 		       (set! queue-len (length *incoming-data*))
+;; 		       (mutex-unlock! *incoming-mutex*)
+;; 		       (if (> queue-len 0)
+;; 			   (begin
+;; 			     (debug:print-info 0 "Queue not flushed, waiting ...")
+;; 			     (loop))))))))
 
-    ;; Setup the web server and a /ctrl interface
+    ;; The heavy lifting
+    ;;
+    ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
     ;;
-    (vhost-map `(((* any) . ,(lambda (continue)
-			       ;; open the db on the first call 
-			       (if (not db)(set! db (open-db)))
-			       (let* (($   (request-vars source: 'both))
-				      (dat ($ 'dat))
-				      (res #f))
-				 (cond
-				  ((equal? (uri-path (request-uri (current-request))) 
-					   '(/ "hey"))
-				   (send-response body: "hey there!\n"
-						  headers: '((content-type text/plain))))
-				  ;; This is the /ctrl path where data is handed to the server and
-				  ;; responses 
-				  ((equal? (uri-path (request-uri (current-request)))
-					   '(/ "ctrl"))
-				   (let* ((packet (db:string->obj dat))
+    (let loop ((queue-lst '())) ;; one incoming request per iteration
+      (let* ((rawmsg (receive-message* pull-socket))
+	     (packet (db:string->obj rawmsg))
 					  (qtype  (cdb:packet-get-qtype packet)))
 				     (debug:print-info 12 "server=> received packet=" packet)
 				     (if (not (member qtype '(sync ping)))
 					 (begin
 					   (mutex-lock! *heartbeat-mutex*)
 					   (set! *last-db-access* (current-seconds))
 					   (mutex-unlock! *heartbeat-mutex*)))
-				     ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex
-				     ;; (set! res (open-run-close db:process-queue-item open-db packet))
-				     (set! res (db:process-queue-item db packet))
-				     ;; (mutex-unlock! *db:process-queue-mutex*)
-				     (debug:print-info 11 "Return value from db:process-queue-item is " res)
-				     (send-response body: (conc "<head>ctrl data</head>\n<body>"
-								res
-								"</body>")
-						    headers: '((content-type text/plain)))))
-				  (else (continue))))))))
-    (server:try-start-server ipaddrstr start-port)
-    ;; lite3:finalize! db)))
-    ))
-
-
-
-;; (define (server:main-loop)
-;;   (print "INFO: Exectuing main server loop")
-;;   (access-log "megatest-http.log")
-;;   (server-bind-address #f)
-;;   (define-page (main-page-path)
-;;     (lambda ()
-;;       (let ((dat ($ "dat")))
-;;       ;; (with-request-variables (dat)
-;;         (debug:print-info 12 "Got dat=" dat)
-;; 	(let* ((packet (db:string->obj dat))
-;; 	       (qtype  (cdb:packet-get-qtype packet)))
-;; 	  (debug:print-info 12 "server=> received packet=" packet)
-;; 	  (if (not (member qtype '(sync ping)))
-;; 	      (begin
-;; 		(mutex-lock! *heartbeat-mutex*)
-;; 		(set! *last-db-access* (current-seconds))
-;; 		(mutex-unlock! *heartbeat-mutex*)))
-;; 	  (let ((res (open-run-close db:process-queue-item open-db packet)))
-;; 	    (debug:print-info 11 "Return value from db:process-queue-item is " res)
-;; 	    res))))))
-
-;;; (use spiffy uri-common intarweb)
-;;; 
-;;; (root-path "/var/www")
-;;; 
-;;; (vhost-map `(((* any) . ,(lambda (continue)
-;;;                            (if (equal? (uri-path (request-uri (current-request))) 
-;;;                                        '(/ "hey"))
-;;;                                (send-response body: "hey there!\n"
-;;;                                               headers: '((content-type text/plain)))
-;;;                                (continue))))))
-;;; 
-;;; (start-server port: 12345)
-
-;; This is recursively run by server:run until sucessful
+	(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue ;; NOTE(review): always true, so the queueing branch is dead
+	    (begin
+	      (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) ;; presumably replies go out on pub-socket (cf. server:reply) - verify
+	      (loop '()))
+	    (loop (cons packet queue-lst)))))))
+
+;; Run server:keep-running in a parallel thread to monitor whether the db is being
+;; used, and to shut down after some time if it is not.
 ;;
-(define (server:try-start-server ipaddrstr portnum)
+(define (server:keep-running)
+  ;; Watchdog: waits until *server-info* is set (polling every 4 secs), then every
+  ;; second pass (~8 secs) refreshes the server heartbeat and, once 45 minutes have
+  ;; passed without db access, deregisters itself and exits.
+  (let* ((server-info (let loop ()
+			(let ((sdat #f))
+			  (mutex-lock! *heartbeat-mutex*)
+			  (set! sdat *server-info*)
+			  (mutex-unlock! *heartbeat-mutex*)
+			  (if sdat sdat
+			      (begin
+				(thread-sleep! 4) ;; yields to other threads; posix sleep would block the whole process
+				(loop))))))
+	 (iface       (cadr server-info))
+	 (pullport    (caddr server-info))
+	 (pubport     (cadddr server-info)) ;; id interface pullport pubport)
+	 (zmq-sockets (server:client-connect iface pullport pubport))
+	 (last-access 0))
+    (let loop ((count 0))
+      (thread-sleep! 4) ;; no need to do this very often
+      ;; NB// sync currently does NOT return queue-length
+      (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1)))
+	;; Heartbeat/idle check only on every second pass. Both (loop ...) calls are
+	;; in tail position so the polling loop does not accumulate continuations.
+	(if (< count 1)
+	    (loop (+ count 1))
+	    (begin
+	      ;; NOTE: Get rid of this mechanism! It really is not needed...
+	      (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
+	      ;; Idle check: keep running while the last db access is under 45 minutes old.
+	      (mutex-lock! *heartbeat-mutex*)
+	      (set! last-access *last-db-access*)
+	      (mutex-unlock! *heartbeat-mutex*)
+	      (if (> (+ last-access
+			;; (* 50 60 60)    ;; 48 hrs
+			;; 60              ;; one minute
+			;; (* 60 60)       ;; one hour
+			(* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
+			)
+		     (current-seconds))
+		  (begin
+		    (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
+		    (loop 0))
+		  (begin
+		    (debug:print-info 0 "Starting to shutdown the server.")
+		    ;; need to delete only *my* server entry (future use)
+		    (set! *time-to-exit* #t)
+		    (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
+		    (thread-sleep! 1)
+		    (debug:print-info 0 "Max cached queries was " *max-cache-size*)
+		    (debug:print-info 0 "Server shutdown complete. Exiting")
+		    (exit)))))))))
+
+(define (server:find-free-port-and-open iface s port stype #!key (trynum 50))
+  (let ((s (if s s (make-socket stype)))
+	(p (if (number? port) port 5555))
+ 	(old-handler (current-exception-handler)))
   (handle-exceptions
    exn
    (begin
      (print-error-message exn)
      (if (< portnum 9000)
@@ -197,44 +250,30 @@
   (if *my-client-signature* *my-client-signature*
       (let ((sig (server:mk-signature)))
 	(set! *my-client-signature* sig)
 	*my-client-signature*)))
 
-;; <html>
-;; <head></head>
-;; <body>1 Hello, world! Goodbye Dolly</body></html>
-;; Send msg to serverdat and receive result
-(define (server:client-send-receive serverdat msg)
-  (let* ((url        (server:make-server-url serverdat))
-	 (fullurl    (conc url "/ctrl")) ;; (conc url "/?dat=" msg)))
-	 (numretries 0))     
-    (handle-exceptions
-     exn
-     (if (< numretries 200)
-	 (server:client-send-receive serverdat msg))
+;; Open a zmq socket of the given type, apply any subscriptions, connect it to tcp://iface:port; returns the socket, or #f if (socket? ...) fails.
+(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
+  (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions)
+  (let ((connect-ok #f) ;; NOTE(review): connect-ok is never used
+	(zmq-socket (if context 
+			(make-socket type context)
+			(make-socket type)))
+	(conurl     (server:make-server-url (list iface port))))
+    (if (socket? zmq-socket) ;; NOTE(review): if make-socket raises on failure, the #f branch is unreachable - confirm
     (begin
-       (debug:print-info 11 "fullurl=" fullurl "\n")
-       ;; set up the http-client here
-       (max-retry-attempts 100)
-       (retry-request? (lambda (request)
-			 (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10))
-			 (set! numretries (+ numretries 1))
-			 #t))
-       ;; send the data and get the response
-       ;; extract the needed info from the http data and 
-       ;; process and return it.
-       (let* ((res   (with-input-from-request fullurl 
-					      ;; #f
-					      ;; msg 
-					      (list (cons 'dat msg)) 
-					      read-string)))
-	 (debug:print-info 11 "got res=" res)
-	 (let ((match (string-search (regexp "<body>(.*)<.body>") res)))
-	   (debug:print-info 11 "match=" match)
-	   (let ((final (cadr match)))
-	     (debug:print-info 11 "final=" final)
-	     final)))))))
+	  ;; first apply subscriptions
+	  (for-each (lambda (subscription)
+		      (debug:print 2 "Subscribing to " subscription)
+		      (socket-option-set! zmq-socket 'subscribe subscription))
+		    subscriptions)
+	  (connect-socket zmq-socket conurl)
+	  zmq-socket)
+	(begin
+	  (debug:print 0 "ERROR: Failed to open socket to " conurl)
+	  #f))))
 
 (define (server:client-login serverdat)
   (max-retry-attempts 100)
   (cdb:login serverdat *toppath* (server:get-client-signature)))
 
@@ -243,22 +282,26 @@
   (let ((ok (and (socket? serverdat)
 		 (cdb:logout serverdat *toppath* (server:get-client-signature)))))
     ;; (close-socket serverdat)
     ok))
 
-(define (server:client-connect iface port)
-  (let* ((login-res   #f)
-	 (serverdat   (list iface port)))
-    (set! login-res (server:client-login serverdat))
+(define (server:client-connect iface pullport pubport) ;; returns #(push-socket sub-socket) on successful login, else #f
+  (let* ((push-socket (server:client-socket-connect iface pullport type: 'push))
+	 (sub-socket  (server:client-socket-connect iface pubport
+						    type: 'sub
+						    subscriptions: (list (server:get-client-signature) "all")))
+	 (zmq-sockets (vector push-socket sub-socket))
+	 (login-res   #f))
+    (set! login-res (server:client-login zmq-sockets))
     (if (and (not (null? login-res))
 	     (car login-res))
 	(begin
-	  (debug:print-info 2 "Logged in and connected to " iface ":" port)
-	  (set! *runremote* serverdat)
-	  serverdat)
+	  (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
+	  (set! *runremote* zmq-sockets)
+	  zmq-sockets)
 	(begin
-	  (debug:print-info 2 "Failed to login or connect to " iface ":" port)
+	  (debug:print-info 2 "Failed to login or connect to " iface ":" pullport "/" pubport) ;; conurl is not bound in this procedure
 	  (set! *runremote* #f)
 	  #f))))
 
 ;; Do all the connection work, start a server if not already running
 (define (server:client-setup #!key (numtries 50))
@@ -270,25 +313,37 @@
   (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
     (if hostinfo
 	(let ((host     (list-ref hostinfo 0))
 	      (iface    (list-ref hostinfo 1))
 	      (port     (list-ref hostinfo 2))
-	      (pid      (list-ref hostinfo 3)))
+          (pubport  (list-ref hostinfo 3))
+	      (pid      (list-ref hostinfo 4)))
 	  (debug:print-info 2 "Setting up to connect to " hostinfo)
-	  (server:client-connect iface port)) ;; )
+	  ;; (handle-exceptions
+	  ;;   exn
+	  ;;  (begin
+	  ;;    ;; something went wrong in connecting to the server. In this scenario it is ok
+	  ;;    ;; to try again
+	  ;;    (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
+	  ;;    (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
+	  ;;    (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
+	  ;;    (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
+	  ;;    (server:client-setup (- numtries 1))
+	  ;;    #f)
+	   (server:client-connect iface port pubport)) ;; )
 	(if (> numtries 0)
 	    (let ((exe (car (argv)))
 		  (pid #f))
 	      (debug:print-info 0 "No server available, attempting to start one...")
-	      (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
-	        							  (string-intersperse *verbosity* ",")
-	        							  (conc *verbosity*)))))
-	      ;; (set! pid (process-fork (lambda ()
+	      ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
+	      ;;   							  (string-intersperse *verbosity* ",")
+	      ;;   							  (conc *verbosity*)))))
+	      (set! pid (process-fork (lambda ()
 	      ;;   			(current-input-port  (open-input-file  "/dev/null"))
 	      ;;   			(current-output-port (open-output-file "/dev/null"))
 	      ;;   			(current-error-port  (open-output-file "/dev/null"))
-	      ;;   			(server:launch))))
+					(server:launch)))) ;; should never get here ....
 	      (let loop ((count 0))
 		(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
 		  (if (not hostinfo)
 		      (begin
 			(debug:print-info 0 "Waiting for server pid=" pid " to start")
@@ -368,20 +423,37 @@
   (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
     (debug:print 11 "server:launch hostinfo=" hostinfo)
     (if hostinfo
 	(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
 	(if *toppath* 
-	    (let* ((th2 (make-thread (lambda ()
+	    (let* (;; (th1 (make-thread (lambda ()
+		   ;;      	       (let ((server-info #f))
+		   ;;      		 ;; wait for the server to be online and available
+		   ;;      		 (let loop ()
+		   ;;			   (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
+		   ;;      		   (thread-sleep! 2)
+		   ;;      		   (mutex-lock! *heartbeat-mutex*)
+		   ;;      		   (set! server-info *server-info* )
+		   ;;      		   (mutex-unlock! *heartbeat-mutex*)
+		   ;;      		   (if (not server-info)(loop)))
+		   ;;			 (debug:print 2 "Server alive, starting self-ping")
+		   ;;      		 (server:self-ping server-info)
+		   ;;      		 ))
+		   ;;      	     "Self ping"))
+		   (th2 (make-thread (lambda ()
 				       (server:run 
 					(if (args:get-arg "-server")
 					    (args:get-arg "-server")
 					    "-"))) "Server run"))
 		   (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
 		   )
+	      (set! *client-non-blocking-mode* #t)
+	      ;; (thread-start! th1)
 	      (thread-start! th2)
 	      (thread-start! th3)
 	      (set! *didsomething* #t)
+	      ;; (thread-join! th3)
 	      (thread-join! th2)
 	      )
 	    (debug:print 0 "ERROR: Failed to setup for megatest")))
     (exit)))
 
@@ -388,15 +460,16 @@
 (define (server:client-signal-handler signum)
   (handle-exceptions
    exn
    (debug:print " ... exiting ...")
    (let ((th1 (make-thread (lambda ()
-			     "") ;; do nothing for now (was flush out last call if applicable)
+			     (if (not *received-response*)
+				 (receive-message* *runremote*))) ;; flush out last call if applicable
 			   "eat response"))
 	 (th2 (make-thread (lambda ()
 			     (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
-			     (thread-sleep! 1) ;; give the flush one second to do it's stuff
+			     (thread-sleep! 3) ;; give the flush three seconds to do it's stuff
 			     (debug:print 0 "       Done.")
 			     (exit 4))
 			   "exit on ^C timer")))
      (thread-start! th2)
      (thread-start! th1)
@@ -408,5 +481,70 @@
        (debug:print-info 2 "connected as client")
        (begin
 	 (debug:print 0 "ERROR: Failed to connect as client")
 	 (exit))))
 
+;;======================================================================
+;; Defunct functions
+;;======================================================================
+
+;; ping a server; returns (#t numclients socket-or-#f) on success, otherwise (#f reason #f)
+;; NOT IN USE! NOTE(review): calls server:client-connect as (host port context: ...), which no longer matches its (iface pullport pubport) signature
+(define (server:ping host port #!key (secs 10)(return-socket #f))
+  (cdb:use-non-blocking-mode
+   (lambda ()
+     (let* ((res #f)
+	    (th1 (make-thread
+		  (lambda ()
+		    (let* ((zmq-context (make-context 1))
+			   (zmq-socket  (server:client-connect host port context: zmq-context)))
+		      (if zmq-socket
+			  (if (server:client-login zmq-socket)
+			      (let ((numclients (cdb:num-clients zmq-socket)))
+				(if (not return-socket)
+				    (begin
+				      (server:client-logout zmq-socket)
+				      (close-socket  zmq-socket)))
+				(set! res (list #t numclients (if return-socket zmq-socket #f))))
+			      (begin
+				;; (close-socket zmq-socket)
+				(set! res (list #f "CAN'T LOGIN" #f))))
+			  (set! res (list #f "CAN'T CONNECT" #f)))))
+		  "Ping: th1"))
+	    (th2 (make-thread
+		  (lambda ()
+		    (let loop ((count 1))
+		      (debug:print-info 1 "Ping " count " server on " host " at port " port)
+		      (thread-sleep! 2)
+		      (if (< count (/ secs 2))
+			  (loop (+ count 1))))
+		    ;; (thread-terminate! th1)
+		    (set! res (list #f "TIMED OUT" #f)))
+		  "Ping: th2")))
+       (thread-start! th2)
+       (thread-start! th1)
+       (handle-exceptions
+	exn
+	(set! res (list #f "TIMED OUT" #f))
+	(thread-join! th1 secs))
+       res))))
+
+;; (define (server:self-ping server-info)
+;;   ;; server-info: server-id interface pullport pubport
+;;   (let ((iface    (list-ref server-info 1))
+;; 	(pullport (list-ref server-info 2))
+;; 	(pubport  (list-ref server-info 3)))
+;;     (server:client-connect iface pullport pubport)
+;;     (let loop ()
+;;       (thread-sleep! 2)
+;;       (cdb:client-call *runremote* 'ping #t)
+;;       (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
+;;       (mutex-lock! *heartbeat-mutex*)
+;;       (set! *server-loop-heart-beat* (current-seconds))
+;;       (mutex-unlock! *heartbeat-mutex*)
+;;       (loop))))
+    
+(define (server:reply pubsock target query-sig success/fail result) ;; publish a two-frame reply on pubsock
+  (debug:print-info 11 "server:reply target=" target ", result=" result)
+  (send-message pubsock target send-more: #t) ;; frame 1: topic (presumably the client signature clients subscribe to)
+  (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) ;; frame 2: cdb:client-call matches index 1 and returns index 2
+