Megatest

Artifact [e8344f5ae3]
Login

Artifact e8344f5ae35974e6cbbb4be520961374480e201d:


;;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.

;;======================================================================
;; ABOUT:
;;   See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;;   Why sql-de-lite and not say, dbi?  - performance mostly, then simplicity.
;;
;;======================================================================

(use mailbox)

(module ulex
    *

(import scheme posix chicken data-structures ports extras files mailbox)
(import srfi-18 pkts matchable regex
	typed-records srfi-69 srfi-1
	srfi-4 regex-case
	(prefix sqlite3 sqlite3:)
	foreign
	tcp6
	;; ulex-netutil
	hostinfo
	)

;;======================================================================
;; KEY FUNCTIONS - THESE ARE TOO BE EXPOSED AND USED
;;======================================================================

;; connection setup and management functions

;; This is the basic setup command. Must always be
;; called before connecting to a db using connect.
;;
;; find or become the captain
;; setup and return a ulex object
;;
(define (setup)
  (let* ((udata (make-udat))
	 (cpkts (get-all-captain-pkts udata)) ;; read captain pkts
	 (captn (get-winning-pkt cpkts)))
    ;; check to see if our own server is started and start one if not
    (if (not (udat-serv-listener udata))(start-server-find-port udata))
    (if captn
	(let* ((port   (alist-ref 'port   captn))
	       (host   (alist-ref 'host   captn))
	       (ipaddr (alist-ref 'ipaddr captn))
	       (pid    (alist-ref 'pid    captn))
	       (Z      (alist-ref 'Z      captn)))
	  (udat-captain-address-set! udata ipaddr)
	  (udat-captain-host-set!    udata host)
	  (udat-captain-port-set!    udata port)
	  (udat-captain-pid-set!     udata pid)
	  (let-values (((success pingtime)(ping udata (conc ipaddr ":" port))))
	    (if success
		udata
		(begin
		  (print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
		  (remove-captain-pkt udata captn)
		  (setup)))))
	(begin
	  (setup-as-captain udata)  ;; this saves the thread to captain-thread and starts the thread
	  (setup)))))

;; connect to a specific dbfile
(define (connect udata dbfname dbtype)
  udata)

;; returns: success pingtime
;;
;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
;;
(define (ping udata host-port)
  (let* ((start  (current-milliseconds))
	 (cookie (make-cookie udata))
	 (dbs    (udat-my-dbs udata))
	 (msg    (string-intersperse dbs " "))
	 (res (send udata host-port 'ping cookie msg retval: #t))
	 (delta (- (current-milliseconds) start)))
    (values (equal? res cookie) delta)))

;; returns: success pingtime
;;
;; NOTE: causes all references to this worker to be wiped out in the callee (ususally the captain)
;;
(define (goodbye-ping udata host-port)
  (let* ((start  (current-milliseconds))
	 (cookie (make-cookie udata))
	 (dbs    (udat-my-dbs udata))
	 (res (send udata host-port 'goodbye cookie "nomsg" retval: #t))
	 (delta (- (current-milliseconds) start)))
    (values (equal? res cookie) delta)))

(define (goodbye-captain udata)
  (let* ((host-port (udat-captain-host-port udata)))
    (if host-port
	(goodbye-ping udata host-port)
	(values #f -1))))

;;======================================================================
;; network utilities
;;======================================================================

(define (rate-ip ipaddr)
  (regex-case ipaddr
    ( "^127\\..*" _ 0 )
    ( "^(10\\.0|192\\.168)\\..*" _ 1 )
    ( else 2 ) ))

;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
  (> (rate-ip a) (rate-ip b)))
  

(define (get-my-best-address)
  (let ((all-my-addresses (get-all-ips))
        ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))
        )
    (cond
     ((null? all-my-addresses)
      (get-host-name))                                          ;; no interfaces?
     ((eq? (length all-my-addresses) 1)
      (car all-my-addresses))                      ;; only one to choose from, just go with it
     
     (else
      (car (sort all-my-addresses ip-pref-less?)))
     ;; (else 
     ;;  (ip->string (car (filter (lambda (x)                      ;; take any but 127.
     ;;    			 (not (eq? (u8vector-ref x 0) 127)))
     ;;    		       all-my-addresses))))

     )))

(define (get-all-ips-sorted)
  (sort (get-all-ips) ip-pref-less?))

(define (get-all-ips)
  (map ip->string (vector->list 
		   (hostinfo-addresses
		    (host-information (current-hostname))))))

;; make it a global? Well, it is local to area module

(define *captain-pktspec*
  `((captain (host     . h)
	     (port     . p)
	     (pid      . i)
	     (ipaddr   . a)
	     )
    #;(data   (hostname . h)  ;; sender hostname
	    (port     . p)  ;; sender port
	    (ipaddr   . a)  ;; sender ip
	    (hostkey  . k)  ;; sending host key - store info at server under this key
	    (servkey  . s)  ;; server key - this needs to match at server end or reject the msg
	    (format   . f)  ;; sb=serialized-base64, t=text, sx=sexpr, j=json
	    (data     . d)  ;; base64 encoded slln data
	    )))

;; struct for keeping track of our world

(defstruct udat
  ;; captain info
  (captain-address #f)
  (captain-host    #f)
  (captain-port    #f)
  (captain-pid     #f)
  (ulex-dir        (conc (get-environment-variable "HOME") "/.ulex"))
  (cpkts-dir       (conc (get-environment-variable "HOME") "/.ulex/pkts"))
  (cpkt-spec       *captain-pktspec*)
  ;; this processes info
  (my-cpkt-key     #f)   ;; put Z card here when I create a pkt for myself as captain
  (my-address      #f)
  (my-hostname     #f)
  (my-port         #f)
  (my-pid          (current-process-id))
  (my-dbs          '())
  ;; server and handler thread
  (serv-listener   #f)                 ;; this processes server info
  (handler-thread  #f)
  (mboxes          (make-hash-table))  ;; key => mbox
  ;; other servers
  (peers           (make-hash-table))  ;; host-port => peer record
  (dbowners        (make-hash-table))  ;; dbfile => host-port
  (handlers        (make-hash-table))  ;; dbfile => proc
  (outgoing-conns  (make-hash-table))  ;; host:port -> conn
  (work-queue      (make-queue))       ;; most stuff goes here
  ;; (fast-queue      (make-queue))       ;; super quick stuff goes here (e.g. ping)
  (busy            #f)                 ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
  ;; app info
  (appname         #f)
  (dbtypes         (make-hash-table))  ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
  ;; cookies
  (cnum            0) ;; cookie num
  )


(define (udat-my-host-port udata)
  (if (and (udat-my-address udata)(udat-my-port udata))
      (conc (udat-my-address udata) ":" (udat-my-port udata))
      #f))

(define (udat-captain-host-port udata)
  (if (and (udat-captain-address udata)(udat-captain-port udata))
      (conc (udat-captain-address udata) ":" (udat-captain-port udata))
      #f))

(define (udat-get-peer udata host-port)
  (hash-table-ref/default (udat-peers udata) host-port #f))

;; struct for keeping track of others we are talking to

(defstruct peer
  (addr-port       #f)
  (hostname        #f)
  (pid             #f)
  ;; (inp             #f)
  ;; (oup             #f)
  (dbs            '()) ;; list of databases this peer is currently handling
  )

(defstruct work
  (peer-dat   #f)
  (handlerkey #f)
  (qrykey     #f)
  (data       #f)
  (start      (current-milliseconds)))

;;======================================================================
;; Captain functions
;;======================================================================

;; NB// This needs to be started in a thread
;;
;; setup to be a captain
;;   - start server
;;   - create pkt
;;   - start server port handler
;;
(define (setup-as-captain udata)
  (if (start-server-find-port udata) ;; puts the server in udata
      (if (create-captain-pkt udata)
	  (let* ((th (make-thread (lambda ()
				    (ulex-handler udata)) "Captain handler")))
	    (udat-handler-thread-set! udata th)
	    (thread-start! th))
	  #f)
      #f))

;; given a pkts dir read 
;;
(define (get-all-captain-pkts udata)
  (let* ((pktsdir       (let ((d (udat-cpkts-dir udata)))
			  (if (file-exists? d)
			      d
			      (begin
				(create-directory d #t)
				d))))
	 (all-pkt-files (glob (conc pktsdir "/*.pkt")))
	 (pkt-spec      (udat-cpkt-spec udata)))
    (map (lambda (pkt-file)
	   (read-pkt->alist pkt-file pktspec: pkt-spec))
	 all-pkt-files)))

;; sort by D then Z, return one, choose the oldest then
;; differentiate if needed using the Z key
;;l
(define (get-winning-pkt pkts)
  (if (null? pkts)
      #f
      (car (sort pkts (lambda (a b)
			(let ((ad (string->number (alist-ref 'D a)))
			      (bd (string->number (alist-ref 'D b))))
			  (if (eq? a b)
			      (let ((az (alist-ref 'Z a))
				    (bz (alist-ref 'Z b)))
				(string>=? az bz))
			      (> ad bd))))))))

;; put the host, ip, port and pid into a pkt in
;; the captain pkts dir
;;  - assumes user has already fired up a server
;;    which will be in the udata struct
;;
(define (create-captain-pkt udata)
  (if (not (udat-serv-listener udata))
      (begin
	(print "ERROR: create-captain-pkt called with out a listener")
	#f)
      (let* ((pktdat `((port   . ,(udat-my-port udata))
		       (host   . ,(udat-my-hostname udata))
		       (ipaddr . ,(udat-my-address udata))
		       (pid    . ,(udat-my-pid     udata))))
	     (pktdir  (udat-cpkts-dir udata))
	     (pktspec (udat-cpkt-spec udata))
	     )
	(udat-my-cpkt-key-set!
	 udata
	 (write-alist->pkt
	  pktdir
	  pktdat
	  pktspec: pktspec
	  ptype:   'captain))
	(udat-my-cpkt-key udata))))

;; remove pkt associated with captn (the Z key .pkt)
;;
(define (remove-captain-pkt udata captn)
  (let ((Z       (alist-ref 'Z captn))
	(cpktdir (udat-cpkts-dir udata)))
    (delete-file* (conc cpktdir "/" Z ".pkt"))))

;;======================================================================
;; server primitives
;;======================================================================

(define (make-cookie udata)
  (let ((newcnum (+ (udat-cnum udata))))
    (udat-cnum-set! udata newcnum)
    (conc (udat-my-address udata) ":"
	  (udat-my-port    udata) "-"
	  (udat-my-pid     udata) "-"
	  newcnum)))

;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
(define (start-server-find-port udata #!optional (port 4242)) 
  (handle-exceptions
      exn
      (if (< port 65535)(start-server-find-port udata (+ port 1)) #f)
    (connect-server udata port)))

(define (connect-server udata port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
	 (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
    (udat-my-address-set!    udata addr)
    (udat-my-port-set!       udata port)
    (udat-my-hostname-set!   udata (get-host-name))
    (udat-serv-listener-set! udata tlsn)
    udata))

(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
  ;; I'm currently very fuzzy on whether it makes sense to be reusing the outgoing connections.
  ;; at the other end of the line I think the reciever has closed the ports - thus each message
  ;; requires new connection?
  (let* ((pdat (or (udat-get-peer udata host-port)
		   (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
		    exn
		    #f
		    (let ((npdat (make-peer addr-port: host-port)))
		      (if hostname (peer-hostname-set! npdat hostname))
		      (if pid (peer-pid-set! npdat pid))
		      npdat)))))
    pdat))

;; send structured data to recipient
;;
;;  NOTE: qrykey is what was called the "cookie" previously
;;
;;     retval tells send to expect and wait for return data (one line) and return it or time out
;;       this is for ping where we don't want to necessarily have set up our own server yet.
;;
(define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(retval #f))
  (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
   exn
   #f 
   (let-values (((inp oup)(tcp-connect host-port)))
     ;;
     ;; CONTROL LINE:
     ;;    handlerkey host:port pid qrykey params ...
     ;;
     (let ((res
	    (if (and inp oup)
		(let* ((myhost (udat-my-address udata))
		       (myport (udat-my-port    udata))
		       (dat  (conc
			      handler " "
			      (udat-my-address  udata) ":" (udat-my-port udata) " "
			      ;; (udat-my-hostname udata) " "
			      (udat-my-pid  udata) " "
			      qrykey
			      (if (null? params) "" (conc " " (string-intersperse params " "))))))
		  (if (and myhost myport)
		      (begin
			(write-line dat  oup)
			(write-line data oup)
			;; (print "Sent dat: " dat " data: " data)
			(if retval
			    (read-line inp)
			    #t))
		      (begin
			(print "ERROR: send called but no receiver has been setup. Please call setup first!")
			#f))
		  ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE!
		  ;;       (there is a listener for handling that)
		  )
		#f))) ;; #f means failed to connect and send
       (close-input-port inp)
       (close-output-port oup)
       res))))

;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20))
  (let ((mbox      (make-mailbox))
	(mbox-time (current-milliseconds))
	(mboxes    (udat-mboxes udata)))
    (hash-table-set! mboxes qrykey mbox)
    (if (send udata host-port handler qrykey data hostname: hostname pid: pid params: params)
	(let* ((mbox-timeout-secs    timeout)
	       (mbox-timeout-result 'MBOX_TIMEOUT)
	       (res                  (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
	       (mbox-receive-time    (current-milliseconds)))
	  (hash-table-delete! mboxes qrykey)
	  (if (eq? res 'MBOX_TIMEOUT)
	      #f
	      res))
	#f))) ;; #f means failed to communicate

(define (add-to-work-queue udata peer-dat handlerkey qrykey data)
  (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data)))
    (if (udat-busy udata)
	(queue-add! (udat-work-queue udata) wdat)
	(process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat
    ))

(define (do-work udata wdat)
  #f)

(define (process-work udata #!optional wdat)
  (if wdat (do-work udata wdat)) ;; process wdat
  (let ((wqueue (udat-work-queue udata)))
    (if (not (queue-empty? wqueue))
	(let loop ((wd (queue-remove! wqueue)))
	  (do-work udata wd)
	  (if (not (queue-empty? wqueue))
	      (loop (queue-remove! wqueue)))))))

;; 
;;
(define (ulex-handler udata)
  (let* ((serv-listener (udat-serv-listener udata)))
    (print "serv-listner: " serv-listener)
    ;; data comes as two lines
    ;;   handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
    ;;   data
    (let loop ((state 'start))
      (let-values (((inp oup)(tcp-accept serv-listener)))
	(print "got here: inp=" inp " oup=" oup)
	(let* ((controldat (read-line inp))
	       (data       (read-line inp)))
	  (print "controldat: " controldat " data: " data)
	  (match (string-split controldat)
		 ((handlerkey host-port pid qrykey params ...)
		  (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
		  (case (string->symbol handlerkey)
		    ((ack)(print "Got ack!"))
		    ((ping) ;; special case - return result immediately on the same connection
		     (let* ((proc  (hash-table-ref/default (udat-handlers udata) 'ping #f))
			    (val   (if proc (proc) "gotping"))
			    (peer  (make-peer addr-port: host-port pid: pid))
			    (dbshash (udat-dbowners udata)))
		       (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
		       (for-each (lambda (dbfile)
				   (hash-table-set! dbshash dbfile host-port))
				 params) ;; register each db in the dbshash
		       (if (not (hash-table-exists? (udat-peers udata) host-port))
			   (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
		       (write-line qrykey oup)
		       (close-input-port inp)
		       (close-output-port oup))) ;; End of ping
		    ((goodbye)
		     ;; remove all traces of the caller in db ownership etc.
		     (let* ((peer  (hash-table-ref/default (udat-peers udata) host-port #f))
			    (dbs   (if peer (peer-dbs peer) '()))
			    (dbshash (udat-dbowners udata)))
		       (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
		       (hash-table-delete! (udat-peers udata) host-port)
		       (write-line qrykey oup)
		       (close-input-port inp)
		       (close-output-port oup)))
		    ((rucaptain) ;; remote is asking if I'm the captain
		     (write-line (if (udat-my-cpkt-key udata) "yes" "no"))
		     (close-input-port inp)
		     (close-output-port oup))
		    ((whoowns) ;; given a db name who do I send my queries to
		     ;; look up the file in handlers, if have an entry ping them to be sure
		     ;; they are still alive and then return that host:port.
		     ;; if no handler found or if the ping fails pick from peers the oldest that
		     ;; is managing the fewest dbs
		     #f)
		    (else
		     (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data))))
		 (else (print "BAD DATA? controldat=" controldat " data=" data)))))
	(loop state))))

;; add a proc to the handler list
(define (register-handler udata key proc)
  (hash-table-set! (udat-handlers udata) key proc))


;;======================================================================
;; Generic db handling
;;   setup a inmem db instance
;;   open connection to on-disk db
;;   sync on-disk db to inmem
;;   get lock in on-disk db for dbowner of this db
;;   put sync-proc, init-proc, on-disk handle, inmem handle in dbconn stuct
;;   return the stuct
;;======================================================================

(defstruct dbconn
  (inmem  #f)
  (conn   #f)
  (sync   #f) ;; sync proc
  (init   #f) ;; init proc
  (lastsync (current-seconds))
  )

(defstruct dbinfo
  (initproc #f)
  (syncproc #f))

;; open inmem and disk database
;;   init with initproc
;;   return db struct
;;
;;   appname; megatest, ulex or something else.
;;
(define (setup-db-connection udata fname-in appname dbtype)
  (let* ((is-ulex (eq? appname 'ulex))
	 (dbinf   (if is-ulex ;; ulex is a built-in special case
		      (make-dbinfo initproc: ulexdb-init syncproc: ulexdb-sync)
		      (hash-table-ref/default (udat-dbtypes udata) dbtype #f)))
	 (initproc (dbinfo-initproc dbinf))
	 (syncproc (dbinfo-syncproc dbinf))
	 (fname   (if is-ulex
		      (conc (udat-ulex-dir udata) "/ulex.db")
		      fname-in))
	 (inmem-db (open-and-initdb udata #f 'inmem (dbinfo-initproc dbinf)))
	 (disk-db  (open-and-initdb udata fname 'disk (dbinfo-initproc dbinf))))
    (make-dbconn inmem: inmem-db conn: disk-db sync: syncproc init: initproc)))

;; dest='inmem or 'disk
;;
(define (open-and-initdb udata filename dest init-proc)
  (let* ((inmem    (eq? dest 'inmem))
	 (dbfile   (if inmem
		       ":INMEM:"
		       filename))
	 (dbexists (if inmem #t (file-exists? dbfile)))
	 (db       (sqlite3:open-database dbfile)))
    (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000))
    (if (not dbexists)
	(init-proc db))
    db))


;;======================================================================
;; Ulex db
;;======================================================================

(define (ulexdb-init db inmem)
  (sqlite3:with-transaction
   db
   (lambda ()
     (for-each
      (lambda (stmt)
	(if stmt (sqlite3:execute db stmt)))
      `("CREATE TABLE IF NOT EXISTS processes 
                 (id INTEGER PRIMARY KEY,
                  host  TEXT NOT NULL,
                  ipadr TEXT NOT NULL,
                  port  INTEGER NOT NULL,
                  pid   INTEGER NOT NULL,
                  regtime INTEGER DEFAULT (strftime('%s','now')),
                  last_update INTEGER DEFAULT (strftime('%s','now')));"
	(if inmem
	    "CREATE TRIGGER  IF NOT EXISTS update_proces_trigger AFTER UPDATE ON processes
                             FOR EACH ROW
                               BEGIN 
                                 UPDATE processes SET last_update=(strftime('%s','now'))
                                   WHERE id=old.id;
                               END;"
	    #f))))))

;; open databases, do initial sync
(define (ulexdb-sync dbconndat udata)
  #f)


) ;; END OF ULEX


;;; ;;======================================================================
;;; ;; D E B U G   H E L P E R S
;;; ;;======================================================================
;;;     
;;; (define (dbg> . args)
;;;   (with-output-to-port (current-error-port)
;;;     (lambda ()
;;;       (apply print "dbg> " args))))
;;; 
;;; (define (debug-pp . args)
;;;   (if (get-environment-variable "ULEX_DEBUG")
;;;       (with-output-to-port (current-error-port)
;;; 	(lambda ()
;;; 	  (apply pp args)))))
;;; 
;;; (define *default-debug-port* (current-error-port))
;;; 
;;; (define (sdbg> fn stage-name stage-start stage-end start-time . message)
;;;   (if (get-environment-variable "ULEX_DEBUG")
;;;       (with-output-to-port *default-debug-port* 
;;; 	(lambda ()
;;; 	  (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. "
;;; 		 (if start-time
;;; 		     (conc "total time " (- (current-milliseconds) start-time)
;;; 			   " ms.")
;;; 		     "")
;;; 		 message
;;; 		 )))))

;;======================================================================
;; M A C R O S
;;======================================================================
;; iup callbacks are not dumping the stack, this is a work-around
;;

;; Some of these routines use:
;;
;;     http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html
;;
;; Syntax for defining macros in a simple style similar to function definiton,
;;  when there is a single pattern for the argument list and there are no keywords.
;;
;; (define-simple-syntax (name arg ...) body ...)
;;
;; 
;; (define-syntax define-simple-syntax
;;   (syntax-rules ()
;;     ((_ (name arg ...) body ...)
;;      (define-syntax name (syntax-rules () ((name arg ...) (begin body ...)))))))
;; 
;; (define-simple-syntax (catch-and-dump proc procname)
;;   (handle-exceptions
;;    exn
;;    (begin
;;      (print-call-chain (current-error-port))
;;      (with-output-to-port (current-error-port)
;;        (lambda ()
;;          (print ((condition-property-accessor 'exn 'message) exn))
;;          (print "Callback error in " procname)
;;          (print "Full condition info:\n" (condition->list exn)))))
;;    (proc)))
;; 
;; 
;;======================================================================
;;  R E C O R D S
;;======================================================================

;;; ;; information about me as a server
;;; ;;
;;; (defstruct area
;;;   ;; about this area
;;;   (useportlogger #f)
;;;   (lowport       32768)
;;;   (server-type   'auto)  ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all)
;;;   (conn          #f)
;;;   (port          #f)
;;;   (myaddr        (get-my-best-address))
;;;   pktid          ;; get pkt from hosts table if needed
;;;   pktfile
;;;   pktsdir
;;;   dbdir
;;;   (dbhandles     (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one?
;;;   (mutex         (make-mutex))
;;;   (rtable        (make-hash-table)) ;; registration table of available actions
;;;   (dbs           (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve
;;;   ;; about other servers
;;;   (hosts         (make-hash-table)) ;; key => hostdat
;;;   (hoststats     (make-hash-table)) ;; key => alist of fname => ( qcount . qtime )
;;;   (reqs          (make-hash-table)) ;; uri => queue
;;;   ;; work queues
;;;   (wqueues       (make-hash-table)) ;; fname => qdat
;;;   (stats         (make-hash-table)) ;; fname => totalqueries
;;;   (last-srvup    (current-seconds)) ;; last time we updated the known servers
;;;   (cookie2mbox   (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call
;;;   (ready #f)
;;;   (health        (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping
;;;   )
;;; 
;;; ;; host stats
;;; ;;
;;; (defstruct hostdat
;;;   (pkt      #f)
;;;   (dbload   (make-hash-table))  ;; "dbfile.db" => queries/min
;;;   (hostload #f)                 ;; normalized load ( 5min load / numcpus )
;;;   )
;;; 
;;; ;; dbdat
;;; ;;
;;; (defstruct dbdat
;;;   (dbh    #f)
;;;   (fname  #f)
;;;   (write-access #f)
;;;   (sths   (make-hash-table))  ;; hash mapping query strings to handles
;;;   )
;;; 
;;; ;; qdat
;;; ;;
;;; (defstruct qdat
;;;   (writeq  (make-queue))
;;;   (readq   (make-queue))
;;;   (rwq     (make-queue))
;;;   (logq    (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging
;;;   (osshort (make-queue))
;;;   (oslong  (make-queue))
;;;   (misc    (make-queue)) ;; used for things like ping-full
;;;   )
;;; 
;;; ;; calldat
;;; ;;
;;; (defstruct calldat
;;;   (ctype 'dbwrite)
;;;   (obj   #f)              ;; this would normally be an SQL statement e.g. SELECT, INSERT etc.
;;;   (rtime (current-milliseconds)))
;;; 
;;; ;; make it a global? Well, it is local to area module
;;; 
;;; (define *pktspec*
;;;   `((server (hostname . h)
;;; 	    (port     . p)
;;; 	    (pid      . i)
;;; 	    (ipaddr   . a)
;;; 	    )
;;;     (data   (hostname . h)  ;; sender hostname
;;; 	    (port     . p)  ;; sender port
;;; 	    (ipaddr   . a)  ;; sender ip
;;; 	    (hostkey  . k)  ;; sending host key - store info at server under this key
;;; 	    (servkey  . s)  ;; server key - this needs to match at server end or reject the msg
;;; 	    (format   . f)  ;; sb=serialized-base64, t=text, sx=sexpr, j=json
;;; 	    (data     . d)  ;; base64 encoded slln data
;;; 	    )))
;;; 
;;; ;; work item
;;; ;;
;;; (defstruct witem
;;;   (rhost #f)   ;; return host
;;;   (ripaddr #f) ;; return ipaddr
;;;   (rport #f)   ;; return port
;;;   (servkey #f) ;; the packet representing the client of this workitem, used by final send-message
;;;   (rdat  #f)   ;; the request - usually an sql query, type is rdat
;;;   (action #f)  ;; the action: immediate, dbwrite, dbread,oslong, osshort
;;;   (cookie #f)  ;; cookie id for response
;;;   (data   #f)  ;; the data payload, i.e. parameters
;;;   (result #f)  ;; the result from processing the data
;;;   (caller #f)) ;; the calling peer according to rpc itself
;;; 
;;; (define (trim-pktid pktid)
;;;   (if (string? pktid)
;;;       (substring pktid 0 4)
;;;       "nopkt"))
;;; 
;;; (define (any->number num)
;;;   (cond
;;;    ((number? num) num)
;;;    ((string? num) (string->number num))
;;;    (else num)))
;;; 
;;; (use trace)
;;; (trace-call-sites #t)
;;; 
;;; ;;======================================================================
;;; ;; D A T A B A S E   H A N D L I N G 
;;; ;;======================================================================
;;; 
;;; ;; look in dbhandles for a db, return it, else return #f
;;; ;;
;;; (define (get-dbh acfg fname)
;;;   (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '())))
;;;     (if (null? dbh-lst)
;;; 	(begin
;;; 	  ;; (print "opening db for " fname)
;;; 	  (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls
;;; 	(let ((rem-lst (cdr dbh-lst)))
;;; 	  ;; (print "re-using saved connection for " fname)
;;; 	  (hash-table-set! (area-dbhandles acfg) fname rem-lst)
;;; 	  (car dbh-lst)))))
;;; 
;;; (define (save-dbh acfg fname dbdat)
;;;     ;; (print "saving dbh for " fname)
;;;     (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '()))))
;;; 
;;; ;; open the database, if never before opened init it. put the handle in the
;;; ;; open db's hash table
;;; ;; returns: the dbdat
;;; ;;
;;; (define (open-db acfg fname)
;;;   (let* ((fullname     (conc (area-dbdir acfg) "/" fname))
;;; 	 (exists       (file-exists? fullname))
;;; 	 (write-access (if exists
;;; 			   (file-write-access? fullname)
;;; 			   (file-write-access? (area-dbdir acfg))))
;;; 	 (db           (sqlite3:open-database fullname))
;;; 	 (handler      (sqlite3:make-busy-timeout 136000))
;;; 	 )
;;;     (sqlite3:set-busy-handler! db handler)
;;;     (sqlite3:execute db "PRAGMA synchronous = 0;")
;;;     (if (not exists) ;; need to init the db
;;; 	(if write-access
;;; 	    (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements
;;; 	      ;; (sqlite3:with-transaction
;;; 	      ;;  db
;;; 	      ;;  (lambda ()
;;; 		 (if isql
;;; 		     (for-each
;;; 		      (lambda (sql)
;;; 			(sqlite3:execute db sql))
;;; 		      isql)))
;;; 	    (print "ERROR: no write access to " (area-dbdir acfg))))
;;;     (make-dbdat dbh: db fname: fname write-access: write-access)))
;;; 
;;; ;; This is a low-level command to retrieve or to prepare, save and return a prepared statment
;;; ;; you must extract the db handle
;;; ;;
;;; (define (get-sth db cache stmt)
;;;   (if (hash-table-exists? cache stmt)
;;;       (begin
;;; 	;; (print "Reusing cached stmt for " stmt)
;;; 	(hash-table-ref/default cache stmt #f))
;;;       (let ((sth (sqlite3:prepare db stmt)))
;;; 	(hash-table-set! cache stmt sth)
;;; 	;; (print "prepared stmt for " stmt)
;;; 	sth)))
;;; 
;;; ;; a little more expensive but does all the tedious deferencing - only use if you don't already
;;; ;; have dbdat and db sitting around
;;; ;;
;;; (define (full-get-sth acfg fname stmt)
;;;   (let* ((dbdat  (get-dbh acfg fname))
;;; 	 (db     (dbdat-dbh dbdat))
;;; 	 (sths   (dbdat-sths dbdat)))
;;;     (get-sth db sths stmt)))
;;; 
;;; ;; write to a db
;;; ;; acfg: area data
;;; ;; rdat: request data
;;; ;; hdat: (host . port)
;;; ;;
;;; ;; (define (dbwrite acfg rdat hdat data-in)
;;; ;;   (let* ((dbname (car data-in))
;;; ;; 	 (dbdat  (get-dbh acfg dbname))
;;; ;; 	 (db     (dbdat-dbh dbdat))
;;; ;; 	 (sths   (dbdat-sths dbdat))
;;; ;; 	 (stmt   (calldat-obj rdat))
;;; ;; 	 (sth    (get-sth db sths stmt))
;;; ;; 	 (data   (cdr data-in)))
;;; ;;     (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data)
;;; ;;     (print "dbdat: " (dbdat->alist dbdat))
;;; ;;     (apply sqlite3:execute sth data)
;;; ;;     (save-dbh acfg dbname dbdat)
;;; ;;     #t
;;; ;;     ))
;;; 
;;; (define (finalize-all-db-handles acfg)
;;;   (let* ((dbhandles (area-dbhandles acfg))  ;; dbhandles is hash of fname ==> dbdat
;;; 	 (num       0))
;;;     (for-each
;;;      (lambda (area-name)
;;;        (print "Closing handles for " area-name)
;;;        (let ((dbdats (hash-table-ref/default dbhandles area-name '())))
;;; 	 (for-each
;;; 	  (lambda (dbdat)
;;; 	    ;; first close all statement handles
;;; 	    (for-each
;;; 	     (lambda (sth)
;;; 	       (sqlite3:finalize! sth)
;;; 	       (set! num (+ num 1)))
;;; 	     (hash-table-values (dbdat-sths dbdat)))
;;; 	    ;; now close the dbh
;;; 	    (set! num (+ num 1))
;;; 	    (sqlite3:finalize! (dbdat-dbh dbdat)))
;;; 	  dbdats)))
;;;      (hash-table-keys dbhandles))
;;;     (print "FINALIZED " num " dbhandles")))
;;; 
;;; ;;======================================================================
;;; ;; W O R K   Q U E U E   H A N D L I N G 
;;; ;;======================================================================
;;; 
;;; (define (register-db-as-mine acfg dbname)
;;;   (let ((ht (area-dbs acfg)))
;;;     (if (not (hash-table-ref/default ht dbname #f))
;;; 	(hash-table-set! ht dbname (random 10000)))))
;;; 	
;;; (define (work-queue-add acfg fname witem)
;;;   (let* ((work-queue-start (current-milliseconds))
;;; 	 (action           (witem-action witem)) ;; NB the action is the index into the rdat actions
;;; 	 (qdat             (or (hash-table-ref/default (area-wqueues acfg) fname #f)
;;; 			       (let ((newqdat (make-qdat)))
;;; 				 (hash-table-set! (area-wqueues acfg) fname newqdat)
;;; 				 newqdat)))
;;; 	 (rdat             (hash-table-ref/default (area-rtable acfg) action #f)))
;;;     (if rdat
;;; 	(queue-add!
;;; 	 (case (calldat-ctype rdat)
;;; 	   ((dbwrite)   (register-db-as-mine acfg fname)(qdat-writeq qdat))
;;; 	   ((dbread)    (register-db-as-mine acfg fname)(qdat-readq  qdat))
;;; 	   ((dbrw)      (register-db-as-mine acfg fname)(qdat-rwq    qdat))
;;; 	   ((oslong)    (qdat-oslong qdat))
;;; 	   ((osshort)   (qdat-osshort qdat))
;;; 	   ((full-ping) (qdat-misc  qdat))
;;; 	   (else
;;; 	    (print "ERROR: no queue for " action ". Adding to dbwrite queue.")
;;; 	    (qdat-writeq qdat)))
;;; 	 witem)
;;; 	(case action
;;; 	  ((full-ping)(qdat-misc qdat))
;;; 	  (else
;;; 	   (print "ERROR: No action " action " was registered"))))
;;;     (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f)
;;;     #t)) ;; for now, simply return #t to indicate request got to the queue
;;; 
;;; (define (doqueue acfg q fname dbdat dbh)
;;;   ;; (print "doqueue: " fname)
;;;   (let* ((start-time (current-milliseconds))
;;; 	 (qlen       (queue-length q)))
;;;     (if (> qlen 1)
;;; 	(print "Processing queue of length " qlen))
;;;     (let loop ((count      0)
;;; 	       (responses '()))
;;;       (let ((delta (- (current-milliseconds) start-time)))
;;; 	(if (or (queue-empty? q)
;;; 		(> delta 400)) ;; stop working on this queue after 400ms have passed
;;; 	    (list count delta responses) ;; return count, delta and responses list
;;; 	    (let* ((witem  (queue-remove! q))
;;; 		   (action (witem-action witem))
;;; 		   (rdat   (witem-rdat   witem))
;;; 		   (stmt   (calldat-obj rdat))
;;; 		   (sth    (full-get-sth acfg fname stmt))
;;; 		   (ctype  (calldat-ctype rdat))
;;; 		   (data   (witem-data   witem))
;;; 		   (cookie (witem-cookie witem)))
;;; 	      ;; do the processing and save the result in witem-result
;;; 	      (witem-result-set!
;;; 	       witem
;;; 	       (case ctype ;; action
;;; 		 ((noblockwrite) ;; blind write, no ack of success returned
;;; 		  (apply sqlite3:execute sth data)
;;; 		  (sqlite3:last-insert-rowid dbh))
;;; 		 ((dbwrite)      ;; blocking write   
;;; 		  (apply sqlite3:execute sth data)
;;; 		  #t)
;;; 		 ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query
;;; 		  (apply sqlite3:map-row (lambda x x) sth data))
;;; 		 ((full-ping)  'full-ping)
;;; 		 (else (print "Not ready for action " action) #f)))
;;; 	      (loop (add1 count)
;;; 		    (if cookie
;;; 			(cons witem responses)
;;; 			responses))))))))
;;; 
;;; ;; do up to 400ms of processing on each queue
;;; ;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded
;;; ;; 
;;; (define (process-db-queries acfg fname)
;;;   (if (hash-table-exists? (area-wqueues acfg) fname)
;;;       (let* ((process-db-queries-start-time (current-milliseconds))
;;; 	     (qdat             (hash-table-ref/default (area-wqueues acfg) fname #f))
;;; 	     (queue-sym->queue (lambda (queue-sym)
;;; 				 (case queue-sym  ;; lookup the queue from qdat given a name (symbol)
;;; 				   ((wqueue)  (qdat-writeq qdat))
;;; 				   ((rqueue)  (qdat-readq  qdat))
;;; 				   ((rwqueue) (qdat-rwq    qdat))
;;; 				   ((misc)    (qdat-misc   qdat))
;;; 				   (else #f))))
;;; 	     (dbdat   (get-dbh acfg fname))
;;; 	     (dbh     (if (dbdat? dbdat)(dbdat-dbh dbdat) #f))
;;; 	     (nowtime (current-seconds)))
;;; 	;; handle the queues that require a transaction
;;; 	;;
;;; 	(map ;; 
;;; 	 (lambda (queue-sym)
;;; 	   ;; (print "processing queue " queue-sym)
;;; 	   (let* ((queue (queue-sym->queue queue-sym)))
;;; 	     (if (not (queue-empty? queue))
;;; 		 (let ((responses
;;; 			(sqlite3:with-transaction ;; todo - catch exceptions...
;;; 			 dbh
;;; 			 (lambda ()
;;; 			   (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work!
;;; 			     ;; (print "res=" res)
;;; 			     (match res
;;; 			      ((count delta responses)
;;; 			       (update-stats acfg fname queue-sym delta count)
;;; 			       (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f)
;;; 			       responses) ;; return responses
;;; 			      (else
;;; 			       (print "ERROR: bad return data from doqueue " res)))
;;; 			     )))))
;;; 		   ;; having completed the transaction, send the responses.
;;; 		   ;; (print "INFO: sending " (length responses) " responses.")
;;; 		   (let loop ((responses-left responses))
;;; 		     (cond
;;; 		      ((null? responses-left)  #t)
;;; 		      (else
;;; 		       (let* ((witem    (car responses-left))
;;; 			      (response (cdr responses-left)))  
;;; 			 (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem)
;;; 						(witem-cookie witem)(witem-result witem)))
;;; 		       (loop (cdr responses-left))))))
;;; 		 )))
;;; 	 '(wqueue rwqueue rqueue))
;;; 	
;;; 	;; handle misc queue
;;; 	;;
;;; 	;; (print "processing misc queue")
;;; 	(let ((queue (queue-sym->queue 'misc)))
;;; 	  (doqueue acfg queue fname dbdat dbh))
;;; 	;; ....
;;; 	(save-dbh acfg fname dbdat)
;;; 	#t ;; just to let the tests know we got here
;;; 	)
;;;       #f ;; nothing processed
;;;       ))
;;; 
;;; ;; run all queues in parallel per db but sequentially per queue for that db.
;;; ;;  - process the queues every 500 or so ms
;;; ;;  - allow for long running queries to continue but all other activities for that
;;; ;;    db will be blocked.
;;; ;;
;;; (define (work-queue-processor acfg)
;;;   (let* ((threads (make-hash-table))) ;; fname => thread
;;;     (let loop ((fnames      (hash-table-keys (area-wqueues acfg)))
;;; 	       (target-time (+ (current-milliseconds) 50)))
;;;       ;;(if (not (null? fnames))(print "Processing for these databases: " fnames))
;;;       (for-each
;;;        (lambda (fname)
;;; 	 ;; (print "processing for " fname)
;;; 	 ;;(process-db-queries acfg fname))
;;; 	 (let ((th (hash-table-ref/default threads fname #f)))
;;; 	   (if (and th (not (member (thread-state th) '(dead terminated))))
;;; 	       (begin
;;; 		 (print "WARNING: worker thread for " fname " is taking a long time.")
;;; 		 (print "Thread is in state " (thread-state th)))
;;; 	       (let ((th1 (make-thread (lambda ()
;;; 					 ;; (catch-and-dump
;;; 					 ;;  (lambda ()
;;; 					    ;; (print "Process queries for " fname)
;;; 					    (let ((start-time (current-milliseconds)))
;;; 					      (process-db-queries acfg fname)
;;; 					      ;; (thread-sleep! 0.01) ;; need the thread to take at least some time
;;; 					      (hash-table-delete! threads fname)) ;; no mutexes?
;;; 					    fname)
;;; 					  "th1"))) ;; ))
;;; 		 (hash-table-set! threads fname th1)
;;; 		 (thread-start! th1)))))
;;;        fnames)
;;;       ;; (thread-sleep! 0.1) ;; give the threads some time to process requests
;;;       ;; burn time until 400ms is up
;;;       (let ((now-time (current-milliseconds)))
;;; 	(if (< now-time target-time)
;;; 	    (let ((delta (- target-time now-time)))
;;; 	      (thread-sleep! (/ delta 1000)))))
;;;       (loop (hash-table-keys (area-wqueues acfg))
;;; 	    (+ (current-milliseconds) 50)))))
;;; 
;;; ;;======================================================================
;;; ;; S T A T S   G A T H E R I N G
;;; ;;======================================================================
;;; 
;;; (defstruct stat
;;;   (qcount-avg  0)                  ;; coarse running average
;;;   (qtime-avg   0)                  ;; coarse running average
;;;   (qcount      0)                  ;; total
;;;   (qtime       0)                  ;; total
;;;   (last-qcount 0)                  ;; last 
;;;   (last-qtime  0)                  ;; last
;;;   (dbs        '())                 ;; list of db files handled by this node
;;;   (when        0))                 ;; when the last query happened - seconds
;;; 
;;; 
;;; (define (update-stats acfg fname bucket duration numqueries)
;;;   (let* ((key   fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough
;;; 	 (stats (or (hash-table-ref/default (area-stats acfg) key #f)
;;; 		    (let ((newstats (make-stat)))
;;; 		      (hash-table-set! (area-stats acfg) key newstats)
;;; 		      newstats))))
;;;     ;; when the last query happended (used to remove the fname from the active list)
;;;     (stat-when-set! stats (current-seconds))
;;;     ;; last values
;;;     (stat-last-qcount-set! stats numqueries)
;;;     (stat-last-qtime-set!  stats duration)
;;;     ;; total over process lifetime
;;;     (stat-qcount-set! stats (+ (stat-qcount stats) numqueries))
;;;     (stat-qtime-set!  stats (+ (stat-qtime  stats) duration))
;;;     ;; coarse average
;;;     (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2))
;;;     (stat-qtime-avg-set!  stats (/ (+ (stat-qtime-avg  stats) duration)   2))
;;; 
;;;     ;; here is where we add the stats for a given dbfile
;;;     (if (not (member fname (stat-dbs stats)))
;;; 	(stat-dbs-set! stats (cons fname (stat-dbs stats))))
;;; 
;;;     ))
;;; 
;;; ;;======================================================================
;;; ;; S E R V E R   S T U F F 
;;; ;;======================================================================
;;; 
;;; ;; this does NOT return!
;;; ;;
;;; (define (find-free-port-and-open acfg)
;;;   (let ((port (or (area-port acfg) 3200)))
;;;     (handle-exceptions
;;; 	exn
;;; 	(begin
;;; 	  (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port")
;;; 	  (area-port-set! acfg (+ port 1))
;;; 	  (find-free-port-and-open acfg))
;;;       (rpc:default-server-port port)
;;;       (area-port-set! acfg port)
;;;       (tcp-read-timeout 120000)
;;;       ;; ((rpc:make-server (tcp-listen port)) #t)
;;;       (tcp-listen (rpc:default-server-port)
;;;       ))))
;;; 
;;; ;; register this node by putting a packet into the pkts dir.
;;; ;; look for other servers
;;; ;; contact other servers and compile list of servers
;;; ;; there are two types of server
;;; ;;     main servers - dashboards, runners and dedicated servers - need pkt
;;; ;;     passive servers - test executers, step calls, list-runs - no pkt
;;; ;;
;;; (define (register-node acfg hostip port-num)
;;;   ;;(mutex-lock! (area-mutex acfg))
;;;   (let* ((server-type  (area-server-type acfg)) ;; auto, main, passive (no pkt created)
;;; 	 (best-ip      (or hostip (get-my-best-address)))
;;; 	 (mtdir        (area-dbdir acfg))
;;; 	 (pktdir       (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts")))
;;;     (print "Registering node " best-ip ":" port-num)
;;;     (if (not mtdir) ;; require a home for this node to put or find databases
;;; 	#f
;;; 	(begin
;;; 	  (if  (not (directory? pktdir))(create-directory pktdir))
;;; 	  ;; server is started, now create pkt if needed
;;; 	  (print "Starting server in " server-type " mode with port " port-num)
;;; 	  (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt
;;; 	      (begin
;;; 		(area-pktid-set! acfg
;;; 				 (write-alist->pkt
;;; 				  pktdir 
;;; 				  `((hostname . ,(get-host-name))
;;; 				    (ipaddr   . ,best-ip)
;;; 				    (port     . ,port-num)
;;; 				    (pid      . ,(current-process-id)))
;;; 				  pktspec: *pktspec*
;;; 				  ptype:   'server))
;;; 		(area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt"))))
;;; 	  (area-port-set!    acfg port-num)
;;; 	  #;(mutex-unlock! (area-mutex acfg))))))
;;; 
;;; (define *cookie-seqnum* 0)
;;; (define (make-cookie key)
;;;   (set! *cookie-seqnum* (add1 *cookie-seqnum*))
;;;   ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*)
;;;   (conc key "-" *cookie-seqnum*)
;;;   )
;;; 
;;; ;; dispatch locally if possible
;;; ;;
;;; (define (call-deliver-response acfg ipaddr port cookie data)
;;;   (if (and (equal? (area-myaddr acfg) ipaddr)
;;; 	   (equal? (area-port     acfg) port))
;;;       (deliver-response acfg cookie data)
;;;       ((rpc:procedure 'response ipaddr port) cookie data)))
;;; 
;;; (define (deliver-response acfg cookie data)
;;;   (let ((deliver-response-start (current-milliseconds)))
;;;     (thread-start! (make-thread
;;; 		    (lambda ()
;;; 		      (let loop ((tries-left 5))
;;; 			;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left)
;;; 			;;(pp (hash-table->alist (area-cookie2mbox acfg)))
;;; 			(let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f)))
;;; 			  (cond
;;; 			   ((eq? 0 tries-left)
;;; 			    (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie)
;;; 			    )
;;; 			   (mbox
;;; 			    ;;(print "got mbox="mbox"  got data="data"  send.")
;;; 			    (mailbox-send! mbox data))
;;; 			   (else
;;; 			    ;;(print "no mbox yet.  look for "cookie)
;;; 			    (thread-sleep! (/ (- 6 tries-left) 10))
;;; 			    (loop (sub1 tries-left))))))
;;; 		      ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data))
;;; 		      (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie)
;;; 		      )
;;; 		    (conc "deliver-response thread for cookie="cookie))))
;;;   #t)
;;; 
;;; ;; action:
;;; ;;   immediate - quick actions, no need to put in queues
;;; ;;   dbwrite   - put in dbwrite queue
;;; ;;   dbread    - put in dbread queue
;;; ;;   oslong    - os actions, e.g. du, that could take a long time
;;; ;;   osshort   - os actions that should be quick, e.g. df
;;; ;;
;;; (define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler
;;;   ;; NOTE: Use rpc:current-peer for getting return address
;;;   (let* ((std-peer-handler-start (current-milliseconds))
;;; 	 ;; (raw-data               (alist-ref 'data     dat))
;;; 	 (rdat                   (hash-table-ref/default
;;; 				  (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action
;;; 	 (witem                  (make-witem ripaddr: from-ipaddr ;; rhost:   from-host   
;;; 					     rport:   from-port   action:  action
;;; 					     rdat:    rdat        cookie:  cookie
;;; 					     servkey: servkey     data:    params ;; TODO - rename data to params
;;; 					     caller:  (rpc:current-peer))))
;;;     (if (not (equal? servkey (area-pktid acfg)))
;;; 	`(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this
;;; 	(let* ((ctype (if rdat 
;;; 			  (calldat-ctype rdat) ;; is this necessary? these should be identical
;;; 			  action)))
;;; 	  (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f)
;;; 	  (case ctype
;;; 	    ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data)))
;;; 	    ((full-ping)  `(#t  "ack to full ping"        ,(work-queue-add acfg fname witem) ,cookie))
;;; 	    ((response)   `(#t  "ack from requestor"      ,(deliver-response acfg fname params)))
;;; 	    ((dbwrite)    `(#t  "db write submitted"      ,(work-queue-add acfg fname witem) ,cookie))
;;; 	    ((dbread)     `(#t  "db read submitted"       ,(work-queue-add acfg fname witem) ,cookie  ))
;;; 	    ((dbrw)       `(#t  "db read/write submitted" ,cookie))
;;; 	    ((osshort)    `(#t  "os short submitted"      ,cookie))
;;; 	    ((oslong)     `(#t  "os long submitted"       ,cookie))
;;; 	    (else         `(#f  "unrecognised action"     ,ctype)))))))
;;; 
;;; ;; Call this to start the actual server
;;; ;;
;;; ;; start_server
;;; ;;
;;; ;;   mode: '
;;; ;;   handler: proc which takes pktrecieved as argument
;;; ;;
;;; 
;;; (define (start-server acfg)
;;;   (let* ((conn (find-free-port-and-open acfg))
;;; 	 (port (area-port acfg)))
;;;     (rpc:publish-procedure!
;;;      'delist-db
;;;      (lambda (fname)
;;;        (hash-table-delete! (area-dbs acfg) fname)))
;;;     (rpc:publish-procedure!
;;;      'calling-addr
;;;      (lambda ()
;;;        (rpc:current-peer)))
;;;     (rpc:publish-procedure!
;;;      'ping
;;;      (lambda ()(real-ping acfg)))
;;;     (rpc:publish-procedure!
;;;      'request
;;;      (lambda (from-addr from-port servkey action cookie dbname params)
;;;        (request acfg from-addr from-port servkey action cookie dbname params)))
;;;     (rpc:publish-procedure!
;;;      'response
;;;      (lambda (cookie res-dat)
;;;        (deliver-response acfg cookie res-dat)))
;;;     (area-ready-set! acfg #t)
;;;     (area-conn-set! acfg conn)
;;;     ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t)
;;; 
;;; 
;;; (define (launch acfg) ;;  #!optional (proc std-peer-handler))
;;;   (print "starting launch")
;;;   (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
;;;   #;(let ((original-handler (current-exception-handler))) ;; is th
;;;     (lambda (exception)
;;;       (server-exit-procedure)
;;;       (original-handler exception)))
;;;   (on-exit (lambda ()
;;; 	     (shutdown acfg))) ;; (finalize-all-db-handles acfg)))
;;;   ;; set up the rpc handler
;;;   (let* ((th1  (make-thread
;;; 		(lambda ()(start-server acfg))
;;; 		"server thread"))
;;; 	 (th2   (make-thread
;;; 		 (lambda ()
;;; 		   (print "th2 starting")
;;; 		   (let loop ()
;;; 		     (work-queue-processor acfg)
;;; 		     (print "work-queue-processor crashed!")
;;; 		     (loop)))
;;; 		 "work queue thread")))
;;;     (thread-start! th1)
;;;     (thread-start! th2)
;;;     (let loop ()
;;;       (thread-sleep! 0.025)
;;;       (if (area-ready acfg)
;;; 	  #t
;;; 	  (loop)))
;;;     ;; attempt to fix my address
;;;     (let* ((all-addr (get-all-ips-sorted)))	     ;; could use (tcp-addresses conn)?
;;;       (let loop ((rem-addrs all-addr))
;;; 	(if (null? rem-addrs)
;;; 	    (begin
;;; 	      (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.")
;;; 	      (exit 1)) ;; BUG Changeme to raising an exception
;;; 		
;;; 	    (let* ((addr      (car rem-addrs))
;;; 		   (good-addr (handle-exceptions
;;; 				  exn
;;; 				  #f
;;; 				((rpc:procedure 'calling-addr addr (area-port acfg))))))
;;; 	      (if good-addr
;;; 		  (begin
;;; 		    (print "Got good-addr of " good-addr)
;;; 		    (area-myaddr-set! acfg good-addr))
;;; 		  (loop (cdr rem-addrs)))))))
;;;     (register-node acfg (area-myaddr acfg)(area-port acfg))
;;;     (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg))
;;;     ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
;;;     ))
;;; 
;;; (define (clear-server-pkt acfg)
;;;   (let ((pktf (area-pktfile acfg)))
;;;     (if pktf (delete-file* pktf))))
;;; 
;;; (define (shutdown acfg)
;;;   (let (;;(conn (area-conn    acfg))
;;; 	(pktf (area-pktfile acfg))
;;; 	(port (area-port    acfg)))
;;;     (if pktf (delete-file* pktf))
;;;     (send-all "imshuttingdown")
;;;     ;; (rpc:close-all-connections!) ;; don't know if this is actually needed
;;;     (finalize-all-db-handles acfg)))
;;; 
;;; (define (send-all msg)
;;;   #f)
;;; 
;;; ;; given a area record look up all the packets
;;; ;;
;;; (define (get-all-server-pkts acfg)
;;;   (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt"))))
;;;     (map (lambda (pkt-file)
;;; 	   (read-pkt->alist pkt-file pktspec: *pktspec*))
;;; 	 all-pkt-files)))
;;; 
;;; #;((Z . "9a0212302295a19610d5796fce0370fa130758e9")
;;;   (port . "34827")
;;;   (pid . "28748")
;;;   (hostname . "zeus")
;;;   (T . "server")
;;;   (D . "1549427032.0"))
;;; 
;;; #;(define (get-my-best-address)
;;;   (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))))
;;;     (cond
;;;      ((null? all-my-addresses)
;;;       (get-host-name))                                          ;; no interfaces?
;;;      ((eq? (length all-my-addresses) 1)
;;;       (ip->string (car all-my-addresses)))                      ;; only one to choose from, just go with it
;;;      (else 
;;;       (ip->string (car (filter (lambda (x)                      ;; take any but 127.
;;; 				 (not (eq? (u8vector-ref x 0) 127)))
;;; 			       all-my-addresses)))))))
;;; 
;;; ;; whoami? I am my pkt
;;; ;;
;;; (define (whoami? acfg)
;;;   (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f))
;;; 
;;; ;;======================================================================
;;; ;; "Client side" operations
;;; ;;======================================================================
;;; 
;;; (define (safe-call call-key host port . params)
;;;   (handle-exceptions
;;;    exn
;;;    (begin
;;;      (print "Call " call-key " to " host ":" port " failed")
;;;      #f)
;;;    (apply (rpc:procedure call-key host port) params)))
;;; 
;;; ;; ;; convert to/from string / sexpr
;;; ;; 
;;; ;; (define (string->sexpr str)
;;; ;;   (if (string? str)
;;; ;;       (with-input-from-string str read)
;;; ;;       str))
;;; ;; 
;;; ;; (define (sexpr->string s)
;;; ;;   (with-output-to-string (lambda ()(write s))))
;;; 
;;; ;; is the server alive?
;;; ;;
;;; (define (ping acfg host port)
;;;   (let* ((myaddr     (area-myaddr acfg))
;;; 	 (myport     (area-port   acfg))
;;; 	 (start-time (current-milliseconds))
;;; 	 (res        (if (and (equal? myaddr host)
;;; 			      (equal? myport port))
;;; 			 (real-ping acfg)
;;; 			 ((rpc:procedure 'ping host port)))))
;;;     (cons (- (current-milliseconds) start-time)
;;; 	  res)))
;;; 
;;; ;; returns ( ipaddr port alist-fname=>randnum )
;;; (define (real-ping acfg)
;;;   `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg)))
;;; 
;;; ;; is the server alive AND the queues processing?
;;; ;;
;;; #;(define (full-ping acfg servpkt)
;;;   (let* ((start-time (current-milliseconds))
;;; 	 (res        (send-message acfg servpkt '(full-ping) 'full-ping)))
;;;     (cons (- (current-milliseconds) start-time)
;;; 	  res))) ;; (equal? res "got ping"))))
;;; 
;;; 
;;; ;; look up all pkts and get the server id (the hash), port, host/ip
;;; ;; store this info in acfg
;;; ;; return the number of responsive servers found
;;; ;;
;;; ;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself
;;; ;;
;;; (define (update-known-servers acfg)
;;;   ;; readll all pkts
;;;   ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt
;;;   (let* ((start-time (current-milliseconds))
;;; 	 (all-pkts  (delete-duplicates
;;; 		     (append (get-all-server-pkts acfg)
;;; 			     (hash-table-values (area-hosts acfg)))))
;;; 	 (hostshash (area-hosts acfg))
;;; 	 (my-id     (area-pktid acfg))
;;; 	 (pktsdir   (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers
;;; 	 (numsrvs   0)
;;; 	 (delpkt    (lambda (pktsdir sid)
;;; 		      (print "clearing out server " sid)
;;; 		      (delete-file* (conc pktsdir "/" sid ".pkt"))
;;; 		      (hash-table-delete! hostshash sid))))
;;;     (area-last-srvup-set! acfg (current-seconds))
;;;     (for-each
;;;      (lambda (servpkt)
;;;        (if (list? servpkt)
;;; 	   ;; (pp servpkt)
;;; 	   (let* ((shost (alist-ref 'ipaddr servpkt))
;;; 		  (sport (any->number (alist-ref 'port servpkt)))
;;; 		  (res   (handle-exceptions
;;; 			  exn
;;; 			  (begin
;;; 			    ;; (print "INFO: bad server on " shost ":" sport)
;;; 			    #f)
;;; 			  (ping acfg shost sport)))
;;; 		  (sid   (alist-ref 'Z servpkt)) ;; Z code is our name for the server
;;; 		  (url   (conc shost ":" sport))
;;; 		  )
;;; 	     #;(if (or (not res)
;;; 		     (null? res))
;;; 		 (begin
;;; 		   (print "STRANGE: ping of " url " gave " res)))
;;; 	     
;;; 	     ;; (print "Got " res " from " shost ":" sport)
;;; 	     (match res
;;; 		    ((qduration . payload)
;;; 		     ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt)
;;; 		     ;;        (if payload
;;; 		     ;;            "Success" "Fail"))
;;; 		     (match payload
;;; 			    ((host port stats)
;;; 			     ;; (print "From " host ":" port " got stats: " stats)
;;; 			     (if (and host port stats)
;;; 				 (let ((url (conc host ":" port)))
;;; 				   (hash-table-set! hostshash sid servpkt)
;;; 				   ;; store based on host:port
;;; 				   (hash-table-set! (area-hoststats acfg) sid stats))
;;; 				 (print "missing data from the server, not sure what that means!"))
;;; 			     (set! numsrvs (+ numsrvs 1)))
;;; 			    (#f
;;; 			     (print "Removing pkt " sid " due to #f from server or failed ping")
;;; 			     (delpkt pktsdir sid))
;;; 			    (else
;;; 			     (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)")))
;;; 		    (else
;;; 		     ;; here we delete the pkt - can't reach the server, remove it
;;; 		     ;; however this logic is inadequate. we should mark the server as checked
;;; 		     ;; and not good, if it happens a second time - then remove the pkt
;;; 		     ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead
;;; 		     ;; could be it is simply too busy to reply
;;; 		     (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0)))
;;; 		       (if (> bad-pings 1) ;; two bad pings - remove pkt
;;; 			   (begin
;;; 			     (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid)
;;; 			     (delpkt pktsdir sid))
;;; 			   (begin
;;; 			     (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet")
;;; 			     (hash-table-set! (area-health acfg)
;;; 					      url
;;; 					      (+ (hash-table-ref/default (area-health acfg) url 0) 1))
;;; 			     ))
;;; 		       ))))
;;; 	   ;; servpkt is not actually a pkt?
;;; 	   (begin
;;; 	     (print "Bad pkt " servpkt))))
;;;      all-pkts)
;;;     (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs
;;; 	   " servers, pkts: " (map (lambda (p)
;;; 				     (alist-ref 'Z p))
;;; 				   all-pkts))
;;;     numsrvs))
;;; 
;;; (defstruct srvstat
;;;   (numfiles 0)   ;; number of db files handled by this server - subtract 1 for the db being currently looked at
;;;   (randnum  #f)  ;; tie breaker number assigned to by the server itself - applies only to the db under consideration
;;;   (pkt      #f)) ;; the server pkt
;;; 
;;; ;;(define (srv->srvstat srvpkt)
;;;   
;;; ;; Get the server best for given dbname and key
;;; ;;
;;; ;;   NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries.
;;; ;;
;;; (define (get-best-server acfg dbname key)
;;;   (let* (;; (servers (hash-table-values (area-hosts acfg)))
;;; 	 (servers     (area-hosts acfg))
;;; 	 (skeys       (sort (hash-table-keys servers) string>=?)) ;; a stable listing
;;; 	 (start-time  (current-milliseconds))
;;; 	 (srvstats    (make-hash-table))  ;; srvid => srvstat
;;; 	 (url         (conc (area-myaddr acfg) ":" (area-port acfg))))
;;;     ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys))
;;;     (if (null? skeys)
;;; 	(if (> (update-known-servers acfg) 0)
;;; 	    (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter
;;; 	    (begin
;;; 	      (print "ERROR: no server found!") ;; since this process is also a server this should never happen
;;; 	      #f))
;;; 	(begin
;;; 	  ;; (print "in get-best-server with skeys=" skeys)
;;; 	  (if (> (- (current-seconds) (area-last-srvup acfg)) 10)
;;; 	      (begin
;;; 		(update-known-servers acfg)
;;; 		(sdbg> "get-best-server" "update-known-servers" start-time #f #f)))
;;; 
;;; 	  ;; for each server look at the list of dbfiles, total number of dbs being handled
;;; 	  ;; and the rand number, save the best host
;;; 	  ;; also do a delist-db for each server dbfile not used
;;; 	  (let* ((best-server       #f)
;;; 		 (servers-to-delist (make-hash-table)))
;;; 	    (for-each
;;; 	     (lambda (srvid)
;;; 	       (let* ((server    (hash-table-ref/default servers srvid #f))
;;; 		      (stats     (hash-table-ref/default (area-hoststats acfg) srvid '(()))))
;;; 		 ;; (print "stats: " stats)
;;;  		 (if server
;;; 		     (let* ((dbweights (car stats))
;;; 			    (srvload   (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights)))
;;; 			    (dbrec     (alist-ref dbname dbweights equal?))  ;; get the pair with fname . randscore
;;; 			    (randnum   (if dbrec
;;; 					   dbrec ;; (cdr dbrec)
;;; 					   0)))
;;; 		       (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server))))))
;;; 	     skeys)
;;; 	    
;;; 	    (let* ((sorted    (sort (hash-table-values srvstats) 
;;; 				    (lambda (a b)
;;; 				      (let ((numfiles-a (srvstat-numfiles a))
;;; 					    (numfiles-b (srvstat-numfiles b))
;;; 					    (randnum-a  (srvstat-randnum a))
;;; 					    (randnum-b  (srvstat-randnum b)))
;;; 					(if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less
;;; 					    #t
;;; 					    (if (and (equal? numfiles-a numfiles-b)
;;; 						     (< randnum-a randnum-b))
;;; 						#t
;;; 						#f))))))
;;; 		   (best      (if (null? sorted)
;;; 				  (begin
;;; 				    (print "ERROR: should never be null due to self as server.")
;;; 				    #f)
;;; 				  (srvstat-pkt (car sorted)))))
;;; 	      #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv)
;;; 							    (let ((p (srvstat-pkt srv)))
;;; 							      (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p)
;;; 								    "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")")))
;;; 							    sorted))
;;; 	      best))))))
;;;     
;;;     ;; send out an "I'm about to exit notice to all known servers"
;;;     ;;
;;; (define (death-imminent acfg)
;;;   '())
;;; 
;;; ;;======================================================================
;;; ;; U L E X  -  T H E   I N T E R E S T I N G   S T U F F ! !
;;; ;;======================================================================
;;; 
;;; ;; register a handler
;;; ;;   NOTES:
;;; ;;     dbinitsql   is reserved for a list of sql statements for initializing the db
;;; ;;     dbinitfn    is reserved for a db init function, if exists called after dbinitsql
;;; ;;     
;;; (define (register acfg key obj #!optional (ctype 'dbwrite))
;;;   (let ((ht (area-rtable acfg)))
;;;     (if (hash-table-exists? ht key)
;;; 	(print "WARNING: redefinition of entry " key))
;;;     (hash-table-set! ht key (make-calldat obj: obj ctype: ctype))))
;;; 
;;; ;; usage: register-batch acfg '((key1 . sql1) (key2 . sql2) ... )
;;; ;; NB// obj is often an sql query
;;; ;;
;;; (define (register-batch acfg ctype data)
;;;   (let ((ht (area-rtable acfg)))
;;;     (map (lambda (dat)
;;; 	   (hash-table-set! ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype)))
;;; 	 data)))
;;; 
;;; (define (initialize-area-calls-from-specfile area specfile)
;;;   (let* ((callspec (with-input-from-file specfile read )))
;;;     (for-each (lambda (group)
;;;                 (register-batch
;;;                  area
;;;                  (car group)
;;;                  (cdr group)))
;;;               callspec)))
;;; 
;;; ;; get-rentry
;;; ;;
;;; (define (get-rentry acfg key)
;;;   (hash-table-ref/default (area-rtable acfg) key #f))
;;; 
;;; (define (get-rsql acfg key)
;;;   (let ((cdat (get-rentry acfg key)))
;;;     (if cdat
;;; 	(calldat-obj cdat)
;;; 	#f)))
;;; 
;;; 
;;; 
;;; ;; blocking call:
;;; ;;    client                         server
;;; ;;    ------                         ------
;;; ;;    call()
;;; ;;    send-message()
;;; ;;    nmsg-send()
;;; ;;                                   nmsg-receive()
;;; ;;                                   nmsg-respond(ack,cookie)
;;; ;;    ack, cookie
;;; ;;    mbox-thread-wait(cookie)
;;; ;;                                   nmsg-send(client,cookie,result)
;;; ;;        nmsg-respond(ack)
;;; ;;        return result
;;; ;;
;;; ;; reserved action:
;;; ;;    'immediate
;;; ;;    'dbinitsql
;;; ;;
;;; (define (call acfg dbname action params #!optional (count 0))
;;;   (let* ((call-start-time     (current-milliseconds))
;;; 	 (srv                 (get-best-server acfg dbname action))
;;; 	 (post-get-start-time (current-milliseconds))
;;; 	 (rdat                (hash-table-ref/default (area-rtable acfg) action #f))
;;; 	 (myid                (trim-pktid (area-pktid acfg)))
;;; 	 (srvid               (trim-pktid (alist-ref 'Z srv)))
;;; 	 (cookie              (make-cookie myid)))
;;;     (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat)
;;;     (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname)
;;;     (if (and srv rdat) ;; need both to dispatch a request
;;; 	(let* ((ripaddr  (alist-ref 'ipaddr srv))
;;; 	       (rsrvid   (alist-ref 'Z srv))
;;; 	       (rport    (any->number (alist-ref 'port   srv)))
;;; 	       (res-full (if (and (equal? ripaddr (area-myaddr acfg))
;;; 				  (equal? rport   (area-port acfg)))
;;; 			     (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params)
;;; 			     (safe-call 'request ripaddr rport
;;; 					(area-myaddr acfg)
;;; 					(area-port   acfg)
;;; 					#;(area-pktid acfg)
;;; 					rsrvid
;;; 					action cookie dbname params))))
;;; 	  ;; (print "res-full: " res-full)
;;; 	  (match res-full
;;; 	    ((response-ok response-msg rem ...)
;;; 	     (let* ((send-message-time (current-milliseconds))
;;; 		    ;; (match res-full
;;; 		    ;;  ((response-ok response-msg)
;;; 		    ;; (response-ok  (car res-full))
;;; 		    ;; (response-msg (cadr res-full)
;;; 		    )
;;; 	       ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG 
;;; 	       ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params)
;;; 	       (sdbg> "call" "send-message" post-get-start-time #f call-start-time)
;;; 	       (cond
;;; 		((not response-ok) #f)
;;; 		((member response-msg '("db read submitted" "db write submitted"))
;;; 		 (let* ((cookie-id   (cadddr res-full))
;;; 			(mbox        (make-mailbox))
;;; 			(mbox-time   (current-milliseconds)))
;;; 		   (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox)
;;; 		   (let* ((mbox-timeout-secs    20)
;;; 			  (mbox-timeout-result 'MBOX_TIMEOUT)
;;; 			  (res                  (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
;;; 			  (mbox-receive-time    (current-milliseconds)))
;;; 		     (hash-table-delete! (area-cookie2mbox acfg) cookie-id)
;;; 		     (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname)
;;; 		     ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params)
;;; 		     res)))
;;; 		(else
;;; 		 (print "Unhandled response \""response-msg"\"")
;;; 		 #f))
;;; 	       ;; depending on what action (i.e. ctype) is we will block here waiting for
;;; 	       ;; all the data (mechanism to be determined)
;;; 	       ;;
;;; 	       ;; if res is a "working on it" then wait
;;; 	       ;;    wait for result
;;; 	       ;; mailbox thread wait on 
;;; 	       
;;; 	       ;; if res is a "can't help you" then try a different server
;;; 	       ;; if res is a "ack" (e.g. for one-shot requests) then return res
;;; 	       ))
;;; 	    (else
;;; 	     (if (< count 10)
;;; 		 (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv))))
;;; 		   (thread-sleep! 1)
;;; 		   (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.")
;;; 		   (call acfg dbname action params (+ count 1)))
;;; 		 (begin
;;; 		   (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full)))))))
;;; 	(begin
;;; 	  (if (not rdat)
;;; 	      (print "ERROR: action " action " not registered.")
;;; 	      (if (< count 10)
;;; 		 (begin
;;; 		   (thread-sleep! 1)
;;; 		   (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts
;;; 		   (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds")
;;; 		   (call acfg dbname action params (+ count 1)))
;;; 		 (begin
;;; 		   (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up."))
;;; 		   #;(error "No server available"))))))))
;;; 
;;; 
;;; ;;======================================================================
;;; ;; U T I L I T I E S 
;;; ;;======================================================================
;;; 
;;; ;; get a signature for identifing this process
;;; ;;
;;; (define (get-process-signature)
;;;   (cons (get-host-name)(current-process-id)))
;;; 
;;; ;;======================================================================
;;; ;; S Y S T E M   S T U F F
;;; ;;======================================================================
;;; 
;;; ;; get normalized cpu load by reading from /proc/loadavg and
;;; ;; /proc/cpuinfo return all three values and the number of real cpus
;;; ;; and the number of threads returns alist '((adj-cpu-load
;;; ;; . normalized-proc-load) ... etc.  keys: adj-proc-load,
;;; ;; adj-core-load, 1m-load, 5m-load, 15m-load
;;; ;;
;;; (define (get-normalized-cpu-load)
;;;   (let ((res (get-normalized-cpu-load-raw))
;;; 	(default `((adj-proc-load . 2) ;; there is no right answer
;;; 		   (adj-core-load . 2)
;;; 		   (1m-load       . 2)
;;; 		   (5m-load       . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong
;;; 		   (15m-load      . 0)
;;; 		   (proc          . 1)
;;; 		   (core          . 1)
;;; 		   (phys          . 1)
;;; 		   (error         . #t))))
;;;     (cond
;;;      ((and (list? res)
;;; 	   (> (length res) 2))
;;;       res)
;;;      ((eq? res #f)   default) ;; add messages?
;;;      ((eq? res #f) default)   ;; this would be the #eof
;;;      (else default))))
;;; 
;;; (define (get-normalized-cpu-load-raw)
;;;   (let* ((actual-host           (get-host-name))) ;; #f is localhost
;;;     (let ((data  (append 
;;; 		  (with-input-from-file "/proc/loadavg" read-lines)
;;; 		  (with-input-from-file "/proc/cpuinfo" read-lines)
;;; 		  (list "end")))
;;; 	  (load-rx  (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$"))
;;; 	  (proc-rx  (regexp "^processor\\s+:\\s+(\\d+)\\s*$"))
;;; 	  (core-rx  (regexp "^core id\\s+:\\s+(\\d+)\\s*$"))
;;; 	  (phys-rx  (regexp "^physical id\\s+:\\s+(\\d+)\\s*$"))
;;; 	  (max-num  (lambda (p n)(max (string->number p) n))))
;;;       ;; (print "data=" data)
;;;       (if (null? data) ;; something went wrong
;;; 	  #f
;;; 	  (let loop ((hed      (car data))
;;; 		     (tal      (cdr data))
;;; 		     (loads    #f)
;;; 		     (proc-num 0)  ;; processor includes threads
;;; 		     (phys-num 0)  ;; physical chip on motherboard
;;; 		     (core-num 0)) ;; core
;;; 	    ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num)
;;; 	    (if (null? tal) ;; have all our data, calculate normalized load and return result
;;; 		(let* ((act-proc (+ proc-num 1))
;;; 		       (act-phys (+ phys-num 1))
;;; 		       (act-core (+ core-num 1))
;;; 		       (adj-proc-load (/ (car loads) act-proc))
;;; 		       (adj-core-load (/ (car loads) act-core))
;;; 		       (result
;;; 			(append (list (cons 'adj-proc-load adj-proc-load)
;;; 				      (cons 'adj-core-load adj-core-load))
;;; 				(list (cons '1m-load (car loads))
;;; 				      (cons '5m-load (cadr loads))
;;; 				      (cons '15m-load (caddr loads)))
;;; 				(list (cons 'proc act-proc)
;;; 				      (cons 'core act-core)
;;; 				      (cons 'phys act-phys)))))
;;; 		  result)
;;; 		(regex-case
;;; 		    hed
;;; 		  (load-rx  ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num))
;;; 		  (proc-rx  ( x p         ) (loop (car tal)(cdr tal) loads           (max-num p proc-num) phys-num core-num))
;;; 		  (phys-rx  ( x p         ) (loop (car tal)(cdr tal) loads           proc-num (max-num p phys-num) core-num))
;;; 		  (core-rx  ( x c         ) (loop (car tal)(cdr tal) loads           proc-num phys-num (max-num c core-num)))
;;; 		  (else 
;;; 		   (begin
;;; 		     ;; (print "NO MATCH: " hed)
;;; 		     (loop (car tal)(cdr tal) loads proc-num phys-num core-num))))))))))
;;; 
;;; (define (get-host-stats acfg)
;;;   (let ((stats-hash (area-stats acfg)))
;;;     ;; use this opportunity to remove references to dbfiles which have not been accessed in a while
;;;     (for-each
;;;      (lambda (dbname)
;;;        (let* ((stats       (hash-table-ref stats-hash dbname))
;;; 	      (last-access (stat-when stats)))
;;; 	 (if (and (> last-access 0)                             ;; if zero then there has been no access
;;; 		  (> (- (current-seconds) last-access) 10))     ;; not used in ten seconds
;;; 	     (begin
;;; 	       (print "Removing " dbname " from stats list")
;;; 	       (hash-table-delete! stats-hash dbname) ;; remove from stats hash
;;; 	       (stat-dbs-set! stats (hash-table-keys stats))))))
;;;      (hash-table-keys stats-hash))
;;;     
;;;     `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum
;;;       ,(map (lambda (dbname)  ;; dbname is the db name
;;; 	      (cons dbname (stat-when (hash-table-ref stats-hash dbname))))
;;; 	    (hash-table-keys stats-hash))
;;;       (cpuload . ,(get-normalized-cpu-load)))))
;;;     #;(stats   . ,(map (lambda (k) ;; create an alist from the stats data
;;; 		       (cons k (stat->alist (hash-table-ref (area-stats acfg) k))))
;;; 		     (hash-table-keys (area-stats acfg))))
;;; 
;;; #;(trace
;;;  ;; assv
;;;  ;; cdr
;;;  ;; caar
;;;  ;; ;; cdr
;;;  ;; call
;;;  ;; finalize-all-db-handles
;;;  ;; get-all-server-pkts
;;;  ;; get-normalized-cpu-load
;;;  ;; get-normalized-cpu-load-raw
;;;  ;; launch
;;;  ;; nmsg-send
;;;  ;; process-db-queries
;;;  ;; receive-message
;;;  ;; std-peer-handler
;;;  ;; update-known-servers
;;;  ;; work-queue-processor
;;;  )
;;; 
;;; ;;======================================================================
;;; ;; netutil
;;; ;;   move this back to ulex-netutil.scm someday?
;;; ;;======================================================================
;;; 
;;; ;; #include <stdio.h>
;;; ;; #include <netinet/in.h>
;;; ;; #include <string.h>
;;; ;; #include <arpa/inet.h>
;;; 
;;; (foreign-declare "#include \"sys/types.h\"")
;;; (foreign-declare "#include \"sys/socket.h\"")
;;; (foreign-declare "#include \"ifaddrs.h\"")
;;; (foreign-declare "#include \"arpa/inet.h\"")
;;; 
;;; ;; get IP addresses from ALL interfaces
;;; (define get-all-ips
;;;   (foreign-safe-lambda* scheme-object ()
;;;     "
;;; 
;;; // from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address :
;;; 
;;; 
;;;     C_word lst = C_SCHEME_END_OF_LIST, len, str, *a;
;;; //    struct ifaddrs *ifa, *i;
;;; //    struct sockaddr *sa;
;;; 
;;;     struct ifaddrs * ifAddrStruct = NULL;
;;;     struct ifaddrs * ifa = NULL;
;;;     void * tmpAddrPtr = NULL;
;;; 
;;;     if ( getifaddrs(&ifAddrStruct) != 0)
;;;       C_return(C_SCHEME_FALSE);
;;; 
;;; //    for (i = ifa; i != NULL; i = i->ifa_next) {
;;;     for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
;;;         if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is
;;;             // a valid IPv4 address
;;;             tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
;;;             char addressBuffer[INET_ADDRSTRLEN];
;;;             inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
;;; //            printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
;;;             len = strlen(addressBuffer);
;;;             a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
;;;             str = C_string(&a, len, addressBuffer);
;;;             lst = C_a_pair(&a, str, lst);
;;;         } 
;;; 
;;; //        else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is
;;; //            // a valid IPv6 address
;;; //            tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
;;; //            char addressBuffer[INET6_ADDRSTRLEN];
;;; //            inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
;;; ////            printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
;;; //            len = strlen(addressBuffer);
;;; //            a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
;;; //            str = C_string(&a, len, addressBuffer);
;;; //            lst = C_a_pair(&a, str, lst);
;;; //       }
;;; 
;;; //       else {
;;; //         printf(\" not an IPv4 address\\n\");
;;; //       }
;;; 
;;;     }
;;; 
;;;     freeifaddrs(ifa);
;;;     C_return(lst);
;;; 
;;; "))
;;; 
;;; ;; Change this to bias for addresses with a reasonable broadcast value?
;;; ;;
;;; (define (ip-pref-less? a b)
;;;   (let* ((rate (lambda (ipstr)
;;;                  (regex-case ipstr
;;;                              ( "^127\\." _ 0 )
;;;                              ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 )
;;;                              ( else 2 ) ))))
;;;     (< (rate a) (rate b))))
;;;   
;;; 
;;; (define (get-my-best-address)
;;;   (let ((all-my-addresses (get-all-ips))
;;;         ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))
;;;         )
;;;     (cond
;;;      ((null? all-my-addresses)
;;;       (get-host-name))                                          ;; no interfaces?
;;;      ((eq? (length all-my-addresses) 1)
;;;       (car all-my-addresses))                      ;; only one to choose from, just go with it
;;;      
;;;      (else
;;;       (car (sort all-my-addresses ip-pref-less?)))
;;;      ;; (else 
;;;      ;;  (ip->string (car (filter (lambda (x)                      ;; take any but 127.
;;;      ;;    			 (not (eq? (u8vector-ref x 0) 127)))
;;;      ;;    		       all-my-addresses))))
;;; 
;;;      )))
;;; 
;;; (define (get-all-ips-sorted)
;;;   (sort (get-all-ips) ip-pref-less?))
;;; 
;;;