;;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.
;;======================================================================
;; ABOUT:
;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
;; (use rpc pkts mailbox sqlite3)
(module ulex
*
(import scheme posix chicken data-structures ports extras files mailbox)
(import srfi-18 pkts matchable regex
typed-records srfi-69 srfi-1
srfi-4 regex-case
(prefix sqlite3 sqlite3:)
foreign
tcp6
;; ulex-netutil
hostinfo)
;;======================================================================
;; network utilities
;;======================================================================
;; Rate an IP address string; a higher number is preferred by ip-pref-less?.
;;   0 - address starts with "127."            (loopback)
;;   1 - address starts with "10.0." or "192.168."
;;   2 - anything else
;; NOTE(review): the second pattern only covers 10.0.x.x, not all of 10.x.x.x,
;; and 172.16-31.x.x is not treated as private - confirm this is intended.
(define (rate-ip ipaddr)
(regex-case ipaddr
( "^127\\..*" _ 0 )
( "^(10\\.0|192\\.168)\\..*" _ 1 )
( else 2 ) ))
;; Sort predicate for addresses: true when A has a higher rate-ip score
;; than B, so sorting with it puts the highest rated address first.
;; Possible refinement: bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
  (let ((score-a (rate-ip a))
        (score-b (rate-ip b)))
    (< score-b score-a)))
;; Pick the address to advertise for this host.
;;  - no addresses found: fall back to the host name
;;  - exactly one address: use it as is
;;  - otherwise: the first address after sorting with ip-pref-less?
;;
(define (get-my-best-address)
  (let ((candidates (get-all-ips)))
    (if (null? candidates)
        (get-host-name)                 ;; no interfaces?
        (if (null? (cdr candidates))
            (car candidates)            ;; only one to choose from, just go with it
            (car (sort candidates ip-pref-less?))))))
;; All addresses of this host, ordered with ip-pref-less?
;;
(define (get-all-ips-sorted)
  (let ((unsorted (get-all-ips)))
    (sort unsorted ip-pref-less?)))
;; Return the addresses registered for the current hostname as a list of
;; strings (converted with ip->string).
;;
;; If the host lookup yields no record (#f) an empty list is returned instead
;; of raising from hostinfo-addresses; get-my-best-address treats the empty
;; list as "no interfaces" and falls back to the host name.
;;
(define (get-all-ips)
  (let ((hinfo (host-information (current-hostname))))
    (if hinfo
        (map ip->string (vector->list (hostinfo-addresses hinfo)))
        '())))
;; Pkt spec used when reading and writing captain pkts: for the `captain`
;; pkt type it maps each field name to its single letter card key.
;; (make it a global? Well, it is local to area module)
;;
;; A `data` pkt type is sketched here but not enabled:
;;   (data (hostname . h) ;; sender hostname
;;         (port     . p) ;; sender port
;;         (ipaddr   . a) ;; sender ip
;;         (hostkey  . k) ;; sending host key - store info at server under this key
;;         (servkey  . s) ;; server key - this needs to match at server end or reject the msg
;;         (format   . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json
;;         (data     . d) ;; base64 encoded slln data
;;         )
;;
(define *captain-pktspec*
  '((captain (host   . h)
             (port   . p)
             (pid    . i)
             (ipaddr . a))))
;; struct for keeping track of our world
;;
;; One udat is created per process by (setup). The captain-* fields describe
;; the captain found in the pkts dir; the my-* fields describe the server
;; started by this process (filled in by connect-server).
;;
(defstruct udat
;; where the captain is, copied from the winning captain pkt in setup
(captain-address #f)
(captain-host #f)
(captain-port #f)
(captain-pid #f)
;; directory holding captain pkts
;; NOTE(review): if HOME is unset this evaluates to "#f/.ulex/pkts" - confirm HOME is always set
(cpkts-dir (conc (get-environment-variable "HOME") "/.ulex/pkts"))
(cpkt-spec *captain-pktspec*)
(my-cpkt-key #f) ;; put Z card here when I create a pkt for myself as captain
;; my own server details, set by connect-server
(my-address #f)
(my-hostname #f)
(my-port #f)
(my-pid (current-process-id))
(serv-listener #f) ;; tcp listener, set by connect-server
(handler-thread #f) ;; thread running ulex-handler, set by setup-as-captain
(handlers (make-hash-table)) ;; key -> proc, filled by register-handler
(outgoing-conns (make-hash-table)) ;; host:port -> peer struct (see get-peer-dat)
)
;; struct for keeping track of others we are talking to
;;
;; Created by get-peer-dat, which opens the connection and stores the
;; resulting ports in inp and oup.
;;
(defstruct peer
(addr-port #f) ;; "host:port" string the connection was opened with
(hostname #f) ;; optional, only set when supplied to get-peer-dat
(pid #f) ;; optional, only set when supplied to get-peer-dat
(inp #f) ;; input port from the peer
(oup #f) ;; output port to the peer
(owns '()) ;; list of databases this peer is currently handling
)
;;======================================================================
;; Captain pkt functions
;;======================================================================
;; Read every *.pkt file in the captain pkts dir of udata and return the
;; list of alists produced by read-pkt->alist using the captain pkt spec.
;; The pkts dir is created (including parents) if it does not exist yet.
;;
(define (get-all-captain-pkts udata)
  (let ((pkts-home (udat-cpkts-dir udata))
        (spec      (udat-cpkt-spec udata)))
    (unless (file-exists? pkts-home)
      (create-directory pkts-home #t))
    (map (lambda (fname)
           (read-pkt->alist fname pktspec: spec))
         (glob (conc pkts-home "/*.pkt")))))
;; Choose the winning captain pkt from a list of pkt alists.
;;
;; Pkts are ordered by their D card (converted to a number), oldest first.
;; When two pkts carry the same D value the Z card (a string) breaks the tie
;; so that every process reading the same pkts picks the same winner.
;;
;; pkts    : list of alists, each with 'D and 'Z entries holding strings
;; returns : the winning alist, or #f when pkts is empty
;;
(define (get-winning-pkt pkts)
  (if (null? pkts)
      #f
      (car (sort pkts
                 (lambda (a b)
                   (let ((ad (string->number (alist-ref 'D a)))
                         (bd (string->number (alist-ref 'D b))))
                     ;; compare the D values (not the pkts themselves) to detect
                     ;; a tie; use strict comparisons so sort gets a proper ordering
                     (if (= ad bd)
                         (string<? (alist-ref 'Z a) (alist-ref 'Z b))
                         (< ad bd))))))))
;; Start a tcp listener on the first usable port at or above `port`
;; (default 4242) and return the udat populated by connect-server
;; (port, address, hostname, listener).
;; Any exception from connect-server moves on to the next port; once
;; port 65535 has failed as well, #f is returned.
;;
(define (start-server-find-port udata #!optional (port 4242))
  (let try-port ((candidate port))
    (handle-exceptions
        exn
        (if (>= candidate 65535)
            #f
            (try-port (+ candidate 1)))
      (connect-server udata candidate))))
;; Open a tcp listener on `port` and record this server's details in udata.
;;
;; The address and hostname are looked up BEFORE the listener is opened so
;; that a failure in the lookup cannot leave an orphaned listening socket
;; behind (start-server-find-port retries on any exception and would
;; otherwise leak one listener per port tried).
;;
;; udata   : udat struct to populate
;; port    : tcp port number to listen on
;; returns : udata, with my-address, my-port, my-hostname and serv-listener set
;; raises  : whatever tcp-listen raises, e.g. when the port is in use
;;
(define (connect-server udata port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((addr     (get-my-best-address))
         (hostname (get-host-name))
         (tlsn     (tcp-listen port 1000 #f))) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
    (udat-my-address-set!    udata addr)
    (udat-my-port-set!       udata port)
    (udat-my-hostname-set!   udata hostname)
    (udat-serv-listener-set! udata tlsn)
    udata))
;; Write a captain pkt holding this server's port, host, ip address and pid
;; into the captain pkts dir and remember the value returned by
;; write-alist->pkt in udata as my-cpkt-key.
;;  - requires a running server: the listener must already be in udata
;;  - returns the stored key, or #f (after printing an error) when there is
;;    no listener
;;
(define (create-captain-pkt udata)
  (cond
   ((udat-serv-listener udata)
    (let ((fields (list (cons 'port   (udat-my-port udata))
                        (cons 'host   (udat-my-hostname udata))
                        (cons 'ipaddr (udat-my-address udata))
                        (cons 'pid    (udat-my-pid udata)))))
      (udat-my-cpkt-key-set!
       udata
       (write-alist->pkt (udat-cpkts-dir udata)
                         fields
                         pktspec: (udat-cpkt-spec udata)
                         ptype: 'captain))
      (udat-my-cpkt-key udata)))
   (else
    (print "ERROR: create-captain-pkt called with out a listener")
    #f)))
;; NB// This needs to be started in a thread
;;
;; Become the captain. Steps, each of which must succeed:
;;  1. start a server (listener is stored in udata)
;;  2. write our captain pkt
;;  3. start a thread running ulex-handler, saved in udata as handler-thread
;; Returns the result of thread-start!, or #f if step 1 or 2 failed.
;;
(define (setup-as-captain udata)
  (and (start-server-find-port udata)
       (create-captain-pkt udata)
       (let ((handler-th (make-thread (lambda () (ulex-handler udata))
                                      "Captain handler")))
         (udat-handler-thread-set! udata handler-th)
         (thread-start! handler-th))))
;; Return the peer struct for host-port ("host:port"), reusing the one cached
;; in the outgoing-conns table of udata when present. Otherwise a new peer is
;; made, a tcp connection is opened to host-port, the resulting ports are
;; stored in the peer and the peer is cached. hostname and pid, when given,
;; are only recorded on a newly created peer.
;;
(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
  (let* ((conns  (udat-outgoing-conns udata))
         (cached (hash-table-ref/default conns host-port #f)))
    (cond
     (cached cached)
     (else
      (let ((fresh (make-peer addr-port: host-port)))
        (when hostname (peer-hostname-set! fresh hostname))
        (when pid (peer-pid-set! fresh pid))
        (receive (from-peer to-peer) (tcp-connect host-port)
          (peer-inp-set! fresh from-peer)
          (peer-oup-set! fresh to-peer))
        (hash-table-set! conns host-port fresh)
        fresh)))))
;; Return two values, the input port and the output port of the peer at
;; host-port, connecting first if needed (see get-peer-dat).
;;
(define (get-peer-ports udata host-port #!optional (hostname #f)(pid #f))
  (let* ((peer-rec  (get-peer-dat udata host-port hostname pid))
         (from-peer (peer-inp peer-rec))
         (to-peer   (peer-oup peer-rec)))
    (values from-peer to-peer)))
;; Send a two line message to the peer at host-port:
;;   line 1: handler my-address:my-port my-hostname my-pid qrykey
;;   line 2: data
;; Used for acks too, which makes this look a little like what tcp itself
;; does ...
;;
(define (reply udata host-port handler qrykey data #!optional (hostname #f)(pid #f))
  (receive (from-peer to-peer) (get-peer-ports udata host-port hostname pid)
    (let* ((return-addr (conc (udat-my-address udata) ":" (udat-my-port udata)))
           (control     (conc handler " "
                              return-addr " "
                              (udat-my-hostname udata) " "
                              (udat-my-pid udata) " "
                              qrykey)))
      (write-line control to-peer)
      (write-line data to-peer))))
;; Placeholder: accepts udata plus any further arguments, does nothing
;; and always returns #f.
;;
(define add-to-work-queue
  (lambda (udata . blah)
    #f))
;; Send an "ack" message for qrykey back to the peer at host-port.
;;
;; A message is always two lines; for an ack the second (data) line is
;; simply the qrykey again.
;;
;; host-port is passed through to reply as the destination. It was
;; previously omitted, which shifted every following argument by one.
;;
(define (send-ack udata host-port qrykey #!optional (hostname #f)(pid #f))
  (reply udata host-port "ack" qrykey qrykey hostname pid))
;; Server side message loop, intended to run in its own thread.
;;
;; Accepts a connection on the listener stored in udata and reads messages
;; from it until the peer closes the connection, then closes both ports and
;; waits for the next connection. Connections are served one at a time.
;;
;; Each message is two lines:
;;   handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
;;   data
;;
;; Dispatch on handlerkey:
;;   ack       - just logged
;;   ping      - replies "version" with the result of the registered 'ping
;;               handler, or "gotping" when none is registered
;;   rucaptain - replies "iamcaptain" with "yes" or "no" depending on
;;               whether this process has written a captain pkt
;;   otherwise - acks the message and hands it to add-to-work-queue
;; A control line with fewer than five fields is reported as bad data and
;; skipped.
;;
;; This procedure does not return.
;;
(define (ulex-handler udata)
  (let ((serv-listener (udat-serv-listener udata)))
    (let accept-loop ()
      (let-values (((inp oup) (tcp-accept serv-listener)))
        (let loop ((state 'start))
          (let* ((controldat (read-line inp))
                 ;; do not try to read the data line once the peer has gone away
                 (data       (if (eof-object? controldat)
                                 controldat
                                 (read-line inp))))
            (if (or (eof-object? controldat)
                    (eof-object? data))
                ;; peer closed the connection: release the ports
                (begin
                  (close-input-port inp)
                  (close-output-port oup))
                (begin
                  (match (string-split controldat)
                    ((handlerkey host:port hostname pid qrykey params ...)
                     (case (string->symbol handlerkey)
                       ((ack) (print "Got ack!"))
                       ((ping)
                        (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
                               (val  (if proc (proc) "gotping")))
                          (reply udata host:port "version" qrykey val)))
                       ((rucaptain)
                        (reply udata host:port "iamcaptain" qrykey
                               (if (udat-my-cpkt-key udata)
                                   "yes"
                                   "no")))
                       (else
                        (send-ack udata host:port qrykey hostname pid)
                        (add-to-work-queue udata (get-peer-dat udata host:port) handlerkey data))))
                    ;; catch-all clause of the match; none of the pattern
                    ;; variables above are bound here, so report the raw line
                    (_ (print "BAD DATA? controldat=" controldat " data=" data)))
                  (loop state)))))
        (accept-loop)))))
;; Store proc under key in the handlers table of udata, replacing any proc
;; previously stored under that key.
;;
(define (register-handler udata key proc)
  (let ((table (udat-handlers udata)))
    (hash-table-set! table key proc)))
;;======================================================================
;; connection setup and management functions
;;======================================================================
;; find or become the captain, return a ulex object
;;
;; Reads the captain pkts and picks the winner:
;;  - a captain exists: its address, host, port and pid are copied from the
;;    pkt into a fresh udat
;;  - no captain: this process sets itself up as captain (server, pkt and
;;    handler thread; the thread is saved in udata as handler-thread)
;;
;; returns : the udat struct in both cases, or #f if becoming captain failed
;;
(define (setup)
  (let* ((udata (make-udat))
         (cpkts (get-all-captain-pkts udata)) ;; read captain pkts
         (captn (get-winning-pkt cpkts)))
    (if captn
        (begin
          (udat-captain-address-set! udata (alist-ref 'ipaddr captn))
          (udat-captain-host-set!    udata (alist-ref 'host captn))
          (udat-captain-port-set!    udata (alist-ref 'port captn))
          (udat-captain-pid-set!     udata (alist-ref 'pid captn))
          ;; TODO: verify the captain is alive, e.g.
          ;;(if (ping-captain udata)
          ;;    udata
          ;;    (begin
          ;;      (remove-captain-pkt udata captn)
          ;;      (setup)))
          udata)
        ;; setup-as-captain returns the started thread (or #f); callers of
        ;; setup need the udat, so hand that back on success
        (and (setup-as-captain udata)
             udata))))
;; Placeholder: currently ignores dbfname and returns udata unchanged.
;;
(define connect
  (lambda (udata dbfname)
    udata))
) ;; END OF ULEX
;;; ;;======================================================================
;;; ;; D E B U G H E L P E R S
;;; ;;======================================================================
;;;
;;; (define (dbg> . args)
;;; (with-output-to-port (current-error-port)
;;; (lambda ()
;;; (apply print "dbg> " args))))
;;;
;;; (define (debug-pp . args)
;;; (if (get-environment-variable "ULEX_DEBUG")
;;; (with-output-to-port (current-error-port)
;;; (lambda ()
;;; (apply pp args)))))
;;;
;;; (define *default-debug-port* (current-error-port))
;;;
;;; (define (sdbg> fn stage-name stage-start stage-end start-time . message)
;;; (if (get-environment-variable "ULEX_DEBUG")
;;; (with-output-to-port *default-debug-port*
;;; (lambda ()
;;; (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. "
;;; (if start-time
;;; (conc "total time " (- (current-milliseconds) start-time)
;;; " ms.")
;;; "")
;;; message
;;; )))))
;;======================================================================
;; M A C R O S
;;======================================================================
;; iup callbacks are not dumping the stack, this is a work-around
;;
;; Some of these routines use:
;;
;; http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html
;;
;; Syntax for defining macros in a simple style similar to function definition,
;; when there is a single pattern for the argument list and there are no keywords.
;;
;; (define-simple-syntax (name arg ...) body ...)
;;
;;
;; (define-syntax define-simple-syntax
;; (syntax-rules ()
;; ((_ (name arg ...) body ...)
;; (define-syntax name (syntax-rules () ((name arg ...) (begin body ...)))))))
;;
;; (define-simple-syntax (catch-and-dump proc procname)
;; (handle-exceptions
;; exn
;; (begin
;; (print-call-chain (current-error-port))
;; (with-output-to-port (current-error-port)
;; (lambda ()
;; (print ((condition-property-accessor 'exn 'message) exn))
;; (print "Callback error in " procname)
;; (print "Full condition info:\n" (condition->list exn)))))
;; (proc)))
;;
;;
;;======================================================================
;; R E C O R D S
;;======================================================================
;;; ;; information about me as a server
;;; ;;
;;; (defstruct area
;;; ;; about this area
;;; (useportlogger #f)
;;; (lowport 32768)
;;; (server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all)
;;; (conn #f)
;;; (port #f)
;;; (myaddr (get-my-best-address))
;;; pktid ;; get pkt from hosts table if needed
;;; pktfile
;;; pktsdir
;;; dbdir
;;; (dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one?
;;; (mutex (make-mutex))
;;; (rtable (make-hash-table)) ;; registration table of available actions
;;; (dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve
;;; ;; about other servers
;;; (hosts (make-hash-table)) ;; key => hostdat
;;; (hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime )
;;; (reqs (make-hash-table)) ;; uri => queue
;;; ;; work queues
;;; (wqueues (make-hash-table)) ;; fname => qdat
;;; (stats (make-hash-table)) ;; fname => totalqueries
;;; (last-srvup (current-seconds)) ;; last time we updated the known servers
;;; (cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call
;;; (ready #f)
;;; (health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping
;;; )
;;;
;;; ;; host stats
;;; ;;
;;; (defstruct hostdat
;;; (pkt #f)
;;; (dbload (make-hash-table)) ;; "dbfile.db" => queries/min
;;; (hostload #f) ;; normalized load ( 5min load / numcpus )
;;; )
;;;
;;; ;; dbdat
;;; ;;
;;; (defstruct dbdat
;;; (dbh #f)
;;; (fname #f)
;;; (write-access #f)
;;; (sths (make-hash-table)) ;; hash mapping query strings to handles
;;; )
;;;
;;; ;; qdat
;;; ;;
;;; (defstruct qdat
;;; (writeq (make-queue))
;;; (readq (make-queue))
;;; (rwq (make-queue))
;;; (logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging
;;; (osshort (make-queue))
;;; (oslong (make-queue))
;;; (misc (make-queue)) ;; used for things like ping-full
;;; )
;;;
;;; ;; calldat
;;; ;;
;;; (defstruct calldat
;;; (ctype 'dbwrite)
;;; (obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc.
;;; (rtime (current-milliseconds)))
;;;
;;; ;; make it a global? Well, it is local to area module
;;;
;;; (define *pktspec*
;;; `((server (hostname . h)
;;; (port . p)
;;; (pid . i)
;;; (ipaddr . a)
;;; )
;;; (data (hostname . h) ;; sender hostname
;;; (port . p) ;; sender port
;;; (ipaddr . a) ;; sender ip
;;; (hostkey . k) ;; sending host key - store info at server under this key
;;; (servkey . s) ;; server key - this needs to match at server end or reject the msg
;;; (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json
;;; (data . d) ;; base64 encoded slln data
;;; )))
;;;
;;; ;; work item
;;; ;;
;;; (defstruct witem
;;; (rhost #f) ;; return host
;;; (ripaddr #f) ;; return ipaddr
;;; (rport #f) ;; return port
;;; (servkey #f) ;; the packet representing the client of this workitem, used by final send-message
;;; (rdat #f) ;; the request - usually an sql query, type is rdat
;;; (action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort
;;; (cookie #f) ;; cookie id for response
;;; (data #f) ;; the data payload, i.e. parameters
;;; (result #f) ;; the result from processing the data
;;; (caller #f)) ;; the calling peer according to rpc itself
;;;
;;; (define (trim-pktid pktid)
;;; (if (string? pktid)
;;; (substring pktid 0 4)
;;; "nopkt"))
;;;
;;; (define (any->number num)
;;; (cond
;;; ((number? num) num)
;;; ((string? num) (string->number num))
;;; (else num)))
;;;
;;; (use trace)
;;; (trace-call-sites #t)
;;;
;;; ;;======================================================================
;;; ;; D A T A B A S E H A N D L I N G
;;; ;;======================================================================
;;;
;;; ;; look in dbhandles for a db, return it, else return #f
;;; ;;
;;; (define (get-dbh acfg fname)
;;; (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '())))
;;; (if (null? dbh-lst)
;;; (begin
;;; ;; (print "opening db for " fname)
;;; (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls
;;; (let ((rem-lst (cdr dbh-lst)))
;;; ;; (print "re-using saved connection for " fname)
;;; (hash-table-set! (area-dbhandles acfg) fname rem-lst)
;;; (car dbh-lst)))))
;;;
;;; (define (save-dbh acfg fname dbdat)
;;; ;; (print "saving dbh for " fname)
;;; (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '()))))
;;;
;;; ;; open the database, if never before opened init it. put the handle in the
;;; ;; open db's hash table
;;; ;; returns: the dbdat
;;; ;;
;;; (define (open-db acfg fname)
;;; (let* ((fullname (conc (area-dbdir acfg) "/" fname))
;;; (exists (file-exists? fullname))
;;; (write-access (if exists
;;; (file-write-access? fullname)
;;; (file-write-access? (area-dbdir acfg))))
;;; (db (sqlite3:open-database fullname))
;;; (handler (sqlite3:make-busy-timeout 136000))
;;; )
;;; (sqlite3:set-busy-handler! db handler)
;;; (sqlite3:execute db "PRAGMA synchronous = 0;")
;;; (if (not exists) ;; need to init the db
;;; (if write-access
;;; (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements
;;; ;; (sqlite3:with-transaction
;;; ;; db
;;; ;; (lambda ()
;;; (if isql
;;; (for-each
;;; (lambda (sql)
;;; (sqlite3:execute db sql))
;;; isql)))
;;; (print "ERROR: no write access to " (area-dbdir acfg))))
;;; (make-dbdat dbh: db fname: fname write-access: write-access)))
;;;
;;; ;; This is a low-level command to retrieve or to prepare, save and return a prepared statement
;;; ;; you must extract the db handle
;;; ;;
;;; (define (get-sth db cache stmt)
;;; (if (hash-table-exists? cache stmt)
;;; (begin
;;; ;; (print "Reusing cached stmt for " stmt)
;;; (hash-table-ref/default cache stmt #f))
;;; (let ((sth (sqlite3:prepare db stmt)))
;;; (hash-table-set! cache stmt sth)
;;; ;; (print "prepared stmt for " stmt)
;;; sth)))
;;;
;;; ;; a little more expensive but does all the tedious dereferencing - only use if you don't already
;;; ;; have dbdat and db sitting around
;;; ;;
;;; (define (full-get-sth acfg fname stmt)
;;; (let* ((dbdat (get-dbh acfg fname))
;;; (db (dbdat-dbh dbdat))
;;; (sths (dbdat-sths dbdat)))
;;; (get-sth db sths stmt)))
;;;
;;; ;; write to a db
;;; ;; acfg: area data
;;; ;; rdat: request data
;;; ;; hdat: (host . port)
;;; ;;
;;; ;; (define (dbwrite acfg rdat hdat data-in)
;;; ;; (let* ((dbname (car data-in))
;;; ;; (dbdat (get-dbh acfg dbname))
;;; ;; (db (dbdat-dbh dbdat))
;;; ;; (sths (dbdat-sths dbdat))
;;; ;; (stmt (calldat-obj rdat))
;;; ;; (sth (get-sth db sths stmt))
;;; ;; (data (cdr data-in)))
;;; ;; (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data)
;;; ;; (print "dbdat: " (dbdat->alist dbdat))
;;; ;; (apply sqlite3:execute sth data)
;;; ;; (save-dbh acfg dbname dbdat)
;;; ;; #t
;;; ;; ))
;;;
;;; (define (finalize-all-db-handles acfg)
;;; (let* ((dbhandles (area-dbhandles acfg)) ;; dbhandles is hash of fname ==> dbdat
;;; (num 0))
;;; (for-each
;;; (lambda (area-name)
;;; (print "Closing handles for " area-name)
;;; (let ((dbdats (hash-table-ref/default dbhandles area-name '())))
;;; (for-each
;;; (lambda (dbdat)
;;; ;; first close all statement handles
;;; (for-each
;;; (lambda (sth)
;;; (sqlite3:finalize! sth)
;;; (set! num (+ num 1)))
;;; (hash-table-values (dbdat-sths dbdat)))
;;; ;; now close the dbh
;;; (set! num (+ num 1))
;;; (sqlite3:finalize! (dbdat-dbh dbdat)))
;;; dbdats)))
;;; (hash-table-keys dbhandles))
;;; (print "FINALIZED " num " dbhandles")))
;;;
;;; ;;======================================================================
;;; ;; W O R K Q U E U E H A N D L I N G
;;; ;;======================================================================
;;;
;;; (define (register-db-as-mine acfg dbname)
;;; (let ((ht (area-dbs acfg)))
;;; (if (not (hash-table-ref/default ht dbname #f))
;;; (hash-table-set! ht dbname (random 10000)))))
;;;
;;; (define (work-queue-add acfg fname witem)
;;; (let* ((work-queue-start (current-milliseconds))
;;; (action (witem-action witem)) ;; NB the action is the index into the rdat actions
;;; (qdat (or (hash-table-ref/default (area-wqueues acfg) fname #f)
;;; (let ((newqdat (make-qdat)))
;;; (hash-table-set! (area-wqueues acfg) fname newqdat)
;;; newqdat)))
;;; (rdat (hash-table-ref/default (area-rtable acfg) action #f)))
;;; (if rdat
;;; (queue-add!
;;; (case (calldat-ctype rdat)
;;; ((dbwrite) (register-db-as-mine acfg fname)(qdat-writeq qdat))
;;; ((dbread) (register-db-as-mine acfg fname)(qdat-readq qdat))
;;; ((dbrw) (register-db-as-mine acfg fname)(qdat-rwq qdat))
;;; ((oslong) (qdat-oslong qdat))
;;; ((osshort) (qdat-osshort qdat))
;;; ((full-ping) (qdat-misc qdat))
;;; (else
;;; (print "ERROR: no queue for " action ". Adding to dbwrite queue.")
;;; (qdat-writeq qdat)))
;;; witem)
;;; (case action
;;; ((full-ping)(qdat-misc qdat))
;;; (else
;;; (print "ERROR: No action " action " was registered"))))
;;; (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f)
;;; #t)) ;; for now, simply return #t to indicate request got to the queue
;;;
;;; (define (doqueue acfg q fname dbdat dbh)
;;; ;; (print "doqueue: " fname)
;;; (let* ((start-time (current-milliseconds))
;;; (qlen (queue-length q)))
;;; (if (> qlen 1)
;;; (print "Processing queue of length " qlen))
;;; (let loop ((count 0)
;;; (responses '()))
;;; (let ((delta (- (current-milliseconds) start-time)))
;;; (if (or (queue-empty? q)
;;; (> delta 400)) ;; stop working on this queue after 400ms have passed
;;; (list count delta responses) ;; return count, delta and responses list
;;; (let* ((witem (queue-remove! q))
;;; (action (witem-action witem))
;;; (rdat (witem-rdat witem))
;;; (stmt (calldat-obj rdat))
;;; (sth (full-get-sth acfg fname stmt))
;;; (ctype (calldat-ctype rdat))
;;; (data (witem-data witem))
;;; (cookie (witem-cookie witem)))
;;; ;; do the processing and save the result in witem-result
;;; (witem-result-set!
;;; witem
;;; (case ctype ;; action
;;; ((noblockwrite) ;; blind write, no ack of success returned
;;; (apply sqlite3:execute sth data)
;;; (sqlite3:last-insert-rowid dbh))
;;; ((dbwrite) ;; blocking write
;;; (apply sqlite3:execute sth data)
;;; #t)
;;; ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query
;;; (apply sqlite3:map-row (lambda x x) sth data))
;;; ((full-ping) 'full-ping)
;;; (else (print "Not ready for action " action) #f)))
;;; (loop (add1 count)
;;; (if cookie
;;; (cons witem responses)
;;; responses))))))))
;;;
;;; ;; do up to 400ms of processing on each queue
;;; ;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded
;;; ;;
;;; (define (process-db-queries acfg fname)
;;; (if (hash-table-exists? (area-wqueues acfg) fname)
;;; (let* ((process-db-queries-start-time (current-milliseconds))
;;; (qdat (hash-table-ref/default (area-wqueues acfg) fname #f))
;;; (queue-sym->queue (lambda (queue-sym)
;;; (case queue-sym ;; lookup the queue from qdat given a name (symbol)
;;; ((wqueue) (qdat-writeq qdat))
;;; ((rqueue) (qdat-readq qdat))
;;; ((rwqueue) (qdat-rwq qdat))
;;; ((misc) (qdat-misc qdat))
;;; (else #f))))
;;; (dbdat (get-dbh acfg fname))
;;; (dbh (if (dbdat? dbdat)(dbdat-dbh dbdat) #f))
;;; (nowtime (current-seconds)))
;;; ;; handle the queues that require a transaction
;;; ;;
;;; (map ;;
;;; (lambda (queue-sym)
;;; ;; (print "processing queue " queue-sym)
;;; (let* ((queue (queue-sym->queue queue-sym)))
;;; (if (not (queue-empty? queue))
;;; (let ((responses
;;; (sqlite3:with-transaction ;; todo - catch exceptions...
;;; dbh
;;; (lambda ()
;;; (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work!
;;; ;; (print "res=" res)
;;; (match res
;;; ((count delta responses)
;;; (update-stats acfg fname queue-sym delta count)
;;; (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f)
;;; responses) ;; return responses
;;; (else
;;; (print "ERROR: bad return data from doqueue " res)))
;;; )))))
;;; ;; having completed the transaction, send the responses.
;;; ;; (print "INFO: sending " (length responses) " responses.")
;;; (let loop ((responses-left responses))
;;; (cond
;;; ((null? responses-left) #t)
;;; (else
;;; (let* ((witem (car responses-left))
;;; (response (cdr responses-left)))
;;; (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem)
;;; (witem-cookie witem)(witem-result witem)))
;;; (loop (cdr responses-left))))))
;;; )))
;;; '(wqueue rwqueue rqueue))
;;;
;;; ;; handle misc queue
;;; ;;
;;; ;; (print "processing misc queue")
;;; (let ((queue (queue-sym->queue 'misc)))
;;; (doqueue acfg queue fname dbdat dbh))
;;; ;; ....
;;; (save-dbh acfg fname dbdat)
;;; #t ;; just to let the tests know we got here
;;; )
;;; #f ;; nothing processed
;;; ))
;;;
;;; ;; run all queues in parallel per db but sequentially per queue for that db.
;;; ;; - process the queues every 500 or so ms
;;; ;; - allow for long running queries to continue but all other activities for that
;;; ;; db will be blocked.
;;; ;;
;;; (define (work-queue-processor acfg)
;;; (let* ((threads (make-hash-table))) ;; fname => thread
;;; (let loop ((fnames (hash-table-keys (area-wqueues acfg)))
;;; (target-time (+ (current-milliseconds) 50)))
;;; ;;(if (not (null? fnames))(print "Processing for these databases: " fnames))
;;; (for-each
;;; (lambda (fname)
;;; ;; (print "processing for " fname)
;;; ;;(process-db-queries acfg fname))
;;; (let ((th (hash-table-ref/default threads fname #f)))
;;; (if (and th (not (member (thread-state th) '(dead terminated))))
;;; (begin
;;; (print "WARNING: worker thread for " fname " is taking a long time.")
;;; (print "Thread is in state " (thread-state th)))
;;; (let ((th1 (make-thread (lambda ()
;;; ;; (catch-and-dump
;;; ;; (lambda ()
;;; ;; (print "Process queries for " fname)
;;; (let ((start-time (current-milliseconds)))
;;; (process-db-queries acfg fname)
;;; ;; (thread-sleep! 0.01) ;; need the thread to take at least some time
;;; (hash-table-delete! threads fname)) ;; no mutexes?
;;; fname)
;;; "th1"))) ;; ))
;;; (hash-table-set! threads fname th1)
;;; (thread-start! th1)))))
;;; fnames)
;;; ;; (thread-sleep! 0.1) ;; give the threads some time to process requests
;;; ;; burn time until 400ms is up
;;; (let ((now-time (current-milliseconds)))
;;; (if (< now-time target-time)
;;; (let ((delta (- target-time now-time)))
;;; (thread-sleep! (/ delta 1000)))))
;;; (loop (hash-table-keys (area-wqueues acfg))
;;; (+ (current-milliseconds) 50)))))
;;;
;;; ;;======================================================================
;;; ;; S T A T S G A T H E R I N G
;;; ;;======================================================================
;;;
;;; (defstruct stat
;;; (qcount-avg 0) ;; coarse running average
;;; (qtime-avg 0) ;; coarse running average
;;; (qcount 0) ;; total
;;; (qtime 0) ;; total
;;; (last-qcount 0) ;; last
;;; (last-qtime 0) ;; last
;;; (dbs '()) ;; list of db files handled by this node
;;; (when 0)) ;; when the last query happened - seconds
;;;
;;;
;;; (define (update-stats acfg fname bucket duration numqueries)
;;; (let* ((key fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough
;;; (stats (or (hash-table-ref/default (area-stats acfg) key #f)
;;; (let ((newstats (make-stat)))
;;; (hash-table-set! (area-stats acfg) key newstats)
;;; newstats))))
;;; ;; when the last query happened (used to remove the fname from the active list)
;;; (stat-when-set! stats (current-seconds))
;;; ;; last values
;;; (stat-last-qcount-set! stats numqueries)
;;; (stat-last-qtime-set! stats duration)
;;; ;; total over process lifetime
;;; (stat-qcount-set! stats (+ (stat-qcount stats) numqueries))
;;; (stat-qtime-set! stats (+ (stat-qtime stats) duration))
;;; ;; coarse average
;;; (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2))
;;; (stat-qtime-avg-set! stats (/ (+ (stat-qtime-avg stats) duration) 2))
;;;
;;; ;; here is where we add the stats for a given dbfile
;;; (if (not (member fname (stat-dbs stats)))
;;; (stat-dbs-set! stats (cons fname (stat-dbs stats))))
;;;
;;; ))
;;;
;;; ;;======================================================================
;;; ;; S E R V E R S T U F F
;;; ;;======================================================================
;;;
;;; ;; this does NOT return!
;;; ;;
;;; (define (find-free-port-and-open acfg)
;;; (let ((port (or (area-port acfg) 3200)))
;;; (handle-exceptions
;;; exn
;;; (begin
;;; (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port")
;;; (area-port-set! acfg (+ port 1))
;;; (find-free-port-and-open acfg))
;;; (rpc:default-server-port port)
;;; (area-port-set! acfg port)
;;; (tcp-read-timeout 120000)
;;; ;; ((rpc:make-server (tcp-listen port)) #t)
;;; (tcp-listen (rpc:default-server-port)
;;; ))))
;;;
;;; ;; register this node by putting a packet into the pkts dir.
;;; ;; look for other servers
;;; ;; contact other servers and compile list of servers
;;; ;; there are two types of server
;;; ;; main servers - dashboards, runners and dedicated servers - need pkt
;;; ;; passive servers - test executers, step calls, list-runs - no pkt
;;; ;;
;;; (define (register-node acfg hostip port-num)
;;; ;;(mutex-lock! (area-mutex acfg))
;;; (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created)
;;; (best-ip (or hostip (get-my-best-address)))
;;; (mtdir (area-dbdir acfg))
;;; (pktdir (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts")))
;;; (print "Registering node " best-ip ":" port-num)
;;; (if (not mtdir) ;; require a home for this node to put or find databases
;;; #f
;;; (begin
;;; (if (not (directory? pktdir))(create-directory pktdir))
;;; ;; server is started, now create pkt if needed
;;; (print "Starting server in " server-type " mode with port " port-num)
;;; (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt
;;; (begin
;;; (area-pktid-set! acfg
;;; (write-alist->pkt
;;; pktdir
;;; `((hostname . ,(get-host-name))
;;; (ipaddr . ,best-ip)
;;; (port . ,port-num)
;;; (pid . ,(current-process-id)))
;;; pktspec: *pktspec*
;;; ptype: 'server))
;;; (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt"))))
;;; (area-port-set! acfg port-num)
;;; #;(mutex-unlock! (area-mutex acfg))))))
;;;
;;; (define *cookie-seqnum* 0)
;;; (define (make-cookie key)
;;; (set! *cookie-seqnum* (add1 *cookie-seqnum*))
;;; ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*)
;;; (conc key "-" *cookie-seqnum*)
;;; )
;;;
;;; ;; dispatch locally if possible
;;; ;;
;;; (define (call-deliver-response acfg ipaddr port cookie data)
;;; (if (and (equal? (area-myaddr acfg) ipaddr)
;;; (equal? (area-port acfg) port))
;;; (deliver-response acfg cookie data)
;;; ((rpc:procedure 'response ipaddr port) cookie data)))
;;;
;;; (define (deliver-response acfg cookie data)
;;; (let ((deliver-response-start (current-milliseconds)))
;;; (thread-start! (make-thread
;;; (lambda ()
;;; (let loop ((tries-left 5))
;;; ;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left)
;;; ;;(pp (hash-table->alist (area-cookie2mbox acfg)))
;;; (let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f)))
;;; (cond
;;; ((eq? 0 tries-left)
;;; (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie)
;;; )
;;; (mbox
;;; ;;(print "got mbox="mbox" got data="data" send.")
;;; (mailbox-send! mbox data))
;;; (else
;;; ;;(print "no mbox yet. look for "cookie)
;;; (thread-sleep! (/ (- 6 tries-left) 10))
;;; (loop (sub1 tries-left))))))
;;; ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data))
;;; (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie)
;;; )
;;; (conc "deliver-response thread for cookie="cookie))))
;;; #t)
;;;
;;; ;; action:
;;; ;; immediate - quick actions, no need to put in queues
;;; ;; dbwrite - put in dbwrite queue
;;; ;; dbread - put in dbread queue
;;; ;; oslong - os actions, e.g. du, that could take a long time
;;; ;; osshort - os actions that should be quick, e.g. df
;;; ;;
;;; (define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler
;;; ;; NOTE: Use rpc:current-peer for getting return address
;;; (let* ((std-peer-handler-start (current-milliseconds))
;;; ;; (raw-data (alist-ref 'data dat))
;;; (rdat (hash-table-ref/default
;;; (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action
;;; (witem (make-witem ripaddr: from-ipaddr ;; rhost: from-host
;;; rport: from-port action: action
;;; rdat: rdat cookie: cookie
;;; servkey: servkey data: params ;; TODO - rename data to params
;;; caller: (rpc:current-peer))))
;;; (if (not (equal? servkey (area-pktid acfg)))
;;; `(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this
;;; (let* ((ctype (if rdat
;;; (calldat-ctype rdat) ;; is this necessary? these should be identical
;;; action)))
;;; (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f)
;;; (case ctype
;;; ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data)))
;;; ((full-ping) `(#t "ack to full ping" ,(work-queue-add acfg fname witem) ,cookie))
;;; ((response) `(#t "ack from requestor" ,(deliver-response acfg fname params)))
;;; ((dbwrite) `(#t "db write submitted" ,(work-queue-add acfg fname witem) ,cookie))
;;; ((dbread) `(#t "db read submitted" ,(work-queue-add acfg fname witem) ,cookie ))
;;; ((dbrw) `(#t "db read/write submitted" ,cookie))
;;; ((osshort) `(#t "os short submitted" ,cookie))
;;; ((oslong) `(#t "os long submitted" ,cookie))
;;; (else `(#f "unrecognised action" ,ctype)))))))
;;;
;;; ;; Call this to start the actual server
;;; ;;
;;; ;; start_server
;;; ;;
;;; ;; mode: '
;;; ;; handler: proc which takes the received pkt as its argument
;;; ;;
;;;
;;; (define (start-server acfg)
;;; (let* ((conn (find-free-port-and-open acfg))
;;; (port (area-port acfg)))
;;; (rpc:publish-procedure!
;;; 'delist-db
;;; (lambda (fname)
;;; (hash-table-delete! (area-dbs acfg) fname)))
;;; (rpc:publish-procedure!
;;; 'calling-addr
;;; (lambda ()
;;; (rpc:current-peer)))
;;; (rpc:publish-procedure!
;;; 'ping
;;; (lambda ()(real-ping acfg)))
;;; (rpc:publish-procedure!
;;; 'request
;;; (lambda (from-addr from-port servkey action cookie dbname params)
;;; (request acfg from-addr from-port servkey action cookie dbname params)))
;;; (rpc:publish-procedure!
;;; 'response
;;; (lambda (cookie res-dat)
;;; (deliver-response acfg cookie res-dat)))
;;; (area-ready-set! acfg #t)
;;; (area-conn-set! acfg conn)
;;; ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t)
;;;
;;;
;;; (define (launch acfg) ;; #!optional (proc std-peer-handler))
;;; (print "starting launch")
;;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
;;; #;(let ((original-handler (current-exception-handler))) ;; is th
;;; (lambda (exception)
;;; (server-exit-procedure)
;;; (original-handler exception)))
;;; (on-exit (lambda ()
;;; (shutdown acfg))) ;; (finalize-all-db-handles acfg)))
;;; ;; set up the rpc handler
;;; (let* ((th1 (make-thread
;;; (lambda ()(start-server acfg))
;;; "server thread"))
;;; (th2 (make-thread
;;; (lambda ()
;;; (print "th2 starting")
;;; (let loop ()
;;; (work-queue-processor acfg)
;;; (print "work-queue-processor crashed!")
;;; (loop)))
;;; "work queue thread")))
;;; (thread-start! th1)
;;; (thread-start! th2)
;;; (let loop ()
;;; (thread-sleep! 0.025)
;;; (if (area-ready acfg)
;;; #t
;;; (loop)))
;;; ;; attempt to fix my address
;;; (let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)?
;;; (let loop ((rem-addrs all-addr))
;;; (if (null? rem-addrs)
;;; (begin
;;; (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.")
;;; (exit 1)) ;; BUG Changeme to raising an exception
;;;
;;; (let* ((addr (car rem-addrs))
;;; (good-addr (handle-exceptions
;;; exn
;;; #f
;;; ((rpc:procedure 'calling-addr addr (area-port acfg))))))
;;; (if good-addr
;;; (begin
;;; (print "Got good-addr of " good-addr)
;;; (area-myaddr-set! acfg good-addr))
;;; (loop (cdr rem-addrs)))))))
;;; (register-node acfg (area-myaddr acfg)(area-port acfg))
;;; (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg))
;;; ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
;;; ))
;;;
;;; (define (clear-server-pkt acfg)
;;; (let ((pktf (area-pktfile acfg)))
;;; (if pktf (delete-file* pktf))))
;;;
;;; (define (shutdown acfg)
;;; (let (;;(conn (area-conn acfg))
;;; (pktf (area-pktfile acfg))
;;; (port (area-port acfg)))
;;; (if pktf (delete-file* pktf))
;;; (send-all "imshuttingdown")
;;; ;; (rpc:close-all-connections!) ;; don't know if this is actually needed
;;; (finalize-all-db-handles acfg)))
;;;
;;; (define (send-all msg)
;;; #f)
;;;
;;; ;; given an area record, look up all the packets
;;; ;;
;;; (define (get-all-server-pkts acfg)
;;; (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt"))))
;;; (map (lambda (pkt-file)
;;; (read-pkt->alist pkt-file pktspec: *pktspec*))
;;; all-pkt-files)))
;;;
;;; #;((Z . "9a0212302295a19610d5796fce0370fa130758e9")
;;; (port . "34827")
;;; (pid . "28748")
;;; (hostname . "zeus")
;;; (T . "server")
;;; (D . "1549427032.0"))
;;;
;;; #;(define (get-my-best-address)
;;; (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))))
;;; (cond
;;; ((null? all-my-addresses)
;;; (get-host-name)) ;; no interfaces?
;;; ((eq? (length all-my-addresses) 1)
;;; (ip->string (car all-my-addresses))) ;; only one to choose from, just go with it
;;; (else
;;; (ip->string (car (filter (lambda (x) ;; take any but 127.
;;; (not (eq? (u8vector-ref x 0) 127)))
;;; all-my-addresses)))))))
;;;
;;; ;; whoami? I am my pkt
;;; ;;
;;; (define (whoami? acfg)
;;; (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f))
;;;
;;; ;;======================================================================
;;; ;; "Client side" operations
;;; ;;======================================================================
;;;
;;; (define (safe-call call-key host port . params)
;;; (handle-exceptions
;;; exn
;;; (begin
;;; (print "Call " call-key " to " host ":" port " failed")
;;; #f)
;;; (apply (rpc:procedure call-key host port) params)))
;;;
;;; ;; ;; convert to/from string / sexpr
;;; ;;
;;; ;; (define (string->sexpr str)
;;; ;; (if (string? str)
;;; ;; (with-input-from-string str read)
;;; ;; str))
;;; ;;
;;; ;; (define (sexpr->string s)
;;; ;; (with-output-to-string (lambda ()(write s))))
;;;
;;; ;; is the server alive?
;;; ;;
;;; (define (ping acfg host port)
;;; (let* ((myaddr (area-myaddr acfg))
;;; (myport (area-port acfg))
;;; (start-time (current-milliseconds))
;;; (res (if (and (equal? myaddr host)
;;; (equal? myport port))
;;; (real-ping acfg)
;;; ((rpc:procedure 'ping host port)))))
;;; (cons (- (current-milliseconds) start-time)
;;; res)))
;;;
;;; ;; returns ( ipaddr port alist-fname=>randnum )
;;; (define (real-ping acfg)
;;; `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg)))
;;;
;;; ;; is the server alive AND the queues processing?
;;; ;;
;;; #;(define (full-ping acfg servpkt)
;;; (let* ((start-time (current-milliseconds))
;;; (res (send-message acfg servpkt '(full-ping) 'full-ping)))
;;; (cons (- (current-milliseconds) start-time)
;;; res))) ;; (equal? res "got ping"))))
;;;
;;;
;;; ;; look up all pkts and get the server id (the hash), port, host/ip
;;; ;; store this info in acfg
;;; ;; return the number of responsive servers found
;;; ;;
;;; ;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself
;;; ;;
;;; (define (update-known-servers acfg)
;;; ;; read all pkts
;;; ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt
;;; (let* ((start-time (current-milliseconds))
;;; (all-pkts (delete-duplicates
;;; (append (get-all-server-pkts acfg)
;;; (hash-table-values (area-hosts acfg)))))
;;; (hostshash (area-hosts acfg))
;;; (my-id (area-pktid acfg))
;;; (pktsdir (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers
;;; (numsrvs 0)
;;; (delpkt (lambda (pktsdir sid)
;;; (print "clearing out server " sid)
;;; (delete-file* (conc pktsdir "/" sid ".pkt"))
;;; (hash-table-delete! hostshash sid))))
;;; (area-last-srvup-set! acfg (current-seconds))
;;; (for-each
;;; (lambda (servpkt)
;;; (if (list? servpkt)
;;; ;; (pp servpkt)
;;; (let* ((shost (alist-ref 'ipaddr servpkt))
;;; (sport (any->number (alist-ref 'port servpkt)))
;;; (res (handle-exceptions
;;; exn
;;; (begin
;;; ;; (print "INFO: bad server on " shost ":" sport)
;;; #f)
;;; (ping acfg shost sport)))
;;; (sid (alist-ref 'Z servpkt)) ;; Z code is our name for the server
;;; (url (conc shost ":" sport))
;;; )
;;; #;(if (or (not res)
;;; (null? res))
;;; (begin
;;; (print "STRANGE: ping of " url " gave " res)))
;;;
;;; ;; (print "Got " res " from " shost ":" sport)
;;; (match res
;;; ((qduration . payload)
;;; ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt)
;;; ;; (if payload
;;; ;; "Success" "Fail"))
;;; (match payload
;;; ((host port stats)
;;; ;; (print "From " host ":" port " got stats: " stats)
;;; (if (and host port stats)
;;; (let ((url (conc host ":" port)))
;;; (hash-table-set! hostshash sid servpkt)
;;; ;; store based on host:port
;;; (hash-table-set! (area-hoststats acfg) sid stats))
;;; (print "missing data from the server, not sure what that means!"))
;;; (set! numsrvs (+ numsrvs 1)))
;;; (#f
;;; (print "Removing pkt " sid " due to #f from server or failed ping")
;;; (delpkt pktsdir sid))
;;; (else
;;; (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)")))
;;; (else
;;; ;; here we delete the pkt - can't reach the server, remove it
;;; ;; however this logic is inadequate. we should mark the server as checked
;;; ;; and not good, if it happens a second time - then remove the pkt
;;; ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead
;;; ;; could be it is simply too busy to reply
;;; (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0)))
;;; (if (> bad-pings 1) ;; two bad pings - remove pkt
;;; (begin
;;; (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid)
;;; (delpkt pktsdir sid))
;;; (begin
;;; (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet")
;;; (hash-table-set! (area-health acfg)
;;; url
;;; (+ (hash-table-ref/default (area-health acfg) url 0) 1))
;;; ))
;;; ))))
;;; ;; servpkt is not actually a pkt?
;;; (begin
;;; (print "Bad pkt " servpkt))))
;;; all-pkts)
;;; (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs
;;; " servers, pkts: " (map (lambda (p)
;;; (alist-ref 'Z p))
;;; all-pkts))
;;; numsrvs))
;;;
;;; (defstruct srvstat
;;; (numfiles 0) ;; number of db files handled by this server - subtract 1 for the db being currently looked at
;;; (randnum #f) ;; tie breaker number assigned to by the server itself - applies only to the db under consideration
;;; (pkt #f)) ;; the server pkt
;;;
;;; ;;(define (srv->srvstat srvpkt)
;;;
;;; ;; Get the server best for given dbname and key
;;; ;;
;;; ;; NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries.
;;; ;;
;;; (define (get-best-server acfg dbname key)
;;; (let* (;; (servers (hash-table-values (area-hosts acfg)))
;;; (servers (area-hosts acfg))
;;; (skeys (sort (hash-table-keys servers) string>=?)) ;; a stable listing
;;; (start-time (current-milliseconds))
;;; (srvstats (make-hash-table)) ;; srvid => srvstat
;;; (url (conc (area-myaddr acfg) ":" (area-port acfg))))
;;; ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys))
;;; (if (null? skeys)
;;; (if (> (update-known-servers acfg) 0)
;;; (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter
;;; (begin
;;; (print "ERROR: no server found!") ;; since this process is also a server this should never happen
;;; #f))
;;; (begin
;;; ;; (print "in get-best-server with skeys=" skeys)
;;; (if (> (- (current-seconds) (area-last-srvup acfg)) 10)
;;; (begin
;;; (update-known-servers acfg)
;;; (sdbg> "get-best-server" "update-known-servers" start-time #f #f)))
;;;
;;; ;; for each server look at the list of dbfiles, total number of dbs being handled
;;; ;; and the rand number, save the best host
;;; ;; also do a delist-db for each server dbfile not used
;;; (let* ((best-server #f)
;;; (servers-to-delist (make-hash-table)))
;;; (for-each
;;; (lambda (srvid)
;;; (let* ((server (hash-table-ref/default servers srvid #f))
;;; (stats (hash-table-ref/default (area-hoststats acfg) srvid '(()))))
;;; ;; (print "stats: " stats)
;;; (if server
;;; (let* ((dbweights (car stats))
;;; (srvload (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights)))
;;; (dbrec (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore
;;; (randnum (if dbrec
;;; dbrec ;; (cdr dbrec)
;;; 0)))
;;; (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server))))))
;;; skeys)
;;;
;;; (let* ((sorted (sort (hash-table-values srvstats)
;;; (lambda (a b)
;;; (let ((numfiles-a (srvstat-numfiles a))
;;; (numfiles-b (srvstat-numfiles b))
;;; (randnum-a (srvstat-randnum a))
;;; (randnum-b (srvstat-randnum b)))
;;; (if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less
;;; #t
;;; (if (and (equal? numfiles-a numfiles-b)
;;; (< randnum-a randnum-b))
;;; #t
;;; #f))))))
;;; (best (if (null? sorted)
;;; (begin
;;; (print "ERROR: should never be null due to self as server.")
;;; #f)
;;; (srvstat-pkt (car sorted)))))
;;; #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv)
;;; (let ((p (srvstat-pkt srv)))
;;; (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p)
;;; "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")")))
;;; sorted))
;;; best))))))
;;;
;;; ;; send out an "I'm about to exit" notice to all known servers
;;; ;;
;;; (define (death-imminent acfg)
;;; '())
;;;
;;; ;;======================================================================
;;; ;; U L E X - T H E I N T E R E S T I N G S T U F F ! !
;;; ;;======================================================================
;;;
;;; ;; register a handler
;;; ;; NOTES:
;;; ;; dbinitsql is reserved for a list of sql statements for initializing the db
;;; ;; dbinitfn is reserved for a db init function; if it exists it is called after dbinitsql
;;; ;;
;;; (define (register acfg key obj #!optional (ctype 'dbwrite))
;;; (let ((ht (area-rtable acfg)))
;;; (if (hash-table-exists? ht key)
;;; (print "WARNING: redefinition of entry " key))
;;; (hash-table-set! ht key (make-calldat obj: obj ctype: ctype))))
;;;
;;; ;; usage: (register-batch acfg ctype '((key1 . sql1) (key2 . sql2) ... ))
;;; ;; NB// obj is often an sql query
;;; ;;
;;; (define (register-batch acfg ctype data)
;;; (let ((ht (area-rtable acfg)))
;;; (map (lambda (dat)
;;; (hash-table-set! ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype)))
;;; data)))
;;;
;;; (define (initialize-area-calls-from-specfile area specfile)
;;; (let* ((callspec (with-input-from-file specfile read )))
;;; (for-each (lambda (group)
;;; (register-batch
;;; area
;;; (car group)
;;; (cdr group)))
;;; callspec)))
;;;
;;; ;; get-rentry
;;; ;;
;;; (define (get-rentry acfg key)
;;; (hash-table-ref/default (area-rtable acfg) key #f))
;;;
;;; (define (get-rsql acfg key)
;;; (let ((cdat (get-rentry acfg key)))
;;; (if cdat
;;; (calldat-obj cdat)
;;; #f)))
;;;
;;;
;;;
;;; ;; blocking call:
;;; ;; client server
;;; ;; ------ ------
;;; ;; call()
;;; ;; send-message()
;;; ;; nmsg-send()
;;; ;; nmsg-receive()
;;; ;; nmsg-respond(ack,cookie)
;;; ;; ack, cookie
;;; ;; mbox-thread-wait(cookie)
;;; ;; nmsg-send(client,cookie,result)
;;; ;; nmsg-respond(ack)
;;; ;; return result
;;; ;;
;;; ;; reserved action:
;;; ;; 'immediate
;;; ;; 'dbinitsql
;;; ;;
;;; (define (call acfg dbname action params #!optional (count 0))
;;; (let* ((call-start-time (current-milliseconds))
;;; (srv (get-best-server acfg dbname action))
;;; (post-get-start-time (current-milliseconds))
;;; (rdat (hash-table-ref/default (area-rtable acfg) action #f))
;;; (myid (trim-pktid (area-pktid acfg)))
;;; (srvid (trim-pktid (alist-ref 'Z srv)))
;;; (cookie (make-cookie myid)))
;;; (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat)
;;; (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname)
;;; (if (and srv rdat) ;; need both to dispatch a request
;;; (let* ((ripaddr (alist-ref 'ipaddr srv))
;;; (rsrvid (alist-ref 'Z srv))
;;; (rport (any->number (alist-ref 'port srv)))
;;; (res-full (if (and (equal? ripaddr (area-myaddr acfg))
;;; (equal? rport (area-port acfg)))
;;; (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params)
;;; (safe-call 'request ripaddr rport
;;; (area-myaddr acfg)
;;; (area-port acfg)
;;; #;(area-pktid acfg)
;;; rsrvid
;;; action cookie dbname params))))
;;; ;; (print "res-full: " res-full)
;;; (match res-full
;;; ((response-ok response-msg rem ...)
;;; (let* ((send-message-time (current-milliseconds))
;;; ;; (match res-full
;;; ;; ((response-ok response-msg)
;;; ;; (response-ok (car res-full))
;;; ;; (response-msg (cadr res-full)
;;; )
;;; ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG
;;; ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params)
;;; (sdbg> "call" "send-message" post-get-start-time #f call-start-time)
;;; (cond
;;; ((not response-ok) #f)
;;; ((member response-msg '("db read submitted" "db write submitted"))
;;; (let* ((cookie-id (cadddr res-full))
;;; (mbox (make-mailbox))
;;; (mbox-time (current-milliseconds)))
;;; (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox)
;;; (let* ((mbox-timeout-secs 20)
;;; (mbox-timeout-result 'MBOX_TIMEOUT)
;;; (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
;;; (mbox-receive-time (current-milliseconds)))
;;; (hash-table-delete! (area-cookie2mbox acfg) cookie-id)
;;; (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname)
;;; ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params)
;;; res)))
;;; (else
;;; (print "Unhandled response \""response-msg"\"")
;;; #f))
;;; ;; depending on what action (i.e. ctype) is we will block here waiting for
;;; ;; all the data (mechanism to be determined)
;;; ;;
;;; ;; if res is a "working on it" then wait
;;; ;; wait for result
;;; ;; mailbox thread wait on
;;;
;;; ;; if res is a "can't help you" then try a different server
;;; ;; if res is a "ack" (e.g. for one-shot requests) then return res
;;; ))
;;; (else
;;; (if (< count 10)
;;; (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv))))
;;; (thread-sleep! 1)
;;; (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.")
;;; (call acfg dbname action params (+ count 1)))
;;; (begin
;;; (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full)))))))
;;; (begin
;;; (if (not rdat)
;;; (print "ERROR: action " action " not registered.")
;;; (if (< count 10)
;;; (begin
;;; (thread-sleep! 1)
;;; (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts
;;; (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds")
;;; (call acfg dbname action params (+ count 1)))
;;; (begin
;;; (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up."))
;;; #;(error "No server available"))))))))
;;;
;;;
;;; ;;======================================================================
;;; ;; U T I L I T I E S
;;; ;;======================================================================
;;;
;;; ;; get a signature for identifying this process
;;; ;;
;;; (define (get-process-signature)
;;; (cons (get-host-name)(current-process-id)))
;;;
;;; ;;======================================================================
;;; ;; S Y S T E M S T U F F
;;; ;;======================================================================
;;;
;;; ;; get normalized cpu load by reading from /proc/loadavg and
;;; ;; /proc/cpuinfo. Returns an alist with the three load averages,
;;; ;; the 1m load divided by the processor count and by the core count,
;;; ;; and the processor (incl. threads), core and physical chip counts.
;;; ;; keys: adj-proc-load, adj-core-load, 1m-load, 5m-load, 15m-load,
;;; ;; proc, core, phys (and error, set only in the fallback defaults)
;;; ;;
;;; (define (get-normalized-cpu-load)
;;; (let ((res (get-normalized-cpu-load-raw))
;;; (default `((adj-proc-load . 2) ;; there is no right answer
;;; (adj-core-load . 2)
;;; (1m-load . 2)
;;; (5m-load . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong
;;; (15m-load . 0)
;;; (proc . 1)
;;; (core . 1)
;;; (phys . 1)
;;; (error . #t))))
;;; (cond
;;; ((and (list? res)
;;; (> (length res) 2))
;;; res)
;;; ((eq? res #f) default) ;; add messages?
;;; ((eq? res #f) default) ;; this would be the #eof
;;; (else default))))
;;;
;;; (define (get-normalized-cpu-load-raw)
;;; (let* ((actual-host (get-host-name))) ;; #f is localhost
;;; (let ((data (append
;;; (with-input-from-file "/proc/loadavg" read-lines)
;;; (with-input-from-file "/proc/cpuinfo" read-lines)
;;; (list "end")))
;;; (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$"))
;;; (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$"))
;;; (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$"))
;;; (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$"))
;;; (max-num (lambda (p n)(max (string->number p) n))))
;;; ;; (print "data=" data)
;;; (if (null? data) ;; something went wrong
;;; #f
;;; (let loop ((hed (car data))
;;; (tal (cdr data))
;;; (loads #f)
;;; (proc-num 0) ;; processor includes threads
;;; (phys-num 0) ;; physical chip on motherboard
;;; (core-num 0)) ;; core
;;; ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num)
;;; (if (null? tal) ;; have all our data, calculate normalized load and return result
;;; (let* ((act-proc (+ proc-num 1))
;;; (act-phys (+ phys-num 1))
;;; (act-core (+ core-num 1))
;;; (adj-proc-load (/ (car loads) act-proc))
;;; (adj-core-load (/ (car loads) act-core))
;;; (result
;;; (append (list (cons 'adj-proc-load adj-proc-load)
;;; (cons 'adj-core-load adj-core-load))
;;; (list (cons '1m-load (car loads))
;;; (cons '5m-load (cadr loads))
;;; (cons '15m-load (caddr loads)))
;;; (list (cons 'proc act-proc)
;;; (cons 'core act-core)
;;; (cons 'phys act-phys)))))
;;; result)
;;; (regex-case
;;; hed
;;; (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num))
;;; (proc-rx ( x p ) (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num))
;;; (phys-rx ( x p ) (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num))
;;; (core-rx ( x c ) (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num)))
;;; (else
;;; (begin
;;; ;; (print "NO MATCH: " hed)
;;; (loop (car tal)(cdr tal) loads proc-num phys-num core-num))))))))))
;;;
;;; (define (get-host-stats acfg)
;;; (let ((stats-hash (area-stats acfg)))
;;; ;; use this opportunity to remove references to dbfiles which have not been accessed in a while
;;; (for-each
;;; (lambda (dbname)
;;; (let* ((stats (hash-table-ref stats-hash dbname))
;;; (last-access (stat-when stats)))
;;; (if (and (> last-access 0) ;; if zero then there has been no access
;;; (> (- (current-seconds) last-access) 10)) ;; not used in ten seconds
;;; (begin
;;; (print "Removing " dbname " from stats list")
;;; (hash-table-delete! stats-hash dbname) ;; remove from stats hash
;;; (stat-dbs-set! stats (hash-table-keys stats))))))
;;; (hash-table-keys stats-hash))
;;;
;;; `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum
;;; ,(map (lambda (dbname) ;; dbname is the db name
;;; (cons dbname (stat-when (hash-table-ref stats-hash dbname))))
;;; (hash-table-keys stats-hash))
;;; (cpuload . ,(get-normalized-cpu-load)))))
;;; #;(stats . ,(map (lambda (k) ;; create an alist from the stats data
;;; (cons k (stat->alist (hash-table-ref (area-stats acfg) k))))
;;; (hash-table-keys (area-stats acfg))))
;;;
;;; #;(trace
;;; ;; assv
;;; ;; cdr
;;; ;; caar
;;; ;; ;; cdr
;;; ;; call
;;; ;; finalize-all-db-handles
;;; ;; get-all-server-pkts
;;; ;; get-normalized-cpu-load
;;; ;; get-normalized-cpu-load-raw
;;; ;; launch
;;; ;; nmsg-send
;;; ;; process-db-queries
;;; ;; receive-message
;;; ;; std-peer-handler
;;; ;; update-known-servers
;;; ;; work-queue-processor
;;; )
;;;
;;; ;;======================================================================
;;; ;; netutil
;;; ;; move this back to ulex-netutil.scm someday?
;;; ;;======================================================================
;;;
;;; ;; #include <stdio.h>
;;; ;; #include <netinet/in.h>
;;; ;; #include <string.h>
;;; ;; #include <arpa/inet.h>
;;;
;;; (foreign-declare "#include \"sys/types.h\"")
;;; (foreign-declare "#include \"sys/socket.h\"")
;;; (foreign-declare "#include \"ifaddrs.h\"")
;;; (foreign-declare "#include \"arpa/inet.h\"")
;;;
;;; ;; get IP addresses from ALL interfaces
;;; (define get-all-ips
;;; (foreign-safe-lambda* scheme-object ()
;;; "
;;;
;;; // from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address :
;;;
;;;
;;; C_word lst = C_SCHEME_END_OF_LIST, len, str, *a;
;;; // struct ifaddrs *ifa, *i;
;;; // struct sockaddr *sa;
;;;
;;; struct ifaddrs * ifAddrStruct = NULL;
;;; struct ifaddrs * ifa = NULL;
;;; void * tmpAddrPtr = NULL;
;;;
;;; if ( getifaddrs(&ifAddrStruct) != 0)
;;; C_return(C_SCHEME_FALSE);
;;;
;;; // for (i = ifa; i != NULL; i = i->ifa_next) {
;;; for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
;;; if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is
;;; // a valid IPv4 address
;;; tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
;;; char addressBuffer[INET_ADDRSTRLEN];
;;; inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
;;; // printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
;;; len = strlen(addressBuffer);
;;; a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
;;; str = C_string(&a, len, addressBuffer);
;;; lst = C_a_pair(&a, str, lst);
;;; }
;;;
;;; // else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is
;;; // // a valid IPv6 address
;;; // tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
;;; // char addressBuffer[INET6_ADDRSTRLEN];
;;; // inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
;;; //// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
;;; // len = strlen(addressBuffer);
;;; // a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
;;; // str = C_string(&a, len, addressBuffer);
;;; // lst = C_a_pair(&a, str, lst);
;;; // }
;;;
;;; // else {
;;; // printf(\" not an IPv4 address\\n\");
;;; // }
;;;
;;; }
;;;
;;; freeifaddrs(ifa);
;;; C_return(lst);
;;;
;;; "))
;;;
;;; ;; Change this to bias for addresses with a reasonable broadcast value?
;;; ;;
;;; (define (ip-pref-less? a b)
;;; (let* ((rate (lambda (ipstr)
;;; (regex-case ipstr
;;; ( "^127\\." _ 0 )
;;; ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 )
;;; ( else 2 ) ))))
;;; (< (rate a) (rate b))))
;;;
;;;
;;; (define (get-my-best-address)
;;; (let ((all-my-addresses (get-all-ips))
;;; ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))
;;; )
;;; (cond
;;; ((null? all-my-addresses)
;;; (get-host-name)) ;; no interfaces?
;;; ((eq? (length all-my-addresses) 1)
;;; (car all-my-addresses)) ;; only one to choose from, just go with it
;;;
;;; (else
;;; (car (sort all-my-addresses ip-pref-less?)))
;;; ;; (else
;;; ;; (ip->string (car (filter (lambda (x) ;; take any but 127.
;;; ;; (not (eq? (u8vector-ref x 0) 127)))
;;; ;; all-my-addresses))))
;;;
;;; )))
;;;
;;; (define (get-all-ips-sorted)
;;; (sort (get-all-ips) ip-pref-less?))
;;;
;;;