62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
(ipaddr (alist-ref 'ipaddr captn))
(pid (alist-ref 'pid captn))
(Z (alist-ref 'Z captn)))
(udat-captain-address-set! udata ipaddr)
(udat-captain-host-set! udata host)
(udat-captain-port-set! udata port)
(udat-captain-pid-set! udata pid)
(if (ping udata ipaddr port)
udata
(begin
(remove-captain-pkt udata captn)
(setup))))
(setup-as-captain udata)) ;; this saves the thread to captain-thread and starts the thread
))
|
|
|
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
(ipaddr (alist-ref 'ipaddr captn))
(pid (alist-ref 'pid captn))
(Z (alist-ref 'Z captn)))
(udat-captain-address-set! udata ipaddr)
(udat-captain-host-set! udata host)
(udat-captain-port-set! udata port)
(udat-captain-pid-set! udata pid)
(if (ping udata (conc ipaddr ":" port))
udata
(begin
(remove-captain-pkt udata captn)
(setup))))
(setup-as-captain udata)) ;; this saves the thread to captain-thread and starts the thread
))
|
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
|
(udat-handler-thread-set! udata th)
(thread-start! th))
#f)
#f))
(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
(let* ((pdat (or (hash-table-ref/default (udat-outgoing-conns udata) host-port #f)
(let ((npdat (make-peer addr-port: host-port)))
(if hostname (peer-hostname-set! npdat hostname))
(if pid (peer-pid-set! npdat pid))
(let-values (((ninp noup)(tcp-connect host-port)))
(peer-inp-set! npdat ninp)
(peer-oup-set! npdat noup))
(hash-table-set! (udat-outgoing-conns udata) host-port npdat)
npdat))))
pdat))
(define (get-peer-ports udata host-port #!optional (hostname #f)(pid #f))
(let ((pdat (get-peer-dat udata host-port hostname pid)))
(values (peer-inp pdat)(peer-oup pdat))))
;; send structured data to recipient
;;
;; NOTE: qrykey is what was called the "cookie" previously
;;
(define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '()))
(let-values (((inp oup)(get-peer-ports udata host-port hostname pid)))
;; CONTROL LINE: (note: removed the hostname - I don't think it adds much value
;;
;; handlerkey host:port pid qrykey params ...
;;
(write-line (conc
handler " "
(udat-my-address udata) ":" (udat-my-port udata) " "
;; (udat-my-hostname udata) " "
(udat-my-pid udata) " "
qrykey
(if (null? params) "" (conc " " (string-intersperse params " "))))
oup)
(write-line data oup)
;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE!
;; (there is a listener for handling that)
))
;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '()))
(let ((mbox (make-mailbox))
(mbox-time (current-milliseconds))
(mboxes (udat-mboxes udata)))
(hash-table-set! mboxes qrykey mbox)
(send udata host-port handler qrykey data hostname: hostname pid: pid params: params)
(let* ((mbox-timeout-secs 20)
(mbox-timeout-result 'MBOX_TIMEOUT)
(res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
(mbox-receive-time (current-milliseconds)))
(hash-table-delete! mboxes qrykey)
res)))
(define (add-to-work-queue udata peer-dat handlerkey qrykey data)
(let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data)))
(if (udat-busy udata)
(queue-add! (udat-work-queue udata) wdat)
(process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat
))
|
>
>
>
|
|
|
|
|
|
|
|
>
|
>
>
>
|
|
|
|
|
|
|
|
|
>
|
|
<
>
>
|
|
|
|
|
|
|
>
|
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
|
(udat-handler-thread-set! udata th)
(thread-start! th))
#f)
#f))
(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
(let* ((pdat (or (hash-table-ref/default (udat-outgoing-conns udata) host-port #f)
(handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
exn
#f
(let ((npdat (make-peer addr-port: host-port)))
(if hostname (peer-hostname-set! npdat hostname))
(if pid (peer-pid-set! npdat pid))
(let-values (((ninp noup)(tcp-connect host-port)))
(peer-inp-set! npdat ninp)
(peer-oup-set! npdat noup))
(hash-table-set! (udat-outgoing-conns udata) host-port npdat)
npdat)))))
pdat))
(define (get-peer-ports udata host-port #!optional (hostname #f)(pid #f))
(let ((pdat (get-peer-dat udata host-port hostname pid)))
(if pdat
(values (peer-inp pdat)(peer-oup pdat))
(values #f #f))))
;; send structured data to recipient
;;
;; NOTE: qrykey is what was called the "cookie" previously
;;
(define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '()))
(let-values (((inp oup)(get-peer-ports udata host-port hostname pid)))
;; CONTROL LINE: (note: removed the hostname - I don't think it adds much value
;;
;; handlerkey host:port pid qrykey params ...
;;
(if (and inp oup)
(begin
(write-line (conc
handler " "
(udat-my-address udata) ":" (udat-my-port udata) " "
;; (udat-my-hostname udata) " "
(udat-my-pid udata) " "
qrykey
(if (null? params) "" (conc " " (string-intersperse params " "))))
oup)
(write-line data oup)
#t
;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE!
;; (there is a listener for handling that)
)
#f))) ;; #f means failed to connect and send
;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '()))
(let ((mbox (make-mailbox))
(mbox-time (current-milliseconds))
(mboxes (udat-mboxes udata)))
(hash-table-set! mboxes qrykey mbox)
(if (send udata host-port handler qrykey data hostname: hostname pid: pid params: params)
(let* ((mbox-timeout-secs 20)
(mbox-timeout-result 'MBOX_TIMEOUT)
(res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
(mbox-receive-time (current-milliseconds)))
(hash-table-delete! mboxes qrykey)
res)
#f))) ;; #f means failed to communicate
(define (add-to-work-queue udata peer-dat handlerkey qrykey data)
(let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data)))
(if (udat-busy udata)
(queue-add! (udat-work-queue udata) wdat)
(process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat
))
|
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
|
;; handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
;; data
(let loop ((state 'start))
(let* ((controldat (read-line inp))
(data (read-line inp)))
(match (string-split controldat)
((handlerkey host:port pid qrykey params ...)
(case (string->symbol handlerkey)
((ack)(print "Got ack!"))
((ping)
(let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
(val (if proc (proc) "gotping")))
(send udata host:port "version" qrykey val)))
((rucaptain)
(send udata host:port "iamcaptain" qrykey (if (udat-my-cpkt-key udata)
"yes"
"no")))
(else
;; (send-ack udata host:port qrykey)
(add-to-work-queue udata (get-peer-dat udata host:port) handlerkey qrykey data)))
(else (print "BAD DATA? handler=" handlerkey " data=" data)))))
(loop state)))))
;; add a proc to the handler list
|
>
|
|
|
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
|
;; handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
;; data
(let loop ((state 'start))
(let* ((controldat (read-line inp))
(data (read-line inp)))
(match (string-split controldat)
((handlerkey host:port pid qrykey params ...)
(print "handlerkey: " handlerkey " host:port: " host:port " pid: " pid " qrykey: " qrykey " params: " params)
(case (string->symbol handlerkey)
((ack)(print "Got ack!"))
((ping)
(let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
(val (if proc (proc) "gotping")))
(send udata host:port "version" qrykey val)))
((rucaptain)
(send udata host:port "iamcaptain" qrykey (if (udat-my-cpkt-key udata)
"yes"
"no")))
(else
;; (send-ack udata host:port qrykey)
(add-to-work-queue udata (get-peer-dat udata host:port) handlerkey qrykey data)))
(else (print "BAD DATA? handler=" handlerkey " data=" data)))))
(loop state)))))
;; add a proc to the handler list
|