Overview
Comment: | Refactored handler loop to facilitate calling locally |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try |
Files: | files | file ages | folders |
SHA1: |
a2267e910db658798619a48f050321a4 |
User & Date: | matt on 2020-01-20 11:20:44 |
Other Links: | branch diff | manifest | tags |
Context
2020-01-20
| ||
22:34 | Scaffolding for send-receive check-in: 3f613cadf2 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
11:20 | Refactored handler loop to facilitate calling locally check-in: a2267e910d user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
03:34 | Added drop captain send to all peers check-in: d20ee11c75 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
Changes
Modified ulex/ulex.scm from [e30b68cf23] to [48c02e2743].
︙ | ︙ | |||
254 255 256 257 258 259 260 | ;; - create pkt ;; - start server port handler ;; (define (setup-as-captain udata) (if (start-server-find-port udata) ;; puts the server in udata (if (create-captain-pkt udata) (let* ((th (make-thread (lambda () | | | 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 | ;; - create pkt ;; - start server port handler ;; (define (setup-as-captain udata) (if (start-server-find-port udata) ;; puts the server in udata (if (create-captain-pkt udata) (let* ((th (make-thread (lambda () (ulex-handler-loop udata)) "Captain handler"))) (udat-handler-thread-set! udata th) (thread-start! th)) #f) #f)) ;; given a pkts dir read ;; |
︙ | ︙ | |||
447 448 449 450 451 452 453 | (mbox-receive-time (current-milliseconds))) (hash-table-delete! mboxes qrykey) (if (eq? res 'MBOX_TIMEOUT) #f res)) #f))) ;; #f means failed to communicate | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | > | < < | < | < < < < < < < < < < < < < < < | | < < < < < < < < < < < < < < < < < < < | < < < < < < < < < < < < < | | 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 | (mbox-receive-time (current-milliseconds))) (hash-table-delete! mboxes qrykey) (if (eq? res 'MBOX_TIMEOUT) #f res)) #f))) ;; #f means failed to communicate ;; (define (ulex-handler udata controldat data) (print "controldat: " controldat " data: " data) (match (string-split controldat) ((handlerkey host-port pid qrykey params ...) (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params) (case (string->symbol handlerkey) ((ack)(print "Got ack!")) ((ping) ;; special case - return result immediately on the same connection (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) (val (if proc (proc) "gotping")) (peer (make-peer addr-port: host-port pid: pid)) (dbshash (udat-dbowners udata))) (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger (for-each (lambda (dbfile) (hash-table-set! dbshash dbfile host-port)) params) ;; register each db in the dbshash (if (not (hash-table-exists? (udat-peers udata) host-port)) (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers qrykey)) ;; End of ping ((goodbye) ;; remove all traces of the caller in db ownership etc. (let* ((peer (hash-table-ref/default (udat-peers udata) host-port #f)) (dbs (if peer (peer-dbs peer) '())) (dbshash (udat-dbowners udata))) (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs) (hash-table-delete! (udat-peers udata) host-port) qrykey)) ((dropcaptain) ;; remove all traces of the captain (udat-captain-address-set! udata #f) (udat-captain-host-set! udata #f) (udat-captain-port-set! udata #f) (udat-captain-pid-set! udata #f) qrykey) ((rucaptain) ;; remote is asking if I'm the captain (if (udat-my-cpkt-key udata) "yes" "no")) ((whoowns) ;; given a db name who do I send my queries to ;; look up the file in handlers, if have an entry ping them to be sure ;; they are still alive and then return that host:port. ;; if no handler found or if the ping fails pick from peers the oldest that ;; is managing the fewest dbs #f) (else (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data) #f))) (else (print "BAD DATA? controldat=" controldat " data=" data) #f)));; handles the incoming messages and dispatches to queues ;; (define (ulex-handler-loop udata) (let* ((serv-listener (udat-serv-listener udata))) ;; data comes as two lines ;; handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db] ;; data (let loop ((state 'start)) (let-values (((inp oup)(tcp-accept serv-listener))) (let* ((controldat (read-line inp)) (data (read-line inp)) (resp (ulex-handler udata controldat data))) (if resp (write-line resp oup)) (close-input-port inp) (close-output-port oup)) (loop state))))) ;; add a proc to the handler list (define (register-handler udata key proc) (hash-table-set! (udat-handlers udata) key proc)) ;;====================================================================== |
︙ | ︙ |