Overview
Comment: | Added simple ulex (used as a sanity checker) |
---|---|
SHA1: | acda13e7e19f600f5119af6fa5ebd71a |
User & Date: | matt on 2022-01-06 08:36:17 |
Context
2022-01-06
18:18 | Ensure api calls to db do NOT occur on non-server processes. check-in: 7696fcfff8 user: matt tags: v2.0001 | |
08:36 | Added simple ulex (used as a sanity checker) check-in: acda13e7e1 user: matt tags: v2.0001 | |
08:10 | Tweaked debug.scm check-in: 6e2f351dc9 user: matt tags: v2.0001 | |
Changes
Added ulex-simple/ulex.scm version [496e2d5b2f].
;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.

;;======================================================================
;; ABOUT:
;;   See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;;   Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================

(module ulex
    (
     ;; NOTE: looking for the handler proc - find the run-listener :)
     run-listener     ;; (run-listener handler-proc [port]) => uconn

     ;; NOTE: handler-proc params;
     ;;       (handler-proc rem-host-port qrykey cmd params)

     send-receive     ;; (send-receive uconn host-port cmd data)

     ;; NOTE: cmd can be any plain text symbol except for these;
     ;;       'ping 'ack 'goodbye 'response

     set-work-handler ;; (set-work-handler proc)

     wait-and-close   ;; (wait-and-close uconn)

     ulex-listener?

     ;; needed to get the interface:port that was automatically found
     udat-port
     udat-host-port

     ;; for testing only
     ;; pp-uconn
     )

(import scheme
        chicken.base
        chicken.file
        chicken.time
        chicken.condition
        chicken.string
        chicken.sort
        chicken.pretty-print

        address-info
        mailbox
        matchable
        ;; queues
        regex
        regex-case
        s11n
        srfi-1
        srfi-18
        srfi-4
        srfi-69
        system-information
        tcp6
        typed-records
        )

;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
;;
(defstruct udat
  ;; the listener side
  (port #f)
  (host-port #f)
  (socket #f)
  ;; the peers
  (peers (make-hash-table)) ;; host:port->peer
  ;; work handling
  (work-queue (make-mailbox))
  (work-proc #f) ;; set by user
  (cnum 0) ;; cookie number
  (mboxes (make-hash-table)) ;; for the replies
  (avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use
  ;; threads
  (numthreads 50)
  (cmd-thread #f)
  (work-queue-thread #f)
  )

;; ;; struct for keeping track of others we are talking to
;; ;;
;; (defstruct pdat
;;   (host-port #f)
;;   (conns '()) ;; list of pcon structs, pop one off when calling the peer
;;   )
;;
;; ;; struct for peer connections, keep track of expiration etc.
;; ;;
;; (defstruct pcon
;;   (inp #f)
;;   (oup #f)
;;   (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
;;   (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
;;   )

;;======================================================================
;; listener
;;======================================================================

;; is uconn a ulex connector (listener)
;;
(define (ulex-listener? uconn)
  (udat? uconn))

;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;;  if udata-in is #f create the record
;;  if there is already a serv-listener return the udata
;;
(define (setup-listener uconn #!optional (port 4242))
  (handle-exceptions
   exn
   (if (< port 65535)
       (setup-listener uconn (+ port 1))
       #f)
   (connect-listener uconn port)))

(define (connect-listener uconn port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
         (addr (get-my-best-address)))    ;; (hostinfo-addresses (host-information (current-hostname)))
    (udat-port-set! uconn port)
    (udat-host-port-set! uconn (conc addr":"port))
    (udat-socket-set! uconn tlsn)
    uconn))

;; run-listener does all the work of starting a listener in a thread
;; it then returns control
;;
(define (run-listener handler-proc #!optional (port-suggestion 4242))
  (let* ((uconn (make-udat)))
    (udat-work-proc-set! uconn handler-proc)
    (if (setup-listener uconn port-suggestion)
        (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
               #;(th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor")))
          (tcp-buffer-size 2048)
          ;; (max-connections 2048)
          (thread-start! th1)
          #;(thread-start! th2)
          (udat-cmd-thread-set! uconn th1)
          #;(udat-work-queue-thread-set! uconn th2)
          (print "cmd loop and process workers started")
          uconn)
        (assert #f "ERROR: run-listener called without proper setup."))))

(define (wait-and-close uconn)
  (thread-join! (udat-cmd-thread uconn))
  (tcp-close (udat-socket uconn)))

;;======================================================================
;; peers and connections
;;======================================================================

(define *send-mutex* (make-mutex))

;; send structured data to recipient
;;
;;  NOTE: qrykey is what was called the "cookie" previously
;;
;;     retval tells send to expect and wait for return data (one line) and return it or time out
;;       this is for ping where we don't want to necessarily have set up our own server yet.
;;
;; NOTE: see below for beginnings of code to allow re-use of tcp connections
;;        - I believe (without substantial evidence) that re-using connections will
;;          be beneficial ...
;;
(define (send udata host-port qrykey cmd params)
  (mutex-lock! *send-mutex*)
  (let* ((my-host-port (udat-host-port udata))               ;; remote will return to this
         (isme         #f #;(equal? host-port my-host-port)) ;; calling myself?
         ;; dat is a self-contained work block that can be sent or handled locally
         (dat          (list my-host-port qrykey cmd params)))
    (if isme
        (ulex-handler udata dat) ;; no transmission needed
        (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
         exn
         #f
         (let-values (((inp oup)(tcp-connect host-port)))
           (let ((res (if (and inp oup)
                          (begin
                            (serialize dat oup)
                            (deserialize inp)) ;; yes, we always want an ack
                          (begin
                            (print "ERROR: send called but no receiver has been setup. Please call setup first!")
                            #f))))
             (close-input-port inp)
             (close-output-port oup)
             (mutex-unlock! *send-mutex*)
             res)))))) ;; res will always be 'ack

;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive uconn host-port cmd data)
  (cond
   ((member cmd '(ping goodbye)) ;; these are immediate
    (send uconn host-port 'ping cmd data))
   (else
    (let* ((cmbox     (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
           (qrykey    (car cmbox))
           (mbox      (cdr cmbox))
           (mbox-time (current-milliseconds))
           (sres      (send uconn host-port qrykey cmd data))) ;; short res
      sres))))

;;======================================================================
;; responder side
;;======================================================================

;; take a request, rdat, and if not immediate put it in the work queue
;;
;; Reserved cmds; ack ping goodbye response
;;
(define (ulex-handler uconn rdat)
  (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
  (match rdat ;; (string-split controldat)
    ((rem-host-port qrykey cmd params)
     ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
     (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
       (case cmd
         ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
         ((ping)
          ;; (print "Got Ping!")
          ;; (add-to-work-queue uconn rdat)
          'ack)
         (else
          (do-work uconn rdat)))))
    (else
     (print "BAD DATA? controldat=" rdat)
     'ack) ;; send ack anyway?
    ))

;; given an already set up uconn start the cmd-loop
;;
(define (ulex-cmd-loop uconn)
  (let* ((serv-listener (udat-socket uconn)))
    (let loop ((state 'start))
      (let-values (((inp oup)(tcp-accept serv-listener)))
        (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
               (resp (ulex-handler uconn rdat)))
          (if resp (serialize resp oup))
          (close-input-port inp)
          (close-output-port oup))
        (loop state)))))

;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
;; so that the proc can be dereferenced remotely
;;
(define (set-work-handler uconn proc)
  (udat-work-proc-set! uconn proc))

;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================

;; rdat is (rem-host-port qrykey cmd params)

(define (add-to-work-queue uconn rdat)
  #;(queue-add! (udat-work-queue uconn) rdat)
  (mailbox-send! (udat-work-queue uconn) rdat))

(define (do-work uconn rdat)
  (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
    ;; put this following into a do-work procedure
    (match rdat
      ((rem-host-port qrykey cmd params)
       (let* ((start-time (current-milliseconds))
              (result     (proc rem-host-port qrykey cmd params))
              (end-time   (current-milliseconds))
              (run-time   (- end-time start-time)))
         result))
      (else
       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")
       #f))))

(define (process-work-queue uconn)
  (let ((wqueue (udat-work-queue uconn))
        (proc   (udat-work-proc uconn))
        (numthr (udat-numthreads uconn)))
    (let loop ((thnum 1)
               (threads '()))
      (let ((thlst (cons (make-thread (lambda ()
                                        (let work-loop ()
                                          (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
                                            (do-work uconn rdat))
                                          (work-loop)))
                                      (conc "work thread " thnum))
                         threads)))
        (if (< thnum numthr)
            (loop (+ thnum 1) thlst)
            (begin
              (print "ULEX: Starting "(length thlst)" worker threads.")
              (map thread-start! thlst)
              (print "ULEX: Threads started. Joining all.")
              (map thread-join! thlst)))))))

;; below was to enable re-use of connections. This seems non-trivial so for
;; now lets open on each call
;;
;; ;; given host-port get or create peer struct
;; ;;
;; (define (udat-get-peer uconn host-port)
;;   (or (hash-table-ref/default (udat-peers uconn) host-port #f)
;;       ;; no peer, so create pdat and init it
;;
;;       ;; NEED stack of connections, pop and use; inp, oup,
;;       ;; creation_time (remove and create new if over 24hrs old
;;       ;;
;;       (let ((pdat (make-pdat host-port: host-port)))
;;         (hash-table-set! (udat-peers uconn) host-port pdat)
;;         pdat)))
;;
;; ;; is pcon alive
;;
;; ;; given host-port and pdat get a pcon
;; ;;
;; (define (pdat-get-pcon pdat host-port)
;;   (let loop ((conns (pdat-conns pdat)))
;;     (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
;;         (init-pcon (make-pcon))
;;         (let* ((conn (pop conns)))
;;
;; ;; given host-port get a pcon struct
;; ;;
;; (define (udat-get-pcon

;;======================================================================
;; misc utils
;;======================================================================

(define (make-cookie uconn)
  (let ((newcnum (+ (udat-cnum uconn) 1)))
    (udat-cnum-set! uconn newcnum)
    (conc (udat-host-port uconn) ":"
          newcnum)))

;; cookie/mboxes

;; we store each mbox with a cookie (<cookie> . <mbox>)
;;
(define (get-cmbox uconn)
  (if (null? (udat-avail-cmboxes uconn))
      (let ((cookie (make-cookie uconn))
            (mbox   (make-mailbox)))
        (hash-table-set! (udat-mboxes uconn) cookie mbox)
        `(,cookie . ,mbox))
      (let ((cmbox (car (udat-avail-cmboxes uconn))))
        (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
        cmbox)))

(define (put-cmbox uconn cmbox)
  (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))

(define (pp-uconn uconn)
  (pp (udat->alist uconn)))

;;======================================================================
;; network utilities
;;======================================================================

;; NOTE: Look at address-info egg as alternative to some of this

(define (rate-ip ipaddr)
  (regex-case ipaddr
    ( "^127\\..*" _ 0 )
    ( "^(10\\.0|192\\.168)\\..*" _ 1 )
    ( else 2 ) ))

;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
  (> (rate-ip a) (rate-ip b)))

(define (get-my-best-address)
  (let ((all-my-addresses (get-all-ips)))
    (cond
     ((null? all-my-addresses)
      (get-host-name))          ;; no interfaces?
     ((eq? (length all-my-addresses) 1)
      (car all-my-addresses))   ;; only one to choose from, just go with it
     (else
      (car (sort all-my-addresses ip-pref-less?))))))

(define (get-all-ips-sorted)
  (sort (get-all-ips) ip-pref-less?))

(define (get-all-ips)
  (map address-info-host
       (filter (lambda (x)
                 (equal? (address-info-type x) "tcp"))
               (address-infos (get-host-name)))))

)
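A minimal usage sketch of the exported interface, assuming the module above has been compiled and installed so that `(import ulex)` works under CHICKEN 5. The handler name `demo-handler` and the `echo` and `add` commands are hypothetical; only `run-listener`, `send-receive`, and `udat-host-port` come from the export list. The listener and the caller share one process here purely for brevity.

```scheme
;; Usage sketch only; demo-handler and its commands are illustrative assumptions.
(import scheme chicken.base ulex)

;; Called by the listener as (handler-proc rem-host-port qrykey cmd params).
;; Whatever it returns is serialized back to the caller.
(define (demo-handler rem-host-port qrykey cmd params)
  (case cmd
    ((echo) params)
    ((add)  (apply + params))
    (else   (list 'unknown-cmd cmd))))

;; Start listening; 4242 is only a suggestion, setup-listener walks upward
;; until a port can be bound.
(define uconn (run-listener demo-handler 4242))

;; The address peers should use, in "host:port" form.
(define me (udat-host-port uconn))

;; ping is answered inside ulex-handler and never reaches demo-handler.
(print "ping -> " (send-receive uconn me 'ping '()))

;; Other commands are passed to demo-handler through do-work.
(print "add  -> " (send-receive uconn me 'add '(1 2 3)))
(print "echo -> " (send-receive uconn me 'echo "hello"))
```

Because `send` wraps the connection in `handle-exceptions` and yields `#f` on any error, a caller would probably want to check for `#f` before using a result. A long-running server would typically finish with `(wait-and-close uconn)`, which blocks on the command-loop thread.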