;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.
;;======================================================================
;; ABOUT:
;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
(module ulex
*
#;(
;; NOTE: looking for the handler proc - find the run-listener :)
run-listener ;; (run-listener handler-proc [port]) => uconn
;; NOTE: handler-proc params;
;; (handler-proc rem-host-port qrykey cmd params)
send-receive ;; (send-receive uconn host-port cmd data)
;; NOTE: cmd can be any plain text symbol except for these;
;; 'ping 'ack 'goodbye 'response
set-work-handler ;; (set-work-handler proc)
wait-and-close ;; (wait-and-close uconn)
ulex-listener?
;; needed to get the interface:port that was automatically found
udat-port
udat-host-port
;; for testing only
;; pp-uconn
)
(import scheme
chicken.base
chicken.file
chicken.time
chicken.condition
chicken.string
chicken.sort
chicken.pretty-print
;; chicken.tcp
address-info
mailbox
matchable
;; queues
regex
regex-case
s11n
srfi-1
srfi-18
srfi-4
srfi-69
system-information
tcp6
typed-records
;; tcp-server
)
;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
;;
(defstruct udat
;; the listener side
(port #f)
(host-port #f)
(socket #f)
;; the peers
(peers (make-hash-table)) ;; host:port->peer
;; work handling
(work-queue (make-mailbox))
(work-proc #f) ;; set by user
(cnum 0) ;; cookie number
(mboxes (make-hash-table)) ;; for the replies
(avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use
;; threads
(numthreads 50)
(cmd-thread #f)
(work-queue-thread #f)
(num-threads-running 0)
)
;;======================================================================
;; listener
;;======================================================================
;; is uconn a ulex connector (listener)
;;
(define (ulex-listener? uconn)
(udat? uconn))
;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;; if udata-in is #f create the record
;; if there is already a serv-listener return the udata
;;
(define (setup-listener uconn #!optional (port 4242))
(handle-exceptions
exn
(if (< port 65535)
(begin
(thread-sleep! 0.1) ;; I'm not sure this helps but give the OS some time to do it's thing
(print "ULEX INFO: skipping port already in use "port)
(setup-listener uconn (+ port 1)))
#f)
(connect-listener uconn port)))
(define (connect-listener uconn port)
;; (tcp-listener-socket LISTENER)(socket-name so)
;; sockaddr-address, sockaddr-port, sockaddr->string
(let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
(addr (get-host-name))) ;; (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
(udat-port-set! uconn port)
(udat-host-port-set! uconn (conc addr":"port))
(udat-socket-set! uconn tlsn)
uconn))
;; run-listener does all the work of starting a listener in a thread
;; it then returns control
;;
(define (run-listener handler-proc #!optional (port-suggestion 4242))
(let* ((uconn (make-udat)))
(udat-work-proc-set! uconn handler-proc)
(tcp-buffer-size 2048)
(if (setup-listener uconn port-suggestion)
(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")))
(thread-start! th1)
(udat-cmd-thread-set! uconn th1)
(print "cmd loop started")
uconn)
(assert #f "ERROR: run-listener called without proper setup."))))
(define (wait-and-close uconn)
(let loop ()
(if (not (udat-cmd-thread uconn))
(begin
(thread-sleep! 1)
(loop))))
(thread-join! (udat-cmd-thread uconn))
#;(tcp-close (udat-socket uconn)))
;;======================================================================
;; peers and connections
;;======================================================================
(define *send-mutex* (make-mutex))
;; send structured data to recipient
;;
;; NOTE: qrykey is what was called the "cookie" previously
;;
;; retval tells send to expect and wait for return data (one line) and return it or time out
;; this is for ping where we don't want to necessarily have set up our own server yet.
;;
;; NOTE: see below for beginnings of code to allow re-use of tcp connections
;; - I believe (without substantial evidence) that re-using connections will
;; be beneficial ...
;;
(define (send-receive udata host-port cmd params)
(mutex-lock! *send-mutex*)
(let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
(isme (equal? host-port my-host-port)) ;; calling myself?
;; dat is a self-contained work block that can be sent or handled locally
(dat (list my-host-port cmd params))
(parts (string-split host-port ":"))
(host (car parts))
(port (string->number (cadr parts))))
(if isme
(ulex-handler udata dat) ;; no transmission needed
(let-values (((inp oup)(tcp-connect host-port)))
(let ((res (if (and inp oup)
(begin
(serialize dat oup)
(deserialize inp)) ;; yes, we always want an ack
(begin
(print "ERROR: send called but no receiver has been setup. Please call setup first!")
#f))))
(close-input-port inp)
(close-output-port oup)
(mutex-unlock! *send-mutex*)
res)))))
;;======================================================================
;; responder side
;;======================================================================
;; take a request, rdat, and if not immediate put it in the work queue
;;
;; Reserved cmds; ack ping goodbye response
;;
(define (ulex-handler uconn rdat)
(assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
(match rdat
((rem-host-port cmd params)
(do-work uconn rdat))
(else
(print "BAD DATA? controldat=" rdat)
'bad-data)
))
;; given an already set up uconn start the cmd-loop
;;
#;(define (ulex-cmd-loop uconn)
(let* ((serv-listener (udat-socket uconn))
(server (make-tcp-server
serv-listener
(lambda ()
(let* ((rdat (deserialize)) ;; '(my-host-port qrykey cmd params)
(resp (ulex-handler uconn rdat)))
(if resp
(serialize resp)
(write resp)))))))
(server)))
;; given an already set up uconn start the cmd-loop
;;
(define (ulex-cmd-loop uconn)
(let* ((serv-listener (udat-socket uconn))
(listener (lambda ()
(let loop ((state 'start))
(let-values (((inp oup)(tcp-accept serv-listener)))
(let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
(resp (ulex-handler uconn rdat)))
(serialize resp oup)
(close-input-port inp)
(close-output-port oup))
(loop state))))))
;; start N of them
(let loop ((thnum 0)
(threads '()))
(if (< thnum 100)
(let* ((th (make-thread listener (conc "listener" thnum))))
(thread-start! th)
(loop (+ thnum 1)
(cons th threads)))
(map thread-join! threads)))))
;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
;; so that the proc can be dereferenced remotely
;;
(define (set-work-handler uconn proc)
(udat-work-proc-set! uconn proc))
;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================
;; rdat is (rem-host-port qrykey cmd params)
(define (add-to-work-queue uconn rdat)
#;(queue-add! (udat-work-queue uconn) rdat)
(mailbox-send! (udat-work-queue uconn) rdat))
(define (do-work uconn rdat)
(let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
;; put this following into a do-work procedure
(match rdat
((rem-host-port cmd params)
(let* ((start-time (current-milliseconds))
(result (proc rem-host-port cmd params))
(end-time (current-milliseconds))
(run-time (- end-time start-time)))
result))
(else
(print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")
#f))))
;;======================================================================
;; misc utils
;;======================================================================
(define (make-cookie uconn)
(let ((newcnum (+ (udat-cnum uconn) 1)))
(udat-cnum-set! uconn newcnum)
(conc (udat-host-port uconn) ":"
newcnum)))
;; cookie/mboxes
;; we store each mbox with a cookie (<cookie> . <mbox>)
;;
(define (get-cmbox uconn)
(if (null? (udat-avail-cmboxes uconn))
(let ((cookie (make-cookie uconn))
(mbox (make-mailbox)))
(hash-table-set! (udat-mboxes uconn) cookie mbox)
`(,cookie . ,mbox))
(let ((cmbox (car (udat-avail-cmboxes uconn))))
(udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
cmbox)))
(define (put-cmbox uconn cmbox)
(udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
(define (pp-uconn uconn)
(pp (udat->alist uconn)))
;;======================================================================
;; network utilities
;;======================================================================
;; NOTE: Look at address-info egg as alternative to some of this
(define (rate-ip ipaddr)
(regex-case ipaddr
( "^127\\..*" _ 0 )
( "^(10\\.0|192\\.168)\\..*" _ 1 )
( else 2 ) ))
;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
(> (rate-ip a) (rate-ip b)))
(define (get-my-best-address)
(let ((all-my-addresses (get-all-ips)))
(cond
((null? all-my-addresses)
(get-host-name)) ;; no interfaces?
((eq? (length all-my-addresses) 1)
(car all-my-addresses)) ;; only one to choose from, just go with it
(else
(car (sort all-my-addresses ip-pref-less?))))))
(define (get-all-ips-sorted)
(sort (get-all-ips) ip-pref-less?))
(define (get-all-ips)
(map address-info-host
(filter (lambda (x)
(equal? (address-info-type x) 'tcp))
(address-infos (get-host-name)))))
)