Megatest

Check-in [81e546a994]
Login
Overview
Comment:Partially complete, just taking a snapshot
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | interleaved-queries
Files: files | file ages | folders
SHA1: 81e546a9941b46d488261a20f51601a94cf18fe9
User & Date: matt on 2012-11-15 23:42:43
Other Links: branch diff | manifest | tags
Context
2012-11-16
01:19
(no comment) check-in: d1245270ea user: matt tags: interleaved-queries
2012-11-15
23:42
Partially complete, just taking a snapshot check-in: 81e546a994 user: matt tags: interleaved-queries
01:44
Bit's 'n pieces check-in: d0462389b2 user: matt tags: interleaved-queries
Changes

Modified db.scm from [b0135e31f1] to [c6e157479f].

9
10
11
12
13
14
15
16
17


18
19

20
21
22
23
24
25
26
9
10
11
12
13
14
15


16
17
18

19
20
21
22
23
24
25
26







-
-
+
+

-
+







;;  PURPOSE.
;;======================================================================

;;======================================================================
;; Database access
;;======================================================================

(require-extension (srfi 18) extras tcp rpc)
(import (prefix rpc rpc:))
(require-extension (srfi 18) extras tcp) ;;  rpc)
;; (import (prefix rpc rpc:))

(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n)
(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use zmq)

(declare (unit db))
(declare (uses common))
(declare (uses keys))
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197



1198
1199
1200
1201
1202
1203
1204
1188
1189
1190
1191
1192
1193
1194



1195
1196
1197
1198
1199
1200
1201
1202
1203
1204







-
-
-
+
+
+







  (let* ((push-socket (vector-ref zmq-sockets 0))
	 (sub-socket  (vector-ref zmq-sockets 1))
	 (query-id (conc (server:get-client-signature) "-" (message-digest-string (md5-primitive) (conc params))))
	 (zdat (db:obj->string (vector query-id params))) ;; (with-output-to-string (lambda ()(serialize params))))
	 (res  #f)
	 (get-res (lambda ()
		    (db:string->obj (if *client-non-blocking-mode* 
					(receive-message* zmq-socket)
					(receive-message  zmq-socket))))))
    (send-message zmq-socket zdat)
					(receive-message* sub-socket)
					(receive-message  sub-socket))))))
    (send-message push-socket zdat)
    (let loop ((res (get-res)))
      (if res res
	  (begin 
	    (thread-sleep! 0.5)
	    (get-res))))))
  
(define (cdb:set-verbosity zmq-socket val)

Modified server.scm from [129e174847] to [01f2819585].

1
2
3
4
5
6
7
8
9
10
11
12


13
14
15
16
17
18
19
1
2
3
4
5
6
7
8
9
10


11
12
13
14
15
16
17
18
19










-
-
+
+








;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp rpc s11n)
(import (prefix rpc rpc:))
(require-extension (srfi 18) extras tcp s11n)
;; (import (prefix rpc rpc:))

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use zmq)

(declare (unit server))
45
46
47
48
49
50
51

52
53
54
55
56
57
58
59
60
61
62






63
64
65
66


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104


105
106
107
108


109
110
111
112
113
114
115
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61


62
63
64
65
66
67
68
69


70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90


91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107


108
109
110
111


112
113
114
115
116
117
118
119
120







+









-
-
+
+
+
+
+
+


-
-
+
+



















-
-
+
+















-
-
+
+


-
-
+
+







;; [ ]  [ ]    5. Add processing of subscription hits
;; [ ]  [ ]        - done when get key 
;; [ ]  [ ]        - return results
;; [ ]  [ ]    6. Add timeout processing
;; [ ]  [ ]        - after 60 seconds
;; [ ]  [ ]            i. check server alive, connect to new if necessary
;; [ ]  [ ]           ii. resend request
;; [ ]  [ ]    7. Turn self ping back on

(define (server:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "tcp://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

(define (server:self-ping iface port)
  (let ((zsocket (server:client-connect iface port)))
(define (server:self-ping server-info)
  ;; server-info: server-id interface pullport pubport
  (let ((iface    (list-ref server-info 1))
	(pullport (list-ref server-info 2))
	(pubport  (list-ref server-info 3)))
    (server:client-connect iface pullport pubport)
    (let loop ()
      (thread-sleep! 2)
      (cdb:client-call zsocket 'ping #t)
      (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!")
      (cdb:client-call *runremote* 'ping #t)
      (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
      (mutex-lock! *heartbeat-mutex*)
      (set! *server-loop-heart-beat* (current-seconds))
      (mutex-unlock! *heartbeat-mutex*)
      (loop))))
    
(define-inline (zmqsock:get-pub  dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0))

(define (server:run hostn)
  (debug:print 0 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-sdat1       #f)
	 (zmq-sdat2       #f)
	 (zmq-socket1     #f)
	 (zmq-socket2     #f)
	 (pull-socket     #f)
	 (pub-socket      #f)
	 (p1              #f)
	 (p2              #f)
	 (zmq-sockets-dat #f)
	 (iface           (if (string=? "-" hostn)
			      "*" ;; (get-host-name) 
			      hostn))
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   #f)))
			    (if ipstr ipstr hostname))))
    (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port")
							    (string->number (args:get-arg "-port"))
							    (+ 5000 (random 1001)))))

    (set! zmq-sdat1    (car   zmq-socket-dat))
    (set! zmq-socket1  (car   zmq-sdat1))
    (set! zmq-sdat1    (car   zmq-sockets-dat))
    (set! pull-socket  (cadr  zmq-sdat1)) ;; (iface s  port)
    (set! p1           (caddr zmq-sdat1))
    
    (set! zmq-sdat2    (cadr  zmq-socket-dat))
    (set! zmq-socket2  (car   zmq-sdat2))
    (set! zmq-sdat2    (cadr  zmq-sockets-dat))
    (set! pub-socket   (cadr  zmq-sdat2))
    (set! p2           (caddr zmq-sdat2))

    (set! *cache-on* #t)

    ;; (set! th1 (make-thread (lambda ()
    ;;     		     (server:self-ping ipaddrstr actual-port))))
    ;; (thread-start! th1)
130
131
132
133
134
135
136
137

138
139
140
141
142



143
144
145
146
147
148
149
135
136
137
138
139
140
141

142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157







-
+





+
+
+







			   (begin
			     (debug:print-info 0 "Queue not flushed, waiting ...")
			     (loop))))))))

    ;; The heavy lifting
    ;;
    (let loop ()
      (let* ((rawmsg (receive-message* zmq-socket1))
      (let* ((rawmsg (receive-message* pull-socket))
	     (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize))))
	     (res    #f))
	(debug:print-info 12 "server=> received params=" params)
	(set! res (cdb:cached-access params))
	(debug:print-info 12 "server=> processed res=" res)

	;; need address here
	;;
	(send-message zmq-socket (db:obj->string res))
	(if (not *time-to-exit*)
	    (loop)
	    (begin
	      (open-run-close tasks:server-deregister-self tasks:open-db #f)
	      (db:write-cached-data)
	      (exit)
175
176
177
178
179
180
181
182

183
184
185
186
187
188
189
183
184
185
186
187
188
189

190
191
192
193
194
195
196
197







-
+







	  (mutex-unlock! *heartbeat-mutex*)
	  ;; The logic here is that if the server loop gets stuck blocked in working
	  ;; we don't want to update our heartbeat
	  (set! pulse (- (current-seconds) server-loop-heartbeat))
	  (debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago")
	  (if (> pulse 15) ;; must stay less than 10 seconds 
	      (begin
		(open-run-close tasks:server-deregister tasks:open-db (cadr server-info) port: (caddr server-info))
		(open-run-close tasks:server-deregister tasks:open-db (cadr server-info) pullport: (caddr server-info))
		(debug:print 0 "ERROR: Heartbeat failed, committing servercide")
		(exit))
	      (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)))
	  ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
	  (if (> (+ *last-db-access* 
		    ;; (* 48 60 60)    ;; 48 hrs
		    ;; 60              ;; one minute
221
222
223
224
225
226
227
228
229


230
231

232
233
234

235
236

237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254

255
256
257
258
259
260
261
262




263
264
265
266
267
268
269
270
271
272
273
274
275


















276
277
278
279
280
281
282
229
230
231
232
233
234
235


236
237
238

239
240
241

242
243

244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261

262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278

279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311







-
-
+
+

-
+


-
+

-
+

















-
+








+
+
+
+




-








+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







			     " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))
       (exit)) ;; To exit or not? That is the question.
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (debug:print 0 "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (list iface s port)))))

(define (server:setup-ports ipadrstr startport)
  (let* ((s1 (server:find-free-port-and-open ipadrstr #f startport 'pub))
(define (server:setup-ports ipaddrstr startport)
  (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pub))
	 (p1 (caddr s1))
	 (s2 (server:find-free-port-and-open ipadrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull))
	 (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull))
	 (p2 (caddr s2)))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and p2")
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live))
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live))
    (mutex-unlock! *heartbeat-mutex*)
    (list s1 s2)))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; 
(define (server:client-connect iface port #!key (context #f)(type 'req))
(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
  (debug:print-info 3 "client-connect " iface ":" port)
  (let ((connect-ok #f)
	(zmq-socket (if context 
			(make-socket type context)
			(make-socket type)))
	(conurl     (server:make-server-url (list iface port))))
    (if (socket? zmq-socket)
	(begin
	  ;; first apply subscriptions
	  (for-each (lambda (subscription)
		      (socket-options-set! zmq-socket 'subscribe subscription))
		    subscriptions)
	  (connect-socket zmq-socket conurl)
	  zmq-socket)
	#f)))
  

(define (server:client-login zmq-sockets)
  (cdb:login zmq-sockets *toppath* (server:get-client-signature)))

(define (server:client-logout zmq-socket)
  (let ((ok (and (socket? zmq-socket)
		 (cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
    ;; (close-socket zmq-socket)
    ok))

(define (server:client-connect iface pullport pubport)
  (let* ((push-socket (server:client-socket-connect iface pullport 'push))
	 (sub-socket  (server:client-socket-connect iface pubport 'sub
						    subscriptions: (list (server:get-client-signature) "all")))
	 (zmq-sockets (vector push-socket sub-socket))
	 (login-res   #f))
    (set! login-res (server:client-login zmq-sockets))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
	  (set! *runremote* zmq-socket)
	  #t)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " conurl)
	  (set! *runremote* #f)
	  #f))))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
292
293
294
295
296
297
298
299

300
301
302

303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
321
322
323
324
325
326
327

328
329
330

331

















332
333
334
335
336
337
338







-
+


-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-







	   exn
	   (begin
	     ;; something went wrong in connecting to the server. In this scenario it is ok
	     ;; to try again
	     (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	     (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	     (open-run-close tasks:server-deregister tasks:open-db host port: port)
	     (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
	     (server:client-setup (- numtries 1))
	     #f)
	   (let* ((push-socket (server:client-connect iface pullport 'push))
	   (server:client-connect iface pullport pubport)))
		  (sub-socket  (server:client-connect iface pubport  'sub))
		  (zmq-sockets (vector push-socket sub-socket))
		  (login-res   #f)
		  ;; (connect-ok 
		  (conurl     (server:make-server-url (list iface port))))
	     (socket-option-set! sub-socket 'subscribe  (server:get-client-signature))
	     (set! login-res (server:client-login zmq-sockets))
	     (if (and (not (null? login-res))
		      (car login-res))
		 (begin
		   (debug:print-info 2 "Logged in and connected to " conurl)
		   (set! *runremote* zmq-socket)
		   #t)
		 (begin
		   (debug:print-info 2 "Failed to login or connect to " conurl)
		   (set! *runremote* #f)
		   #f)))))
	(if (> numtries 0)
	    (let ((exe (car (argv))))
	      (debug:print-info 1 "No server available, attempting to start one...")
	      (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
	      (sleep 5) ;; give server time to start
	      ;; we are starting a server, do not try again! That can lead to 
	      ;; recursively starting many processes!!!
346
347
348
349
350
351
352
353


354
355
356
357
358
359
360
358
359
360
361
362
363
364

365
366
367
368
369
370
371
372
373







-
+
+







					   (debug:print-info 1 "Waiting for the server to come online before starting heartbeat")
					   (thread-sleep! 2)
					   (mutex-lock! *heartbeat-mutex*)
					   (set! server-info *server-info* )
					   (mutex-unlock! *heartbeat-mutex*)
					   (if (not server-info)(loop)))
					 (debug:print 1 "Server alive, starting self-ping")
					 (server:self-ping (cadr server-info)(caddr server-info)))) "Self ping"))
					 (server:self-ping server-info)))
				     "Self ping"))
		   (th2 (make-thread (lambda ()
				       (server:run (args:get-arg "-server"))) "Server run"))
		   (th3 (make-thread (lambda ()
				       (server:keep-running)) "Keep running")))
	      (set! *client-non-blocking-mode* #t)
	      (thread-start! th1)
	      (thread-start! th2)
369
370
371
372
373
374
375
376

377
378
379
380
381
382
383
382
383
384
385
386
387
388

389
390
391
392
393
394
395
396







-
+







   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     (if (not *received-response*)
				 (receive-message* *runremote*))) ;; flush out last call if applicable
			   "eat response"))
	 (th2 (make-thread (lambda ()
			     (debug:print 0 "ERROR: Received ^C, attempting clean exit.")
			     (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
			     (thread-sleep! 3) ;; give the flush three seconds to do it's stuff
			     (debug:print 0 "       Done.")
			     (exit 4))
			   "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))

Modified tasks.scm from [90e00b3daa] to [3c212dbf03].

23
24
25
26
27
28
29


30
31




32
33
34
35
36
37
38
23
24
25
26
27
28
29
30
31


32
33
34
35
36
37
38
39
40
41
42







+
+
-
-
+
+
+
+







;;======================================================================

(define (tasks:open-db)
  (let* ((dbpath  (conc *toppath* "/monitor.db"))
	 (exists  (if (file-exists? dbpath)
		      ;; BUGGISHNESS: Remove this code in six months. Today is 11/13/2012
		      (if (< (file-change-time dbpath) 1352851396.0)
			  (begin
			    (debug:print 0 "NOTE: removing old db file " dbpath)
			  (begin (delete-file dbpath) #f)
			  #t) #t))
			    (delete-file dbpath)
			    #f)
			  #t)
		      #f))
	 (mdb     (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
	 (handler (make-busy-timeout 36000)))
    (sqlite3:set-busy-handler! mdb handler)
    (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;"))
    (if (not exists)
	(begin
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
61
62
63
64
65
66
67
68

69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

87
88
89
90



91
92

93

94

95
96
97
98


99
100
101
102

103
104

105
106
107
108
109
110

111
112
113
114
115
116
117
118
119


120
121
122
123
124
125
126

127
128
129

130
131
132
133
134
135
136
65
66
67
68
69
70
71

72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

90
91
92


93
94
95
96

97
98
99

100
101
102


103
104
105
106
107

108
109

110
111
112
113
114
115

116
117
118
119
120
121
122
123


124
125
126
127
128
129
130
131

132
133
134

135
136
137
138
139
140
141
142







-
+

















-
+


-
-
+
+
+

-
+

+
-
+


-
-
+
+



-
+

-
+





-
+







-
-
+
+






-
+


-
+







                                  pullport INTEGER,
                                  pubport  INTEGER,
                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,
                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,pullport,pubport));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
                                  server_id INTEGER,
                                  pid INTEGER,
                                  hostname TEXT,
                                  cmdline TEXT,
                                  login_time TIMESTAMP,
                                  logout_time TIMESTAMP DEFAULT -1,
                                CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
                                  
	  ))
    mdb))
    
;;======================================================================
;; Server and client management
;;======================================================================

;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state)
(define (tasks:server-register mdb pid interface pullport pubport priority state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);"
   pid (get-host-name) port priority (conc state) megatest-version interface)
   "INSERT OR REPLACE INTO servers (pid,hostname,pullport,pubport,start_time,priority,state,mt_version,heartbeat,interface)
                             VALUES(?,  ?,       ?,       ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);"
   pid (get-host-name) pullport pubport priority (conc state) megatest-version interface)
  (list 
   (tasks:server-get-server-id mdb (get-host-name) port pid)
   (tasks:server-get-server-id mdb (get-host-name) pullport pid)
   interface
   pullport
   port))
   pubport))

;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
(define (tasks:server-deregister mdb hostname #!key (pullport #f)(pid #f))
  (debug:print-info 11 "server-deregister " hostname ", pullport " pullport ", pid " pid)
  (if pid
      ;; (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)
      (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)
      (if port
      (if pullport
	  ;; (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port)
	  (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port)
	  (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport)
	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

(define (tasks:server-get-server-id mdb hostname port pid)
(define (tasks:server-get-server-id mdb hostname pullport pid)
  (let ((res #f))
    (sqlite3:for-each-row
     (lambda (id)
       (set! res id))
     mdb
     (if (and hostname  pid)
	 "SELECT id FROM servers WHERE hostname=? AND pid=?;"
	 "SELECT id FROM servers WHERE hostname=? AND port=?;")
     hostname (if pid pid port))
	 "SELECT id FROM servers WHERE hostname=? AND pullport=?;")
     hostname (if pid pid pullport))
    res))

(define (tasks:server-update-heartbeat mdb server-id)
  (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id))

;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds
(define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f))
(define (tasks:server-alive? mdb server-id #!key (hostname #f)(pullport #f)(pid #f))
  (let* ((server-id  (if server-id 
			 server-id
			 (tasks:server-get-server-id mdb hostname port pid)))
			 (tasks:server-get-server-id mdb hostname pullport pid)))
	 (heartbeat-delta 99e9))
    (sqlite3:for-each-row
     (lambda (delta)
       (set! heartbeat-delta delta))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
    (< heartbeat-delta 10)))

161
162
163
164
165
166
167
168
169
170



171
172
173
174
175
176
177
167
168
169
170
171
172
173



174
175
176
177
178
179
180
181
182
183







-
-
-
+
+
+








;; ping each server in the db and return first found that responds. 
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb)
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface port pid)
       (set! res (cons (list hostname interface port pid) res))
       (debug:print-info 2 "Found existing server " hostname ":" port " registered in db"))
     (lambda (id hostname interface pullport pubport pid)
       (set! res (cons (list hostname interface pullport pubport pid) res))
       (debug:print-info 2 "Found existing server " hostname ":" pullport " registered in db"))
     mdb
     "SELECT id,hostname,interface,pullport,pubport,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version)
    ;; (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
229
230
231
232
233
234
235
236
237


238
239

240
241
242
243
244
245
246
235
236
237
238
239
240
241


242
243
244

245
246
247
248
249
250
251
252







-
-
+
+

-
+







		  (thread-sleep! 5)                 ;; give it five seconds to die peacefully then do a brutal kill
		  (process-signal pid signal/kill)) 
		(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))

(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname interface port start-time priority state mt-version)
       (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res)))
     (lambda (id pid hostname interface pullport pubport start-time priority state mt-version)
       (set! res (cons (vector id pid hostname interface pullport pubport start-time priority state mt-version) res)))
     mdb
     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")
     "SELECT id,pid,hostname,interface,pullport,pubport,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")
    res))
       

;;======================================================================
;; Tasks and Task monitors
;;======================================================================