Overview
Comment: | Scaffolding for send-receive |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try |
Files: | files | file ages | folders |
SHA1: | 3f613cadf24fb972ea5485fe861a6d3a |
User & Date: | matt on 2020-01-20 22:34:03 |
Other Links: | branch diff | manifest | tags |
Context
2020-01-21
| ||
19:19 | Switch from write-line and read-line to write and read for transport across the tcp connection check-in: 124ed3f5a6 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
2020-01-20
| ||
22:34 | Scaffolding for send-receive check-in: 3f613cadf2 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
11:20 | Refactored handler loop to facilitate calling locally check-in: a2267e910d user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
Changes
Modified rmtmod.scm from [a1525563eb] to [3b1ae2dfc7].
︙ | ︙ | |||
26 27 28 29 30 31 32 | (declare (uses itemsmod)) (declare (uses ulex)) (module rmtmod * (import scheme chicken data-structures extras) | | > | 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | (declare (uses itemsmod)) (declare (uses ulex)) (module rmtmod * (import scheme chicken data-structures extras) (import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable s11n) (import (prefix ulex ulex:)) (import commonmod) (import itemsmod) (import apimod) (import dbmod) |
︙ | ︙ | |||
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | ;; setup the remote calls (define (rmt:setup-ulex alldat) (let* ((udata (ulex:setup))) ;; establish connection to ulex (alldat-ulexdat-set! alldat udata) ;; register all needed procs (ulex:register-handler udata 'ping common:get-full-version) ;; override ping with get-full-version (ulex:register-handler udata 'login common:get-full-version) ;; force setup of the connection udata)) ;; set up a connection to the current owner of the dbfile associated with rid ;; then send the query to that dbfile owner and wait for a response. ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected (let* ((alldat *alldat*) (areapath (alldat-areapath alldat)) (dbtype (if (or (not rid)(< rid 1)) ;; this is the criteria for "main.db" 'main 'runs)) (dbfname (if (eq? dbtype 'main) "main.db" (conc rid ".db"))) (dbfile (conc areapath "/.db/" dbfname)) | > > | > > > > > > | | 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 | ;; setup the remote calls (define (rmt:setup-ulex alldat) (let* ((udata (ulex:setup))) ;; establish connection to ulex (alldat-ulexdat-set! alldat udata) ;; register all needed procs (ulex:register-handler udata 'ping common:get-full-version) ;; override ping with get-full-version (ulex:register-handler udata 'login common:get-full-version) ;; force setup of the connection (ulex:register-handler udata 'execute api:execute-requests) udata)) ;; set up a connection to the current owner of the dbfile associated with rid ;; then send the query to that dbfile owner and wait for a response. 
;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected (let* ((alldat *alldat*) (areapath (alldat-areapath alldat)) (dbtype (if (or (not rid)(< rid 1)) ;; this is the criteria for "main.db" 'main 'runs)) (dbfname (if (eq? dbtype 'main) "main.db" (conc rid ".db"))) (dbfile (conc areapath "/.db/" dbfname)) (udata (alldat-ulexdat alldat)) (ulexconn (rmt:connect alldat dbfname dbtype))) ;; ulexconn is our new *runremote*, it is a dbowner struct < pdat lastrefresh > ;; need to call this on the other side ;; (api:execute-requests dbstruct-local (vector (symbol->string cmd) params)))) (with-input-from-string (ulex:remote-request udata ulexconn 'immediate dbfile 'execute rid (with-output-to-string (lambda ()(serialize params)))) (lambda ()(deserialize))))) #;(rmt:open-qry-close-locally cmd 0 params) ;; ;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd)) ;; ;; #;(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected ;; ;; ;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd)) ;; ;; payload: `((rid . ,rid) |
︙ | ︙ | |||
237 238 239 240 241 242 243 | ;; ;; ;; not on homehost, do server query ;; (else (extras-case-11 *default-log-port* runremote cmd params attemptnum rid))))) ;; bunch of small functions factored out of send-receive to make debug easier ;; | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 | ;; ;; ;; not on homehost, do server query ;; (else (extras-case-11 *default-log-port* runremote cmd params attemptnum rid))))) ;; bunch of small functions factored out of send-receive to make debug easier ;; ;;(define (extras-case-11 *default-log-port* runremote cmd params attemptnum rid) ;; ;; (mutex-unlock! *rmt-mutex*) ;; (debug:print-info 12 *default-log-port* "rmt:send-receive, case 9") ;; ;; (mutex-lock! *rmt-mutex*) ;; (let* ((conninfo (remote-conndat runremote)) ;; (dat (case (remote-transport runremote) ;; ((http) (condition-case ;; handling here has ;; ;; caused a lot of ;; ;; problems. However it ;; ;; is needed to deal with ;; ;; attemtped ;; ;; communication to ;; ;; servers that have gone ;; ;; away ;; #;(http-transport:client-api-send-receive 0 conninfo cmd params) ;; ((commfail)(vector #f "communications fail")) ;; ((exn)(vector #f "other fail" (print-call-chain))))) ;; (else ;; (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport runremote) " not supported") ;; (exit)))) ;; (success (if (vector? dat) (vector-ref dat 0) #f)) ;; (res (if (vector? dat) (vector-ref dat 1) #f))) ;; (if (and (vector? conninfo) (< 5 (vector-length conninfo))) ;; #t #;(http-transport:server-dat-update-last-access conninfo) ;; refresh access time ;; (begin ;; (debug:print 0 *default-log-port* "INFO: Should not get here! conninfo=" conninfo) ;; (set! conninfo #f) ;; (remote-conndat-set! 
*runremote* #f) ;; NOTE: *runremote* is global copy of runremote. Purpose: factor out global. ;; #;(http-transport:close-connections area-dat: runremote))) ;; (debug:print-info 13 *default-log-port* "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat " runremote = " runremote) ;; (mutex-unlock! *rmt-mutex*) ;; (if success ;; success only tells us that the transport was ;; ;; successful, have to examine the data to see if ;; ;; there was a detected issue at the other end ;; (extras-transport-succeded *default-log-port* *rmt-mutex* attemptnum runremote res params rid cmd) ;; (extras-transport-failed *default-log-port* *rmt-mutex* attemptnum runremote cmd rid params) ;; ))) ;; (define (rmt:update-db-stats run-id rawcmd params duration) ;; (mutex-lock! *db-stats-mutex*) ;; (handle-exceptions ;; exn ;; (begin ;; (debug:print 0 *default-log-port* "WARNING: stats collection failed in update-db-stats") |
︙ | ︙ |
Modified ulex/ulex.scm from [48c02e2743] to [210fc6977d].
|
| | | 1 2 3 4 5 6 7 8 | ;; ulex: Distributed sqlite3 db ;;; ;; Copyright (C) 2018 Matt Welland ;; Redistribution and use in source and binary forms, with or without ;; modification, is permitted. ;; ;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS ;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
︙ | ︙ | |||
77 78 79 80 81 82 83 84 | (remove-captain-pkt udata captn) (setup))))) (begin (setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread (setup))))) ;; connect to a specific dbfile (define (connect udata dbfname dbtype) | > > > > | > > > > > > > > | 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | (remove-captain-pkt udata captn) (setup))))) (begin (setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread (setup))))) ;; connect to a specific dbfile ;; - if already connected - return the pdat ;; - ask the captain who to talk to for this db ;; - put the entry in the dbowners hash ;; (define (connect udata dbfname dbtype) (or (hash-table-ref/default (udat-dbowners udata) dbfname #f) (let-values (((success dbowner-host-port)(get-db-owner udata dbfname dbtype))) (if success (let* ((pdat (udat-get-peer udata dbowner-host-port)) (dbowner (make-dbowner pdat: pdat))) ;; just clobber the record, this is the new data no matter what (hash-table-set! (udat-dbowners udata) dbowner-host-port dbowner) dbowner) #f)))) ;; returns: success pingtime ;; ;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns ;; (define (ping udata host-port) (let* ((start (current-milliseconds)) |
︙ | ︙ | |||
110 111 112 113 114 115 116 117 118 119 120 121 122 123 | (values (equal? res cookie) delta))) (define (goodbye-captain udata) (let* ((host-port (udat-captain-host-port udata))) (if host-port (goodbye-ping udata host-port) (values #f -1)))) ;;====================================================================== ;; network utilities ;;====================================================================== (define (rate-ip ipaddr) (regex-case ipaddr | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | (values (equal? res cookie) delta))) (define (goodbye-captain udata) (let* ((host-port (udat-captain-host-port udata))) (if host-port (goodbye-ping udata host-port) (values #f -1)))) (define (get-db-owner udata dbname dbtype) (let* ((host-port (udat-captain-host-port udata))) (if host-port (let* ((cookie (make-cookie udata)) (msg #f) ;; (conc dbname " " dbtype)) (params `(,dbname ,dbtype)) (res (send udata host-port 'db-owner cookie msg params: params retval: #t))) (match (string-split res) ((retcookie owner-host-port) (values (equal? 
retcookie cookie) owner-host-port)))) (values #f -1)))) ;; called in ulex-handler to dispatch work, called on the workers side ;; calls (proc params data) ;; returns result with cookie ;; ;; pdat is the info of the caller, used to send the result data ;; prockey is key into udat-handlers hash dereferencing a proc ;; procparam is a first param handed to proc - often to do further derefrencing ;; NOTE: params is intended to be a list of strings, encoding on data ;; is up to the user but data must be a single line ;; (define (process-request udata pdat dbname cookie prockey procparam data) (let* ((dbrec (ulex-open-db udata dbname)) ;; this will be a dbconn record, looks for in udata first (proc (hash-table-ref udata prockey))) (let* ((result (proc dbrec procparam data))) result))) ;; remote-request - send to remote to process in process-request ;; uconn comes from a call to connect and can be used instead of calling connect again ;; uconn is somewhat redundant with dbname but it tells us what host-port to call ;; uconn is a dbowner struct < pdat lastupdate > ;; we send dbname to the worker so they know which file to open ;; data must be a string with no newlines, it will be handed to the proc ;; at the remote site unchanged. It is up to the user to encode/decode it's contents ;; ;; rtype: immediate, read-only, normal, low-priority ;; (define (remote-request udata uconn rtype dbname prockey procparam data) (let* ((cookie (make-cookie)) (pdat (dbowner-pdat uconn)) (host-port (peer-addr-port pdat))) (send-receive udata host-port rtype cookie data `(,prockey procparam)))) (define (ulex-open-db udata dbname) #f) ;;====================================================================== ;; network utilities ;;====================================================================== (define (rate-ip ipaddr) (regex-case ipaddr |
︙ | ︙ | |||
197 198 199 200 201 202 203 | (serv-listener #f) ;; this processes server info (handler-thread #f) (mboxes (make-hash-table)) ;; key => mbox ;; other servers (peers (make-hash-table)) ;; host-port => peer record (dbowners (make-hash-table)) ;; dbfile => host-port (handlers (make-hash-table)) ;; handler key => proc | | | 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 | (serv-listener #f) ;; this processes server info (handler-thread #f) (mboxes (make-hash-table)) ;; key => mbox ;; other servers (peers (make-hash-table)) ;; host-port => peer record (dbowners (make-hash-table)) ;; dbfile => host-port (handlers (make-hash-table)) ;; handler key => proc ;; (outgoing-conns (make-hash-table)) ;; host:port -> conn (work-queue (make-queue)) ;; most stuff goes here ;; (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping) (busy #f) ;; is either of the queues busy, use to switch between queuing tasks or doing immediately ;; app info (appname #f) (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ] ;; cookies |
︙ | ︙ | |||
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 | (defstruct work (peer-dat #f) (handlerkey #f) (qrykey #f) (data #f) (start (current-milliseconds))) ;;====================================================================== ;; Captain functions ;;====================================================================== ;; NB// This needs to be started in a thread ;; ;; setup to be a captain ;; - start server ;; - create pkt ;; - start server port handler ;; (define (setup-as-captain udata) (if (start-server-find-port udata) ;; puts the server in udata (if (create-captain-pkt udata) | > > > > > > | > > > > | > > | | 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 | (defstruct work (peer-dat #f) (handlerkey #f) (qrykey #f) (data #f) (start (current-milliseconds))) (defstruct dbowner (pdat #f) (last-update (current-seconds))) ;;====================================================================== ;; Captain functions ;;====================================================================== ;; NB// This needs to be started in a thread ;; ;; setup to be a captain ;; - start server ;; - create pkt ;; - start server port handler ;; (define (setup-as-captain udata) (if (start-server-find-port udata) ;; puts the server in udata (if (create-captain-pkt udata) (let* ((my-addr (udat-my-address udata)) (my-port (udat-my-port udata)) (th (make-thread (lambda () (ulex-handler-loop udata)) "Captain handler"))) (udat-handler-thread-set! udata th) (udat-captain-address-set! udata my-addr) (udat-captain-port-set! udata my-port) (thread-start! th)) (begin (print "ERROR: failed to create captain pkt") #f)) (begin (print "ERROR: failed to start server.") #f))) ;; given a pkts dir read ;; (define (get-all-captain-pkts udata) (let* ((pktsdir (let ((d (udat-cpkts-dir udata))) (if (file-exists? d) d |
︙ | ︙ | |||
390 391 392 393 394 395 396 | ;; ;; NOTE: qrykey is what was called the "cookie" previously ;; ;; retval tells send to expect and wait for return data (one line) and return it or time out ;; this is for ping where we don't want to necessarily have set up our own server yet. ;; (define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(retval #f)) | > > > > > > > > > > > | | | | | | | | | | | < < < < | < < < < | | | | | | | | | | | | | | | | | | 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 | ;; ;; NOTE: qrykey is what was called the "cookie" previously ;; ;; retval tells send to expect and wait for return data (one line) and return it or time out ;; this is for ping where we don't want to necessarily have set up our own server yet. ;; (define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(retval #f)) (let* ((my-host-port (udat-my-host-port udata)) (isme (equal? host-port my-host-port)) ;; am I calling myself? (dat (conc handler " " my-host-port " " (udat-my-pid udata) " " qrykey (if (null? params) "" (conc " " (string-intersperse params " ")))))) ;; (print "send isme is " (if isme "true!" "false!") ", my-host-port: " my-host-port ", host-port: " host-port) (if isme (ulex-handler udata dat data) (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC exn #f (let-values (((inp oup)(tcp-connect host-port))) ;; ;; CONTROL LINE: ;; handlerkey host:port pid qrykey params ... ;; (let ((res (if (and inp oup) (let* () (if my-host-port (begin (write-line dat oup) (write-line data oup) ;; (print "Sent dat: " dat " data: " data) (if retval (read-line inp) #t)) (begin (print "ERROR: send called but no receiver has been setup. Please call setup first!") #f)) ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE! 
;; (there is a listener for handling that) ) #f))) ;; #f means failed to connect and send (close-input-port inp) (close-output-port oup) res)))))) ;; send a request to the given host-port and register a mailbox in udata ;; wait for the mailbox data and return it ;; (define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20)) (let ((mbox (make-mailbox)) (mbox-time (current-milliseconds)) |
︙ | ︙ | |||
449 450 451 452 453 454 455 | (if (eq? res 'MBOX_TIMEOUT) #f res)) #f))) ;; #f means failed to communicate ;; (define (ulex-handler udata controldat data) | | | | 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 | (if (eq? res 'MBOX_TIMEOUT) #f res)) #f))) ;; #f means failed to communicate ;; (define (ulex-handler udata controldat data) ;; (print "controldat: " controldat " data: " data) (match (string-split controldat) ((handlerkey host-port pid qrykey params ...) ;; (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params) (case (string->symbol handlerkey) ((ack)(print "Got ack!")) ((ping) ;; special case - return result immediately on the same connection (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) (val (if proc (proc) "gotping")) (peer (make-peer addr-port: host-port pid: pid)) (dbshash (udat-dbowners udata))) |
︙ | ︙ | |||
484 485 486 487 488 489 490 | (udat-captain-address-set! udata #f) (udat-captain-host-set! udata #f) (udat-captain-port-set! udata #f) (udat-captain-pid-set! udata #f) qrykey) ((rucaptain) ;; remote is asking if I'm the captain (if (udat-my-cpkt-key udata) "yes" "no")) | | > > > > > > > > > > > > > > > > > > > > > > > > > | | | 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 | (udat-captain-address-set! udata #f) (udat-captain-host-set! udata #f) (udat-captain-port-set! udata #f) (udat-captain-pid-set! udata #f) qrykey) ((rucaptain) ;; remote is asking if I'm the captain (if (udat-my-cpkt-key udata) "yes" "no")) ((db-owner) ;; given a db name who do I send my queries to ;; look up the file in handlers, if have an entry ping them to be sure ;; they are still alive and then return that host:port. ;; if no handler found or if the ping fails pick from peers the oldest that ;; is managing the fewest dbs (match params ((dbfile dbtype) (let* ((curr-owner (hash-table-ref/default (udat-dbowners udata) dbfile #f)) (owner-host-port (and curr-owner (peer-addr-port curr-owner)))) (if owner-host-port (conc qrykey " " owner-host-port) (let* ((pdat (or (hash-table-ref/default (udat-peers udata) host-port #f) ;; no owner - caller gets to own it! (make-peer addr-port: host-port pid: pid dbs: `(,dbfile))))) (hash-table-set! (udat-dbowners udata) dbfile pdat) (conc qrykey " " host-port))))) (else (conc qrykey " BADDATA")))) ;; for work items: ;; handler is one of; immediate, read-only, read-write, high-priority ((immediate read-only normal low-priority) ;; do this work immediately ;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line ;; data => a single line encoded however you want, or should I build json into it? 
(let* ((pdat (get-peer-dat udata host-port))) (match params ;; dbfile prockey procparam ((dbfile prockey procparam) (case (string->symbol handlerkey) ((immediate read-only) (process-request udata pdat dbfile qrykey prockey procparam data)) ((normal low-priority) ;; split off later and add logic to support low priority (add-to-work-queue udata pdat dbfile qrykey prockey procparam data)) (else #f)))))) (else ;; (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data) #f))) (else (print "BAD DATA? controldat=" controldat " data=" data) #f)));; handles the incoming messages and dispatches to queues ;; (define (ulex-handler-loop udata) |
︙ | ︙ | |||
513 514 515 516 517 518 519 | (data (read-line inp)) (resp (ulex-handler udata controldat data))) (if resp (write-line resp oup)) (close-input-port inp) (close-output-port oup)) (loop state))))) | | > > | 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 | (data (read-line inp)) (resp (ulex-handler udata controldat data))) (if resp (write-line resp oup)) (close-input-port inp) (close-output-port oup)) (loop state))))) ;; add a proc to the handler list, these are done symmetrically (i.e. in all instances) ;; so that the proc can be dereferenced remotely ;; (define (register-handler udata key proc) (hash-table-set! (udat-handlers udata) key proc)) ;;====================================================================== ;; work queues ;;====================================================================== |
︙ | ︙ | |||
552 553 554 555 556 557 558 559 560 561 562 563 564 565 | ;; sync on-disk db to inmem ;; get lock in on-disk db for dbowner of this db ;; put sync-proc, init-proc, on-disk handle, inmem handle in dbconn struct ;; return the struct ;;====================================================================== (defstruct dbconn (inmem #f) (conn #f) (sync #f) ;; sync proc (init #f) ;; init proc (lastsync (current-seconds)) ) | > | 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 | ;; sync on-disk db to inmem ;; get lock in on-disk db for dbowner of this db ;; put sync-proc, init-proc, on-disk handle, inmem handle in dbconn struct ;; return the struct ;;====================================================================== (defstruct dbconn (fname #f) (inmem #f) (conn #f) (sync #f) ;; sync proc (init #f) ;; init proc (lastsync (current-seconds)) ) |
︙ | ︙ |