Index: Makefile
==================================================================
--- Makefile
+++ Makefile
@@ -30,11 +30,11 @@
# Configuration stuff
transport-flavor :
@echo Creating transport-flavor with full as flavor. Options include: full, simple
echo full > transport-flavor
-ulex.scm dbmgrmod.scm : ulex.scm.template dbmgrmod.scm.template transport-flavor
+ulex.scm dbmgrmod.scm : ulex.scm.template dbmgrmod.scm.template transport-flavor ulex-*/*scm
./configure
# module source files
MSRCFILES = autoload.scm dbi.scm ducttape-lib.scm pkts.scm stml2.scm \
cookie.scm mutils.scm mtargs.scm apimod.scm ulex.scm \
Index: configure
==================================================================
--- configure
+++ configure
@@ -14,10 +14,12 @@
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Megatest. If not, see .
+
+# Flavors include: simple, full and none
# look at build.config (not a version controlled file and
# create ulex.scm and dbmgr.scm
if [[ -e transport-flavor ]];then
Index: dcommon.scm
==================================================================
--- dcommon.scm
+++ dcommon.scm
@@ -26,10 +26,11 @@
(declare (uses commonmod))
(declare (uses configfmod))
(declare (uses rmtmod))
(declare (uses mtargs))
(declare (uses testsmod))
+(declare (uses dbmgrmod))
(module dcommon
*
(import scheme
@@ -62,10 +63,11 @@
srfi-1
)
(import mtver
dbmod
+ dbmgrmod
commonmod
debugprint
configfmod
rmtmod
;; gutils
Index: rmtmod.scm
==================================================================
--- rmtmod.scm
+++ rmtmod.scm
@@ -1191,11 +1191,11 @@
(if (and no-hurry (debug:debug-mode 18))
(rmt:print-db-stats))
(let ((th1 (make-thread
(lambda () ;; thread for cleaning up, give it five seconds
(let* ((start-time (current-seconds)))
- (if *db-serv-info*
+ #;(if *db-serv-info*
(let* ((host (servdat-host *db-serv-info*))
(port (servdat-port *db-serv-info*)))
(debug:print-info 0 *default-log-port* "Shutting down server/responder.")
;;
;; TODO - add flushing/waiting on the work queue
Index: tests/simplerun/tests/test1/testconfig
==================================================================
--- tests/simplerun/tests/test1/testconfig
+++ tests/simplerun/tests/test1/testconfig
@@ -24,11 +24,11 @@
[requirements]
# waiton setup
priority 0
# Iteration for your tests are controlled by the items section
-[items]
+# [items]
# PARTOFDAY morning noon afternoon evening night
# test_meta is a section for storing additional data on your test
[test_meta]
author matt
ADDED ulex-none/dbmgr.scm
Index: ulex-none/dbmgr.scm
==================================================================
--- /dev/null
+++ ulex-none/dbmgr.scm
@@ -0,0 +1,1123 @@
+;;======================================================================
+;; Copyright 2022, Matthew Welland.
+;;
+;; This file is part of Megatest.
+;;
+;; Megatest is free software: you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+;;
+;; Megatest is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;; GNU General Public License for more details.
+;;
+;; You should have received a copy of the GNU General Public License
+;; along with Megatest. If not, see .
+
+;;======================================================================
+
+(declare (unit dbmgrmod))
+
+(declare (uses ulex))
+(declare (uses apimod))
+(declare (uses pkts))
+(declare (uses commonmod))
+(declare (uses dbmod))
+(declare (uses mtargs))
+(declare (uses portloggermod))
+(declare (uses debugprint))
+
+(module dbmgrmod
+ *
+
+(import scheme
+ chicken.base
+ chicken.condition
+ chicken.file
+ chicken.format
+ chicken.port
+ chicken.process
+ chicken.process-context
+ chicken.process-context.posix
+ chicken.sort
+ chicken.string
+ chicken.time
+
+ (prefix sqlite3 sqlite3:)
+ matchable
+ md5
+ message-digest
+ regex
+ s11n
+ srfi-1
+ srfi-18
+ srfi-69
+ system-information
+ typed-records
+
+ pkts
+ ulex
+
+ commonmod
+ apimod
+ dbmod
+ debugprint
+ (prefix mtargs args:)
+ portloggermod
+ )
+
+;; ;; Configurations for server
+;; ;; (tcp-buffer-size 2048)
+;; ;; (max-connections 2048)
+;;
+;; ;; info about me as a listener and my connections to db servers
+;; ;; stored (for now) in *db-serv-info*
+;; ;;
+;; (defstruct servdat
+;; (host #f)
+;; (port #f)
+;; (uuid #f)
+;; (dbfile #f)
+;; (uconn #f) ;; this is the listener *FOR THIS PROCESS*
+;; (mode #f)
+;; (status 'starting)
+;; (trynum 0) ;; count the number of ports we've tried
+;; (conns (make-hash-table)) ;; apath/dbname => conndat
+;; )
+;;
+;; (define *db-serv-info* (make-servdat))
+;;
+;; (define (servdat->url sdat)
+;; (conc (servdat-host sdat)":"(servdat-port sdat)))
+;;
+;; ;; db servers contact info
+;; ;;
+;; (defstruct conndat
+;; (apath #f)
+;; (dbname #f)
+;; (fullname #f)
+;; (hostport #f)
+;; (ipaddr #f)
+;; (port #f)
+;; (srvpkt #f)
+;; (srvkey #f)
+;; (lastmsg 0)
+;; (expires 0))
+;;
+;; (define *srvpktspec*
+;; `((server (host . h)
+;; (port . p)
+;; (servkey . k)
+;; (pid . i)
+;; (ipaddr . a)
+;; (dbpath . d))))
+;;
+;; ;;======================================================================
+;; ;; S U P P O R T F U N C T I O N S
+;; ;;======================================================================
+;;
+;; ;; set up the api proc, seems like there should be a better place for this?
+;; ;;
+;; ;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE
+;; ;;
+;; ;; (define api-proc (make-parameter conc))
+;; ;; (api-proc api:execute-requests)
+;;
+;; ;; do we have a connection to apath dbname and
+;; ;; is it not expired? then return it
+;; ;;
+;; ;; else setup a connection
+;; ;;
+;; ;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
+;; ;;
+;; (define (rmt:get-conn remdat apath dbname)
+;; (let* ((fullname (db:dbname->path apath dbname)))
+;; (hash-table-ref/default (servdat-conns remdat) fullname #f)))
+;;
+;; (define (rmt:drop-conn remdat apath dbname)
+;; (let* ((fullname (db:dbname->path apath dbname)))
+;; (hash-table-delete! (servdat-conns remdat) fullname)))
+;;
+;; (define (rmt:find-main-server uconn apath dbname)
+;; (let* ((pktsdir (get-pkts-dir apath))
+;; (all-srvpkts (get-all-server-pkts pktsdir *srvpktspec*))
+;; (viable-srvs (get-viable-servers all-srvpkts dbname)))
+;; (get-the-server uconn apath viable-srvs)))
+;;
+;;
+;; (define *connstart-mutex* (make-mutex))
+;; (define *last-main-start* 0)
+;;
+;; ;; looks for a connection to main, returns if have and not exired
+;; ;; creates new otherwise
+;; ;;
+;; ;; connections for other servers happens by requesting from main
+;; ;;
+;; ;; TODO: This is unnecessarily re-creating the record in the hash table
+;; ;;
+;; (define (rmt:open-main-connection remdat apath)
+;; (let* ((fullpath (db:dbname->path apath ".db/main.db"))
+;; (conns (servdat-conns remdat))
+;; (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
+;; (start-rmt:run (lambda ()
+;; (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
+;; (thread-start! th1)
+;; (thread-sleep! 1)
+;; (let loop ((count 0))
+;; (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
+;; (if (or (not *db-serv-info*)
+;; (not (servdat-uconn *db-serv-info*)))
+;; (begin
+;; (thread-sleep! 1)
+;; (loop (+ count 1)))
+;; (begin
+;; (servdat-mode-set! *db-serv-info* 'non-db)
+;; (servdat-uconn *db-serv-info*)))))))
+;; (myconn (servdat-uconn *db-serv-info*)))
+;; (cond
+;; ((not myconn)
+;; (start-rmt:run)
+;; (rmt:open-main-connection remdat apath))
+;; ((and conn ;; conn is NOT a socket, just saying ...
+;; (< (current-seconds) (conndat-expires conn)))
+;; #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
+;; ((and conn
+;; (>= (current-seconds)(conndat-expires conn)))
+;; (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
+;; (rmt:drop-conn remdat apath ".db/main.db") ;;
+;; (rmt:open-main-connection remdat apath))
+;; (else
+;; ;; Below we will find or create and connect to main
+;; (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch")
+;; (let* ((dbname (db:run-id->dbname #f))
+;; (the-srv (rmt:find-main-server myconn apath dbname))
+;; (start-main-srv (lambda () ;; call IF there is no the-srv found
+;; (mutex-lock! *connstart-mutex*)
+;; (if (> (- (current-seconds) *last-main-start*) 5) ;; at least four seconds since last attempt to start main server
+;; (begin
+;; (api:run-server-process apath dbname)
+;; (set! *last-main-start* (current-seconds))
+;; (thread-sleep! 1))
+;; (thread-sleep! 0.25))
+;; (mutex-unlock! *connstart-mutex*)
+;; (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries
+;; )))
+;; (if (not the-srv) ;; have server, try connecting to it
+;; (start-main-srv)
+;; (let* ((srv-addr (server-address the-srv)) ;; need serv
+;; (ipaddr (alist-ref 'ipaddr the-srv))
+;; (port (alist-ref 'port the-srv))
+;; (srvkey (alist-ref 'servkey the-srv))
+;; (fullpath (db:dbname->path apath dbname))
+;;
+;; (new-the-srv (make-conndat
+;; apath: apath
+;; dbname: dbname
+;; fullname: fullpath
+;; hostport: srv-addr
+;; ;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection?
+;; ipaddr: ipaddr
+;; port: port
+;; srvpkt: the-srv
+;; srvkey: srvkey ;; generated by rmt:get-signature on the server side
+;; lastmsg: (current-seconds)
+;; expires: (+ (current-seconds)
+;; (server:expiration-timeout)
+;; -2) ;; this needs to be gathered during the ping
+;; )))
+;; (hash-table-set! conns fullpath new-the-srv)))
+;; #t)))))
+;;
+;; ;; NB// sinfo is a servdat struct
+;; ;;
+;; (define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
+;; (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
+;; (let* ((mdbname ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable
+;; (fullname (db:dbname->path apath dbname))
+;; (conns (servdat-conns sinfo))
+;; (mconn (rmt:get-conn sinfo apath ".db/main.db"))
+;; (dconn (rmt:get-conn sinfo apath dbname)))
+;; #;(if (and mconn
+;; (not (debug:print-logger)))
+;; (begin
+;; (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.")
+;; (debug:print-logger rmt:log-to-main)))
+;; (cond
+;; ((and mconn
+;; dconn
+;; (< (current-seconds)(conndat-expires dconn)))
+;; #t) ;; good to go
+;; ((not mconn) ;; no channel open to main? open it...
+;; (rmt:open-main-connection sinfo apath)
+;; (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+;; ((not dconn) ;; no channel open to dbname?
+;; (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname))))
+;; (case res
+;; ((server-started)
+;; (if (> num-tries 0)
+;; (begin
+;; (thread-sleep! 2)
+;; (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+;; (begin
+;; (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname)
+;; (exit 1))))
+;; (else
+;; (if (list? res) ;; server has been registered and the info was returned. pass it on.
+;; (begin ;; ("192.168.0.9" 53817
+;; ;; "5e34239f48e8973b3813221e54701a01" "24310"
+;; ;; "192.168.0.9"
+;; ;; "/home/matt/data/megatest/tests/simplerun"
+;; ;; ".db/1.db")
+;; (match
+;; res
+;; ((host port servkey pid ipaddr apath dbname)
+;; (debug:print-info 0 *default-log-port* "got "res)
+;; (hash-table-set! conns
+;; fullname
+;; (make-conndat
+;; apath: apath
+;; dbname: dbname
+;; hostport: (conc host":"port)
+;; ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
+;; ipaddr: ipaddr
+;; port: port
+;; srvkey: servkey
+;; lastmsg: (current-seconds)
+;; expires: (+ (current-seconds)
+;; (server:expiration-timeout)
+;; -2))))
+;; (else
+;; (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
+;; res)
+;; (begin
+;; (debug:print-info 0 *default-log-port* "Unexpected result: " res)
+;; res)))))))
+;; #t))
+;;
+;; ;;======================================================================
+;;
+;; ;; FOR DEBUGGING SET TO #t
+;; ;; (define *localmode* #t)
+;; (define *localmode* #f)
+(define *dbstruct* (make-dbr:dbstruct))
+
+;; Defaults to current area
+;;
+(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
+ (let* ((apath *toppath*)
+ (dbname (db:run-id->dbname rid)))
+ (api:execute-requests *dbstruct* cmd params)))
+
+;; ;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; ;; sometime in the future
+;; ;;
+;; (define (rmt:send-receive-real sinfo apath dbname cmd params)
+;; (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
+;; (let* ((cdat (rmt:get-conn sinfo apath dbname)))
+;; (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened")
+;; (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex
+;; ;; then send-receive using the ulex layer to host-port stored in cdat
+;; (res (send-receive uconn (conndat-hostport cdat) cmd params))
+;; #;(th1 (make-thread (lambda ()
+;; (set! res (send-receive uconn (conndat-hostport cdat) cmd params)))
+;; "send-receive thread")))
+;; ;; (thread-start! th1)
+;; ;; (thread-join! th1) ;; gratuitious thread stuff is so that mailbox is not used in primordial thead
+;; ;; since we accessed the server we can bump the expires time up
+;; (conndat-expires-set! cdat (+ (current-seconds)
+;; (server:expiration-timeout)
+;; -2)) ;; two second margin for network time misalignments etc.
+;; res)))
+;;
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future.
+;;
+;; Purpose - call the main.db server and request a server be started
+;; for the given area path and dbname
+;;
+
+(define (rmt:print-db-stats)
+ (let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f"
+ (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
+ (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
+ (for-each (lambda (cmd)
+ (let ((cmd-dat (hash-table-ref *db-stats* cmd)))
+ (debug:print 18 *default-log-port* (format #f fmtstr cmd (vector-ref cmd-dat 0) (vector-ref cmd-dat 1) (/ (vector-ref cmd-dat 1)(vector-ref cmd-dat 0))))))
+ (sort (hash-table-keys *db-stats*)
+ (lambda (a b)
+ (> (vector-ref (hash-table-ref *db-stats* a) 0)
+ (vector-ref (hash-table-ref *db-stats* b) 0)))))))
+
+(define (rmt:get-max-query-average run-id)
+ (mutex-lock! *db-stats-mutex*)
+ (let* ((runkey (conc "run-id=" run-id " "))
+ (cmds (filter (lambda (x)
+ (substring-index runkey x))
+ (hash-table-keys *db-stats*)))
+ (res (if (null? cmds)
+ (cons 'none 0)
+ (let loop ((cmd (car cmds))
+ (tal (cdr cmds))
+ (max-cmd (car cmds))
+ (res 0))
+ (let* ((cmd-dat (hash-table-ref *db-stats* cmd))
+ (tot (vector-ref cmd-dat 0))
+ (curravg (/ (vector-ref cmd-dat 1) (vector-ref cmd-dat 0))) ;; count is never zero by construction
+ (currmax (max res curravg))
+ (newmax-cmd (if (> curravg res) cmd max-cmd)))
+ (if (null? tal)
+ (if (> tot 10)
+ (cons newmax-cmd currmax)
+ (cons 'none 0))
+ (loop (car tal)(cdr tal) newmax-cmd currmax)))))))
+ (mutex-unlock! *db-stats-mutex*)
+ res))
+
+;; ;; host and port are used to ensure we are remove proper records
+;; (define (rmt:server-shutdown host port)
+;; (let ((dbfile (servdat-dbfile *db-serv-info*)))
+;; (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
+;; (if dbfile
+;; (let* ((am-server (args:get-arg "-server"))
+;; (dbfile (args:get-arg "-db"))
+;; (apath *toppath*)
+;; #;(sinfo *remotedat*)) ;; foundation for future fix
+;; (if *dbstruct-db*
+;; (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
+;; (db (dbr:dbdat-db dbdat))
+;; (inmem (dbr:dbdat-db dbdat)) ;; WRONG
+;; )
+;; ;; do a final sync here
+;; (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
+;; (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
+;; ;; let's finalize here
+;; (debug:print-info 0 *default-log-port* "Finalizing db and inmem")
+;; (if (sqlite3:database? db)
+;; (sqlite3:finalize! db)
+;; (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing..."))
+;; (if (sqlite3:database? inmem)
+;; (sqlite3:finalize! inmem)
+;; (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing..."))
+;; (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete"))
+;; (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do."))
+;; (if (not am-server)
+;; (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!")
+;; (if (string-match ".*/main.db$" dbfile)
+;; (let ((pkt-file (conc (get-pkts-dir *toppath*)
+;; "/" (servdat-uuid *db-serv-info*)
+;; ".pkt")))
+;; (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
+;; (delete-file* pkt-file)
+;; (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port)
+;; (db:with-lock-db
+;; (servdat-dbfile *db-serv-info*)
+;; (lambda (dbh dbfile)
+;; (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove
+;; (let* ((sdat *db-serv-info*) ;; we have a run-id server
+;; (host (servdat-host sdat))
+;; (port (servdat-port sdat))
+;; (uuid (servdat-uuid sdat))
+;; (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile)))
+;; (debug:print-info 0 *default-log-port* "deregistered-server, res="res)
+;; (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
+;; )))))))
+;;
+;;
+;; (define (common:run-sync?)
+;; ;; (and (common:on-homehost?)
+;; (args:get-arg "-server"))
+;;
+;; (define *rmt:run-mutex* (make-mutex))
+;; (define *rmt:run-flag* #f)
+;;
+;; ;; Main entry point to start a server. was start-server
+;; (define (rmt:run hostn)
+;; (mutex-lock! *rmt:run-mutex*)
+;; (if *rmt:run-flag*
+;; (begin
+;; (debug:print-warn 0 *default-log-port* "rmt:run already running.")
+;; (mutex-unlock! *rmt:run-mutex*))
+;; (begin
+;; (set! *rmt:run-flag* #t)
+;; (mutex-unlock! *rmt:run-mutex*)
+;; ;; ;; Configurations for server
+;; ;; (tcp-buffer-size 2048)
+;; ;; (max-connections 2048)
+;; (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
+;; (if (and *db-serv-info*
+;; (servdat-uconn *db-serv-info*))
+;; (let* ((uconn (servdat-uconn *db-serv-info*)))
+;; (wait-and-close uconn))
+;; (let* ((port (portlogger:open-run-close portlogger:find-port))
+;; (handler-proc (lambda (rem-host-port qrykey cmd params) ;;
+;; (set! *db-last-access* (current-seconds))
+;; (assert (list? params) "FATAL: handler called with non-list params")
+;; (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params)
+;; (debug:print 0 *default-log-port* "handler call: "cmd", params="params)
+;; (api:execute-requests *dbstruct-db* cmd params))))
+;; ;; (api:process-request *dbstuct-db*
+;; (if (not *db-serv-info*)
+;; (set! *db-serv-info* (make-servdat host: hostn port: port)))
+;; (let* ((uconn (run-listener handler-proc port))
+;; (rport (udat-port uconn))) ;; the real port
+;; (servdat-host-set! *db-serv-info* hostn)
+;; (servdat-port-set! *db-serv-info* rport)
+;; (servdat-uconn-set! *db-serv-info* uconn)
+;; (wait-and-close uconn)
+;; (db:print-current-query-stats)
+;; )))
+;; (let* ((host (servdat-host *db-serv-info*))
+;; (port (servdat-port *db-serv-info*))
+;; (mode (or (servdat-mode *db-serv-info*)
+;; "non-db")))
+;; ;; server exit stuff here
+;; ;; (rmt:server-shutdown host port) - always do in on-exit
+;; ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
+;; (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting")
+;; ))))
+;;
+;; ;;======================================================================
+;; ;; S E R V E R U T I L I T I E S
+;; ;;======================================================================
+;;
+;;
+;; ;;======================================================================
+;; ;; NEW SERVER METHOD
+;; ;;======================================================================
+;;
+;; ;; only use for main.db - need to re-write some of this :(
+;; ;;
+;; (define (get-lock-db sdat dbfile host port)
+;; (assert host "FATAL: get-lock-db called with host not set.")
+;; (assert port "FATAL: get-lock-db called with port not set.")
+;; (let* ((dbh (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations
+;; (res (db:get-iam-server-lock dbh dbfile host port))
+;; (uconn (servdat-uconn sdat)))
+;; ;; res => list then already locked, check server is responsive
+;; ;; => #t then sucessfully got the lock
+;; ;; => #f reserved for future use as to indicate something went wrong
+;; (match res
+;; ((owner_pid owner_host owner_port event_time)
+;; (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
+;; #f ;; locked by someone else
+;; (begin ;; locked by someone dead and gone
+;; (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
+;; (db:steal-lock-db dbh dbfile port))))
+;; (#t #t) ;; placeholder so that we don't touch res if it is #t
+;; (else (set! res #f)))
+;; (sqlite3:finalize! dbh)
+;; res))
+;;
+;;
+;; (define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath)
+;; (let* ((pkt-dat `((host . ,host)
+;; (port . ,port)
+;; (servkey . ,servkey)
+;; (pid . ,(current-process-id))
+;; (ipaddr . ,ipaddr)
+;; (dbpath . ,dbpath)))
+;; (uuid (write-alist->pkt
+;; pkts-dir
+;; pkt-dat
+;; pktspec: pkt-spec
+;; ptype: 'server)))
+;; (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid)
+;; uuid))
+;;
+;; (define (get-pkts-dir #!optional (apath #f))
+;; (let* ((effective-toppath (or *toppath* apath)))
+;; (assert effective-toppath
+;; "ERROR: get-pkts-dir called without *toppath* set. Exiting.")
+;; (let* ((pdir (conc effective-toppath "/.meta/srvpkts")))
+;; (if (file-exists? pdir)
+;; pdir
+;; (begin
+;; (handle-exceptions ;; this exception handler should NOT be needed but ...
+;; exn
+;; pdir
+;; (create-directory pdir #t))
+;; pdir)))))
+;;
+;; ;; given a pkts dir read
+;; ;;
+;; (define (get-all-server-pkts pktsdir-in pktspec)
+;; (let* ((pktsdir (if (file-exists? pktsdir-in)
+;; pktsdir-in
+;; (begin
+;; (create-directory pktsdir-in #t)
+;; pktsdir-in)))
+;; (all-pkt-files (glob (conc pktsdir "/*.pkt"))))
+;; (map (lambda (pkt-file)
+;; (read-pkt->alist pkt-file pktspec: pktspec))
+;; all-pkt-files)))
+;;
+;; (define (server-address srv-pkt)
+;; (conc (alist-ref 'host srv-pkt) ":"
+;; (alist-ref 'port srv-pkt)))
+
+(define (server-ready? uconn host-port key) ;; server-address is host:port
+ #;(let* ((params `((cmd . ping)(key . ,key)))
+ (data `((cmd . ping)
+ (key . ,key)
+ (params . ,params))) ;; I don't get it.
+ (res (send-receive uconn host-port 'ping data)))
+ (if (eq? res 'ack) ;; yep, likely it is who we want on the other end
+ res
+ #f))
+ #t)
+
+;; ; from the pkts return servers associated with dbpath
+;; ;; NOTE: Only one can be alive - have to check on each
+;; ;; in the list of pkts returned
+;; ;;
+;; (define (get-viable-servers serv-pkts dbpath)
+;; (let loop ((tail serv-pkts)
+;; (res '()))
+;; (if (null? tail)
+;; res ;; NOTE: sort by age so oldest is considered first
+;; (let* ((spkt (car tail)))
+;; (loop (cdr tail)
+;; (if (equal? dbpath (alist-ref 'dbpath spkt))
+;; (cons spkt res)
+;; res))))))
+;;
+;; (define (remove-pkts-if-not-alive uconn serv-pkts)
+;; (filter (lambda (pkt)
+;; (let* ((host (alist-ref 'host pkt))
+;; (port (alist-ref 'port pkt))
+;; (host-port (conc host":"port))
+;; (key (alist-ref 'servkey pkt))
+;; (pktz (alist-ref 'Z pkt))
+;; (res (server-ready? uconn host-port key)))
+;; (if res
+;; res
+;; (let* ((pktsdir (get-pkts-dir *toppath*))
+;; (pktpath (conc pktsdir"/"pktz".pkt")))
+;; (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath)
+;; (delete-file* pktpath)
+;; #f))))
+;; serv-pkts))
+;;
+;; ;; from viable servers get one that is alive and ready
+;; ;;
+;; (define (get-the-server uconn apath serv-pkts)
+;; (let loop ((tail serv-pkts))
+;; (if (null? tail)
+;; #f
+;; (let* ((spkt (car tail))
+;; (host (alist-ref 'ipaddr spkt))
+;; (port (alist-ref 'port spkt))
+;; (host-port (conc host":"port))
+;; (dbpth (alist-ref 'dbpath spkt))
+;; (srvkey (alist-ref 'Z spkt)) ;; (alist-ref 'srvkey spkt))
+;; (addr (server-address spkt)))
+;; (if (server-ready? uconn host-port srvkey)
+;; spkt
+;; (loop (cdr tail)))))))
+;;
+;; ;; am I the "first" in line server? I.e. my D card is smallest
+;; ;; use Z card as tie breaker
+;; ;;
+;; (define (get-best-candidate serv-pkts dbpath)
+;; (if (null? serv-pkts)
+;; #f
+;; (let loop ((tail serv-pkts)
+;; (best (car serv-pkts)))
+;; (if (null? tail)
+;; best
+;; (let* ((candidate (car tail))
+;; (candidate-bd (string->number (alist-ref 'D candidate)))
+;; (best-bd (string->number (alist-ref 'D best)))
+;; ;; bigger number is younger
+;; (candidate-z (alist-ref 'Z candidate))
+;; (best-z (alist-ref 'Z best))
+;; (new-best (cond
+;; ((> best-bd candidate-bd) ;; best is younger than candidate
+;; candidate)
+;; ((< best-bd candidate-bd) ;; candidate is younger than best
+;; best)
+;; (else
+;; (if (string>=? best-z candidate-z)
+;; best
+;; candidate))))) ;; use Z card as tie breaker
+;; (if (null? tail)
+;; new-best
+;; (loop (cdr tail) new-best)))))))
+;;
+;;
+;; ;;======================================================================
+;; ;; END NEW SERVER METHOD
+;; ;;======================================================================
+;;
+;; ;; if .db/main.db check the pkts
+;; ;;
+;; (define (rmt:wait-for-server pkts-dir db-file server-key)
+;; (let* ((sdat *db-serv-info*))
+;; (let loop ((start-time (current-seconds))
+;; (changed #t)
+;; (last-sdat "not this"))
+;; (begin ;; let ((sdat #f))
+;; (thread-sleep! 0.01)
+;; (debug:print-info 0 *default-log-port* "Waiting for server alive signature")
+;; (mutex-lock! *heartbeat-mutex*)
+;; (set! sdat *db-serv-info*)
+;; (mutex-unlock! *heartbeat-mutex*)
+;; (if (and sdat
+;; (not changed)
+;; (> (- (current-seconds) start-time) 2))
+;; (let* ((uconn (servdat-uconn sdat)))
+;; (servdat-status-set! sdat 'iface-stable)
+;; (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server")
+;; ;; create a server pkt in *toppath*/.meta/srvpkts
+;;
+;; ;; TODO:
+;; ;; 1. change sdat to stuct
+;; ;; 2. add uuid to struct
+;; ;; 3. update uuid in sdat here
+;; ;;
+;; (servdat-uuid-set! sdat
+;; (register-server
+;; pkts-dir *srvpktspec*
+;; (get-host-name)
+;; (servdat-port sdat) server-key
+;; (servdat-host sdat) db-file))
+;; ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key
+;; ;; now read pkts and see if we are a contender
+;; (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*))
+;; (viables (get-viable-servers all-pkts db-file))
+;; (alive (remove-pkts-if-not-alive uconn viables))
+;; (best-srv (get-best-candidate alive db-file))
+;; (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))
+;; (i-am-srv (equal? best-srv-key server-key))
+;; (delete-pkt (lambda ()
+;; (let* ((pktfile (conc (get-pkts-dir *toppath*)
+;; "/" (servdat-uuid *db-serv-info*)
+;; ".pkt")))
+;; (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile)
+;; (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit
+;; (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv)
+;; ;; am I the best-srv, compare server-keys to know
+;; (if i-am-srv
+;; (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)
+;; (begin
+;; (debug:print-info 0 *default-log-port* "I'm the server!")
+;; (servdat-dbfile-set! sdat db-file)
+;; (servdat-status-set! sdat 'db-locked))
+;; (begin
+;; (debug:print-info 0 *default-log-port* "I'm not the server, exiting.")
+;; (bdat-time-to-exit-set! *bdat* #t)
+;; (delete-pkt)
+;; (thread-sleep! 0.2)
+;; (exit)))
+;; (begin
+;; (debug:print-info 0 *default-log-port*
+;; "Keys do not match "best-srv-key", "server-key", exiting.")
+;; (bdat-time-to-exit-set! *bdat* #t)
+;; (delete-pkt)
+;; (thread-sleep! 0.2)
+;; (exit)))
+;; sdat))
+;; (begin ;; sdat not yet contains server info
+;; (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
+;; (sleep 4)
+;; (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
+;; (begin
+;; (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
+;; (exit))
+;; (loop start-time
+;; (equal? sdat last-sdat)
+;; sdat))))))))
+;;
+;; (define (rmt:register-server sinfo apath iface port server-key dbname)
+;; (servdat-conns sinfo) ;; just checking types
+;; (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+;; (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
+;; (db:run-id->dbname #f)
+;; 'register-server `(,iface
+;; ,port
+;; ,server-key
+;; ,(current-process-id)
+;; ,iface
+;; ,apath
+;; ,dbname)))
+;;
+;; (define (rmt:get-count-servers sinfo apath)
+;; (servdat-conns sinfo) ;; just checking types
+;; (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+;; (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
+;; (db:run-id->dbname #f)
+;; 'get-count-servers `(,apath)))
+
+(define (rmt:get-servers-info apath)
+ (rmt:send-receive 'get-servers-info #f `(,apath)))
+
+;; (define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
+;; (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
+;; (rmt:send-receive-real db-serv-info apath ;; params: host port servkey pid ipaddr dbpath
+;; (db:run-id->dbname #f)
+;; 'deregister-server `(,iface
+;; ,port
+;; ,server-key
+;; ,(current-process-id)
+;; ,iface
+;; ,apath
+;; ,dbname)))
+;;
+;; (define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100))
+;; ;; wait until *db-serv-info* stops changing
+;; (let* ((stime (current-seconds)))
+;; (let loop ((last-host #f)
+;; (last-port #f)
+;; (tries 0))
+;; (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*)))
+;; (curr-port (and *db-serv-info* (servdat-port *db-serv-info*))))
+;; ;; first we verify port and interface, update *db-serv-info* in need be.
+;; (cond
+;; ((> tries num-tries-allowed)
+;; (debug:print 0 *default-log-port* "rmt:keep-running, giving up after trying for several minutes.")
+;; (exit 1))
+;; ((not *db-serv-info*)
+;; (thread-sleep! 0.25)
+;; (loop curr-host curr-port (+ tries 1)))
+;; ((or (not last-host)(not last-port))
+;; (debug:print 0 *default-log-port* "rmt:keep-running, still no interface, tries="tries)
+;; (thread-sleep! 0.25)
+;; (loop curr-host curr-port (+ tries 1)))
+;; ((or (not (equal? last-host curr-host))
+;; (not (equal? last-port curr-port)))
+;; (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
+;; (thread-sleep! 0.25)
+;; (loop curr-host curr-port (+ tries 1)))
+;; ((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 3 seconds have passed
+;; (thread-sleep! 0.5)
+;; (loop curr-host curr-port (+ tries 1)))
+;; (else
+;; (rmt:get-signature) ;; sets *my-signature* as side effect
+;; (servdat-status-set! *db-serv-info* 'interface-stable)
+;; (debug:print 0 *default-log-port*
+;; "SERVER STARTED: " curr-host
+;; ":" curr-port
+;; " AT " (current-seconds) " server signature: " *my-signature*
+;; " with "(servdat-trynum *db-serv-info*)" port changes")
+;; (flush-output *default-log-port*)
+;; #t))))))
+;;
+;; ;; run rmt:keep-running in a parallel thread to monitor that the db is being
+;; ;; used and to shutdown after sometime if it is not.
+;; ;;
+;; (define (rmt:keep-running dbname)
+;; ;; if none running or if > 20 seconds since
+;; ;; server last used then start shutdown
+;; ;; This thread waits for the server to come alive
+;; (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")
+;;
+;; (let* ((sinfo *db-serv-info*)
+;; (server-start-time (current-seconds))
+;; (pkts-dir (get-pkts-dir))
+;; (server-key (rmt:get-signature)) ;; This servers key
+;; (is-main (equal? (args:get-arg "-db") ".db/main.db"))
+;; (last-access 0)
+;; (server-timeout (server:expiration-timeout))
+;; (shutdown-server-sequence (lambda (host port)
+;; (set! *unclean-shutdown* #f) ;; Should not be needed anymore
+;; (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
+;; ;; (rmt:server-shutdown host port) -- called in on-exit
+;; ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit
+;; (exit)))
+;; (timed-out? (lambda ()
+;; (<= (+ last-access server-timeout)
+;; (current-seconds)))))
+;; (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db"))
+;; ;; main and run db servers have both got wait logic (could/should merge it)
+;; (if is-main
+;; (rmt:wait-for-server pkts-dir dbname server-key)
+;; (rmt:wait-for-stable-interface))
+;; ;; this is our forever loop
+;; (let* ((iface (servdat-host *db-serv-info*))
+;; (port (servdat-port *db-serv-info*))
+;; (uconn (servdat-uconn *db-serv-info*)))
+;; (let loop ((count 0)
+;; (bad-sync-count 0)
+;; (start-time (current-milliseconds)))
+;; (if (and (not is-main)
+;; (common:low-noise-print 60 "servdat-status"))
+;; (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*)))
+;;
+;; (mutex-lock! *heartbeat-mutex*)
+;; ;; set up the database handle
+;; (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate
+;; (let ((watchdog (bdat-watchdog *bdat*)))
+;; (debug:print 0 *default-log-port* "SERVER: dbprep")
+;; (db:setup dbname) ;; sets *dbstruct-db* as side effect
+;; (servdat-status-set! *db-serv-info* 'db-opened)
+;; ;; IFF I'm not main, call into main and register self
+;; (if (not is-main)
+;; (let ((res (rmt:register-server sinfo
+;; *toppath* iface port
+;; server-key dbname)))
+;; (if res ;; we are the server
+;; (servdat-status-set! *db-serv-info* 'have-interface-and-db)
+;; ;; now check that the db locker is alive, clear it out if not
+;; (let* ((serv-info (rmt:server-info *toppath* dbname)))
+;; (match serv-info
+;; ((host port servkey pid ipaddr apath dbpath)
+;; (if (not (server-ready? uconn (conc host":"port) servkey))
+;; (begin
+;; (debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.")
+;; (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath)
+;; (loop (+ count 1) bad-sync-count start-time))))
+;; (else
+;; (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info)
+;; (exit)))))))
+;; (debug:print 0 *default-log-port*
+;; "SERVER: running, db "dbname" opened, megatest version: "
+;; (common:get-full-version))
+;; ;; start the watchdog
+;;
+;; ;; is this really needed?
+;;
+;; #;(if watchdog
+;; (if (not (member (thread-state watchdog)
+;; '(ready running blocked
+;; sleeping dead)))
+;; (begin
+;; (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")")
+;; (thread-start! watchdog))
+;; (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")"))
+;; (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it."))
+;; #;(loop (+ count 1) bad-sync-count start-time)
+;; ))
+;;
+;; (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t)
+;;
+;; (mutex-unlock! *heartbeat-mutex*)
+;;
+;; ;; when things go wrong we don't want to be doing the various
+;; ;; queries too often so we strive to run this stuff only every
+;; ;; four seconds or so.
+;; (let* ((sync-time (- (current-milliseconds) start-time))
+;; (rem-time (quotient (- 4000 sync-time) 1000)))
+;; (if (and (<= rem-time 4)
+;; (> rem-time 0))
+;; (thread-sleep! rem-time)))
+;;
+;; ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
+;; (set! last-access *db-last-access*)
+;;
+;; (if (< count 1) ;; 3x3 = 9 secs aprox
+;; (loop (+ count 1) bad-sync-count (current-milliseconds)))
+;;
+;; (if (common:low-noise-print 60 "dbstats")
+;; (begin
+;; (debug:print 0 *default-log-port* "Server stats:")
+;; (db:print-current-query-stats)))
+;; (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600)))
+;; (cond
+;; ((not *server-run*)
+;; (debug:print-info 0 *default-log-port* "*server-run* set to #f. Shutting down.")
+;; (shutdown-server-sequence (get-host-name) port))
+;; ((timed-out?)
+;; (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+;; (shutdown-server-sequence (get-host-name) port))
+;; ((and *server-run*
+;; (or (not (timed-out?))
+;; (if is-main ;; do not exit if there are other servers (keep main open until all others gone)
+;; (> (rmt:get-count-servers sinfo *toppath*) 1)
+;; #f)))
+;; (if (common:low-noise-print 120 "server continuing")
+;; (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
+;; (loop 0 bad-sync-count (current-milliseconds)))
+;; (else
+;; (set! *unclean-shutdown* #f)
+;; (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+;; (shutdown-server-sequence (get-host-name) port)
+;; #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: "
+;; (open-send-receive-nn (conc iface":"port) ;; do this here and not in server-shutdown
+;; (sexpr->string 'quit))))))))))
+;;
+;; (define (rmt:get-reasonable-hostname)
+;; (let* ((inhost (or (args:get-arg "-server") "-")))
+;; (if (equal? inhost "-")
+;; (get-host-name)
+;; inhost)))
+;;
+;; Call this to start the actual server
+;;
+;; all routes though here end in exit ...
+;;
+;; This is the point at which servers are started
+;;
+(define (rmt:server-launch dbname)
+ (debug:print-info 0 *default-log-port* "Entered rmt:server-launch")
+ #;(let* ((th2 (make-thread (lambda ()
+ (debug:print-info 0 *default-log-port* "Server run thread started")
+ (rmt:run (rmt:get-reasonable-hostname)))
+ "Server run"))
+ (th3 (make-thread (lambda ()
+ (debug:print-info 0 *default-log-port* "Server monitor thread started")
+ (if (args:get-arg "-server")
+ (rmt:keep-running dbname)))
+ "Keep running")))
+ (thread-start! th2)
+ (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor.
+ (thread-start! th3)
+ (set! *didsomething* #t)
+ (thread-join! th2)
+ (thread-join! th3))
+ #f)
+;;
+;; ;;======================================================================
+;; ;; S E R V E R - D I R E C T C A L L S
+;; ;;======================================================================
+;;
+;; (define (rmt:kill-server run-id)
+;; (rmt:send-receive 'kill-server #f (list run-id)))
+;;
+;; (define (rmt:start-server run-id)
+;; (rmt:send-receive 'start-server #f (list run-id)))
+;;
+;; (define (rmt:server-info apath dbname)
+;; (rmt:send-receive 'get-server-info #f (list apath dbname)))
+;;
+;; ;;======================================================================
+;; ;; Nanomsg transport
+;; ;;======================================================================
+;;
+;; #;(define (is-port-in-use port-num)
+;; (let* ((ret #f))
+;; (let-values (((inp oup pid)
+;; (process "netstat" (list "-tulpn" ))))
+;; (let loop ((inl (read-line inp)))
+;; (if (not (eof-object? inl))
+;; (begin
+;; (if (string-search (regexp (conc ":" port-num)) inl)
+;; (begin
+;; ;(print "Output: " inl)
+;; (set! ret #t))
+;; (loop (read-line inp)))))))
+;; ret))
+;;
+;; #;(define (open-nn-connection host-port)
+;; (let ((req (make-req-socket))
+;; (uri (conc "tcp://" host-port)))
+;; (nng-dial req uri)
+;; (socket-set! req 'nng/recvtimeo 2000)
+;; req))
+;;
+;; #;(define (send-receive-nn req msg)
+;; (nng-send req msg)
+;; (nng-recv req))
+;;
+;; #;(define (close-nn-connection req)
+;; (nng-close! req))
+;;
+;; ;; ;; open connection to server, send message, close connection
+;; ;; ;;
+;; ;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+;; ;; (let ((req (make-req-socket 'req))
+;; ;; (uri (conc "tcp://" host-port))
+;; ;; (res #f)
+;; ;; ;; (contacts (alist-ref 'contact attrib))
+;; ;; ;; (mode (alist-ref 'mode attrib))
+;; ;; )
+;; ;; (socket-set! req 'nng/recvtimeo 2000)
+;; ;; (handle-exceptions
+;; ;; exn
+;; ;; (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+;; ;; ;; Send notification
+;; ;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" )
+;; ;; #f)
+;; ;; (nng-dial req uri)
+;; ;; ;; (print "Connected to the server " )
+;; ;; (nng-send req msg)
+;; ;; ;; (print "Request Sent")
+;; ;; (let* ((th1 (make-thread (lambda ()
+;; ;; (let ((resp (nng-recv req)))
+;; ;; (nng-close! req)
+;; ;; (set! res (if (equal? resp "ok")
+;; ;; #t
+;; ;; #f))))
+;; ;; "recv thread"))
+;; ;; (th2 (make-thread (lambda ()
+;; ;; (thread-sleep! timeout)
+;; ;; (thread-terminate! th1))
+;; ;; "timer thread")))
+;; ;; (thread-start! th1)
+;; ;; (thread-start! th2)
+;; ;; (thread-join! th1)
+;; ;; res))))
+;; ;;
+;; #;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+;; (let ((req (make-req-socket))
+;; (uri (conc "tcp://" host-port))
+;; (res #f))
+;; (handle-exceptions
+;; exn
+;; (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+;; ;; Send notification
+;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn)
+;; #f)
+;; (nng-dial req uri)
+;; (nng-send req msg)
+;; (let* ((th1 (make-thread (lambda ()
+;; (let ((resp (nng-recv req)))
+;; (nng-close! req)
+;; ;; (print resp)
+;; (set! res resp)))
+;; "recv thread"))
+;; (th2 (make-thread (lambda ()
+;; (thread-sleep! timeout)
+;; (thread-terminate! th1))
+;; "timer thread")))
+;; (thread-start! th1)
+;; (thread-start! th2)
+;; (thread-join! th1)
+;; res))))
+;;
+;; ;;======================================================================
+;; ;; S E R V E R U T I L I T I E S
+;; ;;======================================================================
+;;
+;; ;; run ping in separate process, safest way in some cases
+;; ;;
+;; #;(define (server:ping-server ifaceport)
+;; (with-input-from-pipe
+;; (conc (common:get-megatest-exe) " -ping " ifaceport)
+;; (lambda ()
+;; (let loop ((inl (read-line))
+;; (res "NOREPLY"))
+;; (if (eof-object? inl)
+;; (case (string->symbol res)
+;; ((NOREPLY) #f)
+;; ((LOGIN_OK) #t)
+;; (else #f))
+;; (loop (read-line) inl))))))
+;;
+;; ;; NOT USED (well, ok, reference in rpc-transport but otherwise not used).
+;; ;;
+;; #;(define (server:login toppath)
+;; (lambda (toppath)
+;; (set! *db-last-access* (current-seconds)) ;; might not be needed.
+;; (if (equal? *toppath* toppath)
+;; #t
+;; #f)))
+;;
+;; ;; (define server:sync-lock-token "SERVER_SYNC_LOCK")
+;; ;; (define (server:release-sync-lock)
+;; ;; (db:no-sync-del! *no-sync-db* server:sync-lock-token))
+;; ;; (define (server:have-sync-lock?)
+;; ;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token))
+;; ;; (have-lock? (car have-lock-pair))
+;; ;; (lock-time (cdr have-lock-pair))
+;; ;; (lock-age (- (current-seconds) lock-time)))
+;; ;; (cond
+;; ;; (have-lock? #t)
+;; ;; ((>lock-age
+;; ;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180)))
+;; ;; (server:release-sync-lock)
+;; ;; (server:have-sync-lock?))
+;; ;; (else #f))))
+
+)
ADDED ulex-none/ulex.scm
Index: ulex-none/ulex.scm
==================================================================
--- /dev/null
+++ ulex-none/ulex.scm
@@ -0,0 +1,569 @@
+;; ulex: Distributed sqlite3 db
+;;;
+;; Copyright (C) 2018-2021 Matt Welland
+;; Redistribution and use in source and binary forms, with or without
+;; modification, is permitted.
+;;
+;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
+;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
+;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
+;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+;; DAMAGE.
+
+;;======================================================================
+;; ABOUT:
+;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
+;; NOTES:
+;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
+;;
+;;======================================================================
+
+(module ulex
+ *
+ #;(
+
+ ;; NOTE: looking for the handler proc - find the run-listener :)
+
+ run-listener ;; (run-listener handler-proc [port]) => uconn
+
+ ;; NOTE: handler-proc params;
+ ;; (handler-proc rem-host-port qrykey cmd params)
+
+ send-receive ;; (send-receive uconn host-port cmd data)
+
+ ;; NOTE: cmd can be any plain text symbol except for these;
+ ;; 'ping 'ack 'goodbye 'response
+
+ set-work-handler ;; (set-work-handler proc)
+
+ wait-and-close ;; (wait-and-close uconn)
+
+ ulex-listener?
+
+ ;; needed to get the interface:port that was automatically found
+ udat-port
+ udat-host-port
+
+ ;; for testing only
+ ;; pp-uconn
+
+ ;; parameters
+ work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct
+ return-method ;; parameter; 'mailbox, 'polling, 'direct
+ )
+
+(import scheme
+ chicken.base
+ chicken.file
+ chicken.io
+ chicken.time
+ chicken.condition
+ chicken.string
+ chicken.sort
+ chicken.pretty-print
+
+ address-info
+ mailbox
+ matchable
+ ;; queues
+ regex
+ regex-case
+ simple-exceptions
+ s11n
+ srfi-1
+ srfi-18
+ srfi-4
+ srfi-69
+ system-information
+ tcp6
+ typed-records
+ )
+
+;; ;; udat struct, used by both caller and callee
+;; ;; instantiated as uconn by convention
+;; ;;
+;; (defstruct udat
+;; ;; the listener side
+;; (port #f)
+;; (host-port #f)
+;; (socket #f)
+;; ;; the peers
+;; (peers (make-hash-table)) ;; host:port->peer
+;; ;; work handling
+;; (work-queue (make-mailbox))
+;; (work-proc #f) ;; set by user
+;; (cnum 0) ;; cookie number
+;; (mboxes (make-hash-table)) ;; for the replies
+;; (avail-cmboxes '()) ;; list of ( . ) for re-use
+;; ;; threads
+;; (numthreads 10)
+;; (cmd-thread #f)
+;; (work-queue-thread #f)
+;; (num-threads-running 0)
+;; )
+;;
+;; ;; Parameters
+;;
+;; ;; work-method:
+;; (define work-method (make-parameter 'mailbox))
+;; ;; mailbox - all rdat goes through mailbox
+;; ;; threads - all rdat immediately executed in new thread
+;; ;; direct - no queuing
+;; ;;
+;;
+;; ;; return-method, return the result to waiting send-receive:
+;; (define return-method (make-parameter 'mailbox))
+;; ;; mailbox - create a mailbox and use it for passing returning results to send-receive
+;; ;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result
+;; ;; direct - no queuing, result is passed back in single tcp connection
+;; ;;
+;;
+;; ;; ;; struct for keeping track of others we are talking to
+;; ;; ;;
+;; ;; (defstruct pdat
+;; ;; (host-port #f)
+;; ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer
+;; ;; )
+;; ;;
+;; ;; ;; struct for peer connections, keep track of expiration etc.
+;; ;; ;;
+;; ;; (defstruct pcon
+;; ;; (inp #f)
+;; ;; (oup #f)
+;; ;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
+;; ;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
+;; ;; )
+;;
+;; ;;======================================================================
+;; ;; listener
+;; ;;======================================================================
+;;
+;; ;; is uconn a ulex connector (listener)
+;; ;;
+;; (define (ulex-listener? uconn)
+;; (udat? uconn))
+;;
+;; ;; create a tcp listener and return a populated udat struct with
+;; ;; my port, address, hostname, pid etc.
+;; ;; return #f if fail to find a port to allocate.
+;; ;;
+;; ;; if udata-in is #f create the record
+;; ;; if there is already a serv-listener return the udata
+;; ;;
+;; (define (setup-listener uconn #!optional (port 4242))
+;; (handle-exceptions
+;; exn
+;; (if (< port 65535)
+;; (setup-listener uconn (+ port 1))
+;; #f)
+;; (connect-listener uconn port)))
+;;
+;; (define (connect-listener uconn port)
+;; ;; (tcp-listener-socket LISTENER)(socket-name so)
+;; ;; sockaddr-address, sockaddr-port, sockaddr->string
+;; (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
+;; (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
+;; (udat-port-set! uconn port)
+;; (udat-host-port-set! uconn (conc addr":"port))
+;; (udat-socket-set! uconn tlsn)
+;; uconn))
+;;
+;; ;; run-listener does all the work of starting a listener in a thread
+;; ;; it then returns control
+;; ;;
+;; (define (run-listener handler-proc #!optional (port-suggestion 4242))
+;; (let* ((uconn (make-udat)))
+;; (udat-work-proc-set! uconn handler-proc)
+;; (if (setup-listener uconn port-suggestion)
+;; (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
+;; (th2 (make-thread (lambda ()
+;; (case (work-method)
+;; ((mailbox limited)
+;; (process-work-queue uconn))))
+;; "Ulex work queue processor")))
+;; ;; (tcp-buffer-size 2048)
+;; (thread-start! th1)
+;; (thread-start! th2)
+;; (udat-cmd-thread-set! uconn th1)
+;; (udat-work-queue-thread-set! uconn th2)
+;; (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
+;; uconn)
+;; (assert #f "ERROR: run-listener called without proper setup."))))
+;;
+;; (define (wait-and-close uconn)
+;; (thread-join! (udat-cmd-thread uconn))
+;; (tcp-close (udat-socket uconn)))
+;;
+;; ;;======================================================================
+;; ;; peers and connections
+;; ;;======================================================================
+;;
+;; (define *send-mutex* (make-mutex))
+;;
+;; ;; send structured data to recipient
+;; ;;
+;; ;; NOTE: qrykey is what was called the "cookie" previously
+;; ;;
+;; ;; retval tells send to expect and wait for return data (one line) and return it or time out
+;; ;; this is for ping where we don't want to necessarily have set up our own server yet.
+;; ;;
+;; ;; NOTE: see below for beginnings of code to allow re-use of tcp connections
+;; ;; - I believe (without substantial evidence) that re-using connections will
+;; ;; be beneficial ...
+;; ;;
+;; (define (send udata host-port qrykey cmd params)
+;; (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
+;; (isme #f #;(equal? host-port my-host-port)) ;; calling myself?
+;; ;; dat is a self-contained work block that can be sent or handled locally
+;; (dat (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds)))))
+;; (cond
+;; (isme (ulex-handler udata dat)) ;; no transmission needed
+;; (else
+;; (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
+;; exn
+;; (message exn)
+;; (begin
+;; ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; (let-values (((inp oup)(tcp-connect host-port)))
+;; (let ((res (if (and inp oup)
+;; (begin
+;; (serialize dat oup)
+;; (close-output-port oup)
+;; (deserialize inp)
+;; )
+;; (begin
+;; (print "ERROR: send called but no receiver has been setup. Please call setup first!")
+;; #f))))
+;; (close-input-port inp)
+;; ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; res)))))))) ;; res will always be 'ack unless return-method is direct
+;;
+;; (define (send-via-polling uconn host-port cmd data)
+;; (let* ((qrykey (make-cookie uconn))
+;; (sres (send uconn host-port qrykey cmd data)))
+;; (case sres
+;; ((ack)
+;; (let loop ((start-time (current-milliseconds)))
+;; (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout
+;; (begin
+;; (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data)
+;; #f)
+;; (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash
+;; (if result ;; result is '(status . result-data) or #f for nothing yet
+;; (begin
+;; (hash-table-delete! (udat-mboxes uconn) qrykey)
+;; (cdr result))
+;; (begin
+;; (thread-sleep! 0.01)
+;; (loop start-time)))))))
+;; (else
+;; (print "ULEX ERROR: Communication failed? sres="sres)
+;; #f))))
+;;
+;; (define (send-via-mailbox uconn host-port cmd data)
+;; (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
+;; (qrykey (car cmbox))
+;; (mbox (cdr cmbox))
+;; (mbox-time (current-milliseconds))
+;; (sres (send uconn host-port qrykey cmd data))) ;; short res
+;; (if (eq? sres 'ack)
+;; (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread)))
+;; #f
+;; 120)) ;; timeout)
+;; (mbox-timeout-result 'MBOX_TIMEOUT)
+;; (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
+;; (mbox-receive-time (current-milliseconds)))
+;; ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
+;; (hash-table-delete! (udat-mboxes uconn) qrykey)
+;; (if (eq? res 'MBOX_TIMEOUT)
+;; (begin
+;; (print "WARNING: mbox timed out for query "cmd", with data "data
+;; ", waiting for response from "host-port".")
+;;
+;; ;; here it might make sense to clean up connection records and force clean start?
+;; ;; NO. The progam using ulex needs to do the reset. Right thing here is exception
+;;
+;; #f) ;; convert to raising exception?
+;; res))
+;; (begin
+;; (print "ERROR: Communication failed? Got "sres)
+;; #f))))
+;;
+;; ;; send a request to the given host-port and register a mailbox in udata
+;; ;; wait for the mailbox data and return it
+;; ;;
+;; (define (send-receive uconn host-port cmd data)
+;; (let* ((start-time (current-milliseconds))
+;; (result (cond
+;; ((member cmd '(ping goodbye)) ;; these are immediate
+;; (send uconn host-port 'ping cmd data))
+;; ((eq? (work-method) 'direct)
+;; ;; the result from send will be the actual result, not an 'ack
+;; (send uconn host-port 'direct cmd data))
+;; (else
+;; (case (return-method)
+;; ((polling)
+;; (send-via-polling uconn host-port cmd data))
+;; ((mailbox)
+;; (send-via-mailbox uconn host-port cmd data))
+;; (else
+;; (print "ULEX ERROR: unrecognised return-method "(return-method)".")
+;; #f)))))
+;; (duration (- (current-milliseconds) start-time)))
+;; ;; this is ONLY for development and debugging. It will be removed once Ulex is stable.
+;; (if (< 5000 duration)
+;; (print "ULEX WARNING: round-trip took "(inexact->exact (round (/ duration 1000)))
+;; " seconds; "cmd", host-port="host-port", data="data))
+;; result))
+;;
+;;
+;; ;;======================================================================
+;; ;; responder side
+;; ;;======================================================================
+;;
+;; ;; take a request, rdat, and if not immediate put it in the work queue
+;; ;;
+;; ;; Reserved cmds; ack ping goodbye response
+;; ;;
+;; (define (ulex-handler uconn rdat)
+;; (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
+;; (match rdat ;; (string-split controldat)
+;; ((rem-host-port qrykey cmd params);; timedata)
+;; ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
+;; (case cmd
+;; ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
+;; ((ping)
+;; ;; (print "Got Ping!")
+;; ;; (add-to-work-queue uconn rdat)
+;; 'ack)
+;; ((goodbye)
+;; ;; just clear out references to the caller. NOT COMPLETE
+;; (add-to-work-queue uconn rdat)
+;; 'ack)
+;; ((response) ;; this is a result from remote processing, send it as mail ...
+;; (case (return-method)
+;; ((polling)
+;; (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params))
+;; 'ack)
+;; ((mailbox)
+;; (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
+;; (if mbox
+;; (begin
+;; (mailbox-send! mbox params) ;; params here is our result
+;; 'ack)
+;; (begin
+;; (print "ERROR: received result but no associated mbox for cookie "qrykey)
+;; 'no-mbox-found))))
+;; (else (print "ULEX ERROR: unrecognised return-method "(return-method))
+;; 'bad-return-method)))
+;; (else ;; generic request - hand it to the work queue
+;; (add-to-work-queue uconn rdat)
+;; 'ack)))
+;; (else
+;; (print "ULEX ERROR: bad rdat "rdat)
+;; 'bad-rdat)))
+;;
+;; ;; given an already set up uconn start the cmd-loop
+;; ;;
+;; (define (ulex-cmd-loop uconn)
+;; (let* ((serv-listener (udat-socket uconn))
+;; (listener (lambda ()
+;; (let loop ((state 'start))
+;; (let-values (((inp oup)(tcp-accept serv-listener)))
+;; ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
+;; (resp (ulex-handler uconn rdat)))
+;; (serialize resp oup)
+;; (close-input-port inp)
+;; (close-output-port oup)
+;; ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; )
+;; (loop state))))))
+;; ;; start N of them
+;; (let loop ((thnum 0)
+;; (threads '()))
+;; (if (< thnum 100)
+;; (let* ((th (make-thread listener (conc "listener" thnum))))
+;; (thread-start! th)
+;; (loop (+ thnum 1)
+;; (cons th threads)))
+;; (map thread-join! threads)))))
+;;
+;; ;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
+;; ;; so that the proc can be dereferenced remotely
+;; ;;
+;; (define (set-work-handler uconn proc)
+;; (udat-work-proc-set! uconn proc))
+;;
+;; ;;======================================================================
+;; ;; work queues - this is all happening on the listener side
+;; ;;======================================================================
+;;
+;; ;; rdat is (rem-host-port qrykey cmd params)
+;;
+;; (define (add-to-work-queue uconn rdat)
+;; #;(queue-add! (udat-work-queue uconn) rdat)
+;; (case (work-method)
+;; ((threads)
+;; (thread-start! (make-thread (lambda ()
+;; (do-work uconn rdat))
+;; "worker thread")))
+;; ((mailbox)
+;; (mailbox-send! (udat-work-queue uconn) rdat))
+;; ((direct)
+;; (do-work uconn rdat))
+;; (else
+;; (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.")
+;; (mailbox-send! (udat-work-queue uconn) rdat))))
+;;
+;; ;; move the logic to return the result somewhere else?
+;; ;;
+;; (define (do-work uconn rdat)
+;; (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
+;; ;; put this following into a do-work procedure
+;; (match rdat
+;; ((rem-host-port qrykey cmd params)
+;; (let* ((start-time (current-milliseconds))
+;; (result (proc rem-host-port qrykey cmd params))
+;; (end-time (current-milliseconds))
+;; (run-time (- end-time start-time)))
+;; (case (work-method)
+;; ((direct) result)
+;; (else
+;; (print "ULEX: work "cmd", "params" done in "run-time" ms")
+;; ;; send 'response as cmd and result as params
+;; (send uconn rem-host-port qrykey 'response result) ;; could check for ack
+;; (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
+;; (MBOX_TIMEOUT 'do-work-timeout)
+;; (else
+;; (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
+;;
+;; ;; NEW APPROACH:
+;; ;;
+;; (define (process-work-queue uconn)
+;; (let ((wqueue (udat-work-queue uconn))
+;; (proc (udat-work-proc uconn))
+;; (numthr (udat-numthreads uconn)))
+;; (let loop ((thnum 1)
+;; (threads '()))
+;; (let ((thlst (cons (make-thread (lambda ()
+;; (let work-loop ()
+;; (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
+;; (do-work uconn rdat))
+;; (work-loop)))
+;; (conc "work thread " thnum))
+;; threads)))
+;; (if (< thnum numthr)
+;; (loop (+ thnum 1)
+;; thlst)
+;; (begin
+;; (print "ULEX: Starting "(length thlst)" worker threads.")
+;; (map thread-start! thlst)
+;; (print "ULEX: Threads started. Joining all.")
+;; (map thread-join! thlst)))))))
+;;
+;; ;; below was to enable re-use of connections. This seems non-trivial so for
+;; ;; now lets open on each call
+;; ;;
+;; ;; ;; given host-port get or create peer struct
+;; ;; ;;
+;; ;; (define (udat-get-peer uconn host-port)
+;; ;; (or (hash-table-ref/default (udat-peers uconn) host-port #f)
+;; ;; ;; no peer, so create pdat and init it
+;; ;;
+;; ;; ;; NEED stack of connections, pop and use; inp, oup,
+;; ;; ;; creation_time (remove and create new if over 24hrs old
+;; ;; ;;
+;; ;; (let ((pdat (make-pdat host-port: host-port)))
+;; ;; (hash-table-set! (udat-peers uconn) host-port pdat)
+;; ;; pdat)))
+;; ;;
+;; ;; ;; is pcon alive
+;; ;;
+;; ;; ;; given host-port and pdat get a pcon
+;; ;; ;;
+;; ;; (define (pdat-get-pcon pdat host-port)
+;; ;; (let loop ((conns (pdat-conns pdat)))
+;; ;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
+;; ;; (init-pcon (make-pcon))
+;; ;; (let* ((conn (pop conns)))
+;; ;;
+;; ;; ;; given host-port get a pcon struct
+;; ;; ;;
+;; ;; (define (udat-get-pcon
+;;
+;; ;;======================================================================
+;; ;; misc utils
+;; ;;======================================================================
+;;
+;; (define (make-cookie uconn)
+;; (let ((newcnum (+ (udat-cnum uconn) 1)))
+;; (udat-cnum-set! uconn newcnum)
+;; (conc (udat-host-port uconn) ":"
+;; newcnum)))
+;;
+;; ;; cookie/mboxes
+;;
+;; ;; we store each mbox with a cookie ( . )
+;; ;;
+;; (define (get-cmbox uconn)
+;; (if (null? (udat-avail-cmboxes uconn))
+;; (let ((cookie (make-cookie uconn))
+;; (mbox (make-mailbox)))
+;; (hash-table-set! (udat-mboxes uconn) cookie mbox)
+;; `(,cookie . ,mbox))
+;; (let ((cmbox (car (udat-avail-cmboxes uconn))))
+;; (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
+;; cmbox)))
+;;
+;; (define (put-cmbox uconn cmbox)
+;; (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
+;;
+;; (define (pp-uconn uconn)
+;; (pp (udat->alist uconn)))
+;;
+;;
+;; ;;======================================================================
+;; ;; network utilities
+;; ;;======================================================================
+;;
+;; ;; NOTE: Look at address-info egg as alternative to some of this
+;;
+;; (define (rate-ip ipaddr)
+;; (regex-case ipaddr
+;; ( "^127\\..*" _ 0 )
+;; ( "^(10\\.0|192\\.168)\\..*" _ 1 )
+;; ( else 2 ) ))
+;;
+;; ;; Change this to bias for addresses with a reasonable broadcast value?
+;; ;;
+;; (define (ip-pref-less? a b)
+;; (> (rate-ip a) (rate-ip b)))
+;;
+;; (define (get-my-best-address)
+;; (let ((all-my-addresses (get-all-ips)))
+;; (cond
+;; ((null? all-my-addresses)
+;; (get-host-name)) ;; no interfaces?
+;; ((eq? (length all-my-addresses) 1)
+;; (car all-my-addresses)) ;; only one to choose from, just go with it
+;; (else
+;; (car (sort all-my-addresses ip-pref-less?))))))
+;;
+;; (define (get-all-ips-sorted)
+;; (sort (get-all-ips) ip-pref-less?))
+;;
+;; (define (get-all-ips)
+;; (map address-info-host
+;; (filter (lambda (x)
+;; (equal? (address-info-type x) "tcp"))
+;; (address-infos (get-host-name)))))
+;;
+)
Index: ulex-simple/dbmgr.scm
==================================================================
--- ulex-simple/dbmgr.scm
+++ ulex-simple/dbmgr.scm
@@ -158,30 +158,12 @@
;; TODO: This is unnecessarily re-creating the record in the hash table
;;
(define (rmt:open-main-connection remdat apath)
(let* ((fullpath (db:dbname->path apath ".db/main.db"))
(conns (servdat-conns remdat))
- (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
- (start-rmt:run (lambda ()
- (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
- (thread-start! th1)
- (thread-sleep! 1)
- (let loop ((count 0))
- (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
- (if (or (not *db-serv-info*)
- (not (servdat-uconn *db-serv-info*)))
- (begin
- (thread-sleep! 1)
- (loop (+ count 1)))
- (begin
- (servdat-mode-set! *db-serv-info* 'non-db)
- (servdat-uconn *db-serv-info*)))))))
- (myconn (servdat-uconn *db-serv-info*)))
+ (conn (rmt:get-conn remdat apath ".db/main.db"))) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
(cond
- ((not myconn)
- (start-rmt:run)
- (rmt:open-main-connection remdat apath))
((and conn ;; conn is NOT a socket, just saying ...
(< (current-seconds) (conndat-expires conn)))
#t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
((and conn
(>= (current-seconds)(conndat-expires conn)))
@@ -314,32 +296,20 @@
(begin
(rmt:open-main-connection sinfo apath)
(if rid (rmt:general-open-connection sinfo apath dbname))
#;(if (not (member cmd '(log-to-main)))
(debug:print-info 0 *default-log-port* "rmt:send-receive "cmd" params="params))
- (rmt:send-receive-real sinfo apath dbname cmd params)))))
-
-;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
-;; sometime in the future
-;;
-(define (rmt:send-receive-real sinfo apath dbname cmd params)
- (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
- (let* ((cdat (rmt:get-conn sinfo apath dbname)))
- (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened")
- (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex
- ;; then send-receive using the ulex layer to host-port stored in cdat
- (res (send-receive uconn (conndat-hostport cdat) cmd params))
- #;(th1 (make-thread (lambda ()
- (set! res (send-receive uconn (conndat-hostport cdat) cmd params)))
- "send-receive thread")))
- ;; (thread-start! th1)
- ;; (thread-join! th1) ;; gratuitious thread stuff is so that mailbox is not used in primordial thead
- ;; since we accessed the server we can bump the expires time up
- (conndat-expires-set! cdat (+ (current-seconds)
- (server:expiration-timeout)
- -2)) ;; two second margin for network time misalignments etc.
- res)))
+ (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
+ (let* ((cdat (rmt:get-conn sinfo apath dbname)))
+ (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened")
+ (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex
+ ;; then send-receive using the ulex layer to host-port stored in cdat
+ (res (send-receive uconn (conndat-hostport cdat) cmd params)))
+ (conndat-expires-set! cdat (+ (current-seconds)
+ (server:expiration-timeout)
+ -2)) ;; two second margin for network time misalignments etc.
+ res)))
;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
;; sometime in the future.
;;
;; Purpose - call the main.db server and request a server be started
Index: ulex-simple/ulex.scm
==================================================================
--- ulex-simple/ulex.scm
+++ ulex-simple/ulex.scm
@@ -24,11 +24,12 @@
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
(module ulex
- (
+ *
+ #;(
;; NOTE: looking for the handler proc - find the run-listener :)
run-listener ;; (run-listener handler-proc [port]) => uconn
@@ -50,15 +51,20 @@
udat-port
udat-host-port
;; for testing only
;; pp-uconn
+
+ ;; parameters
+ work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct
+ return-method ;; parameter; 'mailbox, 'polling, 'direct
)
(import scheme
chicken.base
chicken.file
+ chicken.io
chicken.time
chicken.condition
chicken.string
chicken.sort
chicken.pretty-print
@@ -67,10 +73,11 @@
mailbox
matchable
;; queues
regex
regex-case
+ simple-exceptions
s11n
srfi-1
srfi-18
srfi-4
srfi-69
@@ -94,347 +101,469 @@
(work-proc #f) ;; set by user
(cnum 0) ;; cookie number
(mboxes (make-hash-table)) ;; for the replies
(avail-cmboxes '()) ;; list of ( . ) for re-use
;; threads
- (numthreads 50)
- (cmd-thread #f)
- (work-queue-thread #f)
- )
-
-;; ;; struct for keeping track of others we are talking to
-;; ;;
-;; (defstruct pdat
-;; (host-port #f)
-;; (conns '()) ;; list of pcon structs, pop one off when calling the peer
-;; )
-;;
-;; ;; struct for peer connections, keep track of expiration etc.
-;; ;;
-;; (defstruct pcon
-;; (inp #f)
-;; (oup #f)
-;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
-;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
-;; )
-
-;;======================================================================
-;; listener
-;;======================================================================
-
-;; is uconn a ulex connector (listener)
-;;
-(define (ulex-listener? uconn)
- (udat? uconn))
-
-;; create a tcp listener and return a populated udat struct with
-;; my port, address, hostname, pid etc.
-;; return #f if fail to find a port to allocate.
-;;
-;; if udata-in is #f create the record
-;; if there is already a serv-listener return the udata
-;;
-(define (setup-listener uconn #!optional (port 4242))
- (handle-exceptions
- exn
- (if (< port 65535)
- (setup-listener uconn (+ port 1))
- #f)
- (connect-listener uconn port)))
-
-(define (connect-listener uconn port)
- ;; (tcp-listener-socket LISTENER)(socket-name so)
- ;; sockaddr-address, sockaddr-port, sockaddr->string
- (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
- (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
- (udat-port-set! uconn port)
- (udat-host-port-set! uconn (conc addr":"port))
- (udat-socket-set! uconn tlsn)
- uconn))
-
-;; run-listener does all the work of starting a listener in a thread
-;; it then returns control
-;;
-(define (run-listener handler-proc #!optional (port-suggestion 4242))
- (let* ((uconn (make-udat)))
- (udat-work-proc-set! uconn handler-proc)
- (if (setup-listener uconn port-suggestion)
- (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
- #;(th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor")))
- (tcp-buffer-size 2048)
- ;; (max-connections 2048)
- (thread-start! th1)
- #;(thread-start! th2)
- (udat-cmd-thread-set! uconn th1)
- #;(udat-work-queue-thread-set! uconn th2)
- (print "cmd loop and process workers started")
- uconn)
- (assert #f "ERROR: run-listener called without proper setup."))))
-
-(define (wait-and-close uconn)
- (thread-join! (udat-cmd-thread uconn))
- (tcp-close (udat-socket uconn)))
-
-;;======================================================================
-;; peers and connections
-;;======================================================================
-
-(define *send-mutex* (make-mutex))
-
-;; send structured data to recipient
-;;
-;; NOTE: qrykey is what was called the "cookie" previously
-;;
-;; retval tells send to expect and wait for return data (one line) and return it or time out
-;; this is for ping where we don't want to necessarily have set up our own server yet.
-;;
-;; NOTE: see below for beginnings of code to allow re-use of tcp connections
-;; - I believe (without substantial evidence) that re-using connections will
-;; be beneficial ...
-;;
-(define (send udata host-port qrykey cmd params)
- (mutex-lock! *send-mutex*)
- (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
- (isme #f #;(equal? host-port my-host-port)) ;; calling myself?
- ;; dat is a self-contained work block that can be sent or handled locally
- (dat (list my-host-port qrykey cmd params)))
- (if isme
- (ulex-handler udata dat) ;; no transmission needed
- (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
- exn
- #f
- (let-values (((inp oup)(tcp-connect host-port)))
- (let ((res (if (and inp oup)
- (begin
- (serialize dat oup)
- (deserialize inp)) ;; yes, we always want an ack
- (begin
- (print "ERROR: send called but no receiver has been setup. Please call setup first!")
- #f))))
- (close-input-port inp)
- (close-output-port oup)
- (mutex-unlock! *send-mutex*)
- res)))))) ;; res will always be 'ack
-
-;; send a request to the given host-port and register a mailbox in udata
-;; wait for the mailbox data and return it
-;;
-(define (send-receive uconn host-port cmd data)
- (cond
- ((member cmd '(ping goodbye)) ;; these are immediate
- (send uconn host-port 'ping cmd data))
- (else
- (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
- (qrykey (car cmbox))
- (mbox (cdr cmbox))
- (mbox-time (current-milliseconds))
- (sres (send uconn host-port qrykey cmd data))) ;; short res
- sres))))
-
-;;======================================================================
-;; responder side
-;;======================================================================
-
-;; take a request, rdat, and if not immediate put it in the work queue
-;;
-;; Reserved cmds; ack ping goodbye response
-;;
-(define (ulex-handler uconn rdat)
- (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
- (match rdat ;; (string-split controldat)
- ((rem-host-port qrykey cmd params)
- ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
- (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
- (case cmd
- ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
- ((ping)
- ;; (print "Got Ping!")
- ;; (add-to-work-queue uconn rdat)
- 'ack)
- (else
- (do-work uconn rdat)))))
- (else
- (print "BAD DATA? controldat=" rdat)
- 'ack) ;; send ack anyway?
- ))
-
-;; given an already set up uconn start the cmd-loop
-;;
-(define (ulex-cmd-loop uconn)
- (let* ((serv-listener (udat-socket uconn)))
- (let loop ((state 'start))
- (let-values (((inp oup)(tcp-accept serv-listener)))
- (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
- (resp (ulex-handler uconn rdat)))
- (if resp (serialize resp oup))
- (close-input-port inp)
- (close-output-port oup))
- (loop state)))))
-;;(define (ulex-cmd-loop uconn)
-;; (let* ((serv-listener (udat-socket uconn))
-;; ;; (old-listener (lambda ()
-;; ;; (let loop ((state 'start))
-;; ;; (let-values (((inp oup)(tcp-accept serv-listener)))
-;; ;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
-;; ;; (resp (ulex-handler uconn rdat)))
-;; ;; (if resp (serialize resp oup))
-;; ;; (close-input-port inp)
-;; ;; (close-output-port oup))
-;; ;; (loop state)))))
-;; (server (make-tcp-server
-;; serv-listener
-;; (lambda ()
-;; (let* ((rdat (deserialize )) ;; '(my-host-port qrykey cmd params)
-;; (resp (ulex-handler uconn rdat)))
-;; (if resp (serialize resp) resp))))))
-;; (server)))
-
-;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
-;; so that the proc can be dereferenced remotely
-;;
-(define (set-work-handler uconn proc)
- (udat-work-proc-set! uconn proc))
-
-;;======================================================================
-;; work queues - this is all happening on the listener side
-;;======================================================================
-
-;; rdat is (rem-host-port qrykey cmd params)
-
-(define (add-to-work-queue uconn rdat)
- #;(queue-add! (udat-work-queue uconn) rdat)
- (mailbox-send! (udat-work-queue uconn) rdat))
-
-(define (do-work uconn rdat)
- (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
- ;; put this following into a do-work procedure
- (match rdat
- ((rem-host-port qrykey cmd params)
- (let* ((start-time (current-milliseconds))
- (result (proc rem-host-port qrykey cmd params))
- (end-time (current-milliseconds))
- (run-time (- end-time start-time)))
- result))
- (else
- (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")
- #f))))
-
-(define (process-work-queue uconn)
- (let ((wqueue (udat-work-queue uconn))
- (proc (udat-work-proc uconn))
- (numthr (udat-numthreads uconn)))
- (let loop ((thnum 1)
- (threads '()))
- (let ((thlst (cons (make-thread (lambda ()
- (let work-loop ()
- (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
- (do-work uconn rdat))
- (work-loop)))
- (conc "work thread " thnum))
- threads)))
- (if (< thnum numthr)
- (loop (+ thnum 1)
- thlst)
- (begin
- (print "ULEX: Starting "(length thlst)" worker threads.")
- (map thread-start! thlst)
- (print "ULEX: Threads started. Joining all.")
- (map thread-join! thlst)))))))
-
-;; below was to enable re-use of connections. This seems non-trivial so for
-;; now lets open on each call
-;;
-;; ;; given host-port get or create peer struct
-;; ;;
-;; (define (udat-get-peer uconn host-port)
-;; (or (hash-table-ref/default (udat-peers uconn) host-port #f)
-;; ;; no peer, so create pdat and init it
-;;
-;; ;; NEED stack of connections, pop and use; inp, oup,
-;; ;; creation_time (remove and create new if over 24hrs old
-;; ;;
-;; (let ((pdat (make-pdat host-port: host-port)))
-;; (hash-table-set! (udat-peers uconn) host-port pdat)
-;; pdat)))
-;;
-;; ;; is pcon alive
-;;
-;; ;; given host-port and pdat get a pcon
-;; ;;
-;; (define (pdat-get-pcon pdat host-port)
-;; (let loop ((conns (pdat-conns pdat)))
-;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
-;; (init-pcon (make-pcon))
-;; (let* ((conn (pop conns)))
-;;
-;; ;; given host-port get a pcon struct
-;; ;;
-;; (define (udat-get-pcon
-
-;;======================================================================
-;; misc utils
-;;======================================================================
-
-(define (make-cookie uconn)
- (let ((newcnum (+ (udat-cnum uconn) 1)))
- (udat-cnum-set! uconn newcnum)
- (conc (udat-host-port uconn) ":"
- newcnum)))
-
-;; cookie/mboxes
-
-;; we store each mbox with a cookie ( . )
-;;
-(define (get-cmbox uconn)
- (if (null? (udat-avail-cmboxes uconn))
- (let ((cookie (make-cookie uconn))
- (mbox (make-mailbox)))
- (hash-table-set! (udat-mboxes uconn) cookie mbox)
- `(,cookie . ,mbox))
- (let ((cmbox (car (udat-avail-cmboxes uconn))))
- (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
- cmbox)))
-
-(define (put-cmbox uconn cmbox)
- (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
-
-(define (pp-uconn uconn)
- (pp (udat->alist uconn)))
-
-
-;;======================================================================
-;; network utilities
-;;======================================================================
-
-;; NOTE: Look at address-info egg as alternative to some of this
-
-(define (rate-ip ipaddr)
- (regex-case ipaddr
- ( "^127\\..*" _ 0 )
- ( "^(10\\.0|192\\.168)\\..*" _ 1 )
- ( else 2 ) ))
-
-;; Change this to bias for addresses with a reasonable broadcast value?
-;;
-(define (ip-pref-less? a b)
- (> (rate-ip a) (rate-ip b)))
-
-(define (get-my-best-address)
- (let ((all-my-addresses (get-all-ips)))
- (cond
- ((null? all-my-addresses)
- (get-host-name)) ;; no interfaces?
- ((eq? (length all-my-addresses) 1)
- (car all-my-addresses)) ;; only one to choose from, just go with it
- (else
- (car (sort all-my-addresses ip-pref-less?))))))
-
-(define (get-all-ips-sorted)
- (sort (get-all-ips) ip-pref-less?))
-
-(define (get-all-ips)
- (map address-info-host
- (filter (lambda (x)
- (equal? (address-info-type x) "tcp"))
- (address-infos (get-host-name)))))
-
+ (numthreads 10)
+ (cmd-thread #f)
+ (work-queue-thread #f)
+ (num-threads-running 0)
+ )
+
+;; == << ;; Parameters
+;; == <<
+;; == << ;; work-method:
+;; == << (define work-method (make-parameter 'mailbox))
+;; == << ;; mailbox - all rdat goes through mailbox
+;; == << ;; threads - all rdat immediately executed in new thread
+;; == << ;; direct - no queuing
+;; == << ;;
+;; == <<
+;; == << ;; return-method, return the result to waiting send-receive:
+;; == << (define return-method (make-parameter 'mailbox))
+;; == << ;; mailbox - create a mailbox and use it for passing returning results to send-receive
+;; == << ;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result
+;; == << ;; direct - no queuing, result is passed back in single tcp connection
+;; == << ;;
+;; == <<
+;; == << ;; ;; struct for keeping track of others we are talking to
+;; == << ;; ;;
+;; == << ;; (defstruct pdat
+;; == << ;; (host-port #f)
+;; == << ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer
+;; == << ;; )
+;; == << ;;
+;; == << ;; ;; struct for peer connections, keep track of expiration etc.
+;; == << ;; ;;
+;; == << ;; (defstruct pcon
+;; == << ;; (inp #f)
+;; == << ;; (oup #f)
+;; == << ;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
+;; == << ;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
+;; == << ;; )
+;; == <<
+;; == << ;;======================================================================
+;; == << ;; listener
+;; == << ;;======================================================================
+;; == <<
+;; == << ;; is uconn a ulex connector (listener)
+;; == << ;;
+;; == << (define (ulex-listener? uconn)
+;; == << (udat? uconn))
+;; == <<
+;; == << ;; create a tcp listener and return a populated udat struct with
+;; == << ;; my port, address, hostname, pid etc.
+;; == << ;; return #f if fail to find a port to allocate.
+;; == << ;;
+;; == << ;; if udata-in is #f create the record
+;; == << ;; if there is already a serv-listener return the udata
+;; == << ;;
+;; == << (define (setup-listener uconn #!optional (port 4242))
+;; == << (handle-exceptions
+;; == << exn
+;; == << (if (< port 65535)
+;; == << (setup-listener uconn (+ port 1))
+;; == << #f)
+;; == << (connect-listener uconn port)))
+;; == <<
+;; == << (define (connect-listener uconn port)
+;; == << ;; (tcp-listener-socket LISTENER)(socket-name so)
+;; == << ;; sockaddr-address, sockaddr-port, sockaddr->string
+;; == << (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
+;; == << (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
+;; == << (udat-port-set! uconn port)
+;; == << (udat-host-port-set! uconn (conc addr":"port))
+;; == << (udat-socket-set! uconn tlsn)
+;; == << uconn))
+;; == <<
+;; == << ;; run-listener does all the work of starting a listener in a thread
+;; == << ;; it then returns control
+;; == << ;;
+;; == << (define (run-listener handler-proc #!optional (port-suggestion 4242))
+;; == << (let* ((uconn (make-udat)))
+;; == << (udat-work-proc-set! uconn handler-proc)
+;; == << (if (setup-listener uconn port-suggestion)
+;; == << (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
+;; == << (th2 (make-thread (lambda ()
+;; == << (case (work-method)
+;; == << ((mailbox limited)
+;; == << (process-work-queue uconn))))
+;; == << "Ulex work queue processor")))
+;; == << ;; (tcp-buffer-size 2048)
+;; == << (thread-start! th1)
+;; == << (thread-start! th2)
+;; == << (udat-cmd-thread-set! uconn th1)
+;; == << (udat-work-queue-thread-set! uconn th2)
+;; == << (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
+;; == << uconn)
+;; == << (assert #f "ERROR: run-listener called without proper setup."))))
+;; == <<
+;; == << (define (wait-and-close uconn)
+;; == << (thread-join! (udat-cmd-thread uconn))
+;; == << (tcp-close (udat-socket uconn)))
+;; == <<
+;; == << ;;======================================================================
+;; == << ;; peers and connections
+;; == << ;;======================================================================
+;; == <<
+;; == << (define *send-mutex* (make-mutex))
+;; == <<
+;; == << ;; send structured data to recipient
+;; == << ;;
+;; == << ;; NOTE: qrykey is what was called the "cookie" previously
+;; == << ;;
+;; == << ;; retval tells send to expect and wait for return data (one line) and return it or time out
+;; == << ;; this is for ping where we don't want to necessarily have set up our own server yet.
+;; == << ;;
+;; == << ;; NOTE: see below for beginnings of code to allow re-use of tcp connections
+;; == << ;; - I believe (without substantial evidence) that re-using connections will
+;; == << ;; be beneficial ...
+;; == << ;;
+;; == << (define (send udata host-port qrykey cmd params)
+;; == << (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
+;; == << (isme #f #;(equal? host-port my-host-port)) ;; calling myself?
+;; == << ;; dat is a self-contained work block that can be sent or handled locally
+;; == << (dat (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds)))))
+;; == << (cond
+;; == << (isme (ulex-handler udata dat)) ;; no transmission needed
+;; == << (else
+;; == << (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
+;; == << exn
+;; == << (message exn)
+;; == << (begin
+;; == << ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; == << (let-values (((inp oup)(tcp-connect host-port)))
+;; == << (let ((res (if (and inp oup)
+;; == << (begin
+;; == << (serialize dat oup)
+;; == << (close-output-port oup)
+;; == << (deserialize inp)
+;; == << )
+;; == << (begin
+;; == << (print "ERROR: send called but no receiver has been setup. Please call setup first!")
+;; == << #f))))
+;; == << (close-input-port inp)
+;; == << ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; == << res)))))))) ;; res will always be 'ack unless return-method is direct
+;; == <<
+;; == << (define (send-via-polling uconn host-port cmd data)
+;; == << (let* ((qrykey (make-cookie uconn))
+;; == << (sres (send uconn host-port qrykey cmd data)))
+;; == << (case sres
+;; == << ((ack)
+;; == << (let loop ((start-time (current-milliseconds)))
+;; == << (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout
+;; == << (begin
+;; == << (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data)
+;; == << #f)
+;; == << (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash
+;; == << (if result ;; result is '(status . result-data) or #f for nothing yet
+;; == << (begin
+;; == << (hash-table-delete! (udat-mboxes uconn) qrykey)
+;; == << (cdr result))
+;; == << (begin
+;; == << (thread-sleep! 0.01)
+;; == << (loop start-time)))))))
+;; == << (else
+;; == << (print "ULEX ERROR: Communication failed? sres="sres)
+;; == << #f))))
+;; == <<
+;; == << (define (send-via-mailbox uconn host-port cmd data)
+;; == << (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
+;; == << (qrykey (car cmbox))
+;; == << (mbox (cdr cmbox))
+;; == << (mbox-time (current-milliseconds))
+;; == << (sres (send uconn host-port qrykey cmd data))) ;; short res
+;; == << (if (eq? sres 'ack)
+;; == << (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread)))
+;; == << #f
+;; == << 120)) ;; timeout)
+;; == << (mbox-timeout-result 'MBOX_TIMEOUT)
+;; == << (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
+;; == << (mbox-receive-time (current-milliseconds)))
+;; == << ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
+;; == << (hash-table-delete! (udat-mboxes uconn) qrykey)
+;; == << (if (eq? res 'MBOX_TIMEOUT)
+;; == << (begin
+;; == << (print "WARNING: mbox timed out for query "cmd", with data "data
+;; == << ", waiting for response from "host-port".")
+;; == <<
+;; == << ;; here it might make sense to clean up connection records and force clean start?
+;; == << ;; NO. The progam using ulex needs to do the reset. Right thing here is exception
+;; == <<
+;; == << #f) ;; convert to raising exception?
+;; == << res))
+;; == << (begin
+;; == << (print "ERROR: Communication failed? Got "sres)
+;; == << #f))))
+;; == <<
+;; == << ;; send a request to the given host-port and register a mailbox in udata
+;; == << ;; wait for the mailbox data and return it
+;; == << ;;
+;; == << (define (send-receive uconn host-port cmd data)
+;; == << (let* ((start-time (current-milliseconds))
+;; == << (result (cond
+;; == << ((member cmd '(ping goodbye)) ;; these are immediate
+;; == << (send uconn host-port 'ping cmd data))
+;; == << ((eq? (work-method) 'direct)
+;; == << ;; the result from send will be the actual result, not an 'ack
+;; == << (send uconn host-port 'direct cmd data))
+;; == << (else
+;; == << (case (return-method)
+;; == << ((polling)
+;; == << (send-via-polling uconn host-port cmd data))
+;; == << ((mailbox)
+;; == << (send-via-mailbox uconn host-port cmd data))
+;; == << (else
+;; == << (print "ULEX ERROR: unrecognised return-method "(return-method)".")
+;; == << #f)))))
+;; == << (duration (- (current-milliseconds) start-time)))
+;; == << ;; this is ONLY for development and debugging. It will be removed once Ulex is stable.
+;; == << (if (< 5000 duration)
+;; == << (print "ULEX WARNING: round-trip took "(inexact->exact (round (/ duration 1000)))
+;; == << " seconds; "cmd", host-port="host-port", data="data))
+;; == << result))
+;; == <<
+;; == <<
+;; == << ;;======================================================================
+;; == << ;; responder side
+;; == << ;;======================================================================
+;; == <<
+;; == << ;; take a request, rdat, and if not immediate put it in the work queue
+;; == << ;;
+;; == << ;; Reserved cmds; ack ping goodbye response
+;; == << ;;
+;; == << (define (ulex-handler uconn rdat)
+;; == << (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
+;; == << (match rdat ;; (string-split controldat)
+;; == << ((rem-host-port qrykey cmd params);; timedata)
+;; == << ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
+;; == << (case cmd
+;; == << ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
+;; == << ((ping)
+;; == << ;; (print "Got Ping!")
+;; == << ;; (add-to-work-queue uconn rdat)
+;; == << 'ack)
+;; == << ((goodbye)
+;; == << ;; just clear out references to the caller. NOT COMPLETE
+;; == << (add-to-work-queue uconn rdat)
+;; == << 'ack)
+;; == << ((response) ;; this is a result from remote processing, send it as mail ...
+;; == << (case (return-method)
+;; == << ((polling)
+;; == << (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params))
+;; == << 'ack)
+;; == << ((mailbox)
+;; == << (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
+;; == << (if mbox
+;; == << (begin
+;; == << (mailbox-send! mbox params) ;; params here is our result
+;; == << 'ack)
+;; == << (begin
+;; == << (print "ERROR: received result but no associated mbox for cookie "qrykey)
+;; == << 'no-mbox-found))))
+;; == << (else (print "ULEX ERROR: unrecognised return-method "(return-method))
+;; == << 'bad-return-method)))
+;; == << (else ;; generic request - hand it to the work queue
+;; == << (add-to-work-queue uconn rdat)
+;; == << 'ack)))
+;; == << (else
+;; == << (print "ULEX ERROR: bad rdat "rdat)
+;; == << 'bad-rdat)))
+;; == <<
+;; == << ;; given an already set up uconn start the cmd-loop
+;; == << ;;
+;; == << (define (ulex-cmd-loop uconn)
+;; == << (let* ((serv-listener (udat-socket uconn))
+;; == << (listener (lambda ()
+;; == << (let loop ((state 'start))
+;; == << (let-values (((inp oup)(tcp-accept serv-listener)))
+;; == << ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; == << (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
+;; == << (resp (ulex-handler uconn rdat)))
+;; == << (serialize resp oup)
+;; == << (close-input-port inp)
+;; == << (close-output-port oup)
+;; == << ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; == << )
+;; == << (loop state))))))
+;; == << ;; start N of them
+;; == << (let loop ((thnum 0)
+;; == << (threads '()))
+;; == << (if (< thnum 100)
+;; == << (let* ((th (make-thread listener (conc "listener" thnum))))
+;; == << (thread-start! th)
+;; == << (loop (+ thnum 1)
+;; == << (cons th threads)))
+;; == << (map thread-join! threads)))))
+;; == <<
+;; == << ;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
+;; == << ;; so that the proc can be dereferenced remotely
+;; == << ;;
+;; == << (define (set-work-handler uconn proc)
+;; == << (udat-work-proc-set! uconn proc))
+;; == <<
+;; == << ;;======================================================================
+;; == << ;; work queues - this is all happening on the listener side
+;; == << ;;======================================================================
+;; == <<
+;; == << ;; rdat is (rem-host-port qrykey cmd params)
+;; == <<
+;; == << (define (add-to-work-queue uconn rdat)
+;; == << #;(queue-add! (udat-work-queue uconn) rdat)
+;; == << (case (work-method)
+;; == << ((threads)
+;; == << (thread-start! (make-thread (lambda ()
+;; == << (do-work uconn rdat))
+;; == << "worker thread")))
+;; == << ((mailbox)
+;; == << (mailbox-send! (udat-work-queue uconn) rdat))
+;; == << ((direct)
+;; == << (do-work uconn rdat))
+;; == << (else
+;; == << (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.")
+;; == << (mailbox-send! (udat-work-queue uconn) rdat))))
+;; == <<
+;; == << ;; move the logic to return the result somewhere else?
+;; == << ;;
+;; == << (define (do-work uconn rdat)
+;; == << (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
+;; == << ;; put this following into a do-work procedure
+;; == << (match rdat
+;; == << ((rem-host-port qrykey cmd params)
+;; == << (let* ((start-time (current-milliseconds))
+;; == << (result (proc rem-host-port qrykey cmd params))
+;; == << (end-time (current-milliseconds))
+;; == << (run-time (- end-time start-time)))
+;; == << (case (work-method)
+;; == << ((direct) result)
+;; == << (else
+;; == << (print "ULEX: work "cmd", "params" done in "run-time" ms")
+;; == << ;; send 'response as cmd and result as params
+;; == << (send uconn rem-host-port qrykey 'response result) ;; could check for ack
+;; == << (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
+;; == << (MBOX_TIMEOUT 'do-work-timeout)
+;; == << (else
+;; == << (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
+;; == <<
+;; == << ;; NEW APPROACH:
+;; == << ;;
+;; == << (define (process-work-queue uconn)
+;; == << (let ((wqueue (udat-work-queue uconn))
+;; == << (proc (udat-work-proc uconn))
+;; == << (numthr (udat-numthreads uconn)))
+;; == << (let loop ((thnum 1)
+;; == << (threads '()))
+;; == << (let ((thlst (cons (make-thread (lambda ()
+;; == << (let work-loop ()
+;; == << (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
+;; == << (do-work uconn rdat))
+;; == << (work-loop)))
+;; == << (conc "work thread " thnum))
+;; == << threads)))
+;; == << (if (< thnum numthr)
+;; == << (loop (+ thnum 1)
+;; == << thlst)
+;; == << (begin
+;; == << (print "ULEX: Starting "(length thlst)" worker threads.")
+;; == << (map thread-start! thlst)
+;; == << (print "ULEX: Threads started. Joining all.")
+;; == << (map thread-join! thlst)))))))
+;; == <<
+;; == << ;; below was to enable re-use of connections. This seems non-trivial so for
+;; == << ;; now lets open on each call
+;; == << ;;
+;; == << ;; ;; given host-port get or create peer struct
+;; == << ;; ;;
+;; == << ;; (define (udat-get-peer uconn host-port)
+;; == << ;; (or (hash-table-ref/default (udat-peers uconn) host-port #f)
+;; == << ;; ;; no peer, so create pdat and init it
+;; == << ;;
+;; == << ;; ;; NEED stack of connections, pop and use; inp, oup,
+;; == << ;; ;; creation_time (remove and create new if over 24hrs old
+;; == << ;; ;;
+;; == << ;; (let ((pdat (make-pdat host-port: host-port)))
+;; == << ;; (hash-table-set! (udat-peers uconn) host-port pdat)
+;; == << ;; pdat)))
+;; == << ;;
+;; == << ;; ;; is pcon alive
+;; == << ;;
+;; == << ;; ;; given host-port and pdat get a pcon
+;; == << ;; ;;
+;; == << ;; (define (pdat-get-pcon pdat host-port)
+;; == << ;; (let loop ((conns (pdat-conns pdat)))
+;; == << ;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
+;; == << ;; (init-pcon (make-pcon))
+;; == << ;; (let* ((conn (pop conns)))
+;; == << ;;
+;; == << ;; ;; given host-port get a pcon struct
+;; == << ;; ;;
+;; == << ;; (define (udat-get-pcon
+;; == <<
+;; == << ;;======================================================================
+;; == << ;; misc utils
+;; == << ;;======================================================================
+;; == <<
+;; == << (define (make-cookie uconn)
+;; == << (let ((newcnum (+ (udat-cnum uconn) 1)))
+;; == << (udat-cnum-set! uconn newcnum)
+;; == << (conc (udat-host-port uconn) ":"
+;; == << newcnum)))
+;; == <<
+;; == << ;; cookie/mboxes
+;; == <<
+;; == << ;; we store each mbox with a cookie ( . )
+;; == << ;;
+;; == << (define (get-cmbox uconn)
+;; == << (if (null? (udat-avail-cmboxes uconn))
+;; == << (let ((cookie (make-cookie uconn))
+;; == << (mbox (make-mailbox)))
+;; == << (hash-table-set! (udat-mboxes uconn) cookie mbox)
+;; == << `(,cookie . ,mbox))
+;; == << (let ((cmbox (car (udat-avail-cmboxes uconn))))
+;; == << (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
+;; == << cmbox)))
+;; == <<
+;; == << (define (put-cmbox uconn cmbox)
+;; == << (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
+;; == <<
+;; == << (define (pp-uconn uconn)
+;; == << (pp (udat->alist uconn)))
+;; == <<
+;; == <<
+;; == << ;;======================================================================
+;; == << ;; network utilities
+;; == << ;;======================================================================
+;; == <<
+;; == << ;; NOTE: Look at address-info egg as alternative to some of this
+;; == <<
+;; == << (define (rate-ip ipaddr)
+;; == << (regex-case ipaddr
+;; == << ( "^127\\..*" _ 0 )
+;; == << ( "^(10\\.0|192\\.168)\\..*" _ 1 )
+;; == << ( else 2 ) ))
+;; == <<
+;; == << ;; Change this to bias for addresses with a reasonable broadcast value?
+;; == << ;;
+;; == << (define (ip-pref-less? a b)
+;; == << (> (rate-ip a) (rate-ip b)))
+;; == <<
+;; == << (define (get-my-best-address)
+;; == << (let ((all-my-addresses (get-all-ips)))
+;; == << (cond
+;; == << ((null? all-my-addresses)
+;; == << (get-host-name)) ;; no interfaces?
+;; == << ((eq? (length all-my-addresses) 1)
+;; == << (car all-my-addresses)) ;; only one to choose from, just go with it
+;; == << (else
+;; == << (car (sort all-my-addresses ip-pref-less?))))))
+;; == <<
+;; == << (define (get-all-ips-sorted)
+;; == << (sort (get-all-ips) ip-pref-less?))
+;; == <<
+;; == << (define (get-all-ips)
+;; == << (map address-info-host
+;; == << (filter (lambda (x)
+;; == << (equal? (address-info-type x) "tcp"))
+;; == << (address-infos (get-host-name)))))
+;; == <<
)