;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.
;;======================================================================
;; ABOUT:
;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
(module ulex
(
;; NOTE: looking for the handler proc - find the run-listener :)
run-listener ;; (run-listener handler-proc [port]) => uconn
;; NOTE: handler-proc params;
;; (handler-proc rem-host-port qrykey cmd params)
send-receive ;; (send-receive uconn host-port cmd data)
;; NOTE: cmd can be any plain text symbol except for these;
;; 'ping 'ack 'goodbye 'response
set-work-handler ;; (set-work-handler proc)
wait-and-close ;; (wait-and-close uconn)
ulex-listener?
;; needed to get the interface:port that was automatically found
udat-port
udat-host-port
;; for testing only
;; pp-uconn
)
(import scheme
chicken.base
chicken.file
chicken.time
chicken.condition
chicken.string
chicken.sort
chicken.pretty-print
chicken.process-context.posix
address-info
mailbox
matchable
;; queues
regex
regex-case
s11n
srfi-1
srfi-18
srfi-4
srfi-69
system-information
tcp6
typed-records
)
;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
;;
(defstruct udat
;; the listener side
(port #f)
(host-port #f)
(socket #f)
;; the peers
(peers (make-hash-table)) ;; host:port->peer
;; work handling
(work-queue (make-mailbox))
(work-proc #f) ;; set by user
(cnum 0) ;; cookie number
(mboxes (make-hash-table)) ;; for the replies
(avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use
;; threads
(numthreads 50)
(cmd-thread #f)
(work-queue-thread #f)
)
;; short: immediately run the task and return the result
;; mailb: use mailbox to queue the tasks on the server side
;; send tasks to mailbox, send results to caller
;; thread: use immediately created threads to do the task, send
;; is called directly
;;
(define ulex-mode (make-parameter 'short))
;;======================================================================
;; listener
;;======================================================================
;; is uconn a ulex connector (listener)
;;
(define (ulex-listener? uconn)
(udat? uconn))
;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;; if udata-in is #f create the record
;; if there is already a serv-listener return the udata
;;
(define (setup-listener uconn #!optional (port 4242))
(handle-exceptions
exn
(if (< port 65535)
(setup-listener uconn (+ port 1))
#f)
(connect-listener uconn port)))
(define (connect-listener uconn port)
;; (tcp-listener-socket LISTENER)(socket-name so)
;; sockaddr-address, sockaddr-port, sockaddr->string
(let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
(addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
(udat-port-set! uconn port)
(udat-host-port-set! uconn (conc addr":"port))
(udat-socket-set! uconn tlsn)
uconn))
;; run-listener does all the work of starting a listener in a thread
;; it then returns control
;;
(define (run-listener handler-proc #!optional (port-suggestion 4242))
(let* ((uconn (make-udat)))
(udat-work-proc-set! uconn handler-proc)
(if (setup-listener uconn port-suggestion)
(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
(th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor")))
(tcp-buffer-size 2048)
;; (max-connections 2048)
(thread-start! th1)
(thread-start! th2)
(udat-cmd-thread-set! uconn th1)
(udat-work-queue-thread-set! uconn th2)
(print "cmd loop and process workers started")
uconn)
(assert #f "ERROR: run-listener called without proper setup."))))
(define (wait-and-close uconn)
(thread-join! (udat-cmd-thread uconn))
(tcp-close (udat-socket uconn)))
;;======================================================================
;; peers and connections
;;======================================================================
;; send structured data to recipient
;;
;; NOTE: qrykey is what was called the "cookie" previously
;;
;; retval tells send to expect and wait for return data (one line) and return it or time out
;; this is for ping where we don't want to necessarily have set up our own server yet.
;;
;; NOTE: see below for beginnings of code to allow re-use of tcp connections
;; - I believe (without substantial evidence) that re-using connections will
;; be beneficial ...
;;
(define (send udata host-port qrykey cmd params)
(let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
(isme #f #;(equal? host-port my-host-port)) ;; calling myself?
;; dat is a self-contained work block that can be sent or handled locally
(dat (list my-host-port qrykey cmd params))
)
(if isme
(ulex-handler udata dat) ;; no transmission needed
(handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
exn
#f
(let-values (((inp oup)(tcp-connect host-port)))
(let ((res (if (and inp oup)
(begin
(serialize dat oup)
(deserialize inp)) ;; yes, we always want an ack
(begin
(print "ERROR: send called but no receiver has been setup. Please call setup first!")
#f))))
(close-input-port inp)
(close-output-port oup)
res)))))) ;; res will always be 'ack
(define (delta-print start-ms threshold . params)
(let* ((delta (- (current-milliseconds) start-ms)))
(if (>= delta threshold)
(apply print "ULEX: long wait "delta"ms. " params))))
;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive uconn host-port cmd data)
(cond
((member cmd '(ping goodbye)) ;; these are immediate
(send uconn host-port 'ping cmd data))
((member (ulex-mode) '(short))
(send uconn host-port 'nokey cmd data))
(else ;; 'full
(let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
(qrykey (car cmbox))
(mbox (cdr cmbox))
(mbox-time (current-milliseconds))
(sres (send uconn host-port qrykey cmd data))) ;; short res
(cond
((eq? sres 'ack)
(let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread)))
#f
120)) ;; timeout)
(mbox-timeout-result 'MBOX_TIMEOUT)
(res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
(mbox-receive-time (current-milliseconds)))
(delta-print mbox-time 6000 ", pid="(current-process-id)". send-receive mailbox received "res)
;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
(hash-table-delete! (udat-mboxes uconn) qrykey)
(if (eq? res 'MBOX_TIMEOUT)
(begin
(print "WARNING: mbox timed out for query "cmd", with data "data)
#f) ;; convert to raising exception?
res)))
(else
(print "ERROR: Communication failed? Got "sres)
#f)))))) ;; #f means failed to communicate
;;======================================================================
;; responder side
;;======================================================================
;; take a request, rdat, and if not immediate put it in the work queue
;;
;; Reserved cmds; ack ping goodbye response
;;
(define (ulex-handler uconn rdat)
(assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
(match rdat ;; (string-split controldat)
((rem-host-port qrykey cmd params)
;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
(case (ulex-mode)
((short)(do-work uconn rdat))
((mailb)
(let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
(case cmd
;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
((ping)
;; (print "Got Ping!")
;; (add-to-work-queue uconn rdat)
'ack)
((goodbye)
;; just clear out references to the caller
(add-to-work-queue uconn rdat)
'ack)
((response) ;; this is a result from remote processing, send it as mail ...
(if mbox
(begin
(mailbox-send! mbox params) ;; params here is our result
'ack)
(begin
(print "ERROR: received result but no associated mbox for cookie "qrykey)
#f)))
(else
(add-to-work-queue uconn rdat)
'ack))))
((thread)(thread-start!
(make-thread (lambda ()
(do-work uconn rdat)))))
(else
(print "BAD DATA? controldat=" rdat)
'ack) ;; send ack anyway?
))))
;; given an already set up uconn start the cmd-loop
;;
(define (ulex-cmd-loop uconn)
(let* ((serv-listener (udat-socket uconn)))
(let loop ((state 'start))
(let-values (((inp oup)(tcp-accept serv-listener)))
(let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
(resp (ulex-handler uconn rdat)))
(if resp (serialize resp oup))
(close-input-port inp)
(close-output-port oup))
(loop state)))))
;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
;; so that the proc can be dereferenced remotely
;;
(define (set-work-handler uconn proc)
(udat-work-proc-set! uconn proc))
;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================
;; rdat is (rem-host-port qrykey cmd params)
(define (add-to-work-queue uconn rdat)
#;(queue-add! (udat-work-queue uconn) rdat)
(case (ulex-mode)
((short) (assert #f "FATAL: Should never get here."))
((full)
(mailbox-send! (udat-work-queue uconn) rdat))
((thread)
(thread-start!
(make-thread
(lambda ()(do-work uconn rdat)) "do-work")))))
(define (do-work uconn rdat)
(let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
;; put this following into a do-work procedure
(match rdat
((rem-host-port qrykey cmd params)
(let* ((start-time (current-milliseconds))
(result (proc rem-host-port qrykey cmd params))
(end-time (current-milliseconds))
(run-time (- end-time start-time)))
(print "ULEX: work "cmd", "params" done in "run-time" ms")
;; send 'response as cmd and result as params
(case (ulex-mode)
((mailb thread)
(send uconn rem-host-port qrykey 'response result)) ;; could check for ack
((short) result)
(else
(print "ULEX: error - ulex-mode unrecognised "(ulex-mode))))))
;; (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))
(MBOX_TIMEOUT #f)
(else
(print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")
#f))))
(define (process-work-queue uconn)
(let ((wqueue (udat-work-queue uconn))
(proc (udat-work-proc uconn))
(numthr (udat-numthreads uconn)))
(let loop ((thnum 1)
(threads '()))
(let ((thlst (cons (make-thread (lambda ()
(let work-loop ()
(let ((start-time (current-milliseconds))
(rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
(delta-print start-time 60000 ", pid="(current-process-id)" process-work-queue mailbox received "rdat)
(do-work uconn rdat))
(work-loop)))
(conc "work thread " thnum))
threads)))
(if (< thnum numthr)
(loop (+ thnum 1)
thlst)
(begin
(print "ULEX: Starting "(length thlst)" worker threads.")
(map thread-start! thlst)
(print "ULEX: Threads started. Joining all.")
(map thread-join! thlst)))))))
;; below was to enable re-use of connections. This seems non-trivial so for
;; now lets open on each call
;;
;; ;; given host-port get or create peer struct
;; ;;
;; (define (udat-get-peer uconn host-port)
;; (or (hash-table-ref/default (udat-peers uconn) host-port #f)
;; ;; no peer, so create pdat and init it
;;
;; ;; NEED stack of connections, pop and use; inp, oup,
;; ;; creation_time (remove and create new if over 24hrs old
;; ;;
;; (let ((pdat (make-pdat host-port: host-port)))
;; (hash-table-set! (udat-peers uconn) host-port pdat)
;; pdat)))
;;
;; ;; is pcon alive
;;
;; ;; given host-port and pdat get a pcon
;; ;;
;; (define (pdat-get-pcon pdat host-port)
;; (let loop ((conns (pdat-conns pdat)))
;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
;; (init-pcon (make-pcon))
;; (let* ((conn (pop conns)))
;;
;; ;; given host-port get a pcon struct
;; ;;
;; (define (udat-get-pcon
;;======================================================================
;; misc utils
;;======================================================================
(define (make-cookie uconn)
(let ((newcnum (+ (udat-cnum uconn) 1)))
(udat-cnum-set! uconn newcnum)
(conc (udat-host-port uconn) ":"
newcnum)))
;; cookie/mboxes
;; we store each mbox with a cookie (<cookie> . <mbox>)
;;
(define (get-cmbox uconn)
(if (null? (udat-avail-cmboxes uconn))
(let ((cookie (make-cookie uconn))
(mbox (make-mailbox)))
(hash-table-set! (udat-mboxes uconn) cookie mbox)
`(,cookie . ,mbox))
(let ((cmbox (car (udat-avail-cmboxes uconn))))
(udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
cmbox)))
(define (put-cmbox uconn cmbox)
(udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
(define (pp-uconn uconn)
(pp (udat->alist uconn)))
;;======================================================================
;; network utilities
;;======================================================================
;; NOTE: Look at address-info egg as alternative to some of this
(define (rate-ip ipaddr)
(regex-case ipaddr
( "^127\\..*" _ 0 )
( "^(10\\.0|192\\.168)\\..*" _ 1 )
( else 2 ) ))
;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
(> (rate-ip a) (rate-ip b)))
(define (get-my-best-address)
(let ((all-my-addresses (get-all-ips)))
(cond
((null? all-my-addresses)
(get-host-name)) ;; no interfaces?
((eq? (length all-my-addresses) 1)
(car all-my-addresses)) ;; only one to choose from, just go with it
(else
(car (sort all-my-addresses ip-pref-less?))))))
(define (get-all-ips-sorted)
(sort (get-all-ips) ip-pref-less?))
(define (get-all-ips)
(map address-info-host
(filter (lambda (x)
(equal? (address-info-type x) "tcp"))
(address-infos (get-host-name)))))
)