;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.
;;======================================================================
;; ABOUT:
;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
(module ulex
*
#;(
;; NOTE: looking for the handler proc - find the run-listener :)
run-listener ;; (run-listener handler-proc [port]) => uconn
;; NOTE: handler-proc params;
;; (handler-proc rem-host-port qrykey cmd params)
send-receive ;; (send-receive uconn host-port cmd data)
;; NOTE: cmd can be any plain text symbol except for these;
;; 'ping 'ack 'goodbye 'response
set-work-handler ;; (set-work-handler proc)
wait-and-close ;; (wait-and-close uconn)
ulex-listener?
;; needed to get the interface:port that was automatically found
udat-port
udat-host-port
;; for testing only
;; pp-uconn
;; parameters
work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct
return-method ;; parameter; 'mailbox, 'polling, 'direct
)
(import scheme
chicken.base
chicken.file
chicken.io
chicken.time
chicken.condition
chicken.port
chicken.string
chicken.sort
chicken.pretty-print
chicken.tcp
address-info
mailbox
matchable
;; queues
regex
regex-case
simple-exceptions
s11n
srfi-1
srfi-18
srfi-4
srfi-69
system-information
;; tcp6
tcp-server
typed-records
md5
message-digest
(prefix base64 base64:)
z3
)
;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
;;
(defstruct udat
;; Connection record; every field carries a default so (make-udat) can be
;; called with no arguments (as run-listener does).
;; the listener side
(port #f) ;; port number, stored by connect-listener
(host-port #f) ;; my host:port string, stored by connect-listener
(socket #f) ;; listener returned by tcp-listen, stored by connect-listener
;; the peers
(peers (make-hash-table)) ;; host:port->peer
;; work handling
(work-queue (make-mailbox))
(work-proc #f) ;; set by user; run-listener stores its handler-proc here
(cnum 0) ;; cookie number
(mboxes (make-hash-table)) ;; for the replies
(avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use
;; threads
(numthreads 10)
(cmd-thread #f) ;; joined by wait-and-close; NOTE(review): never assigned in this file
(work-queue-thread #f)
(num-threads-running 0)
;; NOTE(review): peers, work-queue, cnum, mboxes, avail-cmboxes, numthreads,
;; work-queue-thread and num-threads-running are not referenced by any code
;; visible in this file - confirm whether other users of the module need them.
)
;;======================================================================
;; serialization
;; NOTE: I've had problems with read/write and s11n serialize, deserialize
;; thus the inefficient method here
;;======================================================================
;; Parameter selecting the encoding used by obj->string and string->obj:
;;   'complex - serialize, z3:encode-buffer, base64-encode, then "=" -> "_"
;;   'write   - write / read
;;   's11n    - serialize / deserialize only
;;   any other value - the object is passed through unchanged
(define serializing-method (make-parameter 'complex))
;; NOTE: Can remove the regex and base64 encoding for zmq
(define (obj->string obj)
  ;; Encode obj for transmission according to (serializing-method).
  ;; Returns a string for 'complex, 'write and 's11n; any other method
  ;; returns obj itself untouched (rpc).
  (let ((method     (serializing-method))
        ;; s11n image of obj captured as a string
        (s11n-image (lambda ()
                      (with-output-to-string
                        (lambda () (serialize obj))))))
    (cond
     ((eq? method 'complex)
      ;; BB: serialize is what causes problems between different builds of
      ;; megatest communicating; it is sensitive to the binary image of mtest.
      (let* ((image   (s11n-image))
             (packed  (z3:encode-buffer image))
             (encoded (base64:base64-encode packed)))
        ;; swap every "=" for "_" in the base64 text
        (string-substitute (regexp "=") "_" encoded #t)))
     ((eq? method 'write)
      (with-output-to-string (lambda () (write obj))))
     ((eq? method 's11n)
      (s11n-image))
     (else obj)))) ;; rpc
(define (string->obj msg #!key (transport 'http))
  ;; Decode msg (as produced by obj->string) according to
  ;; (serializing-method) and return the resulting object.
  ;;
  ;; msg       - the received datum; a string for every real encoding
  ;; transport - accepted for interface compatibility; not used here
  ;;
  ;; In 'complex mode a non-string msg (e.g. an eof object from a closed
  ;; connection) is reported and returned unchanged as a crude reply.
  (case (serializing-method)
    ((complex)
     (if (string? msg)
         (with-input-from-string
             (z3:decode-buffer
              (base64:base64-decode
               ;; undo the "=" -> "_" swap done by obj->string
               (string-substitute (regexp "_") "=" msg #t)))
           (lambda () (deserialize)))
         (begin
           ;; The diagnostic must go to the error port: this procedure is
           ;; called from the listener handler while current-output-port is
           ;; still the peer connection, so printing there would inject the
           ;; message into the reply stream.
           (with-output-to-port (current-error-port)
             (lambda ()
               (print "ULEX ERROR: cannot translate received data \"" msg "\"")))
           (print-call-chain (current-error-port))
           msg))) ;; crude reply for when things go awry
    ((write) (with-input-from-string msg (lambda () (read))))
    ((s11n)  (with-input-from-string msg (lambda () (deserialize))))
    (else msg))) ;; rpc
;;======================================================================
;; listener
;;======================================================================
;; is uconn a ulex connector (listener)
;;
(define (ulex-listener? uconn)
  ;; #t when uconn is a udat record, #f for anything else.
  (if (udat? uconn) #t #f))
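;; setup-listener: starting at the suggested port, try successive port
;; numbers until connect-listener succeeds, then return the udat record
;; populated with my port, my "address:port" string and the listener socket.
;; return #f if fail to find a port to allocate (nothing up to 65535 works).
;;
;; NOTE: uconn must already be a udat record; the current code neither
;; creates the record when it is missing nor reuses an existing listener.
;;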
(define (setup-listener uconn #!optional (port 4242))
  ;; Walk upward from port, one number at a time, until connect-listener
  ;; completes without raising. Any exception counts as "this port is not
  ;; usable". Gives #f once the candidate has reached 65535 and still fails.
  (let try-port ((candidate port))
    (handle-exceptions
        exn
        (and (< candidate 65535)
             (try-port (+ candidate 1)))
      (connect-listener uconn candidate))))
(define (connect-listener uconn port)
  ;; Open a TCP listener on port and record it in uconn.
  ;;
  ;; uconn - udat record to update in place
  ;; port  - port number to listen on
  ;;
  ;; Returns uconn with its port, host-port ("address:port") and socket
  ;; fields set. Raises whatever tcp-listen or the address lookup raises.
  ;;
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* (;; Resolve the address before opening the socket: if the lookup
         ;; raises there is then no listener left open behind it (the caller
         ;; retries on the next port after any exception).
         (addr (get-my-best-address)) ;; (hostinfo-addresses (host-information (current-hostname)))
         (tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
         ;; Ask the listener which port it really holds, so a request for
         ;; port 0 (system-chosen port) records the actual number.
         (bound-port (tcp-listener-port tlsn)))
    (udat-port-set! uconn bound-port)
    (udat-host-port-set! uconn (conc addr ":" bound-port))
    (udat-socket-set! uconn tlsn)
    uconn))
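;; run-listener does all the work of starting a listener: it creates the
;; udat record, stores the handler, allocates a port and then invokes the
;; server procedure built by make-tcp-server.
;; NOTE(review): no thread is created here - the server procedure is called
;; directly, so presumably control does not return while the server loop
;; runs; callers wanting a background listener likely need to start
;; run-listener in a thread of their own. Confirm against the tcp-server docs.
;;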
(define (run-listener handler-proc #!optional (port-suggestion 4242))
;; handler-proc    - called by do-work as (handler-proc rem-host-port qrykey cmd params)
;; port-suggestion - first port tried by setup-listener
;; Fails the assert at the bottom when setup-listener returns #f.
(let* ((uconn (make-udat)))
(udat-work-proc-set! uconn handler-proc)
(if (setup-listener uconn port-suggestion)
;; remember the ports in effect before serving so handler output can be
;; pointed back at them while the work runs
(let* ((orig-in (current-input-port))
(orig-out (current-output-port)))
;; build the server procedure and call it immediately
((make-tcp-server
(udat-socket uconn)
;; per-request thunk; presumably run with the current input/output
;; ports bound to the peer connection (see tcp-in / tcp-out below)
(lambda ()
(let* ((rdat
;; one datum is read and decoded as the request
(string->obj (read))
;; (read in)
;; (deserialize)
)
(resp (let ((tcp-in (current-input-port))
(tcp-out (current-output-port)))
;; switch to the original ports for the duration of do-work ...
(current-input-port orig-in)
(current-output-port orig-out)
(let ((res (do-work uconn rdat)))
;; ... then switch back so the reply is written to the ports
;; that were current on entry. NOTE(review): if do-work
;; raises, the switch back is skipped.
(current-input-port tcp-in)
(current-output-port tcp-out)
res))))
;; reply: the encoded result written as a single datum
(write (obj->string resp))
;; (serialize resp)
;; (write resp out)
)))))
(assert #f "ERROR: run-listener called without proper setup."))))
(define (wait-and-close uconn)
  ;; Wait for the command thread recorded in uconn (if any) to finish, then
  ;; close the listener socket (if any).
  ;;
  ;; Both fields default to #f in the udat record and cmd-thread is not
  ;; assigned anywhere in this file, so each step is guarded: joining #f
  ;; would raise before the listener ever got closed.
  (let ((cmd-thread (udat-cmd-thread uconn))
        (listener   (udat-socket uconn)))
    (if (thread? cmd-thread)
        (thread-join! cmd-thread))
    (if listener
        (tcp-close listener))))
;;======================================================================
;; peers and connections
;;======================================================================
;; Mutex meant for serializing sends. NOTE(review): in this file it appears
;; only in commented-out lock/unlock calls inside send-receive; no visible
;; code locks it.
(define *send-mutex* (make-mutex))
;; send structured data to recipient
;;
;; NOTE: qrykey is what was called the "cookie" previously
;;
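;; send-receive always waits for one reply datum and returns it decoded.
;; NOTE(review): earlier notes described a "retval" flag controlling whether
;; to wait for return data (or time out); no such parameter or timeout exists
;; in the current signature.
;; udata may be #f, e.g. for ping where we don't want to necessarily have set
;; up our own server yet; the sender host:port in the message is then #f.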
;;
(define (send-receive udata host-port cmd params)
  ;; Send (cmd params) to the listener at host-port and return its reply.
  ;;
  ;; udata     - my udat record, or #f when no local listener exists
  ;; host-port - destination as a "host:port" string
  ;; cmd       - command symbol handed to the remote handler
  ;; params    - payload handed to the remote handler
  ;;
  ;; When host-port equals my own host:port the work is done locally through
  ;; do-work with no network traffic. Otherwise one encoded datum is written
  ;; to a fresh TCP connection and one datum is read back and decoded.
  ;; Exceptions from tcp-connect, the write, the read or the decoding
  ;; propagate to the caller.
  (let* ((host-port-lst (string-split host-port ":"))
         (host          (car host-port-lst))
         (port          (string->number (cadr host-port-lst)))
         (my-host-port  (and udata (udat-host-port udata))) ;; remote will return to this
         (isme          (equal? host-port my-host-port))    ;; calling myself?
         ;; dat is a self-contained work block that can be sent or handled locally
         (dat           (list my-host-port 'qrykey cmd params)))
    (if isme
        (do-work udata dat) ;; no transmission needed
        (let-values (((inp oup) (tcp-connect host port)))
          ;; dynamic-wind guarantees both ports are closed on every exit,
          ;; including an exception while writing, reading or decoding;
          ;; without it a failed exchange leaves the connection open.
          (dynamic-wind
              (lambda () #f)
              (lambda ()
                (if (and inp oup)
                    (begin
                      (write (obj->string dat) oup)
                      ;; finish the sending side before waiting for the reply
                      (close-output-port oup)
                      (string->obj (read inp)))
                    (begin
                      (print "ERROR: send called but no receiver has been setup. Please call setup first!")
                      #f)))
              (lambda ()
                ;; guarded: either port may be #f on the error branch above
                (if oup (close-output-port oup))
                (if inp (close-input-port inp))))))))
;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================
;; move the logic to return the result somewhere else?
;;
(define (do-work uconn rdat)
  ;; Dispatch one work block and return its result.
  ;;
  ;; uconn - udat record holding the user's work procedure
  ;; rdat  - work block, expected to be the four-element list
  ;;         (rem-host-port qrykey cmd params)
  ;;
  ;; 'ping is answered directly with 'ping-ack. Every other cmd is passed to
  ;; the work procedure as (proc rem-host-port qrykey cmd params) and its
  ;; value is returned. A malformed rdat is reported with print and the
  ;; value of that print call is returned.
  (match rdat
    ((rem-host-port qrykey cmd params)
     (case cmd
       ((ping) 'ping-ack) ;; bypass calling the proc
       (else
        (let ((proc (udat-work-proc uconn)))
          ;; anything the handler prints is sent to the error port
          (with-output-to-port (current-error-port)
            (lambda ()
              (proc rem-host-port qrykey cmd params)))))))
    (else
     (print "ERROR: rdat " rdat ", did not match rem-host-port qrykey cmd params"))))
;;======================================================================
;; misc utils
;;======================================================================
(define (pp-uconn uconn)
  ;; Pretty-print the fields of uconn as an association list.
  (let ((fields (udat->alist uconn)))
    (pp fields)))
;;======================================================================
;; network utilities
;;======================================================================
;; NOTE: Look at address-info egg as alternative to some of this
(define (rate-ip ipaddr)
;; Score an address string; ip-pref-less? sorts higher scores first.
;;   0 - matches ^127\..*
;;   1 - matches ^(10\.0|192\.168)\..*
;;   2 - anything else
;; NOTE(review): the second pattern covers 10.0.x.x only, not the rest of
;; 10.0.0.0/8, and 172.16.0.0/12 is not matched at all, so those addresses
;; score 2 - confirm whether that is intended.
(regex-case ipaddr
( "^127\\..*" _ 0 )
( "^(10\\.0|192\\.168)\\..*" _ 1 )
( else 2 ) ))
;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
  ;; Ordering predicate for sort: a comes before b when a's rate-ip score is
  ;; strictly higher than b's.
  (let ((score-a (rate-ip a))
        (score-b (rate-ip b)))
    (< score-b score-a)))
(define (get-my-best-address)
  ;; Pick the address to advertise for this host:
  ;;   no addresses found -> the host name (no interfaces?)
  ;;   exactly one        -> that one, nothing to choose between
  ;;   several            -> the first after sorting with ip-pref-less?
  (let ((candidates (get-all-ips)))
    (if (null? candidates)
        (get-host-name)
        (if (null? (cdr candidates))
            (car candidates)
            (car (sort candidates ip-pref-less?))))))
(define (get-all-ips-sorted)
  ;; All addresses from get-all-ips, ordered by ip-pref-less?.
  (let ((addresses (get-all-ips)))
    (sort addresses ip-pref-less?)))
(define (get-all-ips)
  ;; Host strings of every address-info record for this host's name whose
  ;; type compares equal? to "tcp".
  ;; NOTE(review): assumes address-infos always yields a list - confirm its
  ;; behaviour when the lookup finds nothing.
  (let* ((infos     (address-infos (get-host-name)))
         (tcp-info? (lambda (info)
                      (equal? (address-info-type info) "tcp")))
         (tcp-infos (filter tcp-info? infos)))
    (map address-info-host tcp-infos)))
)