;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.
;;======================================================================
;; ABOUT:
;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
(module ulex
*
#;(
;; NOTE: looking for the handler proc - find the run-listener :)
run-listener ;; (run-listener handler-proc [port]) => uconn
;; NOTE: handler-proc params;
;; (handler-proc rem-host-port qrykey cmd params)
send-receive ;; (send-receive uconn host-port cmd data)
;; NOTE: cmd can be any plain text symbol except for these;
;; 'ping 'ack 'goodbye 'response
set-work-handler ;; (set-work-handler proc)
wait-and-close ;; (wait-and-close uconn)
ulex-listener?
;; needed to get the interface:port that was automatically found
udat-port
udat-host-port
;; for testing only
;; pp-uconn
;; parameters
work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct
return-method ;; parameter; 'mailbox, 'polling, 'direct
)
(import scheme
chicken.base
chicken.file
chicken.io
chicken.time
chicken.condition
chicken.string
chicken.sort
chicken.pretty-print
address-info
mailbox
matchable
;; queues
regex
regex-case
simple-exceptions
s11n
srfi-1
srfi-18
srfi-4
srfi-69
system-information
tcp6
typed-records
)
;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
;;
;; Holds the state for one ulex endpoint: listener details, peer table,
;; work queue, reply mailboxes and thread handles. Each field is written
;; as (name default-expression).
;;
;; NOTE(review): the only visible users of this record are in the
;; commented-out (";; == <<") code later in this file, which calls
;; make-udat, udat?, udat->alist and the udat-<field> / udat-<field>-set!
;; procedures. The per-field notes below are taken from that disabled
;; code and from the original inline comments - confirm against whatever
;; live code now consumes this struct.
;;
(defstruct udat
;; the listener side
;; port: #f until set; the disabled connect-listener stores the tcp port number here
(port #f)
;; host-port: #f until set; the disabled connect-listener stores (conc addr ":" port) here
(host-port #f)
;; socket: #f until set; the disabled connect-listener stores the tcp-listen result here
(socket #f)
;; the peers
(peers (make-hash-table)) ;; host:port->peer
;; work handling
;; work-queue: a mailbox; the disabled add-to-work-queue sends rdat lists to it
(work-queue (make-mailbox))
;; work-proc: the disabled do-work calls it as (proc rem-host-port qrykey cmd params)
(work-proc #f) ;; set by user
;; cnum: the disabled make-cookie increments it and appends it to host-port to form a cookie
(cnum 0) ;; cookie number
;; mboxes: hash table keyed by cookie (qrykey)
(mboxes (make-hash-table)) ;; for the replies
(avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use
;; threads
;; numthreads: defaults to 10; read by the disabled process-work-queue as its worker-thread count
(numthreads 10)
;; cmd-thread / work-queue-thread: #f until set; the disabled run-listener stores
;; the command-loop thread and the work-queue-processor thread here
(cmd-thread #f)
(work-queue-thread #f)
;; num-threads-running: starts at 0; not referenced anywhere else in this file
(num-threads-running 0)
)
;; == << ;; Parameters
;; == <<
;; == << ;; work-method:
;; == << (define work-method (make-parameter 'mailbox))
;; == << ;; mailbox - all rdat goes through mailbox
;; == << ;; threads - all rdat immediately executed in new thread
;; == << ;; direct - no queuing
;; == << ;;
;; == <<
;; == << ;; return-method, return the result to waiting send-receive:
;; == << (define return-method (make-parameter 'mailbox))
;; == << ;; mailbox - create a mailbox and use it for passing returning results to send-receive
;; == << ;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result
;; == << ;; direct - no queuing, result is passed back in single tcp connection
;; == << ;;
;; == <<
;; == << ;; ;; struct for keeping track of others we are talking to
;; == << ;; ;;
;; == << ;; (defstruct pdat
;; == << ;; (host-port #f)
;; == << ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer
;; == << ;; )
;; == << ;;
;; == << ;; ;; struct for peer connections, keep track of expiration etc.
;; == << ;; ;;
;; == << ;; (defstruct pcon
;; == << ;; (inp #f)
;; == << ;; (oup #f)
;; == << ;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
;; == << ;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after ten minutes (600 seconds)
;; == << ;; )
;; == <<
;; == << ;;======================================================================
;; == << ;; listener
;; == << ;;======================================================================
;; == <<
;; == << ;; is uconn a ulex connector (listener)
;; == << ;;
;; == << (define (ulex-listener? uconn)
;; == << (udat? uconn))
;; == <<
;; == << ;; create a tcp listener and return a populated udat struct with
;; == << ;; my port, address, hostname, pid etc.
;; == << ;; return #f if fail to find a port to allocate.
;; == << ;;
;; == << ;; if udata-in is #f create the record
;; == << ;; if there is already a serv-listener return the udata
;; == << ;;
;; == << (define (setup-listener uconn #!optional (port 4242))
;; == << (handle-exceptions
;; == << exn
;; == << (if (< port 65535)
;; == << (setup-listener uconn (+ port 1))
;; == << #f)
;; == << (connect-listener uconn port)))
;; == <<
;; == << (define (connect-listener uconn port)
;; == << ;; (tcp-listener-socket LISTENER)(socket-name so)
;; == << ;; sockaddr-address, sockaddr-port, sockaddr->string
;; == << (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
;; == << (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
;; == << (udat-port-set! uconn port)
;; == << (udat-host-port-set! uconn (conc addr":"port))
;; == << (udat-socket-set! uconn tlsn)
;; == << uconn))
;; == <<
;; == << ;; run-listener does all the work of starting a listener in a thread
;; == << ;; it then returns control
;; == << ;;
;; == << (define (run-listener handler-proc #!optional (port-suggestion 4242))
;; == << (let* ((uconn (make-udat)))
;; == << (udat-work-proc-set! uconn handler-proc)
;; == << (if (setup-listener uconn port-suggestion)
;; == << (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
;; == << (th2 (make-thread (lambda ()
;; == << (case (work-method)
;; == << ((mailbox limited)
;; == << (process-work-queue uconn))))
;; == << "Ulex work queue processor")))
;; == << ;; (tcp-buffer-size 2048)
;; == << (thread-start! th1)
;; == << (thread-start! th2)
;; == << (udat-cmd-thread-set! uconn th1)
;; == << (udat-work-queue-thread-set! uconn th2)
;; == << (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
;; == << uconn)
;; == << (assert #f "ERROR: run-listener called without proper setup."))))
;; == <<
;; == << (define (wait-and-close uconn)
;; == << (thread-join! (udat-cmd-thread uconn))
;; == << (tcp-close (udat-socket uconn)))
;; == <<
;; == << ;;======================================================================
;; == << ;; peers and connections
;; == << ;;======================================================================
;; == <<
;; == << (define *send-mutex* (make-mutex))
;; == <<
;; == << ;; send structured data to recipient
;; == << ;;
;; == << ;; NOTE: qrykey is what was called the "cookie" previously
;; == << ;;
;; == << ;; retval tells send to expect and wait for return data (one line) and return it or time out
;; == << ;; this is for ping where we don't want to necessarily have set up our own server yet.
;; == << ;;
;; == << ;; NOTE: see below for beginnings of code to allow re-use of tcp connections
;; == << ;; - I believe (without substantial evidence) that re-using connections will
;; == << ;; be beneficial ...
;; == << ;;
;; == << (define (send udata host-port qrykey cmd params)
;; == << (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
;; == << (isme #f #;(equal? host-port my-host-port)) ;; calling myself?
;; == << ;; dat is a self-contained work block that can be sent or handled locally
;; == << (dat (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds)))))
;; == << (cond
;; == << (isme (ulex-handler udata dat)) ;; no transmission needed
;; == << (else
;; == << (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
;; == << exn
;; == << (message exn)
;; == << (begin
;; == << ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; == << (let-values (((inp oup)(tcp-connect host-port)))
;; == << (let ((res (if (and inp oup)
;; == << (begin
;; == << (serialize dat oup)
;; == << (close-output-port oup)
;; == << (deserialize inp)
;; == << )
;; == << (begin
;; == << (print "ERROR: send called but no receiver has been setup. Please call setup first!")
;; == << #f))))
;; == << (close-input-port inp)
;; == << ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; == << res)))))))) ;; res will always be 'ack unless return-method is direct
;; == <<
;; == << (define (send-via-polling uconn host-port cmd data)
;; == << (let* ((qrykey (make-cookie uconn))
;; == << (sres (send uconn host-port qrykey cmd data)))
;; == << (case sres
;; == << ((ack)
;; == << (let loop ((start-time (current-milliseconds)))
;; == << (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout
;; == << (begin
;; == << (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data)
;; == << #f)
;; == << (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash
;; == << (if result ;; result is '(status . result-data) or #f for nothing yet
;; == << (begin
;; == << (hash-table-delete! (udat-mboxes uconn) qrykey)
;; == << (cdr result))
;; == << (begin
;; == << (thread-sleep! 0.01)
;; == << (loop start-time)))))))
;; == << (else
;; == << (print "ULEX ERROR: Communication failed? sres="sres)
;; == << #f))))
;; == <<
;; == << (define (send-via-mailbox uconn host-port cmd data)
;; == << (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
;; == << (qrykey (car cmbox))
;; == << (mbox (cdr cmbox))
;; == << (mbox-time (current-milliseconds))
;; == << (sres (send uconn host-port qrykey cmd data))) ;; short res
;; == << (if (eq? sres 'ack)
;; == << (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread)))
;; == << #f
;; == << 120)) ;; timeout)
;; == << (mbox-timeout-result 'MBOX_TIMEOUT)
;; == << (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
;; == << (mbox-receive-time (current-milliseconds)))
;; == << ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
;; == << (hash-table-delete! (udat-mboxes uconn) qrykey)
;; == << (if (eq? res 'MBOX_TIMEOUT)
;; == << (begin
;; == << (print "WARNING: mbox timed out for query "cmd", with data "data
;; == << ", waiting for response from "host-port".")
;; == <<
;; == << ;; here it might make sense to clean up connection records and force clean start?
;; == << ;; NO. The program using ulex needs to do the reset. The right thing here is to raise an exception
;; == <<
;; == << #f) ;; convert to raising exception?
;; == << res))
;; == << (begin
;; == << (print "ERROR: Communication failed? Got "sres)
;; == << #f))))
;; == <<
;; == << ;; send a request to the given host-port and register a mailbox in udata
;; == << ;; wait for the mailbox data and return it
;; == << ;;
;; == << (define (send-receive uconn host-port cmd data)
;; == << (let* ((start-time (current-milliseconds))
;; == << (result (cond
;; == << ((member cmd '(ping goodbye)) ;; these are immediate
;; == << (send uconn host-port 'ping cmd data))
;; == << ((eq? (work-method) 'direct)
;; == << ;; the result from send will be the actual result, not an 'ack
;; == << (send uconn host-port 'direct cmd data))
;; == << (else
;; == << (case (return-method)
;; == << ((polling)
;; == << (send-via-polling uconn host-port cmd data))
;; == << ((mailbox)
;; == << (send-via-mailbox uconn host-port cmd data))
;; == << (else
;; == << (print "ULEX ERROR: unrecognised return-method "(return-method)".")
;; == << #f)))))
;; == << (duration (- (current-milliseconds) start-time)))
;; == << ;; this is ONLY for development and debugging. It will be removed once Ulex is stable.
;; == << (if (< 5000 duration)
;; == << (print "ULEX WARNING: round-trip took "(inexact->exact (round (/ duration 1000)))
;; == << " seconds; "cmd", host-port="host-port", data="data))
;; == << result))
;; == <<
;; == <<
;; == << ;;======================================================================
;; == << ;; responder side
;; == << ;;======================================================================
;; == <<
;; == << ;; take a request, rdat, and if not immediate put it in the work queue
;; == << ;;
;; == << ;; Reserved cmds; ack ping goodbye response
;; == << ;;
;; == << (define (ulex-handler uconn rdat)
;; == << (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
;; == << (match rdat ;; (string-split controldat)
;; == << ((rem-host-port qrykey cmd params);; timedata)
;; == << ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
;; == << (case cmd
;; == << ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
;; == << ((ping)
;; == << ;; (print "Got Ping!")
;; == << ;; (add-to-work-queue uconn rdat)
;; == << 'ack)
;; == << ((goodbye)
;; == << ;; just clear out references to the caller. NOT COMPLETE
;; == << (add-to-work-queue uconn rdat)
;; == << 'ack)
;; == << ((response) ;; this is a result from remote processing, send it as mail ...
;; == << (case (return-method)
;; == << ((polling)
;; == << (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params))
;; == << 'ack)
;; == << ((mailbox)
;; == << (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
;; == << (if mbox
;; == << (begin
;; == << (mailbox-send! mbox params) ;; params here is our result
;; == << 'ack)
;; == << (begin
;; == << (print "ERROR: received result but no associated mbox for cookie "qrykey)
;; == << 'no-mbox-found))))
;; == << (else (print "ULEX ERROR: unrecognised return-method "(return-method))
;; == << 'bad-return-method)))
;; == << (else ;; generic request - hand it to the work queue
;; == << (add-to-work-queue uconn rdat)
;; == << 'ack)))
;; == << (else
;; == << (print "ULEX ERROR: bad rdat "rdat)
;; == << 'bad-rdat)))
;; == <<
;; == << ;; given an already set up uconn start the cmd-loop
;; == << ;;
;; == << (define (ulex-cmd-loop uconn)
;; == << (let* ((serv-listener (udat-socket uconn))
;; == << (listener (lambda ()
;; == << (let loop ((state 'start))
;; == << (let-values (((inp oup)(tcp-accept serv-listener)))
;; == << ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; == << (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
;; == << (resp (ulex-handler uconn rdat)))
;; == << (serialize resp oup)
;; == << (close-input-port inp)
;; == << (close-output-port oup)
;; == << ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; == << )
;; == << (loop state))))))
;; == << ;; start N of them
;; == << (let loop ((thnum 0)
;; == << (threads '()))
;; == << (if (< thnum 100)
;; == << (let* ((th (make-thread listener (conc "listener" thnum))))
;; == << (thread-start! th)
;; == << (loop (+ thnum 1)
;; == << (cons th threads)))
;; == << (map thread-join! threads)))))
;; == <<
;; == << ;; add a proc to the cmd list, these are done symmetrically (i.e. in all instances)
;; == << ;; so that the proc can be dereferenced remotely
;; == << ;;
;; == << (define (set-work-handler uconn proc)
;; == << (udat-work-proc-set! uconn proc))
;; == <<
;; == << ;;======================================================================
;; == << ;; work queues - this is all happening on the listener side
;; == << ;;======================================================================
;; == <<
;; == << ;; rdat is (rem-host-port qrykey cmd params)
;; == <<
;; == << (define (add-to-work-queue uconn rdat)
;; == << #;(queue-add! (udat-work-queue uconn) rdat)
;; == << (case (work-method)
;; == << ((threads)
;; == << (thread-start! (make-thread (lambda ()
;; == << (do-work uconn rdat))
;; == << "worker thread")))
;; == << ((mailbox)
;; == << (mailbox-send! (udat-work-queue uconn) rdat))
;; == << ((direct)
;; == << (do-work uconn rdat))
;; == << (else
;; == << (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.")
;; == << (mailbox-send! (udat-work-queue uconn) rdat))))
;; == <<
;; == << ;; move the logic to return the result somewhere else?
;; == << ;;
;; == << (define (do-work uconn rdat)
;; == << (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivably it could change
;; == << ;; put this following into a do-work procedure
;; == << (match rdat
;; == << ((rem-host-port qrykey cmd params)
;; == << (let* ((start-time (current-milliseconds))
;; == << (result (proc rem-host-port qrykey cmd params))
;; == << (end-time (current-milliseconds))
;; == << (run-time (- end-time start-time)))
;; == << (case (work-method)
;; == << ((direct) result)
;; == << (else
;; == << (print "ULEX: work "cmd", "params" done in "run-time" ms")
;; == << ;; send 'response as cmd and result as params
;; == << (send uconn rem-host-port qrykey 'response result) ;; could check for ack
;; == << (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
;; == << (MBOX_TIMEOUT 'do-work-timeout)
;; == << (else
;; == << (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
;; == <<
;; == << ;; NEW APPROACH:
;; == << ;;
;; == << (define (process-work-queue uconn)
;; == << (let ((wqueue (udat-work-queue uconn))
;; == << (proc (udat-work-proc uconn))
;; == << (numthr (udat-numthreads uconn)))
;; == << (let loop ((thnum 1)
;; == << (threads '()))
;; == << (let ((thlst (cons (make-thread (lambda ()
;; == << (let work-loop ()
;; == << (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
;; == << (do-work uconn rdat))
;; == << (work-loop)))
;; == << (conc "work thread " thnum))
;; == << threads)))
;; == << (if (< thnum numthr)
;; == << (loop (+ thnum 1)
;; == << thlst)
;; == << (begin
;; == << (print "ULEX: Starting "(length thlst)" worker threads.")
;; == << (map thread-start! thlst)
;; == << (print "ULEX: Threads started. Joining all.")
;; == << (map thread-join! thlst)))))))
;; == <<
;; == << ;; below was to enable re-use of connections. This seems non-trivial so for
;; == << ;; now let's open a new connection on each call
;; == << ;;
;; == << ;; ;; given host-port get or create peer struct
;; == << ;; ;;
;; == << ;; (define (udat-get-peer uconn host-port)
;; == << ;; (or (hash-table-ref/default (udat-peers uconn) host-port #f)
;; == << ;; ;; no peer, so create pdat and init it
;; == << ;;
;; == << ;; ;; NEED stack of connections, pop and use; inp, oup,
;; == << ;; ;; creation_time (remove and create new if over 24hrs old
;; == << ;; ;;
;; == << ;; (let ((pdat (make-pdat host-port: host-port)))
;; == << ;; (hash-table-set! (udat-peers uconn) host-port pdat)
;; == << ;; pdat)))
;; == << ;;
;; == << ;; ;; is pcon alive
;; == << ;;
;; == << ;; ;; given host-port and pdat get a pcon
;; == << ;; ;;
;; == << ;; (define (pdat-get-pcon pdat host-port)
;; == << ;; (let loop ((conns (pdat-conns pdat)))
;; == << ;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
;; == << ;; (init-pcon (make-pcon))
;; == << ;; (let* ((conn (pop conns)))
;; == << ;;
;; == << ;; ;; given host-port get a pcon struct
;; == << ;; ;;
;; == << ;; (define (udat-get-pcon
;; == <<
;; == << ;;======================================================================
;; == << ;; misc utils
;; == << ;;======================================================================
;; == <<
;; == << (define (make-cookie uconn)
;; == << (let ((newcnum (+ (udat-cnum uconn) 1)))
;; == << (udat-cnum-set! uconn newcnum)
;; == << (conc (udat-host-port uconn) ":"
;; == << newcnum)))
;; == <<
;; == << ;; cookie/mboxes
;; == <<
;; == << ;; we store each mbox with a cookie (<cookie> . <mbox>)
;; == << ;;
;; == << (define (get-cmbox uconn)
;; == << (if (null? (udat-avail-cmboxes uconn))
;; == << (let ((cookie (make-cookie uconn))
;; == << (mbox (make-mailbox)))
;; == << (hash-table-set! (udat-mboxes uconn) cookie mbox)
;; == << `(,cookie . ,mbox))
;; == << (let ((cmbox (car (udat-avail-cmboxes uconn))))
;; == << (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
;; == << cmbox)))
;; == <<
;; == << (define (put-cmbox uconn cmbox)
;; == << (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
;; == <<
;; == << (define (pp-uconn uconn)
;; == << (pp (udat->alist uconn)))
;; == <<
;; == <<
;; == << ;;======================================================================
;; == << ;; network utilities
;; == << ;;======================================================================
;; == <<
;; == << ;; NOTE: Look at address-info egg as alternative to some of this
;; == <<
;; == << (define (rate-ip ipaddr)
;; == << (regex-case ipaddr
;; == << ( "^127\\..*" _ 0 )
;; == << ( "^(10\\.0|192\\.168)\\..*" _ 1 )
;; == << ( else 2 ) ))
;; == <<
;; == << ;; Change this to bias for addresses with a reasonable broadcast value?
;; == << ;;
;; == << (define (ip-pref-less? a b)
;; == << (> (rate-ip a) (rate-ip b)))
;; == <<
;; == << (define (get-my-best-address)
;; == << (let ((all-my-addresses (get-all-ips)))
;; == << (cond
;; == << ((null? all-my-addresses)
;; == << (get-host-name)) ;; no interfaces?
;; == << ((eq? (length all-my-addresses) 1)
;; == << (car all-my-addresses)) ;; only one to choose from, just go with it
;; == << (else
;; == << (car (sort all-my-addresses ip-pref-less?))))))
;; == <<
;; == << (define (get-all-ips-sorted)
;; == << (sort (get-all-ips) ip-pref-less?))
;; == <<
;; == << (define (get-all-ips)
;; == << (map address-info-host
;; == << (filter (lambda (x)
;; == << (equal? (address-info-type x) "tcp"))
;; == << (address-infos (get-host-name)))))
;; == <<
)