116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
(set! *cache-on* #t)
;; what to do when we quit
;;
(on-exit (lambda ()
(if (and *toppath* *server-info*)
(begin
(open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr p1 p2))
(let loop ()
(let ((queue-len 0))
(thread-sleep! (random 5))
(mutex-lock! *incoming-mutex*)
(set! queue-len (length *incoming-data*))
(mutex-unlock! *incoming-mutex*)
(if (> queue-len 0)
|
<
|
|
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
(set! *cache-on* #t)
;; what to do when we quit
;;
(on-exit (lambda ()
(if (and *toppath* *server-info*)
(open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
(let loop ()
(let ((queue-len 0))
(thread-sleep! (random 5))
(mutex-lock! *incoming-mutex*)
(set! queue-len (length *incoming-data*))
(mutex-unlock! *incoming-mutex*)
(if (> queue-len 0)
|
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
|
(debug:print-info 12 "server=> received packet=" packet)
(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
(begin
(db:process-queue pub-socket (cons packet queue-lst))
(loop '()))
(loop (cons packet queue-lst)))))))
;; server:reply sends RESULT back through PUBSOCK as a two-part message:
;;   part 1: TARGET, sent with send-more: #t so that another part follows
;;   part 2: RESULT serialized with db:obj->string
;; The target and result are passed to debug:print-info before sending.
;;
(define (server:reply pubsock target result)
  (debug:print-info 11 "server:reply target=" target ", result=" result)
  ;; part 1 - who the reply is for
  (send-message pubsock target send-more: #t)
  ;; part 2 - the serialized payload (serialized only after part 1 is sent)
  (let ((payload (db:obj->string result)))
    (send-message pubsock payload)))
;; run server:keep-running in a parallel thread to monitor that the db is being
;; used and to shut down after some time if it is not.
;;
;; The procedure first waits until *server-info* has been set, then wakes up
;; every 4 seconds. On every second wake-up (about every 8 seconds) it
;; refreshes the server heartbeat and compares *last-db-access* with the
;; current time. If more than one hour has passed since the last db access it
;; sets *time-to-exit*, calls tasks:server-deregister-self and exits the
;; process. It takes no arguments and only leaves its loop by calling (exit).
;;
(define (server:keep-running)
  ;; This thread waits for the server to come alive: poll *server-info*
  ;; under *heartbeat-mutex* until it is no longer #f.
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *server-info*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat
                              sdat
                              (begin
                                ;; thread-sleep! suspends only this thread;
                                ;; the process-level sleep would stall the
                                ;; other threads we are waiting on.
                                (thread-sleep! 4)
                                (loop))))))
         ;; server-info layout: (id interface pullport pubport)
         (iface       (cadr server-info))
         (pullport    (caddr server-info))
         (pubport     (cadddr server-info))
         ;; (zmq-sockets (server:client-connect iface pullport pubport))
         )
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1))))
      ;; (print "Server running, count is " count)
      (if (< count 1)
          ;; First wake-up of a cycle: nothing to do but wait once more.
          ;; Kept as a tail call so no pending continuation is left behind
          ;; on every cycle of this never-ending loop.
          (loop (+ count 1))
          (begin
            ;; NOTE: Get rid of this mechanism! It really is not needed...
            (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
            ;; Stay alive while the last db access is less than one hour old.
            ;; Previously tried timeouts: (* 48 60 60) = 48 hrs, 60 = one minute.
            (if (> (+ *last-db-access* (* 60 60))
                   (current-seconds))
                (begin
                  (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*))
                  (loop 0))
                (begin
                  (debug:print-info 0 "Starting to shutdown the server.")
                  ;; need to delete only *my* server entry (future use)
                  (set! *time-to-exit* #t)
                  (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
                  (thread-sleep! 1)
                  (debug:print-info 0 "Max cached queries was " *max-cache-size*)
                  (debug:print-info 0 "Server shutdown complete. Exiting")
                  (exit))))))))
(define (server:find-free-port-and-open iface s port stype #!key (trynum 50))
(let ((s (if s s (make-socket stype)))
(p (if (number? port) port 5555))
(old-handler (current-exception-handler)))
(handle-exceptions
exn
|
|
|
|
>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
|
(debug:print-info 12 "server=> received packet=" packet)
(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
(begin
(db:process-queue pub-socket (cons packet queue-lst))
(loop '()))
(loop (cons packet queue-lst)))))))
;; server:reply sends a reply through PUBSOCK as a two-part message:
;;   part 1: TARGET, sent with send-more: #t so that another part follows
;;   part 2: the vector #(SUCCESS/FAIL QUERY-SIG RESULT) serialized with
;;           db:obj->string
;; The target and result are passed to debug:print-info before sending.
;;
(define (server:reply pubsock target query-sig success/fail result)
  (debug:print-info 11 "server:reply target=" target ", result=" result)
  ;; part 1 - who the reply is for
  (send-message pubsock target send-more: #t)
  ;; part 2 - status, query signature and result bundled into one vector
  ;; (built and serialized only after part 1 is sent)
  (let* ((reply   (vector success/fail query-sig result))
         (payload (db:obj->string reply)))
    (send-message pubsock payload)))
;; run server:keep-running in a parallel thread to monitor that the db is being
;; used and to shut down after some time if it is not.
;;
;; The procedure first waits until *server-info* has been set and opens a
;; client connection to the server with server:client-connect. It then wakes
;; up every 4 seconds and issues a 'sync call. On every second wake-up (about
;; every 8 seconds) it refreshes the server heartbeat and compares
;; *last-db-access* with the current time. If more than one hour has passed
;; since the last db access it sets *time-to-exit*, calls
;; tasks:server-deregister-self and exits the process. It takes no arguments
;; and only leaves its loop by calling (exit).
;;
(define (server:keep-running)
  ;; This thread waits for the server to come alive: poll *server-info*
  ;; under *heartbeat-mutex* until it is no longer #f.
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *server-info*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat
                              sdat
                              (begin
                                ;; thread-sleep! suspends only this thread;
                                ;; the process-level sleep would stall the
                                ;; other threads we are waiting on.
                                (thread-sleep! 4)
                                (loop))))))
         ;; server-info layout: (id interface pullport pubport)
         (iface       (cadr server-info))
         (pullport    (caddr server-info))
         (pubport     (cadddr server-info))
         (zmq-sockets (server:client-connect iface pullport pubport)))
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length, so the value is
      ;; not bound; the call is made on every wake-up for its effect only.
      (cdb:client-call zmq-sockets 'sync #t 1)
      ;; (print "Server running, count is " count)
      (if (< count 1)
          ;; First wake-up of a cycle: nothing to do but wait once more.
          ;; Kept as a tail call so no pending continuation is left behind
          ;; on every cycle of this never-ending loop.
          (loop (+ count 1))
          (begin
            ;; NOTE: Get rid of this mechanism! It really is not needed...
            (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
            ;; Stay alive while the last db access is less than one hour old.
            ;; Previously tried timeouts: (* 48 60 60) = 48 hrs, 60 = one minute.
            (if (> (+ *last-db-access* (* 60 60))
                   (current-seconds))
                (begin
                  (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*))
                  (loop 0))
                (begin
                  (debug:print-info 0 "Starting to shutdown the server.")
                  ;; need to delete only *my* server entry (future use)
                  (set! *time-to-exit* #t)
                  (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
                  (thread-sleep! 1)
                  (debug:print-info 0 "Max cached queries was " *max-cache-size*)
                  (debug:print-info 0 "Server shutdown complete. Exiting")
                  (exit))))))))
(define (server:find-free-port-and-open iface s port stype #!key (trynum 50))
(let ((s (if s s (make-socket stype)))
(p (if (number? port) port 5555))
(old-handler (current-exception-handler)))
(handle-exceptions
exn
|