Megatest

Changes On Branch 6ede23fb86599084
Login

Changes In Branch multi-transport Through [6ede23fb86] Excluding Merge-Ins

This is equivalent to a diff from 6bba674f33 to 6ede23fb86

2013-01-29
10:15
Merged multi-transport to trunk, all tests passed check-in: 19f85b577e user: mrwellan tags: trunk
2013-01-28
22:14
Enabled http transport check-in: c3d256ef96 user: matt tags: multi-transport
2013-01-27
23:06
Compile errors missed ... check-in: 6ede23fb86 user: matt tags: multi-transport
22:59
Most cleanup done, routines consolidated check-in: 12356df330 user: matt tags: multi-transport
13:14
Creating branch for multi transport options, http, rpc, zmq, and network fs check-in: 66763d5399 user: matt tags: multi-transport
13:11
merged trunk into sqlite3-logging Closed-Leaf check-in: 7bd6b6dae6 user: matt tags: sqlite3-logging
12:22
Merged http-transport to trunk check-in: 6bba674f33 user: matt tags: trunk
12:17
Merged zmq-3.2.2 to trunk check-in: 3be673e9be user: matt tags: trunk
10:04
Streamlined db access a little, test4 completes in reasonable time Closed-Leaf check-in: a893c641ca user: matt tags: http-transport

Modified Makefile from [a4b44c1e54] to [1a410eaa9f].

1
2
3
4
5
6
7
8


9
10
11
12
13
14
15
1
2
3
4
5
6
7

8
9
10
11
12
13
14
15
16







-
+
+








PREFIX=$(PWD)
CSCOPTS= 
INSTALL=install
SRCFILES = common.scm items.scm launch.scm \
           ods.scm runconfig.scm server.scm configf.scm \
           db.scm keys.scm margs.scm megatest-version.scm \
           process.scm runs.scm tasks.scm tests.scm genexample.scm
           process.scm runs.scm tasks.scm tests.scm genexample.scm \
	   fs-transport.scm zmq-transport.scm http-transport.scm

GUISRCF  = dashboard.scm dashboard-tests.scm dashboard-guimonitor.scm dashboard-main.scm

OFILES   = $(SRCFILES:%.scm=%.o)
GOFILES  = $(GUISRCF:%.scm=%.o)

ADTLSCR=mt_laststep mt_runstep mt_ezstep

Modified common.scm from [1ba863b641] to [a6d027f297].

38
39
40
41
42
43
44


45
46
47
48
49
50
51
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53







+
+







(define *waiting-queue*     (make-hash-table))
(define *test-meta-updated* (make-hash-table))
(define *globalexitstatus*  0) ;; attempt to work around possible thread issues
(define *passnum*           0) ;; when running track calls to run-tests or similar

;; SERVER
(define *my-client-signature* #f)
(define *transport-type*    'fs)
(define *megatest-db*       #f)
(define *rpc:listener*      #f) ;; if set up for server communication this will hold the tcp port
(define *runremote*         #f) ;; if set up for server communication this will hold <host port>
(define *last-db-access*    (current-seconds))  ;; update when db is accessed via server
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)
(define *server-id*         #f)

Modified db.scm from [4a4c4c2fc7] to [7c779ff845].

15
16
17
18
19
20
21



22
23
24
25
26

27
28
29
30
31
32
33
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37







+
+
+





+








(require-extension (srfi 18) extras tcp) ;;  rpc)
;; (import (prefix rpc rpc:))

(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64)
(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))

;; Note, try to remove this dependency 
(use zmq)

(declare (unit db))
(declare (uses common))
(declare (uses keys))
(declare (uses ods))
(declare (uses fs-transport))

(include "common_records.scm")
(include "db_records.scm")
(include "key_records.scm")
(include "run_records.scm")

;; timestamp type (val1 val2 ...)
1102
1103
1104
1105
1106
1107
1108

1109



1110
1111
1112
1113






1114
1115



1116
1117
1118
1119
1120







1121
1122
1123
1124
1125
1126
1127
1128
1129
1130






1131
1132
1133
1134

1135
1136

1137
1138
1139
1140
1141






1142
1143
1144


1145
1146
1147
1148
1149




1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161



































1162
1163
1164
1165
1166
1167
1168







1169
1170
1171
1172
1173
1174
1175
1176
1177
1178








1179
1180
1181
1182
1183
1184
1185
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117




1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128





1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154

1155


1156





1157
1158
1159
1160
1161
1162



1163
1164





1165
1166
1167
1168












1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203







1204
1205
1206
1207
1208
1209
1210










1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225







+

+
+
+
-
-
-
-
+
+
+
+
+
+


+
+
+
-
-
-
-
-
+
+
+
+
+
+
+










+
+
+
+
+
+



-
+
-
-
+
-
-
-
-
-
+
+
+
+
+
+
-
-
-
+
+
-
-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+







;; (define (db:updater)
;;   (debug:print-info 4 "Starting cache processing")
;;   (let loop ()
;;     (thread-sleep! 10) ;; move save time around to minimize regular collisions?
;;     (db:write-cached-data)
;;     (loop)))

;; NOTE: Can remove the regex and base64 encoding for zmq
(define (db:obj->string obj)
  (case *transport-type*
    ((fs) obj)
	((http)
  (string-substitute
   (regexp "=") "_"
   (base64:base64-encode (with-output-to-string (lambda ()(serialize obj))))
   #t))
     (string-substitute
       (regexp "=") "_"
         (base64:base64-encode (with-output-to-string (lambda ()(serialize obj))))
        #t))
    ((zmq)(with-output-to-string (lambda ()(serialize obj))))
    (else obj)))

(define (db:string->obj msg)
  (case *transport-type*
   ((fs) msg)
   ((http)
  (with-input-from-string 
      (base64:base64-decode
       (string-substitute 
	(regexp "_") "=" msg #t))
    (lambda ()(deserialize))))
    (with-input-from-string 
       (base64:base64-decode
         (string-substitute 
	   (regexp "_") "=" msg #t))
       (lambda ()(deserialize))))
   ((zmq)(with-input-from-string msg (lambda ()(deserialize))))
   (else msg)))

(define (cdb:use-non-blocking-mode proc)
  (set! *client-non-blocking-mode* #t)
  (let ((res (proc)))
    (set! *client-non-blocking-mode* #f)
    res))
  
;; params = 'target cached remparams
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
;; cdb:client-call is the unified interface to all the transports. It dispatches the
;;                 query to a server routine (e.g. server:client-send-recieve) that 
;;                 transports the data to the server where it is passed to db:process-queue-item
;;                 which either returns the data to the calling server routine or 
;;                 directly calls the returning procedure (e.g. zmq).
;;
(define (cdb:client-call serverdat qtype immediate numretries . params)
  (debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params)
  ;; (handle-exceptions
  (case *transport-type* 
  ;;  exn
  ;;  (begin
    ((fs)
  ;;    (thread-sleep! 5) 
  ;;    (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params)))
   (let* ((client-sig  (server:get-client-signature))
	  (query-sig   (message-digest-string (md5-primitive) (conc qtype immediate params)))
	  (zdat        (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
     (let ((packet (vector "na" qtype immediate "na" params 0)))
       (fs:process-queue-item packet)))
    ((http)
     (let* ((client-sig  (server:get-client-signature))
	    (query-sig   (message-digest-string (md5-primitive) (conc qtype immediate params)))
	    (zdat        (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds))))) ;; (with-output-to-string (lambda ()(serialize params))))
	  )
     (debug:print-info 11 "zdat=" zdat)
     (let* (
       (debug:print-info 11 "zdat=" zdat)
       (let* ((res  #f)
	  (res  #f)
	  (rawdat      (server:client-send-receive serverdat zdat))
	  (tmp         #f))
     (debug:print-info 11 "Sent " zdat ", received " rawdat)
     (set! tmp (db:string->obj rawdat))
	      (rawdat      (server:client-send-receive serverdat zdat))
	      (tmp         #f))
	 (debug:print-info 11 "Sent " zdat ", received " rawdat)
	 (set! tmp (db:string->obj rawdat))
     ;; (if (equal? query-sig (vector-ref myres 1))
     ;; (set! res
     (vector-ref tmp 2)
     ;; (loop (server:client-send-receive serverdat zdat)))))))
	  ;; (timeout (lambda ()
	  ;;            (let loop ((n numretries))
	  ;;              (thread-sleep! 15)
	  ;;              (if (not res)
	  ;;       	   (if (> numretries 0)
	  ;;       	       (begin
	  ;;       		 (debug:print 2 "WARNING: no reply to query " params ", trying resend")
	  ;;       		 (debug:print-info 11 "re-sending message")
	 (vector-ref tmp 2))))
    ((zmq)
     (handle-exceptions
      exn
      (begin
	(thread-sleep! 5) 
	(if (> numretries 0)(apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)))
      (let* ((push-socket (vector-ref zmq-sockets 0))
	     (sub-socket  (vector-ref zmq-sockets 1))
	     (client-sig  (server:get-client-signature))
	     (query-sig   (message-digest-string (md5-primitive) (conc qtype immediate params)))
	     (zdat        (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
	     (res  #f)
	     (send-receive (lambda ()
			     (debug:print-info 11 "sending message")
			     (send-message push-socket zdat)
			     (debug:print-info 11 "message sent")
			     (let loop ()
			       ;; get the sender info
			       ;; this should match (server:get-client-signature)
			       ;; we will need to process "all" messages here some day
			       (receive-message* sub-socket)
			       ;; now get the actual message
			       (let ((myres (db:string->obj (receive-message* sub-socket))))
				 (if (equal? query-sig (vector-ref myres 1))
				     (set! res (vector-ref myres 2))
				     (loop))))))
	     (timeout (lambda ()
			(let loop ((n numretries))
			  (thread-sleep! 15)
			  (if (not res)
			      (if (> numretries 0)
				  (begin
				    (debug:print 2 "WARNING: no reply to query " params ", trying resend")
				    (debug:print-info 11 "re-sending message")
	  ;;       		 (apply cdb:client-call serverdat qtype immediate numretries params)
	  ;;       		 (debug:print-info 11 "message re-sent")
	  ;;       		 (loop (- n 1)))
	  ;;       	       ;; (apply cdb:client-call serverdats qtype immediate (- numretries 1) params))
	  ;;       	       (begin
	  ;;       		 (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
	  ;;       		 (exit 5))))))))
				    (send-message push-socket zdat)
				    (debug:print-info 11 "message re-sent")
				    (loop (- n 1)))
				  ;; (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params))
				  (begin
				    (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
				    (exit 5))))))))
     ;; (send-receive)
     )))
     ;; (debug:print-info 11 "Starting threads")
     ;; (let ((th1 (make-thread send-receive "send receive"))
     ;;       (th2 (make-thread timeout      "timeout")))
     ;;   (thread-start! th1)
     ;;   (thread-start! th2)
     ;;   (thread-join!  th1)
     ;;   (debug:print-info 11 "cdb:client-call returning res=" res)
     ;;   res))))
	(debug:print-info 11 "Starting threads")
	(let ((th1 (make-thread send-receive "send receive"))
	      (th2 (make-thread timeout      "timeout")))
	  (thread-start! th1)
	  (thread-start! th2)
	  (thread-join!  th1)
	  (debug:print-info 11 "cdb:client-call returning res=" res)
	  res))))))
  
(define (cdb:set-verbosity serverdat val)
  (cdb:client-call serverdat 'set-verbosity #f *default-numtries* val))

(define (cdb:login serverdat keyval signature)
  (cdb:client-call serverdat 'login #t *default-numtries* keyval megatest-version signature))

Added fs-transport.scm version [d187681c70].













































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use spiffy uri-common intarweb http-client spiffy-request-vars)

(tcp-buffer-size 2048)

(declare (unit fs-transport))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.

(include "common_records.scm")
(include "db_records.scm")


;;======================================================================
;; F S   T R A N S P O R T   S E R V E R
;;======================================================================

;; There is no "server" per se but a convience routine to make it non
;; necessary to be reopening the db over and over again.
;;

(define (fs:process-queue-item packet)
  (if (not *megatest-db*) ;; we will require that (setup-for-run) has already been called
      (set! *megatest-db* (open-db)))
  (debug:print-info 11 "fs:process-queue-item called with packet=" packet)
  (db:process-queue-item *megatest-db* packet))
      

Added http-transport.scm version [19494de820].




































































































































































































































































































































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use spiffy uri-common intarweb http-client spiffy-request-vars)

(tcp-buffer-size 2048)

(declare (unit http-transport))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses server))

(include "common_records.scm")
(include "db_records.scm")

(define (http-transport:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "http://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

;; Call this to start the actual server
;;

(define *db:process-queue-mutex* (make-mutex))

(define (http-transport:run hostn)
  (debug:print 2 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* (;; (iface           (if (string=? "-" hostn)
	 ;;        	      #f ;; (get-host-name) 
	 ;;        	      hostn))
	 (db              #f) ;;        (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   #f)))
			    (if ipstr ipstr hostn))) ;; hostname)))
	 (start-port    (if (args:get-arg "-port")
			    (string->number (args:get-arg "-port"))
			    (+ 5000 (random 1001))))
	 (link-tree-path (config-lookup *configdat* "setup" "linktree")))
    (set! *cache-on* #t)
    (root-path     (if link-tree-path 
		       link-tree-path
		       (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!

    ;; Setup the web server and a /ctrl interface
    ;;
    (vhost-map `(((* any) . ,(lambda (continue)
			       ;; open the db on the first call 
			       (if (not db)(set! db (open-db)))
			       (let* (($   (request-vars source: 'both))
				      (dat ($ 'dat))
				      (res #f))
				 (cond
				  ((equal? (uri-path (request-uri (current-request))) 
					   '(/ "hey"))
				   (send-response body: "hey there!\n"
						  headers: '((content-type text/plain))))
				  ;; This is the /ctrl path where data is handed to the server and
				  ;; responses 
				  ((equal? (uri-path (request-uri (current-request)))
					   '(/ "ctrl"))
				   (let* ((packet (db:string->obj dat))
					  (qtype  (cdb:packet-get-qtype packet)))
				     (debug:print-info 12 "server=> received packet=" packet)
				     (if (not (member qtype '(sync ping)))
					 (begin
					   (mutex-lock! *heartbeat-mutex*)
					   (set! *last-db-access* (current-seconds))
					   (mutex-unlock! *heartbeat-mutex*)))
				     ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex
				     ;; (set! res (open-run-close db:process-queue-item open-db packet))
				     (set! res (db:process-queue-item db packet))
				     ;; (mutex-unlock! *db:process-queue-mutex*)
				     (debug:print-info 11 "Return value from db:process-queue-item is " res)
				     (send-response body: (conc "<head>ctrl data</head>\n<body>"
								res
								"</body>")
						    headers: '((content-type text/plain)))))
				  (else (continue))))))))
    (http-transport:try-start-server ipaddrstr start-port)
    ;; lite3:finalize! db)))
    ))



;; (define (http-transport:main-loop)
;;   (print "INFO: Exectuing main server loop")
;;   (access-log "megatest-http.log")
;;   (server-bind-address #f)
;;   (define-page (main-page-path)
;;     (lambda ()
;;       (let ((dat ($ "dat")))
;;       ;; (with-request-variables (dat)
;;         (debug:print-info 12 "Got dat=" dat)
;; 	(let* ((packet (db:string->obj dat))
;; 	       (qtype  (cdb:packet-get-qtype packet)))
;; 	  (debug:print-info 12 "server=> received packet=" packet)
;; 	  (if (not (member qtype '(sync ping)))
;; 	      (begin
;; 		(mutex-lock! *heartbeat-mutex*)
;; 		(set! *last-db-access* (current-seconds))
;; 		(mutex-unlock! *heartbeat-mutex*)))
;; 	  (let ((res (open-run-close db:process-queue-item open-db packet)))
;; 	    (debug:print-info 11 "Return value from db:process-queue-item is " res)
;; 	    res))))))

;;; (use spiffy uri-common intarweb)
;;; 
;;; (root-path "/var/www")
;;; 
;;; (vhost-map `(((* any) . ,(lambda (continue)
;;;                            (if (equal? (uri-path (request-uri (current-request))) 
;;;                                        '(/ "hey"))
;;;                                (send-response body: "hey there!\n"
;;;                                               headers: '((content-type text/plain)))
;;;                                (continue))))))
;;; 
;;; (start-server port: 12345)

;; This is recursively run by http-transport:run until sucessful
;;
(define (http-transport:try-start-server ipaddrstr portnum)
  (handle-exceptions
   exn
   (begin
     (print-error-message exn)
     (if (< portnum 9000)
	 (begin 
	   (print "WARNING: failed to start on portnum: " portnum ", trying next port")
	   (thread-sleep! 0.1)
	   (open-run-close tasks:remove-server-records tasks:open-db)
	   (http-transport:try-start-server ipaddrstr (+ portnum 1)))
	 (print "ERROR: Tried and tried but could not start the server")))
   (set! *runremote* (list ipaddrstr portnum))
   (open-run-close tasks:remove-server-records tasks:open-db)
   (open-run-close tasks:server-register 
		   tasks:open-db 
		   (current-process-id)
		   ipaddrstr portnum 0 'live 'http)
   (print "INFO: Trying to start server on " ipaddrstr ":" portnum)
   ;; This starts the spiffy server
   (start-server port: portnum)
   (print "INFO: server has been stopped")))

(define (http-transport:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;;======================================================================
;; C L I E N T S
;;======================================================================

;; <html>
;; <head></head>
;; <body>1 Hello, world! Goodbye Dolly</body></html>
;; Send msg to serverdat and receive result
(define (http-transport:client-send-receive serverdat msg)
  (let* ((url        (http-transport:make-server-url serverdat))
	 (fullurl    (conc url "/ctrl")) ;; (conc url "/?dat=" msg)))
	 (numretries 0))     
    (handle-exceptions
     exn
     (if (< numretries 200)
	 (http-transport:client-send-receive serverdat msg))
     (begin
       (debug:print-info 11 "fullurl=" fullurl "\n")
       ;; set up the http-client here
       (max-retry-attempts 100)
       (retry-request? (lambda (request)
			 (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10))
			 (set! numretries (+ numretries 1))
			 #t))
       ;; send the data and get the response
       ;; extract the needed info from the http data and 
       ;; process and return it.
       (let* ((res   (with-input-from-request fullurl 
					      ;; #f
					      ;; msg 
					      (list (cons 'dat msg)) 
					      read-string)))
	 (debug:print-info 11 "got res=" res)
	 (let ((match (string-search (regexp "<body>(.*)<.body>") res)))
	   (debug:print-info 11 "match=" match)
	   (let ((final (cadr match)))
	     (debug:print-info 11 "final=" final)
	     final)))))))

(define (http-transport:client-connect iface port)
  (let* ((login-res   #f)
	 (serverdat   (list iface port)))
    (set! login-res (server:client-login serverdat))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" port)
	  (set! *runremote* serverdat)
	  serverdat)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " iface ":" port)
	  (set! *runremote* #f)
	  #f))))


;; run http-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (http-transport:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat sdat
                              (begin
                                (sleep 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
	 (tdb         (tasks:open-db))
	 (spid        (tasks:server-get-server-id tdb #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        
        ;; NOTE: Get rid of this mechanism! It really is not needed...
        (tasks:server-update-heartbeat tdb spid)
      
        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)
        (if (> (+ last-access
                  ;; (* 50 60 60)    ;; 48 hrs
                  ;; 60              ;; one minute
                  ;; (* 60 60)       ;; one hour
                  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
                  )
               (current-seconds))
            (begin
              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              ;; need to delete only *my* server entry (future use)
              (set! *time-to-exit* #t)
              (tasks:server-deregister-self tdb (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (http-transport:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 2 "Starting the standalone server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (debug:print 11 "http-transport:launch hostinfo=" hostinfo)
    (if hostinfo
	(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 
	    (let* ((th2 (make-thread (lambda ()
				       (http-transport:run 
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread (lambda ()(http-transport:keep-running)) "Keep running"))
		   )
	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      (thread-join! th2)
	      )
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

Modified megatest.scm from [d00ef9a849] to [52774a1066].

95
96
97
98
99
100
101

102

103
104
105
106
107
108
109
95
96
97
98
99
100
101
102

103
104
105
106
107
108
109
110







+
-
+







  -rebuild-db             : bring the database schema up to date
  -update-meta            : update the tests metadata for all tests
  -env2file fname         : write the environment to fname.csh and fname.sh
  -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are
                                 overwritten by values set in config files.
  -server -|hostname      : start the server (reduces contention on megatest.db), use
                            - to automatically figure out hostname
  -transport http|zmq     : use http or zmq for transport (default is http) 
  -list-servers            : list the servers 
  -list-servers           : list the servers 
  -repl                   : start a repl (useful for extending megatest)

Spreadsheet generation
  -extract-ods fname.ods  : extract an open document spreadsheet from the database
  -pathmod path           : insert path, i.e. path/runame/itempath/logfile.html
                            will clear the field if no rundir/testname/itempath/logfile
                            if it contains forward slashes the path will be converted
155
156
157
158
159
160
161

162
163
164
165
166
167
168
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170







+







			":variable"
			":value"
			":expected"
			":tol"
			":units"
			;; misc
			"-server"
			"-transport"
			"-kill-server"
			"-port"
			"-extract-ods"
			"-pathmod"
			"-env2file"
			"-setvars"
			"-set-state-status"
281
282
283
284
285
286
287
288
289
290



291
292
293
294
295
296
297

298
299
300


301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316

317
318
319
320
321
322
323
324
325
326
327
328
329

330
331
332
333
334
335
336
283
284
285
286
287
288
289



290
291
292
293
294
295
296
297
298

299
300


301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331

332
333
334
335
336
337
338
339







-
-
-
+
+
+






-
+

-
-
+
+
















+












-
+








;;======================================================================
;; Start the server - can be done in conjunction with -runall or -runtests (one day...)
;;   we start the server if not running else start the client thread
;;======================================================================

(if (args:get-arg "-server")
    (begin
      (debug:print 2 "Launching server...")
      (server:launch)))
    (let ((transport (args:get-arg "-transport" "http")))
      (debug:print 2 "Launching server using transport " transport)
      (server:launch (string->symbol transport))))

(if (args:get-arg "-list-servers")
	;; (args:get-arg "-kill-server"))
    (let ((tl (setup-for-run)))
      (if tl 
	  (let ((servers (open-run-close tasks:get-all-servers tasks:open-db))
		(fmtstr  "~5a~8a~8a~20a~20a~10a~10a~10a~10a\n")
		(fmtstr  "~5a~8a~8a~20a~20a~10a~10a~10a~10a~10a\n")
		(servers-to-kill '()))
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State")
	    (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====")
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State" "Transport")
	    (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====" "=========")
	    (for-each 
	     (lambda (server)
	       (let* (;; (killinfo   (args:get-arg "-kill-server"))
		      ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		      ;; (kpid       (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f))
		      (id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
		      (hostname   (vector-ref server 2))
		      (interface  (vector-ref server 3))
		      (pullport   (vector-ref server 4))
		      (pubport    (vector-ref server 5))
		      (start-time (vector-ref server 6))
		      (priority   (vector-ref server 7))
		      (state      (vector-ref server 8))
		      (mt-ver     (vector-ref server 9))
		      (last-update (vector-ref server 10)) ;;   (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port))
		      (transport  (vector-ref server 11))
		      (killed     #f)
		      (status     (< last-update 20)))
		 ;;   (zmq-sockets (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (equal? state "dead")
		     (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day.
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete))
		     (if (> last-update 20)        ;; Mark as dead if not updated in last 20 seconds
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid)))

		 (format #t fmtstr id mt-ver pid hostname interface pullport pubport last-update
			 (if status "alive" "dead"))))
			 (if status "alive" "dead") transport)))
	     servers)
	    (debug:print-info 1 "Done with listservers")
	    (set! *didsomething* #t)
	    (exit) ;; must do, would have to add checks to many/all calls below
	    )
	  (exit)))
    ;; if not list or kill then start a client (if appropriate)

Added rpc-transport.scm version [12f5deda72].





























































































































































































































































































































































































































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use spiffy uri-common intarweb http-client spiffy-request-vars)

(tcp-buffer-size 2048)

(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.

(include "common_records.scm")
(include "db_records.scm")

(define (server:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "http://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

;; Call this to start the actual server
;;

(define *db:process-queue-mutex* (make-mutex))

(define (server:run hostn)
  (debug:print 2 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* (;; (iface           (if (string=? "-" hostn)
	 ;;        	      #f ;; (get-host-name) 
	 ;;        	      hostn))
	 (db              #f) ;;        (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   #f)))
			    (if ipstr ipstr hostn))) ;; hostname)))
	 (start-port    (if (args:get-arg "-port")
			    (string->number (args:get-arg "-port"))
			    (+ 5000 (random 1001))))
	 (link-tree-path (config-lookup *configdat* "setup" "linktree")))
    (set! *cache-on* #t)
    (root-path     (if link-tree-path 
		       link-tree-path
		       (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!

    ;; Setup the web server and a /ctrl interface
    ;;
    (vhost-map `(((* any) . ,(lambda (continue)
			       ;; open the db on the first call 
			       (if (not db)(set! db (open-db)))
			       (let* (($   (request-vars source: 'both))
				      (dat ($ 'dat))
				      (res #f))
				 (cond
				  ((equal? (uri-path (request-uri (current-request))) 
					   '(/ "hey"))
				   (send-response body: "hey there!\n"
						  headers: '((content-type text/plain))))
				  ;; This is the /ctrl path where data is handed to the server and
				  ;; responses 
				  ((equal? (uri-path (request-uri (current-request)))
					   '(/ "ctrl"))
				   (let* ((packet (db:string->obj dat))
					  (qtype  (cdb:packet-get-qtype packet)))
				     (debug:print-info 12 "server=> received packet=" packet)
				     (if (not (member qtype '(sync ping)))
					 (begin
					   (mutex-lock! *heartbeat-mutex*)
					   (set! *last-db-access* (current-seconds))
					   (mutex-unlock! *heartbeat-mutex*)))
				     ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex
				     ;; (set! res (open-run-close db:process-queue-item open-db packet))
				     (set! res (db:process-queue-item db packet))
				     ;; (mutex-unlock! *db:process-queue-mutex*)
				     (debug:print-info 11 "Return value from db:process-queue-item is " res)
				     (send-response body: (conc "<head>ctrl data</head>\n<body>"
								res
								"</body>")
						    headers: '((content-type text/plain)))))
				  (else (continue))))))))
    (server:try-start-server ipaddrstr start-port)
    ;; lite3:finalize! db)))
    ))



;; (define (server:main-loop)
;;   (print "INFO: Exectuing main server loop")
;;   (access-log "megatest-http.log")
;;   (server-bind-address #f)
;;   (define-page (main-page-path)
;;     (lambda ()
;;       (let ((dat ($ "dat")))
;;       ;; (with-request-variables (dat)
;;         (debug:print-info 12 "Got dat=" dat)
;; 	(let* ((packet (db:string->obj dat))
;; 	       (qtype  (cdb:packet-get-qtype packet)))
;; 	  (debug:print-info 12 "server=> received packet=" packet)
;; 	  (if (not (member qtype '(sync ping)))
;; 	      (begin
;; 		(mutex-lock! *heartbeat-mutex*)
;; 		(set! *last-db-access* (current-seconds))
;; 		(mutex-unlock! *heartbeat-mutex*)))
;; 	  (let ((res (open-run-close db:process-queue-item open-db packet)))
;; 	    (debug:print-info 11 "Return value from db:process-queue-item is " res)
;; 	    res))))))

;;; (use spiffy uri-common intarweb)
;;; 
;;; (root-path "/var/www")
;;; 
;;; (vhost-map `(((* any) . ,(lambda (continue)
;;;                            (if (equal? (uri-path (request-uri (current-request))) 
;;;                                        '(/ "hey"))
;;;                                (send-response body: "hey there!\n"
;;;                                               headers: '((content-type text/plain)))
;;;                                (continue))))))
;;; 
;;; (start-server port: 12345)

;; This is recursively run by server:run until sucessful
;;
(define (server:try-start-server ipaddrstr portnum)
  (handle-exceptions
   exn
   (begin
     (print-error-message exn)
     (if (< portnum 9000)
	 (begin 
	   (print "WARNING: failed to start on portnum: " portnum ", trying next port")
	   (thread-sleep! 0.1)
	   (open-run-close tasks:remove-server-records tasks:open-db)
	   (server:try-start-server ipaddrstr (+ portnum 1)))
	 (print "ERROR: Tried and tried but could not start the server")))
   (set! *runremote* (list ipaddrstr portnum))
   (open-run-close tasks:remove-server-records tasks:open-db)
   (open-run-close tasks:server-register 
		   tasks:open-db 
		   (current-process-id)
		   ipaddrstr portnum 0 'live)
   (print "INFO: Trying to start server on " ipaddrstr ":" portnum)
   ;; This starts the spiffy server
   (start-server port: portnum)
   (print "INFO: server has been stopped")))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;; 
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
  ;; (send-message pubsock target send-more: #t)
  ;; (send-message pubsock 
  (db:obj->string (vector success/fail query-sig result)))

;;======================================================================
;; C L I E N T S
;;======================================================================

(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; <html>
;; <head></head>
;; <body>1 Hello, world! Goodbye Dolly</body></html>
;; Send msg to serverdat and receive result
(define (server:client-send-receive serverdat msg)
  (let* ((url        (server:make-server-url serverdat))
	 (fullurl    (conc url "/ctrl")) ;; (conc url "/?dat=" msg)))
	 (numretries 0))     
    (handle-exceptions
     exn
     (if (< numretries 200)
	 (server:client-send-receive serverdat msg))
     (begin
       (debug:print-info 11 "fullurl=" fullurl "\n")
       ;; set up the http-client here
       (max-retry-attempts 100)
       (retry-request? (lambda (request)
			 (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10))
			 (set! numretries (+ numretries 1))
			 #t))
       ;; send the data and get the response
       ;; extract the needed info from the http data and 
       ;; process and return it.
       (let* ((res   (with-input-from-request fullurl 
					      ;; #f
					      ;; msg 
					      (list (cons 'dat msg)) 
					      read-string)))
	 (debug:print-info 11 "got res=" res)
	 (let ((match (string-search (regexp "<body>(.*)<.body>") res)))
	   (debug:print-info 11 "match=" match)
	   (let ((final (cadr match)))
	     (debug:print-info 11 "final=" final)
	     final)))))))

(define (server:client-login serverdat)
  (max-retry-attempts 100)
  (cdb:login serverdat *toppath* (server:get-client-signature)))

;; Not currently used! But, I think it *should* be used!!!
(define (server:client-logout serverdat)
  (let ((ok (and (socket? serverdat)
		 (cdb:logout serverdat *toppath* (server:get-client-signature)))))
    ;; (close-socket serverdat)
    ok))

(define (server:client-connect iface port)
  (let* ((login-res   #f)
	 (serverdat   (list iface port)))
    (set! login-res (server:client-login serverdat))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" port)
	  (set! *runremote* serverdat)
	  serverdat)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " iface ":" port)
	  (set! *runremote* #f)
	  #f))))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host     (list-ref hostinfo 0))
	      (iface    (list-ref hostinfo 1))
	      (port     (list-ref hostinfo 2))
	      (pid      (list-ref hostinfo 3)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (server:client-connect iface port)) ;; )
	(if (> numtries 0)
	    (let ((exe (car (argv)))
		  (pid #f))
	      (debug:print-info 0 "No server available, attempting to start one...")
	      (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
	        							  (string-intersperse *verbosity* ",")
	        							  (conc *verbosity*)))))
	      ;; (set! pid (process-fork (lambda ()
	      ;;   			(current-input-port  (open-input-file  "/dev/null"))
	      ;;   			(current-output-port (open-output-file "/dev/null"))
	      ;;   			(current-error-port  (open-output-file "/dev/null"))
	      ;;   			(server:launch))))
	      (let loop ((count 0))
		(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
		  (if (not hostinfo)
		      (begin
			(debug:print-info 0 "Waiting for server pid=" pid " to start")
			(sleep 2) ;; give server time to start
			(if (< count 5)
			    (loop (+ count 1)))))))
	      ;; we are starting a server, do not try again! That can lead to 
	      ;; recursively starting many processes!!!
	      (server:client-setup numtries: 0))
	    (debug:print-info 1 "Too many attempts, giving up")))))

;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat sdat
                              (begin
                                (sleep 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
	 (tdb         (tasks:open-db))
	 (spid        (tasks:server-get-server-id tdb #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        
        ;; NOTE: Get rid of this mechanism! It really is not needed...
        (tasks:server-update-heartbeat tdb spid)
      
        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)
        (if (> (+ last-access
                  ;; (* 50 60 60)    ;; 48 hrs
                  ;; 60              ;; one minute
                  ;; (* 60 60)       ;; one hour
                  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
                  )
               (current-seconds))
            (begin
              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              ;; need to delete only *my* server entry (future use)
              (set! *time-to-exit* #t)
              (tasks:server-deregister-self tdb (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (server:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 2 "Starting the standalone server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (debug:print 11 "server:launch hostinfo=" hostinfo)
    (if hostinfo
	(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 
	    (let* ((th2 (make-thread (lambda ()
				       (server:run 
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
		   )
	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      (thread-join! th2)
	      )
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

(define (server:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     "") ;; do nothing for now (was flush out last call if applicable)
			   "eat response"))
	 (th2 (make-thread (lambda ()
			     (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
			     (thread-sleep! 1) ;; give the flush one second to do it's stuff
			     (debug:print 0 "       Done.")
			     (exit 4))
			   "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))

(define (server:client-launch)
  (set-signal-handler! signal/int server:client-signal-handler)
   (if (server:client-setup)
       (debug:print-info 2 "connected as client")
       (begin
	 (debug:print 0 "ERROR: Failed to connect as client")
	 (exit))))

Modified server.scm from [12f5deda72] to [64baef49da].

11
12
13
14
15
16
17
18
19
20
21
22
23
24
25


26
27
28
29
30
31
32
11
12
13
14
15
16
17


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32







-
-






+
+







(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use spiffy uri-common intarweb http-client spiffy-request-vars)

(tcp-buffer-size 2048)

(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses http-transport))
(declare (uses zmq-transport))

(include "common_records.scm")
(include "db_records.scm")

(define (server:make-server-url hostport)
  (if (not hostport)
      #f
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189


190







191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263



264
265
266
267
268
269

270
271


272
273
274
275
276





277
278
279

280
281

282
283

284
285
286
287
288
289
290
291
292
293





294
295
296
297
298
299
300

301
302

303
304
305
306
307
308
309
310
311
312
313




314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332

333
334
335

336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361

362
363
364
365
366
367
368
369





370
371
372

373
374

375
376
377
378
379
380
381
382
383
384
385
386


387
388
389
390
391
392
393
105
106
107
108
109
110
111




























































112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131

132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149



































150

151
152
153
154
155
156

157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173

174
175
176
177
178
179
180
181
182
183


184
185





186
187
188
189
190



191


192


193










194
195
196
197
198







199


200











201
202
203
204



















205



206























207
208

209
210
211
212
213
214



215
216
217
218
219



220


221












222
223
224
225
226
227
228
229
230







-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-


















+
+
-
+
+
+
+
+
+
+











-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-

-






-

















-
+
+
+






+
-
-
+
+
-
-
-
-
-
+
+
+
+
+
-
-
-
+
-
-
+
-
-
+
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
-
-
-
-
-
-
-
+
-
-
+
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-


-
+





-
-
-
+
+
+
+
+
-
-
-
+
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+







						    headers: '((content-type text/plain)))))
				  (else (continue))))))))
    (server:try-start-server ipaddrstr start-port)
    ;; lite3:finalize! db)))
    ))



;; (define (server:main-loop)
;;   (print "INFO: Exectuing main server loop")
;;   (access-log "megatest-http.log")
;;   (server-bind-address #f)
;;   (define-page (main-page-path)
;;     (lambda ()
;;       (let ((dat ($ "dat")))
;;       ;; (with-request-variables (dat)
;;         (debug:print-info 12 "Got dat=" dat)
;; 	(let* ((packet (db:string->obj dat))
;; 	       (qtype  (cdb:packet-get-qtype packet)))
;; 	  (debug:print-info 12 "server=> received packet=" packet)
;; 	  (if (not (member qtype '(sync ping)))
;; 	      (begin
;; 		(mutex-lock! *heartbeat-mutex*)
;; 		(set! *last-db-access* (current-seconds))
;; 		(mutex-unlock! *heartbeat-mutex*)))
;; 	  (let ((res (open-run-close db:process-queue-item open-db packet)))
;; 	    (debug:print-info 11 "Return value from db:process-queue-item is " res)
;; 	    res))))))

;;; (use spiffy uri-common intarweb)
;;; 
;;; (root-path "/var/www")
;;; 
;;; (vhost-map `(((* any) . ,(lambda (continue)
;;;                            (if (equal? (uri-path (request-uri (current-request))) 
;;;                                        '(/ "hey"))
;;;                                (send-response body: "hey there!\n"
;;;                                               headers: '((content-type text/plain)))
;;;                                (continue))))))
;;; 
;;; (start-server port: 12345)

;; This is recursively run by server:run until sucessful
;;
(define (server:try-start-server ipaddrstr portnum)
  (handle-exceptions
   exn
   (begin
     (print-error-message exn)
     (if (< portnum 9000)
	 (begin 
	   (print "WARNING: failed to start on portnum: " portnum ", trying next port")
	   (thread-sleep! 0.1)
	   (open-run-close tasks:remove-server-records tasks:open-db)
	   (server:try-start-server ipaddrstr (+ portnum 1)))
	 (print "ERROR: Tried and tried but could not start the server")))
   (set! *runremote* (list ipaddrstr portnum))
   (open-run-close tasks:remove-server-records tasks:open-db)
   (open-run-close tasks:server-register 
		   tasks:open-db 
		   (current-process-id)
		   ipaddrstr portnum 0 'live)
   (print "INFO: Trying to start server on " ipaddrstr ":" portnum)
   ;; This starts the spiffy server
   (start-server port: portnum)
   (print "INFO: server has been stopped")))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;; 
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
  ;; (send-message pubsock target send-more: #t)
  ;; (send-message pubsock 
  (case *transport-type*
    ((fs) result)
  (db:obj->string (vector success/fail query-sig result)))
    ((http)(db:obj->string (vector success/fail query-sig result)))
    ((zmq)
     (send-message pubsock target send-more: #t)
     (send-message pubsock (db:obj->string (vector success/fail query-sig result))))
    (else 
     (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*)
     result)))

;;======================================================================
;; C L I E N T S
;;======================================================================

(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; <html>
;; <head></head>
;; <body>1 Hello, world! Goodbye Dolly</body></html>
;; Send msg to serverdat and receive result
(define (server:client-send-receive serverdat msg)
  (let* ((url        (server:make-server-url serverdat))
	 (fullurl    (conc url "/ctrl")) ;; (conc url "/?dat=" msg)))
	 (numretries 0))     
    (handle-exceptions
     exn
     (if (< numretries 200)
	 (server:client-send-receive serverdat msg))
     (begin
       (debug:print-info 11 "fullurl=" fullurl "\n")
       ;; set up the http-client here
       (max-retry-attempts 100)
       (retry-request? (lambda (request)
			 (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10))
			 (set! numretries (+ numretries 1))
			 #t))
       ;; send the data and get the response
       ;; extract the needed info from the http data and 
       ;; process and return it.
       (let* ((res   (with-input-from-request fullurl 
					      ;; #f
					      ;; msg 
					      (list (cons 'dat msg)) 
					      read-string)))
	 (debug:print-info 11 "got res=" res)
	 (let ((match (string-search (regexp "<body>(.*)<.body>") res)))
	   (debug:print-info 11 "match=" match)
	   (let ((final (cadr match)))
	     (debug:print-info 11 "final=" final)
	     final)))))))

(define (server:client-login serverdat)
  (max-retry-attempts 100)
  (cdb:login serverdat *toppath* (server:get-client-signature)))

;; Not currently used! But, I think it *should* be used!!!
(define (server:client-logout serverdat)
  (let ((ok (and (socket? serverdat)
		 (cdb:logout serverdat *toppath* (server:get-client-signature)))))
    ;; (close-socket serverdat)
    ok))

(define (server:client-connect iface port)
  (let* ((login-res   #f)
	 (serverdat   (list iface port)))
    (set! login-res (server:client-login serverdat))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" port)
	  (set! *runremote* serverdat)
	  serverdat)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " iface ":" port)
	  (set! *runremote* #f)
	  #f))))

;; Do all the connection work, start a server if not already running
;; Do all the connection work, look up the transport type and set up the
;; connection if required.
;;
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let* ((hostinfo   (if (not *transport-type*) ;; If we dont' already have transport type set then figure it out
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
			 (open-run-close tasks:get-best-server tasks:open-db)
			 #f)))
	(let ((host     (list-ref hostinfo 0))
	      (iface    (list-ref hostinfo 1))
	      (port     (list-ref hostinfo 2))
	      (pid      (list-ref hostinfo 3)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
    ;; if have hostinfo then extract the transport type 
    ;; else fall back to fs
    ;; 
    (set! *transport-type* (if hostinfo 
			       (string->vector (tasks:hostinfo-get-transport hostinfo))
	  (server:client-connect iface port)) ;; )
	(if (> numtries 0)
	    (let ((exe (car (argv)))
			       'fs))
		  (pid #f))
	      (debug:print-info 0 "No server available, attempting to start one...")
    (debug:print-info 1 "Using transport type of " *transport-type* (if hostinfo (conc " to connect to " hostinfo) ""))
	      (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
	        							  (string-intersperse *verbosity* ",")
    (case *transport-type* 
	        							  (conc *verbosity*)))))
	      ;; (set! pid (process-fork (lambda ()
	      ;;   			(current-input-port  (open-input-file  "/dev/null"))
	      ;;   			(current-output-port (open-output-file "/dev/null"))
	      ;;   			(current-error-port  (open-output-file "/dev/null"))
	      ;;   			(server:launch))))
	      (let loop ((count 0))
		(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
		  (if (not hostinfo)
		      (begin
    ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db))))
    ((http)
     (http-transport:client-connect (tasks:hostinfo-get-interface hostinfo)
				    (tasks:hostinfo-get-port hostinfo)))
    ((zmq)
			(debug:print-info 0 "Waiting for server pid=" pid " to start")
			(sleep 2) ;; give server time to start
			(if (< count 5)
			    (loop (+ count 1)))))))
	      ;; we are starting a server, do not try again! That can lead to 
	      ;; recursively starting many processes!!!
	      (server:client-setup numtries: 0))
     (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo)
	    (debug:print-info 1 "Too many attempts, giving up")))))

				   (tasks:hostinfo-get-port      hostinfo)
;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
				   (tasks:hostinfo-get-pubport   hostinfo)))
    (else  ;; default to fs
     (set! *transport-type* 'fs)
     (set! *megatest-db*    (open-db))))))
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat sdat
                              (begin
                                (sleep 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
	 (tdb         (tasks:open-db))
	 (spid        (tasks:server-get-server-id tdb #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        

        ;; NOTE: Get rid of this mechanism! It really is not needed...
        (tasks:server-update-heartbeat tdb spid)
      

        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)
        (if (> (+ last-access
                  ;; (* 50 60 60)    ;; 48 hrs
                  ;; 60              ;; one minute
                  ;; (* 60 60)       ;; one hour
                  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
                  )
               (current-seconds))
            (begin
              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              ;; need to delete only *my* server entry (future use)
              (set! *time-to-exit* #t)
              (tasks:server-deregister-self tdb (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (server:launch)
(define (server:launch transport)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 2 "Starting the standalone server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (debug:print 11 "server:launch hostinfo=" hostinfo)
  (debug:print-info 2 "Starting server using " transport " transport")
  (set! *transport-type* transport)
  (case transport
    ((fs)   (exit)) ;; there is no "fs" transport
    ((http) (http-transport:launch))
    (if hostinfo
	(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 
    ((zmq)  (zmq-transport:launch))
	    (let* ((th2 (make-thread (lambda ()
				       (server:run 
    (else
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
		   )
	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      (thread-join! th2)
	      )
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))
     (debug:print "WARNING: unrecognised transport " transport)
     (exit))))

(define (server:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     "") ;; do nothing for now (was flush out last call if applicable)

Modified tasks.scm from [7e2c4cdfd8] to [927320b7b5].

59
60
61
62
63
64
65

66
67
68
69
70

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87








88
89

90
91
92
93
94
95
96




97
98
99


100
101
102
103
104
105
106
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

99
100
101
102




103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118







+





+

















+
+
+
+
+
+
+
+

-
+



-
-
-
-
+
+
+
+



+
+







                                username TEXT,
                               CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
                                  pid INTEGER,
                                  interface TEXT,
                                  hostname TEXT,
                                  port INTEGER,
                                  pubport INTEGER,
                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,
                                  transport TEXT,
                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
                                  server_id INTEGER,
                                  pid INTEGER,
                                  hostname TEXT,
                                  cmdline TEXT,
                                  login_time TIMESTAMP,
                                  logout_time TIMESTAMP DEFAULT -1,
                                CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
                                  
	  ))
    mdb))
    
;;======================================================================
;; Server and client management
;;======================================================================

;; make-vector-record tasks hostinfo id interface port pubport transport
(define (make-tasks:hostinfo)(make-vector 5))
(define (tasks:hostinfo-get-id          vec)    (vector-ref  vec 0))
(define (tasks:hostinfo-get-interface   vec)    (vector-ref  vec 1))
(define (tasks:hostinfo-get-port        vec)    (vector-ref  vec 2))
(define (tasks:hostinfo-get-pubport     vec)    (vector-ref  vec 3))
(define (tasks:hostinfo-get-transport   vec)    (vector-ref  vec 4))

;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state)
(define (tasks:server-register mdb pid interface port priority state transport #!key (pubport -1))
  (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface)
                             VALUES(?,  ?,       ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);"
   pid (get-host-name) port priority (conc state) megatest-version interface)
  (list 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport)
                             VALUES(?,  ?,       ?,   ?,  strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);"
   pid (get-host-name) port pubport priority (conc state) megatest-version interface (conc transport))
  (vector 
   (tasks:server-get-server-id mdb (get-host-name) interface port pid)
   interface
   port
   pubport
   transport
   ))

;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (if pid
      (case action
251
252
253
254
255
256
257
258
259


260
261

262
263
264
265
266
267
268
263
264
265
266
267
268
269


270
271
272

273
274
275
276
277
278
279
280







-
-
+
+

-
+







		(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))



(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname interface port start-time priority state mt-version last-update)
       (set! res (cons (vector id pid hostname interface port start-time priority state mt-version last-update) res)))
     (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport)
       (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport) res)))
     mdb
     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;")
     "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;")
    res))
       

;;======================================================================
;; Tasks and Task monitors
;;======================================================================

Added zmq-transport.scm version [84997153bf].




























































































































































































































































































































































































































































































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use zmq)

(declare (unit zmq-transport))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses server))

(include "common_records.scm")
(include "db_records.scm")

;; Transition to pub --> sub with pull <-- push
;;
;;   1. client sends request to server via push to the pull port
;;   2. server puts request in queue or processes immediately as appropriate
;;   3. server puts responses from completed requests into pub port 
;;
;; TODO
;;
;; Done Tested
;; [x]  [ ]    1. Add columns pullport pubport to servers table
;; [x]  [ ]    2. Add rm of monitor.db if older than 11/12/2012 
;; [x]  [ ]    3. Add create of pullport and pubport with finding of available ports
;; [x]  [ ]    4. Add client compose of request
;; [x]  [ ]        - name of client: testname/itempath-test_id-hostname 
;; [x]  [ ]        - name of request: callname, params
;; [x]  [ ]        - request key: f(clientname, callname, params)
;; [x]  [ ]    5. Add processing of subscription hits
;; [x]  [ ]        - done when get key 
;; [x]  [ ]        - return results
;; [x]  [ ]    6. Add timeout processing
;; [x]  [ ]        - after 60 seconds
;; [ ]  [ ]            i. check server alive, connect to new if necessary
;; [ ]  [ ]           ii. resend request
;; [ ]  [ ]    7. Turn self ping back on

(define (zmq-transport:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "tcp://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

(define-inline (zmqsock:get-pub  dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0))

(define (zmq-transport:run hostn)
  (debug:print 2 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-sdat1       #f)
	 (zmq-sdat2       #f)
	 (pull-socket     #f)
	 (pub-socket      #f)
	 (p1              #f)
	 (p2              #f)
	 (zmq-sockets-dat #f)
	 (iface           (if (string=? "-" hostn)
			      "*" ;; (get-host-name) 
			      hostn))
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   #f)))
			    (if ipstr ipstr hostname)))
	 (last-run       0))
    (set! zmq-sockets-dat (zmq-transport:setup-ports ipaddrstr (if (args:get-arg "-port")
			    (string->number (args:get-arg "-port"))
							    (+ 5000 (random 1001)))))

    (set! zmq-sdat1    (car   zmq-sockets-dat))
    (set! pull-socket  (cadr  zmq-sdat1)) ;; (iface s  port)
    (set! p1           (caddr zmq-sdat1))
    
    (set! zmq-sdat2    (cadr  zmq-sockets-dat))
    (set! pub-socket   (cadr  zmq-sdat2))
    (set! p2           (caddr zmq-sdat2))

    (set! *cache-on* #t)

    ;; what to do when we quit
    ;;
;;     (on-exit (lambda ()
;; 	       (if (and *toppath* *server-info*)
;; 		   (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
;; 		   (let loop () 
;; 		     (let ((queue-len 0))
;; 		       (thread-sleep! (random 5))
;; 		       (mutex-lock! *incoming-mutex*)
;; 		       (set! queue-len (length *incoming-data*))
;; 		       (mutex-unlock! *incoming-mutex*)
;; 		       (if (> queue-len 0)
;; 			   (begin
;; 			     (debug:print-info 0 "Queue not flushed, waiting ...")
;; 			     (loop))))))))

    ;; The heavy lifting
    ;;
    ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
    ;;
    (let loop ((queue-lst '()))
      (let* ((rawmsg (receive-message* pull-socket))
	     (packet (db:string->obj rawmsg))
					  (qtype  (cdb:packet-get-qtype packet)))
				     (debug:print-info 12 "server=> received packet=" packet)
				     (if (not (member qtype '(sync ping)))
					 (begin
					   (mutex-lock! *heartbeat-mutex*)
					   (set! *last-db-access* (current-seconds))
					   (mutex-unlock! *heartbeat-mutex*)))
	(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
	    (begin
	      (open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
	      (loop '()))
	    (loop (cons packet queue-lst)))))))

;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (zmq-transport:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
			(let ((sdat #f))
			  (mutex-lock! *heartbeat-mutex*)
			  (set! sdat *server-info*)
			  (mutex-unlock! *heartbeat-mutex*)
			  (if sdat sdat
			      (begin
				(sleep 4)
				(loop))))))
	 (iface       (cadr server-info))
	 (pullport    (caddr server-info))
	 (pubport     (cadddr server-info)) ;; id interface pullport pubport)
	 (zmq-sockets (zmq-transport:client-connect iface pullport pubport))
	 (last-access 0))
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1)))
      ;; (print "Server running, count is " count)
	(if (< count 1) ;; 3x3 = 9 secs aprox
	    (loop (+ count 1)))

	;; NOTE: Get rid of this mechanism! It really is not needed...
	(open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))

	;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
	(mutex-lock! *heartbeat-mutex*)
	(set! last-access *last-db-access*)
	(mutex-unlock! *heartbeat-mutex*)
	(if (> (+ last-access
		  ;; (* 50 60 60)    ;; 48 hrs
		  ;; 60              ;; one minute
		  ;; (* 60 60)       ;; one hour
		  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
		  )
	       (current-seconds))
	    (begin
	      (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
	      (loop 0))
	    (begin
	      (debug:print-info 0 "Starting to shutdown the server.")
	      ;; need to delete only *my* server entry (future use)
	      (set! *time-to-exit* #t)
	      (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
	      (thread-sleep! 1)
	      (debug:print-info 0 "Max cached queries was " *max-cache-size*)
	      (debug:print-info 0 "Server shutdown complete. Exiting")
	      (exit)))))))

(define (zmq-transport:find-free-port-and-open iface s port stype #!key (trynum 50))
  (let ((s (if s s (make-socket stype)))
        (p (if (number? port) port 5555))
        (old-handler (current-exception-handler)))
    (handle-exceptions
     exn
     (begin
       (debug:print 0 "Failed to bind to port " p ", trying next port")
       (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
       ;; (old-handler)
       ;; (print-call-chain)
       (if (> trynum 0)
           (zmq-transport:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1))
           (debug:print-info 0 "Tried ports up to " p 
                             " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))
       (exit)) ;; To exit or not? That is the question.
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (debug:print 2 "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (list iface s port)))))

(define (zmq-transport:setup-ports ipaddrstr startport)
  (let* ((s1 (zmq-transport:find-free-port-and-open ipaddrstr #f startport 'pull))
         (p1 (caddr s1))
         (s2 (zmq-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
         (p2 (caddr s2)))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live 'zmq))
    (mutex-unlock! *heartbeat-mutex*)
    (list s1 s2)))

(define (zmq-transport:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;;======================================================================
;; C L I E N T S
;;======================================================================

;; 
(define (zmq-transport:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
  (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions)
  (let ((connect-ok #f)
	(zmq-socket (if context 
			(make-socket type context)
			(make-socket type)))
	(conurl     (zmq-transport:make-server-url (list iface port))))
    (if (socket? zmq-socket)
     (begin
	  ;; first apply subscriptions
	  (for-each (lambda (subscription)
		      (debug:print 2 "Subscribing to " subscription)
		      (socket-option-set! zmq-socket 'subscribe subscription))
		    subscriptions)
	  (connect-socket zmq-socket conurl)
	  zmq-socket)
	(begin
	  (debug:print 0 "ERROR: Failed to open socket to " conurl)
	  #f))))

(define (zmq-transport:client-connect iface pullport pubport)
  (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push))
	 (sub-socket  (zmq-transport:client-socket-connect iface pubport
						    type: 'sub
						    subscriptions: (list (zmq-transport:get-client-signature) "all")))
	 (zmq-sockets (vector push-socket sub-socket))
	 (login-res   #f))
    (set! login-res (zmq-transport:client-login zmq-sockets))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
	  (set! *runremote* zmq-sockets)
	  zmq-sockets)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " conurl)
	  (set! *runremote* #f)
	  #f))))

;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (zmq-transport:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat sdat
                              (begin
                                (sleep 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
	 (tdb         (tasks:open-db))
	 (spid        (tasks:server-get-server-id tdb #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        
        ;; NOTE: Get rid of this mechanism! It really is not needed...
        (tasks:server-update-heartbeat tdb spid)
      
        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)
        (if (> (+ last-access
                  ;; (* 50 60 60)    ;; 48 hrs
                  ;; 60              ;; one minute
                  ;; (* 60 60)       ;; one hour
                  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
                  )
               (current-seconds))
            (begin
              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              ;; need to delete only *my* server entry (future use)
              (set! *time-to-exit* #t)
              (tasks:server-deregister-self tdb (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (zmq-transport:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 2 "Starting zmq server")
  (if *toppath* 
      (let* (;; (th1 (make-thread (lambda ()
	     ;;      	       (let ((server-info #f))
	     ;;      		 ;; wait for the server to be online and available
	     ;;      		 (let loop ()
	     ;;			   (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
	     ;;      		   (thread-sleep! 2)
	     ;;      		   (mutex-lock! *heartbeat-mutex*)
	     ;;      		   (set! server-info *server-info* )
	     ;;      		   (mutex-unlock! *heartbeat-mutex*)
	     ;;      		   (if (not server-info)(loop)))
	     ;;			 (debug:print 2 "Server alive, starting self-ping")
	     ;;      		 (zmq-transport:self-ping server-info)
	     ;;      		 ))
	     ;;      	     "Self ping"))
	     (th2 (make-thread (lambda ()
				 (zmq-transport:run 
				  (if (args:get-arg "-server")
				      (args:get-arg "-server")
				      "-"))) "Server run"))
	     (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running"))
	     )
	(set! *client-non-blocking-mode* #t)
	;; (thread-start! th1)
	(thread-start! th2)
	(thread-start! th3)
	(set! *didsomething* #t)
	;; (thread-join! th3)
	(thread-join! th2)
	)
      (debug:print 0 "ERROR: Failed to setup for megatest")))

(define (zmq-transport:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     (if (not *received-response*)
				 (receive-message* *runremote*))) ;; flush out last call if applicable
			   "eat response"))
	 (th2 (make-thread (lambda ()
			     (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
			     (thread-sleep! 3) ;; give the flush three seconds to do it's stuff
			     (debug:print 0 "       Done.")
			     (exit 4))
			   "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))

(define (zmq-transport:client-launch)
  (set-signal-handler! signal/int zmq-transport:client-signal-handler)
   (if (zmq-transport:client-setup)
       (debug:print-info 2 "connected as client")
       (begin
	 (debug:print 0 "ERROR: Failed to connect as client")
	 (exit))))

;;======================================================================
;; Defunct functions
;;======================================================================

;; ping a server and return number of clients or #f (if no response)
;; NOT IN USE!
(define (zmq-transport:ping host port #!key (secs 10)(return-socket #f))
  (cdb:use-non-blocking-mode
   (lambda ()
     (let* ((res #f)
	    (th1 (make-thread
		  (lambda ()
		    (let* ((zmq-context (make-context 1))
			   (zmq-socket  (zmq-transport:client-connect host port context: zmq-context)))
		      (if zmq-socket
			  (if (zmq-transport:client-login zmq-socket)
			      (let ((numclients (cdb:num-clients zmq-socket)))
				(if (not return-socket)
				    (begin
				      (zmq-transport:client-logout zmq-socket)
				      (close-socket  zmq-socket)))
				(set! res (list #t numclients (if return-socket zmq-socket #f))))
			      (begin
				;; (close-socket zmq-socket)
				(set! res (list #f "CAN'T LOGIN" #f))))
			  (set! res (list #f "CAN'T CONNECT" #f)))))
		  "Ping: th1"))
	    (th2 (make-thread
		  (lambda ()
		    (let loop ((count 1))
		      (debug:print-info 1 "Ping " count " server on " host " at port " port)
		      (thread-sleep! 2)
		      (if (< count (/ secs 2))
			  (loop (+ count 1))))
		    ;; (thread-terminate! th1)
		    (set! res (list #f "TIMED OUT" #f)))
		  "Ping: th2")))
       (thread-start! th2)
       (thread-start! th1)
       (handle-exceptions
	exn
	(set! res (list #f "TIMED OUT" #f))
	(thread-join! th1 secs))
       res))))

;; (define (zmq-transport:self-ping server-info)
;;   ;; server-info: server-id interface pullport pubport
;;   (let ((iface    (list-ref server-info 1))
;; 	(pullport (list-ref server-info 2))
;; 	(pubport  (list-ref server-info 3)))
;;     (zmq-transport:client-connect iface pullport pubport)
;;     (let loop ()
;;       (thread-sleep! 2)
;;       (cdb:client-call *runremote* 'ping #t)
;;       (debug:print 4 "zmq-transport:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
;;       (mutex-lock! *heartbeat-mutex*)
;;       (set! *server-loop-heart-beat* (current-seconds))
;;       (mutex-unlock! *heartbeat-mutex*)
;;       (loop))))
    
(define (zmq-transport:reply pubsock target query-sig success/fail result)
  (debug:print-info 11 "zmq-transport:reply target=" target ", result=" result)
  (send-message pubsock target send-more: #t)
  (send-message pubsock (db:obj->string (vector success/fail query-sig result))))