Megatest

Diff
Login

Differences From Artifact [442b5dfce3]:

To Artifact [84997153bf]:


11
12
13
14
15
16
17
18

19
20
21
22
23

24
25
26
27
28
29
30
11
12
13
14
15
16
17

18
19
20
21
22
23
24
25
26
27
28
29
30
31







-
+





+







(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use zmq)

(declare (unit server))
(declare (unit zmq-transport))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses server))

(include "common_records.scm")
(include "db_records.scm")

;; Transition to pub --> sub with pull <-- push
;;
;;   1. client sends request to server via push to the pull port
46
47
48
49
50
51
52
53

54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

71
72
73
74
75
76
77
47
48
49
50
51
52
53

54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

71
72
73
74
75
76
77
78







-
+
















-
+







;; [x]  [ ]        - return results
;; [x]  [ ]    6. Add timeout processing
;; [x]  [ ]        - after 60 seconds
;; [ ]  [ ]            i. check server alive, connect to new if necessary
;; [ ]  [ ]           ii. resend request
;; [ ]  [ ]    7. Turn self ping back on

(define (server:make-server-url hostport)
(define (zmq-transport:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "tcp://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

(define-inline (zmqsock:get-pub  dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0))

(define (server:run hostn)
(define (zmq-transport:run hostn)
  (debug:print 2 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-sdat1       #f)
86
87
88
89
90
91
92
93

94
95
96
97
98
99
100
87
88
89
90
91
92
93

94
95
96
97
98
99
100
101







-
+







			      hostn))
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   #f)))
			    (if ipstr ipstr hostname)))
	 (last-run       0))
    (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port")
    (set! zmq-sockets-dat (zmq-transport:setup-ports ipaddrstr (if (args:get-arg "-port")
			    (string->number (args:get-arg "-port"))
							    (+ 5000 (random 1001)))))

    (set! zmq-sdat1    (car   zmq-sockets-dat))
    (set! pull-socket  (cadr  zmq-sdat1)) ;; (iface s  port)
    (set! p1           (caddr zmq-sdat1))
    
136
137
138
139
140
141
142
143

144
145
146

147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162

163
164
165
166
167
168
169
137
138
139
140
141
142
143

144
145
146

147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162

163
164
165
166
167
168
169
170







-
+


-
+















-
+







					   (mutex-unlock! *heartbeat-mutex*)))
	(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
	    (begin
	      (open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
	      (loop '()))
	    (loop (cons packet queue-lst)))))))

;; run server:keep-running in a parallel thread to monitor that the db is being 
;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
(define (zmq-transport:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
			(let ((sdat #f))
			  (mutex-lock! *heartbeat-mutex*)
			  (set! sdat *server-info*)
			  (mutex-unlock! *heartbeat-mutex*)
			  (if sdat sdat
			      (begin
				(sleep 4)
				(loop))))))
	 (iface       (cadr server-info))
	 (pullport    (caddr server-info))
	 (pubport     (cadddr server-info)) ;; id interface pullport pubport)
	 (zmq-sockets (server:client-connect iface pullport pubport))
	 (zmq-sockets (zmq-transport:client-connect iface pullport pubport))
	 (last-access 0))
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1)))
      ;; (print "Server running, count is " count)
	(if (< count 1) ;; 3x3 = 9 secs aprox
192
193
194
195
196
197
198
199

200
201
202
203
204
205
206
207










208
209
210
211
212












213
214



215
216



217
218
219
220
221
222

223

224
225

226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256

257
258
259
260
261
262

263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289



290
291

292
293
294

295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358

359
360
361

362
363
364
365
366
367
368
193
194
195
196
197
198
199

200
201







202
203
204
205
206
207
208
209
210
211





212
213
214
215
216
217
218
219
220
221
222
223


224
225
226


227
228
229






230

231
232

233
234
235
236
237
238
239
240
241
242
243









244
245
246
247






248

249
250
251
252
253
254

255
256
257
258
259
260
261
262
263
264
265
266
267
268














269
270
271
272

273
274
275

276
277
278
279
280
281
282
283
284
285
286
287





















































288
289
290

291
292
293
294
295
296
297
298







-
+

-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
+
-
-
+
+
+
-
-
-
-
-
-
+
-
+

-
+










-
-
-
-
-
-
-
-
-




-
-
-
-
-
-

-
+





-
+













-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+

-
+


-
+











-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+


-
+







	      (set! *time-to-exit* #t)
	      (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
	      (thread-sleep! 1)
	      (debug:print-info 0 "Max cached queries was " *max-cache-size*)
	      (debug:print-info 0 "Server shutdown complete. Exiting")
	      (exit)))))))

(define (server:find-free-port-and-open iface s port stype #!key (trynum 50))
(define (zmq-transport:find-free-port-and-open iface s port stype #!key (trynum 50))
  (let ((s (if s s (make-socket stype)))
	(p (if (number? port) port 5555))
 	(old-handler (current-exception-handler)))
  (handle-exceptions
   exn
   (begin
     (print-error-message exn)
     (if (< portnum 9000)
        (p (if (number? port) port 5555))
        (old-handler (current-exception-handler)))
    (handle-exceptions
     exn
     (begin
       (debug:print 0 "Failed to bind to port " p ", trying next port")
       (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
       ;; (old-handler)
       ;; (print-call-chain)
       (if (> trynum 0)
	 (begin 
	   (print "WARNING: failed to start on portnum: " portnum ", trying next port")
	   (thread-sleep! 0.1)
	   (open-run-close tasks:remove-server-records tasks:open-db)
	   (server:try-start-server ipaddrstr (+ portnum 1)))
           (zmq-transport:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1))
           (debug:print-info 0 "Tried ports up to " p 
                             " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))
       (exit)) ;; To exit or not? That is the question.
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (debug:print 2 "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (list iface s port)))))

(define (zmq-transport:setup-ports ipaddrstr startport)
  (let* ((s1 (zmq-transport:find-free-port-and-open ipaddrstr #f startport 'pull))
         (p1 (caddr s1))
	 (print "ERROR: Tried and tried but could not start the server")))
   (set! *runremote* (list ipaddrstr portnum))
         (s2 (zmq-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
         (p2 (caddr s2)))
    (set! *runremote* #f)
   (open-run-close tasks:remove-server-records tasks:open-db)
   (open-run-close tasks:server-register 
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live 'zmq))
		   tasks:open-db 
		   (current-process-id)
		   ipaddrstr portnum 0 'live)
   (print "INFO: Trying to start server on " ipaddrstr ":" portnum)
   ;; This starts the spiffy server
   (start-server port: portnum)
    (mutex-unlock! *heartbeat-mutex*)
   (print "INFO: server has been stopped")))
    (list s1 s2)))

(define (server:mk-signature)
(define (zmq-transport:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;; 
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
  ;; (send-message pubsock target send-more: #t)
  ;; (send-message pubsock 
  (db:obj->string (vector success/fail query-sig result)))

;;======================================================================
;; C L I E N T S
;;======================================================================

(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; 
(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
(define (zmq-transport:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
  (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions)
  (let ((connect-ok #f)
	(zmq-socket (if context 
			(make-socket type context)
			(make-socket type)))
	(conurl     (server:make-server-url (list iface port))))
	(conurl     (zmq-transport:make-server-url (list iface port))))
    (if (socket? zmq-socket)
     (begin
	  ;; first apply subscriptions
	  (for-each (lambda (subscription)
		      (debug:print 2 "Subscribing to " subscription)
		      (socket-option-set! zmq-socket 'subscribe subscription))
		    subscriptions)
	  (connect-socket zmq-socket conurl)
	  zmq-socket)
	(begin
	  (debug:print 0 "ERROR: Failed to open socket to " conurl)
	  #f))))

(define (server:client-login serverdat)
  (max-retry-attempts 100)
  (cdb:login serverdat *toppath* (server:get-client-signature)))

;; Not currently used! But, I think it *should* be used!!!
(define (server:client-logout serverdat)
  (let ((ok (and (socket? serverdat)
		 (cdb:logout serverdat *toppath* (server:get-client-signature)))))
    ;; (close-socket serverdat)
    ok))

(define (server:client-connect iface pullport pubport)
  (let* ((push-socket (server:client-socket-connect iface pullport type: 'push))
	 (sub-socket  (server:client-socket-connect iface pubport
(define (zmq-transport:client-connect iface pullport pubport)
  (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push))
	 (sub-socket  (zmq-transport:client-socket-connect iface pubport
						    type: 'sub
						    subscriptions: (list (server:get-client-signature) "all")))
						    subscriptions: (list (zmq-transport:get-client-signature) "all")))
	 (zmq-sockets (vector push-socket sub-socket))
	 (login-res   #f))
    (set! login-res (server:client-login zmq-sockets))
    (set! login-res (zmq-transport:client-login zmq-sockets))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
	  (set! *runremote* zmq-sockets)
	  zmq-sockets)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " conurl)
	  (set! *runremote* #f)
	  #f))))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host     (list-ref hostinfo 0))
	      (iface    (list-ref hostinfo 1))
	      (port     (list-ref hostinfo 2))
          (pubport  (list-ref hostinfo 3))
	      (pid      (list-ref hostinfo 4)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  ;; (handle-exceptions
	  ;;   exn
	  ;;  (begin
	  ;;    ;; something went wrong in connecting to the server. In this scenario it is ok
	  ;;    ;; to try again
	  ;;    (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	  ;;    (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	  ;;    (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	  ;;    (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
	  ;;    (server:client-setup (- numtries 1))
	  ;;    #f)
	   (server:client-connect iface port pubport)) ;; )
	(if (> numtries 0)
	    (let ((exe (car (argv)))
		  (pid #f))
	      (debug:print-info 0 "No server available, attempting to start one...")
	      ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
	      ;;   							  (string-intersperse *verbosity* ",")
	      ;;   							  (conc *verbosity*)))))
	      (set! pid (process-fork (lambda ()
	      ;;   			(current-input-port  (open-input-file  "/dev/null"))
	      ;;   			(current-output-port (open-output-file "/dev/null"))
	      ;;   			(current-error-port  (open-output-file "/dev/null"))
					(server:launch)))) ;; should never get here ....
	      (let loop ((count 0))
		(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
		  (if (not hostinfo)
		      (begin
			(debug:print-info 0 "Waiting for server pid=" pid " to start")
			(sleep 2) ;; give server time to start
			(if (< count 5)
			    (loop (+ count 1)))))))
	      ;; we are starting a server, do not try again! That can lead to 
	      ;; recursively starting many processes!!!
	      (server:client-setup numtries: 0))
	    (debug:print-info 1 "Too many attempts, giving up")))))

;; run server:keep-running in a parallel thread to monitor that the db is being 
;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
(define (zmq-transport:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
409
410
411
412
413
414
415
416

417
418
419
420
421
422

423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457































458
459
460

461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480



481
482
483
484
485
486
487
488
489
490
491
492

493
494
495
496
497
498
499

500
501

502
503
504
505

506
507
508
509
510
511
512
339
340
341
342
343
344
345

346
347
348
349
350
351

352



































353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383

384

385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402



403
404
405
406
407
408
409
410
411
412
413
414
415
416

417
418
419
420
421
422
423

424
425

426
427
428
429

430
431
432
433
434
435
436
437







-
+





-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-

-
+

















-
-
-
+
+
+











-
+






-
+

-
+



-
+







              (tasks:server-deregister-self tdb (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (server:launch)
(define (zmq-transport:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 2 "Starting the standalone server")
  (debug:print-info 2 "Starting zmq server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (debug:print 11 "server:launch hostinfo=" hostinfo)
    (if hostinfo
	(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 
	    (let* (;; (th1 (make-thread (lambda ()
		   ;;      	       (let ((server-info #f))
		   ;;      		 ;; wait for the server to be online and available
		   ;;      		 (let loop ()
		   ;;			   (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
		   ;;      		   (thread-sleep! 2)
		   ;;      		   (mutex-lock! *heartbeat-mutex*)
		   ;;      		   (set! server-info *server-info* )
		   ;;      		   (mutex-unlock! *heartbeat-mutex*)
		   ;;      		   (if (not server-info)(loop)))
		   ;;			 (debug:print 2 "Server alive, starting self-ping")
		   ;;      		 (server:self-ping server-info)
		   ;;      		 ))
		   ;;      	     "Self ping"))
		   (th2 (make-thread (lambda ()
				       (server:run 
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
		   )
	      (set! *client-non-blocking-mode* #t)
	      ;; (thread-start! th1)
	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      ;; (thread-join! th3)
	      (thread-join! th2)
	      )
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
  (if *toppath* 
      (let* (;; (th1 (make-thread (lambda ()
	     ;;      	       (let ((server-info #f))
	     ;;      		 ;; wait for the server to be online and available
	     ;;      		 (let loop ()
	     ;;			   (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
	     ;;      		   (thread-sleep! 2)
	     ;;      		   (mutex-lock! *heartbeat-mutex*)
	     ;;      		   (set! server-info *server-info* )
	     ;;      		   (mutex-unlock! *heartbeat-mutex*)
	     ;;      		   (if (not server-info)(loop)))
	     ;;			 (debug:print 2 "Server alive, starting self-ping")
	     ;;      		 (zmq-transport:self-ping server-info)
	     ;;      		 ))
	     ;;      	     "Self ping"))
	     (th2 (make-thread (lambda ()
				 (zmq-transport:run 
				  (if (args:get-arg "-server")
				      (args:get-arg "-server")
				      "-"))) "Server run"))
	     (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running"))
	     )
	(set! *client-non-blocking-mode* #t)
	;; (thread-start! th1)
	(thread-start! th2)
	(thread-start! th3)
	(set! *didsomething* #t)
	;; (thread-join! th3)
	(thread-join! th2)
	)
      (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

(define (server:client-signal-handler signum)
(define (zmq-transport:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     (if (not *received-response*)
				 (receive-message* *runremote*))) ;; flush out last call if applicable
			   "eat response"))
	 (th2 (make-thread (lambda ()
			     (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
			     (thread-sleep! 3) ;; give the flush three seconds to do it's stuff
			     (debug:print 0 "       Done.")
			     (exit 4))
			   "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))

(define (server:client-launch)
  (set-signal-handler! signal/int server:client-signal-handler)
   (if (server:client-setup)
(define (zmq-transport:client-launch)
  (set-signal-handler! signal/int zmq-transport:client-signal-handler)
   (if (zmq-transport:client-setup)
       (debug:print-info 2 "connected as client")
       (begin
	 (debug:print 0 "ERROR: Failed to connect as client")
	 (exit))))

;;======================================================================
;; Defunct functions
;;======================================================================

;; ping a server and return number of clients or #f (if no response)
;; NOT IN USE!
(define (server:ping host port #!key (secs 10)(return-socket #f))
(define (zmq-transport:ping host port #!key (secs 10)(return-socket #f))
  (cdb:use-non-blocking-mode
   (lambda ()
     (let* ((res #f)
	    (th1 (make-thread
		  (lambda ()
		    (let* ((zmq-context (make-context 1))
			   (zmq-socket  (server:client-connect host port context: zmq-context)))
			   (zmq-socket  (zmq-transport:client-connect host port context: zmq-context)))
		      (if zmq-socket
			  (if (server:client-login zmq-socket)
			  (if (zmq-transport:client-login zmq-socket)
			      (let ((numclients (cdb:num-clients zmq-socket)))
				(if (not return-socket)
				    (begin
				      (server:client-logout zmq-socket)
				      (zmq-transport:client-logout zmq-socket)
				      (close-socket  zmq-socket)))
				(set! res (list #t numclients (if return-socket zmq-socket #f))))
			      (begin
				;; (close-socket zmq-socket)
				(set! res (list #f "CAN'T LOGIN" #f))))
			  (set! res (list #f "CAN'T CONNECT" #f)))))
		  "Ping: th1"))
524
525
526
527
528
529
530
531

532
533
534
535
536

537
538
539
540

541
542
543
544
545
546
547


548
549
550
449
450
451
452
453
454
455

456
457
458
459
460

461
462
463
464

465
466
467
468
469
470


471
472
473
474
475







-
+




-
+



-
+





-
-
+
+



       (thread-start! th1)
       (handle-exceptions
	exn
	(set! res (list #f "TIMED OUT" #f))
	(thread-join! th1 secs))
       res))))

;; (define (server:self-ping server-info)
;; (define (zmq-transport:self-ping server-info)
;;   ;; server-info: server-id interface pullport pubport
;;   (let ((iface    (list-ref server-info 1))
;; 	(pullport (list-ref server-info 2))
;; 	(pubport  (list-ref server-info 3)))
;;     (server:client-connect iface pullport pubport)
;;     (zmq-transport:client-connect iface pullport pubport)
;;     (let loop ()
;;       (thread-sleep! 2)
;;       (cdb:client-call *runremote* 'ping #t)
;;       (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
;;       (debug:print 4 "zmq-transport:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
;;       (mutex-lock! *heartbeat-mutex*)
;;       (set! *server-loop-heart-beat* (current-seconds))
;;       (mutex-unlock! *heartbeat-mutex*)
;;       (loop))))
    
(define (server:reply pubsock target query-sig success/fail result)
  (debug:print-info 11 "server:reply target=" target ", result=" result)
(define (zmq-transport:reply pubsock target query-sig success/fail result)
  (debug:print-info 11 "zmq-transport:reply target=" target ", result=" result)
  (send-message pubsock target send-more: #t)
  (send-message pubsock (db:obj->string (vector success/fail query-sig result))))