Megatest

Check-in [0b096cbab6]
Login
Overview
Comment:wip
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try
Files: files | file ages | folders
SHA1: 0b096cbab6db9217100e8f2b1ef4fa768130659e
User & Date: matt on 2020-01-01 23:12:30
Other Links: branch diff | manifest | tags
Context
2020-01-01
23:28
wip check-in: 852e127507 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
23:12
wip check-in: 0b096cbab6 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
21:11
wip check-in: 65faac60c7 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
Changes

Modified ulex/ulex.scm from [1ac73bf9c4] to [9daa591c44].

96
97
98
99
100
101
102


103
104
105
106
107
108
109
110
111
112
113
114
115














116
117
118
119
120
121
122
	    (ipaddr   . a)  ;; sender ip
	    (hostkey  . k)  ;; sending host key - store info at server under this key
	    (servkey  . s)  ;; server key - this needs to match at server end or reject the msg
	    (format   . f)  ;; sb=serialized-base64, t=text, sx=sexpr, j=json
	    (data     . d)  ;; base64 encoded slln data
	    )))



;; udat: record holding what this process knows about the captain and
;; about itself. Fields default to #f unless an initializer is shown.
;;
(defstruct udat
  ;; captain-* fields are filled in by setup from the winning captain pkt
  (captain-address #f)
  (captain-host    #f)
  (captain-port    #f)
  (captain-pid     #f)
  ;; directory holding captain pkts; built from $HOME at record creation time
  (cpkts-dir       (conc (get-environment-variable "HOME") "/.ulex/pkts"))
  ;; pkt spec used when reading/writing captain pkts
  (cpkt-spec       *captain-pktspec*)
  (my-cpkt-key     #f)   ;; put Z card here when I create a pkt for myself as captain
  ;; my-address, my-port and my-hostname are set by start-server
  (my-address      #f)
  (my-hostname     #f)
  (my-port         #f)
  (my-pid          (current-process-id))
  ;; NOTE(review): presumably the tcp listener created by start-server - confirm
  (serv-listener   #f)
  )

;;======================================================================
;; Captain pkt functions
;;======================================================================

;; given a pkts dir read 







>
>













>
>
>
>
>
>
>
>
>
>
>
>
>
>







96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
	    (ipaddr   . a)  ;; sender ip
	    (hostkey  . k)  ;; sending host key - store info at server under this key
	    (servkey  . s)  ;; server key - this needs to match at server end or reject the msg
	    (format   . f)  ;; sb=serialized-base64, t=text, sx=sexpr, j=json
	    (data     . d)  ;; base64 encoded slln data
	    )))

;; struct for keeping track of our world
;;
;; One udat record holds what this process knows about the captain and
;; about itself. Fields default to #f unless an initializer is shown.

(defstruct udat
  ;; captain-* fields are filled in by setup from the winning captain pkt
  (captain-address #f)
  (captain-host    #f)
  (captain-port    #f)
  (captain-pid     #f)
  ;; directory holding captain pkts; built from $HOME at record creation time
  (cpkts-dir       (conc (get-environment-variable "HOME") "/.ulex/pkts"))
  ;; pkt spec used when reading/writing captain pkts
  (cpkt-spec       *captain-pktspec*)
  (my-cpkt-key     #f)   ;; put Z card here when I create a pkt for myself as captain
  ;; my-address, my-port and my-hostname are set by connect-server
  (my-address      #f)
  (my-hostname     #f)
  (my-port         #f)
  (my-pid          (current-process-id))
  ;; listener that ulex-handler passes to tcp-accept
  (serv-listener   #f)
  ;; thread running ulex-handler; stored by setup-as-captain
  (handler-thread  #f)
  ;; NOTE(review): not referenced by any code visible here - purpose to be confirmed
  (handlers        '())
  ;; cache used by get-peer-dat: key is the host:port string, value is a peer record
  (outgoing-conns  (make-hash-table))  ;; host:port -> conn
  )

;; struct for keeping track of others we are talking to
;;
;; peer records are created by get-peer-dat and cached in the
;; outgoing-conns table of the udat record.

(defstruct peer
  ;; host:port string; used as the cache key and passed to tcp-connect
  (addr-port       #f)
  ;; hostname and pid are only set when get-peer-dat is given them
  (hostname        #f)
  (pid             #f)
  (inp             #f)  ;; input port from the peer
  (oup             #f)  ;; output port to the peer
  (owns            '()) ;; list of databases this peer is currently handling
  )

;;======================================================================
;; Captain pkt functions
;;======================================================================

;; given a pkts dir read 
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
	 (pkt-spec      (udat-cpkt-spec udata)))
    (map (lambda (pkt-file)
	   (read-pkt->alist pkt-file pktspec: pkt-spec))
	 all-pkt-files)))

;; Pick the winning captain pkt from a list of pkt alists.
;;
;; pkts    : list of alists, each with a 'D entry (a string holding a
;;           number) and a 'Z entry (a string).
;; returns : #f when pkts is empty, otherwise the pkt that sorts first:
;;           largest D first; pkts whose D values are equal are ordered
;;           by Z, greater string first.
;;
;; The tie-break is keyed on the D values being numerically equal. The
;; earlier test, (eq? a b), compared the two alists for identity, so the
;; Z comparison never ran for two distinct pkts.
;;
;; NOTE(review): the previous comment said "choose the oldest", but
;; (> ad bd) puts the largest D first. The direction is left unchanged
;; here - confirm which one is intended.
;;
(define (get-winning-pkt pkts)
  (if (null? pkts)
      #f
      (car (sort pkts
                 (lambda (a b)
                   (let ((ad (string->number (alist-ref 'D a)))
                         (bd (string->number (alist-ref 'D b))))
                     (if (= ad bd)
                         ;; strict comparison keeps the predicate a valid ordering
                         (string>? (alist-ref 'Z a) (alist-ref 'Z b))
                         (> ad bd))))))))

;; Try to start the server on port, moving up one port at a time until
;; start-server succeeds.
;;
;; udata   : udat record handed to start-server
;; port    : first port to try (default 9999)
;; returns : whatever start-server returns for the first port that does
;;           not raise, or #f once port 65535 has also failed.
;;
;; Any exception raised by start-server is treated as "port unavailable".
;; The retry passes udata along; without it the incremented port number
;; was bound to udata and the port fell back to its default.
;;
(define (start-server-find-port udata #!optional (port 9999))
  (handle-exceptions
      exn
      (if (< port 65535)
          (start-server-find-port udata (+ port 1))
          #f)
    (start-server udata port)))

(define (start-server udata port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
	 (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
    (udat-my-address-set!    udata addr)
    (udat-my-port-set!       udata port)
    (udat-my-hostname-set!   udata (get-host-name))







|
















|


|
|

|







148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
	 (pkt-spec      (udat-cpkt-spec udata)))
    (map (lambda (pkt-file)
	   (read-pkt->alist pkt-file pktspec: pkt-spec))
	 all-pkt-files)))

;; Pick the winning captain pkt from a list of pkt alists.
;;
;; pkts    : list of alists, each with a 'D entry (a string holding a
;;           number) and a 'Z entry (a string).
;; returns : #f when pkts is empty, otherwise the pkt that sorts first:
;;           largest D first; pkts whose D values are equal are ordered
;;           by Z, greater string first.
;;
;; The tie-break is keyed on the D values being numerically equal. The
;; earlier test, (eq? a b), compared the two alists for identity, so the
;; Z comparison never ran for two distinct pkts.
;;
;; NOTE(review): the previous comment said "choose the oldest", but
;; (> ad bd) puts the largest D first. The direction is left unchanged
;; here - confirm which one is intended.
;;
(define (get-winning-pkt pkts)
  (if (null? pkts)
      #f
      (car (sort pkts
                 (lambda (a b)
                   (let ((ad (string->number (alist-ref 'D a)))
                         (bd (string->number (alist-ref 'D b))))
                     (if (= ad bd)
                         ;; strict comparison keeps the predicate a valid ordering
                         (string>? (alist-ref 'Z a) (alist-ref 'Z b))
                         (> ad bd))))))))

;; Try to start the server on port, moving up one port at a time until
;; connect-server succeeds.
;;
;; udata   : udat record handed to connect-server
;; port    : first port to try (default 4242)
;; returns : whatever connect-server returns for the first port that does
;;           not raise, or #f once port 65535 has also failed.
;;
;; Any exception raised by connect-server is treated as "port unavailable".
;; The retry recurses into this procedure under its own name; the earlier
;; call to connect-server-find-port named a procedure that is not defined
;; alongside this one.
;;
(define (start-server-find-port udata #!optional (port 4242))
  (handle-exceptions
      exn
      (if (< port 65535)
          (start-server-find-port udata (+ port 1))
          #f)
    (connect-server udata port)))

(define (connect-server udata port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
	 (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
    (udat-my-address-set!    udata addr)
    (udat-my-port-set!       udata port)
    (udat-my-hostname-set!   udata (get-host-name))
191
192
193
194
195
196
197
198


































































199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217

218
219
220
221
222
223
224

225
226
227
228
229
230
231
232
	 udata
	 (write-alist->pkt
	  pktdir
	  pktdat
	  pktspec: pktspec
	  ptype:   'captain))
	(udat-my-cpkt-key udata))))
    


































































;;======================================================================
;; connection setup and management functions
;;======================================================================

;; Build a fresh udat record and, when a captain pkt exists, copy the
;; captain's ipaddr/host/port/pid from the winning pkt into it.
;;
;; returns : the udat record, whether or not a captain was found.
;;
(define (setup)
  (let* ((udata  (make-udat))
         (winner (get-winning-pkt (get-all-captain-pkts udata)))) ;; read captain pkts, pick one
    (cond
     (winner
      (udat-captain-address-set! udata (alist-ref 'ipaddr winner))
      (udat-captain-host-set!    udata (alist-ref 'host   winner))
      (udat-captain-port-set!    udata (alist-ref 'port   winner))
      (udat-captain-pid-set!     udata (alist-ref 'pid    winner))))
    ;;
    ;; when there is no captain:
    ;;   register captn here
    ;;   then run setup again
    ;;
    udata))
    
;; Currently a stub: returns udata unchanged and does not use dbfname.
(define (connect udata dbfname)
  udata)

) ;; END OF ULEX









|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>



















>
|
|
|
<
|
<
|
>
|







207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303

304

305
306
307
308
309
310
311
312
313
314
	 udata
	 (write-alist->pkt
	  pktdir
	  pktdat
	  pktspec: pktspec
	  ptype:   'captain))
	(udat-my-cpkt-key udata))))

;; NB// This needs to be started in a thread
;;
;; Become the captain. The steps run in order and stop at the first one
;; that yields #f:
;;   1. start-server-find-port - start the server (puts the server in udata)
;;   2. create-captain-pkt     - create the captain pkt
;;   3. start a thread named "Captain handler" running ulex-handler and
;;      store it in the handler-thread field of udata
;;
;; returns : the value of thread-start! when all steps succeed, else #f.
;;
(define (setup-as-captain udata)
  (and (start-server-find-port udata)
       (create-captain-pkt udata)
       (let ((handler-th (make-thread
                          (lambda () (ulex-handler udata))
                          "Captain handler")))
         (udat-handler-thread-set! udata handler-th)
         (thread-start! handler-th))))

;; Return the peer record for host-port, creating and caching it on first use.
;;
;; udata     : udat record whose outgoing-conns table is the cache
;; host-port : cache key; also the argument handed to tcp-connect
;; hostname  : optional, stored in a newly created record when given
;; pid       : optional, stored in a newly created record when given
;;
;; A cached record is returned as is (hostname and pid are ignored then).
;; Otherwise a connection is opened with tcp-connect, its two ports are
;; stored in the new record, and the record is added to the table.
;;
(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
  (let* ((conns  (udat-outgoing-conns udata))
         (cached (hash-table-ref/default conns host-port #f)))
    (if cached
        cached
        (let ((fresh (make-peer addr-port: host-port)))
          (if hostname (peer-hostname-set! fresh hostname))
          (if pid (peer-pid-set! fresh pid))
          (let-values (((from-peer to-peer) (tcp-connect host-port)))
            (peer-inp-set! fresh from-peer)
            (peer-oup-set! fresh to-peer))
          (hash-table-set! conns host-port fresh)
          fresh))))

;; Look up (or create, via get-peer-dat) the peer record for host-port
;; and return its two ports as multiple values: input port, output port.
;;
(define (get-peer-ports udata host-port hostname pid)
  (let* ((prec      (get-peer-dat udata host-port hostname pid))
         (from-peer (peer-inp prec))
         (to-peer   (peer-oup prec)))
    (values from-peer to-peer)))

;; send back ack
;;
;; Writes two lines to oup:
;;   ack <my-address>:<my-port> <my-hostname> <my-pid> <qrykey>
;;   <qrykey>
;; A second line must always be sent; for an ack it is the qrykey again.
;;
(define (send-ack udata qrykey oup)
  (let* ((sender   (conc (udat-my-address udata) ":" (udat-my-port udata)))
         (ack-line (conc "ack "
                         sender " "
                         (udat-my-hostname udata) " "
                         (udat-my-pid      udata) " "
                         qrykey)))
    (write-line ack-line oup)
    (write-line qrykey oup)))
  
;; Serve one incoming connection on the listener stored in udata.
;;
;; Accepts a single connection from (udat-serv-listener udata) and reads
;; requests from it in a loop. Each request comes as two lines:
;;   handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
;;   data
;; For every well-formed request an ack is written to the output port of
;; the peer record for resp-addr:resp-port (see get-peer-ports), and the
;; request is handed to add-to-work-queue. A control line with too few
;; fields is reported with print and skipped.
;;
;; The loop ends, and the accepted ports are closed, once read-line
;; yields end-of-file; an eof object is never passed to string-split.
;;
(define (ulex-handler udata)
  (let ((serv-listener (udat-serv-listener udata)))
    (let-values (((inp oup) (tcp-accept serv-listener)))
      (let loop ((state 'start))
        (let* ((controldat (read-line inp))
               ;; do not read a second line once the first one hit eof
               (data       (if (eof-object? controldat)
                               controldat
                               (read-line inp))))
          (if (or (eof-object? controldat) (eof-object? data))
              ;; peer closed the connection: release the accepted ports and stop
              (begin
                (close-input-port inp)
                (close-output-port oup))
              (begin
                (match (string-split controldat)
                  ((handlerkey host:port hostname pid qrykey params ...)
                   ;; no handler keys are special-cased yet; every request takes the else branch
                   (case (string->symbol handlerkey)
                     (else
                      (let-values (((pinp poup) (get-peer-ports udata host:port hostname pid)))
                        (send-ack udata qrykey poup))
                      (add-to-work-queue (get-peer-dat udata host:port) handlerkey data))))
                  ;; report the raw control line; the name `handler` used here before was unbound
                  (else (print "BAD DATA? handler=" controldat " data=" data)))
                (loop state))))))))

;;======================================================================
;; connection setup and management functions
;;======================================================================

;; find or become the captain, return a ulex object
;;
;; Builds a fresh udat record, reads the captain pkts and picks the
;; winning one.
;;   - captain found: its ipaddr/host/port/pid are copied into udata.
;;   - no captain:    setup-as-captain is run on udata (this saves the
;;                    handler thread in udata and starts it).
;;
;; returns : the udat record in both cases, or #f when setup-as-captain
;;           fails. Previously the no-captain branch returned the result
;;           of setup-as-captain itself (a thread) instead of udata.
;;
(define (setup)
  (let* ((udata (make-udat))
         (cpkts (get-all-captain-pkts udata)) ;; read captain pkts
         (captn (get-winning-pkt cpkts)))
    (if captn
        (let* ((port   (alist-ref 'port   captn))
               (host   (alist-ref 'host   captn))
               (ipaddr (alist-ref 'ipaddr captn))
               (pid    (alist-ref 'pid    captn)))
          (udat-captain-address-set! udata ipaddr)
          (udat-captain-host-set!    udata host)
          (udat-captain-port-set!    udata port)
          (udat-captain-pid-set!     udata pid)
          ;;(if (ping-captain udata)
          ;;    udata
          ;;    (begin
          ;;       (remove-captain-pkt udata captn)
          ;;       (setup)))
          udata)
        ;; hand back udata, not the thread, so both branches return the same kind of object
        (and (setup-as-captain udata) udata))))
    
;; Currently a stub: returns udata unchanged and does not use dbfname.
(define (connect udata dbfname)
  udata)

) ;; END OF ULEX