Megatest

Check-in [12d923326a]
Login
Overview
Comment:Heartbeat monitoring, on-the-fly server starting all working in simple manual testing
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 12d923326abacc7e1f37a17e04fd6d97183330f5
User & Date: matt on 2012-11-03 17:11:18
Other Links: manifest | tags
Context
2012-11-03
17:18
Added more instrumentation info on server pulse. Removed rm of monitor.db in Makefile check-in: 04c31891a9 user: matt tags: trunk
17:11
Heartbeat monitoring, on-the-fly server starting all working in simple manual testing check-in: 12d923326a user: matt tags: trunk
2012-11-02
18:33
borked server heartbeat logic check-in: ece909ab1c user: mrwellan tags: trunk
Changes

Modified db.scm from [0f75487810] to [7d7e161003].

1145
1146
1147
1148
1149
1150
1151


1152
1153
1154
1155
1156
1157
1158
	   (set! *time-to-exit* #t)
	   #t)
	  ((set-verbosity)
	   (set! *verbosity* (caddr params))
	   *verbosity*)
	  ((get-verbosity)
	   *verbosity*)


	  (else
	   (mutex-lock! *incoming-mutex*)
	   (set! *last-db-access* (current-seconds))
	   (set! *incoming-data* (cons 
				  (vector qry-name
					  (current-milliseconds)
					  remparam)







>
>







1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
	   (set! *time-to-exit* #t)
	   #t)
	  ((set-verbosity)
	   (set! *verbosity* (caddr params))
	   *verbosity*)
	  ((get-verbosity)
	   *verbosity*)
	  ((ping)
	   'hi)
	  (else
	   (mutex-lock! *incoming-mutex*)
	   (set! *last-db-access* (current-seconds))
	   (set! *incoming-data* (cons 
				  (vector qry-name
					  (current-milliseconds)
					  remparam)
1174
1175
1176
1177
1178
1179
1180

1181
1182
1183
1184
1185
1186
1187

(define (cdb:use-non-blocking-mode proc)
  (set! *client-non-blocking-mode* #t)
  (let ((res (proc)))
    (set! *client-non-blocking-mode* #f)
    res))
  

(define (cdb:client-call zmq-socket . params)
  (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params)
  (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params))))
	(res  #f))
    (send-message zmq-socket zdat)
    (set! res (db:string->obj (if *client-non-blocking-mode* 
				  (receive-message* zmq-socket zdat)







>







1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190

(define (cdb:use-non-blocking-mode proc)
  (set! *client-non-blocking-mode* #t)
  (let ((res (proc)))
    (set! *client-non-blocking-mode* #f)
    res))
  
;; params = 'target cached remparams
(define (cdb:client-call zmq-socket . params)
  (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params)
  (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params))))
	(res  #f))
    (send-message zmq-socket zdat)
    (set! res (db:string->obj (if *client-non-blocking-mode* 
				  (receive-message* zmq-socket zdat)

Modified megatest.scm from [71ef6a547c] to [c4c3f3d52b].

289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
		      (hostname   (vector-ref server 2))
		      (interface  (vector-ref server 3))
		      (port       (vector-ref server 4))
		      (start-time (vector-ref server 5))
		      (priority   (vector-ref server 6))
		      (state      (vector-ref server 7))
		      (mt-ver     (vector-ref server 8))
		      (status     (open-run-close tasks:server-alive? tasks:open-db hostname port: port))
		      (killed     #f)
		      (zmq-socket (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (or (not status)    ;; no point in keeping dead records in the db
			 (and khost-port ;; kill by host/port
			      (equal? hostname (car khost-port))
			      (equal? port (string->number (cadr khost-port)))))
		     (begin
		       (open-run-close tasks:server-deregister tasks:open-db  hostname port: port)
		       (if status ;; #t means alive
			   (begin
			     (if (equal? hostname (get-host-name))
				 (process-signal pid signal/term) ;; local machine, send sig term
				 (cdb:kill-server zmq-socket))    ;; remote machine, try telling server to commit suicide
			     (debug:print-info 1 "Killed server by host:port at " hostname ":" port))
			   (debug:print-info 1 "Removing defunct server record for " hostname ":" port))
		       (set! killed #t)))
		 (if (and kpid
			  ;; (equal? hostname (car khost-port))
			  (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!!
		     (begin
		       (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)
		       (set! killed #t)
		       (if status 
			   (if (equal? hostname (get-host-name))
			       (process-signal pid signal/term) ;; local machine, send sig term
			       (debug:print 0 "WARNING: Can't kill a dead server on host " hostname)))
		       (debug:print-info 1 "Killed server by pid at " hostname ":" port)))
		 ;; (if zmq-socket (close-socket  zmq-socket))
		 (format #t fmtstr id mt-ver pid hostname interface port start-time priority 
			 (if status "alive" "dead"))))
	     servers)
	    (debug:print-info 1 "Done with listservers")
	    (set! *didsomething* #t)
	    (exit) ;; must do, would have to add checks to many/all calls below
	    )







|








<
<
<
<
|
|
<
<
<
<

|

<
<
<
|
|
<
<
<
<







289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304




305
306




307
308
309



310
311




312
313
314
315
316
317
318
		      (hostname   (vector-ref server 2))
		      (interface  (vector-ref server 3))
		      (port       (vector-ref server 4))
		      (start-time (vector-ref server 5))
		      (priority   (vector-ref server 6))
		      (state      (vector-ref server 7))
		      (mt-ver     (vector-ref server 8))
		      (status     (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port))
		      (killed     #f)
		      (zmq-socket (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (or (not status)    ;; no point in keeping dead records in the db
			 (and khost-port ;; kill by host/port
			      (equal? hostname (car khost-port))
			      (equal? port (string->number (cadr khost-port)))))




		     (tasks:kill-server status hostname port pid))





		 (if (and kpid
			  (equal? hostname (car khost-port))
			  (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!!



		     (tasks:kill-server status hostname #f pid))





		 (format #t fmtstr id mt-ver pid hostname interface port start-time priority 
			 (if status "alive" "dead"))))
	     servers)
	    (debug:print-info 1 "Done with listservers")
	    (set! *didsomething* #t)
	    (exit) ;; must do, would have to add checks to many/all calls below
	    )

Modified server.scm from [aa1a28766d] to [442b819d32].

25
26
27
28
29
30
31
32

33











34
35
36
37
38
39
40
41

42
43
44
45
46
47
48
49

50
51
52
53

54


55




56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98


99
100
101
102
103
104
105
106
107
108
109
110
111
112
113



114
115
116

117
118
119
120
121
122

123

124
125
126
127





128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
(include "db_records.scm")

(define (server:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "tcp://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (list 'start (current-seconds)))













(define (server:run hostn)
  (debug:print 0 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-socket     #f)

	 (iface          (if (string=? "-" hostn)
			     "*" ;; (get-host-name) 
			     hostn))
	 (hostname       (get-host-name))
	 (ipaddrstr      (let ((ipstr (if (string=? "-" hostn)
					  (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					  #f)))
			   (if ipstr ipstr hostname))))

    ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0))
    (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port")
									      (string->number (args:get-arg "-port"))
									      5555)

						     0))


    (set! *cache-on* #t)




    
    ;; what to do when we quit
    ;;
    (on-exit (lambda ()
	       (if (and *toppath* *server-id*)
		   (begin
		     (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr))
		   (let loop () 
		     (let ((queue-len 0))
		       (thread-sleep! (random 5))
		       (mutex-lock! *incoming-mutex*)
		       (set! queue-len (length *incoming-data*))
		       (mutex-unlock! *incoming-mutex*)
		       (if (> queue-len 0)
			   (begin
			     (debug:print-info 0 "Queue not flushed, waiting ...")
			     (loop))))))))

    ;; The heavy lifting
    ;;
    (let loop ()
      ;; Ugly yuk. 
      (mutex-lock! *incoming-mutex*)
      (set! *server-loop-heart-beat* (list 'waiting (current-seconds)))
      (mutex-unlock! *incoming-mutex*)
      (let* ((rawmsg (receive-message* zmq-socket))
	     (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize))))
	     (res    #f))
	;;; Ugly yuk. 
	(mutex-lock! *incoming-mutex*)
	(set! *server-loop-heart-beat* (list 'working (current-seconds)))
	(mutex-unlock! *incoming-mutex*)
	(debug:print-info 12 "server=> received params=" params)
	(set! res (cdb:cached-access params))
	(debug:print-info 12 "server=> processed res=" res)
	(send-message zmq-socket (db:obj->string res))
	(if (not *time-to-exit*)
	    (loop)
	    (begin
	      (open-run-close tasks:server-deregister-self tasks:open-db #f)
	      (db:write-cached-data)
	      (exit)
	      ))))))



;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  (let loop ((count 0))
    (thread-sleep! 3) ;; no need to do this very often
    (db:write-cached-data)
    ;; (print "Server running, count is " count)
    (if (< count 2) ;; 3x3 = 9 secs aprox
	(loop (+ count 1))
	(let ((numrunning            (open-run-close db:get-count-tests-running #f))
	      (server-loop-heartbeat #f))



	;;; Ugly yuk. 
	  (mutex-lock! *incoming-mutex*)
	  (set! server-loop-heartbeat *server-loop-heart-beat*)

	  (mutex-unlock! *incoming-mutex*)
	  ;; The logic here is that if the server loop gets stuck blocked in working
	  ;; we don't want to update our heartbeat
	  (let ((server-state  (car server-loop-heartbeat))
		(server-update (cadr server-loop-heartbeat)))
	    (if (or (eq? server-state 'waiting)

		    (< (- (current-seconds) server-update) 10))

		(open-run-close tasks:server-update-heartbeat  tasks:open-db *server-id*)
		(debug:print "ERROR: No heartbeat update, server appears stuck")))
	  (if (or (> numrunning 0) ;; stay alive for two days after last access
		  (> (+ *last-db-access* (* 48 60 60))(current-seconds)))





	      (begin
		(debug:print-info 2 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*))
		(loop 0))
	      (begin
		(debug:print-info 0 "Starting to shutdown the server.")
		;; need to delete only *my* server entry (future use)
		(set! *time-to-exit* #t)
		(open-run-close tasks:server-deregister-self tasks:open-db)
		(thread-sleep! 1)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")
		(exit)))))))

(define (server:find-free-port-and-open iface s port #!key (trynum 50))
  (let ((s (if s s (make-socket 'rep)))







|
>

>
>
>
>
>
>
>
>
>
>
>








>







|
>

|

<
>

>
>

>
>
>
>




|
















|
|
|
|




|
|
|










|
>
>














|
>
>
>
|
|

>
|


|
|
|
>
|
>
|
<

|
>
>
>
>
>







|







25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152

153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
(include "db_records.scm")

(define (server:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "tcp://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

(define (server:self-ping iface port)
  (let ((zsocket (server:client-connect iface port)))
    (let loop ()
      (thread-sleep! 5)
      (cdb:client-call zsocket 'ping #t)
      (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!")
      (mutex-lock! *heartbeat-mutex*)
      (set! *server-loop-heart-beat* (current-seconds))
      (mutex-unlock! *heartbeat-mutex*)
      (loop))))
    
(define (server:run hostn)
  (debug:print 0 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-socket     #f)
	 (zmq-socket-dat #f)
	 (iface          (if (string=? "-" hostn)
			     "*" ;; (get-host-name) 
			     hostn))
	 (hostname       (get-host-name))
	 (ipaddrstr      (let ((ipstr (if (string=? "-" hostn)
					  (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					  #f)))
			   (if ipstr ipstr hostname)))
	 (actual-port    #f))
    ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0))
    (set! zmq-socket-dat (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port")
									      (string->number (args:get-arg "-port"))

									      (+ 5000 (random 1001)))
						     0))
    (set! zmq-socket  (cadr  zmq-socket-dat))
    (set! actual-port (caddr zmq-socket-dat))
    (set! *cache-on* #t)

    ;; (set! th1 (make-thread (lambda ()
    ;;     		     (server:self-ping ipaddrstr actual-port))))
    ;; (thread-start! th1)
    
    ;; what to do when we quit
    ;;
    (on-exit (lambda ()
	       (if (and *toppath* *server-info*)
		   (begin
		     (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr))
		   (let loop () 
		     (let ((queue-len 0))
		       (thread-sleep! (random 5))
		       (mutex-lock! *incoming-mutex*)
		       (set! queue-len (length *incoming-data*))
		       (mutex-unlock! *incoming-mutex*)
		       (if (> queue-len 0)
			   (begin
			     (debug:print-info 0 "Queue not flushed, waiting ...")
			     (loop))))))))

    ;; The heavy lifting
    ;;
    (let loop ()
      ;; ;; Ugly yuk. 
      ;; (mutex-lock! *incoming-mutex*)
      ;; (set! *server-loop-heart-beat* (list 'waiting (current-seconds)))
      ;; (mutex-unlock! *incoming-mutex*)
      (let* ((rawmsg (receive-message* zmq-socket))
	     (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize))))
	     (res    #f))
	;;; Ugly yuk. 
	;; (mutex-lock! *incoming-mutex*)
	;; (set! *server-loop-heart-beat* (list 'working (current-seconds)))
	;; (mutex-unlock! *incoming-mutex*)
	(debug:print-info 12 "server=> received params=" params)
	(set! res (cdb:cached-access params))
	(debug:print-info 12 "server=> processed res=" res)
	(send-message zmq-socket (db:obj->string res))
	(if (not *time-to-exit*)
	    (loop)
	    (begin
	      (open-run-close tasks:server-deregister-self tasks:open-db #f)
	      (db:write-cached-data)
	      (exit)
	      ))))
    (thread-join! th1)))


;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  (let loop ((count 0))
    (thread-sleep! 3) ;; no need to do this very often
    (db:write-cached-data)
    ;; (print "Server running, count is " count)
    (if (< count 2) ;; 3x3 = 9 secs aprox
	(loop (+ count 1))
	(let ((numrunning            (open-run-close db:get-count-tests-running #f))
	      (server-loop-heartbeat #f)
	      (server-info           #f)
	      (pulse                 0))
	  ;; BUG add a wait on server alive here!!
	  ;; ;; Ugly yuk. 
	  (mutex-lock! *heartbeat-mutex*)
	  (set! server-loop-heartbeat *server-loop-heart-beat*)
	  (set! server-info *server-info*)
	  (mutex-unlock! *heartbeat-mutex*)
	  ;; The logic here is that if the server loop gets stuck blocked in working
	  ;; we don't want to update our heartbeat
	  (set! pulse (- (current-seconds) server-loop-heartbeat))
	  (debug:print-info 1 "Heartbeat period is " pulse " on " (cadr server-info) ":" (caddr server-info))
	  (if (> pulse 11) ;; must stay less than 10 seconds 
	      (begin
		(debug:print 0 "ERROR: Heartbeat failed, committing servercide")
		(exit))
	      (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)))

	  (if (or (> numrunning 0) ;; stay alive for two days after last access
		  (> (+ *last-db-access* 
			;; (* 48 60 60)    ;; 48 hrs
			;; 60              ;; one minute
			(* 60 60)       ;; one hour
			)
		     (current-seconds)))
	      (begin
		(debug:print-info 2 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*))
		(loop 0))
	      (begin
		(debug:print-info 0 "Starting to shutdown the server.")
		;; need to delete only *my* server entry (future use)
		(set! *time-to-exit* #t)
		(open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
		(thread-sleep! 1)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")
		(exit)))))))

(define (server:find-free-port-and-open iface s port #!key (trynum 50))
  (let ((s (if s s (make-socket 'rep)))
154
155
156
157
158
159
160

161

162
163
164
165
166
167
168
169
	   (debug:print-info 0 "Tried ports up to " p 
			     " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")))
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (print "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (set! *runremote* #f)
       (debug:print 0 "Server started on " zmq-url)

       (set! *server-id* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live))

       s))))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))







>
|
>
|







186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
	   (debug:print-info 0 "Tried ports up to " p 
			     " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")))
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (print "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (set! *runremote* #f)
       (debug:print 0 "Server started on " zmq-url)
       (mutex-lock! *heartbeat-mutex*)
       (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live))
       (mutex-unlock! *heartbeat-mutex*)
       (list iface s port)))))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216


217
218
219
220

221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240

241
242
243

244
245
246
247
248
249
250
251
252
253
254












255
256
257
258


259
260
261
262
263

264
265
266
267
268
269
270
(define (server:client-logout zmq-socket)
  (let ((ok (and (socket? zmq-socket)
		 (cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
    ;; (close-socket zmq-socket)
    ok))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50)(do-ping #f))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db do-ping: do-ping)))
    (if hostinfo
	(let ((host    (car   hostinfo))
	      (iface   (cadr  hostinfo))
	      (port    (caddr hostinfo)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (handle-exceptions
	   exn
	   (begin


	     (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	     (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	     (open-run-close tasks:server-deregister tasks:open-db host port: port)

	     #f)
	   (let* ((zmq-socket (server:client-connect iface port))
		  (login-res  (server:client-login zmq-socket))
		  (connect-ok (if (null? login-res) #f (car login-res)))
		  (conurl     (server:make-server-url (list iface port))))
	     (if connect-ok
		 (begin
		   (debug:print-info 2 "Logged in and connected to " conurl)
		   (set! *runremote* zmq-socket)
		   #t)
		 (begin
		   (debug:print-info 2 "Failed to login or connect to " conurl)
		   (set! *runremote* #f)
		   #f)))))
	(if (> numtries 0)
	    (let ((exe (car (argv))))
	      (debug:print-info 1 "No server available, attempting to start one...")
	      (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
	      (sleep 2)
	      ;; not doing ping, assume the server started and registered itself

	      (server:client-setup numtries: (- numtries 1) do-ping: #f))
	    (debug:print-info 1 "Too many attempts, giving up")))))


(define (server:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 0 "Starting the standalone server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 












	    (let* ((th2 (make-thread (lambda ()
				       (server:run (args:get-arg "-server")))))
		   (th3 (make-thread (lambda ()
				       (server:keep-running)))))


	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      (thread-join! th3))
	    (debug:print 0 "ERROR: Failed to setup for megatest")))))


(define (server:client-launch #!key (do-ping #f))
  (if (server:client-setup do-ping: do-ping)
      (debug:print-info 2 "connected as client")
      (begin
	(debug:print 0 "ERROR: Failed to connect as client")
	(exit))))







|





|








>
>




>


















|
|
>
|


>











>
>
>
>
>
>
>
>
>
>
>
>
|
|

|
>
>




|
>







229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
(define (server:client-logout zmq-socket)
  (let ((ok (and (socket? zmq-socket)
		 (cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
    ;; (close-socket zmq-socket)
    ok))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host    (car   hostinfo))
	      (iface   (cadr  hostinfo))
	      (port    (caddr hostinfo)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (handle-exceptions
	   exn
	   (begin
	     ;; something went wrong in connecting to the server. In this scenario it is ok
	     ;; to try again
	     (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	     (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	     (open-run-close tasks:server-deregister tasks:open-db host port: port)
	     (server:client-setup (- numtries 1))
	     #f)
	   (let* ((zmq-socket (server:client-connect iface port))
		  (login-res  (server:client-login zmq-socket))
		  (connect-ok (if (null? login-res) #f (car login-res)))
		  (conurl     (server:make-server-url (list iface port))))
	     (if connect-ok
		 (begin
		   (debug:print-info 2 "Logged in and connected to " conurl)
		   (set! *runremote* zmq-socket)
		   #t)
		 (begin
		   (debug:print-info 2 "Failed to login or connect to " conurl)
		   (set! *runremote* #f)
		   #f)))))
	(if (> numtries 0)
	    (let ((exe (car (argv))))
	      (debug:print-info 1 "No server available, attempting to start one...")
	      (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
	      (sleep 5) ;; give server time to start
	      ;; we are starting a server, do not try again! That can lead to 
	      ;; recursively starting many processes!!!
	      (server:client-setup numtries: 0))
	    (debug:print-info 1 "Too many attempts, giving up")))))

;; all routes though here end in exit ...
(define (server:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 0 "Starting the standalone server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 
	    (let* ((th1 (make-thread (lambda ()
				       (let ((server-info #f))
					 ;; wait for the server to be online and available
					 (let loop ()
					   (debug:print-info 1 "Waiting for the server to come online before starting heartbeat")
					   (thread-sleep! 5)
					   (mutex-lock! *heartbeat-mutex*)
					   (set! server-info *server-info* )
					   (mutex-unlock! *heartbeat-mutex*)
					   (if (not server-info)(loop)))
					 (debug:print 1 "Server alive, starting self-ping")
					 (server:self-ping (cadr server-info)(caddr server-info)))) "Self ping"))
		   (th2 (make-thread (lambda ()
				       (server:run (args:get-arg "-server"))) "Server run"))
		   (th3 (make-thread (lambda ()
				       (server:keep-running)) "Keep running")))
	      (set! *client-non-blocking-mode* #t)
	      (thread-start! th1)
	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      (thread-join! th3))
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

(define (server:client-launch #!key (do-ping #f))
  (if (server:client-setup do-ping: do-ping)
      (debug:print-info 2 "connected as client")
      (begin
	(debug:print 0 "ERROR: Failed to connect as client")
	(exit))))
285
286
287
288
289
290
291
292

293
294
295
296
297
298
299
300
301

302
303
304
305
306
307
308
				    (begin
				      (server:client-logout zmq-socket)
				      (close-socket  zmq-socket)))
				(set! res (list #t numclients (if return-socket zmq-socket #f))))
			      (begin
				;; (close-socket zmq-socket)
				(set! res (list #f "CAN'T LOGIN" #f))))
			  (set! res (list #f "CAN'T CONNECT" #f)))))))

	    (th2 (make-thread
		  (lambda ()
		    (let loop ((count 1))
		      (debug:print-info 1 "Ping " count " server on " host " at port " port)
		      (thread-sleep! 2)
		      (if (< count (/ secs 2))
			  (loop (+ count 1))))
		    ;; (thread-terminate! th1)
		    (set! res (list #f "TIMED OUT" #f))))))

       (thread-start! th2)
       (thread-start! th1)
       (handle-exceptions
	exn
	(set! res (list #f "TIMED OUT" #f))
	(thread-join! th1 secs))
       res))))







|
>








|
>







339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
				    (begin
				      (server:client-logout zmq-socket)
				      (close-socket  zmq-socket)))
				(set! res (list #t numclients (if return-socket zmq-socket #f))))
			      (begin
				;; (close-socket zmq-socket)
				(set! res (list #f "CAN'T LOGIN" #f))))
			  (set! res (list #f "CAN'T CONNECT" #f)))))
		  "Ping: th1"))
	    (th2 (make-thread
		  (lambda ()
		    (let loop ((count 1))
		      (debug:print-info 1 "Ping " count " server on " host " at port " port)
		      (thread-sleep! 2)
		      (if (< count (/ secs 2))
			  (loop (+ count 1))))
		    ;; (thread-terminate! th1)
		    (set! res (list #f "TIMED OUT" #f)))
		  "Ping: th2")))
       (thread-start! th2)
       (thread-start! th1)
       (handle-exceptions
	exn
	(set! res (list #f "TIMED OUT" #f))
	(thread-join! th1 secs))
       res))))

Modified tasks.scm from [efc84088da] to [6c83278de0].

78
79
80
81
82
83
84

85


86
87
88
89
90
91

92
93

94
95
96
97
98
99
100

;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);"
   pid (get-host-name) port priority (conc state) megatest-version interface)

  (tasks:server-get-server-id mdb (get-host-name) port pid))



;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (if pid
      (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)

      (if port
	  (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port)

	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

(define (tasks:server-get-server-id mdb hostname port pid)
  (let ((res #f))







>
|
>
>





|
>

|
>







78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105

;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);"
   pid (get-host-name) port priority (conc state) megatest-version interface)
  (list 
   (tasks:server-get-server-id mdb (get-host-name) port pid)
   interface
   port))

;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (if pid
      ;; (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)
      (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)
      (if port
	  ;; (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port)
	  (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port)
	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

(define (tasks:server-get-server-id mdb hostname port pid)
  (let ((res #f))
117
118
119
120
121
122
123

124
125
126
127
128
129
130
131
132
133
134
135
136
137
			 server-id
			 (tasks:server-get-server-id mdb hostname port pid)))
	 (heartbeat-delta 99e9))
    (sqlite3:for-each-row
     (lambda (delta)
       (set! heartbeat-delta delta))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)

    (< heartbeat-delta 10)))

(define (tasks:client-register mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));")
  (tasks:server-get-server-id mdb)
  pid hostname cmdline)

(define (tasks:client-logout mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "UPDATE clients SET logout_time=strftime('%s','now') WHERE pid=? AND hostname=? AND cmdline=?;"
   pid hostname cmdline))







>






|







122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
			 server-id
			 (tasks:server-get-server-id mdb hostname port pid)))
	 (heartbeat-delta 99e9))
    (sqlite3:for-each-row
     (lambda (delta)
       (set! heartbeat-delta delta))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
    (debug:print 1 "Found heartbeat-delta of " heartbeat-delta " for server with id " server-id)
    (< heartbeat-delta 10)))

(define (tasks:client-register mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));")
  (tasks:server-get-server-id mdb hostname #f pid)
  pid hostname cmdline)

(define (tasks:client-logout mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "UPDATE clients SET logout_time=strftime('%s','now') WHERE pid=? AND hostname=? AND cmdline=?;"
   pid hostname cmdline))
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176

177
178
179
180
181
182
183
184
185
186
187
188
189

190
191
192
193
194



























195
196
197
198
199
200
201
     server-id)))

(define (tasks:have-clients? mdb server-id)
  (null? (tasks:get-logged-in-clients mdb server-id)))

;; ping each server in the db and return first found that responds. 
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb #!key (do-ping #f))
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface port pid)
       (set! res (cons (list hostname interface port pid) res))
       (debug:print-info 1 "Found " hostname ":" port))
     mdb
     "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version)
    ;; (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (car    hed))
		 (iface    (cadr   hed))
		 (port     (caddr  hed))
		 (pid      (cadddr hed))
		 ;; (ping-res (if do-ping (server:ping host port return-socket: #f) '(#t "NO PING" #f)))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db host port: port)) ;; (car ping-res))
		 ;; (reason   (cadr ping-res))
		 ;; (zsocket  (caddr ping-res))
		 )
	    (if alive

		;; (if (server:ping iface port)
		    (list host iface port)
		 ;;    ;; not actually alive, destroy!
		 ;;    (begin
		 ;;      (if (equal? host (get-host-name))
		 ;;          (begin
		 ;;            (debug:print-info 0 "Killing process " pid " on host " host " with signal/term")
		 ;;            (send-signal pid signal/term))
		 ;;          (debug:print 0 "WARNING: Can't kill process " pid " on host " host))
		 ;;      (open-run-close tasks:server-deregister tasks:open-db  host port: port)
		 ;;      #f))
		;; remove defunct server from table
		(begin

		  (open-run-close tasks:server-deregister tasks:open-db  host port: port)
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))




























(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname interface port start-time priority state mt-version)
       (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res)))
     mdb
     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")







|





|











<
|
<
<
<

>
|
|
<
<
<
<
<
<
<
<
<
<

>
|




>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176

177



178
179
180
181










182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
     server-id)))

(define (tasks:have-clients? mdb server-id)
  (null? (tasks:get-logged-in-clients mdb server-id)))

;; ping each server in the db and return first found that responds. 
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb)
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface port pid)
       (set! res (cons (list hostname interface port pid) res))
       (debug:print-info 1 "Found existing server " hostname ":" port " registered in db"))
     mdb
     "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version)
    ;; (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (car    hed))
		 (iface    (cadr   hed))
		 (port     (caddr  hed))
		 (pid      (cadddr hed))

		 (alive    (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port)))



	    (if alive
		(begin
		  (debug:print 1 "Found an existing, alive, server " host ":" port ".")
		  (list host iface port))










		(begin
		  (debug:print-info 1 "Removing " host ":" port " from server registry as it appears to be dead")
		  (tasks:kill-server #f host port pid)
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))

(define (tasks:kill-server status hostname port pid)
  (debug:print-info 1 "Removing defunct server record for " hostname ":" port)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db  hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))
  
  (if status ;; #t means alive
      (begin
	(if (equal? hostname (get-host-name))
	    (begin
	      (debug:print 1 "Sending signal/term to " pid " on " hostname)
	      (process-signal pid signal/term)
	      (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
	      (process-signal pid signal/kill)) ;; local machine, send sig term
	    (begin
	      (debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
	      (cdb:kill-server zmq-socket))))    ;; remote machine, try telling server to commit suicide
      (begin
	(if status 
	    (if (equal? hostname (get-host-name))
		(begin
		  (debug:print-info 1 "Sending signal/term to " pid " on " hostname)
		  (process-signal pid signal/term)  ;; local machine, send sig term
		  (thread-sleep! 5)                 ;; give it five seconds to die peacefully then do a brutal kill
		  (process-signal pid signal/kill)) 
		(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))

(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname interface port start-time priority state mt-version)
       (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res)))
     mdb
     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")