114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
|
(set! pub-socket (cadr zmq-sdat2))
(set! p2 (caddr zmq-sdat2))
(set! *cache-on* #t)
;; what to do when we quit
;;
(on-exit (lambda ()
(if (and *toppath* *server-info*)
(open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
(let loop ()
(let ((queue-len 0))
(thread-sleep! (random 5))
(mutex-lock! *incoming-mutex*)
(set! queue-len (length *incoming-data*))
(mutex-unlock! *incoming-mutex*)
(if (> queue-len 0)
(begin
(debug:print-info 0 "Queue not flushed, waiting ...")
(loop))))))))
;; (on-exit (lambda ()
;; (if (and *toppath* *server-info*)
;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
;; (let loop ()
;; (let ((queue-len 0))
;; (thread-sleep! (random 5))
;; (mutex-lock! *incoming-mutex*)
;; (set! queue-len (length *incoming-data*))
;; (mutex-unlock! *incoming-mutex*)
;; (if (> queue-len 0)
;; (begin
;; (debug:print-info 0 "Queue not flushed, waiting ...")
;; (loop))))))))
;; The heavy lifting
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
(let loop ((queue-lst '()))
(let* ((rawmsg (receive-message* pull-socket))
(packet (db:string->obj rawmsg)))
(debug:print-info 12 "server=> received packet=" packet)
(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
(begin
(db:process-queue pub-socket (cons packet queue-lst))
(open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
(loop '()))
(loop (cons packet queue-lst)))))))
(define (server:reply pubsock target query-sig success/fail result)
(debug:print-info 11 "server:reply target=" target ", result=" result)
(send-message pubsock target send-more: #t)
(send-message pubsock (db:obj->string (vector success/fail query-sig result))))
|
238
239
240
241
242
243
244
245
246
247
248
249
250
251
|
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
+
+
+
+
+
|
(define (server:mk-signature)
(message-digest-string (md5-primitive)
(with-output-to-string
(lambda ()
(write (list (current-directory)
(argv)))))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;;======================================================================
;; C L I E N T S
;;======================================================================
(define (server:get-client-signature)
(if *my-client-signature* *my-client-signature*
|
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
|
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
|
-
+
-
+
|
;; (if (not server-info)(loop)))
;; (debug:print 1 "Server alive, starting self-ping")
;; (server:self-ping server-info)
;; ))
;; "Self ping"))
(th2 (make-thread (lambda ()
(server:run (args:get-arg "-server"))) "Server run"))
(th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
;; (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
)
(set! *client-non-blocking-mode* #t)
;; (thread-start! th1)
(thread-start! th2)
(thread-start! th3)
;; (thread-start! th3)
(set! *didsomething* #t)
;; (thread-join! th3)
(thread-join! th2)
)
(debug:print 0 "ERROR: Failed to setup for megatest")))
(exit)))
|