;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.
;;======================================================================
;; ABOUT:
;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
(use mailbox)
(module ulex
*
(import scheme
chicken
data-structures
files
foreign
hostinfo
mailbox
matchable
ports extras
posix
regex
regex-case
srfi-1
srfi-18
srfi-4
srfi-69
tcp6
typed-records
)
;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
;;
;; Every field whose accessor is used somewhere in this module is declared
;; here.  The original record only declared host-port, socket, peers,
;; work-queue, cnum and mboxes, which left udat-my-address, udat-my-port,
;; udat-my-hostname, udat-my-pid, udat-serv-listener, udat-handlers,
;; udat-cmds, udat-dbowners and udat-busy unbound.  All additions carry a
;; default, so existing (make-udat ...) calls keep working unchanged.
;;
(defstruct udat
  ;; the listener side
  (host-port     #f)                    ;; "address:port" of this instance, #f until a listener exists
  (socket        #f)
  (my-address    #f)                    ;; written by connect-listener
  (my-port       #f)                    ;; written by connect-listener
  (my-hostname   #f)                    ;; written by connect-listener
  (my-pid        (current-process-id))  ;; read when building the control line
  (serv-listener #f)                    ;; tcp listener, written by connect-listener, read by ulex-cmd-loop
  ;; the peers
  (peers         (make-hash-table))     ;; host:port->peer
  (dbowners      (make-hash-table))     ;; dbfile->host:port, maintained by the ping/goodbye handlers
  ;; command dispatch
  (handlers      (make-hash-table))     ;; key->proc, consulted by ulex-handler
  (cmds          (make-hash-table))     ;; key->proc, filled by register-cmd
  ;; work handling
  (work-queue    (make-queue))
  (busy          #f)                    ;; when true add-to-work-queue queues instead of running the work
  (cnum          0)                     ;; cookie number
  (mboxes        (make-hash-table))     ;; qrykey->mailbox, used by send-receive
  )
;; struct for keeping track of others we are talking to
;;
;; Record describing one remote peer.
;;   host-port : identifier of the peer, defaults to #f
;;   conns     : list of pcon records, defaults to the empty list
;; NOTE(review): no live code in this file constructs or reads a pdat; it is
;; only referenced from the commented-out connection re-use code further down.
(defstruct pdat
(host-port #f)
(conns '()) ;; list of pcon structs, pop one off when calling the peer
)
;; struct for peer connections, keep track of expiration etc.
;;
;; Record for a single connection to a peer.
;;   inp / oup : the two ports of the connection, default #f
;;   exp       : default is the current time in seconds plus 59
;;   lifetime  : default is the current time in seconds plus 600
(defstruct pcon
(inp #f)
(oup #f)
(exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
(lifetime (+ (current-seconds) 600)) ;; throw away and create new after 600 seconds (ten minutes). NOTE(review): this comment used to say "five minutes" - confirm which value is intended
)
;; send structured data to recipient
;;
;; NOTE: qrykey is what was called the "cookie" previously
;;
;; retval tells send to expect and wait for return data (one line) and return it or time out
;; this is for ping where we don't want to necessarily have set up our own server yet.
;;
;; Send a control list plus a data s-expression to host-port.
;;
;; The control list is (cmd my-host-port my-pid qrykey params).  When
;; host-port equals our own host:port the message is handed straight to
;; ulex-handler and its result is returned.  Otherwise a tcp connection is
;; opened, both s-expressions are written, and the result is:
;;   - the value read back from the connection when retval is true
;;   - #t when retval is false
;;   - #f when no receiver is set up (my-host-port is #f) or any exception
;;     is raised while connecting or sending
;;
;; The hostname and pid keyword arguments are accepted but not used here.
;;
;; NOTE(review): this module contains a second (define (send ...)) with the
;; same signature further down; presumably that later definition replaces
;; this one - confirm and remove whichever is stale.
(define (send udata host-port cmd qrykey data
#!key (hostname #f)(pid #f)(params '())(retval #f))
(let* ((my-host-port (udat-my-host-port udata))
(isme (equal? host-port my-host-port)) ;; am I calling
;; myself?
(dat (list
cmd ;; " "
my-host-port ;; " "
(udat-my-pid udata) ;; " "
qrykey
params ;;(if (null? params) "" (conc " "
;;(string-intersperse params " ")))
)))
;; (print "send isme is " (if isme "true!" "false!") ",
;; my-host-port: " my-host-port ", host-port: " host-port)
(if isme
(ulex-handler udata dat data)
(handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE
;; SPECIFIC
exn
#f
;; NOTE(review): an exception raised after tcp-connect succeeds skips the
;; two close-*-port calls below, so the ports are left open in that case.
(let-values (((inp oup)(tcp-connect host-port)))
;;
;; CONTROL LINE:
;; cmdkey host:port pid qrykey params ...
;;
(let ((res
(if (and inp oup)
(let* ()
(if my-host-port
(begin
(write dat oup)
(write data oup) ;; send as sexpr
;; (print "Sent dat: " dat " data: " data)
(if retval
(read inp)
#t))
(begin
(print "ERROR: send called but no receiver has been setup. Please call setup first!")
#f))
;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE!
;; (there is a listener for handling that)
)
#f))) ;; #f means failed to connect and send
(close-input-port inp)
(close-output-port oup)
res))))))
;;======================================================================
;; listener
;;======================================================================
;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
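;; NOTE: the two behaviours once described here are not implemented by the
;; code below: a record is not created when none is passed in (the caller
;; must supply uconn), and an existing serv-listener is not re-used (a new
;; listener is opened on every call).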
;;
;; Try to open a listener for uconn on `port` (default 4242).  Whenever
;; connect-listener raises, retry on the next higher port.  Returns whatever
;; connect-listener returns on success, or #f once port 65535 has failed too.
(define (setup-listener uconn #!optional (port 4242))
  (handle-exceptions
      exn
      ;; any exception counts as "this port did not work"
      (and (< port 65535)
           (setup-listener uconn (+ port 1)))
    (connect-listener uconn port)))
;; Open a tcp listener on `port` (backlog 1000, any interface) and record
;; the listener details in uconn.  Returns uconn; raises if the port cannot
;; be bound or the address/hostname lookup fails.
;;
;; Fields written: my-address, my-port, my-hostname, host-port, serv-listener.
;;
(define (connect-listener uconn port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* (;; Resolve address and hostname BEFORE binding: setup-listener retries
         ;; on any exception, so a lookup failure after tcp-listen would leave
         ;; one bound listener behind per attempted port.
         (addr  (get-my-best-address))      ;; (hostinfo-addresses (host-information (current-hostname)))
         (hname (get-host-name))
         (tlsn  (tcp-listen port 1000 #f))) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
    (udat-my-address-set!    uconn addr)
    (udat-my-port-set!       uconn port)
    (udat-my-hostname-set!   uconn hname)
    ;; send decides whether a receiver exists by reading the host-port
    ;; field; nothing else in this file sets it, so without this line every
    ;; remote send reports "no receiver has been setup".
    (udat-host-port-set!     uconn (conc addr ":" port))
    (udat-serv-listener-set! uconn tlsn)
    uconn))
;; Build the "address:port" string for this instance from the my-address
;; and my-port fields.  Returns #f when either field is still #f.
(define (udat-my-host-port uconn)
  (let ((addr (udat-my-address uconn))
        (port (udat-my-port uconn)))
    (and addr
         port
         (conc addr ":" port))))
;;======================================================================
;; peers and connections
;;======================================================================
;; send structured data to recipient
;;
;; NOTE: qrykey is what was called the "cookie" previously
;;
;; retval tells send to expect and wait for return data (one line) and return it or time out
;; this is for ping where we don't want to necessarily have set up our own server yet.
;;
;; NOTE: see below for beginnings of code to allow re-use of tcp connections
;; - I believe (without substantial evidence) that re-using connections will
;; be beneficial ...
;;
;; Send the control list (cmd my-host-port qrykey params) followed by `data`
;; to host-port, both written as s-expressions.
;;
;;   udata     : udat record of the caller; its host-port field identifies us
;;   host-port : "host:port" string of the recipient
;;   cmd       : command key placed first in the control list
;;   qrykey    : query key (previously called the "cookie")
;;   data      : payload, written with `write`
;;   params:   : extra parameters, last element of the control list
;;   retval:   : when true, read one s-expression back on the same connection
;;               and return it
;;   hostname: / pid: : accepted for interface compatibility, not used here
;;
;; Returns the result of ulex-handler when host-port is our own host:port,
;; otherwise the reply (retval true), #t (retval false), or #f when no
;; receiver has been set up or connecting/sending fails.
;;
;; NOTE(review): ulex-handler matches (cmdkey host-port pid qrykey params ...)
;; but the list built here has no pid slot - confirm which side is current.
;;
(define (send udata host-port cmd qrykey data
              #!key (hostname #f)(pid #f)(params '())(retval #f))
  (let* ((my-host-port (udat-host-port udata))
         (isme         (equal? host-port my-host-port)) ;; am I calling myself?
         (dat          (list cmd my-host-port qrykey params)))
    (cond
     (isme
      (ulex-handler udata dat data))
     ;; Without a receiver the message could never be sent, so do not open a
     ;; connection just to close it again.
     ((not my-host-port)
      (print "ERROR: send called but no receiver has been setup. Please call setup first!")
      #f)
     (else
      (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC
          exn
          #f ;; #f means failed to connect and send
        (let-values (((inp oup) (tcp-connect host-port)))
          ;;
          ;; CONTROL LINE:
          ;; cmdkey host:port qrykey params
          ;;
          (let ((res (handle-exceptions
                         exn
                         #f ;; write/read failed; fall through so the ports still get closed
                       (write dat oup)
                       (write data oup) ;; send as sexpr
                       ;; A bare atom (number, symbol) is only complete for the
                       ;; peer's `read` once a delimiter arrives; without it the
                       ;; peer keeps waiting while we wait for its reply.
                       (newline oup)
                       ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP
                       ;; unless retval was requested (there is a listener for
                       ;; handling everything else)
                       (if retval
                           (read inp)
                           #t))))
            (close-input-port inp)
            (close-output-port oup)
            res)))))))
;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
;; Register a mailbox under qrykey in (udat-mboxes udata), send the request
;; with `send`, then wait up to `timeout` seconds for something to arrive in
;; that mailbox.
;;
;; Returns the received value, or #f when the send failed or the wait timed
;; out.  The mailbox registration is removed on every path; the original
;; left it in the table when `send` returned #f.
;;
(define (send-receive udata host-port cmd qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20))
  (let ((mbox   (make-mailbox))
        (mboxes (udat-mboxes udata)))
    ;; must be registered before sending so a fast reply finds its mailbox
    (hash-table-set! mboxes qrykey mbox)
    (if (send udata host-port cmd qrykey data hostname: hostname pid: pid params: params)
        (let ((res (mailbox-receive! mbox timeout 'MBOX_TIMEOUT)))
          (hash-table-delete! mboxes qrykey)
          (if (eq? res 'MBOX_TIMEOUT)
              #f
              res))
        (begin
          (hash-table-delete! mboxes qrykey) ;; nothing will ever be delivered to it
          #f)))) ;; #f means failed to communicate
;;
;; Dispatch one incoming message.
;;
;;   udata      : udat record of this instance
;;   controldat : control list, expected shape (cmdkey host-port pid qrykey params ...)
;;   data       : payload that accompanied the control list
;;
;; Return value by cmdkey:
;;   ack      : whatever `print` returns
;;   ping     : qrykey, after recording the caller in peers and its dbs in dbowners
;;   goodbye  : qrykey, after removing the caller from peers and dbowners
;;   immediate / read-only     : result of process-request
;;   normal / low-priority     : result of add-to-work-queue
;;   any other cmdkey          : #f, after the message is passed to add-to-work-queue
;;   controldat of other shape : #f
;;
;; Every message is printed on entry.
;;
;; NOTE(review): make-peer, peer-dbs-set!, peer-dbs, get-peer-dat and
;; process-request are not defined anywhere in this file - confirm where
;; they come from.
;; NOTE(review): the send defined last in this file builds a four element
;; control list (cmd host-port qrykey params) with no pid slot; with that
;; shape `pid` below would bind to the qrykey - confirm which side is current.
(define (ulex-handler udata controldat data)
(print "controldat: " controldat " data: " data)
(match controldat ;; (string-split controldat)
((cmdkey host-port pid qrykey params ...)
;; (print "cmdkey: " cmdkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
(case cmdkey ;; (string->symbol cmdkey)
((ack)(print "Got ack!"))
((ping) ;; special case - return result immediately on the same connection
;; NOTE(review): val is computed (calling the registered 'ping handler if
;; there is one) but never used afterwards.
(let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
(val (if proc (proc) "gotping"))
(peer (make-peer addr-port: host-port pid: pid))
(dbshash (udat-dbowners udata)))
(peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
(for-each (lambda (dbfile)
(hash-table-set! dbshash dbfile host-port)) ;; WRONG?
params) ;; register each db in the dbshash
(if (not (hash-table-exists? (udat-peers udata) host-port))
(hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
qrykey)) ;; End of ping
((goodbye)
;; remove all traces of the caller in db ownership etc.
(let* ((peer (hash-table-ref/default (udat-peers udata) host-port #f))
(dbs (if peer (peer-dbs peer) '()))
(dbshash (udat-dbowners udata)))
(for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
(hash-table-delete! (udat-peers udata) host-port)
qrykey))
((immediate read-only normal low-priority) ;; do this work immediately
;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line
;; data => a single line encoded however you want, or should I build json into it?
(print "cmdkey=" cmdkey)
(let* ((pdat (get-peer-dat udata host-port)))
(match params ;; dbfile prockey procparam
((dbfile prockey procparam)
(case cmdkey
((immediate read-only)
(process-request udata pdat dbfile qrykey prockey procparam data))
((normal low-priority) ;; split off later and add logic to support low priority
;; NOTE(review): add-to-work-queue is defined in this file with five
;; parameters (udata peer-dat cmdkey qrykey data) but is called here
;; with seven arguments.
(add-to-work-queue udata pdat dbfile qrykey prockey procparam data))
(else
;; not reachable: the enclosing case clause only admits the four keys
;; already covered by the two clauses above
#f)))
(else
(print "INFO: params=" params " cmdkey=" cmdkey " controldat=" controldat)
#f))))
(else
(add-to-work-queue udata (get-peer-dat udata host-port) cmdkey qrykey data)
#f)))
(else
(print "BAD DATA? controldat=" controldat " data=" data)
#f)))
;; handles the incoming messages and dispatches to queues
;;
;; Accept connections on (udat-serv-listener udata) forever.  For each
;; connection read two s-expressions (control list, then data), pass them to
;; ulex-handler, write the handler's result back when it is not #f, and
;; close the connection.
;;
;; Does not return normally.  An exception raised while reading or handling
;; a message still propagates to the caller as before, but the accepted
;; connection's ports are now closed first instead of being left open.
;;
(define (ulex-cmd-loop udata)
  (let ((serv-listener (udat-serv-listener udata)))
    ;; data comes as two s-expressions
    ;; (cmdkey resp-addr:resp-port ... qrykey params)
    ;; data
    (let loop ()
      (let-values (((inp oup) (tcp-accept serv-listener)))
        (let ((close-both (lambda ()
                            (close-input-port inp)
                            (close-output-port oup))))
          (handle-exceptions
              exn
              (begin
                (close-both)  ;; release the connection before re-raising
                (abort exn))
            (let* ((controldat (read inp))
                   (data       (read inp))
                   (resp       (ulex-handler udata controldat data)))
              (if resp (write resp oup))))
          (close-both)))
      (loop))))
;; add a proc to the cmd list, these are done symmetrically (i.e. in all instances)
;; so that the proc can be dereferenced remotely
;;
;; Store proc under key in the hash table returned by (udat-cmds udata),
;; replacing any proc previously stored under the same key.
(define (register-cmd udata key proc)
(hash-table-set! (udat-cmds udata) key proc))
;;======================================================================
;; work queues
;;======================================================================
;; Wrap the request in a work record.  While udata is flagged busy the
;; record is appended to the work queue; otherwise it is handed to
;; process-work, which handles it first and then drains the queue.
(define (add-to-work-queue udata peer-dat cmdkey qrykey data)
  (let ((item (make-work peer-dat: peer-dat cmdkey: cmdkey qrykey: qrykey data: data)))
    (cond
     ((udat-busy udata)
      (queue-add! (udat-work-queue udata) item))
     (else
      (process-work udata item)))))
;; Placeholder: ignores both arguments and always returns #f.
;; process-work calls this once per work item.
(define (do-work udata wdat)
#f)
;; Run do-work on wdat when one is supplied, then keep removing items from
;; the work queue of udata and running do-work on each until it is empty.
(define (process-work udata #!optional wdat)
  (if wdat (do-work udata wdat))
  (let ((pending (udat-work-queue udata)))
    (let drain ()
      (unless (queue-empty? pending)
        (do-work udata (queue-remove! pending))
        (drain)))))
;; below was to enable re-use of connections. This seems non-trivial so for
;; now let's open a new connection on each call
;;
;; ;; given host-port get or create peer struct
;; ;;
;; (define (udat-get-peer uconn host-port)
;; (or (hash-table-ref/default (udat-peers uconn) host-port #f)
;; ;; no peer, so create pdat and init it
;;
;; ;; NEED stack of connections, pop and use; inp, oup,
;; ;; creation_time (remove and create new if over 24hrs old
;; ;;
;; (let ((pdat (make-pdat host-port: host-port)))
;; (hash-table-set! (udat-peers uconn) host-port pdat)
;; pdat)))
;;
;; ;; is pcon alive
;;
;; ;; given host-port and pdat get a pcon
;; ;;
;; (define (pdat-get-pcon pdat host-port)
;; (let loop ((conns (pdat-conns pdat)))
;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
;; (init-pcon (make-pcon))
;; (let* ((conn (pop conns)))
;;
;; ;; given host-port get a pcon struct
;; ;;
;; (define (udat-get-pcon
;;======================================================================
;; misc utils
;;======================================================================
;; Increment the cookie counter stored in uconn and return a string of the
;; form "<my-address>:<my-port>-<counter>" built with the new counter value.
(define (make-cookie uconn)
  (let ((seq (add1 (udat-cnum uconn))))
    (udat-cnum-set! uconn seq)
    (conc (udat-my-address uconn)
          ":"
          (udat-my-port uconn)
          "-"
          seq)))
;;======================================================================
;; network utilities
;;======================================================================
;; NOTE: Look at address-info egg as alternative to some of this
;; Rate an IP address string; ip-pref-less? sorts higher ratings first.
;;   0 : starts with "127."
;;   1 : starts with "10.0." or "192.168."
;;   2 : anything else
;; NOTE(review): only 10.0.x.x is rated 1, not the rest of 10.x.x.x, and
;; 172.16-31.x.x is rated 2 - confirm this is intended.
(define (rate-ip ipaddr)
(regex-case ipaddr
( "^127\\..*" _ 0 )
( "^(10\\.0|192\\.168)\\..*" _ 1 )
( else 2 ) ))
;; Change this to bias for addresses with a reasonable broadcast value?
;;
;; Sort predicate: true when address a has a strictly higher rate-ip rating
;; than address b, so the highest rated address ends up first.
(define (ip-pref-less? a b)
  (let ((rating-a (rate-ip a))
        (rating-b (rate-ip b)))
    (< rating-b rating-a)))
;; Pick the address to advertise for this host: the host name when no
;; addresses are found, the only address when there is exactly one, and
;; otherwise the first address after sorting with ip-pref-less?.
(define (get-my-best-address)
  (let ((addrs (get-all-ips)))
    (if (null? addrs)
        (get-host-name)                          ;; no interfaces?
        (if (null? (cdr addrs))
            (car addrs)                          ;; single candidate, nothing to rank
            (car (sort addrs ip-pref-less?))))))
;; All addresses of this host as strings, ordered by ip-pref-less?.
(define (get-all-ips-sorted)
  (let ((addrs (get-all-ips)))
    (sort addrs ip-pref-less?)))
;; Look up the host information for the current hostname and return its
;; addresses as a list of strings, each converted with ip->string.
(define (get-all-ips)
  (let* ((info  (host-information (current-hostname)))
         (addrs (vector->list (hostinfo-addresses info))))
    (map ip->string addrs)))
)