9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
|
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
+
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
-
-
-
-
-
+
+
-
-
-
-
-
-
-
-
+
-
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
-
-
-
-
-
-
+
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
-
-
-
-
+
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
|
;; PURPOSE.
(require-extension (srfi 18) extras tcp s11n)
(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))
(use spiffy awful http-client)
(use zmq)
(tcp-buffer-size 2048)
(declare (unit server))
(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(include "common_records.scm")
(include "db_records.scm")
;; Transition to pub --> sub with pull <-- push
;;
;; 1. client sends request to server via push to the pull port
;; 2. server puts request in queue or processes immediately as appropriate
;; 3. server puts responses from completed requests into pub port
;;
;; TODO
;;
;; Done Tested
;; [x] [ ] 1. Add columns pullport pubport to servers table
;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012
;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports
;; [x] [ ] 4. Add client compose of request
;; [x] [ ] - name of client: testname/itempath-test_id-hostname
;; [x] [ ] - name of request: callname, params
;; [x] [ ] - request key: f(clientname, callname, params)
;; [x] [ ] 5. Add processing of subscription hits
;; [x] [ ] - done when get key
;; [x] [ ] - return results
;; [x] [ ] 6. Add timeout processing
;; [x] [ ] - after 60 seconds
;; [ ] [ ] i. check server alive, connect to new if necessary
;; [ ] [ ] ii. resend request
;; [ ] [ ] 7. Turn self ping back on
(define (server:make-server-url hostport)
(if (not hostport)
#f
(conc "tcp://" (car hostport) ":" (cadr hostport))))
(conc "http://" (car hostport) ":" (cadr hostport))))
(define *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))
;;======================================================================
;; S E R V E R
;;======================================================================
(define-inline (zmqsock:get-pub dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0))
;; Call this to start the actual server
;;
(define (server:run hostn)
(debug:print 2 "Attempting to start the server ...")
(if (not *toppath*)
(if (not (setup-for-run))
(begin
(debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
(exit))))
(let* ((zmq-sdat1 #f)
(zmq-sdat2 #f)
(pull-socket #f)
(pub-socket #f)
(p1 #f)
(p2 #f)
(zmq-sockets-dat #f)
(iface (if (string=? "-" hostn)
(let* ((iface (if (string=? "-" hostn)
"*" ;; (get-host-name)
hostn))
(hostname (get-host-name))
(ipaddrstr (let ((ipstr (if (string=? "-" hostn)
(string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
#f)))
(if ipstr ipstr hostname)))
(last-run 0))
(set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port")
(string->number (args:get-arg "-port"))
(+ 5000 (random 1001)))))
(start-port (if (args:get-arg "-port")
(string->number (args:get-arg "-port"))
(+ 5000 (random 1001)))))
(set! zmq-sdat1 (car zmq-sockets-dat))
(set! pull-socket (cadr zmq-sdat1)) ;; (iface s port)
(set! p1 (caddr zmq-sdat1))
(set! zmq-sdat2 (cadr zmq-sockets-dat))
(set! pub-socket (cadr zmq-sdat2))
(set! p2 (caddr zmq-sdat2))
(set! *cache-on* #t)
(server:try-start-server ipaddrstr portnum)))
;; what to do when we quit
;;
;; (on-exit (lambda ()
;; (if (and *toppath* *server-info*)
;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
;; (let loop ()
;; (let ((queue-len 0))
;; (thread-sleep! (random 5))
;; (mutex-lock! *incoming-mutex*)
;; (set! queue-len (length *incoming-data*))
;; (mutex-unlock! *incoming-mutex*)
;; (if (> queue-len 0)
;; (begin
;; (debug:print-info 0 "Queue not flushed, waiting ...")
;; (loop))))))))
;; The heavy lifting
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
(let loop ((queue-lst '()))
(let* ((rawmsg (receive-message* pull-socket))
(packet (db:string->obj rawmsg))
(qtype (cdb:packet-get-qtype packet)))
(debug:print-info 12 "server=> received packet=" packet)
(if (not (member qtype '(sync ping)))
(begin
(mutex-lock! *heartbeat-mutex*)
(set! *last-db-access* (current-seconds))
(mutex-unlock! *heartbeat-mutex*)))
(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
(begin
(open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
(loop '()))
(loop (cons packet queue-lst)))))))
;; run server:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; This thread waits for the server to come alive
(let* ((server-info (let loop ()
(let ((sdat #f))
(mutex-lock! *heartbeat-mutex*)
(set! sdat *server-info*)
(mutex-unlock! *heartbeat-mutex*)
(if sdat sdat
(begin
(sleep 4)
(loop))))))
(iface (cadr server-info))
(pullport (caddr server-info))
(pubport (cadddr server-info)) ;; id interface pullport pubport)
(zmq-sockets (server:client-connect iface pullport pubport))
(last-access 0))
(let loop ((count 0))
(thread-sleep! 4) ;; no need to do this very often
;; NB// sync currently does NOT return queue-length
(let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1)))
;; (print "Server running, count is " count)
(if (< count 1) ;; 3x3 = 9 secs aprox
(loop (+ count 1)))
;; NOTE: Get rid of this mechanism! It really is not needed...
(open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
;; This is recursively run by server:run until sucessful
;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
(mutex-lock! *heartbeat-mutex*)
(set! last-access *last-db-access*)
(mutex-unlock! *heartbeat-mutex*)
(if (> (+ last-access
;; (* 50 60 60) ;; 48 hrs
;; 60 ;; one minute
;; (* 60 60) ;; one hour
(* 45 60) ;; 45 minutes, until the db deletion bug is fixed.
)
(current-seconds))
(begin
(debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
(loop 0))
(begin
(debug:print-info 0 "Starting to shutdown the server.")
;; need to delete only *my* server entry (future use)
(set! *time-to-exit* #t)
(open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
(thread-sleep! 1)
(debug:print-info 0 "Max cached queries was " *max-cache-size*)
(debug:print-info 0 "Server shutdown complete. Exiting")
(exit)))))))
(define (server:find-free-port-and-open iface s port stype #!key (trynum 50))
;;
(define (server:try-start-server ipaddrstr portnum)
(let ((s (if s s (make-socket stype)))
(p (if (number? port) port 5555))
(old-handler (current-exception-handler)))
(handle-exceptions
exn
(begin
(handle-exceptions
exn
(begin
(debug:print 0 "Failed to bind to port " p ", trying next port")
(debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
;; (old-handler)
;; (print-call-chain)
(if (> trynum 0)
(server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1))
(debug:print-info 0 "Tried ports up to " p
(print-error-message exn)
(if (< portnum 9000)
(begin
(print "WARNING: failed to start on portnum: " portnum ", trying next port")
(sleep 1)
(server:try-start-server ipaddrstr (+ portnum 1)))
(print "ERROR: Tried and tried but could not start the server")))
" but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))
(exit)) ;; To exit or not? That is the question.
(let ((zmq-url (conc "tcp://" iface ":" p)))
(debug:print 2 "Trying to start server on " zmq-url)
(print "INFO: Trying to start server on portnum: " portnum)
(bind-socket s zmq-url)
(list iface s port)))))
(awful-start hello-world ip-address: ipaddrstr port: portnum)))
(define (server:setup-ports ipaddrstr startport)
(let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pull))
(p1 (caddr s1))
(s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
(p2 (caddr s2)))
(set! *runremote* #f)
(debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
(mutex-lock! *heartbeat-mutex*)
(set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live))
(mutex-unlock! *heartbeat-mutex*)
(list s1 s2)))
(define (server:mk-signature)
(message-digest-string (md5-primitive)
(with-output-to-string
(lambda ()
(write (list (current-directory)
(argv)))))))
|
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
|
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
-
-
-
+
+
+
+
+
-
-
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
-
+
-
-
-
-
-
-
-
+
+
-
-
+
+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|
;;======================================================================
(define (server:get-client-signature)
(if *my-client-signature* *my-client-signature*
(let ((sig (server:mk-signature)))
(set! *my-client-signature* sig)
*my-client-signature*)))
;;
(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
;; <html>
;; <head></head>
;; <body>1 Hello, world! Goodbye Dolly</body></html>
;; Send msg to serverdat and receive result
(define (server:client-send-receive serverdat msg)
(debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions)
(let ((connect-ok #f)
(zmq-socket (if context
(make-socket type context)
(let* ((res (with-input-from-request (conc serverdat "/?dat=" msg) #f read-string))
(match (string-search (regexp "<body>(.*)<.body>") (caddr (string-split res "\n")))))
(make-socket type)))
(conurl (server:make-server-url (list iface port))))
(if (socket? zmq-socket)
(begin
;; first apply subscriptions
(for-each (lambda (subscription)
(debug:print 2 "Subscribing to " subscription)
(socket-option-set! zmq-socket 'subscribe subscription))
subscriptions)
(connect-socket zmq-socket conurl)
zmq-socket)
(begin
(debug:print 0 "ERROR: Failed to open socket to " conurl)
#f))))
(define (server:client-login zmq-sockets)
(cdb:login zmq-sockets *toppath* (server:get-client-signature)))
(cadr match)))
(define (server:client-login serverdat)
(cdb:login serverdat *toppath* (server:get-client-signature)))
;; Not currently used! But, I think it *should* be used!!!
(define (server:client-logout zmq-socket)
(let ((ok (and (socket? zmq-socket)
(cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
;; (close-socket zmq-socket)
ok))
(define (server:client-connect iface pullport pubport)
(define (server:client-connect iface port)
(let* ((push-socket (server:client-socket-connect iface pullport type: 'push))
(sub-socket (server:client-socket-connect iface pubport
type: 'sub
subscriptions: (list (server:get-client-signature) "all")))
(zmq-sockets (vector push-socket sub-socket))
(login-res #f))
(set! login-res (server:client-login zmq-sockets))
(let* ((login-res #f))
(set! login-res (server:client-login serverdat))
(if (and (not (null? login-res))
(car login-res))
(begin
(debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
(set! *runremote* zmq-sockets)
zmq-sockets)
(set! *runremote* serverdat)
serverdat)
(begin
(debug:print-info 2 "Failed to login or connect to " conurl)
(set! *runremote* #f)
#f))))
;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
(if (not *toppath*)
(if (not (setup-for-run))
(begin
(debug:print 0 "ERROR: failed to find megatest.config, exiting")
(exit))))
(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
(if hostinfo
(let ((host (list-ref hostinfo 0))
(iface (list-ref hostinfo 1))
(iface (list-ref hostinfo 1)))
(pullport (list-ref hostinfo 2))
(pubport (list-ref hostinfo 3)))
(debug:print-info 2 "Setting up to connect to " hostinfo)
;; (handle-exceptions
;; exn
;; (begin
;; ;; something went wrong in connecting to the server. In this scenario it is ok
;; ;; to try again
;; (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
;; (debug:print 0 " perhaps jobs killed with -9? Removing server records")
;; (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
;; (server:client-setup (- numtries 1))
;; #f)
(server:client-connect iface pullport pubport)) ;; )
(server:client-connect iface pullport pubport)) ;; )
(if (> numtries 0)
(let ((exe (car (argv)))
(pid #f))
(debug:print-info 0 "No server available, attempting to start one...")
;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
;; (string-intersperse *verbosity* ",")
;; (conc *verbosity*)))))
|
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
|
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
|
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|
(debug:print 0 "ERROR: cannot find megatest.config, exiting")
(exit))))
(debug:print-info 2 "Starting the standalone server")
(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
(if hostinfo
(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
(if *toppath*
(let* (;; (th1 (make-thread (lambda ()
;; (let ((server-info #f))
;; ;; wait for the server to be online and available
;; (let loop ()
;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
;; (thread-sleep! 2)
;; (mutex-lock! *heartbeat-mutex*)
;; (set! server-info *server-info* )
;; (mutex-unlock! *heartbeat-mutex*)
;; (if (not server-info)(loop)))
;; (debug:print 2 "Server alive, starting self-ping")
;; (server:self-ping server-info)
;; ))
;; "Self ping"))
(th2 (make-thread (lambda ()
(let* ((th2 (make-thread (lambda ()
(server:run
(if (args:get-arg "-server")
(args:get-arg "-server")
"-"))) "Server run"))
(th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
)
(set! *client-non-blocking-mode* #t)
|
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
|
222
223
224
225
226
227
228
|
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|
(set-signal-handler! signal/int server:client-signal-handler)
(if (server:client-setup)
(debug:print-info 2 "connected as client")
(begin
(debug:print 0 "ERROR: Failed to connect as client")
(exit))))
;;======================================================================
;; Defunct functions
;;======================================================================
;; ping a server and return number of clients or #f (if no response)
;; NOT IN USE!
(define (server:ping host port #!key (secs 10)(return-socket #f))
(cdb:use-non-blocking-mode
(lambda ()
(let* ((res #f)
(th1 (make-thread
(lambda ()
(let* ((zmq-context (make-context 1))
(zmq-socket (server:client-connect host port context: zmq-context)))
(if zmq-socket
(if (server:client-login zmq-socket)
(let ((numclients (cdb:num-clients zmq-socket)))
(if (not return-socket)
(begin
(server:client-logout zmq-socket)
(close-socket zmq-socket)))
(set! res (list #t numclients (if return-socket zmq-socket #f))))
(begin
;; (close-socket zmq-socket)
(set! res (list #f "CAN'T LOGIN" #f))))
(set! res (list #f "CAN'T CONNECT" #f)))))
"Ping: th1"))
(th2 (make-thread
(lambda ()
(let loop ((count 1))
(debug:print-info 1 "Ping " count " server on " host " at port " port)
(thread-sleep! 2)
(if (< count (/ secs 2))
(loop (+ count 1))))
;; (thread-terminate! th1)
(set! res (list #f "TIMED OUT" #f)))
"Ping: th2")))
(thread-start! th2)
(thread-start! th1)
(handle-exceptions
exn
(set! res (list #f "TIMED OUT" #f))
(thread-join! th1 secs))
res))))
;; (define (server:self-ping server-info)
;; ;; server-info: server-id interface pullport pubport
;; (let ((iface (list-ref server-info 1))
;; (pullport (list-ref server-info 2))
;; (pubport (list-ref server-info 3)))
;; (server:client-connect iface pullport pubport)
;; (let loop ()
;; (thread-sleep! 2)
;; (cdb:client-call *runremote* 'ping #t)
;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
;; (mutex-lock! *heartbeat-mutex*)
;; (set! *server-loop-heart-beat* (current-seconds))
;; (mutex-unlock! *heartbeat-mutex*)
;; (loop))))
(define (server:reply pubsock target query-sig success/fail result)
(debug:print-info 11 "server:reply target=" target ", result=" result)
(send-message pubsock target send-more: #t)
(send-message pubsock (db:obj->string (vector success/fail query-sig result))))
|