Megatest

Check-in [d20ee11c75]
Login
Overview
Comment:Added drop captain send to all peers
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try
Files: files | file ages | folders
SHA1: d20ee11c7592daf356b6fa02ee8308b45da174c7
User & Date: matt on 2020-01-20 03:34:04
Other Links: branch diff | manifest | tags
Context
2020-01-20
11:20
Refactored handler loop to facilitate calling locally check-in: a2267e910d user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
03:34
Added drop captain send to all peers check-in: d20ee11c75 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
03:01
Speculatively removed inp and oup saving and close all ports check-in: 5263e9f2ce user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
Changes

Modified ulex/ulex.scm from [e8344f5ae3] to [e30b68cf23].

208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
208
209
210
211
212
213
214

215
216
217
218
219
220
221







-







  ;; app info
  (appname         #f)
  (dbtypes         (make-hash-table))  ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
  ;; cookies
  (cnum            0) ;; cookie num
  )


(define (udat-my-host-port udata)
  (if (and (udat-my-address udata)(udat-my-port udata))
      (conc (udat-my-address udata) ":" (udat-my-port udata))
      #f))

(define (udat-captain-host-port udata)
  (if (and (udat-captain-address udata)(udat-captain-port udata))
323
324
325
326
327
328
329















330
331
332
333
334
335
336
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+








;; remove pkt associated with captn (the Z key .pkt)
;;
(define (remove-captain-pkt udata captn)
  (let ((Z       (alist-ref 'Z captn))
	(cpktdir (udat-cpkts-dir udata)))
    (delete-file* (conc cpktdir "/" Z ".pkt"))))

;; call all known peers and tell them to delete their info on the captain
;; thus forcing them to re-read pkts and connect to a new captain
;; call this when the captain needs to exit and if an older captain is
;; detected. Due to delays in sending file meta data in NFS multiple
;; captains can be initiated in a "Storm of Captains", book soon to be
;; on Amazon
;;
(define (drop-captain udata)
  (let* ((peers (hash-table-keys (udat-peers udata)))
	 (cookie (make-cookie udata)))
    (for-each
     (lambda (host-port)
       (send udata host-port 'dropcaptain cookie "nomsg" retval: #t))
     peers)))

;;======================================================================
;; server primitives
;;======================================================================

(define (make-cookie udata)
  (let ((newcnum (+ (udat-cnum udata))))
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
372
373
374
375
376
377
378



379
380
381
382
383
384
385







-
-
-







    (udat-my-address-set!    udata addr)
    (udat-my-port-set!       udata port)
    (udat-my-hostname-set!   udata (get-host-name))
    (udat-serv-listener-set! udata tlsn)
    udata))

(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
  ;; I'm currently very fuzzy on whether it makes sense to be reusing the outgoing connections.
  ;; at the other end of the line I think the reciever has closed the ports - thus each message
  ;; requires new connection?
  (let* ((pdat (or (udat-get-peer udata host-port)
		   (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
		    exn
		    #f
		    (let ((npdat (make-peer addr-port: host-port)))
		      (if hostname (peer-hostname-set! npdat hostname))
		      (if pid (peer-pid-set! npdat pid))
436
437
438
439
440
441
442
443
444
445
446
447
448
449

450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
447
448
449
450
451
452
453







454













455
456
457
458
459
460
461







-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-







	       (mbox-receive-time    (current-milliseconds)))
	  (hash-table-delete! mboxes qrykey)
	  (if (eq? res 'MBOX_TIMEOUT)
	      #f
	      res))
	#f))) ;; #f means failed to communicate

(define (add-to-work-queue udata peer-dat handlerkey qrykey data)
  (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data)))
    (if (udat-busy udata)
	(queue-add! (udat-work-queue udata) wdat)
	(process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat
    ))

;; handles the incoming messages and dispatches to queues
(define (do-work udata wdat)
  #f)

(define (process-work udata #!optional wdat)
  (if wdat (do-work udata wdat)) ;; process wdat
  (let ((wqueue (udat-work-queue udata)))
    (if (not (queue-empty? wqueue))
	(let loop ((wd (queue-remove! wqueue)))
	  (do-work udata wd)
	  (if (not (queue-empty? wqueue))
	      (loop (queue-remove! wqueue)))))))

;; 
;;
(define (ulex-handler udata)
  (let* ((serv-listener (udat-serv-listener udata)))
    (print "serv-listner: " serv-listener)
    ;; data comes as two lines
    ;;   handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
    ;;   data
498
499
500
501
502
503
504









505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523























524
525
526
527
528
529
530
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554







+
+
+
+
+
+
+
+
+



















+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







			    (dbs   (if peer (peer-dbs peer) '()))
			    (dbshash (udat-dbowners udata)))
		       (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
		       (hash-table-delete! (udat-peers udata) host-port)
		       (write-line qrykey oup)
		       (close-input-port inp)
		       (close-output-port oup)))
		    ((dropcaptain)
		     ;; remove all traces of the captain
		     (udat-captain-address-set! udata #f)
		     (udat-captain-host-set!    udata #f)
		     (udat-captain-port-set!    udata #f)
		     (udat-captain-pid-set!     udata #f)
		     (write-line qrykey oup)
		     (close-input-port inp)
		     (close-output-port oup))
		    ((rucaptain) ;; remote is asking if I'm the captain
		     (write-line (if (udat-my-cpkt-key udata) "yes" "no"))
		     (close-input-port inp)
		     (close-output-port oup))
		    ((whoowns) ;; given a db name who do I send my queries to
		     ;; look up the file in handlers, if have an entry ping them to be sure
		     ;; they are still alive and then return that host:port.
		     ;; if no handler found or if the ping fails pick from peers the oldest that
		     ;; is managing the fewest dbs
		     #f)
		    (else
		     (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data))))
		 (else (print "BAD DATA? controldat=" controldat " data=" data)))))
	(loop state))))

;; add a proc to the handler list
(define (register-handler udata key proc)
  (hash-table-set! (udat-handlers udata) key proc))


;;======================================================================
;; work queues
;;======================================================================

(define (add-to-work-queue udata peer-dat handlerkey qrykey data)
  (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data)))
    (if (udat-busy udata)
	(queue-add! (udat-work-queue udata) wdat)
	(process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat
    ))

(define (do-work udata wdat)
  #f)

(define (process-work udata #!optional wdat)
  (if wdat (do-work udata wdat)) ;; process wdat
  (let ((wqueue (udat-work-queue udata)))
    (if (not (queue-empty? wqueue))
	(let loop ((wd (queue-remove! wqueue)))
	  (do-work udata wd)
	  (if (not (queue-empty? wqueue))
	      (loop (queue-remove! wqueue)))))))

;;======================================================================
;; Generic db handling
;;   setup a inmem db instance
;;   open connection to on-disk db
;;   sync on-disk db to inmem
;;   get lock in on-disk db for dbowner of this db