︙ | | | ︙ | |
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
;; start-server-find-port ;; gotta have a server port ready from the very begining
;; udata - all the connection info, captain, server, ulex db etc. MUST BE PASSED IN
;; dbpath - full path and filename of the db to talk to or a symbol naming the db?
;; callname - the remote call to execute
;; params - parameters to pass to the remote call
;;
(define (remote-call udata dbpath dbtype callname . params)
(start-server-find-port udata) ;; ensure we have a local server
(find-or-setup-captain udata)
;; look at connect, process-request, send, send-receive
(let-values (((cookie-key host-port)(get-db-owner udata dbpath dbtype)))
(send-receive udata host-port callname cookie-key params)))
;;======================================================================
;; KEY FUNCTIONS - THESE ARE TOO BE EXPOSED AND USED
;;======================================================================
;; connection setup and management functions
;; This is the basic setup command. Must always be
;; called before connecting to a db using connect.
;;
;; find or become the captain
;; setup and return a ulex object
;;
(define (find-or-setup-captain udata)
;; see if we already have a captain and if the lease is ok
(if (and (udat-captain-address udata)
(udat-captain-port udata)
(< (current-seconds) (udat-captain-lease udata)))
udata
(let* ((cpkts (get-all-captain-pkts udata)) ;; read captain pkts
(captn (get-winning-pkt cpkts)))
|
|
>
|
>
|
|
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
;; start-server-find-port ;; gotta have a server port ready from the very begining
;; udata - all the connection info, captain, server, ulex db etc. MUST BE PASSED IN
;; dbpath - full path and filename of the db to talk to or a symbol naming the db?
;; callname - the remote call to execute
;; params - parameters to pass to the remote call
;;
(define (remote-call udata dbpath dbtype callname params)
(start-server-find-port udata) ;; ensure we have a local server
(find-or-setup-captain udata)
;; look at connect, process-request, send, send-receive
(let-values (((cookie-key host-port)(get-db-owner udata dbpath dbtype)))
(if (and cookie-key host-port)
(send-receive udata host-port callname cookie-key params)
#f)))
;;======================================================================
;; KEY FUNCTIONS - THESE ARE TOO BE EXPOSED AND USED
;;======================================================================
;; connection setup and management functions
;; This is the basic setup command. Must always be
;; called before connecting to a db using connect.
;;
;; find or become the captain
;; setup and return a ulex object
;;
(define (find-or-setup-captain udata #!optional (tries 0))
;; see if we already have a captain and if the lease is ok
(if (and (udat-captain-address udata)
(udat-captain-port udata)
(< (current-seconds) (udat-captain-lease udata)))
udata
(let* ((cpkts (get-all-captain-pkts udata)) ;; read captain pkts
(captn (get-winning-pkt cpkts)))
|
︙ | | | ︙ | |
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
(udat-captain-lease-set! udata (+ (current-seconds) 10))
(let-values (((success pingtime)(ping udata (conc ipaddr ":" port))))
(if success
udata
(begin
(print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
(remove-captain-pkt udata captn)
(find-or-setup-captain udata))))
(begin
(setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread
(find-or-setup-captain udata)))))))
;; connect to a specific dbfile
;; - if already connected - return the dbowner host-port
;; - ask the captain who to talk to for this db
;; - put the entry in the dbowners hash as dbfile => host-port
;;
(define (connect udata dbfname dbtype)
|
>
|
>
|
|
>
|
>
|
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
(udat-captain-lease-set! udata (+ (current-seconds) 10))
(let-values (((success pingtime)(ping udata (conc ipaddr ":" port))))
(if success
udata
(begin
(print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
(remove-captain-pkt udata captn)
(if (< tries 20)
(find-or-setup-captain udata (+ tries 1))
#f)))))
(begin
(setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread
(if (< tries 20)
(find-or-setup-captain udata (+ tries 1))
#f))))))
;; connect to a specific dbfile
;; - if already connected - return the dbowner host-port
;; - ask the captain who to talk to for this db
;; - put the entry in the dbowners hash as dbfile => host-port
;;
(define (connect udata dbfname dbtype)
|
︙ | | | ︙ | |
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
|
(let* ((host-port (udat-captain-host-port udata)))
(if host-port
(let* ((cookie (make-cookie udata))
(msg #f) ;; (conc dbname " " dbtype))
(params `(,dbname ,dbtype))
(res (send udata host-port 'db-owner cookie msg
params: params retval: #t)))
(match (string-split res)
((retcookie owner-host-port)
(values (equal? retcookie cookie) owner-host-port))))
(values #f -1))))
;; called in ulex-handler to dispatch work, called on the workers side
;; calls (proc params data)
;; returns result with cookie
;;
;; pdat is the info of the caller, used to send the result data
|
|
|
>
|
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
|
(let* ((host-port (udat-captain-host-port udata)))
(if host-port
(let* ((cookie (make-cookie udata))
(msg #f) ;; (conc dbname " " dbtype))
(params `(,dbname ,dbtype))
(res (send udata host-port 'db-owner cookie msg
params: params retval: #t)))
(match (and res (string-split res))
((retcookie owner-host-port)
(values (equal? retcookie cookie) owner-host-port))
(else (values #f #f))))
(values #f -1))))
;; called in ulex-handler to dispatch work, called on the workers side
;; calls (proc params data)
;; returns result with cookie
;;
;; pdat is the info of the caller, used to send the result data
|
︙ | | | ︙ | |
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
|
(let* ((my-addr (udat-my-address udata))
(my-port (udat-my-port udata))
(th (make-thread (lambda ()
(ulex-handler-loop udata)) "Captain handler")))
(udat-handler-thread-set! udata th)
(udat-captain-address-set! udata my-addr)
(udat-captain-port-set! udata my-port)
(thread-start! th))
(begin
(print "ERROR: failed to create captain pkt")
#f)))
;; given a pkts dir read
;;
(define (get-all-captain-pkts udata)
|
|
>
|
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
|
(let* ((my-addr (udat-my-address udata))
(my-port (udat-my-port udata))
(th (make-thread (lambda ()
(ulex-handler-loop udata)) "Captain handler")))
(udat-handler-thread-set! udata th)
(udat-captain-address-set! udata my-addr)
(udat-captain-port-set! udata my-port)
(thread-start! th)
(print "Captain setup complete and thread started. Address: " my-addr ", port: " my-port))
(begin
(print "ERROR: failed to create captain pkt")
#f)))
;; given a pkts dir read
;;
(define (get-all-captain-pkts udata)
|
︙ | | | ︙ | |
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
|
(udat-my-pid udata) "-"
newcnum)))
;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;; if udata-in is #f create the record
;; if there is already a serv-listener return the udata
;;
(define (start-server-find-port udata-in #!optional (port 4242))
(let ((udata (or udata-in (make-udat))))
(if (udat-serv-listener udata) ;; TODO - add check that the listener is alive and ready?
udata
(handle-exceptions
exn
(if (< port 65535)
(start-server-find-port udata (+ port 1))
#f)
(connect-server udata port)))))
(define (connect-server udata port)
;; (tcp-listener-socket LISTENER)(socket-name so)
;; sockaddr-address, sockaddr-port, sockaddr->string
(let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
(addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
(udat-my-address-set! udata addr)
(udat-my-port-set! udata port)
(udat-my-hostname-set! udata (get-host-name))
(udat-serv-listener-set! udata tlsn)
udata))
(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
(let* ((pdat (or (udat-get-peer udata host-port)
(handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
exn
#f
(let ((npdat (make-peer addr-port: host-port)))
|
>
>
|
|
>
|
>
>
|
>
|
|
<
>
>
>
|
>
>
|
|
|
|
|
>
|
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
|
(udat-my-pid udata) "-"
newcnum)))
;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;; does not actually start a server thread
;;
;; if udata-in is #f create the record
;; if there is already a serv-listener return the udata
;;
(define (start-server-find-port udata-in #!optional (port 4242)(tries 0))
(let ((udata (or udata-in (make-udat))))
(if (udat-serv-listener udata) ;; TODO - add check that the listener is alive and ready?
udata
(let ((res (connect-server udata port)))
(if res
res
(begin
;; (print "Could not connect to " port)
(if (and (< port 65535)
(< tries 10000)) ;; make this number bigger when things are working
(start-server-find-port udata (+ port 1)(+ tries 1))
#f)))))))
(define (connect-server udata port)
;; (tcp-listener-socket LISTENER)(socket-name so)
;; sockaddr-address, sockaddr-port, sockaddr->string
(let* ((tlsn (handle-exceptions
exn
#f ;; NB// NEED BETTER HANDLING HERE ASAP
(tcp-listen port 1000 #f))) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
(addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
(if tlsn
(begin
(udat-my-address-set! udata addr)
(udat-my-port-set! udata port)
(udat-my-hostname-set! udata (get-host-name))
(udat-serv-listener-set! udata tlsn)
udata)
#f)))
(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
(let* ((pdat (or (udat-get-peer udata host-port)
(handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
exn
#f
(let ((npdat (make-peer addr-port: host-port)))
|
︙ | | | ︙ | |