161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
|
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
|
-
+
+
-
+
-
-
+
+
-
+
-
-
-
+
+
+
+
|
(mbox (cdr cmbox))
(mbox-time (current-milliseconds)))
(if (eq? (send uconn host-port qrykey cmd data) 'ack)
(let* ((mbox-timeout-secs 120) ;; timeout)
(mbox-timeout-result 'MBOX_TIMEOUT)
(res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
(mbox-receive-time (current-milliseconds)))
(print "In send-receive, got "res" back from mailbox")
(if (eq? res 'MBOX_TIMEOUT)
#f ;; convert to raising exception?
res))
(begin
(print "ERROR: Communication failed?")
#f))) ;; #f means failed to communicate
#f)))) ;; #f means failed to communicate
;;======================================================================
;; responder side
;;======================================================================
;; take a request, rdata, and if not immediate put it in the work queue
;;
;; Reserved cmds; ack ping goodbye response
;;
(define (ulex-handler uconn rdata)
(print "ulex-handler received data: "rdata)
(match rdata ;; (string-split controldat)
((rem-host-port qrykey cmd params) ;; cmdkey host-port pid qrykey params ...)
((rem-host-port qrykey cmd params)
;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
(let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
(case cmd
;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
((ping)
(print "Got Ping!")
;; (print "Got Ping!")
(add-to-work-queue uconn rdata)
'ack)
((goodbye)
;; just clear out references to the caller
(add-to-work-queue uconn rdata)
'ack)
((response) ;; this is a result from remote processing, send it as mail ...
(if mbox
(begin
(mailbox-send! mbox params) ;; params here is our result
'ack)
(begin
(print "ERROR: received result but no associated mbox for cookie "qrykey)
#f)))
((else
(add-to-work-queue uconn rdata)
'ack)))))
(else
;; (print "Got generic request: "cmd)
(add-to-work-queue uconn rdata)
'ack))))
(else
(print "BAD DATA? controldat=" rdata)
'ack) ;; send ack anyway?
))
;; given an already set up uconn start the cmd-loop
;;
|
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
|
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
|
-
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
+
+
-
|
;; (map ip->string (vector->list
;; (hostinfo-addresses
;; (host-information (current-hostname))))))
)
(import ulex trace big-chicken srfi-18 test)
(import ulex trace big-chicken srfi-18 test matchable)
(trace-call-sites #t)
(trace
;; ulex-handler
;; send
;; add-to-work-queue
)
(define (handler-proc . data)
(print "handler-proc, got: "data)
`(data ,data))
(define (handler-proc rem-host-port qrykey cmd params)
(print "handler-proc "rem-host-port" "qrykey" "cmd" "params)
(case cmd
((ping) 'pong)
((calc) (eval (with-input-from-string params read)))
((print)
(print "params="params)
params)
((reflect) `(,rem-host-port ,qrykey ,cmd ,params))
(else `(data ,data))))
(define uconn (run-listener handler-proc))
(pp-uconn uconn)
;; super basic loop back test
(define res #f)
(define th1 (make-thread (lambda ()
(test #f 10 (send-receive uconn "zeus:4242" 'calc "(+ 5 5)"))
(test #f 'ack (send-receive uconn "zeus:4242" 'ping '())))
(set! res (send-receive uconn "zeus:4242" 'ping '()))))
(set! res (send-receive uconn "zeus:4242" 'ping '()))
(test #f 'pong (send-receive uconn "zeus:4242" 'ping '()))
)))
(thread-start! th1)
(thread-join! th1)
(thread-sleep! 1)
(print "All done")
(print "Received "res)
|