34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
;;
;; TODO
;;
;; Done Tested
;; [x] [ ] 1. Add columns pullport pubport to servers table
;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012
;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports
;; [ ] [ ] 4. Add client compose of request
;; [ ] [ ] - name of client: testname/itempath-test_id-hostname
;; [ ] [ ] - name of request: callname, params
;; [ ] [ ] - request key: f(clientname, callname, params)
;; [x] [ ] 4. Add client compose of request
;; [x] [ ] - name of client: testname/itempath-test_id-hostname
;; [x] [ ] - name of request: callname, params
;; [x] [ ] - request key: f(clientname, callname, params)
;; [ ] [ ] 5. Add processing of subscription hits
;; [ ] [ ] - done when get key
;; [ ] [ ] - return results
;; [ ] [ ] 6. Add timeout processing
;; [ ] [ ] - after 60 seconds
;; [ ] [ ] i. check server alive, connect to new if necessary
;; [ ] [ ] ii. resend request
;; [ ] [ ] 7. Turn self ping back on
(define (server:make-server-url hostport)
(if (not hostport)
#f
(conc "tcp://" (car hostport) ":" (cadr hostport))))
(define *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))
(define (server:self-ping server-info)
;; server-info: server-id interface pullport pubport
(let ((iface (list-ref server-info 1))
(pullport (list-ref server-info 2))
(pubport (list-ref server-info 3)))
(server:client-connect iface pullport pubport)
(let loop ()
(thread-sleep! 2)
(cdb:client-call *runremote* 'ping #t)
(debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
(mutex-lock! *heartbeat-mutex*)
(set! *server-loop-heart-beat* (current-seconds))
(mutex-unlock! *heartbeat-mutex*)
(loop))))
;; (define (server:self-ping server-info)
;; ;; server-info: server-id interface pullport pubport
;; (let ((iface (list-ref server-info 1))
;; (pullport (list-ref server-info 2))
;; (pubport (list-ref server-info 3)))
;; (server:client-connect iface pullport pubport)
;; (let loop ()
;; (thread-sleep! 2)
;; (cdb:client-call *runremote* 'ping #t)
;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
;; (mutex-lock! *heartbeat-mutex*)
;; (set! *server-loop-heart-beat* (current-seconds))
;; (mutex-unlock! *heartbeat-mutex*)
;; (loop))))
(define-inline (zmqsock:get-pub dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0))
(define (server:run hostn)
|
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
-
+
|
(begin
(debug:print-info 0 "Queue not flushed, waiting ...")
(loop))))))))
;; The heavy lifting
;;
(let loop ()
(print "GOT HERE EH?")
;; (print "GOT HERE EH?")
(let* ((rawmsg (receive-message* pull-socket))
(params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize))))
(res #f))
(debug:print-info 12 "server=> received params=" params)
(set! res (cdb:cached-access params))
(debug:print-info 12 "server=> processed res=" res)
|
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
|
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
|
-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
|
(loop (+ count 1))
(let (;; (numrunning (open-run-close db:get-count-tests-running #f))
(server-loop-heartbeat #f)
(server-info #f)
(pulse 0))
;; BUG add a wait on server alive here!!
;; ;; Ugly yuk.
(mutex-lock! *heartbeat-mutex*)
(set! server-loop-heartbeat *server-loop-heart-beat*)
(set! server-info *server-info*)
(mutex-unlock! *heartbeat-mutex*)
;; == (mutex-lock! *heartbeat-mutex*)
;; == (set! server-loop-heartbeat *server-loop-heart-beat*)
;; == (set! server-info *server-info*)
;; == (mutex-unlock! *heartbeat-mutex*)
;; The logic here is that if the server loop gets stuck blocked in working
;; we don't want to update our heartbeat
(set! pulse (- (current-seconds) server-loop-heartbeat))
(debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago")
(if (> pulse 15) ;; must stay less than 10 seconds
(begin
(open-run-close tasks:server-deregister tasks:open-db (cadr server-info) pullport: (caddr server-info))
(debug:print 0 "ERROR: Heartbeat failed, committing servercide")
(exit))
(open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)))
;; == (set! pulse (- (current-seconds) server-loop-heartbeat))
;; == (debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago")
;; == (if (> pulse 15) ;; must stay less than 10 seconds
;; == (begin
;; == (open-run-close tasks:server-deregister tasks:open-db (cadr server-info) pullport: (caddr server-info))
;; == (debug:print 0 "ERROR: Heartbeat failed, committing servercide")
;; == (exit))
;; NOTE: Get rid of this mechanism! It really is not needed...
(open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
(if (> (+ *last-db-access*
;; (* 48 60 60) ;; 48 hrs
;; 60 ;; one minute
(* 60 60) ;; one hour
)
(current-seconds))
|
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
|
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
|
-
-
+
+
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
|
(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
(if hostinfo
(let ((host (list-ref hostinfo 0))
(iface (list-ref hostinfo 1))
(pullport (list-ref hostinfo 2))
(pubport (list-ref hostinfo 3)))
(debug:print-info 2 "Setting up to connect to " hostinfo)
(handle-exceptions
exn
;;(handle-exceptions
;; exn
(begin
;; something went wrong in connecting to the server. In this scenario it is ok
;; to try again
(debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
(debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 0 " perhaps jobs killed with -9? Removing server records")
(open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
(server:client-setup (- numtries 1))
#f)
(server:client-connect iface pullport pubport)))
(if (> numtries 0)
(let ((exe (car (argv))))
(debug:print-info 1 "No server available, attempting to start one...")
(process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
(sleep 5) ;; give server time to start
;; we are starting a server, do not try again! That can lead to
;; recursively starting many processes!!!
(server:client-setup numtries: 0))
(debug:print-info 1 "Too many attempts, giving up")))))
(server:client-connect iface pullport pubport)))))
;; (if (> numtries 0)
;; (let ((exe (car (argv))))
;; (debug:print-info 1 "No server available, attempting to start one...")
;; (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
;; (sleep 5) ;; give server time to start
;; ;; we are starting a server, do not try again! That can lead to
;; ;; recursively starting many processes!!!
;; (server:client-setup numtries: 0))
;; (debug:print-info 1 "Too many attempts, giving up")))))
;; all routes though here end in exit ...
(define (server:launch)
(if (not *toppath*)
(if (not (setup-for-run))
(begin
(debug:print 0 "ERROR: cannot find megatest.config, exiting")
(exit))))
(debug:print-info 1 "Starting the standalone server")
(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
(if hostinfo
(debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
(if *toppath*
(let* ((th1 (make-thread (lambda ()
(let ((server-info #f))
;; wait for the server to be online and available
(let loop ()
(debug:print-info 1 "Waiting for the server to come online before starting heartbeat")
(thread-sleep! 2)
(mutex-lock! *heartbeat-mutex*)
(set! server-info *server-info* )
(mutex-unlock! *heartbeat-mutex*)
(if (not server-info)(loop)))
(debug:print 1 "Server alive, starting self-ping")
;; (server:self-ping server-info)
))
"Self ping"))
(let* (;; (th1 (make-thread (lambda ()
;; (let ((server-info #f))
;; ;; wait for the server to be online and available
;; (let loop ()
;; (debug:print-info 1 "Waiting for the server to come online before starting heartbeat")
;; (thread-sleep! 2)
;; (mutex-lock! *heartbeat-mutex*)
;; (set! server-info *server-info* )
;; (mutex-unlock! *heartbeat-mutex*)
;; (if (not server-info)(loop)))
;; (debug:print 1 "Server alive, starting self-ping")
;; (server:self-ping server-info)
;; ))
;; "Self ping"))
(th2 (make-thread (lambda ()
(server:run (args:get-arg "-server"))) "Server run"))
(th3 (make-thread (lambda ()
(server:keep-running)) "Keep running")))
(set! *client-non-blocking-mode* #t)
(thread-start! th1)
;; (thread-start! th1)
(thread-start! th2)
(thread-start! th3)
(set! *didsomething* #t)
(thread-join! th3))
(debug:print 0 "ERROR: Failed to setup for megatest")))
(exit)))
|