65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
(udat-captain-address-set! udata ipaddr)
(udat-captain-host-set! udata host)
(udat-captain-port-set! udata port)
(udat-captain-pid-set! udata pid)
(if (ping udata (conc ipaddr ":" port))
udata
(begin
(remove-captain-pkt udata captn)
(setup))))
(begin
(setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread
(setup)))
))
;; connect to a specific dbfile
(define (connect udata dbfname dbtype)
udata)
(define (ping udata host-port)
(let* ((cookie (make-cookie udata))
(res (send-receive udata host-port 'ping "just pinging" (conc (current-seconds)))))
(print "got res=" res)
(equal? res cookie)
))
;;======================================================================
;; network utilities
;;======================================================================
|
>
|
|
|
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
(udat-captain-address-set! udata ipaddr)
(udat-captain-host-set! udata host)
(udat-captain-port-set! udata port)
(udat-captain-pid-set! udata pid)
(if (ping udata (conc ipaddr ":" port))
udata
(begin
(print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
(remove-captain-pkt udata captn)
(setup))))
(begin
(setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread
(setup)))
))
;; connect to a specific dbfile
(define (connect udata dbfname dbtype)
udata)
(define (ping udata host-port)
(let* ((cookie (make-cookie udata))
(res (send-receive udata host-port 'ping "just pinging" (conc (current-seconds)) timeout: 1)))
;; (print "got res=" res)
(equal? res cookie)
))
;;======================================================================
;; network utilities
;;======================================================================
|
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
|
;;
(define (setup-as-captain udata)
(if (start-server-find-port udata) ;; puts the server in udata
(if (create-captain-pkt udata)
(let* ((th (make-thread (lambda ()
(ulex-handler udata)) "Captain handler")))
(udat-handler-thread-set! udata th)
(thread-start! th)
udata)
#f)
#f))
(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
(let* ((pdat (or (hash-table-ref/default (udat-outgoing-conns udata) host-port #f)
(handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
exn
|
|
<
|
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
|
;;
(define (setup-as-captain udata)
(if (start-server-find-port udata) ;; puts the server in udata
(if (create-captain-pkt udata)
(let* ((th (make-thread (lambda ()
(ulex-handler udata)) "Captain handler")))
(udat-handler-thread-set! udata th)
(thread-start! th))
#f)
#f))
(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
(let* ((pdat (or (hash-table-ref/default (udat-outgoing-conns udata) host-port #f)
(handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
exn
|
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
|
;; (there is a listener for handling that)
)
#f))) ;; #f means failed to connect and send
;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '()))
(let ((mbox (make-mailbox))
(mbox-time (current-milliseconds))
(mboxes (udat-mboxes udata)))
(hash-table-set! mboxes qrykey mbox)
(if (send udata host-port handler qrykey data hostname: hostname pid: pid params: params)
(let* ((mbox-timeout-secs 20)
(mbox-timeout-result 'MBOX_TIMEOUT)
(res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
(mbox-receive-time (current-milliseconds)))
(hash-table-delete! mboxes qrykey)
res)
#f))) ;; #f means failed to communicate
(define (add-to-work-queue udata peer-dat handlerkey qrykey data)
(let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data)))
(if (udat-busy udata)
(queue-add! (udat-work-queue udata) wdat)
(process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat
|
|
|
>
>
|
|
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
|
;; (there is a listener for handling that)
)
#f))) ;; #f means failed to connect and send
;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20))
(let ((mbox (make-mailbox))
(mbox-time (current-milliseconds))
(mboxes (udat-mboxes udata)))
(hash-table-set! mboxes qrykey mbox)
(if (send udata host-port handler qrykey data hostname: hostname pid: pid params: params)
(let* ((mbox-timeout-secs timeout)
(mbox-timeout-result 'MBOX_TIMEOUT)
(res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
(mbox-receive-time (current-milliseconds)))
(hash-table-delete! mboxes qrykey)
(if (eq? res 'MBOX_TIMEOUT)
#f
res))
#f))) ;; #f means failed to communicate
(define (add-to-work-queue udata peer-dat handlerkey qrykey data)
(let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data)))
(if (udat-busy udata)
(queue-add! (udat-work-queue udata) wdat)
(process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat
|