23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
(use mailbox)
(module ulex *
(import scheme posix chicken data-structures ports extras files mailbox)
(import srfi-18 pkts matchable regex
typed-records srfi-69 srfi-1
srfi-4 regex-case
(prefix sqlite3 sqlite3:)
foreign
tcp6
;; ulex-netutil
hostinfo
)
;;======================================================================
;; KEY FUNCTIONS - THESE ARE TO BE EXPOSED AND USED
;;======================================================================
;; connection setup and management functions
;; setup: the basic setup command. Must always be called before
;; connecting to a db using connect.
;;
;; Finds the current captain or, when no captain pkt is found, becomes
;; the captain. Returns the populated udat record.
;;
;;   udata-in - optional existing udat record; a fresh one is made when #f
;;
;; Steps:
;;   1. read all captain pkts and pick the winning one
;;   2. start our own server if udata has no serv-listener yet
;;   3. if there is a winner, record it in udata and ping it
;;        - reachable:   return udata
;;        - unreachable: remove its pkt and try again
;;   4. if there is no winner, set ourselves up as captain and try again
;;
;; Every retry passes the same udata along so the listener started in
;; step 2 and the captain state are kept. Retrying with no argument would
;; build a brand new udat record (with no serv-listener, hence another
;; server start) on every pass.
;;
(define (setup #!optional (udata-in #f))
  (let* ((udata (or udata-in (make-udat)))
         (cpkts (get-all-captain-pkts udata)) ;; read captain pkts
         (captn (get-winning-pkt cpkts)))
    ;; check to see if our own server is started and start one if not
    (if (not (udat-serv-listener udata))
        (start-server-find-port udata))
    (if captn
        (let ((port   (alist-ref 'port captn))
              (host   (alist-ref 'host captn))
              (ipaddr (alist-ref 'ipaddr captn))
              (pid    (alist-ref 'pid captn)))
          (udat-captain-address-set! udata ipaddr)
          (udat-captain-host-set!    udata host)
          (udat-captain-port-set!    udata port)
          (udat-captain-pid-set!     udata pid)
          ;; ping yields two values; only the success flag is used here
          (let-values (((success pingtime) (ping udata (conc ipaddr ":" port))))
            (if success
                udata
                (begin
                  (print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
                  (remove-captain-pkt udata captn)
                  (setup udata))))) ;; retry with the SAME record
        (begin
          (setup-as-captain udata) ;; this saves the handler thread in udata and starts it
          ;; NOTE(review): if setup-as-captain keeps failing this retries
          ;; forever - confirm that is the intended behaviour.
          (setup udata)))))
;; connect to a specific dbfile
;; - if already connected - return the dbowner host-port
;; - ask the captain who to talk to for this db
;; - put the entry in the dbowners hash as dbfile => host-port
;;
(define (connect udata dbfname dbtype)
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
|
|
|
<
<
|
|
|
|
|
|
|
|
|
|
>
|
|
|
|
|
|
|
|
|
|
|
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
(use mailbox)
(module ulex
*
(import scheme posix chicken data-structures ports extras files mailbox)
(import srfi-18 pkts matchable regex
typed-records srfi-69 srfi-1
srfi-4 regex-case
(prefix sqlite3 sqlite3:)
foreign
tcp6
;; ulex-netutil
hostinfo
)
;; Pkt spec for captain pkts. Kept at module level (make it a global?
;; Well, it is local to this module).
;;
;; Shape: (pkt-type (field-name . letter) ...)
;; NOTE(review): the single letters are presumably the card keys used by
;; the pkts library - confirm.
;;
(define *captain-pktspec*
  '((captain (host   . h)
             (port   . p)
             (pid    . i)
             (ipaddr . a))
    ;; Disabled "data" pkt type, kept for reference:
    ;;
    ;; (data (hostname . h)  ;; sender hostname
    ;;       (port     . p)  ;; sender port
    ;;       (ipaddr   . a)  ;; sender ip
    ;;       (hostkey  . k)  ;; sending host key - store info at server under this key
    ;;       (servkey  . s)  ;; server key - this needs to match at server end or reject the msg
    ;;       (format   . f)  ;; sb=serialized-base64, t=text, sx=sexpr, j=json
    ;;       (data     . d)) ;; base64 encoded slln data
    ))
;; udat: struct for keeping track of our world.
;;
;; One record holds what this process knows about the captain, about its
;; own server, about other servers, and about the application using ulex.
;; Each field is written as (name default-expression).
;;
;; NOTE(review): ulex-dir and cpkts-dir are built from $HOME; if HOME is
;; unset get-environment-variable presumably yields #f and conc would then
;; produce a path starting with "#f" - confirm and guard if needed.
;;
(defstruct udat
;; captain info
(captain-address #f)
(captain-host #f)
(captain-port #f)
(captain-pid #f)
(captain-lease 0) ;; time (unix epoch seconds) when the lease is up; 0 = no lease
(ulex-dir (conc (get-environment-variable "HOME") "/.ulex"))
(cpkts-dir (conc (get-environment-variable "HOME") "/.ulex/pkts"))
(cpkt-spec *captain-pktspec*)
;; this process's info
(my-cpkt-key #f) ;; put Z card here when I create a pkt for myself as captain
(my-address #f)
(my-hostname #f)
(my-port #f)
(my-pid (current-process-id))
(my-dbs '())
;; server and handler thread
(serv-listener #f) ;; this process's server info; #f until a server is started
(handler-thread #f)
(mboxes (make-hash-table)) ;; key => mbox
;; other servers
(peers (make-hash-table)) ;; host-port => peer record
(dbowners (make-hash-table)) ;; dbfile => host-port
(handlers (make-hash-table)) ;; dbfile => proc
;; (outgoing-conns (make-hash-table)) ;; host:port -> conn
(work-queue (make-queue)) ;; most stuff goes here
;; (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping)
(busy #f) ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
;; app info
(appname #f)
(dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
;; cookies
(cnum 0) ;; cookie num
)
;;======================================================================
;; NEW APPROACH
;;======================================================================
;; remote-call: execute a named call on the server that owns a db.
;;
;;   udata    - all the connection info, captain, server, ulex db etc. MUST BE PASSED IN
;;   dbpath   - full path and filename of the db to talk to (or a symbol naming the db?)
;;   dbtype   - handed to get-db-owner together with dbpath
;;   callname - the remote call to execute
;;   params   - parameters to pass to the remote call (collected as a list)
;;
;; A local server port has to be ready from the very beginning, so
;; start-server-find-port runs first, then the captain is located, and
;; only then is the db owner looked up and the request sent.
;; Related: connect, process-request, send, send-receive.
;;
(define (remote-call udata dbpath dbtype callname . params)
  (start-server-find-port udata) ;; ensure we have a local server
  (find-or-setup-captain udata)
  ;; get-db-owner hands back two values: a cookie key and the owner's host-port
  (call-with-values
      (lambda ()
        (get-db-owner udata dbpath dbtype))
    (lambda (cookie-key host-port)
      (send-receive udata host-port callname cookie-key params))))
;;======================================================================
;; KEY FUNCTIONS - THESE ARE TO BE EXPOSED AND USED
;;======================================================================
;; connection setup and management functions
;; find-or-setup-captain: find the captain or become the captain.
;;
;;   udata - the udat record; it is updated in place and returned
;;
;; Fast path: if udata already holds a captain address and port and the
;; captain lease has not run out, udata is returned untouched.
;;
;; Otherwise the captain pkts are read and the winning pkt is chosen:
;;   - winner found: its host/port/ipaddr/pid are stored in udata and it
;;     is pinged.
;;       * ping ok     -> a 10 second lease is recorded, udata returned
;;       * ping failed -> the pkt is removed and the search starts over
;;   - no winner: this process sets itself up as captain and the search
;;     starts over (so its own pkt gets picked up and pinged).
;;
;; The lease is only granted AFTER a successful ping. Granting it before
;; the ping would make the retry hit the fast path above and return the
;; unreachable captain as if it were valid.
;;
(define (find-or-setup-captain udata)
  ;; see if we already have a captain and if the lease is ok
  (if (and (udat-captain-address udata)
           (udat-captain-port udata)
           (< (current-seconds) (udat-captain-lease udata)))
      udata
      (let* ((cpkts (get-all-captain-pkts udata)) ;; read captain pkts
             (captn (get-winning-pkt cpkts)))
        (if captn
            (let ((port   (alist-ref 'port captn))
                  (host   (alist-ref 'host captn))
                  (ipaddr (alist-ref 'ipaddr captn))
                  (pid    (alist-ref 'pid captn)))
              (udat-captain-address-set! udata ipaddr)
              (udat-captain-host-set!    udata host)
              (udat-captain-port-set!    udata port)
              (udat-captain-pid-set!     udata pid)
              ;; ping yields two values; only the success flag is used here
              (let-values (((success pingtime) (ping udata (conc ipaddr ":" port))))
                (if success
                    (begin
                      ;; captain answered: trust it for the next 10 seconds
                      (udat-captain-lease-set! udata (+ (current-seconds) 10))
                      udata)
                    (begin
                      (print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
                      (remove-captain-pkt udata captn)
                      (find-or-setup-captain udata)))))
            (begin
              (setup-as-captain udata) ;; this saves the handler thread in udata and starts it
              ;; NOTE(review): if setup-as-captain keeps failing this
              ;; retries forever - confirm that is the intended behaviour.
              (find-or-setup-captain udata))))))
;; connect to a specific dbfile
;; - if already connected - return the dbowner host-port
;; - ask the captain who to talk to for this db
;; - put the entry in the dbowners hash as dbfile => host-port
;;
(define (connect udata dbfname dbtype)
|
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
(sort (get-all-ips) ip-pref-less?))
;; get-all-ips: list of this host's addresses as strings.
;;
;; Looks up the host information for (current-hostname), takes its
;; addresses vector and converts every entry with ip->string.
;;
(define (get-all-ips)
  (let* ((hinfo (host-information (current-hostname)))
         (addrs (vector->list (hostinfo-addresses hinfo))))
    (map ip->string addrs)))
;; Pkt spec for captain pkts. Kept at module level (make it a global?
;; Well, it is local to this module).
;;
;; Shape: (pkt-type (field-name . letter) ...)
;; NOTE(review): the single letters are presumably the card keys used by
;; the pkts library - confirm.
;;
(define *captain-pktspec*
  '((captain (host   . h)
             (port   . p)
             (pid    . i)
             (ipaddr . a))
    ;; Disabled "data" pkt type, kept for reference:
    ;;
    ;; (data (hostname . h)  ;; sender hostname
    ;;       (port     . p)  ;; sender port
    ;;       (ipaddr   . a)  ;; sender ip
    ;;       (hostkey  . k)  ;; sending host key - store info at server under this key
    ;;       (servkey  . s)  ;; server key - this needs to match at server end or reject the msg
    ;;       (format   . f)  ;; sb=serialized-base64, t=text, sx=sexpr, j=json
    ;;       (data     . d)) ;; base64 encoded slln data
    ))
;; udat: struct for keeping track of our world.
;;
;; One record holds what this process knows about the captain, about its
;; own server, about other servers, and about the application using ulex.
;; Each field is written as (name default-expression).
;;
;; NOTE(review): ulex-dir and cpkts-dir are built from $HOME; if HOME is
;; unset get-environment-variable presumably yields #f and conc would then
;; produce a path starting with "#f" - confirm and guard if needed.
;;
(defstruct udat
;; captain info
(captain-address #f)
(captain-host #f)
(captain-port #f)
(captain-pid #f)
(ulex-dir (conc (get-environment-variable "HOME") "/.ulex"))
(cpkts-dir (conc (get-environment-variable "HOME") "/.ulex/pkts"))
(cpkt-spec *captain-pktspec*)
;; this process's info
(my-cpkt-key #f) ;; put Z card here when I create a pkt for myself as captain
(my-address #f)
(my-hostname #f)
(my-port #f)
(my-pid (current-process-id))
(my-dbs '())
;; server and handler thread
(serv-listener #f) ;; this process's server info; #f until a server is started
(handler-thread #f)
(mboxes (make-hash-table)) ;; key => mbox
;; other servers
(peers (make-hash-table)) ;; host-port => peer record
(dbowners (make-hash-table)) ;; dbfile => host-port
(handlers (make-hash-table)) ;; dbfile => proc
;; (outgoing-conns (make-hash-table)) ;; host:port -> conn
(work-queue (make-queue)) ;; most stuff goes here
;; (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping)
(busy #f) ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
;; app info
(appname #f)
(dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
;; cookies
(cnum 0) ;; cookie num
)
;; udat-my-host-port: this process's "address:port" string, or #f when
;; either my-address or my-port is not set in udata.
;;
(define (udat-my-host-port udata)
  (let ((addr (udat-my-address udata))
        (port (udat-my-port udata)))
    (and addr
         port
         (conc addr ":" port))))
(define (udat-captain-host-port udata)
(if (and (udat-captain-address udata)(udat-captain-port udata))
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
287
288
289
290
291
292
293
294
295
296
297
298
299
300
|
(sort (get-all-ips) ip-pref-less?))
;; get-all-ips: list of this host's addresses as strings.
;;
;; Looks up the host information for (current-hostname), takes its
;; addresses vector and converts every entry with ip->string.
;;
(define (get-all-ips)
  (let* ((hinfo (host-information (current-hostname)))
         (addrs (vector->list (hostinfo-addresses hinfo))))
    (map ip->string addrs)))
;; udat-my-host-port: this process's "address:port" string, or #f when
;; either my-address or my-port is not set in udata.
;;
(define (udat-my-host-port udata)
  (let ((addr (udat-my-address udata))
        (port (udat-my-port udata)))
    (and addr
         port
         (conc addr ":" port))))
(define (udat-captain-host-port udata)
(if (and (udat-captain-address udata)(udat-captain-port udata))
|
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
|
;;======================================================================
;; Captain functions
;;======================================================================
;; NB// This needs to be started in a thread
;;
;; setup-as-captain: make this process the captain.
;;   1. start the server (start-server-find-port puts it in udata)
;;   2. create the captain pkt
;;   3. create the "Captain handler" thread running ulex-handler-loop,
;;      record it and our own address/port as the captain's in udata,
;;      then start the thread
;;
;; Returns the result of thread-start! on success. If step 1 or step 2
;; fails an ERROR line is printed and #f is returned.
;;
(define (setup-as-captain udata)
  (cond
   ((not (start-server-find-port udata)) ;; puts the server in udata
    (print "ERROR: failed to start server.")
    #f)
   ((not (create-captain-pkt udata))
    (print "ERROR: failed to create captain pkt")
    #f)
   (else
    (let ((handler (make-thread
                    (lambda () (ulex-handler-loop udata))
                    "Captain handler")))
      (udat-handler-thread-set!  udata handler)
      ;; we are the captain now, so the captain's address/port are ours
      (udat-captain-address-set! udata (udat-my-address udata))
      (udat-captain-port-set!    udata (udat-my-port udata))
      (thread-start! handler)))))
;; given a pkts dir read
;;
(define (get-all-captain-pkts udata)
(let* ((pktsdir (let ((d (udat-cpkts-dir udata)))
(if (file-exists? d)
|
|
<
|
|
|
|
|
|
|
|
|
|
|
<
<
<
|
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
|
;;======================================================================
;; Captain functions
;;======================================================================
;; NB// This needs to be started in a thread
;;
;; setup-as-captain: make this process the captain.
;;   - the local server MUST be started already
;;   - create the captain pkt
;;   - create the "Captain handler" thread running ulex-handler-loop,
;;     record it and our own address/port as the captain's in udata,
;;     then start the thread
;;
;; Returns the result of thread-start! on success. If the pkt cannot be
;; created an ERROR line is printed and #f is returned.
;;
(define (setup-as-captain udata)
  (if (not (create-captain-pkt udata))
      (begin
        (print "ERROR: failed to create captain pkt")
        #f)
      (let ((handler (make-thread
                      (lambda () (ulex-handler-loop udata))
                      "Captain handler")))
        (udat-handler-thread-set!  udata handler)
        ;; we are the captain now, so the captain's address/port are ours
        (udat-captain-address-set! udata (udat-my-address udata))
        (udat-captain-port-set!    udata (udat-my-port udata))
        (thread-start! handler))))
;; given a pkts dir read
;;
(define (get-all-captain-pkts udata)
(let* ((pktsdir (let ((d (udat-cpkts-dir udata)))
(if (file-exists? d)
|
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
|
(udat-my-pid udata) "-"
newcnum)))
;; start-server-find-port: create a tcp listener and return a populated
;; udat struct with my port, address, hostname, pid etc.
;;
;;   udata - the udat record to populate
;;   port  - first port to try (default 4242)
;;
;; If udata already has a serv-listener it is returned as-is, so calling
;; this more than once on the same record does not open a second listener.
;; NOTE(review): this relies on connect-server storing the listener in the
;; serv-listener field - its full body is not visible here, confirm.
;;
;; Otherwise connect-server is tried on `port`; on any exception the next
;; port up is tried, up to and including 65535.
;; Returns #f if it fails to find a port to allocate.
;;
(define (start-server-find-port udata #!optional (port 4242))
  (if (udat-serv-listener udata)
      udata ;; already listening - nothing to do
      (handle-exceptions
          exn
          ;; this port could not be used: move on to the next one, if any
          (if (< port 65535)
              (start-server-find-port udata (+ port 1))
              #f)
        (connect-server udata port))))
(define (connect-server udata port)
;; (tcp-listener-socket LISTENER)(socket-name so)
;; sockaddr-address, sockaddr-port, sockaddr->string
(let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
(addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
(udat-my-address-set! udata addr)
|
>
>
>
|
>
>
>
|
|
|
|
|
|
|
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
|
(udat-my-pid udata) "-"
newcnum)))
;; start-server-find-port: create a tcp listener and return a populated
;; udat struct with my port, address, hostname, pid etc.
;;
;;   udata-in - the udat record to populate; if #f a new record is created
;;   port     - first port to try (default 4242)
;;
;; If there is already a serv-listener the udata is returned untouched.
;; Otherwise connect-server is tried on `port`; on any exception the next
;; port up is tried, up to and including 65535.
;; Returns #f if it fails to find a port to allocate.
;;
(define (start-server-find-port udata-in #!optional (port 4242))
  (let ((udata (or udata-in (make-udat))))
    (cond
     ;; TODO - add check that the listener is alive and ready?
     ((udat-serv-listener udata) udata)
     (else
      (handle-exceptions
          exn
          ;; this port could not be used: try the next one, or give up with #f
          (and (< port 65535)
               (start-server-find-port udata (+ port 1)))
        (connect-server udata port))))))
(define (connect-server udata port)
;; (tcp-listener-socket LISTENER)(socket-name so)
;; sockaddr-address, sockaddr-port, sockaddr->string
(let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
(addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
(udat-my-address-set! udata addr)
|