1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
;; Copyright 2006-2012, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
(require-extension (srfi 18) extras tcp s11n)
;; (import (prefix rpc rpc:))
(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))
(use zmq)
(declare (unit server))
|
<
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
;; Copyright 2006-2012, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
(require-extension (srfi 18) extras tcp s11n)
(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))
(use zmq)
(declare (unit server))
|
︙ | | | ︙ | |
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
(if (not hostport)
#f
(conc "tcp://" (car hostport) ":" (cadr hostport))))
;; Heart-beat timestamp, initialised to (current-seconds) when this unit is loaded.
;; NOTE(review): presumably refreshed by the server loop while it is alive - confirm.
(define *server-loop-heart-beat* (current-seconds))
;; Mutex created alongside the heart-beat; the disabled server:self-ping code
;; locks it around its (set! *server-loop-heart-beat* ...) update.
(define *heartbeat-mutex* (make-mutex))
;; (define (server:self-ping server-info)
;; ;; server-info: server-id interface pullport pubport
;; (let ((iface (list-ref server-info 1))
;; (pullport (list-ref server-info 2))
;; (pubport (list-ref server-info 3)))
;; (server:client-connect iface pullport pubport)
;; (let loop ()
;; (thread-sleep! 2)
;; (cdb:client-call *runremote* 'ping #t)
;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
;; (mutex-lock! *heartbeat-mutex*)
;; (set! *server-loop-heart-beat* (current-seconds))
;; (mutex-unlock! *heartbeat-mutex*)
;; (loop))))
;; Accessors for the zmq socket pair, kept in a two-slot vector:
;; slot 0 holds the pub socket, slot 1 holds the pull socket.
(define-inline (zmqsock:get-pub dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
;; vector-set! takes (vector index value). The setters previously passed the
;; socket as the index, and set-pull! targeted slot 0 instead of slot 1.
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat 0 s))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat 1 s))
(define (server:run hostn)
(debug:print 2 "Attempting to start the server ...")
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
(if (not hostport)
#f
(conc "tcp://" (car hostport) ":" (cadr hostport))))
;; Heart-beat timestamp, initialised to (current-seconds) when this unit is loaded.
;; NOTE(review): presumably refreshed by the server loop while it is alive - confirm.
(define *server-loop-heart-beat* (current-seconds))
;; Mutex created alongside the heart-beat; the disabled server:self-ping code
;; locks it around its (set! *server-loop-heart-beat* ...) update.
(define *heartbeat-mutex* (make-mutex))
;; Accessors for the zmq socket pair, kept in a two-slot vector:
;; slot 0 holds the pub socket, slot 1 holds the pull socket.
(define-inline (zmqsock:get-pub dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
;; vector-set! takes (vector index value). The setters previously passed the
;; socket as the index, and set-pull! targeted slot 0 instead of slot 1.
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat 0 s))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat 1 s))
(define (server:run hostn)
(debug:print 2 "Attempting to start the server ...")
|
︙ | | | ︙ | |
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
(mutex-unlock! *heartbeat-mutex*)))
(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
(begin
(open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
(loop '()))
(loop (cons packet queue-lst)))))))
;; Send a reply to a client over pubsock.
;;   pubsock      - socket handed to send-message
;;   target       - sent first, with send-more: #t, ahead of the payload
;;   query-sig    - packed into the payload vector
;;   success/fail - packed into the payload vector
;;   result       - packed into the payload vector
;; The payload is (vector success/fail query-sig result) converted with
;; db:obj->string.
(define (server:reply pubsock target query-sig success/fail result)
  (debug:print-info 11 "server:reply target=" target ", result=" result)
  ;; first part: the target
  (send-message pubsock target send-more: #t)
  ;; second part: the serialised reply vector
  (let* ((reply   (vector success/fail query-sig result))
         (payload (db:obj->string reply)))
    (send-message pubsock payload)))
;; Run server:keep-running in a parallel thread to monitor whether the db is
;; being used, and to shut down after some time if it is not.
;;
(define (server:keep-running)
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; This thread waits for the server to come alive
|
<
<
<
<
<
|
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
(mutex-unlock! *heartbeat-mutex*)))
(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
(begin
(open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
(loop '()))
(loop (cons packet queue-lst)))))))
;; Run server:keep-running in a parallel thread to monitor whether the db is
;; being used, and to shut down after some time if it is not.
;;
(define (server:keep-running)
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; This thread waits for the server to come alive
|
︙ | | | ︙ | |
288
289
290
291
292
293
294
295
296
297
298
299
300
301
|
(begin
(debug:print 0 "ERROR: Failed to open socket to " conurl)
#f))))
;; Log in over zmq-sockets by calling cdb:login with *toppath* and the
;; signature from server:get-client-signature; returns cdb:login's result.
(define (server:client-login zmq-sockets)
  (let ((signature (server:get-client-signature)))
    (cdb:login zmq-sockets *toppath* signature)))
;; Log out via cdb:logout, passing *toppath* and the client signature.
;; Returns #f when zmq-socket does not satisfy socket?, otherwise the
;; result of cdb:logout.
(define (server:client-logout zmq-socket)
  ;; the socket is not closed here; the call below remains disabled
  ;; (close-socket zmq-socket)
  (if (socket? zmq-socket)
      (cdb:logout zmq-socket *toppath* (server:get-client-signature))
      #f))
(define (server:client-connect iface pullport pubport)
|
>
|
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
|
(begin
(debug:print 0 "ERROR: Failed to open socket to " conurl)
#f))))
;; Log in over zmq-sockets by calling cdb:login with *toppath* and the
;; signature from server:get-client-signature; returns cdb:login's result.
(define (server:client-login zmq-sockets)
  (let ((signature (server:get-client-signature)))
    (cdb:login zmq-sockets *toppath* signature)))
;; Not currently used! But, I think it *should* be used!!!
;;
;; Log out via cdb:logout, passing *toppath* and the client signature.
;; Returns #f when zmq-socket does not satisfy socket?, otherwise the
;; result of cdb:logout.
(define (server:client-logout zmq-socket)
  ;; the socket is not closed here; the call below remains disabled
  ;; (close-socket zmq-socket)
  (if (socket? zmq-socket)
      (cdb:logout zmq-socket *toppath* (server:get-client-signature))
      #f))
(define (server:client-connect iface pullport pubport)
|
︙ | | | ︙ | |
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
|
;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
;; (debug:print 0 " perhaps jobs killed with -9? Removing server records")
;; (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
;; (server:client-setup (- numtries 1))
;; #f)
(server:client-connect iface pullport pubport)) ;; )
(if (> numtries 0)
(let ((exe (car (argv))))
(debug:print-info 2 "No server available, attempting to start one...")
(process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
;; (process-fork (lambda ()
;; (server:launch)
;; (exit) ;; should never get here ....
;; ))
(sleep 5) ;; give server time to start
;; we are starting a server, do not try again! That can lead to
;; recursively starting many processes!!!
(server:client-setup numtries: 0))
(debug:print-info 1 "Too many attempts, giving up")))))
;; all routes through here end in exit ...
(define (server:launch)
|
|
>
|
|
>
>
|
<
|
<
>
>
>
>
>
|
>
>
|
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
|
;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
;; (debug:print 0 " perhaps jobs killed with -9? Removing server records")
;; (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
;; (server:client-setup (- numtries 1))
;; #f)
(server:client-connect iface pullport pubport)) ;; )
(if (> numtries 0)
(let ((exe (car (argv)))
(pid #f))
(debug:print-info 0 "No server available, attempting to start one...")
;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
;; (string-intersperse *verbosity* ",")
;; (conc *verbosity*)))))
(set! pid (process-fork (lambda ()
(server:launch)))) ;; should never get here ....
(let loop ((count 0))
(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
(if (not hostinfo)
(begin
(debug:print-info 0 "Waiting for server pid=" pid " to start")
(sleep 2) ;; give server time to start
(if (< count 5)
(loop (+ count 1)))))))
;; we are starting a server, do not try again! That can lead to
;; recursively starting many processes!!!
(server:client-setup numtries: 0))
(debug:print-info 1 "Too many attempts, giving up")))))
;; all routes through here end in exit ...
(define (server:launch)
|
︙ | | | ︙ | |
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
|
(if hostinfo
(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
(if *toppath*
(let* (;; (th1 (make-thread (lambda ()
;; (let ((server-info #f))
;; ;; wait for the server to be online and available
;; (let loop ()
;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
;; (thread-sleep! 2)
;; (mutex-lock! *heartbeat-mutex*)
;; (set! server-info *server-info* )
;; (mutex-unlock! *heartbeat-mutex*)
;; (if (not server-info)(loop)))
;; (debug:print 2 "Server alive, starting self-ping")
;; (server:self-ping server-info)
;; ))
;; "Self ping"))
(th2 (make-thread (lambda ()
(server:run (args:get-arg "-server"))) "Server run"))
(th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
)
(set! *client-non-blocking-mode* #t)
;; (thread-start! th1)
(thread-start! th2)
(thread-start! th3)
(set! *didsomething* #t)
|
|
|
>
|
>
>
|
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
|
(if hostinfo
(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
(if *toppath*
(let* (;; (th1 (make-thread (lambda ()
;; (let ((server-info #f))
;; ;; wait for the server to be online and available
;; (let loop ()
;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
;; (thread-sleep! 2)
;; (mutex-lock! *heartbeat-mutex*)
;; (set! server-info *server-info* )
;; (mutex-unlock! *heartbeat-mutex*)
;; (if (not server-info)(loop)))
;; (debug:print 2 "Server alive, starting self-ping")
;; (server:self-ping server-info)
;; ))
;; "Self ping"))
(th2 (make-thread (lambda ()
(server:run
(if (args:get-arg "-server")
(args:get-arg "-server")
"-"))) "Server run"))
(th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
)
(set! *client-non-blocking-mode* #t)
;; (thread-start! th1)
(thread-start! th2)
(thread-start! th3)
(set! *didsomething* #t)
|
︙ | | | ︙ | |
419
420
421
422
423
424
425
426
427
428
429
430
431
432
|
;; Start up as a client: install server:client-signal-handler for
;; signal/int, then call server:client-setup. On a true result a
;; "connected" message is logged; otherwise an error is printed and the
;; process exits.
(define (server:client-launch)
  (set-signal-handler! signal/int server:client-signal-handler)
  (cond
   ((server:client-setup)
    (debug:print-info 2 "connected as client"))
   (else
    (debug:print 0 "ERROR: Failed to connect as client")
    (exit))))
;; ping a server and return number of clients or #f (if no response)
;; NOT IN USE!
(define (server:ping host port #!key (secs 10)(return-socket #f))
(cdb:use-non-blocking-mode
(lambda ()
(let* ((res #f)
|
>
>
>
>
|
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
|
;; Start up as a client: install server:client-signal-handler for
;; signal/int, then call server:client-setup. On a true result a
;; "connected" message is logged; otherwise an error is printed and the
;; process exits.
(define (server:client-launch)
  (set-signal-handler! signal/int server:client-signal-handler)
  (cond
   ((server:client-setup)
    (debug:print-info 2 "connected as client"))
   (else
    (debug:print 0 "ERROR: Failed to connect as client")
    (exit))))
;;======================================================================
;; Defunct functions
;;======================================================================
;; ping a server and return number of clients or #f (if no response)
;; NOT IN USE!
(define (server:ping host port #!key (secs 10)(return-socket #f))
(cdb:use-non-blocking-mode
(lambda ()
(let* ((res #f)
|
︙ | | | ︙ | |
460
461
462
463
464
465
466
|
(thread-start! th2)
(thread-start! th1)
(handle-exceptions
exn
(set! res (list #f "TIMED OUT" #f))
(thread-join! th1 secs))
res))))
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
|
(thread-start! th2)
(thread-start! th1)
(handle-exceptions
exn
(set! res (list #f "TIMED OUT" #f))
(thread-join! th1 secs))
res))))
;; (define (server:self-ping server-info)
;; ;; server-info: server-id interface pullport pubport
;; (let ((iface (list-ref server-info 1))
;; (pullport (list-ref server-info 2))
;; (pubport (list-ref server-info 3)))
;; (server:client-connect iface pullport pubport)
;; (let loop ()
;; (thread-sleep! 2)
;; (cdb:client-call *runremote* 'ping #t)
;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
;; (mutex-lock! *heartbeat-mutex*)
;; (set! *server-loop-heart-beat* (current-seconds))
;; (mutex-unlock! *heartbeat-mutex*)
;; (loop))))
;; Send a reply to a client over pubsock.
;;   pubsock      - socket handed to send-message
;;   target       - sent first, with send-more: #t, ahead of the payload
;;   query-sig    - packed into the payload vector
;;   success/fail - packed into the payload vector
;;   result       - packed into the payload vector
;; The payload is (vector success/fail query-sig result) converted with
;; db:obj->string.
(define (server:reply pubsock target query-sig success/fail result)
  (debug:print-info 11 "server:reply target=" target ", result=" result)
  ;; first part: the target
  (send-message pubsock target send-more: #t)
  ;; second part: the serialised reply vector
  (let* ((reply   (vector success/fail query-sig result))
         (payload (db:obj->string reply)))
    (send-message pubsock payload)))
|