Overview
Comment: | wip-cleaning-up-send-receive |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try |
Files: | files | file ages | folders |
SHA1: | 1dbae7035b53065ca2f4215ca76f6866 |
User & Date: | matt on 2020-01-22 22:33:23 |
Other Links: | branch diff | manifest | tags |
Context
2020-01-28
| ||
20:07 | some forgotten changes... hopefully good changes. check-in: f86d0abaad user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
2020-01-22
| ||
22:33 | wip-cleaning-up-send-receive check-in: 1dbae7035b user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
2020-01-21
| ||
19:19 | Switch from write-line and read-line to write and read for transport across the tcp connection check-in: 124ed3f5a6 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
Changes
Modified rmtmod.scm from [1d07182ee0] to [0b58aa78dc].
︙ | ︙ | |||
;; rmt:send-receive
;;
;; Work out which db file serves run id `rid`, obtain a connection for that
;; file via rmt:connect, hand the request to ulex:remote-request and return
;; whatever that call returns.
;;
;;   cmd        - accepted but not forwarded anywhere by this body (see the
;;                note after the definition)
;;   rid        - run id; #f or any value below 1 selects "main.db",
;;                anything else selects "<rid>.db"
;;   params     - passed through as the last (data) argument of
;;                ulex:remote-request
;;   attemptnum - starts at 1 (the earlier note says a modulo relies on that);
;;                not referenced in this body
;;   area-dat   - not referenced in this body
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
  (let* ((alldat    *alldat*)
         (areapath  (alldat-areapath alldat))
         (use-main? (or (not rid) (< rid 1)))          ;; this is the criteria for "main.db"
         (dbtype    (if use-main? "main" "runs"))
         (dbfname   (if use-main? "main.db" (conc rid ".db")))
         (dbfile    (conc areapath "/.db/" dbfname))
         ;; ulexconn is our new *runremote*.
         ;; NOTE(review): the earlier comment described it as a dbowner
         ;; struct < pdat lastrefresh >; confirm what rmt:connect returns.
         (ulexconn  (rmt:connect alldat dbfname dbtype))
         (udata     (alldat-ulexdat alldat)))
    (ulex:remote-request udata ulexconn 'immediate dbfile 'execute rid params)))

;; still to do: the receiving side needs to call
;;   (api:execute-requests dbstruct-local (vector (symbol->string cmd) params))

;; disabled experiment: serialize params for transport, deserialize the reply
#;(with-input-from-string
      (ulex:remote-request udata ulexconn 'immediate dbfile 'execute rid (with-output-to-string (lambda ()(serialize params))))
    (lambda ()(deserialize)))
︙ | ︙ |
Modified ulex/ulex.scm from [f8b8e960d6] to [e99055ba4e].
︙ | ︙ | |||
;; (hunk context: tail of the preceding definition; its head is above this view)
              (remove-captain-pkt udata captn)
              (setup)))))
        (begin
          (setup-as-captain udata)  ;; this saves the thread to captain-thread and starts the thread
          (setup)))))

;; connect - find out who to talk to for a specific db file
;;
;;   - an entry already present in the dbowners hash is returned as-is
;;   - otherwise get-db-owner is asked; it yields two values,
;;     success and the owner's host-port
;;   - on success the answer is stored as dbfname => host-port and returned
;;   - on failure the result is #f and nothing is stored
;;
(define (connect udata dbfname dbtype)
  (let ((known (hash-table-ref/default (udat-dbowners udata) dbfname #f)))
    (if known
        known
        (call-with-values
            (lambda () (get-db-owner udata dbfname dbtype))
          (lambda (success dbowner-host-port)
            (and success
                 (begin
                   ;; overwrite unconditionally, the fresh answer always wins
                   (hash-table-set! (udat-dbowners udata) dbfname dbowner-host-port)
                   dbowner-host-port)))))))

;; returns: success pingtime
;;
;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
;;
(define (ping udata host-port)
︙ | ︙ | |||
;; (hunk context: tail of the preceding definition; its head is above this view)
  (let* ((dbrec (ulex-open-db udata dbname)) ;; this will be a dbconn record, looks for in udata first
         (proc  (hash-table-ref udata prockey)))
    (let* ((result (proc dbrec procparam data)))
      result)))

;; remote-request - send to remote to process in process-request
;;
;;   udata     - the local ulex state; a fresh cookie is drawn from it
;;   uconn     - comes from a call to connect and can be used instead of
;;               calling connect again; it is the host-port to call
;;   rtype     - immediate, read-only, normal, low-priority
;;   dbname    - sent to the worker so they know which file to open
;;   prockey   - key of the procedure to run at the remote site
;;   procparam - parameter handed to that procedure
;;   data      - must be a string with no newlines, it will be handed to the
;;               proc at the remote site unchanged. It is up to the user to
;;               encode/decode its contents
;;
;; Returns whatever send-receive returns.
;;
;; The params list is (dbname prockey procparam): the receiving dispatcher
;; destructures exactly these three items for work requests. It used to be
;; built with `(,prockey procparam), which left dbname out and sent the
;; literal symbol procparam instead of the caller's value.
;;
(define (remote-request udata uconn rtype dbname prockey procparam data)
  (let ((cookie (make-cookie udata)))
    (send-receive udata uconn rtype cookie data (list dbname prockey procparam))))

;; placeholder: always returns #f in this revision
(define (ulex-open-db udata dbname)
  #f)

;;======================================================================
;; network utilities
;;======================================================================
︙ | ︙ | |||
;; work - one unit of work. Every field except start defaults to #f.
;; NOTE(review): field meanings below are inferred from their names; confirm
;; against the code that builds and consumes these records.
(defstruct work
  (peer-dat   #f)                      ;; presumably the peer record of the requester
  (handlerkey #f)                      ;; presumably the request type key
  (qrykey     #f)                      ;; presumably the cookie identifying the query
  (data       #f)                      ;; presumably the payload for the handler
  (start      (current-milliseconds))) ;; default expression: (current-milliseconds)

;; dbowner is disabled with a datum comment (#;), so this form is skipped by
;; the reader and no dbowner record type is defined.
#;(defstruct dbowner
  (pdat        #f)
  (last-update (current-seconds)))

;;======================================================================
;; Captain functions
;;======================================================================
︙ | ︙ | |||
;; (hunk context: tail of the preceding definition; its head is above this view)
               peers)))

;;======================================================================
;; server primitives
;;======================================================================

;; make-cookie - produce a fresh request identifier
;;
;; Increments the cnum counter stored in udata (the new value is written
;; back) and returns
;;   <my-address>:<my-port>-<my-pid>-<new cnum>
;; as built by conc.
;;
(define (make-cookie udata)
  (let ((seq (+ 1 (udat-cnum udata))))
    (udat-cnum-set! udata seq)
    (let ((origin (conc (udat-my-address udata) ":" (udat-my-port udata)))
          (pid    (udat-my-pid udata)))
      (conc origin "-" pid "-" seq))))

;; create a tcp listener and return a populated udat struct with
︙ | ︙ | |||
536 537 538 539 540 541 542 | ((ping) ;; special case - return result immediately on the same connection (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) (val (if proc (proc) "gotping")) (peer (make-peer addr-port: host-port pid: pid)) (dbshash (udat-dbowners udata))) (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger (for-each (lambda (dbfile) | | | 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 | ((ping) ;; special case - return result immediately on the same connection (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) (val (if proc (proc) "gotping")) (peer (make-peer addr-port: host-port pid: pid)) (dbshash (udat-dbowners udata))) (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger (for-each (lambda (dbfile) (hash-table-set! dbshash dbfile host-port)) ;; WRONG? params) ;; register each db in the dbshash (if (not (hash-table-exists? (udat-peers udata) host-port)) (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers qrykey)) ;; End of ping ((goodbye) ;; remove all traces of the caller in db ownership etc. (let* ((peer (hash-table-ref/default (udat-peers udata) host-port #f)) |
︙ | ︙ | |||
565 566 567 568 569 570 571 | ((db-owner) ;; given a db name who do I send my queries to ;; look up the file in handlers, if have an entry ping them to be sure ;; they are still alive and then return that host:port. ;; if no handler found or if the ping fails pick from peers the oldest that ;; is managing the fewest dbs (match params ((dbfile dbtype) | | < > | | > | 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 | ((db-owner) ;; given a db name who do I send my queries to ;; look up the file in handlers, if have an entry ping them to be sure ;; they are still alive and then return that host:port. ;; if no handler found or if the ping fails pick from peers the oldest that ;; is managing the fewest dbs (match params ((dbfile dbtype) (let* ((owner-host-port (hash-table-ref/default (udat-dbowners udata) dbfile #f))) (if owner-host-port (conc qrykey " " owner-host-port) (let* ((pdat (or (hash-table-ref/default (udat-peers udata) host-port #f) ;; no owner - caller gets to own it! (make-peer addr-port: host-port pid: pid dbs: `(,dbfile))))) (hash-table-set! (udat-peers udata) host-port pdat) (hash-table-set! (udat-dbowners udata) dbfile host-port) (conc qrykey " " host-port))))) (else (conc qrykey " BADDATA")))) ;; for work items: ;; handler is one of; immediate, read-only, read-write, high-priority ((immediate read-only normal low-priority) ;; do this work immediately ;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line ;; data => a single line encoded however you want, or should I build json into it? 
(let* ((pdat (get-peer-dat udata host-port))) (match params ;; dbfile prockey procparam ((dbfile prockey procparam) (case (string->symbol handlerkey) ((immediate read-only) (process-request udata pdat dbfile qrykey prockey procparam data)) ((normal low-priority) ;; split off later and add logic to support low priority (add-to-work-queue udata pdat dbfile qrykey prockey procparam data)) (else #f))) (else (print "ERROR: params=" params))))) (else ;; (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data) #f))) (else (print "BAD DATA? controldat=" controldat " data=" data) #f)));; handles the incoming messages and dispatches to queues |
︙ | ︙ |