︙ | | | ︙ | |
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
(define (zmq-transport:run hostn)
(debug:print 2 "Attempting to start the server ...")
(if (not *toppath*)
(if (not (setup-for-run))
(begin
(debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
(exit))))
(let* ((zmq-sdat1 #f)
(zmq-sdat2 #f)
(pull-socket #f)
(pub-socket #f)
(p1 #f)
(p2 #f)
(zmq-sockets-dat #f)
(iface (if (string=? "-" hostn)
|
>
|
|
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
(define (zmq-transport:run hostn)
(debug:print 2 "Attempting to start the server ...")
(if (not *toppath*)
(if (not (setup-for-run))
(begin
(debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
(exit))))
(let* ((db (open-db)) ;; here we *do not* want to be opening and closing the db
(zmq-sdat1 #f)
(zmq-sdat2 #f)
(pull-socket #f)
(pub-socket #f)
(p1 #f)
(p2 #f)
(zmq-sockets-dat #f)
(iface (if (string=? "-" hostn)
|
︙ | | | ︙ | |
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
(set! p1 (caddr zmq-sdat1))
(set! zmq-sdat2 (cadr zmq-sockets-dat))
(set! pub-socket (cadr zmq-sdat2))
(set! p2 (caddr zmq-sdat2))
(set! *cache-on* #t)
;; what to do when we quit
;;
;; (on-exit (lambda ()
;; (if (and *toppath* *server-info*)
;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
;; (let loop ()
|
>
>
|
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
(set! p1 (caddr zmq-sdat1))
(set! zmq-sdat2 (cadr zmq-sockets-dat))
(set! pub-socket (cadr zmq-sdat2))
(set! p2 (caddr zmq-sdat2))
(set! *cache-on* #t)
(set! *runremote* (vector pull-socket pub-socket)) ;; overloading the use of *runremote* BUG!?
;; what to do when we quit
;;
;; (on-exit (lambda ()
;; (if (and *toppath* *server-info*)
;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
;; (let loop ()
|
︙ | | | ︙ | |
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
(if (not (member qtype '(sync ping)))
(begin
(mutex-lock! *heartbeat-mutex*)
(set! *last-db-access* (current-seconds))
(mutex-unlock! *heartbeat-mutex*)))
(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
(begin
(open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
(loop '()))
(loop (cons packet queue-lst)))))))
;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
;;
(define (zmq-transport:keep-running)
|
>
|
>
|
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
(if (not (member qtype '(sync ping)))
(begin
(mutex-lock! *heartbeat-mutex*)
(set! *last-db-access* (current-seconds))
(mutex-unlock! *heartbeat-mutex*)))
(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
(begin
(db:process-queue-item db packet)
;; (open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
(loop '()))
(loop (cons packet queue-lst)))))))
;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
;;
(define (zmq-transport:keep-running)
|
︙ | | | ︙ | |
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
|
(debug:print 0 "ERROR: Failed to open socket to " conurl)
#f))))
(define (zmq-transport:client-connect iface pullport pubport)
(let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push))
(sub-socket (zmq-transport:client-socket-connect iface pubport
type: 'sub
subscriptions: (list (zmq-transport:get-client-signature) "all")))
(zmq-sockets (vector push-socket sub-socket))
(login-res #f))
(debug:print-info 11 "zmq-transport:client-connect started. Next is login")
(set! login-res (zmq-transport:client-login zmq-sockets))
(if (and (not (null? login-res))
(car login-res))
(begin
(debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
(set! *runremote* zmq-sockets)
zmq-sockets)
(begin
|
|
|
|
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
|
(debug:print 0 "ERROR: Failed to open socket to " conurl)
#f))))
(define (zmq-transport:client-connect iface pullport pubport)
(let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push))
(sub-socket (zmq-transport:client-socket-connect iface pubport
type: 'sub
subscriptions: (list (server:get-client-signature) "all")))
(zmq-sockets (vector push-socket sub-socket))
(login-res #f))
(debug:print-info 11 "zmq-transport:client-connect started. Next is login")
(set! login-res (server:client-login zmq-sockets))
(if (and (not (null? login-res))
(car login-res))
(begin
(debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
(set! *runremote* zmq-sockets)
zmq-sockets)
(begin
|
︙ | | | ︙ | |
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
|
;; ))
;; "Self ping"))
(th2 (make-thread (lambda ()
(zmq-transport:run
(if (args:get-arg "-server")
(args:get-arg "-server")
"-"))) "Server run"))
(th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running"))
)
(set! *client-non-blocking-mode* #t)
;; (thread-start! th1)
(thread-start! th2)
(thread-start! th3)
(set! *didsomething* #t)
;; (thread-join! th3)
(thread-join! th2)
)
(debug:print 0 "ERROR: Failed to setup for megatest")))
(define (zmq-transport:client-signal-handler signum)
|
|
|
|
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
|
;; ))
;; "Self ping"))
(th2 (make-thread (lambda ()
(zmq-transport:run
(if (args:get-arg "-server")
(args:get-arg "-server")
"-"))) "Server run"))
;; (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running"))
)
(set! *client-non-blocking-mode* #t)
;; (thread-start! th1)
(thread-start! th2)
;; (thread-start! th3)
(set! *didsomething* #t)
;; (thread-join! th3)
(thread-join! th2)
)
(debug:print 0 "ERROR: Failed to setup for megatest")))
(define (zmq-transport:client-signal-handler signum)
|
︙ | | | ︙ | |