158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
+
|
(sleep 4)
(loop))))))
(iface (cadr server-info))
(pullport (caddr server-info))
(pubport (cadddr server-info)) ;; id interface pullport pubport)
(zmq-sockets (zmq-transport:client-connect iface pullport pubport))
(last-access 0))
(debug:print-info 11 "heartbeat started for zmq server on " iface " " pullport " " pubport)
(let loop ((count 0))
(thread-sleep! 4) ;; no need to do this very often
;; NB// sync currently does NOT return queue-length
(let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1)))
;; (print "Server running, count is " count)
(if (< count 1) ;; 3x3 = 9 secs aprox
(loop (+ count 1)))
|
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
-
+
+
+
+
+
+
+
+
|
(let* ((s1 (zmq-transport:find-free-port-and-open ipaddrstr #f startport 'pull))
(p1 (caddr s1))
(s2 (zmq-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
(p2 (caddr s2)))
(set! *runremote* #f)
(debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
(mutex-lock! *heartbeat-mutex*)
(set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live 'zmq))
(set! *server-info* (open-run-close tasks:server-register
tasks:open-db
(current-process-id)
ipaddrstr p1
0
'live
'zmq
pubport: p2))
(mutex-unlock! *heartbeat-mutex*)
(list s1 s2)))
(define (zmq-transport:mk-signature)
(message-digest-string (md5-primitive)
(with-output-to-string
(lambda ()
|
269
270
271
272
273
274
275
276
277
278
279
280
281
282
|
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
|
+
|
(define (zmq-transport:client-connect iface pullport pubport)
(let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push))
(sub-socket (zmq-transport:client-socket-connect iface pubport
type: 'sub
subscriptions: (list (zmq-transport:get-client-signature) "all")))
(zmq-sockets (vector push-socket sub-socket))
(login-res #f))
(debug:print-info 11 "zmq-transport:client-connect started. Next is login")
(set! login-res (zmq-transport:client-login zmq-sockets))
(if (and (not (null? login-res))
(car login-res))
(begin
(debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
(set! *runremote* zmq-sockets)
zmq-sockets)
|