Overview
Comment: | Added drop captain send to all peers |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try |
Files: | files | file ages | folders |
SHA1: |
d20ee11c7592daf356b6fa02ee8308b4 |
User & Date: | matt on 2020-01-20 03:34:04 |
Other Links: | branch diff | manifest | tags |
Context
2020-01-20
| ||
11:20 | Refactored handler loop to facilitate calling locally check-in: a2267e910d user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
03:34 | Added drop captain send to all peers check-in: d20ee11c75 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
03:01 | Speculatively removed inp and oup saving and close all ports check-in: 5263e9f2ce user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
Changes
Modified ulex/ulex.scm from [e8344f5ae3] to [e30b68cf23].
︙ | ︙ | |||
208 209 210 211 212 213 214 | ;; app info (appname #f) (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ] ;; cookies (cnum 0) ;; cookie num ) | < | 208 209 210 211 212 213 214 215 216 217 218 219 220 221 | ;; app info (appname #f) (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ] ;; cookies (cnum 0) ;; cookie num ) (define (udat-my-host-port udata) (if (and (udat-my-address udata)(udat-my-port udata)) (conc (udat-my-address udata) ":" (udat-my-port udata)) #f)) (define (udat-captain-host-port udata) (if (and (udat-captain-address udata)(udat-captain-port udata)) |
︙ | ︙ | |||
323 324 325 326 327 328 329 330 331 332 333 334 335 336 | ;; remove pkt associated with captn (the Z key .pkt) ;; (define (remove-captain-pkt udata captn) (let ((Z (alist-ref 'Z captn)) (cpktdir (udat-cpkts-dir udata))) (delete-file* (conc cpktdir "/" Z ".pkt")))) ;;====================================================================== ;; server primitives ;;====================================================================== (define (make-cookie udata) (let ((newcnum (+ (udat-cnum udata)))) | > > > > > > > > > > > > > > > | 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 | ;; remove pkt associated with captn (the Z key .pkt) ;; (define (remove-captain-pkt udata captn) (let ((Z (alist-ref 'Z captn)) (cpktdir (udat-cpkts-dir udata))) (delete-file* (conc cpktdir "/" Z ".pkt")))) ;; call all known peers and tell them to delete their info on the captain ;; thus forcing them to re-read pkts and connect to a new captain ;; call this when the captain needs to exit and if an older captain is ;; detected. Due to delays in sending file meta data in NFS multiple ;; captains can be initiated in a "Storm of Captains", book soon to be ;; on Amazon ;; (define (drop-captain udata) (let* ((peers (hash-table-keys (udat-peers udata))) (cookie (make-cookie udata))) (for-each (lambda (host-port) (send udata host-port 'dropcaptain cookie "nomsg" retval: #t)) peers))) ;;====================================================================== ;; server primitives ;;====================================================================== (define (make-cookie udata) (let ((newcnum (+ (udat-cnum udata)))) |
︙ | ︙ | |||
358 359 360 361 362 363 364 | (udat-my-address-set! udata addr) (udat-my-port-set! udata port) (udat-my-hostname-set! udata (get-host-name)) (udat-serv-listener-set! udata tlsn) udata)) (define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f)) | < < < | 372 373 374 375 376 377 378 379 380 381 382 383 384 385 | (udat-my-address-set! udata addr) (udat-my-port-set! udata port) (udat-my-hostname-set! udata (get-host-name)) (udat-serv-listener-set! udata tlsn) udata)) (define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f)) (let* ((pdat (or (udat-get-peer udata host-port) (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC exn #f (let ((npdat (make-peer addr-port: host-port))) (if hostname (peer-hostname-set! npdat hostname)) (if pid (peer-pid-set! npdat pid)) |
︙ | ︙ | |||
436 437 438 439 440 441 442 | (mbox-receive-time (current-milliseconds))) (hash-table-delete! mboxes qrykey) (if (eq? res 'MBOX_TIMEOUT) #f res)) #f))) ;; #f means failed to communicate | < < < < < < | < < < < < < < < < < < < < | 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 | (mbox-receive-time (current-milliseconds))) (hash-table-delete! mboxes qrykey) (if (eq? res 'MBOX_TIMEOUT) #f res)) #f))) ;; #f means failed to communicate ;; handles the incoming messages and dispatches to queues ;; (define (ulex-handler udata) (let* ((serv-listener (udat-serv-listener udata))) (print "serv-listner: " serv-listener) ;; data comes as two lines ;; handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db] ;; data |
︙ | ︙ | |||
498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 | (dbs (if peer (peer-dbs peer) '())) (dbshash (udat-dbowners udata))) (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs) (hash-table-delete! (udat-peers udata) host-port) (write-line qrykey oup) (close-input-port inp) (close-output-port oup))) ((rucaptain) ;; remote is asking if I'm the captain (write-line (if (udat-my-cpkt-key udata) "yes" "no")) (close-input-port inp) (close-output-port oup)) ((whoowns) ;; given a db name who do I send my queries to ;; look up the file in handlers, if have an entry ping them to be sure ;; they are still alive and then return that host:port. ;; if no handler found or if the ping fails pick from peers the oldest that ;; is managing the fewest dbs #f) (else (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)))) (else (print "BAD DATA? controldat=" controldat " data=" data))))) (loop state)))) ;; add a proc to the handler list (define (register-handler udata key proc) (hash-table-set! (udat-handlers udata) key proc)) ;;====================================================================== ;; Generic db handling ;; setup a inmem db instance ;; open connection to on-disk db ;; sync on-disk db to inmem ;; get lock in on-disk db for dbowner of this db | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 | (dbs (if peer (peer-dbs peer) '())) (dbshash (udat-dbowners udata))) (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs) (hash-table-delete! (udat-peers udata) host-port) (write-line qrykey oup) (close-input-port inp) (close-output-port oup))) ((dropcaptain) ;; remove all traces of the captain (udat-captain-address-set! udata #f) (udat-captain-host-set! udata #f) (udat-captain-port-set! udata #f) (udat-captain-pid-set! udata #f) (write-line qrykey oup) (close-input-port inp) (close-output-port oup)) ((rucaptain) ;; remote is asking if I'm the captain (write-line (if (udat-my-cpkt-key udata) "yes" "no")) (close-input-port inp) (close-output-port oup)) ((whoowns) ;; given a db name who do I send my queries to ;; look up the file in handlers, if have an entry ping them to be sure ;; they are still alive and then return that host:port. ;; if no handler found or if the ping fails pick from peers the oldest that ;; is managing the fewest dbs #f) (else (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)))) (else (print "BAD DATA? controldat=" controldat " data=" data))))) (loop state)))) ;; add a proc to the handler list (define (register-handler udata key proc) (hash-table-set! (udat-handlers udata) key proc)) ;;====================================================================== ;; work queues ;;====================================================================== (define (add-to-work-queue udata peer-dat handlerkey qrykey data) (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data))) (if (udat-busy udata) (queue-add! (udat-work-queue udata) wdat) (process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat )) (define (do-work udata wdat) #f) (define (process-work udata #!optional wdat) (if wdat (do-work udata wdat)) ;; process wdat (let ((wqueue (udat-work-queue udata))) (if (not (queue-empty? wqueue)) (let loop ((wd (queue-remove! wqueue))) (do-work udata wd) (if (not (queue-empty? wqueue)) (loop (queue-remove! wqueue))))))) ;;====================================================================== ;; Generic db handling ;; setup a inmem db instance ;; open connection to on-disk db ;; sync on-disk db to inmem ;; get lock in on-disk db for dbowner of this db |
︙ | ︙ |