;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.
;;======================================================================
;; ABOUT:
;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
;; Module `ulex`.
;;
;; The export spec is `*`, i.e. every toplevel binding defined in the
;; module is exported. The explicit export list that follows is disabled
;; with a `#;` datum comment and is kept only as a reference for the
;; intended public API.
;;
;; NOTE(review): every definition in the module body below is currently
;; commented out, so none of the names in the disabled list are actually
;; defined by this file as it stands.
(module ulex
*
#;(
;; NOTE: looking for the handler proc - find the run-listener :)
run-listener ;; (run-listener handler-proc [port]) => uconn
;; NOTE: handler-proc params;
;; (handler-proc rem-host-port qrykey cmd params)
send-receive ;; (send-receive uconn host-port cmd data)
;; NOTE: cmd can be any plain text symbol except for these;
;; 'ping 'ack 'goodbye 'response
set-work-handler ;; (set-work-handler uconn proc)
wait-and-close ;; (wait-and-close uconn)
ulex-listener?
;; needed to get the interface:port that was automatically found
udat-port
udat-host-port
;; for testing only
;; pp-uconn
;; parameters
work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct
return-method ;; parameter; 'mailbox, 'polling, 'direct
)
(import scheme
chicken.base
chicken.file
chicken.io
chicken.time
chicken.condition
chicken.string
chicken.sort
chicken.pretty-print
address-info
mailbox
matchable
;; queues
regex
regex-case
simple-exceptions
s11n
srfi-1
srfi-18
srfi-4
srfi-69
system-information
tcp6
typed-records
)
;; ;; udat struct, used by both caller and callee
;; ;; instantiated as uconn by convention
;; ;;
;; (defstruct udat
;; ;; the listener side
;; (port #f)
;; (host-port #f)
;; (socket #f)
;; ;; the peers
;; (peers (make-hash-table)) ;; host:port->peer
;; ;; work handling
;; (work-queue (make-mailbox))
;; (work-proc #f) ;; set by user
;; (cnum 0) ;; cookie number
;; (mboxes (make-hash-table)) ;; for the replies
;; (avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use
;; ;; threads
;; (numthreads 10)
;; (cmd-thread #f)
;; (work-queue-thread #f)
;; (num-threads-running 0)
;; )
;;
;; ;; Parameters
;;
;; ;; work-method:
;; (define work-method (make-parameter 'mailbox))
;; ;; mailbox - all rdat goes through mailbox
;; ;; threads - all rdat immediately executed in new thread
;; ;; direct - no queuing
;; ;;
;;
;; ;; return-method, return the result to waiting send-receive:
;; (define return-method (make-parameter 'mailbox))
;; ;; mailbox - create a mailbox and use it for passing returning results to send-receive
;; ;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result
;; ;; direct - no queuing, result is passed back in single tcp connection
;; ;;
;;
;; ;; ;; struct for keeping track of others we are talking to
;; ;; ;;
;; ;; (defstruct pdat
;; ;; (host-port #f)
;; ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer
;; ;; )
;; ;;
;; ;; ;; struct for peer connections, keep track of expiration etc.
;; ;; ;;
;; ;; (defstruct pcon
;; ;; (inp #f)
;; ;; (oup #f)
;; ;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
;; ;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after ten minutes (600 seconds)
;; ;; )
;;
;; ;;======================================================================
;; ;; listener
;; ;;======================================================================
;;
;; ;; is uconn a ulex connector (listener)
;; ;;
;; (define (ulex-listener? uconn)
;; (udat? uconn))
;;
;; ;; create a tcp listener and return a populated udat struct with
;; ;; my port, address, hostname, pid etc.
;; ;; return #f if fail to find a port to allocate.
;; ;;
;; ;; if udata-in is #f create the record
;; ;; if there is already a serv-listener return the udata
;; ;;
;; (define (setup-listener uconn #!optional (port 4242))
;; (handle-exceptions
;; exn
;; (if (< port 65535)
;; (setup-listener uconn (+ port 1))
;; #f)
;; (connect-listener uconn port)))
;;
;; (define (connect-listener uconn port)
;; ;; (tcp-listener-socket LISTENER)(socket-name so)
;; ;; sockaddr-address, sockaddr-port, sockaddr->string
;; (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
;; (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
;; (udat-port-set! uconn port)
;; (udat-host-port-set! uconn (conc addr":"port))
;; (udat-socket-set! uconn tlsn)
;; uconn))
;;
;; ;; run-listener does all the work of starting a listener in a thread
;; ;; it then returns control
;; ;;
;; (define (run-listener handler-proc #!optional (port-suggestion 4242))
;; (let* ((uconn (make-udat)))
;; (udat-work-proc-set! uconn handler-proc)
;; (if (setup-listener uconn port-suggestion)
;; (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
;; (th2 (make-thread (lambda ()
;; (case (work-method)
;; ((mailbox limited)
;; (process-work-queue uconn))))
;; "Ulex work queue processor")))
;; ;; (tcp-buffer-size 2048)
;; (thread-start! th1)
;; (thread-start! th2)
;; (udat-cmd-thread-set! uconn th1)
;; (udat-work-queue-thread-set! uconn th2)
;; (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
;; uconn)
;; (assert #f "ERROR: run-listener called without proper setup."))))
;;
;; (define (wait-and-close uconn)
;; (thread-join! (udat-cmd-thread uconn))
;; (tcp-close (udat-socket uconn)))
;;
;; ;;======================================================================
;; ;; peers and connections
;; ;;======================================================================
;;
;; (define *send-mutex* (make-mutex))
;;
;; ;; send structured data to recipient
;; ;;
;; ;; NOTE: qrykey is what was called the "cookie" previously
;; ;;
;; ;; retval tells send to expect and wait for return data (one line) and return it or time out
;; ;; this is for ping where we don't want to necessarily have set up our own server yet.
;; ;;
;; ;; NOTE: see below for beginnings of code to allow re-use of tcp connections
;; ;; - I believe (without substantial evidence) that re-using connections will
;; ;; be beneficial ...
;; ;;
;; (define (send udata host-port qrykey cmd params)
;; (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
;; (isme #f #;(equal? host-port my-host-port)) ;; calling myself?
;; ;; dat is a self-contained work block that can be sent or handled locally
;; (dat (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds)))))
;; (cond
;; (isme (ulex-handler udata dat)) ;; no transmission needed
;; (else
;; (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
;; exn
;; (message exn)
;; (begin
;; ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; (let-values (((inp oup)(tcp-connect host-port)))
;; (let ((res (if (and inp oup)
;; (begin
;; (serialize dat oup)
;; (close-output-port oup)
;; (deserialize inp)
;; )
;; (begin
;; (print "ERROR: send called but no receiver has been setup. Please call setup first!")
;; #f))))
;; (close-input-port inp)
;; ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; res)))))))) ;; res will always be 'ack unless return-method is direct
;;
;; (define (send-via-polling uconn host-port cmd data)
;; (let* ((qrykey (make-cookie uconn))
;; (sres (send uconn host-port qrykey cmd data)))
;; (case sres
;; ((ack)
;; (let loop ((start-time (current-milliseconds)))
;; (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout
;; (begin
;; (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data)
;; #f)
;; (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash
;; (if result ;; result is '(status . result-data) or #f for nothing yet
;; (begin
;; (hash-table-delete! (udat-mboxes uconn) qrykey)
;; (cdr result))
;; (begin
;; (thread-sleep! 0.01)
;; (loop start-time)))))))
;; (else
;; (print "ULEX ERROR: Communication failed? sres="sres)
;; #f))))
;;
;; (define (send-via-mailbox uconn host-port cmd data)
;; (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
;; (qrykey (car cmbox))
;; (mbox (cdr cmbox))
;; (mbox-time (current-milliseconds))
;; (sres (send uconn host-port qrykey cmd data))) ;; short res
;; (if (eq? sres 'ack)
;; (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread)))
;; #f
;; 120)) ;; timeout)
;; (mbox-timeout-result 'MBOX_TIMEOUT)
;; (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
;; (mbox-receive-time (current-milliseconds)))
;; ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
;; (hash-table-delete! (udat-mboxes uconn) qrykey)
;; (if (eq? res 'MBOX_TIMEOUT)
;; (begin
;; (print "WARNING: mbox timed out for query "cmd", with data "data
;; ", waiting for response from "host-port".")
;;
;; ;; here it might make sense to clean up connection records and force clean start?
;; ;; NO. The program using ulex needs to do the reset. The right thing here is to raise an exception
;;
;; #f) ;; convert to raising exception?
;; res))
;; (begin
;; (print "ERROR: Communication failed? Got "sres)
;; #f))))
;;
;; ;; send a request to the given host-port and register a mailbox in udata
;; ;; wait for the mailbox data and return it
;; ;;
;; (define (send-receive uconn host-port cmd data)
;; (let* ((start-time (current-milliseconds))
;; (result (cond
;; ((member cmd '(ping goodbye)) ;; these are immediate
;; (send uconn host-port 'ping cmd data))
;; ((eq? (work-method) 'direct)
;; ;; the result from send will be the actual result, not an 'ack
;; (send uconn host-port 'direct cmd data))
;; (else
;; (case (return-method)
;; ((polling)
;; (send-via-polling uconn host-port cmd data))
;; ((mailbox)
;; (send-via-mailbox uconn host-port cmd data))
;; (else
;; (print "ULEX ERROR: unrecognised return-method "(return-method)".")
;; #f)))))
;; (duration (- (current-milliseconds) start-time)))
;; ;; this is ONLY for development and debugging. It will be removed once Ulex is stable.
;; (if (< 5000 duration)
;; (print "ULEX WARNING: round-trip took "(inexact->exact (round (/ duration 1000)))
;; " seconds; "cmd", host-port="host-port", data="data))
;; result))
;;
;;
;; ;;======================================================================
;; ;; responder side
;; ;;======================================================================
;;
;; ;; take a request, rdat, and if not immediate put it in the work queue
;; ;;
;; ;; Reserved cmds; ack ping goodbye response
;; ;;
;; (define (ulex-handler uconn rdat)
;; (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
;; (match rdat ;; (string-split controldat)
;; ((rem-host-port qrykey cmd params);; timedata)
;; ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
;; (case cmd
;; ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
;; ((ping)
;; ;; (print "Got Ping!")
;; ;; (add-to-work-queue uconn rdat)
;; 'ack)
;; ((goodbye)
;; ;; just clear out references to the caller. NOT COMPLETE
;; (add-to-work-queue uconn rdat)
;; 'ack)
;; ((response) ;; this is a result from remote processing, send it as mail ...
;; (case (return-method)
;; ((polling)
;; (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params))
;; 'ack)
;; ((mailbox)
;; (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
;; (if mbox
;; (begin
;; (mailbox-send! mbox params) ;; params here is our result
;; 'ack)
;; (begin
;; (print "ERROR: received result but no associated mbox for cookie "qrykey)
;; 'no-mbox-found))))
;; (else (print "ULEX ERROR: unrecognised return-method "(return-method))
;; 'bad-return-method)))
;; (else ;; generic request - hand it to the work queue
;; (add-to-work-queue uconn rdat)
;; 'ack)))
;; (else
;; (print "ULEX ERROR: bad rdat "rdat)
;; 'bad-rdat)))
;;
;; ;; given an already set up uconn start the cmd-loop
;; ;;
;; (define (ulex-cmd-loop uconn)
;; (let* ((serv-listener (udat-socket uconn))
;; (listener (lambda ()
;; (let loop ((state 'start))
;; (let-values (((inp oup)(tcp-accept serv-listener)))
;; ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
;; (resp (ulex-handler uconn rdat)))
;; (serialize resp oup)
;; (close-input-port inp)
;; (close-output-port oup)
;; ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; )
;; (loop state))))))
;; ;; start N of them
;; (let loop ((thnum 0)
;; (threads '()))
;; (if (< thnum 100)
;; (let* ((th (make-thread listener (conc "listener" thnum))))
;; (thread-start! th)
;; (loop (+ thnum 1)
;; (cons th threads)))
;; (map thread-join! threads)))))
;;
;; ;; add a proc to the cmd list, these are done symmetrically (i.e. in all instances)
;; ;; so that the proc can be dereferenced remotely
;; ;;
;; (define (set-work-handler uconn proc)
;; (udat-work-proc-set! uconn proc))
;;
;; ;;======================================================================
;; ;; work queues - this is all happening on the listener side
;; ;;======================================================================
;;
;; ;; rdat is (rem-host-port qrykey cmd params)
;;
;; (define (add-to-work-queue uconn rdat)
;; #;(queue-add! (udat-work-queue uconn) rdat)
;; (case (work-method)
;; ((threads)
;; (thread-start! (make-thread (lambda ()
;; (do-work uconn rdat))
;; "worker thread")))
;; ((mailbox)
;; (mailbox-send! (udat-work-queue uconn) rdat))
;; ((direct)
;; (do-work uconn rdat))
;; (else
;; (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.")
;; (mailbox-send! (udat-work-queue uconn) rdat))))
;;
;; ;; move the logic to return the result somewhere else?
;; ;;
;; (define (do-work uconn rdat)
;; (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivably it could change
;; ;; put this following into a do-work procedure
;; (match rdat
;; ((rem-host-port qrykey cmd params)
;; (let* ((start-time (current-milliseconds))
;; (result (proc rem-host-port qrykey cmd params))
;; (end-time (current-milliseconds))
;; (run-time (- end-time start-time)))
;; (case (work-method)
;; ((direct) result)
;; (else
;; (print "ULEX: work "cmd", "params" done in "run-time" ms")
;; ;; send 'response as cmd and result as params
;; (send uconn rem-host-port qrykey 'response result) ;; could check for ack
;; (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
;; (MBOX_TIMEOUT 'do-work-timeout)
;; (else
;; (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
;;
;; ;; NEW APPROACH:
;; ;;
;; (define (process-work-queue uconn)
;; (let ((wqueue (udat-work-queue uconn))
;; (proc (udat-work-proc uconn))
;; (numthr (udat-numthreads uconn)))
;; (let loop ((thnum 1)
;; (threads '()))
;; (let ((thlst (cons (make-thread (lambda ()
;; (let work-loop ()
;; (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
;; (do-work uconn rdat))
;; (work-loop)))
;; (conc "work thread " thnum))
;; threads)))
;; (if (< thnum numthr)
;; (loop (+ thnum 1)
;; thlst)
;; (begin
;; (print "ULEX: Starting "(length thlst)" worker threads.")
;; (map thread-start! thlst)
;; (print "ULEX: Threads started. Joining all.")
;; (map thread-join! thlst)))))))
;;
;; ;; below was to enable re-use of connections. This seems non-trivial so for
;; ;; now let's open a new connection on each call
;; ;;
;; ;; ;; given host-port get or create peer struct
;; ;; ;;
;; ;; (define (udat-get-peer uconn host-port)
;; ;; (or (hash-table-ref/default (udat-peers uconn) host-port #f)
;; ;; ;; no peer, so create pdat and init it
;; ;;
;; ;; ;; NEED stack of connections, pop and use; inp, oup,
;; ;; ;; creation_time (remove and create new if over 24hrs old
;; ;; ;;
;; ;; (let ((pdat (make-pdat host-port: host-port)))
;; ;; (hash-table-set! (udat-peers uconn) host-port pdat)
;; ;; pdat)))
;; ;;
;; ;; ;; is pcon alive
;; ;;
;; ;; ;; given host-port and pdat get a pcon
;; ;; ;;
;; ;; (define (pdat-get-pcon pdat host-port)
;; ;; (let loop ((conns (pdat-conns pdat)))
;; ;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
;; ;; (init-pcon (make-pcon))
;; ;; (let* ((conn (pop conns)))
;; ;;
;; ;; ;; given host-port get a pcon struct
;; ;; ;;
;; ;; (define (udat-get-pcon
;;
;; ;;======================================================================
;; ;; misc utils
;; ;;======================================================================
;;
;; (define (make-cookie uconn)
;; (let ((newcnum (+ (udat-cnum uconn) 1)))
;; (udat-cnum-set! uconn newcnum)
;; (conc (udat-host-port uconn) ":"
;; newcnum)))
;;
;; ;; cookie/mboxes
;;
;; ;; we store each mbox with a cookie (<cookie> . <mbox>)
;; ;;
;; (define (get-cmbox uconn)
;; (if (null? (udat-avail-cmboxes uconn))
;; (let ((cookie (make-cookie uconn))
;; (mbox (make-mailbox)))
;; (hash-table-set! (udat-mboxes uconn) cookie mbox)
;; `(,cookie . ,mbox))
;; (let ((cmbox (car (udat-avail-cmboxes uconn))))
;; (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
;; cmbox)))
;;
;; (define (put-cmbox uconn cmbox)
;; (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
;;
;; (define (pp-uconn uconn)
;; (pp (udat->alist uconn)))
;;
;;
;; ;;======================================================================
;; ;; network utilities
;; ;;======================================================================
;;
;; ;; NOTE: Look at address-info egg as alternative to some of this
;;
;; (define (rate-ip ipaddr)
;; (regex-case ipaddr
;; ( "^127\\..*" _ 0 )
;; ( "^(10\\.0|192\\.168)\\..*" _ 1 )
;; ( else 2 ) ))
;;
;; ;; Change this to bias for addresses with a reasonable broadcast value?
;; ;;
;; (define (ip-pref-less? a b)
;; (> (rate-ip a) (rate-ip b)))
;;
;; (define (get-my-best-address)
;; (let ((all-my-addresses (get-all-ips)))
;; (cond
;; ((null? all-my-addresses)
;; (get-host-name)) ;; no interfaces?
;; ((eq? (length all-my-addresses) 1)
;; (car all-my-addresses)) ;; only one to choose from, just go with it
;; (else
;; (car (sort all-my-addresses ip-pref-less?))))))
;;
;; (define (get-all-ips-sorted)
;; (sort (get-all-ips) ip-pref-less?))
;;
;; (define (get-all-ips)
;; (map address-info-host
;; (filter (lambda (x)
;; (equal? (address-info-type x) "tcp"))
;; (address-infos (get-host-name)))))
;;
)