Megatest

Check-in [7f56278741]
Login
Overview
Comment:wip
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try
Files: files | file ages | folders
SHA1: 7f562787413cbe3bdafad21d2ab1f09efe943f6d
User & Date: matt on 2020-01-06 22:12:14
Other Links: branch diff | manifest | tags
Context
2020-01-08
01:11
Removed nanomsg from dcommonmon and megamod check-in: 95b4151a44 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
2020-01-06
22:12
wip check-in: 7f56278741 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
17:06
wip check-in: 0390dc30b4 user: mrwellan tags: v1.70-captain-ulex, v1.70-defunct-try
Changes

Modified ulex/ulex.scm from [7b7114168d] to [43414b60e9].

162
163
164
165
166
167
168




169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186







187
188
189
190
191
192
193
  (my-port         #f)
  (my-pid          (current-process-id))
  ;; server and handler thread
  (serv-listener   #f)                 ;; this processes server info
  (handler-thread  #f)
  (handlers        (make-hash-table))
  (outgoing-conns  (make-hash-table))  ;; host:port -> conn




  ;; app info
  (appname         #f)
  (dbtypes         (make-hash-table))  ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
  ;; cookies
  (cnum            0) ;; cookie num
  )

;; struct for keeping track of others we are talking to

(defstruct peer
  (addr-port       #f)
  (hostname        #f)
  (pid             #f)
  (inp             #f)  ;; input port from the peer
  (oup             #f)  ;; output port to the peer
  (owns            '()) ;; list of databases this peer is currently handling
  )








;;======================================================================
;; Captain pkt functions
;;======================================================================

;; given a pkts dir read 
;;
(define (get-all-captain-pkts udata)







>
>
>
>


















>
>
>
>
>
>
>







162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
  (my-port         #f)
  (my-pid          (current-process-id))
  ;; server and handler thread
  (serv-listener   #f)                 ;; this processes server info
  (handler-thread  #f)
  (handlers        (make-hash-table))
  (outgoing-conns  (make-hash-table))  ;; host:port -> conn
  (mboxes          (make-hash-table))  ;; key => mbox
  (work-queue      (make-queue))       ;; most stuff goes here
  (fast-queue      (make-queue))       ;; super quick stuff goes here (e.g. ping)
  (busy            #f)                 ;; is either of the queues busy
  ;; app info
  (appname         #f)
  (dbtypes         (make-hash-table))  ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
  ;; cookies
  (cnum            0) ;; cookie num
  )

;; struct for keeping track of others we are talking to

(defstruct peer
  (addr-port       #f)
  (hostname        #f)
  (pid             #f)
  (inp             #f)  ;; input port from the peer
  (oup             #f)  ;; output port to the peer
  (owns            '()) ;; list of databases this peer is currently handling
  )

(defstruct work
  (peer-dat   #f)
  (handlerkey #f)
  (qrykey     #f)
  (data       #f)
  (start      (current-milliseconds)))

;;======================================================================
;; Captain pkt functions
;;======================================================================

;; given a pkts dir read 
;;
(define (get-all-captain-pkts udata)
337
338
339
340
341
342
343
















344







345









346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
		 (if (null? params) "" (conc " " (string-intersperse params " "))))
		oup)
    (write-line data oup)
    ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE!
    ;;       (there is a listener for handling that)
    ))

















(define (add-to-work-queue udata . blah)







  #f)










;; send back ack - this is tcp we are talking about, do we really need an ack?
;;
;; NOTE: No need to send back host:port of self - that is locked in by qrykey
;;
(define (send-ack udata host-port qrykey) ;;  #!optional (hostname #f)(pid #f))
  (send udata host-port "ack" qrykey qrykey)) ;; we must send a second line - for the ack let it be the qrykey 
  
;; 
;;
(define (ulex-handler udata)
  (let* ((serv-listener (udat-serv-listener udata)))
    (let-values (((inp oup)(tcp-accept serv-listener)))
      ;; data comes as two lines
      ;;   handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
      ;;   data
      (let loop ((state 'start))
	(let* ((controldat (read-line inp))
	       (data       (read-line inp)))
	  (match (string-split controldat)
	    ((handlerkey host:port pid qrykey cookie params ...)
	     (case (string->symbol handlerkey)
	       ((ack)(print "Got ack!"))
	       ((ping)
		(let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
		       (val  (if proc (proc) "gotping")))
		  (send udata host:port "version" qrykey cookie val)))
	       ((rucaptain)
		(send udata host:port "iamcaptain" qrykey (if (udat-my-cpkt-key udata)
							       "yes"
							       "no")))
	       (else
		(send-ack udata host:port qrykey)
		(add-to-work-queue udata (get-peer-dat udata host:port) handlerkey data)))
	     (else (print "BAD DATA? handler=" handlerkey " data=" data)))))
	(loop state)))))

;; add a proc to the handler list
(define (register-handler udata key proc)
  (hash-table-set! (udat-handlers udata) key proc))








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>

>
>
>
>
>
>
>
>
>




















|





|





|
|







348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
		 (if (null? params) "" (conc " " (string-intersperse params " "))))
		oup)
    (write-line data oup)
    ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE!
    ;;       (there is a listener for handling that)
    ))

;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '()))
  (let ((mbox      (make-mailbox))
	(mbox-time (current-milliseconds))
	(mboxes    (udat-mboxes udata)))
    (hash-table-set! mboxes qrykey mbox)
    (send udata host-port handler qrykey data hostname: hostname pid: pid params: params)
    (let* ((mbox-timeout-secs    20)
	   (mbox-timeout-result 'MBOX_TIMEOUT)
	   (res                  (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
	   (mbox-receive-time    (current-milliseconds)))
      (hash-table-delete! mboxes qrykey)
      res)))

(define (add-to-work-queue udata peer-dat handlerkey qrykey data)
  (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data)))
    (if (udat-busy udata)
	(queue-add! (udat-work-queue udata) wdat)
	(process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat
    ))

(define (do-work udata wdat)
  #f)

(define (process-work udata #!optional wdat)
  (if wdat (do-work udata wdat)) ;; process wdat
  (let ((wqueue (udat-work-queue udata)))
    (if (not (queue-empty? wqueue))
	(let loop ((wd (queue-remove! wqueue)))
	  (do-work udata wd)
	  (if (not (queue-empty? wqueue))
	      (loop (queue-remove! wqueue)))))))

;; send back ack - this is tcp we are talking about, do we really need an ack?
;;
;; NOTE: No need to send back host:port of self - that is locked in by qrykey
;;
(define (send-ack udata host-port qrykey) ;;  #!optional (hostname #f)(pid #f))
  (send udata host-port "ack" qrykey qrykey)) ;; we must send a second line - for the ack let it be the qrykey 
  
;; 
;;
(define (ulex-handler udata)
  (let* ((serv-listener (udat-serv-listener udata)))
    (let-values (((inp oup)(tcp-accept serv-listener)))
      ;; data comes as two lines
      ;;   handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
      ;;   data
      (let loop ((state 'start))
	(let* ((controldat (read-line inp))
	       (data       (read-line inp)))
	  (match (string-split controldat)
	    ((handlerkey host:port pid qrykey params ...)
	     (case (string->symbol handlerkey)
	       ((ack)(print "Got ack!"))
	       ((ping)
		(let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
		       (val  (if proc (proc) "gotping")))
		  (send udata host:port "version" qrykey val)))
	       ((rucaptain)
		(send udata host:port "iamcaptain" qrykey (if (udat-my-cpkt-key udata)
							       "yes"
							       "no")))
	       (else
		;; (send-ack udata host:port qrykey)
		(add-to-work-queue udata (get-peer-dat udata host:port) handlerkey qrykey data)))
	     (else (print "BAD DATA? handler=" handlerkey " data=" data)))))
	(loop state)))))

;; add a proc to the handler list
(define (register-handler udata key proc)
  (hash-table-set! (udat-handlers udata) key proc))