Index: Makefile
--- Makefile
+++ Makefile
@@ -24,10 +24,18 @@
# all : $(PREFIX)/bin/.$(ARCHSTR) mtest dboard mtut ndboard
all : $(PREFIX)/bin/.$(ARCHSTR) mtest
# add dboard mtut and tcmt back later
+# Configuration stuff
+#
+# transport-flavor records which ulex/dbmgr implementation ./configure
+# instantiates from the templates. The rule has no prerequisites, so the
+# file is only created when it is missing.
+transport-flavor :
+	@echo Creating transport-flavor with full as flavor. Options include: full, simple
+	echo full > transport-flavor
+#
+# One run of ./configure writes BOTH ulex.scm and dbmgrmod.scm. Naming both
+# as targets of a single rule defines two independent rules, so "make -j"
+# may run ./configure twice at the same time. Hang the real rule on
+# ulex.scm and chain dbmgrmod.scm behind it so the runs are serialized.
+ulex.scm : ulex.scm.template dbmgrmod.scm.template transport-flavor ulex-*/*scm
+	./configure
+#
+# Normally already (re)written by the ulex.scm recipe above; this recipe
+# only fires when dbmgrmod.scm is missing or older than ulex.scm.
+dbmgrmod.scm : ulex.scm
+	./configure
# module source files
MSRCFILES = autoload.scm dbi.scm ducttape-lib.scm pkts.scm stml2.scm \
cookie.scm mutils.scm mtargs.scm apimod.scm ulex.scm \
configfmod.scm commonmod.scm dbmod.scm rmtmod.scm \
@@ -36,23 +44,17 @@
itemsmod.scm keysmod.scm mtmod.scm rmtmod.scm \
tasksmod.scm pgdb.scm launchmod.scm runsmod.scm \
portloggermod.scm archivemod.scm ezstepsmod.scm \
subrunmod.scm bigmod.scm testsmod.scm dbmgrmod.scm
GUIMODFILES = tree.scm dashboard-tests.scm vgmod.scm \
dashboard-context-menu.scm dcommon.scm
-# dashboard-guimonitor.scm
mofiles/dashboard-context-menu.o : mofiles/dcommon.o
mofiles/dashboard-tests.o : mofiles/dcommon.o
-# mofiles/dcommon.o mofiles/tree.o : mofiles/gutils.o
OFILES = $(SRCFILES:%.scm=%.o)
-# GOFILES = $(GUISRCF:%.scm=%.o)
MOFILES = $(addprefix mofiles/,$(MSRCFILES:%.scm=%.o))
GMOFILES = $(addprefix mofiles/,$(GUIMODFILES:%.scm=%.o))
# compiled import files
ADDED attic/configure
Index: attic/configure
--- /dev/null
+++ attic/configure
@@ -0,0 +1,101 @@
+# Copyright 2006-2017, Matthew Welland.
+# This file is part of Megatest.
+# Megatest is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# Megatest is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with Megatest.  If not, see <http://www.gnu.org/licenses/>.
+# Configure the build
+if [[ "$1"x == "x" ]];then
+# Configure stuff needed for eggs
+function configure_dependencies () {
+ #======================================================================
+ # libnanomsg
+ #======================================================================
+ if [[ ! $(ls /usr/lib/*/libnanomsg*) ]];then
+ echo "libnanomsg build needed."
+ echo "BUILD_NANOMSG=yes" >>
+ fi
+ #======================================================================
+ # postgresql libraries
+ #======================================================================
+ if [[ ! $(ls /usr/lib/*/libpq.*) ]];then
+ echo "Postgresql build needed."
+ echo "BUILD_POSTGRES=yes" >>
+ fi
+ if [[ ! $(ls /usr/lib/*/libsqlite3.*) ]];then
+ echo "Sqlite3 build needed."
+ echo "BUILD_SQLITE3=yes" >>
+ fi
+# Initialize
+echo "" >
+# Do we need Chicken?
+if [[ -e /usr/bin/sw_vers ]]; then
+ ARCHSTR=$(/usr/bin/sw_vers -productVersion)
+ ARCHSTR=$(lsb_release -sr)
+if [[ ! $(type csi) ]];then
+ echo "Chicken build needed."
+ echo "BUILD_CHICKEN=yes" >>
+ configure_dependencies
+ echo "include chicken.makefile" >>
+ echo "CSIPATH=$(which csi)" >>
+ CSIPATH=$(which csi)
+ echo "CKPATH=$(dirname $(dirname $CSIPATH))" >>
+# Make setup scripts
+echo "#!/bin/bash" >
+echo "export PATH=$CHICKEN_PREFIX/bin:\$PATH" >>
+echo "export LD_LIBRARY_PATH=$CHICKEN_PREFIX/lib" >>
+echo 'exec "$@"' >>
+chmod a+x
+echo "setenv PATH $CHICKEN_PREFIX/bin:\$PATH" > setup.csh
+echo "setenv LD_LIBRARY_PATH $CHICKEN_PREFIX/lib" >> setup.csh
+echo "All done creating, feel free to edit it!"
+echo "run \" bash\" or source setup.csh to get PATH and LD_LIBRARY_PATH adjusted"
Index: commonmod.scm
--- commonmod.scm
+++ commonmod.scm
@@ -184,10 +184,11 @@
@@ -1242,10 +1243,17 @@
(define (common:simple-file-release-lock fname)
#f ;; I don't really care why this failed (at least for now)
(delete-file* fname)))
+;; Run the thunk proc while holding the simple file lock "<fname>.lock"
+;; and return whatever proc returns.
+;;
+;; The release sits in the exit thunk of dynamic-wind so the lock file is
+;; removed even when proc raises an exception or escapes through a
+;; continuation; previously such an exit left the .lock file behind.
+;; The lock is taken before dynamic-wind is entered, so a failure while
+;; acquiring it never triggers a release.
+(define (common:with-simple-file-lock fname proc)
+  (let ((lkfname (conc fname ".lock")))
+    (common:simple-file-lock-and-wait lkfname)
+    (dynamic-wind
+      (lambda () #f)
+      proc
+      (lambda () (common:simple-file-release-lock lkfname)))))
;; PUlled below from common.scm
Index: configfmod.scm
--- configfmod.scm
+++ configfmod.scm
@@ -49,10 +49,11 @@
+ my-with-lock
@@ -114,14 +115,19 @@
;; while targets are Megatest specific they are a useful concept
(define mytarget (make-parameter #f))
+;; fake locker: a no-op stand-in for a real locker. It takes the same
+;; arguments as a real one (fname proc), ignores fname and simply calls
+;; proc with no arguments, returning its result. No lock is taken.
+(define (fake-locker fname proc)(proc))
;; locking is optional, many environments don't care (e.g. running on one machine)
;; NOTE: the locker must follow the same syntax as with-dot-lock*
+;; with-dot-lock* has problems if /tmp and the file being
+;; locked are not on the same filesystem
-(define my-with-lock (make-parameter with-dot-lock*))
+(define my-with-lock (make-parameter fake-locker)) ;; with-dot-lock*))
;; move debug stuff to separate module then put these back where they belong
@@ -1190,11 +1196,11 @@
-(define (configf:write-alist cdat fname)
+(define (configf:write-alist cdat fname #!optional (check-written #f))
;; (if (not (common:faux-lock fname))
;; (debug:print 0 *default-log-port* "INFO: NEED LOCKING ADDED HERE " fname)
(lambda ()
@@ -1202,26 +1208,27 @@
(with-output-to-file fname ;; first write out the file
(lambda ()
(pp dat)))
- ;; I don't like this. It makes write-alist opaque and complicated. -mrw-
- (if (file-exists? fname) ;; now verify it is readable
- (if (configf:read-alist fname)
- #t ;; data is good.
- (begin
- (handle-exceptions
- exn
- (begin
- (debug:print 0 *default-log-port* "deleting " fname " failed, exn=" exn)
- #f)
- (debug:print 0 *default-log-port* "WARNING: content " dat " for cache " fname " is not readable. Deleting generated file.")
- (delete-file fname))
- #f))
- #f))))
+ ;; I don't like this. It makes write-alist complicated
+ ;; move to something like write-and-verify-alist. -mrw-
+ (if check-written
+ (if (file-exists? fname) ;; now verify it is readable
+ (if (configf:read-alist fname)
+ 'data-good ;; data is good.
+ (handle-exceptions
+ exn
+ (begin
+ (debug:print 0 *default-log-port* "deleting " fname " failed, exn=" exn)
+ 'data-bad)
+ (debug:print 0 *default-log-port* "WARNING: content " dat " for cache " fname " is not readable. Deleting generated file.")
+ (delete-file fname)))
+ 'data-not-there)
+ 'data-not-checked))))
(define (common:get-fields cfgdat)
(let ((fields (hash-table-ref/default cfgdat "fields" '())))
(map car fields)))
Index: configure
--- configure
+++ configure
@@ -15,87 +15,18 @@
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Megatest. If not, see .
-# Configure the build
-if [[ "$1"x == "x" ]];then
-# Configure stuff needed for eggs
-function configure_dependencies () {
- #======================================================================
- # libnanomsg
- #======================================================================
- if [[ ! $(ls /usr/lib/*/libnanomsg*) ]];then
- echo "libnanomsg build needed."
- echo "BUILD_NANOMSG=yes" >>
- fi
- #======================================================================
- # postgresql libraries
- #======================================================================
- if [[ ! $(ls /usr/lib/*/libpq.*) ]];then
- echo "Postgresql build needed."
- echo "BUILD_POSTGRES=yes" >>
- fi
- if [[ ! $(ls /usr/lib/*/libsqlite3.*) ]];then
- echo "Sqlite3 build needed."
- echo "BUILD_SQLITE3=yes" >>
- fi
-# Initialize
-echo "" >
-# Do we need Chicken?
-if [[ -e /usr/bin/sw_vers ]]; then
- ARCHSTR=$(/usr/bin/sw_vers -productVersion)
- ARCHSTR=$(lsb_release -sr)
-if [[ ! $(type csi) ]];then
- echo "Chicken build needed."
- echo "BUILD_CHICKEN=yes" >>
- configure_dependencies
- echo "include chicken.makefile" >>
- echo "CSIPATH=$(which csi)" >>
- CSIPATH=$(which csi)
- echo "CKPATH=$(dirname $(dirname $CSIPATH))" >>
-# Make setup scripts
-echo "#!/bin/bash" >
-echo "export PATH=$CHICKEN_PREFIX/bin:\$PATH" >>
-echo "export LD_LIBRARY_PATH=$CHICKEN_PREFIX/lib" >>
-echo 'exec "$@"' >>
-chmod a+x
-echo "setenv PATH $CHICKEN_PREFIX/bin:\$PATH" > setup.csh
-echo "setenv LD_LIBRARY_PATH $CHICKEN_PREFIX/lib" >> setup.csh
-echo "All done creating, feel free to edit it!"
-echo "run \" bash\" or source setup.csh to get PATH and LD_LIBRARY_PATH adjusted"
+# Flavors include: simple, full and none
+#
+# Read the flavor from transport-flavor (not a version controlled file),
+# falling back to "full" when that file does not exist, then create
+# ulex.scm and dbmgrmod.scm from their templates by substituting the
+# FLAVOR placeholder.
+if [[ -e transport-flavor ]];then
+    FLAVOR=$(cat transport-flavor)
+else
+    FLAVOR=full
+fi
+sed -e "s/FLAVOR/$FLAVOR/" ulex.scm.template > ulex.scm
+sed -e "s/FLAVOR/$FLAVOR/" dbmgrmod.scm.template > dbmgrmod.scm
Index: dbmod.scm
--- dbmod.scm
+++ dbmod.scm
@@ -3454,12 +3454,14 @@
(lambda (db)
- (let* ((stmth (db:get-cache-stmth dbstruct db qry)))
- (sqlite3:first-result stmth run-id))))))
+ (let* (#;(stmth (db:get-cache-stmth dbstruct db qry)))
+ #;(sqlite3:first-result stmth run-id)
+ (sqlite3:first-result db qry run-id)
+ )))))
;; For a given testname how many items are running? Used to determine
;; probability for regenerating html
(define (db:get-count-tests-running-for-testname dbstruct run-id testname)
Index: dcommon.scm
--- dcommon.scm
+++ dcommon.scm
@@ -26,10 +26,11 @@
(declare (uses commonmod))
(declare (uses configfmod))
(declare (uses rmtmod))
(declare (uses mtargs))
(declare (uses testsmod))
+(declare (uses dbmgrmod))
(module dcommon
(import scheme
@@ -62,10 +63,11 @@
(import mtver
+ dbmgrmod
;; gutils
Index: launchmod.scm
--- launchmod.scm
+++ launchmod.scm
@@ -663,11 +663,12 @@
(let* ((tconfig-fname (conc work-area "/.testconfig"))
(tconfig-tmpfile (conc tconfig-fname ".tmp"))
(tconfig (tests:get-testconfig test-name item-path tconfigreg #t force-create: #t)) ;; 'return-procs)))
(scripts (configf:get-section tconfig "scripts")))
;; create .testconfig file
- (configf:write-alist tconfig tconfig-tmpfile)
+ (configf:write-alist tconfig tconfig-tmpfile #t) ;; the #t forces a check of the written data
+ (assert (file-exists? tconfig-tmpfile) "FATAL: We just wrote the dang file, how can it not exist?")
(move-file tconfig-tmpfile tconfig-fname #t)
(delete-file* ".final-status")
;; extract scripts from testconfig and write them to files in test run dir
Index: megatest.scm
--- megatest.scm
+++ megatest.scm
@@ -167,12 +167,14 @@
;; ;; ulex parameters
;; (work-method 'direct)
;; (return-method 'direct)
;; ulex parameters
- (work-method 'mailbox)
- (return-method 'mailbox)
+ ;; (work-method 'mailbox)
+ ;; (return-method 'mailbox)
+(my-with-lock common:with-simple-file-lock)
;; fake out readline usage of toplevel-command
(define (toplevel-command . a) #f)
(define *didsomething* #f)
(define *db* #f) ;; this is only for the repl, do not use in general!!!!
Index: rmtmod.scm
--- rmtmod.scm
+++ rmtmod.scm
@@ -1191,11 +1191,11 @@
(if (and no-hurry (debug:debug-mode 18))
(let ((th1 (make-thread
(lambda () ;; thread for cleaning up, give it five seconds
(let* ((start-time (current-seconds)))
- (if *db-serv-info*
+ #;(if *db-serv-info*
(let* ((host (servdat-host *db-serv-info*))
(port (servdat-port *db-serv-info*)))
(debug:print-info 0 *default-log-port* "Shutting down server/responder.")
;; TODO - add flushing/waiting on the work queue
Index: runsmod.scm
--- runsmod.scm
+++ runsmod.scm
@@ -244,10 +244,23 @@
(hash-table-set! *runs:denoise* key currtime)
+;; key => time (in seconds) at which runs:too-soon-delay last finished for that key
+(define *too-soon-delays* (make-hash-table))
+;; too-soon delay: when the previous event for key was recorded less than
+;; dseconds ago, log a message and sleep for wseconds. The time stored for
+;; key is always refreshed on the way out, i.e. after any sleep.
+(define (runs:too-soon-delay key dseconds wseconds)
+ (let* ((last-time (hash-table-ref/default *too-soon-delays* key #f)))
+ (if (and last-time
+ (< (- (current-seconds) last-time) dseconds))
+ (begin
+ (debug:print-info 0 *default-log-port* "Whoa, slow down there ... "key" has been too recently seen.")
+ (thread-sleep! wseconds)))
+ (hash-table-set! *too-soon-delays* key (current-seconds))))
(define (runs:can-run-more-tests runsdat run-id jobgroup max-concurrent-jobs)
;; Take advantage of a good place to exit if running the one-pass methodology
(if (and (> (runs:dat-can-run-more-tests-count runsdat) 20)
(args:get-arg "-one-pass"))
@@ -1467,11 +1480,13 @@
newtal: newtal
itemmaps: itemmaps
;; prereqs-not-met: prereqs-not-met
(runs:dat-regfull-set! runsdat regfull)
+ (runs:too-soon-delay (conc "loop delay " hed) 1 1)
(if (> num-running 0)
(set! last-time-some-running (current-seconds)))
(if (> (current-seconds)(+ last-time-some-running (or (configf:lookup *configdat* "setup" "give-up-waiting") 36000)))
(hash-table-set! *max-tries-hash* tfullname (+ (hash-table-ref/default *max-tries-hash* tfullname 0) 1)))
@@ -1494,10 +1509,11 @@
(if (or (not (null? tal))(not (null? reg)))
(loop (runs:queue-next-hed tal reg reglen regfull)
(runs:queue-next-tal tal reg reglen regfull)
(runs:queue-next-reg tal reg reglen regfull)
;; (loop (car tal)(cdr tal) reg reruns))))
(runs:incremental-print-results run-id)
(debug:print 4 *default-log-port* "TOP OF LOOP => "
"test-name: " test-name
@@ -1725,10 +1741,11 @@
(equal? (configf:lookup *configdat* "setup" "run-wait") "yes"))
(> num-running 0))
;; Here we mark any old defunct tests as incomplete. Do this every fifteen minutes
;; (debug:print 0 *default-log-port* "Got here eh! num-running=" num-running " (> num-running 0) " (> num-running 0))
+ (thread-sleep! 5) ;; let's always sleep, prevents abutting calls to rum:get-count-tests-running-for-run-id - didn't help
(if (> (current-seconds)(+ last-time-incomplete 900))
(let ((actual-num-running (rmt:get-count-tests-running-for-run-id run-id)))
(debug:print-info 0 *default-log-port* "Marking stuck tests as INCOMPLETE while waiting for run " run-id
". Running as pid " (current-process-id) " on " (get-host-name))
(set! last-time-incomplete (current-seconds)) ;; FIXME, this might be causing slow down - use of set!
@@ -1735,11 +1752,10 @@
(runs:find-and-mark-incomplete-and-check-end-of-run run-id #f)
(debug:print-info 0 *default-log-port* "run-wait specified, waiting on " actual-num-running
(time->string (seconds->local-time (current-seconds))))))
;; (if (runs:dat-load-mgmt-function runsdat)((runs:dat-load-mgmt-function runsdat)))
- (thread-sleep! 5) ;; (if (>= num-running max-concurrent-jobs) 5 1))
(wait-loop (rmt:get-count-tests-running-for-run-id run-id)
;; LET* ((test-record
;; we get here on "drop through". All done!
;; this is moved to runs:run-testes since this function is getting called twice to ensure everthing is completed.
Index: tests/simplerun/tests/test1/testconfig
--- tests/simplerun/tests/test1/testconfig
+++ tests/simplerun/tests/test1/testconfig
@@ -24,11 +24,11 @@
# waiton setup
priority 0
# Iteration for your tests are controlled by the items section
+# [items]
# PARTOFDAY morning noon afternoon evening night
# test_meta is a section for storing additional data on your test
author matt
ADDED ulex-full/dbmgr.scm
Index: ulex-full/dbmgr.scm
--- /dev/null
+++ ulex-full/dbmgr.scm
@@ -0,0 +1,1131 @@
+;; Copyright 2022, Matthew Welland.
+;; This file is part of Megatest.
+;; Megatest is free software: you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+;; Megatest is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+;; GNU General Public License for more details.
+;; You should have received a copy of the GNU General Public License
+;; along with Megatest.  If not, see <http://www.gnu.org/licenses/>.
+(declare (unit dbmgrmod))
+(declare (uses ulex))
+(declare (uses apimod))
+(declare (uses pkts))
+(declare (uses commonmod))
+(declare (uses dbmod))
+(declare (uses mtargs))
+(declare (uses portloggermod))
+(declare (uses debugprint))
+(module dbmgrmod
+ *
+(import scheme
+ chicken.base
+ chicken.condition
+ chicken.file
+ chicken.format
+ chicken.port
+ chicken.process
+ chicken.process-context
+ chicken.process-context.posix
+ chicken.sort
+ chicken.string
+ chicken.time
+ (prefix sqlite3 sqlite3:)
+ matchable
+ md5
+ message-digest
+ regex
+ s11n
+ srfi-1
+ srfi-18
+ srfi-69
+ system-information
+ typed-records
+ pkts
+ ulex
+ commonmod
+ apimod
+ dbmod
+ debugprint
+ (prefix mtargs args:)
+ portloggermod
+ )
+;; Configurations for server
+;; (tcp-buffer-size 2048)
+;; (max-connections 2048)
+;; info about me as a listener and my connections to db servers
+;; stored (for now) in *db-serv-info*
+(defstruct servdat
+ (host #f) ;; set from the hostn argument in rmt:run
+ (port #f) ;; set in rmt:run from (udat-port uconn), "the real port"
+ (uuid #f) ;; used by rmt:server-shutdown to name this server's .pkt file
+ (dbfile #f) ;; read by rmt:server-shutdown
+ (uconn #f) ;; this is the listener *FOR THIS PROCESS*
+ (mode #f) ;; set to 'non-db by rmt:open-main-connection
+ (status 'starting)
+ (trynum 0) ;; count the number of ports we've tried
+ (conns (make-hash-table)) ;; apath/dbname => conndat
+ )
+;; the servdat record for this process
+(define *db-serv-info* (make-servdat))
+;; Render the listener address held in sdat as "host:port".
+(define (servdat->url sdat)
+  (let ((host (servdat-host sdat))
+        (port (servdat-port sdat)))
+    (conc host ":" port)))
+;; db servers contact info
+;; one record per db server this process talks to; stored in the conns
+;; table of a servdat, keyed by the full db path
+(defstruct conndat
+ (apath #f)
+ (dbname #f)
+ (fullname #f)
+ (hostport #f) ;; "host:port" string handed to send-receive
+ (ipaddr #f)
+ (port #f)
+ (srvpkt #f)
+ (srvkey #f)
+ (lastmsg 0)
+ (expires 0)) ;; compared against (current-seconds) to decide if the connection has expired
+;; pkt spec for the 'server pkt type: pairs each field name with a
+;; single-letter key. Passed as the pktspec when server pkts are read
+;; (see rmt:find-main-server).
+(define *srvpktspec*
+ `((server (host . h)
+ (port . p)
+ (servkey . k)
+ (pid . i)
+ (ipaddr . a)
+ (dbpath . d))))
+;; S U P P O R T F U N C T I O N S
+;; set up the api proc, seems like there should be a better place for this?
+;; (define api-proc (make-parameter conc))
+;; (api-proc api:execute-requests)
+;; Look up the conndat for apath/dbname in the conns table of remdat.
+;; Returns the stored conndat, or #f when there is none.
+;; NOTE: this is a pure lookup - it neither checks expiry nor sets up a
+;; new connection; rmt:open-main-connection and
+;; rmt:general-open-connection do that.
+(define (rmt:get-conn remdat apath dbname)
+ (let* ((fullname (db:dbname->path apath dbname)))
+ (hash-table-ref/default (servdat-conns remdat) fullname #f)))
+;; Forget the conndat for apath/dbname: delete its entry (keyed by the
+;; full db path) from the conns table of remdat.
+(define (rmt:drop-conn remdat apath dbname)
+ (let* ((fullname (db:dbname->path apath dbname)))
+ (hash-table-delete! (servdat-conns remdat) fullname)))
+;; Locate a server for dbname in area apath: read every server pkt from
+;; the area's pkts dir, keep the ones get-viable-servers accepts for
+;; dbname and pass those on to get-the-server.
+(define (rmt:find-main-server uconn apath dbname)
+  (let* ((srvpkts-dir (get-pkts-dir apath))
+         (candidates  (get-viable-servers
+                        (get-all-server-pkts srvpkts-dir *srvpktspec*)
+                        dbname)))
+    (get-the-server uconn apath candidates)))
+;; serializes attempts to launch a main server from this process
+(define *connstart-mutex* (make-mutex))
+;; time (seconds) of the last attempt to launch a main server
+(define *last-main-start* 0)
+;; looks for a connection to main, returns if have and not expired
+;; creates new otherwise
+;; connections for other servers happens by requesting from main
+;;
+;; remdat is a servdat; its conns table receives the conndat for main.
+;; Every non-recursive path returns #t.
+;; NOTE: the local listener (myconn) is read from the global
+;; *db-serv-info*, not from remdat.
+;; NOTE: the retries below are unbounded recursive calls.
+;;
+;; TODO: This is unnecessarily re-creating the record in the hash table
+(define (rmt:open-main-connection remdat apath)
+ (let* ((fullpath (db:dbname->path apath ".db/main.db"))
+ (conns (servdat-conns remdat))
+ (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
+ ;; start rmt:run in its own thread, then poll once a second (asserting
+ ;; after 30 polls) until *db-serv-info* has a uconn
+ (start-rmt:run (lambda ()
+ (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
+ (thread-start! th1)
+ (thread-sleep! 1)
+ (let loop ((count 0))
+ (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
+ (if (or (not *db-serv-info*)
+ (not (servdat-uconn *db-serv-info*)))
+ (begin
+ (thread-sleep! 1)
+ (loop (+ count 1)))
+ (begin
+ (servdat-mode-set! *db-serv-info* 'non-db)
+ (servdat-uconn *db-serv-info*)))))))
+ (myconn (servdat-uconn *db-serv-info*)))
+ (cond
+ ((not myconn) ;; no local listener yet: start one, then try again
+ (start-rmt:run)
+ (rmt:open-main-connection remdat apath))
+ ((and conn ;; conn is NOT a socket, just saying ...
+ (< (current-seconds) (conndat-expires conn)))
+ #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
+ ((and conn
+ (>= (current-seconds)(conndat-expires conn)))
+ (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
+ (rmt:drop-conn remdat apath ".db/main.db") ;;
+ (rmt:open-main-connection remdat apath))
+ (else
+ ;; Below we will find or create and connect to main
+ (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch")
+ (let* ((dbname (db:run-id->dbname #f))
+ (the-srv (rmt:find-main-server myconn apath dbname))
+ (start-main-srv (lambda () ;; call IF there is no the-srv found
+ (mutex-lock! *connstart-mutex*)
+ (if (> (- (current-seconds) *last-main-start*) 5) ;; more than 5 seconds since last attempt to start main server
+ (begin
+ (api:run-server-process apath dbname)
+ (set! *last-main-start* (current-seconds))
+ (thread-sleep! 1))
+ (thread-sleep! 0.25))
+ (mutex-unlock! *connstart-mutex*)
+ (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries
+ )))
+ (if (not the-srv) ;; no server found: start one and retry; otherwise record a conndat for the one found
+ (start-main-srv)
+ (let* ((srv-addr (server-address the-srv)) ;; need serv
+ (ipaddr (alist-ref 'ipaddr the-srv))
+ (port (alist-ref 'port the-srv))
+ (srvkey (alist-ref 'servkey the-srv))
+ (fullpath (db:dbname->path apath dbname))
+ (new-the-srv (make-conndat
+ apath: apath
+ dbname: dbname
+ fullname: fullpath
+ hostport: srv-addr
+ ;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection?
+ ipaddr: ipaddr
+ port: port
+ srvpkt: the-srv
+ srvkey: srvkey ;; generated by rmt:get-signature on the server side
+ lastmsg: (current-seconds)
+ expires: (+ (current-seconds)
+ (server:expiration-timeout)
+ -2) ;; this needs to be gathered during the ping
+ )))
+ (hash-table-set! conns fullpath new-the-srv)))
+ #t)))))
+;; NB// sinfo is a servdat struct
+;;
+;; Ensure a conndat exists in sinfo for the non-main db apath/dbname,
+;; opening the connection to main first if needed and then asking main
+;; (cmd 'get-server) for the server of dbname. Always returns #t; exits
+;; the process with status 1 if main keeps answering 'server-started
+;; until num-tries is used up.
+;; NOTE: num-tries is only checked in the 'server-started branch; the
+;; (not mconn) branch decrements it but never tests it.
+;; NOTE: when both mconn and dconn exist but dconn has expired, no cond
+;; clause matches and nothing is refreshed here.
+(define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
+ (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
+ (let* ((mdbname ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable
+ (fullname (db:dbname->path apath dbname))
+ (conns (servdat-conns sinfo))
+ (mconn (rmt:get-conn sinfo apath ".db/main.db"))
+ (dconn (rmt:get-conn sinfo apath dbname)))
+ #;(if (and mconn
+ (not (debug:print-logger)))
+ (begin
+ (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.")
+ (debug:print-logger rmt:log-to-main)))
+ (cond
+ ((and mconn
+ dconn
+ (< (current-seconds)(conndat-expires dconn)))
+ #t) ;; good to go
+ ((not mconn) ;; no channel open to main? open it...
+ (rmt:open-main-connection sinfo apath)
+ (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+ ((not dconn) ;; no channel open to dbname?
+ (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname))))
+ (case res
+ ((server-started) ;; main has launched a server for dbname: wait, then ask again
+ (if (> num-tries 0)
+ (begin
+ (thread-sleep! 2)
+ (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+ (begin
+ (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname)
+ (exit 1))))
+ (else
+ (if (list? res) ;; server has been registered and the info was returned. pass it on.
+ (begin ;; ("" 53817
+ ;; "5e34239f48e8973b3813221e54701a01" "24310"
+ ;; ""
+ ;; "/home/matt/data/megatest/tests/simplerun"
+ ;; ".db/1.db")
+ (match
+ res
+ ((host port servkey pid ipaddr apath dbname)
+ (debug:print-info 0 *default-log-port* "got "res)
+ ;; NOTE: unlike rmt:open-main-connection, no fullname: or srvpkt: is stored in this conndat
+ (hash-table-set! conns
+ fullname
+ (make-conndat
+ apath: apath
+ dbname: dbname
+ hostport: (conc host":"port)
+ ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
+ ipaddr: ipaddr
+ port: port
+ srvkey: servkey
+ lastmsg: (current-seconds)
+ expires: (+ (current-seconds)
+ (server:expiration-timeout)
+ -2))))
+ (else
+ (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
+ res)
+ (begin
+ (debug:print-info 0 *default-log-port* "Unexpected result: " res)
+ res)))))))
+ #t))
+;; (define *localmode* #t)
+;; when true rmt:send-receive executes requests in-process against
+;; *dbstruct* instead of going through a server
+(define *localmode* #f)
+(define *dbstruct* (make-dbr:dbstruct))
+;; Defaults to current area
+;;
+;; Run cmd with params against the db for run id rid in area *toppath*
+;; and return the result. Unless *localmode* is set, the connection to
+;; main is opened first and, when rid is not #f, the connection for the
+;; rid's db as well.
+;; NOTE: the attemptnum and area-dat keywords are accepted but not used
+;; in this body.
+(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
+ (let* ((apath *toppath*)
+ (sinfo *db-serv-info*)
+ (dbname (db:run-id->dbname rid)))
+ (if *localmode*
+ (api:execute-requests *dbstruct* cmd params)
+ (begin
+ (rmt:open-main-connection sinfo apath)
+ (if rid (rmt:general-open-connection sinfo apath dbname))
+ #;(if (not (member cmd '(log-to-main)))
+ (debug:print-info 0 *default-log-port* "rmt:send-receive "cmd" params="params))
+ (rmt:send-receive-real sinfo apath dbname cmd params)))))
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future
+;;
+;; Send cmd/params through the ulex listener in sinfo to the host:port
+;; recorded in the conndat for apath/dbname and return the reply.
+;; Asserts that it is not called from the thread named 'primordial and
+;; that the conndat already exists (i.e. the connection was opened
+;; beforehand). After the call the conndat's expiry is pushed forward.
+(define (rmt:send-receive-real sinfo apath dbname cmd params)
+ (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
+ (let* ((cdat (rmt:get-conn sinfo apath dbname)))
+ (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened")
+ (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex
+ ;; then send-receive using the ulex layer to host-port stored in cdat
+ (res (send-receive uconn (conndat-hostport cdat) cmd params))
+ #;(th1 (make-thread (lambda ()
+ (set! res (send-receive uconn (conndat-hostport cdat) cmd params)))
+ "send-receive thread")))
+ ;; (thread-start! th1)
+ ;; (thread-join! th1) ;; gratuitous thread stuff is so that mailbox is not used in primordial thread
+ ;; since we accessed the server we can bump the expires time up
+ (conndat-expires-set! cdat (+ (current-seconds)
+ (server:expiration-timeout)
+ -2)) ;; two second margin for network time misalignments etc.
+ res)))
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future.
+;; Purpose - call the main.db server and request a server be started
+;; for the given area path and dbname
+;; Print the contents of *db-stats* at debug level 18 as a table with
+;; columns Cmd, Count, TotTime and Avg, one row per key, ordered by
+;; slot 0 of each entry (the Count column) from highest to lowest.
+;; Each entry is a vector; Avg is slot 1 divided by slot 0.
+(define (rmt:print-db-stats)
+ (let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f"
+ (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
+ (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
+ (for-each (lambda (cmd)
+ (let ((cmd-dat (hash-table-ref *db-stats* cmd)))
+ (debug:print 18 *default-log-port* (format #f fmtstr cmd (vector-ref cmd-dat 0) (vector-ref cmd-dat 1) (/ (vector-ref cmd-dat 1)(vector-ref cmd-dat 0))))))
+ (sort (hash-table-keys *db-stats*)
+ (lambda (a b)
+ (> (vector-ref (hash-table-ref *db-stats* a) 0)
+ (vector-ref (hash-table-ref *db-stats* b) 0)))))))
+;; Among the *db-stats* keys that contain "run-id=<run-id> ", find the
+;; one with the largest average (slot 1 / slot 0 of its entry).
+;; Returns (cmd . average), or (none . 0) when no key matches.
+;; *db-stats* is read while holding *db-stats-mutex*.
+;; NOTE(review): the (> tot 10) test only looks at the count of the LAST
+;; key visited, not of the key holding the maximum - confirm intended.
+;; NOTE: the mutex is not released if anything in the let* raises.
+(define (rmt:get-max-query-average run-id)
+ (mutex-lock! *db-stats-mutex*)
+ (let* ((runkey (conc "run-id=" run-id " "))
+ (cmds (filter (lambda (x)
+ (substring-index runkey x))
+ (hash-table-keys *db-stats*)))
+ (res (if (null? cmds)
+ (cons 'none 0)
+ (let loop ((cmd (car cmds))
+ (tal (cdr cmds))
+ (max-cmd (car cmds))
+ (res 0))
+ (let* ((cmd-dat (hash-table-ref *db-stats* cmd))
+ (tot (vector-ref cmd-dat 0))
+ (curravg (/ (vector-ref cmd-dat 1) (vector-ref cmd-dat 0))) ;; count is never zero by construction
+ (currmax (max res curravg))
+ (newmax-cmd (if (> curravg res) cmd max-cmd)))
+ (if (null? tal)
+ (if (> tot 10)
+ (cons newmax-cmd currmax)
+ (cons 'none 0))
+ (loop (car tal)(cdr tal) newmax-cmd currmax)))))))
+ (mutex-unlock! *db-stats-mutex*)
+ res))
+;; host and port are used to ensure we remove the proper records
+;;
+;; Server-side cleanup. Does nothing unless *db-serv-info* has a dbfile.
+;; Otherwise: if *dbstruct-db* is set, force a final inmem->disk sync and
+;; finalize the db handle; then, when running with -server, either
+;; remove this server's pkt and release the lock (db is main.db) or
+;; deregister this server (any other db).
+;; NOTE(review): the inner let* rebinds dbfile to (args:get-arg "-db"),
+;; shadowing the servdat value checked just above; everything below uses
+;; the command-line value - confirm intended.
+;; NOTE: the host and port arguments are only used in the main.db
+;; branch; the other branch rebinds them from *db-serv-info*.
+(define (rmt:server-shutdown host port)
+ (let ((dbfile (servdat-dbfile *db-serv-info*)))
+ (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
+ (if dbfile
+ (let* ((am-server (args:get-arg "-server"))
+ (dbfile (args:get-arg "-db"))
+ (apath *toppath*)
+ #;(sinfo *remotedat*)) ;; foundation for future fix
+ (if *dbstruct-db*
+ (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
+ (db (dbr:dbdat-db dbdat))
+ (inmem (dbr:dbdat-db dbdat)) ;; WRONG - same accessor as db, so db and inmem are the same handle
+ )
+ ;; do a final sync here
+ (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
+ (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
+ ;; let's finalize here
+ (debug:print-info 0 *default-log-port* "Finalizing db and inmem")
+ (if (sqlite3:database? db)
+ (sqlite3:finalize! db)
+ (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing..."))
+ (if (sqlite3:database? inmem)
+ (sqlite3:finalize! inmem)
+ (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing..."))
+ (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete"))
+ (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do."))
+ (if (not am-server)
+ (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!")
+ (if (string-match ".*/main.db$" dbfile)
+ (let ((pkt-file (conc (get-pkts-dir *toppath*)
+ "/" (servdat-uuid *db-serv-info*)
+ ".pkt")))
+ (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
+ (delete-file* pkt-file)
+ (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port)
+ (db:with-lock-db
+ (servdat-dbfile *db-serv-info*)
+ (lambda (dbh dbfile)
+ (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove
+ (let* ((sdat *db-serv-info*) ;; we have a run-id server
+ (host (servdat-host sdat))
+ (port (servdat-port sdat))
+ (uuid (servdat-uuid sdat))
+ (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile)))
+ (debug:print-info 0 *default-log-port* "deregistered-server, res="res)
+ (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
+ )))))))
+;; Returns the value of the "-server" command line argument, i.e. a true
+;; value only when this process was started with -server.
+(define (common:run-sync?)
+ ;; (and (common:on-homehost?)
+ (args:get-arg "-server"))
+;; guards *rmt:run-flag*
+(define *rmt:run-mutex* (make-mutex))
+;; #t once rmt:run has been entered in this process; not reset in rmt:run
+(define *rmt:run-flag* #f)
+;; Main entry point to start a server. was start-server
+;;
+;; hostn is the host name recorded in *db-serv-info* for this listener.
+;; Only the first call in a process does any work; later calls log a
+;; warning and return. The first call either waits on an already
+;; existing listener, or finds a port, starts a listener whose handler
+;; forwards requests to api:execute-requests, records host/port/uconn in
+;; *db-serv-info* and then blocks in wait-and-close until the listener is
+;; done. A completion message is logged on the way out.
+(define (rmt:run hostn)
+  (mutex-lock! *rmt:run-mutex*)
+  (if *rmt:run-flag*
+      (begin
+        (debug:print-warn 0 *default-log-port* "rmt:run already running.")
+        (mutex-unlock! *rmt:run-mutex*))
+      (begin
+        (set! *rmt:run-flag* #t)
+        (mutex-unlock! *rmt:run-mutex*)
+        ;; ;; Configurations for server
+        ;; (tcp-buffer-size 2048)
+        ;; (max-connections 2048)
+        (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
+        (if (and *db-serv-info*
+                 (servdat-uconn *db-serv-info*))
+            (let* ((uconn (servdat-uconn *db-serv-info*)))
+              (wait-and-close uconn))
+            (let* ((port         (portlogger:open-run-close portlogger:find-port))
+                   (handler-proc (lambda (rem-host-port qrykey cmd params) ;;
+                                   (set! *db-last-access* (current-seconds))
+                                   (assert (list? params) "FATAL: handler called with non-list params")
+                                   (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params)
+                                   (debug:print 0 *default-log-port* "handler call: "cmd", params="params)
+                                   (api:execute-requests *dbstruct-db* cmd params))))
+              ;; (api:process-request *dbstuct-db*
+              (if (not *db-serv-info*)
+                  (set! *db-serv-info* (make-servdat host: hostn port: port)))
+              (let* ((uconn (run-listener handler-proc port))
+                     (rport (udat-port uconn))) ;; the real port
+                (servdat-host-set! *db-serv-info* hostn)
+                (servdat-port-set! *db-serv-info* rport)
+                (servdat-uconn-set! *db-serv-info* uconn)
+                (wait-and-close uconn)
+                (db:print-current-query-stats))))
+        (let* ((host (servdat-host *db-serv-info*))
+               (port (servdat-port *db-serv-info*))
+               (mode (or (servdat-mode *db-serv-info*)
+                         "non-db")))
+          ;; server exit stuff here
+          ;; (rmt:server-shutdown host port) - always do in on-exit
+          ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
+          ;; a space now separates the mode from "shutdown"; the message used
+          ;; to come out as e.g. "mode non-dbshutdown complete"
+          (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode" shutdown complete. Exiting")))))
+;; S E R V E R U T I L I T I E S
+;; only use for main.db - need to re-write some of this :(
+;;
+;; Try to take the "I am the server" lock in dbfile for host:port.
+;; db:get-iam-server-lock answers with
+;;   a list (owner_pid owner_host owner_port event_time) => already locked
+;;   #t                                                   => we got the lock
+;;   anything else                                        => treated as failure
+;; Returns:
+;;   #t  when the lock was obtained,
+;;   #f  when a responsive server already holds it, or on failure,
+;;   the result of db:steal-lock-db when the holder did not respond.
+;; Previously the outcome computed for the "already locked" case was
+;; thrown away and the raw owner list (a true value) was returned, so a
+;; caller testing the result could not tell "locked by a live server"
+;; from success.
+(define (get-lock-db sdat dbfile host port)
+  (assert host "FATAL: get-lock-db called with host not set.")
+  (assert port "FATAL: get-lock-db called with port not set.")
+  (let* ((dbh     (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations
+         (lockres (db:get-iam-server-lock dbh dbfile host port))
+         (uconn   (servdat-uconn sdat))
+         (res     (match lockres
+                    ((owner_pid owner_host owner_port event_time)
+                     (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
+                         #f ;; locked by someone else
+                         (begin ;; locked by someone dead and gone
+                           (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
+                           (db:steal-lock-db dbh dbfile port))))
+                    (#t #t) ;; successfully got the lock
+                    (_  #f)))) ;; anything unexpected counts as "no lock"
+    (sqlite3:finalize! dbh)
+    res))
+;; Write a pkt of type 'server into pkts-dir describing this server process
+;; (host, port, servkey, our pid, ipaddr, dbpath) and return the value handed
+;; back by write-alist->pkt (used by the callers as the pkt uuid).
+(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath)
+  (let* ((my-pid (current-process-id))
+         (fields (list (cons 'host    host)
+                       (cons 'port    port)
+                       (cons 'servkey servkey)
+                       (cons 'pid     my-pid)
+                       (cons 'ipaddr  ipaddr)
+                       (cons 'dbpath  dbpath)))
+         (pkt-id (write-alist->pkt pkts-dir fields pktspec: pkt-spec ptype: 'server)))
+    (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "pkt-id)
+    pkt-id))
+;; Return the server pkts directory, <toppath>/.meta/srvpkts, creating it when
+;; it does not exist yet. *toppath* wins when set; apath is only the fallback.
+;; Asserts if neither is available.
+(define (get-pkts-dir #!optional (apath #f))
+  (let ((effective-toppath (or *toppath* apath)))
+    (assert effective-toppath
+            "ERROR: get-pkts-dir called without *toppath* set. Exiting.")
+    (let ((pkts-path (conc effective-toppath "/.meta/srvpkts")))
+      (if (not (file-exists? pkts-path))
+          ;; best effort: a failure to create the directory is ignored and the
+          ;; path is returned regardless
+          (handle-exceptions
+           exn
+           #f
+           (create-directory pkts-path #t)))
+      pkts-path)))
+;; Read every *.pkt file found in pktsdir-in (the directory is created first if
+;; it is missing) and return the list of alists produced by read-pkt->alist
+;; using pktspec.
+(define (get-all-server-pkts pktsdir-in pktspec)
+  (if (not (file-exists? pktsdir-in))
+      (create-directory pktsdir-in #t))
+  (let ((pkt-paths (glob (conc pktsdir-in "/*.pkt"))))
+    (map (lambda (pkt-path)
+           (read-pkt->alist pkt-path pktspec: pktspec))
+         pkt-paths)))
+;; Build the "host:port" string from the 'host and 'port entries of a server pkt alist.
+(define (server-address srv-pkt)
+  (let ((host (alist-ref 'host srv-pkt))
+        (port (alist-ref 'port srv-pkt)))
+    (conc host ":" port)))
+;; Ping the server at host-port ("host:port") through uconn.
+;; Returns 'ack when the reply is 'ack, otherwise #f.
+(define (server-ready? uconn host-port key)
+  (let* ((ping-params (list (cons 'cmd 'ping)
+                            (cons 'key key)))
+         ;; the payload repeats cmd/key at the top level and again under 'params
+         (payload     (list (cons 'cmd 'ping)
+                            (cons 'key key)
+                            (cons 'params ping-params)))
+         (reply       (send-receive uconn host-port 'ping payload)))
+    ;; anything other than 'ack means nobody (or the wrong thing) answered
+    (and (eq? reply 'ack) reply)))
+;; (begin (debug:print-info 0 *default-log-port* "server-ready? => "res) #f))))
+;; From serv-pkts keep the pkts whose 'dbpath entry equals dbpath.
+;; The result is in the reverse order of serv-pkts.
+;; NOTE: only one of the returned servers can be alive - the caller has to
+;;       check each of them. (Original note: sort by age so oldest is
+;;       considered first.)
+(define (get-viable-servers serv-pkts dbpath)
+  (let ((matching '()))
+    (for-each (lambda (spkt)
+                (if (equal? dbpath (alist-ref 'dbpath spkt))
+                    (set! matching (cons spkt matching))))
+              serv-pkts)
+    matching))
+;; Return the pkts from serv-pkts whose server (host:port, keyed with its
+;; 'servkey) answers a ping. For every pkt that gets no answer the pkt file
+;; <pkts-dir>/<Z>.pkt is deleted as a side effect.
+(define (remove-pkts-if-not-alive uconn serv-pkts)
+  (define (alive? pkt)
+    (let ((host-port (conc (alist-ref 'host pkt) ":" (alist-ref 'port pkt)))
+          (key       (alist-ref 'servkey pkt)))
+      (server-ready? uconn host-port key)))
+  ;; remove the pkt file of a dead server; always yields #f so the pkt is filtered out
+  (define (discard-pkt! pkt)
+    (let ((pktpath (conc (get-pkts-dir *toppath*) "/" (alist-ref 'Z pkt) ".pkt")))
+      (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath)
+      (delete-file* pktpath)
+      #f))
+  (filter (lambda (pkt)
+            (or (alive? pkt)
+                (discard-pkt! pkt)))
+          serv-pkts))
+;; From serv-pkts return the first pkt whose server answers a ping, or #f when
+;; none does. The server is contacted at <ipaddr>:<port> and the pkt's Z card
+;; is passed as the key. apath is accepted but not used.
+(define (get-the-server uconn apath serv-pkts)
+  (let try-next ((remaining serv-pkts))
+    (cond
+     ((null? remaining) #f)
+     ((let ((spkt (car remaining)))
+        (server-ready? uconn
+                       (conc (alist-ref 'ipaddr spkt) ":" (alist-ref 'port spkt))
+                       (alist-ref 'Z spkt)))
+      (car remaining))
+     (else
+      (try-next (cdr remaining))))))
+;; Pick the "first in line" server pkt from serv-pkts: the one with the
+;; smallest D card (read as a number; bigger number is younger). When two D
+;; cards are equal the pkt whose Z card is string>=? the other wins.
+;; Returns #f for an empty list. dbpath is accepted but not used.
+(define (get-best-candidate serv-pkts dbpath)
+  ;; choose between the best pkt so far and the next candidate
+  (define (pick best candidate)
+    (let* ((candidate-bd (string->number (alist-ref 'D candidate)))
+           (best-bd      (string->number (alist-ref 'D best))))
+      (cond
+       ((< candidate-bd best-bd) candidate) ;; candidate is older than best
+       ((< best-bd candidate-bd) best)      ;; best is older than candidate
+       ;; same D card: use Z card as tie breaker
+       ((string>=? (alist-ref 'Z best) (alist-ref 'Z candidate)) best)
+       (else candidate))))
+  (if (null? serv-pkts)
+      #f
+      ;; the walk starts at the head, so the first pkt is compared with itself once
+      (let walk ((best      (car serv-pkts))
+                 (remaining serv-pkts))
+        (if (null? remaining)
+            best
+            (walk (pick best (car remaining))
+                  (cdr remaining))))))
+;; if .db/main.db check the pkts
+;;
+;; Wait until *db-serv-info* is populated, then try to become THE server for
+;; db-file:
+;;   1. poll *db-serv-info* (read under *heartbeat-mutex*)
+;;   2. once it is set, the changed flag is #f and more than 2 seconds have
+;;      passed since the start: mark it 'iface-stable, write our server pkt
+;;      into pkts-dir and store the pkt uuid in the servdat
+;;   3. read all pkts, keep those for db-file whose server answers a ping and
+;;      pick the best candidate
+;;   4. if the best candidate carries our server-key AND get-lock-db succeeds,
+;;      record db-file, set status 'db-locked and return the servdat
+;;   5. in every other case flag time-to-exit, delete our own pkt and exit
+;; If the servdat is not ready the loop sleeps 4 seconds and retries; after
+;; more than 120 seconds of waiting the process exits.
+;;
+(define (rmt:wait-for-server pkts-dir db-file server-key)
+ (let* ((sdat *db-serv-info*))
+ (let loop ((start-time (current-seconds))
+ (changed #t)
+ (last-sdat "not this"))
+ (begin ;; let ((sdat #f))
+ (thread-sleep! 0.01)
+ (debug:print-info 0 *default-log-port* "Waiting for server alive signature")
+ ;; take a snapshot of the global while holding the heartbeat mutex
+ (mutex-lock! *heartbeat-mutex*)
+ (set! sdat *db-serv-info*)
+ (mutex-unlock! *heartbeat-mutex*)
+ (if (and sdat
+ (not changed)
+ (> (- (current-seconds) start-time) 2))
+ (let* ((uconn (servdat-uconn sdat)))
+ (servdat-status-set! sdat 'iface-stable)
+ (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server")
+ ;; create a server pkt in *toppath*/.meta/srvpkts
+ ;; TODO:
+ ;; 1. change sdat to struct
+ ;; 2. add uuid to struct
+ ;; 3. update uuid in sdat here
+ ;;
+ ;; note: the pkt gets (get-host-name) as host and (servdat-host sdat) as ipaddr
+ (servdat-uuid-set! sdat
+ (register-server
+ pkts-dir *srvpktspec*
+ (get-host-name)
+ (servdat-port sdat) server-key
+ (servdat-host sdat) db-file))
+ ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key
+ ;; now read pkts and see if we are a contender
+ (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*))
+ (viables (get-viable-servers all-pkts db-file))
+ (alive (remove-pkts-if-not-alive uconn viables))
+ (best-srv (get-best-candidate alive db-file))
+ (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))
+ (i-am-srv (equal? best-srv-key server-key))
+ ;; removes the pkt file named after the uuid stored in *db-serv-info*
+ (delete-pkt (lambda ()
+ (let* ((pktfile (conc (get-pkts-dir *toppath*)
+ "/" (servdat-uuid *db-serv-info*)
+ ".pkt")))
+ (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile)
+ (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit
+ (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv)
+ ;; am I the best-srv, compare server-keys to know
+ (if i-am-srv
+ (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)
+ (begin
+ (debug:print-info 0 *default-log-port* "I'm the server!")
+ (servdat-dbfile-set! sdat db-file)
+ (servdat-status-set! sdat 'db-locked))
+ ;; best candidate by pkt, but the db lock could not be obtained
+ (begin
+ (debug:print-info 0 *default-log-port* "I'm not the server, exiting.")
+ (bdat-time-to-exit-set! *bdat* #t)
+ (delete-pkt)
+ (thread-sleep! 0.2)
+ (exit)))
+ ;; some other server's pkt is the best candidate
+ (begin
+ (debug:print-info 0 *default-log-port*
+ "Keys do not match "best-srv-key", "server-key", exiting.")
+ (bdat-time-to-exit-set! *bdat* #t)
+ (delete-pkt)
+ (thread-sleep! 0.2)
+ (exit)))
+ sdat))
+ (begin ;; sdat not yet contains server info
+ (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
+ (sleep 4)
+ (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
+ (begin
+ (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
+ (exit))
+ ;; NOTE(review): the value passed for `changed` is (equal? sdat last-sdat),
+ ;; i.e. #t when sdat did NOT change since the previous pass - the name
+ ;; looks inverted relative to the test above; confirm the intent.
+ (loop start-time
+ (equal? sdat last-sdat)
+ sdat))))))))
+;; Register this server with the server holding main.db.
+;; Opens the main connection, then sends 'register-server with the parameter
+;; list (iface port server-key pid iface apath dbname); iface is sent twice,
+;; in the host and the ipaddr position. Returns whatever
+;; rmt:send-receive-real returns.
+(define (rmt:register-server sinfo apath iface port server-key dbname)
+  (servdat-conns sinfo) ;; just checking types
+  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+  (let ((main-dbname (db:run-id->dbname #f))
+        (reg-params  (list iface
+                           port
+                           server-key
+                           (current-process-id)
+                           iface
+                           apath
+                           dbname)))
+    (rmt:send-receive-real sinfo apath main-dbname 'register-server reg-params)))
+;; Send 'get-count-servers for apath over the main.db connection and return
+;; the reply from rmt:send-receive-real.
+(define (rmt:get-count-servers sinfo apath)
+  (servdat-conns sinfo) ;; just checking types
+  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+  (let ((main-dbname (db:run-id->dbname #f)))
+    (rmt:send-receive-real sinfo apath main-dbname 'get-count-servers (list apath))))
+;; Send 'get-servers-info for apath via rmt:send-receive (run-id position is #f).
+(define (rmt:get-servers-info apath)
+  (let ((params (list apath)))
+    (rmt:send-receive 'get-servers-info #f params)))
+;; Remove a server registration from main.db.
+;; Opens the main connection, then sends 'deregister-server with the parameter
+;; list (iface port server-key pid iface apath dbname). The pid sent is the
+;; pid of the calling process.
+(define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
+  (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
+  (let ((main-dbname  (db:run-id->dbname #f))
+        (dereg-params (list iface
+                            port
+                            server-key
+                            (current-process-id)
+                            iface
+                            apath
+                            dbname)))
+    (rmt:send-receive-real db-serv-info apath main-dbname 'deregister-server dereg-params)))
+;; Wait until the host and port recorded in *db-serv-info* stop changing.
+;; Every pass compares the current host/port with those seen on the previous
+;; pass. Exits the process with status 1 once more than num-tries-allowed
+;; passes were needed. On success sets the servdat status to 'interface-stable,
+;; logs the "SERVER STARTED" line and returns #t.
+;; (The log lines carry the prefix "rmt:keep-running"; they are kept verbatim.)
+(define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100))
+  (let ((stime (current-seconds)))
+    (let loop ((last-host #f)
+               (last-port #f)
+               (tries     0))
+      (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*)))
+             (curr-port (and *db-serv-info* (servdat-port *db-serv-info*)))
+             ;; pause, then go around again remembering what we saw this time
+             (retry     (lambda (pause)
+                          (thread-sleep! pause)
+                          (loop curr-host curr-port (+ tries 1)))))
+        (cond
+         ((> tries num-tries-allowed)
+          (debug:print 0 *default-log-port* "rmt:keep-running, giving up after trying for several minutes.")
+          (exit 1))
+         ((not *db-serv-info*) ;; listener has not published anything yet
+          (retry 0.25))
+         ((not (and last-host last-port)) ;; nothing to compare against yet
+          (debug:print 0 *default-log-port* "rmt:keep-running, still no interface, tries="tries)
+          (retry 0.25))
+         ((not (and (equal? last-host curr-host)
+                    (equal? last-port curr-port)))
+          (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
+          (retry 0.25))
+         ((< (- (current-seconds) stime) 1) ;; keep looping until at least 1 second has passed
+          (retry 0.5))
+         (else
+          (rmt:get-signature) ;; sets *my-signature* as side effect
+          (servdat-status-set! *db-serv-info* 'interface-stable)
+          (debug:print 0 *default-log-port*
+                       "SERVER STARTED: " curr-host
+                       ":" curr-port
+                       " AT " (current-seconds) " server signature: " *my-signature*
+                       " with "(servdat-trynum *db-serv-info*)" port changes")
+          (flush-output *default-log-port*)
+          #t))))))
+;; run rmt:keep-running in a parallel thread to monitor that the db is being
+;; used and to shutdown after sometime if it is not.
+;;
+;; Outline:
+;;   - record the -db argument as the servdat dbfile
+;;   - main.db server: rmt:wait-for-server (pkt + db lock election);
+;;     any other server: rmt:wait-for-stable-interface
+;;   - forever loop, one pass roughly every four seconds:
+;;       * on the first pass (no *dbstruct-db* yet) open the db with db:setup
+;;         and, when not main, register with the main server
+;;       * sync the in-memory db to disk (under *heartbeat-mutex*)
+;;       * copy *db-last-access* into last-access and decide whether to keep
+;;         running or to shut down (all shutdown paths end in exit)
+;;
+(define (rmt:keep-running dbname)
+ ;; if none running or if > 20 seconds since
+ ;; server last used then start shutdown
+ ;; This thread waits for the server to come alive
+ (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")
+ (let* ((sinfo *db-serv-info*)
+ (server-start-time (current-seconds))
+ (pkts-dir (get-pkts-dir))
+ (server-key (rmt:get-signature)) ;; This servers key
+ (is-main (equal? (args:get-arg "-db") ".db/main.db"))
+ (last-access 0)
+ (server-timeout (server:expiration-timeout))
+ ;; host and port are accepted but not used; the procedure just exits
+ (shutdown-server-sequence (lambda (host port)
+ (set! *unclean-shutdown* #f) ;; Should not be needed anymore
+ (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
+ ;; (rmt:server-shutdown host port) -- called in on-exit
+ ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit
+ (exit)))
+ ;; #t once server-timeout seconds have passed since last-access
+ (timed-out? (lambda ()
+ (<= (+ last-access server-timeout)
+ (current-seconds)))))
+ (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db"))
+ ;; main and run db servers have both got wait logic (could/should merge it)
+ (if is-main
+ (rmt:wait-for-server pkts-dir dbname server-key)
+ (rmt:wait-for-stable-interface))
+ ;; this is our forever loop
+ (let* ((iface (servdat-host *db-serv-info*))
+ (port (servdat-port *db-serv-info*))
+ (uconn (servdat-uconn *db-serv-info*)))
+ (let loop ((count 0)
+ (bad-sync-count 0)
+ (start-time (current-milliseconds)))
+ (if (and (not is-main)
+ (common:low-noise-print 60 "servdat-status"))
+ (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*)))
+ (mutex-lock! *heartbeat-mutex*)
+ ;; set up the database handle
+ (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate
+ (let ((watchdog (bdat-watchdog *bdat*)))
+ (debug:print 0 *default-log-port* "SERVER: dbprep")
+ (db:setup dbname) ;; sets *dbstruct-db* as side effect
+ (servdat-status-set! *db-serv-info* 'db-opened)
+ ;; IFF I'm not main, call into main and register self
+ (if (not is-main)
+ (let ((res (rmt:register-server sinfo
+ *toppath* iface port
+ server-key dbname)))
+ (if res ;; we are the server
+ (servdat-status-set! *db-serv-info* 'have-interface-and-db)
+ ;; now check that the db locker is alive, clear it out if not
+ (let* ((serv-info (rmt:server-info *toppath* dbname)))
+ (match serv-info
+ ((host port servkey pid ipaddr apath dbpath)
+ (if (not (server-ready? uconn (conc host":"port) servkey))
+ (begin
+ (debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.")
+ (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath)
+ ;; NOTE(review): *heartbeat-mutex* is still locked at this point and
+ ;; the loop locks it again at its top - confirm this cannot deadlock.
+ (loop (+ count 1) bad-sync-count start-time))))
+ (else
+ (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info)
+ (exit)))))))
+ (debug:print 0 *default-log-port*
+ "SERVER: running, db "dbname" opened, megatest version: "
+ (common:get-full-version))
+ ;; start the watchdog
+ ;; is this really needed?
+ #;(if watchdog
+ (if (not (member (thread-state watchdog)
+ '(ready running blocked
+ sleeping dead)))
+ (begin
+ (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")")
+ (thread-start! watchdog))
+ (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")"))
+ (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it."))
+ #;(loop (+ count 1) bad-sync-count start-time)
+ ))
+ (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t)
+ (mutex-unlock! *heartbeat-mutex*)
+ ;; when things go wrong we don't want to be doing the various
+ ;; queries too often so we strive to run this stuff only every
+ ;; four seconds or so.
+ ;; rem-time is in whole seconds (quotient), so the sleep is 1, 2, 3 or 4 seconds
+ (let* ((sync-time (- (current-milliseconds) start-time))
+ (rem-time (quotient (- 4000 sync-time) 1000)))
+ (if (and (<= rem-time 4)
+ (> rem-time 0))
+ (thread-sleep! rem-time)))
+ ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
+ (set! last-access *db-last-access*)
+ ;; NOTE(review): this call to loop is not in tail position; the code below
+ ;; only runs for this pass if that call ever returns - confirm intent.
+ (if (< count 1) ;; 3x3 = 9 secs aprox
+ (loop (+ count 1) bad-sync-count (current-milliseconds)))
+ (if (common:low-noise-print 60 "dbstats")
+ (begin
+ (debug:print 0 *default-log-port* "Server stats:")
+ (db:print-current-query-stats)))
+ ;; hrs-since-start is computed but not referenced in the cond below
+ (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600)))
+ (cond
+ ((not *server-run*)
+ (debug:print-info 0 *default-log-port* "*server-run* set to #f. Shutting down.")
+ (shutdown-server-sequence (get-host-name) port))
+ ((timed-out?)
+ (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+ (shutdown-server-sequence (get-host-name) port))
+ ;; NOTE(review): the clause above already handles (timed-out?), so in this
+ ;; clause (not (timed-out?)) is normally true and the is-main /
+ ;; rmt:get-count-servers alternative is normally not consulted - confirm
+ ;; whether main was meant to outlive its timeout while other servers exist.
+ ((and *server-run*
+ (or (not (timed-out?))
+ (if is-main ;; do not exit if there are other servers (keep main open until all others gone)
+ (> (rmt:get-count-servers sinfo *toppath*) 1)
+ #f)))
+ (if (common:low-noise-print 120 "server continuing")
+ (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
+ (loop 0 bad-sync-count (current-milliseconds)))
+ (else
+ (set! *unclean-shutdown* #f)
+ (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+ (shutdown-server-sequence (get-host-name) port)
+ #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: "
+ (open-send-receive-nn (conc iface":"port) ;; do this here and not in server-shutdown
+ (sexpr->string 'quit))))))))))
+;; Host name to run the server under: the value of the -server argument,
+;; unless that is missing or "-", in which case (get-host-name) is used.
+(define (rmt:get-reasonable-hostname)
+  (let ((requested (args:get-arg "-server")))
+    (if (or (not requested)
+            (equal? requested "-"))
+        (get-host-name)
+        requested)))
+;; Call this to start the actual server
+;; all routes through here end in exit ...
+;; This is the point at which servers are started
+;;
+;; Starts two threads and joins both: "Server run" (rmt:run on the host name
+;; from rmt:get-reasonable-hostname) and, about a quarter of a second later,
+;; "Keep running" (rmt:keep-running, only when -server was given).
+;; Sets *didsomething* to #t. Returns #f should both threads ever finish.
+(define (rmt:server-launch dbname)
+  (define (run-server)
+    (debug:print-info 0 *default-log-port* "Server run thread started")
+    (rmt:run (rmt:get-reasonable-hostname)))
+  (define (monitor-server)
+    (debug:print-info 0 *default-log-port* "Server monitor thread started")
+    (if (args:get-arg "-server")
+        (rmt:keep-running dbname)))
+  (debug:print-info 0 *default-log-port* "Entered rmt:server-launch")
+  (let* ((run-thread     (make-thread run-server "Server run"))
+         (monitor-thread (make-thread monitor-server "Keep running")))
+    (thread-start! run-thread)
+    (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor.
+    (thread-start! monitor-thread)
+    (set! *didsomething* #t)
+    (thread-join! run-thread)
+    (thread-join! monitor-thread))
+  #f)
+;; S E R V E R - D I R E C T C A L L S
+;; Send 'kill-server with run-id as the only parameter (run-id position of
+;; rmt:send-receive is #f).
+(define (rmt:kill-server run-id)
+  (let ((params (list run-id)))
+    (rmt:send-receive 'kill-server #f params)))
+;; Send 'start-server with run-id as the only parameter (run-id position of
+;; rmt:send-receive is #f).
+(define (rmt:start-server run-id)
+  (let ((params (list run-id)))
+    (rmt:send-receive 'start-server #f params)))
+;; Send 'get-server-info for (apath dbname); returns the reply of rmt:send-receive.
+(define (rmt:server-info apath dbname)
+  (let ((params (list apath dbname)))
+    (rmt:send-receive 'get-server-info #f params)))
+;; Nanomsg transport
+#;(define (is-port-in-use port-num)
+ (let* ((ret #f))
+ (let-values (((inp oup pid)
+ (process "netstat" (list "-tulpn" ))))
+ (let loop ((inl (read-line inp)))
+ (if (not (eof-object? inl))
+ (begin
+ (if (string-search (regexp (conc ":" port-num)) inl)
+ (begin
+ ;(print "Output: " inl)
+ (set! ret #t))
+ (loop (read-line inp)))))))
+ ret))
+#;(define (open-nn-connection host-port)
+ (let ((req (make-req-socket))
+ (uri (conc "tcp://" host-port)))
+ (nng-dial req uri)
+ (socket-set! req 'nng/recvtimeo 2000)
+ req))
+#;(define (send-receive-nn req msg)
+ (nng-send req msg)
+ (nng-recv req))
+#;(define (close-nn-connection req)
+ (nng-close! req))
+;; ;; open connection to server, send message, close connection
+;; ;;
+;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+;; (let ((req (make-req-socket 'req))
+;; (uri (conc "tcp://" host-port))
+;; (res #f)
+;; ;; (contacts (alist-ref 'contact attrib))
+;; ;; (mode (alist-ref 'mode attrib))
+;; )
+;; (socket-set! req 'nng/recvtimeo 2000)
+;; (handle-exceptions
+;; exn
+;; (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+;; ;; Send notification
+;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" )
+;; #f)
+;; (nng-dial req uri)
+;; ;; (print "Connected to the server " )
+;; (nng-send req msg)
+;; ;; (print "Request Sent")
+;; (let* ((th1 (make-thread (lambda ()
+;; (let ((resp (nng-recv req)))
+;; (nng-close! req)
+;; (set! res (if (equal? resp "ok")
+;; #t
+;; #f))))
+;; "recv thread"))
+;; (th2 (make-thread (lambda ()
+;; (thread-sleep! timeout)
+;; (thread-terminate! th1))
+;; "timer thread")))
+;; (thread-start! th1)
+;; (thread-start! th2)
+;; (thread-join! th1)
+;; res))))
+#;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+ (let ((req (make-req-socket))
+ (uri (conc "tcp://" host-port))
+ (res #f))
+ (handle-exceptions
+ exn
+ (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+ ;; Send notification
+ (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn)
+ #f)
+ (nng-dial req uri)
+ (nng-send req msg)
+ (let* ((th1 (make-thread (lambda ()
+ (let ((resp (nng-recv req)))
+ (nng-close! req)
+ ;; (print resp)
+ (set! res resp)))
+ "recv thread"))
+ (th2 (make-thread (lambda ()
+ (thread-sleep! timeout)
+ (thread-terminate! th1))
+ "timer thread")))
+ (thread-start! th1)
+ (thread-start! th2)
+ (thread-join! th1)
+ res))))
+;; S E R V E R U T I L I T I E S
+;; run ping in separate process, safest way in some cases
+#;(define (server:ping-server ifaceport)
+ (with-input-from-pipe
+ (conc (common:get-megatest-exe) " -ping " ifaceport)
+ (lambda ()
+ (let loop ((inl (read-line))
+ (res "NOREPLY"))
+ (if (eof-object? inl)
+ (case (string->symbol res)
+ ((NOREPLY) #f)
+ ((LOGIN_OK) #t)
+ (else #f))
+ (loop (read-line) inl))))))
+;; NOT USED (well, ok, reference in rpc-transport but otherwise not used).
+#;(define (server:login toppath)
+ (lambda (toppath)
+ (set! *db-last-access* (current-seconds)) ;; might not be needed.
+ (if (equal? *toppath* toppath)
+ #t
+ #f)))
+;; (define server:sync-lock-token "SERVER_SYNC_LOCK")
+;; (define (server:release-sync-lock)
+;; (db:no-sync-del! *no-sync-db* server:sync-lock-token))
+;; (define (server:have-sync-lock?)
+;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token))
+;; (have-lock? (car have-lock-pair))
+;; (lock-time (cdr have-lock-pair))
+;; (lock-age (- (current-seconds) lock-time)))
+;; (cond
+;; (have-lock? #t)
+;; ((>lock-age
+;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180)))
+;; (server:release-sync-lock)
+;; (server:have-sync-lock?))
+;; (else #f))))
ADDED ulex-full/ulex.scm
Index: ulex-full/ulex.scm
--- /dev/null
+++ ulex-full/ulex.scm
@@ -0,0 +1,569 @@
+;; ulex: Distributed sqlite3 db
+;; Copyright (C) 2018-2021 Matt Welland
+;; Redistribution and use in source and binary forms, with or without
+;; modification, is permitted.
+;; ABOUT:
+;; See README in the distribution at
+;; NOTES:
+;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
+(module ulex
+ *
+ #;(
+ ;; NOTE: looking for the handler proc - find the run-listener :)
+ run-listener ;; (run-listener handler-proc [port]) => uconn
+ ;; NOTE: handler-proc params;
+ ;; (handler-proc rem-host-port qrykey cmd params)
+ send-receive ;; (send-receive uconn host-port cmd data)
+ ;; NOTE: cmd can be any plain text symbol except for these;
+ ;; 'ping 'ack 'goodbye 'response
+ set-work-handler ;; (set-work-handler proc)
+ wait-and-close ;; (wait-and-close uconn)
+ ulex-listener?
+ ;; needed to get the interface:port that was automatically found
+ udat-port
+ udat-host-port
+ ;; for testing only
+ ;; pp-uconn
+ ;; parameters
+ work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct
+ return-method ;; parameter; 'mailbox, 'polling, 'direct
+ )
+(import scheme
+ chicken.base
+ chicken.file
+ chicken.time
+ chicken.condition
+ chicken.string
+ chicken.sort
+ chicken.pretty-print
+ address-info
+ mailbox
+ matchable
+ ;; queues
+ regex
+ regex-case
+ simple-exceptions
+ s11n
+ srfi-1
+ srfi-18
+ srfi-4
+ srfi-69
+ system-information
+ tcp6
+ typed-records
+ )
+;; udat struct, used by both caller and callee
+;; instantiated as uconn by convention
+(defstruct udat
+ ;; the listener side
+ (port #f) ;; port number passed to tcp-listen, set by connect-listener
+ (host-port #f) ;; "addr:port" string of this listener, set by connect-listener
+ (socket #f) ;; the listener object returned by tcp-listen
+ ;; the peers
+ (peers (make-hash-table)) ;; host:port->peer
+ ;; work handling
+ (work-queue (make-mailbox)) ;; mailbox that incoming requests (rdat) are posted to
+ (work-proc #f) ;; set by user
+ (cnum 0) ;; cookie number
+ (mboxes (make-hash-table)) ;; for the replies, keyed by qrykey: a mailbox, or with polling a (status . result) pair
+ (avail-cmboxes '()) ;; list of (qrykey . mbox) pairs for re-use -- presumably, the pair contents were lost from the original comment; TODO confirm
+ ;; threads
+ (numthreads 10) ;; NOTE(review): not read by the code visible here; ulex-cmd-loop starts a fixed 100 listener threads
+ (cmd-thread #f) ;; thread running ulex-cmd-loop, set by run-listener
+ (work-queue-thread #f) ;; thread running the work queue processor, set by run-listener
+ (num-threads-running 0)
+ )
+;; Parameters
+;; work-method: how the listener side handles an incoming request (rdat)
+(define work-method (make-parameter 'mailbox))
+;; mailbox - all rdat goes through mailbox
+;; threads - all rdat immediately executed in new thread
+;; direct - no queuing
+;; limited - run-listener starts the work queue processor for this value as well
+;; return-method, return the result to waiting send-receive:
+(define return-method (make-parameter 'mailbox))
+;; mailbox - create a mailbox and use it for passing returning results to send-receive
+;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result
+;; direct - no queuing, result is passed back in single tcp connection
+;; ;; struct for keeping track of others we are talking to
+;; ;;
+;; (defstruct pdat
+;; (host-port #f)
+;; (conns '()) ;; list of pcon structs, pop one off when calling the peer
+;; )
+;; ;; struct for peer connections, keep track of expiration etc.
+;; ;;
+;; (defstruct pcon
+;; (inp #f)
+;; (oup #f)
+;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
+;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
+;; )
+;; listener
+;; #t when uconn is a udat record, i.e. a ulex connector (listener)
+(define ulex-listener?
+  (lambda (uconn)
+    (udat? uconn)))
+;; Bind uconn to a tcp listener. Starts with port and, whenever
+;; connect-listener raises, tries the next higher port.
+;; Returns the populated uconn, or #f when no port up to 65535 could be used.
+(define (setup-listener uconn #!optional (port 4242))
+  (handle-exceptions
+   exn
+   ;; any exception from connect-listener is taken as "this port is unusable"
+   (and (< port 65535)
+        (setup-listener uconn (+ port 1)))
+   (connect-listener uconn port)))
+;; Open a tcp listener on port and record it in uconn together with the port
+;; and the "addr:port" string built from get-my-best-address. Returns uconn.
+;; Raises whatever tcp-listen raises when the port cannot be bound.
+(define (connect-listener uconn port)
+  ;; (tcp-listener-socket LISTENER)(socket-name so)
+  ;; sockaddr-address, sockaddr-port, sockaddr->string
+  (let* ((listener     (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
+         (my-addr      (get-my-best-address))     ;; (hostinfo-addresses (host-information (current-hostname)))
+         (my-host-port (conc my-addr":"port)))
+    (udat-port-set!      uconn port)
+    (udat-host-port-set! uconn my-host-port)
+    (udat-socket-set!    uconn listener)
+    uconn))
+;; run-listener does all the work of starting a listener in a thread
+;; it then returns control
+;;
+;;   handler-proc    - stored as the work-proc of the new uconn
+;;   port-suggestion - first port to try, see setup-listener
+;;
+;; Starts the command loop thread and the work queue thread (the latter only
+;; does something for work-method 'mailbox or 'limited) and returns the uconn.
+;; Fails an assertion when no port could be set up.
+(define (run-listener handler-proc #!optional (port-suggestion 4242))
+  (let ((uconn (make-udat)))
+    (udat-work-proc-set! uconn handler-proc)
+    (if (not (setup-listener uconn port-suggestion))
+        (assert #f "ERROR: run-listener called without proper setup.")
+        (let* ((cmd-thread   (make-thread (lambda () (ulex-cmd-loop uconn))
+                                          "Ulex command loop"))
+               (queue-thread (make-thread (lambda ()
+                                            (if (memq (work-method) '(mailbox limited))
+                                                (process-work-queue uconn)))
+                                          "Ulex work queue processor")))
+          ;; (tcp-buffer-size 2048)
+          (thread-start! cmd-thread)
+          (thread-start! queue-thread)
+          (udat-cmd-thread-set! uconn cmd-thread)
+          (udat-work-queue-thread-set! uconn queue-thread)
+          (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
+          uconn))))
+;; Block until the command loop thread of uconn ends, then close the listener.
+(define (wait-and-close uconn)
+  (let ((cmd-thread (udat-cmd-thread uconn)))
+    (thread-join! cmd-thread))
+  (let ((listener (udat-socket uconn)))
+    (tcp-close listener)))
+;; peers and connections
+;; NOTE: in the code visible here this mutex is only referenced from
+;; commented-out lock/unlock calls in send and ulex-cmd-loop.
+(define *send-mutex* (make-mutex))
+;; send structured data to recipient
+;;
+;;   udata     - our own udat; its host-port is sent along so the remote can reply
+;;   host-port - "host:port" of the recipient
+;;   qrykey    - what was called the "cookie" previously
+;;   cmd       - command symbol
+;;   params    - data for the command
+;;
+;; Opens a fresh tcp connection, serializes (my-host-port qrykey cmd params),
+;; and returns the single value the remote sends back on the same connection:
+;; 'ack, unless the remote answers directly with the result.
+;; On any exception the exception message (a string) is returned instead, so
+;; callers must compare against 'ack and not just test for a true value.
+;;
+;; Both ports are now closed on every path. Previously they leaked whenever
+;; serialize/deserialize raised, and an error while closing the input port
+;; replaced a good reply with an error string. The old (and inp oup) test was
+;; dropped: tcp-connect signals an exception on failure and does not return #f.
+;;
+;; NOTE: see below for beginnings of code to allow re-use of tcp connections
+;; - I believe (without substantial evidence) that re-using connections will
+;; be beneficial ...
+(define (send udata host-port qrykey cmd params)
+  ;; close a port, ignoring errors (peer already gone, port already closed)
+  (define (close-quietly close-proc port)
+    (handle-exceptions exn #f (close-proc port)))
+  (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
+         (isme         #f #;(equal? host-port my-host-port)) ;; calling myself? (short-cut currently disabled)
+         ;; dat is a self-contained work block that can be sent or handled locally
+         (dat          (list my-host-port qrykey cmd params)))
+    (cond
+     (isme (ulex-handler udata dat)) ;; no transmission needed
+     (else
+      (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
+       exn
+       (message exn)
+       (let-values (((inp oup) (tcp-connect host-port)))
+         (dynamic-wind
+          (lambda () #f)
+          (lambda ()
+            (serialize dat oup)
+            (close-output-port oup) ;; done sending; the remote replies once it has read dat
+            (deserialize inp))      ;; res will always be 'ack unless return-method is direct
+          (lambda ()
+            (close-quietly close-output-port oup)
+            (close-quietly close-input-port inp)))))))))
+;; Send cmd/data to host-port and poll for the answer.
+;; The answer is expected to show up in the mboxes hash of uconn under the
+;; qrykey as a (status . result) pair; the entry is removed and the result
+;; part returned. Returns #f when the send is not acknowledged with 'ack or
+;; when no answer arrives within ten seconds.
+(define (send-via-polling uconn host-port cmd data)
+  (let* ((qrykey (make-cookie uconn))
+         (sres   (send uconn host-port qrykey cmd data)))
+    (if (not (eqv? sres 'ack))
+        (begin
+          (print "ULEX ERROR: Communication failed? sres="sres)
+          #f)
+        (let ((deadline (+ (current-milliseconds) 10000))) ;; ten seconds timeout
+          (let poll ()
+            (cond
+             ((< deadline (current-milliseconds))
+              (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data)
+              #f)
+             ;; NOTE: we are re-using mboxes hash
+             ((hash-table-ref/default (udat-mboxes uconn) qrykey #f)
+              => (lambda (result) ;; result is '(status . result-data)
+                   (hash-table-delete! (udat-mboxes uconn) qrykey)
+                   (cdr result)))
+             (else ;; nothing yet
+              (thread-sleep! 0.01)
+              (poll))))))))
+;; Send cmd/data to host-port and wait on a mailbox for the answer.
+;; get-cmbox supplies a (qrykey . mbox) pair; after an 'ack the mailbox is
+;; read with a 120 second timeout and the qrykey is removed from the mboxes
+;; hash. Returns the received value, or #f on timeout or when the send was
+;; not acknowledged with 'ack.
+(define (send-via-mailbox uconn host-port cmd data)
+  (let* ((cmbox  (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
+         (qrykey (car cmbox))
+         (mbox   (cdr cmbox))
+         (sres   (send uconn host-port qrykey cmd data))) ;; short res
+    (cond
+     ((not (eq? sres 'ack))
+      (print "ERROR: Communication failed? Got "sres)
+      #f)
+     (else
+      (let ((res (mailbox-receive! mbox 120 'MBOX_TIMEOUT)))
+        ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
+        (hash-table-delete! (udat-mboxes uconn) qrykey)
+        (cond
+         ((eq? res 'MBOX_TIMEOUT)
+          (print "WARNING: mbox timed out for query "cmd", with data "data
+                 ", waiting for response from "host-port".")
+          ;; here it might make sense to clean up connection records and force clean start?
+          ;; NO. The program using ulex needs to do the reset. Right thing here is exception
+          #f) ;; convert to raising exception?
+         (else res)))))))
+;; send a request to the given host-port and return the answer
+;;
+;;   ping / goodbye      - sent immediately, the reply of send is the result
+;;   work-method direct  - the reply of send is the actual result, not an 'ack
+;;   otherwise           - the answer is collected according to (return-method):
+;;                         'polling or 'mailbox; anything else yields #f
+;; A warning is printed when the round trip took more than five seconds.
+(define (send-receive uconn host-port cmd data)
+  (define (request)
+    (cond
+     ((member cmd '(ping goodbye)) ;; these are immediate
+      (send uconn host-port 'ping cmd data))
+     ((eq? (work-method) 'direct)
+      (send uconn host-port 'direct cmd data))
+     (else
+      (let ((how (return-method)))
+        (cond
+         ((eqv? how 'polling) (send-via-polling uconn host-port cmd data))
+         ((eqv? how 'mailbox) (send-via-mailbox uconn host-port cmd data))
+         (else
+          (print "ULEX ERROR: unrecognised return-method "(return-method)".")
+          #f))))))
+  (let* ((started (current-milliseconds))
+         (result  (request))
+         (elapsed (- (current-milliseconds) started)))
+    ;; this is ONLY for development and debugging. It will be removed once Ulex is stable.
+    (if (> elapsed 5000)
+        (print "ULEX WARNING: round-trip took "(inexact->exact (round (/ elapsed 1000)))
+               " seconds; "cmd", host-port="host-port", data="data))
+    result))
+;; responder side
+;; take a request, rdat, and if not immediate put it in the work queue
+;; Reserved cmds; ack ping goodbye response
+;;
+;; rdat must be a list (rem-host-port qrykey cmd params). The return value is
+;; the short reply for the sender:
+;;   ping      -> 'ack, nothing else is done
+;;   response  -> the result (params) is handed to whoever waits on qrykey:
+;;                'ack, 'no-mbox-found or 'bad-return-method
+;;   any other -> queued with add-to-work-queue (this includes goodbye), 'ack
+;;   malformed -> 'bad-rdat
+(define (ulex-handler uconn rdat)
+  ;; a result came back from remote processing, pass it on to the waiting caller
+  (define (deliver-response qrykey result)
+    (let ((how (return-method)))
+      (cond
+       ((eqv? how 'polling)
+        (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok result))
+        'ack)
+       ((eqv? how 'mailbox)
+        (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
+          (cond
+           (mbox
+            (mailbox-send! mbox result)
+            'ack)
+           (else
+            (print "ERROR: received result but no associated mbox for cookie "qrykey)
+            'no-mbox-found))))
+       (else
+        (print "ULEX ERROR: unrecognised return-method "(return-method))
+        'bad-return-method))))
+  (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
+  (match rdat
+    ((rem-host-port qrykey cmd params)
+     (cond
+      ((eqv? cmd 'ping) 'ack)
+      ((eqv? cmd 'response) (deliver-response qrykey params))
+      (else ;; goodbye (clearing caller references is NOT COMPLETE) and generic requests
+       (add-to-work-queue uconn rdat)
+       'ack)))
+    (else
+     (print "ULEX ERROR: bad rdat "rdat)
+     'bad-rdat)))
+;; given an already set up uconn start the cmd-loop
+;;
+;; Starts 100 listener threads on the socket stored in uconn and then joins
+;; them. Each listener repeats forever: accept a connection, deserialize one
+;; request, pass it to ulex-handler, serialize the handler's answer back and
+;; close both ports.
+;;
+(define (ulex-cmd-loop uconn)
+  (define serv-listener (udat-socket uconn))
+  ;; one accept / handle / reply cycle
+  ;; (wrapping this in *send-mutex* was tried and did not seem to help)
+  (define (serve-one)
+    (let-values (((inp oup) (tcp-accept serv-listener)))
+      (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
+             (resp (ulex-handler uconn rdat)))
+        (serialize resp oup)
+        (close-input-port inp)
+        (close-output-port oup))))
+  (define (listener)
+    (let forever ()
+      (serve-one)
+      (forever)))
+  ;; start N of them
+  (let spawn ((thnum   0)
+              (threads '()))
+    (cond
+     ((< thnum 100)
+      (let ((th (make-thread listener (conc "listener" thnum))))
+        (thread-start! th)
+        (spawn (+ thnum 1) (cons th threads))))
+     (else
+      (map thread-join! threads)))))
+;; add a proc to the cmd list, these are done symmetrically (i.e. in all instances)
+;; so that the proc can be dereferenced remotely
+;;
+;; Stores proc in the work-proc slot of uconn. do-work reads that slot on
+;; every request and calls it as (proc rem-host-port qrykey cmd params).
+(define (set-work-handler uconn proc)
+ (udat-work-proc-set! uconn proc))
+;; work queues - this is all happening on the listener side
+;; rdat is (rem-host-port qrykey cmd params)
+;;
+;; Hand rdat to do-work in the way selected by (work-method):
+;;   threads - start a new thread that runs do-work
+;;   mailbox - post rdat to the work-queue mailbox of uconn
+;;   direct  - run do-work right here and return its value
+;; Any other value is reported and treated like mailbox.
+;;
+(define (add-to-work-queue uconn rdat)
+  (define (enqueue!)
+    (mailbox-send! (udat-work-queue uconn) rdat))
+  (let ((method (work-method)))
+    (cond
+     ((eqv? method 'threads)
+      (thread-start!
+       (make-thread (lambda () (do-work uconn rdat))
+                    "worker thread")))
+     ((eqv? method 'mailbox)
+      (enqueue!))
+     ((eqv? method 'direct)
+      (do-work uconn rdat))
+     (else
+      (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.")
+      (enqueue!)))))
+;; move the logic to return the result somewhere else?
+;;
+;; Run the registered work proc on one request and deliver the result.
+;;
+;;   rdat = (rem-host-port qrykey cmd params)
+;;       call (proc rem-host-port qrykey cmd params); when (work-method) is
+;;       'direct return the result, otherwise send it back to rem-host-port
+;;       as a 'response under the same qrykey
+;;   rdat = 'MBOX_TIMEOUT  (the default process-work-queue passes to
+;;       mailbox-receive!)
+;;       nothing to do, return 'do-work-timeout
+;;   anything else
+;;       print an error
+;;
+(define (do-work uconn rdat)
+  (let ((proc (udat-work-proc uconn))) ;; get it each time - conceivably it could change
+    (match rdat
+      ((rem-host-port qrykey cmd params)
+       (let* ((start-time (current-milliseconds))
+              (result     (proc rem-host-port qrykey cmd params))
+              (end-time   (current-milliseconds))
+              (run-time   (- end-time start-time)))
+         (case (work-method)
+           ((direct) result)
+           (else
+            (print "ULEX: work "cmd", "params" done in "run-time" ms")
+            ;; send 'response as cmd and result as params
+            (send uconn rem-host-port qrykey 'response result) ;; could check for ack
+            (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
+      ;; The symbol must be quoted: a bare MBOX_TIMEOUT is a pattern variable
+      ;; that matches any value, which made the error clause below unreachable.
+      ('MBOX_TIMEOUT 'do-work-timeout)
+      (_
+       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
+;; Create the worker threads that drain the work-queue mailbox of uconn,
+;; start them all and then join them.
+;;
+;; The number of threads comes from the numthreads slot of uconn; at least one
+;; thread is always created because the count starts at 1 and a thread is
+;; built before the count is tested. Each worker loops forever: receive the
+;; next rdat (timeout argument 24000, default value 'MBOX_TIMEOUT) and pass it
+;; to do-work.
+;;
+(define (process-work-queue uconn)
+  (let ((wqueue (udat-work-queue uconn))
+        (numthr (udat-numthreads uconn)))
+    (let loop ((thnum   1)
+               (threads '()))
+      (let ((thlst (cons (make-thread (lambda ()
+                                        (let work-loop ()
+                                          (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
+                                            (do-work uconn rdat))
+                                          (work-loop)))
+                                      (conc "work thread " thnum))
+                         threads)))
+        (if (< thnum numthr)
+            (loop (+ thnum 1)
+                  thlst)
+            (begin
+              (print "ULEX: Starting "(length thlst)" worker threads.")
+              ;; for-each: these calls are made only for their effect, and
+              ;; for-each guarantees they happen in list order
+              (for-each thread-start! thlst)
+              (print "ULEX: Threads started. Joining all.")
+              (for-each thread-join! thlst)))))))
+;; below was to enable re-use of connections. This seems non-trivial so for
+;; now lets open on each call
+;; ;; given host-port get or create peer struct
+;; ;;
+;; (define (udat-get-peer uconn host-port)
+;; (or (hash-table-ref/default (udat-peers uconn) host-port #f)
+;; ;; no peer, so create pdat and init it
+;; ;; NEED stack of connections, pop and use; inp, oup,
+;; ;; creation_time (remove and create new if over 24hrs old
+;; ;;
+;; (let ((pdat (make-pdat host-port: host-port)))
+;; (hash-table-set! (udat-peers uconn) host-port pdat)
+;; pdat)))
+;; ;; is pcon alive
+;; ;; given host-port and pdat get a pcon
+;; ;;
+;; (define (pdat-get-pcon pdat host-port)
+;; (let loop ((conns (pdat-conns pdat)))
+;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
+;; (init-pcon (make-pcon))
+;; (let* ((conn (pop conns)))
+;; ;; given host-port get a pcon struct
+;; ;;
+;; (define (udat-get-pcon
+;; misc utils
+;;
+;; Produce the next cookie for uconn: the cnum counter is incremented and
+;; stored back, and the cookie is the host-port of uconn, a colon and the new
+;; counter value joined with conc.
+;;
+(define (make-cookie uconn)
+  (let* ((next-id (+ 1 (udat-cnum uconn)))
+         (prefix  (udat-host-port uconn)))
+    (udat-cnum-set! uconn next-id)
+    (conc prefix ":" next-id)))
+;; cookie/mboxes
+;; we store each mbox with a cookie: (cookie . mbox)
+;;
+;; Return a (cookie . mbox) pair for uconn. A pair waiting on the
+;; avail-cmboxes list is taken off that list and reused; when the list is
+;; empty a new cookie and mailbox are made and the mailbox is registered in
+;; the mboxes table under the cookie.
+;;
+(define (get-cmbox uconn)
+  (let ((pool (udat-avail-cmboxes uconn)))
+    (cond
+     ((not (null? pool))
+      (udat-avail-cmboxes-set! uconn (cdr pool))
+      (car pool))
+     (else
+      (let* ((cookie (make-cookie uconn))
+             (mbox   (make-mailbox)))
+        (hash-table-set! (udat-mboxes uconn) cookie mbox)
+        (cons cookie mbox))))))
+;; Push cmbox onto the front of the avail-cmboxes list of uconn, where
+;; get-cmbox will find it for reuse.
+(define (put-cmbox uconn cmbox)
+  (let ((pool (udat-avail-cmboxes uconn)))
+    (udat-avail-cmboxes-set! uconn (cons cmbox pool))))
+;; Debugging aid: convert uconn to an alist with udat->alist and pretty-print
+;; it with pp.
+(define (pp-uconn uconn)
+ (pp (udat->alist uconn)))
+;; network utilities
+;; NOTE: Look at address-info egg as alternative to some of this
+;;
+;; Score the address string ipaddr by matching it against the patterns below:
+;;   0 - starts with 127.
+;;   1 - starts with 10.0. or 192.168.
+;;   2 - anything else
+;; ip-pref-less? uses the score so that higher-scored addresses sort first.
+;;
+;; NOTE(review): only 10.0.* is matched, not the rest of 10.*, and 172.16-31.*
+;; is not matched at all, so those addresses score 2 - confirm this is intended.
+(define (rate-ip ipaddr)
+ (regex-case ipaddr
+ ( "^127\\..*" _ 0 )
+ ( "^(10\\.0|192\\.168)\\..*" _ 1 )
+ ( else 2 ) ))
+;; Change this to bias for addresses with a reasonable broadcast value?
+;;
+;; Ordering predicate for sort: true when address a has a strictly higher
+;; rate-ip score than address b, so the highest-scored addresses come first.
+(define (ip-pref-less? a b)
+  (let ((score-a (rate-ip a))
+        (score-b (rate-ip b)))
+    (< score-b score-a)))
+;; Pick the address this host should advertise.
+;;
+;; Returns the result of get-host-name when get-all-ips finds nothing, the
+;; only address when there is exactly one, and otherwise the first address
+;; after sorting with ip-pref-less? (highest rate-ip score first).
+;;
+(define (get-my-best-address)
+  (let ((all-my-addresses (get-all-ips)))
+    (cond
+     ((null? all-my-addresses)
+      (get-host-name)) ;; no interfaces?
+     ;; numbers are compared with =, the result of eq? on numbers is unspecified
+     ((= (length all-my-addresses) 1)
+      (car all-my-addresses)) ;; only one to choose from, just go with it
+     (else
+      (car (sort all-my-addresses ip-pref-less?))))))
+;; All addresses from get-all-ips, ordered with ip-pref-less? (highest
+;; rate-ip score first).
+(define (get-all-ips-sorted)
+  (let ((addresses (get-all-ips)))
+    (sort addresses ip-pref-less?)))
+;; Look up this host's own name with address-infos and return the host field
+;; of every record whose type is equal? to "tcp".
+;; NOTE(review): assumes address-info-type yields the string "tcp" rather than
+;; a symbol - confirm against the address-info egg.
+(define (get-all-ips)
+  (let* ((infos     (address-infos (get-host-name)))
+         (tcp?      (lambda (ai)
+                      (equal? (address-info-type ai) "tcp")))
+         (tcp-infos (filter tcp? infos)))
+    (map address-info-host tcp-infos)))
ADDED ulex-none/dbmgr.scm
Index: ulex-none/dbmgr.scm
--- /dev/null
+++ ulex-none/dbmgr.scm
@@ -0,0 +1,1123 @@
+;; Copyright 2022, Matthew Welland.
+;; This file is part of Megatest.
+;; Megatest is free software: you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+;; Megatest is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;; GNU General Public License for more details.
+;; You should have received a copy of the GNU General Public License
+;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
+(declare (unit dbmgrmod))
+(declare (uses ulex))
+(declare (uses apimod))
+(declare (uses pkts))
+(declare (uses commonmod))
+(declare (uses dbmod))
+(declare (uses mtargs))
+(declare (uses portloggermod))
+(declare (uses debugprint))
+(module dbmgrmod
+ *
+(import scheme
+ chicken.base
+ chicken.condition
+ chicken.file
+ chicken.format
+ chicken.port
+ chicken.process
+ chicken.process-context
+ chicken.process-context.posix
+ chicken.sort
+ chicken.string
+ chicken.time
+ (prefix sqlite3 sqlite3:)
+ matchable
+ md5
+ message-digest
+ regex
+ s11n
+ srfi-1
+ srfi-18
+ srfi-69
+ system-information
+ typed-records
+ pkts
+ ulex
+ commonmod
+ apimod
+ dbmod
+ debugprint
+ (prefix mtargs args:)
+ portloggermod
+ )
+;; ;; Configurations for server
+;; ;; (tcp-buffer-size 2048)
+;; ;; (max-connections 2048)
+;; ;; info about me as a listener and my connections to db servers
+;; ;; stored (for now) in *db-serv-info*
+;; ;;
+;; (defstruct servdat
+;; (host #f)
+;; (port #f)
+;; (uuid #f)
+;; (dbfile #f)
+;; (uconn #f) ;; this is the listener *FOR THIS PROCESS*
+;; (mode #f)
+;; (status 'starting)
+;; (trynum 0) ;; count the number of ports we've tried
+;; (conns (make-hash-table)) ;; apath/dbname => conndat
+;; )
+;; (define *db-serv-info* (make-servdat))
+;; (define (servdat->url sdat)
+;; (conc (servdat-host sdat)":"(servdat-port sdat)))
+;; ;; db servers contact info
+;; ;;
+;; (defstruct conndat
+;; (apath #f)
+;; (dbname #f)
+;; (fullname #f)
+;; (hostport #f)
+;; (ipaddr #f)
+;; (port #f)
+;; (srvpkt #f)
+;; (srvkey #f)
+;; (lastmsg 0)
+;; (expires 0))
+;; (define *srvpktspec*
+;; `((server (host . h)
+;; (port . p)
+;; (servkey . k)
+;; (pid . i)
+;; (ipaddr . a)
+;; (dbpath . d))))
+;; ;;======================================================================
+;; ;; S U P P O R T F U N C T I O N S
+;; ;;======================================================================
+;; ;; set up the api proc, seems like there should be a better place for this?
+;; ;;
+;; ;;
+;; ;; (define api-proc (make-parameter conc))
+;; ;; (api-proc api:execute-requests)
+;; ;; do we have a connection to apath dbname and
+;; ;; is it not expired? then return it
+;; ;;
+;; ;; else setup a connection
+;; ;;
+;; ;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
+;; ;;
+;; (define (rmt:get-conn remdat apath dbname)
+;; (let* ((fullname (db:dbname->path apath dbname)))
+;; (hash-table-ref/default (servdat-conns remdat) fullname #f)))
+;; (define (rmt:drop-conn remdat apath dbname)
+;; (let* ((fullname (db:dbname->path apath dbname)))
+;; (hash-table-delete! (servdat-conns remdat) fullname)))
+;; (define (rmt:find-main-server uconn apath dbname)
+;; (let* ((pktsdir (get-pkts-dir apath))
+;; (all-srvpkts (get-all-server-pkts pktsdir *srvpktspec*))
+;; (viable-srvs (get-viable-servers all-srvpkts dbname)))
+;; (get-the-server uconn apath viable-srvs)))
+;; (define *connstart-mutex* (make-mutex))
+;; (define *last-main-start* 0)
+;; ;; looks for a connection to main, returns if we have one and it has not expired
+;; ;; creates a new one otherwise
+;; ;;
+;; ;; connections for other servers happens by requesting from main
+;; ;;
+;; ;; TODO: This is unnecessarily re-creating the record in the hash table
+;; ;;
+;; (define (rmt:open-main-connection remdat apath)
+;; (let* ((fullpath (db:dbname->path apath ".db/main.db"))
+;; (conns (servdat-conns remdat))
+;; (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
+;; (start-rmt:run (lambda ()
+;; (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
+;; (thread-start! th1)
+;; (thread-sleep! 1)
+;; (let loop ((count 0))
+;; (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
+;; (if (or (not *db-serv-info*)
+;; (not (servdat-uconn *db-serv-info*)))
+;; (begin
+;; (thread-sleep! 1)
+;; (loop (+ count 1)))
+;; (begin
+;; (servdat-mode-set! *db-serv-info* 'non-db)
+;; (servdat-uconn *db-serv-info*)))))))
+;; (myconn (servdat-uconn *db-serv-info*)))
+;; (cond
+;; ((not myconn)
+;; (start-rmt:run)
+;; (rmt:open-main-connection remdat apath))
+;; ((and conn ;; conn is NOT a socket, just saying ...
+;; (< (current-seconds) (conndat-expires conn)))
+;; #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
+;; ((and conn
+;; (>= (current-seconds)(conndat-expires conn)))
+;; (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
+;; (rmt:drop-conn remdat apath ".db/main.db") ;;
+;; (rmt:open-main-connection remdat apath))
+;; (else
+;; ;; Below we will find or create and connect to main
+;; (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch")
+;; (let* ((dbname (db:run-id->dbname #f))
+;; (the-srv (rmt:find-main-server myconn apath dbname))
+;; (start-main-srv (lambda () ;; call IF there is no the-srv found
+;; (mutex-lock! *connstart-mutex*)
+;; (if (> (- (current-seconds) *last-main-start*) 5) ;; at least four seconds since last attempt to start main server
+;; (begin
+;; (api:run-server-process apath dbname)
+;; (set! *last-main-start* (current-seconds))
+;; (thread-sleep! 1))
+;; (thread-sleep! 0.25))
+;; (mutex-unlock! *connstart-mutex*)
+;; (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries
+;; )))
+;; (if (not the-srv) ;; have server, try connecting to it
+;; (start-main-srv)
+;; (let* ((srv-addr (server-address the-srv)) ;; need serv
+;; (ipaddr (alist-ref 'ipaddr the-srv))
+;; (port (alist-ref 'port the-srv))
+;; (srvkey (alist-ref 'servkey the-srv))
+;; (fullpath (db:dbname->path apath dbname))
+;; (new-the-srv (make-conndat
+;; apath: apath
+;; dbname: dbname
+;; fullname: fullpath
+;; hostport: srv-addr
+;; ;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection?
+;; ipaddr: ipaddr
+;; port: port
+;; srvpkt: the-srv
+;; srvkey: srvkey ;; generated by rmt:get-signature on the server side
+;; lastmsg: (current-seconds)
+;; expires: (+ (current-seconds)
+;; (server:expiration-timeout)
+;; -2) ;; this needs to be gathered during the ping
+;; )))
+;; (hash-table-set! conns fullpath new-the-srv)))
+;; #t)))))
+;; ;; NB// sinfo is a servdat struct
+;; ;;
+;; (define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
+;; (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
+;; (let* ((mdbname ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable
+;; (fullname (db:dbname->path apath dbname))
+;; (conns (servdat-conns sinfo))
+;; (mconn (rmt:get-conn sinfo apath ".db/main.db"))
+;; (dconn (rmt:get-conn sinfo apath dbname)))
+;; #;(if (and mconn
+;; (not (debug:print-logger)))
+;; (begin
+;; (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.")
+;; (debug:print-logger rmt:log-to-main)))
+;; (cond
+;; ((and mconn
+;; dconn
+;; (< (current-seconds)(conndat-expires dconn)))
+;; #t) ;; good to go
+;; ((not mconn) ;; no channel open to main? open it...
+;; (rmt:open-main-connection sinfo apath)
+;; (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+;; ((not dconn) ;; no channel open to dbname?
+;; (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname))))
+;; (case res
+;; ((server-started)
+;; (if (> num-tries 0)
+;; (begin
+;; (thread-sleep! 2)
+;; (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+;; (begin
+;; (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname)
+;; (exit 1))))
+;; (else
+;; (if (list? res) ;; server has been registered and the info was returned. pass it on.
+;; (begin ;; ("" 53817
+;; ;; "5e34239f48e8973b3813221e54701a01" "24310"
+;; ;; ""
+;; ;; "/home/matt/data/megatest/tests/simplerun"
+;; ;; ".db/1.db")
+;; (match
+;; res
+;; ((host port servkey pid ipaddr apath dbname)
+;; (debug:print-info 0 *default-log-port* "got "res)
+;; (hash-table-set! conns
+;; fullname
+;; (make-conndat
+;; apath: apath
+;; dbname: dbname
+;; hostport: (conc host":"port)
+;; ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
+;; ipaddr: ipaddr
+;; port: port
+;; srvkey: servkey
+;; lastmsg: (current-seconds)
+;; expires: (+ (current-seconds)
+;; (server:expiration-timeout)
+;; -2))))
+;; (else
+;; (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
+;; res)
+;; (begin
+;; (debug:print-info 0 *default-log-port* "Unexpected result: " res)
+;; res)))))))
+;; #t))
+;; ;;======================================================================
+;; ;; (define *localmode* #t)
+;; (define *localmode* #f)
+;; The dbstruct that rmt:send-receive in this file passes to
+;; api:execute-requests; created once when this module is loaded.
+(define *dbstruct* (make-dbr:dbstruct))
+;; Defaults to current area
+;;
+;; Run cmd with params by calling api:execute-requests on *dbstruct* in this
+;; process and return its result.
+;;
+;; rid, attemptnum and area-dat are accepted so existing callers keep working
+;; but are not used here. The area path and db name that used to be computed
+;; from *toppath* and rid were never used and are no longer computed.
+;;
+(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
+  (api:execute-requests *dbstruct* cmd params))
+;; ;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; ;; sometime in the future
+;; ;;
+;; (define (rmt:send-receive-real sinfo apath dbname cmd params)
+;; (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
+;; (let* ((cdat (rmt:get-conn sinfo apath dbname)))
+;; (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened")
+;; (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex
+;; ;; then send-receive using the ulex layer to host-port stored in cdat
+;; (res (send-receive uconn (conndat-hostport cdat) cmd params))
+;; #;(th1 (make-thread (lambda ()
+;; (set! res (send-receive uconn (conndat-hostport cdat) cmd params)))
+;; "send-receive thread")))
+;; ;; (thread-start! th1)
+;; ;; (thread-join! th1) ;; gratuitious thread stuff is so that mailbox is not used in primordial thead
+;; ;; since we accessed the server we can bump the expires time up
+;; (conndat-expires-set! cdat (+ (current-seconds)
+;; (server:expiration-timeout)
+;; -2)) ;; two second margin for network time misalignments etc.
+;; res)))
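+;; NOTE: the comment below does not describe rmt:print-db-stats, which is the
+;; next definition; it appears to be left over from a procedure that is no
+;; longer defined at this point.
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future.
+;; Purpose - call the main.db server and request a server be started
+;; for the given area path and dbname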
+;; Write the contents of *db-stats* to *default-log-port* at debug level 18:
+;; a dated title, a header row, then one row per cmd with its count (vector
+;; slot 0), total time (slot 1) and their quotient, rows ordered by count with
+;; the largest first.
+;; NOTE(review): the width/precision directives in the format strings need a
+;; full `format` implementation; this module imports chicken.format - confirm
+;; which `format` is actually in scope here.
+(define (rmt:print-db-stats)
+  (let ((row-format "~40a~7-d~9-d~20,2-f") ;; "~20,2-f"
+        (calls-of   (lambda (cmd)
+                      (vector-ref (hash-table-ref *db-stats* cmd) 0))))
+    (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
+    (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
+    (let ((busiest-first (sort (hash-table-keys *db-stats*)
+                               (lambda (a b)
+                                 (> (calls-of a) (calls-of b))))))
+      (for-each
+       (lambda (cmd)
+         (let* ((stats (hash-table-ref *db-stats* cmd))
+                (count (vector-ref stats 0))
+                (total (vector-ref stats 1)))
+           (debug:print 18 *default-log-port*
+                        (format #f row-format cmd count total (/ total count)))))
+       busiest-first))))
+;; Find the command with the highest average time among the *db-stats*
+;; entries that belong to run-id.
+;;
+;; Only keys that contain the text "run-id=<run-id> " are considered. For each
+;; one the average is vector slot 1 divided by vector slot 0 (the count).
+;;
+;; Returns a pair:
+;;   (cmd . average)  the command with the highest average and that average
+;;   (none . 0)       when no key matches, or when the count of the last
+;;                    command examined is not greater than 10
+;;
+;; *db-stats-mutex* is held while the table is read.
+;; NOTE(review): the "> tot 10" test looks only at the last command in the
+;; list, not at the command that holds the maximum - confirm this is intended.
+;; NOTE(review): if anything between the lock and the unlock raises, the mutex
+;; is not unlocked by this procedure.
+(define (rmt:get-max-query-average run-id)
+ (mutex-lock! *db-stats-mutex*)
+ (let* ((runkey (conc "run-id=" run-id " "))
+ (cmds (filter (lambda (x)
+ (substring-index runkey x))
+ (hash-table-keys *db-stats*)))
+ (res (if (null? cmds)
+ (cons 'none 0)
+ ;; walk the matching cmds carrying the best cmd and best average so far
+ (let loop ((cmd (car cmds))
+ (tal (cdr cmds))
+ (max-cmd (car cmds))
+ (res 0))
+ (let* ((cmd-dat (hash-table-ref *db-stats* cmd))
+ (tot (vector-ref cmd-dat 0))
+ (curravg (/ (vector-ref cmd-dat 1) (vector-ref cmd-dat 0))) ;; count is never zero by construction
+ (currmax (max res curravg))
+ (newmax-cmd (if (> curravg res) cmd max-cmd)))
+ (if (null? tal)
+ (if (> tot 10)
+ (cons newmax-cmd currmax)
+ (cons 'none 0))
+ (loop (car tal)(cdr tal) newmax-cmd currmax)))))))
+ (mutex-unlock! *db-stats-mutex*)
+ res))
+;; ;; host and port are used to ensure we remove the proper records
+;; (define (rmt:server-shutdown host port)
+;; (let ((dbfile (servdat-dbfile *db-serv-info*)))
+;; (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
+;; (if dbfile
+;; (let* ((am-server (args:get-arg "-server"))
+;; (dbfile (args:get-arg "-db"))
+;; (apath *toppath*)
+;; #;(sinfo *remotedat*)) ;; foundation for future fix
+;; (if *dbstruct-db*
+;; (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
+;; (db (dbr:dbdat-db dbdat))
+;; (inmem (dbr:dbdat-db dbdat)) ;; WRONG
+;; )
+;; ;; do a final sync here
+;; (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
+;; (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
+;; ;; let's finalize here
+;; (debug:print-info 0 *default-log-port* "Finalizing db and inmem")
+;; (if (sqlite3:database? db)
+;; (sqlite3:finalize! db)
+;; (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing..."))
+;; (if (sqlite3:database? inmem)
+;; (sqlite3:finalize! inmem)
+;; (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing..."))
+;; (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete"))
+;; (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do."))
+;; (if (not am-server)
+;; (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!")
+;; (if (string-match ".*/main.db$" dbfile)
+;; (let ((pkt-file (conc (get-pkts-dir *toppath*)
+;; "/" (servdat-uuid *db-serv-info*)
+;; ".pkt")))
+;; (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
+;; (delete-file* pkt-file)
+;; (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port)
+;; (db:with-lock-db
+;; (servdat-dbfile *db-serv-info*)
+;; (lambda (dbh dbfile)
+;; (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove
+;; (let* ((sdat *db-serv-info*) ;; we have a run-id server
+;; (host (servdat-host sdat))
+;; (port (servdat-port sdat))
+;; (uuid (servdat-uuid sdat))
+;; (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile)))
+;; (debug:print-info 0 *default-log-port* "deregistered-server, res="res)
+;; (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
+;; )))))))
+;; (define (common:run-sync?)
+;; ;; (and (common:on-homehost?)
+;; (args:get-arg "-server"))
+;; (define *rmt:run-mutex* (make-mutex))
+;; (define *rmt:run-flag* #f)
+;; ;; Main entry point to start a server. was start-server
+;; (define (rmt:run hostn)
+;; (mutex-lock! *rmt:run-mutex*)
+;; (if *rmt:run-flag*
+;; (begin
+;; (debug:print-warn 0 *default-log-port* "rmt:run already running.")
+;; (mutex-unlock! *rmt:run-mutex*))
+;; (begin
+;; (set! *rmt:run-flag* #t)
+;; (mutex-unlock! *rmt:run-mutex*)
+;; ;; ;; Configurations for server
+;; ;; (tcp-buffer-size 2048)
+;; ;; (max-connections 2048)
+;; (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
+;; (if (and *db-serv-info*
+;; (servdat-uconn *db-serv-info*))
+;; (let* ((uconn (servdat-uconn *db-serv-info*)))
+;; (wait-and-close uconn))
+;; (let* ((port (portlogger:open-run-close portlogger:find-port))
+;; (handler-proc (lambda (rem-host-port qrykey cmd params) ;;
+;; (set! *db-last-access* (current-seconds))
+;; (assert (list? params) "FATAL: handler called with non-list params")
+;; (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params)
+;; (debug:print 0 *default-log-port* "handler call: "cmd", params="params)
+;; (api:execute-requests *dbstruct-db* cmd params))))
+;; ;; (api:process-request *dbstuct-db*
+;; (if (not *db-serv-info*)
+;; (set! *db-serv-info* (make-servdat host: hostn port: port)))
+;; (let* ((uconn (run-listener handler-proc port))
+;; (rport (udat-port uconn))) ;; the real port
+;; (servdat-host-set! *db-serv-info* hostn)
+;; (servdat-port-set! *db-serv-info* rport)
+;; (servdat-uconn-set! *db-serv-info* uconn)
+;; (wait-and-close uconn)
+;; (db:print-current-query-stats)
+;; )))
+;; (let* ((host (servdat-host *db-serv-info*))
+;; (port (servdat-port *db-serv-info*))
+;; (mode (or (servdat-mode *db-serv-info*)
+;; "non-db")))
+;; ;; server exit stuff here
+;; ;; (rmt:server-shutdown host port) - always do in on-exit
+;; ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
+;; (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting")
+;; ))))
+;; ;;======================================================================
+;; ;; S E R V E R U T I L I T I E S
+;; ;;======================================================================
+;; ;;======================================================================
+;; ;;======================================================================
+;; ;; only use for main.db - need to re-write some of this :(
+;; ;;
+;; (define (get-lock-db sdat dbfile host port)
+;; (assert host "FATAL: get-lock-db called with host not set.")
+;; (assert port "FATAL: get-lock-db called with port not set.")
+;; (let* ((dbh (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations
+;; (res (db:get-iam-server-lock dbh dbfile host port))
+;; (uconn (servdat-uconn sdat)))
+;; ;; res => list then already locked, check server is responsive
+;; ;; => #t then sucessfully got the lock
+;; ;; => #f reserved for future use as to indicate something went wrong
+;; (match res
+;; ((owner_pid owner_host owner_port event_time)
+;; (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
+;; #f ;; locked by someone else
+;; (begin ;; locked by someone dead and gone
+;; (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
+;; (db:steal-lock-db dbh dbfile port))))
+;; (#t #t) ;; placeholder so that we don't touch res if it is #t
+;; (else (set! res #f)))
+;; (sqlite3:finalize! dbh)
+;; res))
+;; (define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath)
+;; (let* ((pkt-dat `((host . ,host)
+;; (port . ,port)
+;; (servkey . ,servkey)
+;; (pid . ,(current-process-id))
+;; (ipaddr . ,ipaddr)
+;; (dbpath . ,dbpath)))
+;; (uuid (write-alist->pkt
+;; pkts-dir
+;; pkt-dat
+;; pktspec: pkt-spec
+;; ptype: 'server)))
+;; (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid)
+;; uuid))
+;; (define (get-pkts-dir #!optional (apath #f))
+;; (let* ((effective-toppath (or *toppath* apath)))
+;; (assert effective-toppath
+;; "ERROR: get-pkts-dir called without *toppath* set. Exiting.")
+;; (let* ((pdir (conc effective-toppath "/.meta/srvpkts")))
+;; (if (file-exists? pdir)
+;; pdir
+;; (begin
+;; (handle-exceptions ;; this exception handler should NOT be needed but ...
+;; exn
+;; pdir
+;; (create-directory pdir #t))
+;; pdir)))))
+;; ;; given a pkts dir read
+;; ;;
+;; (define (get-all-server-pkts pktsdir-in pktspec)
+;; (let* ((pktsdir (if (file-exists? pktsdir-in)
+;; pktsdir-in
+;; (begin
+;; (create-directory pktsdir-in #t)
+;; pktsdir-in)))
+;; (all-pkt-files (glob (conc pktsdir "/*.pkt"))))
+;; (map (lambda (pkt-file)
+;; (read-pkt->alist pkt-file pktspec: pktspec))
+;; all-pkt-files)))
+;; (define (server-address srv-pkt)
+;; (conc (alist-ref 'host srv-pkt) ":"
+;; (alist-ref 'port srv-pkt)))
+;; Stub: always returns #t and ignores uconn, host-port and key.
+;; The ping-based check is kept below but disabled with a #; datum comment,
+;; so no message is sent.
+(define (server-ready? uconn host-port key) ;; server-address is host:port
+ #;(let* ((params `((cmd . ping)(key . ,key)))
+ (data `((cmd . ping)
+ (key . ,key)
+ (params . ,params))) ;; I don't get it.
+ (res (send-receive uconn host-port 'ping data)))
+ (if (eq? res 'ack) ;; yep, likely it is who we want on the other end
+ res
+ #f))
+ #t)
+;; ; from the pkts return servers associated with dbpath
+;; ;; NOTE: Only one can be alive - have to check on each
+;; ;; in the list of pkts returned
+;; ;;
+;; (define (get-viable-servers serv-pkts dbpath)
+;; (let loop ((tail serv-pkts)
+;; (res '()))
+;; (if (null? tail)
+;; res ;; NOTE: sort by age so oldest is considered first
+;; (let* ((spkt (car tail)))
+;; (loop (cdr tail)
+;; (if (equal? dbpath (alist-ref 'dbpath spkt))
+;; (cons spkt res)
+;; res))))))
+;; (define (remove-pkts-if-not-alive uconn serv-pkts)
+;; (filter (lambda (pkt)
+;; (let* ((host (alist-ref 'host pkt))
+;; (port (alist-ref 'port pkt))
+;; (host-port (conc host":"port))
+;; (key (alist-ref 'servkey pkt))
+;; (pktz (alist-ref 'Z pkt))
+;; (res (server-ready? uconn host-port key)))
+;; (if res
+;; res
+;; (let* ((pktsdir (get-pkts-dir *toppath*))
+;; (pktpath (conc pktsdir"/"pktz".pkt")))
+;; (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath)
+;; (delete-file* pktpath)
+;; #f))))
+;; serv-pkts))
+;; ;; from viable servers get one that is alive and ready
+;; ;;
+;; (define (get-the-server uconn apath serv-pkts)
+;; (let loop ((tail serv-pkts))
+;; (if (null? tail)
+;; #f
+;; (let* ((spkt (car tail))
+;; (host (alist-ref 'ipaddr spkt))
+;; (port (alist-ref 'port spkt))
+;; (host-port (conc host":"port))
+;; (dbpth (alist-ref 'dbpath spkt))
+;; (srvkey (alist-ref 'Z spkt)) ;; (alist-ref 'srvkey spkt))
+;; (addr (server-address spkt)))
+;; (if (server-ready? uconn host-port srvkey)
+;; spkt
+;; (loop (cdr tail)))))))
+;; ;; am I the "first" in line server? I.e. my D card is smallest
+;; ;; use Z card as tie breaker
+;; ;;
+;; (define (get-best-candidate serv-pkts dbpath)
+;; (if (null? serv-pkts)
+;; #f
+;; (let loop ((tail serv-pkts)
+;; (best (car serv-pkts)))
+;; (if (null? tail)
+;; best
+;; (let* ((candidate (car tail))
+;; (candidate-bd (string->number (alist-ref 'D candidate)))
+;; (best-bd (string->number (alist-ref 'D best)))
+;; ;; bigger number is younger
+;; (candidate-z (alist-ref 'Z candidate))
+;; (best-z (alist-ref 'Z best))
+;; (new-best (cond
+;; ((> best-bd candidate-bd) ;; best is younger than candidate
+;; candidate)
+;; ((< best-bd candidate-bd) ;; candidate is younger than best
+;; best)
+;; (else
+;; (if (string>=? best-z candidate-z)
+;; best
+;; candidate))))) ;; use Z card as tie breaker
+;; (if (null? tail)
+;; new-best
+;; (loop (cdr tail) new-best)))))))
+;; ;;======================================================================
+;; ;;======================================================================
+;; ;; if .db/main.db check the pkts
+;; ;;
+;; (define (rmt:wait-for-server pkts-dir db-file server-key)
+;; (let* ((sdat *db-serv-info*))
+;; (let loop ((start-time (current-seconds))
+;; (changed #t)
+;; (last-sdat "not this"))
+;; (begin ;; let ((sdat #f))
+;; (thread-sleep! 0.01)
+;; (debug:print-info 0 *default-log-port* "Waiting for server alive signature")
+;; (mutex-lock! *heartbeat-mutex*)
+;; (set! sdat *db-serv-info*)
+;; (mutex-unlock! *heartbeat-mutex*)
+;; (if (and sdat
+;; (not changed)
+;; (> (- (current-seconds) start-time) 2))
+;; (let* ((uconn (servdat-uconn sdat)))
+;; (servdat-status-set! sdat 'iface-stable)
+;; (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server")
+;; ;; create a server pkt in *toppath*/.meta/srvpkts
+;; ;; TODO:
+;; ;; 1. change sdat to stuct
+;; ;; 2. add uuid to struct
+;; ;; 3. update uuid in sdat here
+;; ;;
+;; (servdat-uuid-set! sdat
+;; (register-server
+;; pkts-dir *srvpktspec*
+;; (get-host-name)
+;; (servdat-port sdat) server-key
+;; (servdat-host sdat) db-file))
+;; ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key
+;; ;; now read pkts and see if we are a contender
+;; (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*))
+;; (viables (get-viable-servers all-pkts db-file))
+;; (alive (remove-pkts-if-not-alive uconn viables))
+;; (best-srv (get-best-candidate alive db-file))
+;; (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))
+;; (i-am-srv (equal? best-srv-key server-key))
+;; (delete-pkt (lambda ()
+;; (let* ((pktfile (conc (get-pkts-dir *toppath*)
+;; "/" (servdat-uuid *db-serv-info*)
+;; ".pkt")))
+;; (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile)
+;; (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit
+;; (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv)
+;; ;; am I the best-srv, compare server-keys to know
+;; (if i-am-srv
+;; (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)
+;; (begin
+;; (debug:print-info 0 *default-log-port* "I'm the server!")
+;; (servdat-dbfile-set! sdat db-file)
+;; (servdat-status-set! sdat 'db-locked))
+;; (begin
+;; (debug:print-info 0 *default-log-port* "I'm not the server, exiting.")
+;; (bdat-time-to-exit-set! *bdat* #t)
+;; (delete-pkt)
+;; (thread-sleep! 0.2)
+;; (exit)))
+;; (begin
+;; (debug:print-info 0 *default-log-port*
+;; "Keys do not match "best-srv-key", "server-key", exiting.")
+;; (bdat-time-to-exit-set! *bdat* #t)
+;; (delete-pkt)
+;; (thread-sleep! 0.2)
+;; (exit)))
+;; sdat))
+;; (begin ;; sdat not yet contains server info
+;; (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
+;; (sleep 4)
+;; (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
+;; (begin
+;; (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
+;; (exit))
+;; (loop start-time
+;; (equal? sdat last-sdat)
+;; sdat))))))))
+;; (define (rmt:register-server sinfo apath iface port server-key dbname)
+;; (servdat-conns sinfo) ;; just checking types
+;; (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+;; (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
+;; (db:run-id->dbname #f)
+;; 'register-server `(,iface
+;; ,port
+;; ,server-key
+;; ,(current-process-id)
+;; ,iface
+;; ,apath
+;; ,dbname)))
+;; (define (rmt:get-count-servers sinfo apath)
+;; (servdat-conns sinfo) ;; just checking types
+;; (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+;; (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
+;; (db:run-id->dbname #f)
+;; 'get-count-servers `(,apath)))
+;; Issue the 'get-servers-info command through rmt:send-receive.
+;; The call passes #f as the second argument and a one-element parameter
+;; list holding apath; whatever rmt:send-receive returns is returned as-is.
+(define (rmt:get-servers-info apath)
+  (let ((params (list apath)))
+    (rmt:send-receive 'get-servers-info #f params)))
+;; (define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
+;; (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
+;; (rmt:send-receive-real db-serv-info apath ;; params: host port servkey pid ipaddr dbpath
+;; (db:run-id->dbname #f)
+;; 'deregister-server `(,iface
+;; ,port
+;; ,server-key
+;; ,(current-process-id)
+;; ,iface
+;; ,apath
+;; ,dbname)))
+;; (define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100))
+;; ;; wait until *db-serv-info* stops changing
+;; (let* ((stime (current-seconds)))
+;; (let loop ((last-host #f)
+;; (last-port #f)
+;; (tries 0))
+;; (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*)))
+;; (curr-port (and *db-serv-info* (servdat-port *db-serv-info*))))
+;; ;; first we verify port and interface, update *db-serv-info* if need be.
+;; (cond
+;; ((> tries num-tries-allowed)
+;; (debug:print 0 *default-log-port* "rmt:keep-running, giving up after trying for several minutes.")
+;; (exit 1))
+;; ((not *db-serv-info*)
+;; (thread-sleep! 0.25)
+;; (loop curr-host curr-port (+ tries 1)))
+;; ((or (not last-host)(not last-port))
+;; (debug:print 0 *default-log-port* "rmt:keep-running, still no interface, tries="tries)
+;; (thread-sleep! 0.25)
+;; (loop curr-host curr-port (+ tries 1)))
+;; ((or (not (equal? last-host curr-host))
+;; (not (equal? last-port curr-port)))
+;; (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
+;; (thread-sleep! 0.25)
+;; (loop curr-host curr-port (+ tries 1)))
+;; ((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 3 seconds have passed
+;; (thread-sleep! 0.5)
+;; (loop curr-host curr-port (+ tries 1)))
+;; (else
+;; (rmt:get-signature) ;; sets *my-signature* as side effect
+;; (servdat-status-set! *db-serv-info* 'interface-stable)
+;; (debug:print 0 *default-log-port*
+;; "SERVER STARTED: " curr-host
+;; ":" curr-port
+;; " AT " (current-seconds) " server signature: " *my-signature*
+;; " with "(servdat-trynum *db-serv-info*)" port changes")
+;; (flush-output *default-log-port*)
+;; #t))))))
+;; ;; run rmt:keep-running in a parallel thread to monitor that the db is being
+;; ;; used and to shutdown after sometime if it is not.
+;; ;;
+;; (define (rmt:keep-running dbname)
+;; ;; if none running or if > 20 seconds since
+;; ;; server last used then start shutdown
+;; ;; This thread waits for the server to come alive
+;; (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")
+;; (let* ((sinfo *db-serv-info*)
+;; (server-start-time (current-seconds))
+;; (pkts-dir (get-pkts-dir))
+;; (server-key (rmt:get-signature)) ;; This servers key
+;; (is-main (equal? (args:get-arg "-db") ".db/main.db"))
+;; (last-access 0)
+;; (server-timeout (server:expiration-timeout))
+;; (shutdown-server-sequence (lambda (host port)
+;; (set! *unclean-shutdown* #f) ;; Should not be needed anymore
+;; (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
+;; ;; (rmt:server-shutdown host port) -- called in on-exit
+;; ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit
+;; (exit)))
+;; (timed-out? (lambda ()
+;; (<= (+ last-access server-timeout)
+;; (current-seconds)))))
+;; (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db"))
+;; ;; main and run db servers have both got wait logic (could/should merge it)
+;; (if is-main
+;; (rmt:wait-for-server pkts-dir dbname server-key)
+;; (rmt:wait-for-stable-interface))
+;; ;; this is our forever loop
+;; (let* ((iface (servdat-host *db-serv-info*))
+;; (port (servdat-port *db-serv-info*))
+;; (uconn (servdat-uconn *db-serv-info*)))
+;; (let loop ((count 0)
+;; (bad-sync-count 0)
+;; (start-time (current-milliseconds)))
+;; (if (and (not is-main)
+;; (common:low-noise-print 60 "servdat-status"))
+;; (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*)))
+;; (mutex-lock! *heartbeat-mutex*)
+;; ;; set up the database handle
+;; (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate
+;; (let ((watchdog (bdat-watchdog *bdat*)))
+;; (debug:print 0 *default-log-port* "SERVER: dbprep")
+;; (db:setup dbname) ;; sets *dbstruct-db* as side effect
+;; (servdat-status-set! *db-serv-info* 'db-opened)
+;; ;; IFF I'm not main, call into main and register self
+;; (if (not is-main)
+;; (let ((res (rmt:register-server sinfo
+;; *toppath* iface port
+;; server-key dbname)))
+;; (if res ;; we are the server
+;; (servdat-status-set! *db-serv-info* 'have-interface-and-db)
+;; ;; now check that the db locker is alive, clear it out if not
+;; (let* ((serv-info (rmt:server-info *toppath* dbname)))
+;; (match serv-info
+;; ((host port servkey pid ipaddr apath dbpath)
+;; (if (not (server-ready? uconn (conc host":"port) servkey))
+;; (begin
+;; (debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.")
+;; (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath)
+;; (loop (+ count 1) bad-sync-count start-time))))
+;; (else
+;; (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info)
+;; (exit)))))))
+;; (debug:print 0 *default-log-port*
+;; "SERVER: running, db "dbname" opened, megatest version: "
+;; (common:get-full-version))
+;; ;; start the watchdog
+;; ;; is this really needed?
+;; #;(if watchdog
+;; (if (not (member (thread-state watchdog)
+;; '(ready running blocked
+;; sleeping dead)))
+;; (begin
+;; (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")")
+;; (thread-start! watchdog))
+;; (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")"))
+;; (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it."))
+;; #;(loop (+ count 1) bad-sync-count start-time)
+;; ))
+;; (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t)
+;; (mutex-unlock! *heartbeat-mutex*)
+;; ;; when things go wrong we don't want to be doing the various
+;; ;; queries too often so we strive to run this stuff only every
+;; ;; four seconds or so.
+;; (let* ((sync-time (- (current-milliseconds) start-time))
+;; (rem-time (quotient (- 4000 sync-time) 1000)))
+;; (if (and (<= rem-time 4)
+;; (> rem-time 0))
+;; (thread-sleep! rem-time)))
+;; ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
+;; (set! last-access *db-last-access*)
+;; (if (< count 1) ;; 3x3 = 9 secs aprox
+;; (loop (+ count 1) bad-sync-count (current-milliseconds)))
+;; (if (common:low-noise-print 60 "dbstats")
+;; (begin
+;; (debug:print 0 *default-log-port* "Server stats:")
+;; (db:print-current-query-stats)))
+;; (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600)))
+;; (cond
+;; ((not *server-run*)
+;; (debug:print-info 0 *default-log-port* "*server-run* set to #f. Shutting down.")
+;; (shutdown-server-sequence (get-host-name) port))
+;; ((timed-out?)
+;; (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+;; (shutdown-server-sequence (get-host-name) port))
+;; ((and *server-run*
+;; (or (not (timed-out?))
+;; (if is-main ;; do not exit if there are other servers (keep main open until all others gone)
+;; (> (rmt:get-count-servers sinfo *toppath*) 1)
+;; #f)))
+;; (if (common:low-noise-print 120 "server continuing")
+;; (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
+;; (loop 0 bad-sync-count (current-milliseconds)))
+;; (else
+;; (set! *unclean-shutdown* #f)
+;; (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+;; (shutdown-server-sequence (get-host-name) port)
+;; #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: "
+;; (open-send-receive-nn (conc iface":"port) ;; do this here and not in server-shutdown
+;; (sexpr->string 'quit))))))))))
+;; (define (rmt:get-reasonable-hostname)
+;; (let* ((inhost (or (args:get-arg "-server") "-")))
+;; (if (equal? inhost "-")
+;; (get-host-name)
+;; inhost)))
+;; Call this to start the actual server (NOTE: in this file the launch code below is disabled with #;)
+;; all routes through here end in exit ... (only when the disabled thread code is active; as written this returns #f)
+;; This is the point at which servers are started
+(define (rmt:server-launch dbname) ;; dbname is only referenced inside the disabled form below
+ (debug:print-info 0 *default-log-port* "Entered rmt:server-launch") ;; the only side effect currently performed
+ #;(let* ((th2 (make-thread (lambda () ;; #; is a datum comment: this entire let* form is skipped by the reader
+ (debug:print-info 0 *default-log-port* "Server run thread started")
+ (rmt:run (rmt:get-reasonable-hostname)))
+ "Server run"))
+ (th3 (make-thread (lambda ()
+ (debug:print-info 0 *default-log-port* "Server monitor thread started")
+ (if (args:get-arg "-server") ;; monitor would only run when -server was given
+ (rmt:keep-running dbname)))
+ "Keep running")))
+ (thread-start! th2)
+ (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor.
+ (thread-start! th3)
+ (set! *didsomething* #t)
+ (thread-join! th2)
+ (thread-join! th3)) ;; end of the disabled let* form
+ #f) ;; value returned to the caller: always #f
+;; ;;======================================================================
+;; ;; S E R V E R - D I R E C T C A L L S
+;; ;;======================================================================
+;; (define (rmt:kill-server run-id)
+;; (rmt:send-receive 'kill-server #f (list run-id)))
+;; (define (rmt:start-server run-id)
+;; (rmt:send-receive 'start-server #f (list run-id)))
+;; (define (rmt:server-info apath dbname)
+;; (rmt:send-receive 'get-server-info #f (list apath dbname)))
+;; ;;======================================================================
+;; ;; Nanomsg transport
+;; ;;======================================================================
+;; #;(define (is-port-in-use port-num)
+;; (let* ((ret #f))
+;; (let-values (((inp oup pid)
+;; (process "netstat" (list "-tulpn" ))))
+;; (let loop ((inl (read-line inp)))
+;; (if (not (eof-object? inl))
+;; (begin
+;; (if (string-search (regexp (conc ":" port-num)) inl)
+;; (begin
+;; ;(print "Output: " inl)
+;; (set! ret #t))
+;; (loop (read-line inp)))))))
+;; ret))
+;; #;(define (open-nn-connection host-port)
+;; (let ((req (make-req-socket))
+;; (uri (conc "tcp://" host-port)))
+;; (nng-dial req uri)
+;; (socket-set! req 'nng/recvtimeo 2000)
+;; req))
+;; #;(define (send-receive-nn req msg)
+;; (nng-send req msg)
+;; (nng-recv req))
+;; #;(define (close-nn-connection req)
+;; (nng-close! req))
+;; ;; ;; open connection to server, send message, close connection
+;; ;; ;;
+;; ;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+;; ;; (let ((req (make-req-socket 'req))
+;; ;; (uri (conc "tcp://" host-port))
+;; ;; (res #f)
+;; ;; ;; (contacts (alist-ref 'contact attrib))
+;; ;; ;; (mode (alist-ref 'mode attrib))
+;; ;; )
+;; ;; (socket-set! req 'nng/recvtimeo 2000)
+;; ;; (handle-exceptions
+;; ;; exn
+;; ;; (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+;; ;; ;; Send notification
+;; ;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" )
+;; ;; #f)
+;; ;; (nng-dial req uri)
+;; ;; ;; (print "Connected to the server " )
+;; ;; (nng-send req msg)
+;; ;; ;; (print "Request Sent")
+;; ;; (let* ((th1 (make-thread (lambda ()
+;; ;; (let ((resp (nng-recv req)))
+;; ;; (nng-close! req)
+;; ;; (set! res (if (equal? resp "ok")
+;; ;; #t
+;; ;; #f))))
+;; ;; "recv thread"))
+;; ;; (th2 (make-thread (lambda ()
+;; ;; (thread-sleep! timeout)
+;; ;; (thread-terminate! th1))
+;; ;; "timer thread")))
+;; ;; (thread-start! th1)
+;; ;; (thread-start! th2)
+;; ;; (thread-join! th1)
+;; ;; res))))
+;; ;;
+;; #;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+;; (let ((req (make-req-socket))
+;; (uri (conc "tcp://" host-port))
+;; (res #f))
+;; (handle-exceptions
+;; exn
+;; (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+;; ;; Send notification
+;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn)
+;; #f)
+;; (nng-dial req uri)
+;; (nng-send req msg)
+;; (let* ((th1 (make-thread (lambda ()
+;; (let ((resp (nng-recv req)))
+;; (nng-close! req)
+;; ;; (print resp)
+;; (set! res resp)))
+;; "recv thread"))
+;; (th2 (make-thread (lambda ()
+;; (thread-sleep! timeout)
+;; (thread-terminate! th1))
+;; "timer thread")))
+;; (thread-start! th1)
+;; (thread-start! th2)
+;; (thread-join! th1)
+;; res))))
+;; ;;======================================================================
+;; ;; S E R V E R U T I L I T I E S
+;; ;;======================================================================
+;; ;; run ping in separate process, safest way in some cases
+;; ;;
+;; #;(define (server:ping-server ifaceport)
+;; (with-input-from-pipe
+;; (conc (common:get-megatest-exe) " -ping " ifaceport)
+;; (lambda ()
+;; (let loop ((inl (read-line))
+;; (res "NOREPLY"))
+;; (if (eof-object? inl)
+;; (case (string->symbol res)
+;; ((NOREPLY) #f)
+;; ((LOGIN_OK) #t)
+;; (else #f))
+;; (loop (read-line) inl))))))
+;; ;; NOT USED (well, ok, reference in rpc-transport but otherwise not used).
+;; ;;
+;; #;(define (server:login toppath)
+;; (lambda (toppath)
+;; (set! *db-last-access* (current-seconds)) ;; might not be needed.
+;; (if (equal? *toppath* toppath)
+;; #t
+;; #f)))
+;; ;; (define server:sync-lock-token "SERVER_SYNC_LOCK")
+;; ;; (define (server:release-sync-lock)
+;; ;; (db:no-sync-del! *no-sync-db* server:sync-lock-token))
+;; ;; (define (server:have-sync-lock?)
+;; ;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token))
+;; ;; (have-lock? (car have-lock-pair))
+;; ;; (lock-time (cdr have-lock-pair))
+;; ;; (lock-age (- (current-seconds) lock-time)))
+;; ;; (cond
+;; ;; (have-lock? #t)
+;; ;; ((>lock-age
+;; ;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180)))
+;; ;; (server:release-sync-lock)
+;; ;; (server:have-sync-lock?))
+;; ;; (else #f))))
ADDED ulex-none/ulex.scm
Index: ulex-none/ulex.scm
--- /dev/null
+++ ulex-none/ulex.scm
@@ -0,0 +1,569 @@
+;; ulex: Distributed sqlite3 db
+;; Copyright (C) 2018-2021 Matt Welland
+;; Redistribution and use in source and binary forms, with or without
+;; modification, is permitted.
+;; ABOUT:
+;; See README in the distribution at
+;; NOTES:
+;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
+(module ulex ;; module header; the definitions that follow in this file are all commented out
+ * ;; export specifier actually in effect: export every top-level binding
+ #;( ;; datum comment: the explicit export list below (up to its closing paren) is skipped by the reader
+ ;; NOTE: looking for the handler proc - find the run-listener :)
+ run-listener ;; (run-listener handler-proc [port]) => uconn
+ ;; NOTE: handler-proc params;
+ ;; (handler-proc rem-host-port qrykey cmd params)
+ send-receive ;; (send-receive uconn host-port cmd data)
+ ;; NOTE: cmd can be any plain text symbol except for these;
+ ;; 'ping 'ack 'goodbye 'response
+ set-work-handler ;; (set-work-handler uconn proc)
+ wait-and-close ;; (wait-and-close uconn)
+ ulex-listener? ;; (ulex-listener? uconn)
+ ;; needed to get the interface:port that was automatically found
+ udat-port
+ udat-host-port
+ ;; for testing only
+ ;; pp-uconn
+ ;; parameters
+ work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct
+ return-method ;; parameter; 'mailbox, 'polling, 'direct
+ ) ;; end of the disabled export list
+(import scheme
+ chicken.base
+ chicken.file
+ chicken.time
+ chicken.condition
+ chicken.string
+ chicken.sort
+ chicken.pretty-print
+ address-info
+ mailbox
+ matchable
+ ;; queues
+ regex
+ regex-case
+ simple-exceptions
+ s11n
+ srfi-1
+ srfi-18
+ srfi-4
+ srfi-69
+ system-information
+ tcp6
+ typed-records
+ )
+;; ;; udat struct, used by both caller and callee
+;; ;; instantiated as uconn by convention
+;; ;;
+;; (defstruct udat
+;; ;; the listener side
+;; (port #f)
+;; (host-port #f)
+;; (socket #f)
+;; ;; the peers
+;; (peers (make-hash-table)) ;; host:port->peer
+;; ;; work handling
+;; (work-queue (make-mailbox))
+;; (work-proc #f) ;; set by user
+;; (cnum 0) ;; cookie number
+;; (mboxes (make-hash-table)) ;; for the replies
+;; (avail-cmboxes '()) ;; list of ( . ) for re-use
+;; ;; threads
+;; (numthreads 10)
+;; (cmd-thread #f)
+;; (work-queue-thread #f)
+;; (num-threads-running 0)
+;; )
+;; ;; Parameters
+;; ;; work-method:
+;; (define work-method (make-parameter 'mailbox))
+;; ;; mailbox - all rdat goes through mailbox
+;; ;; threads - all rdat immediately executed in new thread
+;; ;; direct - no queuing
+;; ;;
+;; ;; return-method, return the result to waiting send-receive:
+;; (define return-method (make-parameter 'mailbox))
+;; ;; mailbox - create a mailbox and use it for passing returning results to send-receive
+;; ;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result
+;; ;; direct - no queuing, result is passed back in single tcp connection
+;; ;;
+;; ;; ;; struct for keeping track of others we are talking to
+;; ;; ;;
+;; ;; (defstruct pdat
+;; ;; (host-port #f)
+;; ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer
+;; ;; )
+;; ;;
+;; ;; ;; struct for peer connections, keep track of expiration etc.
+;; ;; ;;
+;; ;; (defstruct pcon
+;; ;; (inp #f)
+;; ;; (oup #f)
+;; ;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
+;; ;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
+;; ;; )
+;; ;;======================================================================
+;; ;; listener
+;; ;;======================================================================
+;; ;; is uconn a ulex connector (listener)
+;; ;;
+;; (define (ulex-listener? uconn)
+;; (udat? uconn))
+;; ;; create a tcp listener and return a populated udat struct with
+;; ;; my port, address, hostname, pid etc.
+;; ;; return #f if fail to find a port to allocate.
+;; ;;
+;; ;; if udata-in is #f create the record
+;; ;; if there is already a serv-listener return the udata
+;; ;;
+;; (define (setup-listener uconn #!optional (port 4242))
+;; (handle-exceptions
+;; exn
+;; (if (< port 65535)
+;; (setup-listener uconn (+ port 1))
+;; #f)
+;; (connect-listener uconn port)))
+;; (define (connect-listener uconn port)
+;; ;; (tcp-listener-socket LISTENER)(socket-name so)
+;; ;; sockaddr-address, sockaddr-port, sockaddr->string
+;; (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
+;; (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
+;; (udat-port-set! uconn port)
+;; (udat-host-port-set! uconn (conc addr":"port))
+;; (udat-socket-set! uconn tlsn)
+;; uconn))
+;; ;; run-listener does all the work of starting a listener in a thread
+;; ;; it then returns control
+;; ;;
+;; (define (run-listener handler-proc #!optional (port-suggestion 4242))
+;; (let* ((uconn (make-udat)))
+;; (udat-work-proc-set! uconn handler-proc)
+;; (if (setup-listener uconn port-suggestion)
+;; (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
+;; (th2 (make-thread (lambda ()
+;; (case (work-method)
+;; ((mailbox limited)
+;; (process-work-queue uconn))))
+;; "Ulex work queue processor")))
+;; ;; (tcp-buffer-size 2048)
+;; (thread-start! th1)
+;; (thread-start! th2)
+;; (udat-cmd-thread-set! uconn th1)
+;; (udat-work-queue-thread-set! uconn th2)
+;; (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
+;; uconn)
+;; (assert #f "ERROR: run-listener called without proper setup."))))
+;; (define (wait-and-close uconn)
+;; (thread-join! (udat-cmd-thread uconn))
+;; (tcp-close (udat-socket uconn)))
+;; ;;======================================================================
+;; ;; peers and connections
+;; ;;======================================================================
+;; (define *send-mutex* (make-mutex))
+;; ;; send structured data to recipient
+;; ;;
+;; ;; NOTE: qrykey is what was called the "cookie" previously
+;; ;;
+;; ;; retval tells send to expect and wait for return data (one line) and return it or time out
+;; ;; this is for ping where we don't want to necessarily have set up our own server yet.
+;; ;;
+;; ;; NOTE: see below for beginnings of code to allow re-use of tcp connections
+;; ;; - I believe (without substantial evidence) that re-using connections will
+;; ;; be beneficial ...
+;; ;;
+;; (define (send udata host-port qrykey cmd params)
+;; (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
+;; (isme #f #;(equal? host-port my-host-port)) ;; calling myself?
+;; ;; dat is a self-contained work block that can be sent or handled locally
+;; (dat (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds)))))
+;; (cond
+;; (isme (ulex-handler udata dat)) ;; no transmission needed
+;; (else
+;; (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
+;; exn
+;; (message exn)
+;; (begin
+;; ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; (let-values (((inp oup)(tcp-connect host-port)))
+;; (let ((res (if (and inp oup)
+;; (begin
+;; (serialize dat oup)
+;; (close-output-port oup)
+;; (deserialize inp)
+;; )
+;; (begin
+;; (print "ERROR: send called but no receiver has been setup. Please call setup first!")
+;; #f))))
+;; (close-input-port inp)
+;; ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; res)))))))) ;; res will always be 'ack unless return-method is direct
+;; (define (send-via-polling uconn host-port cmd data)
+;; (let* ((qrykey (make-cookie uconn))
+;; (sres (send uconn host-port qrykey cmd data)))
+;; (case sres
+;; ((ack)
+;; (let loop ((start-time (current-milliseconds)))
+;; (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout
+;; (begin
+;; (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data)
+;; #f)
+;; (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash
+;; (if result ;; result is '(status . result-data) or #f for nothing yet
+;; (begin
+;; (hash-table-delete! (udat-mboxes uconn) qrykey)
+;; (cdr result))
+;; (begin
+;; (thread-sleep! 0.01)
+;; (loop start-time)))))))
+;; (else
+;; (print "ULEX ERROR: Communication failed? sres="sres)
+;; #f))))
+;; (define (send-via-mailbox uconn host-port cmd data)
+;; (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
+;; (qrykey (car cmbox))
+;; (mbox (cdr cmbox))
+;; (mbox-time (current-milliseconds))
+;; (sres (send uconn host-port qrykey cmd data))) ;; short res
+;; (if (eq? sres 'ack)
+;; (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread)))
+;; #f
+;; 120)) ;; timeout)
+;; (mbox-timeout-result 'MBOX_TIMEOUT)
+;; (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
+;; (mbox-receive-time (current-milliseconds)))
+;; ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
+;; (hash-table-delete! (udat-mboxes uconn) qrykey)
+;; (if (eq? res 'MBOX_TIMEOUT)
+;; (begin
+;; (print "WARNING: mbox timed out for query "cmd", with data "data
+;; ", waiting for response from "host-port".")
+;; ;; here it might make sense to clean up connection records and force clean start?
+;; ;; NO. The program using ulex needs to do the reset. Right thing here is exception
+;; #f) ;; convert to raising exception?
+;; res))
+;; (begin
+;; (print "ERROR: Communication failed? Got "sres)
+;; #f))))
+;; ;; send a request to the given host-port and register a mailbox in udata
+;; ;; wait for the mailbox data and return it
+;; ;;
+;; (define (send-receive uconn host-port cmd data)
+;; (let* ((start-time (current-milliseconds))
+;; (result (cond
+;; ((member cmd '(ping goodbye)) ;; these are immediate
+;; (send uconn host-port 'ping cmd data))
+;; ((eq? (work-method) 'direct)
+;; ;; the result from send will be the actual result, not an 'ack
+;; (send uconn host-port 'direct cmd data))
+;; (else
+;; (case (return-method)
+;; ((polling)
+;; (send-via-polling uconn host-port cmd data))
+;; ((mailbox)
+;; (send-via-mailbox uconn host-port cmd data))
+;; (else
+;; (print "ULEX ERROR: unrecognised return-method "(return-method)".")
+;; #f)))))
+;; (duration (- (current-milliseconds) start-time)))
+;; ;; this is ONLY for development and debugging. It will be removed once Ulex is stable.
+;; (if (< 5000 duration)
+;; (print "ULEX WARNING: round-trip took "(inexact->exact (round (/ duration 1000)))
+;; " seconds; "cmd", host-port="host-port", data="data))
+;; result))
+;; ;;======================================================================
+;; ;; responder side
+;; ;;======================================================================
+;; ;; take a request, rdat, and if not immediate put it in the work queue
+;; ;;
+;; ;; Reserved cmds; ack ping goodbye response
+;; ;;
+;; (define (ulex-handler uconn rdat)
+;; (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
+;; (match rdat ;; (string-split controldat)
+;; ((rem-host-port qrykey cmd params);; timedata)
+;; ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
+;; (case cmd
+;; ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
+;; ((ping)
+;; ;; (print "Got Ping!")
+;; ;; (add-to-work-queue uconn rdat)
+;; 'ack)
+;; ((goodbye)
+;; ;; just clear out references to the caller. NOT COMPLETE
+;; (add-to-work-queue uconn rdat)
+;; 'ack)
+;; ((response) ;; this is a result from remote processing, send it as mail ...
+;; (case (return-method)
+;; ((polling)
+;; (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params))
+;; 'ack)
+;; ((mailbox)
+;; (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
+;; (if mbox
+;; (begin
+;; (mailbox-send! mbox params) ;; params here is our result
+;; 'ack)
+;; (begin
+;; (print "ERROR: received result but no associated mbox for cookie "qrykey)
+;; 'no-mbox-found))))
+;; (else (print "ULEX ERROR: unrecognised return-method "(return-method))
+;; 'bad-return-method)))
+;; (else ;; generic request - hand it to the work queue
+;; (add-to-work-queue uconn rdat)
+;; 'ack)))
+;; (else
+;; (print "ULEX ERROR: bad rdat "rdat)
+;; 'bad-rdat)))
+;; ;; given an already set up uconn start the cmd-loop
+;; ;;
+;; (define (ulex-cmd-loop uconn)
+;; (let* ((serv-listener (udat-socket uconn))
+;; (listener (lambda ()
+;; (let loop ((state 'start))
+;; (let-values (((inp oup)(tcp-accept serv-listener)))
+;; ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
+;; (resp (ulex-handler uconn rdat)))
+;; (serialize resp oup)
+;; (close-input-port inp)
+;; (close-output-port oup)
+;; ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+;; )
+;; (loop state))))))
+;; ;; start N of them
+;; (let loop ((thnum 0)
+;; (threads '()))
+;; (if (< thnum 100)
+;; (let* ((th (make-thread listener (conc "listener" thnum))))
+;; (thread-start! th)
+;; (loop (+ thnum 1)
+;; (cons th threads)))
+;; (map thread-join! threads)))))
+;; ;; add a proc to the cmd list, these are done symmetrically (i.e. in all instances)
+;; ;; so that the proc can be dereferenced remotely
+;; ;;
+;; (define (set-work-handler uconn proc)
+;; (udat-work-proc-set! uconn proc))
+;; ;;======================================================================
+;; ;; work queues - this is all happening on the listener side
+;; ;;======================================================================
+;; ;; rdat is (rem-host-port qrykey cmd params)
+;; (define (add-to-work-queue uconn rdat)
+;; #;(queue-add! (udat-work-queue uconn) rdat)
+;; (case (work-method)
+;; ((threads)
+;; (thread-start! (make-thread (lambda ()
+;; (do-work uconn rdat))
+;; "worker thread")))
+;; ((mailbox)
+;; (mailbox-send! (udat-work-queue uconn) rdat))
+;; ((direct)
+;; (do-work uconn rdat))
+;; (else
+;; (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.")
+;; (mailbox-send! (udat-work-queue uconn) rdat))))
+;; ;; move the logic to return the result somewhere else?
+;; ;;
+;; (define (do-work uconn rdat)
+;; (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivably it could change
+;; ;; put this following into a do-work procedure
+;; (match rdat
+;; ((rem-host-port qrykey cmd params)
+;; (let* ((start-time (current-milliseconds))
+;; (result (proc rem-host-port qrykey cmd params))
+;; (end-time (current-milliseconds))
+;; (run-time (- end-time start-time)))
+;; (case (work-method)
+;; ((direct) result)
+;; (else
+;; (print "ULEX: work "cmd", "params" done in "run-time" ms")
+;; ;; send 'response as cmd and result as params
+;; (send uconn rem-host-port qrykey 'response result) ;; could check for ack
+;; (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
+;; (MBOX_TIMEOUT 'do-work-timeout)
+;; (else
+;; (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
+;; ;;
+;; (define (process-work-queue uconn)
+;; (let ((wqueue (udat-work-queue uconn))
+;; (proc (udat-work-proc uconn))
+;; (numthr (udat-numthreads uconn)))
+;; (let loop ((thnum 1)
+;; (threads '()))
+;; (let ((thlst (cons (make-thread (lambda ()
+;; (let work-loop ()
+;; (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
+;; (do-work uconn rdat))
+;; (work-loop)))
+;; (conc "work thread " thnum))
+;; threads)))
+;; (if (< thnum numthr)
+;; (loop (+ thnum 1)
+;; thlst)
+;; (begin
+;; (print "ULEX: Starting "(length thlst)" worker threads.")
+;; (map thread-start! thlst)
+;; (print "ULEX: Threads started. Joining all.")
+;; (map thread-join! thlst)))))))
+;; ;; below was to enable re-use of connections. This seems non-trivial so for
+;; ;; now lets open on each call
+;; ;;
+;; ;; ;; given host-port get or create peer struct
+;; ;; ;;
+;; ;; (define (udat-get-peer uconn host-port)
+;; ;; (or (hash-table-ref/default (udat-peers uconn) host-port #f)
+;; ;; ;; no peer, so create pdat and init it
+;; ;;
+;; ;; ;; NEED stack of connections, pop and use; inp, oup,
+;; ;; ;; creation_time (remove and create new if over 24hrs old
+;; ;; ;;
+;; ;; (let ((pdat (make-pdat host-port: host-port)))
+;; ;; (hash-table-set! (udat-peers uconn) host-port pdat)
+;; ;; pdat)))
+;; ;;
+;; ;; ;; is pcon alive
+;; ;;
+;; ;; ;; given host-port and pdat get a pcon
+;; ;; ;;
+;; ;; (define (pdat-get-pcon pdat host-port)
+;; ;; (let loop ((conns (pdat-conns pdat)))
+;; ;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
+;; ;; (init-pcon (make-pcon))
+;; ;; (let* ((conn (pop conns)))
+;; ;;
+;; ;; ;; given host-port get a pcon struct
+;; ;; ;;
+;; ;; (define (udat-get-pcon
+;; ;;======================================================================
+;; ;; misc utils
+;; ;;======================================================================
+;; (define (make-cookie uconn)
+;; (let ((newcnum (+ (udat-cnum uconn) 1)))
+;; (udat-cnum-set! uconn newcnum)
+;; (conc (udat-host-port uconn) ":"
+;; newcnum)))
+;; ;; cookie/mboxes
+;; ;; we store each mbox with a cookie ( . )
+;; ;;
+;; (define (get-cmbox uconn)
+;; (if (null? (udat-avail-cmboxes uconn))
+;; (let ((cookie (make-cookie uconn))
+;; (mbox (make-mailbox)))
+;; (hash-table-set! (udat-mboxes uconn) cookie mbox)
+;; `(,cookie . ,mbox))
+;; (let ((cmbox (car (udat-avail-cmboxes uconn))))
+;; (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
+;; cmbox)))
+;; (define (put-cmbox uconn cmbox)
+;; (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
+;; (define (pp-uconn uconn)
+;; (pp (udat->alist uconn)))
+;; ;;======================================================================
+;; ;; network utilities
+;; ;;======================================================================
+;; ;; NOTE: Look at address-info egg as alternative to some of this
+;; (define (rate-ip ipaddr)
+;; (regex-case ipaddr
+;; ( "^127\\..*" _ 0 )
+;; ( "^(10\\.0|192\\.168)\\..*" _ 1 )
+;; ( else 2 ) ))
+;; ;; Change this to bias for addresses with a reasonable broadcast value?
+;; ;;
+;; (define (ip-pref-less? a b)
+;; (> (rate-ip a) (rate-ip b)))
+;; (define (get-my-best-address)
+;; (let ((all-my-addresses (get-all-ips)))
+;; (cond
+;; ((null? all-my-addresses)
+;; (get-host-name)) ;; no interfaces?
+;; ((eq? (length all-my-addresses) 1)
+;; (car all-my-addresses)) ;; only one to choose from, just go with it
+;; (else
+;; (car (sort all-my-addresses ip-pref-less?))))))
+;; (define (get-all-ips-sorted)
+;; (sort (get-all-ips) ip-pref-less?))
+;; (define (get-all-ips)
+;; (map address-info-host
+;; (filter (lambda (x)
+;; (equal? (address-info-type x) "tcp"))
+;; (address-infos (get-host-name)))))
ADDED ulex-simple/dbmgr.scm
Index: ulex-simple/dbmgr.scm
--- /dev/null
+++ ulex-simple/dbmgr.scm
@@ -0,0 +1,953 @@
+;; Copyright 2022, Matthew Welland.
+;; This file is part of Megatest.
+;; Megatest is free software: you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+;; Megatest is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; GNU General Public License for more details.
+;; You should have received a copy of the GNU General Public License
+;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
+(declare (unit dbmgrmod))
+(declare (uses ulex))
+(declare (uses apimod))
+(declare (uses pkts))
+(declare (uses commonmod))
+(declare (uses dbmod))
+(declare (uses mtargs))
+(declare (uses portloggermod))
+(declare (uses debugprint))
+(module dbmgrmod
+ *
+(import scheme
+ chicken.base
+ chicken.condition
+ chicken.file
+ chicken.format
+ chicken.port
+ chicken.process
+ chicken.process-context
+ chicken.process-context.posix
+ chicken.sort
+ chicken.string
+ chicken.time
+ (prefix sqlite3 sqlite3:)
+ matchable
+ md5
+ message-digest
+ regex
+ s11n
+ srfi-1
+ srfi-18
+ srfi-69
+ system-information
+ typed-records
+ pkts
+ ulex
+ commonmod
+ apimod
+ dbmod
+ debugprint
+ (prefix mtargs args:)
+ portloggermod
+ )
+;; Configurations for server
+;; (tcp-buffer-size 2048)
+;; (max-connections 2048)
+;; info about me as a listener and my connections to db servers
+;; stored (for now) in *db-serv-info*
+(defstruct servdat ;; server/listener record for this process; the global instance is *db-serv-info*
+ (host #f) ;; host name of this listener, set in rmt:run
+ (port #f) ;; port the listener really got, set in rmt:run
+ (uuid #f) ;; uuid of the server pkt written by register-server
+ (dbfile #f) ;; db file served by this process
+ (uconn #f) ;; this is the listener *FOR THIS PROCESS*
+ (mode #f) ;; printed in the rmt:run shutdown message ("non-db" when unset)
+ (status 'starting) ;; progress symbol, e.g. 'iface-stable, 'db-locked, 'db-opened
+ (trynum 0) ;; count the number of ports we've tried
+ (conns (make-hash-table)) ;; apath/dbname => conndat
+ )
+(define *db-serv-info* (make-servdat)) ;; the servdat describing this process
+(define (servdat->url sdat) ;; => the "host:port" string for sdat
+  (let ((host (servdat-host sdat)) (port (servdat-port sdat))) (conc host ":" port)))
+;; db servers contact info, one record per remote db server, stored in (servdat-conns ...)
+(defstruct conndat
+ (apath #f) ;; area path
+ (dbname #f) ;; db name, e.g. ".db/main.db"
+ (fullname #f) ;; (db:dbname->path apath dbname), also the key in servdat-conns
+ (hostport #f) ;; "host:port" string handed to send-receive
+ (ipaddr #f)
+ (port #f)
+ (srvpkt #f) ;; server pkt alist the record was built from (set for the main server)
+ (srvkey #f) ;; key of the remote server
+ (lastmsg 0) ;; set to (current-seconds) when the record is created
+ (expires 0)) ;; time in seconds after which the connection is treated as expired
+(define *srvpktspec* ;; spec handed to write-alist->pkt / read-pkt->alist for pkts of ptype 'server
+ `((server (host . h) ;; presumably (alist key . pkt card letter) - confirm against the pkts module
+ (port . p)
+ (servkey . k)
+ (pid . i)
+ (ipaddr . a)
+ (dbpath . d))))
+;; S U P P O R T F U N C T I O N S
+;; set up the api proc, seems like there should be a better place for this?
+;; (define api-proc (make-parameter conc))
+;; (api-proc api:execute-requests)
+;; Look up the cached conndat for the db identified by apath and dbname.
+;; Returns the conndat, or #f when no connection has been recorded.
+;; NB// no expiry check and no connection setup is done here, callers
+;; such as rmt:open-main-connection deal with expiry and (re)connecting.
+(define (rmt:get-conn remdat apath dbname)
+  (let ((conn-table (servdat-conns remdat)))
+    (hash-table-ref/default conn-table (db:dbname->path apath dbname) #f)))
+(define (rmt:drop-conn remdat apath dbname) ;; forget the cached conndat (if any) for apath/dbname
+  (hash-table-delete! (servdat-conns remdat)
+                      (db:dbname->path apath dbname)))
+(define (rmt:find-main-server uconn apath dbname)
+  ;; read all server pkts of the area, keep those written for dbname and
+  ;; return the first one whose server answers a ping (#f if none does)
+  (let ((candidates (get-viable-servers (get-all-server-pkts (get-pkts-dir apath) *srvpktspec*) dbname)))
+    (get-the-server uconn apath candidates)))
+(define *connstart-mutex* (make-mutex)) ;; serializes attempts to start a main server
+(define *last-main-start* 0) ;; (current-seconds) of the last attempt to start a main server
+;; looks for a connection to main, returns #t if we have one and it is not expired
+;; otherwise finds a running main server (starting one if needed) and records a new connection
+;; connections for other servers happens by requesting from main
+;; TODO: This is unnecessarily re-creating the record in the hash table
+(define (rmt:open-main-connection remdat apath)
+ (let* ((fullpath (db:dbname->path apath ".db/main.db"))
+ (conns (servdat-conns remdat))
+ (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
+ (myconn (servdat-uconn remdat)))
+ (cond
+ ((not (listener-running?)) ;; NB// listener-running? looks at *db-serv-info*, not at remdat
+ (servdat-uconn-set! remdat (make-udat))
+ (rmt:open-main-connection remdat apath))
+ ((and conn ;; conn is NOT a socket, just saying ...
+ (< (current-seconds) (conndat-expires conn)))
+ #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
+ ((and conn
+ (>= (current-seconds)(conndat-expires conn)))
+ (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
+ (rmt:drop-conn remdat apath ".db/main.db") ;;
+ (rmt:open-main-connection remdat apath))
+ (else
+ ;; Below we will find or create and connect to main
+ (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch")
+ (let* ((dbname (db:run-id->dbname #f))
+ (the-srv (rmt:find-main-server myconn apath dbname))
+ (start-main-srv (lambda () ;; call IF there is no the-srv found
+ (mutex-lock! *connstart-mutex*)
+ (if (> (- (current-seconds) *last-main-start*) 5) ;; more than five seconds since last attempt to start main server
+ (begin
+ (api:run-server-process apath dbname)
+ (set! *last-main-start* (current-seconds))
+ (thread-sleep! 1))
+ (thread-sleep! 0.25))
+ (mutex-unlock! *connstart-mutex*)
+ (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries
+ )))
+ (if (not the-srv) ;; no live server found: start one and retry, else record a connection to the-srv
+ (start-main-srv)
+ (let* ((srv-addr (server-address the-srv)) ;; need serv
+ (ipaddr (alist-ref 'ipaddr the-srv))
+ (port (alist-ref 'port the-srv))
+ (srvkey (alist-ref 'servkey the-srv))
+ (fullpath (db:dbname->path apath dbname)) ;; shadows the outer fullpath
+ (new-the-srv (make-conndat
+ apath: apath
+ dbname: dbname
+ fullname: fullpath
+ hostport: srv-addr
+ ;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection?
+ ipaddr: ipaddr
+ port: port
+ srvpkt: the-srv
+ srvkey: srvkey ;; generated by rmt:get-signature on the server side
+ lastmsg: (current-seconds)
+ expires: (+ (current-seconds)
+ (server:expiration-timeout)
+ -2) ;; this needs to be gathered during the ping
+ )))
+ (hash-table-set! conns fullpath new-the-srv)))
+ #t)))))
+;; NB// sinfo is a servdat struct. Makes sure a connection record exists for dbname (never main.db) in area apath; returns #t
+(define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
+ (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
+ (let* ((mdbname ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable
+ (fullname (db:dbname->path apath dbname))
+ (conns (servdat-conns sinfo))
+ (mconn (rmt:get-conn sinfo apath ".db/main.db"))
+ (dconn (rmt:get-conn sinfo apath dbname)))
+ #;(if (and mconn
+ (not (debug:print-logger)))
+ (begin
+ (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.")
+ (debug:print-logger rmt:log-to-main)))
+ (cond
+ ((and mconn
+ dconn
+ (< (current-seconds)(conndat-expires dconn)))
+ #t) ;; good to go
+ ((not mconn) ;; no channel open to main? open it...
+ (rmt:open-main-connection sinfo apath)
+ (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+ ((not dconn) ;; no channel open to dbname?
+ (let* ((res (rmt:send-receive 'get-server #f `(,apath ,dbname)))) ;; rmt:send-receive takes (cmd rid params); rid #f addresses main.db
+ (case res
+ ((server-started) ;; main started a server for dbname, give it time and ask again
+ (if (> num-tries 0)
+ (begin
+ (thread-sleep! 2)
+ (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+ (begin
+ (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname)
+ (exit 1))))
+ (else
+ (if (list? res) ;; server has been registered and the info was returned. pass it on.
+ (begin ;; ("" 53817
+ ;; "5e34239f48e8973b3813221e54701a01" "24310"
+ ;; ""
+ ;; "/home/matt/data/megatest/tests/simplerun"
+ ;; ".db/1.db")
+ (match
+ res
+ ((host port servkey pid ipaddr apath dbname) ;; NB// apath and dbname are shadowed by the reply here
+ (debug:print-info 0 *default-log-port* "got "res)
+ (hash-table-set! conns
+ fullname
+ (make-conndat
+ apath: apath
+ dbname: dbname
+ hostport: (conc host":"port)
+ ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
+ ipaddr: ipaddr
+ port: port
+ srvkey: servkey
+ lastmsg: (current-seconds)
+ expires: (+ (current-seconds)
+ (server:expiration-timeout)
+ -2))))
+ (else
+ (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
+ res)
+ (begin
+ (debug:print-info 0 *default-log-port* "Unexpected result: " res)
+ res)))))))
+ #t))
+;; (define *localmode* #t)
+(define *localmode* #f) ;; NOTE(review): no use of this flag is visible in this part of the file - confirm
+(define *dbstruct* (make-dbr:dbstruct)) ;; NOTE(review): the server code here works on *dbstruct-db*, not on this - confirm
+;; Send cmd with params to the server for run id rid (main.db when rid is #f)
+;; in the current area (*toppath*) and return the reply of that server.
+;; attemptnum and area-dat are accepted but not used here.
+(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
+  (let ((area *toppath*)
+        (me *db-serv-info*)
+        (target-db (db:run-id->dbname rid)))
+    ;; make sure the needed channels exist before looking up the connection
+    (rmt:open-main-connection me area)
+    (if rid (rmt:general-open-connection me area target-db))
+    (let ((conn (rmt:get-conn me area target-db)))
+      (assert conn "FATAL: rmt:send-receive-real called without the needed channels opened")
+      ;; hand the request to the ulex layer, using the host:port stored in conn
+      (let ((reply (send-receive (servdat-uconn me) (conndat-hostport conn) cmd params))
+            (margin -2)) ;; two second margin for network time misalignments etc.
+        ;; a completed exchange pushes out the expiry of the connection
+        (conndat-expires-set! conn (+ (current-seconds) (server:expiration-timeout) margin))
+        reply))))
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future.
+;; Purpose - call the main.db server and request a server be started
+;; for the given area path and dbname
+(define (rmt:print-db-stats) ;; log count, total time and average per command in *db-stats*, highest count first
+  (let* ((row-fmt "~40a~7-d~9-d~20,2-f")
+         (stats-of (lambda (cmd) (hash-table-ref *db-stats* cmd)))
+         (call-count (lambda (cmd) (vector-ref (stats-of cmd) 0))))
+    (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
+    (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
+    (for-each (lambda (cmd)
+                (let* ((stats (stats-of cmd)) (count (vector-ref stats 0)) (total (vector-ref stats 1)))
+                  (debug:print 18 *default-log-port* (format #f row-fmt cmd count total (/ total count)))))
+              (sort (hash-table-keys *db-stats*)
+                    (lambda (a b) (> (call-count a) (call-count b)))))))
+;; Return (cmd . avg) for the *db-stats* entry with the highest average among
+;; the commands whose key contains "run-id=<run-id> ". Returns (none . 0) when
+;; there is no such command or the last one examined has a count of 10 or less.
+(define (rmt:get-max-query-average run-id)
+  (mutex-lock! *db-stats-mutex*)
+  (let* ((key-fragment (conc "run-id=" run-id " "))
+         (matching (filter (lambda (k) (substring-index key-fragment k))
+                           (hash-table-keys *db-stats*)))
+         (answer
+          (if (pair? matching)
+              (let scan ((remaining matching) (top-cmd (car matching)) (top-avg 0))
+                (let* ((this-cmd (car remaining))
+                       (stats (hash-table-ref *db-stats* this-cmd))
+                       (calls (vector-ref stats 0))
+                       (avg (/ (vector-ref stats 1) calls)) ;; count is never zero by construction
+                       (next-cmd (if (> avg top-avg) this-cmd top-cmd))
+                       (next-avg (max top-avg avg)))
+                  (cond
+                   ((pair? (cdr remaining)) (scan (cdr remaining) next-cmd next-avg))
+                   ((> calls 10) (cons next-cmd next-avg))
+                   (else (cons 'none 0)))))
+              (cons 'none 0))))
+    (mutex-unlock! *db-stats-mutex*)
+    answer))
+;; host and port are used to ensure we remove the proper records. Does a final sync, finalizes the db and removes pkt/lock/registration
+(define (rmt:server-shutdown host port)
+ (let ((dbfile (servdat-dbfile *db-serv-info*)))
+ (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
+ (if dbfile ;; nothing at all is done when no dbfile was recorded in *db-serv-info*
+ (let* ((am-server (args:get-arg "-server"))
+ (dbfile (args:get-arg "-db")) ;; NB// shadows the dbfile taken from *db-serv-info* above
+ (apath *toppath*)
+ #;(sinfo *remotedat*)) ;; foundation for future fix
+ (if *dbstruct-db*
+ (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
+ (db (dbr:dbdat-db dbdat))
+ (inmem (dbr:dbdat-db dbdat)) ;; WRONG - NOTE(review): same handle as db, so the same handle reaches sqlite3:finalize! twice below
+ )
+ ;; do a final sync here
+ (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
+ (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
+ ;; let's finalize here
+ (debug:print-info 0 *default-log-port* "Finalizing db and inmem")
+ (if (sqlite3:database? db)
+ (sqlite3:finalize! db)
+ (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing..."))
+ (if (sqlite3:database? inmem)
+ (sqlite3:finalize! inmem)
+ (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing..."))
+ (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete"))
+ (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do."))
+ (if (not am-server)
+ (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!")
+ (if (string-match ".*/main.db$" dbfile) ;; main.db server: remove our pkt and release the lock
+ (let ((pkt-file (conc (get-pkts-dir *toppath*)
+ "/" (servdat-uuid *db-serv-info*)
+ ".pkt")))
+ (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
+ (delete-file* pkt-file)
+ (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port)
+ (db:with-lock-db
+ (servdat-dbfile *db-serv-info*)
+ (lambda (dbh dbfile)
+ (db:release-lock dbh dbfile host port)))) ;; end of the main.db branch
+ (let* ((sdat *db-serv-info*) ;; we have a run-id server
+ (host (servdat-host sdat)) ;; NB// host and port from *db-serv-info* shadow the arguments here
+ (port (servdat-port sdat))
+ (uuid (servdat-uuid sdat))
+ (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile)))
+ (debug:print-info 0 *default-log-port* "deregistered-server, res="res)
+ (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
+ )))))))
+(define (common:run-sync?) ;; => whatever (args:get-arg "-server") returns
+  ;; (and (common:on-homehost?)
+  (let ((server-arg (args:get-arg "-server"))) server-arg))
+(define *rmt:run-mutex* (make-mutex)) ;; guards *rmt:run-flag*
+(define *rmt:run-flag* #f) ;; set to #t by the first call to rmt:run
+(define (listener-running?) ;; => the uconn stored in *db-serv-info*, #f when there is none
+  (let ((sdat *db-serv-info*))
+    (if sdat (servdat-uconn sdat) #f)))
+;; Main entry point to start a server. was start-server. Only the first call does any work, later calls just warn and return.
+(define (rmt:run hostn)
+ (mutex-lock! *rmt:run-mutex*)
+ (if *rmt:run-flag*
+ (begin
+ (debug:print-warn 0 *default-log-port* "rmt:run already running.")
+ (mutex-unlock! *rmt:run-mutex*))
+ (begin
+ (set! *rmt:run-flag* #t) ;; NB// never reset to #f in this procedure
+ (mutex-unlock! *rmt:run-mutex*)
+ ;; ;; Configurations for server
+ ;; (tcp-buffer-size 2048)
+ ;; (max-connections 2048)
+ (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
+ (if (listener-running?) ;; a listener already exists: just wait for it to finish
+ (let* ((uconn (servdat-uconn *db-serv-info*)))
+ (wait-and-close uconn))
+ (let* ((port (portlogger:open-run-close portlogger:find-port)) ;; port suggestion for the listener
+ (handler-proc (lambda (rem-host-port qrykey cmd params) ;;
+ (set! *db-last-access* (current-seconds)) ;; read by rmt:keep-running for its timeout check
+ (assert (list? params) "FATAL: handler called with non-list params")
+ (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params)
+ (debug:print 0 *default-log-port* "handler call: "cmd", params="params)
+ (api:execute-requests *dbstruct-db* cmd params))))
+ ;; (api:process-request *dbstuct-db*
+ (if (not *db-serv-info*)
+ (set! *db-serv-info* (make-servdat host: hostn port: port)))
+ (let* ((uconn (run-listener handler-proc port))
+ (rport (udat-port uconn))) ;; the real port
+ (servdat-host-set! *db-serv-info* hostn)
+ (servdat-port-set! *db-serv-info* rport)
+ (servdat-uconn-set! *db-serv-info* uconn)
+ (wait-and-close uconn) ;; presumably blocks until the listener is done - confirm in ulex
+ (db:print-current-query-stats)
+ )))
+ (let* ((host (servdat-host *db-serv-info*))
+ (port (servdat-port *db-serv-info*))
+ (mode (or (servdat-mode *db-serv-info*)
+ "non-db")))
+ ;; server exit stuff here
+ ;; (rmt:server-shutdown host port) - always do in on-exit
+ ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
+ (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting") ;; NOTE(review): no space between mode and "shutdown" in the message
+ ))))
+;; S E R V E R U T I L I T I E S
+;; only use for main.db. Returns #t when the lock was obtained, the result of db:steal-lock-db when a stale lock was stolen, else #f
+(define (get-lock-db sdat dbfile host port)
+  (assert host "FATAL: get-lock-db called with host not set.")
+  (assert port "FATAL: get-lock-db called with port not set.")
+  (let* ((dbh (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations
+         (res (db:get-iam-server-lock dbh dbfile host port))
+         (uconn (servdat-uconn sdat))
+         ;; res => list: already locked, check that the owner is responsive
+         ;;     => #t:   successfully got the lock, anything else: something went wrong
+         ;; return the value of the match, not res: res is a (truthy) owner list when someone else holds the lock
+         (got-lock (match res
+                     ((owner_pid owner_host owner_port event_time)
+                      (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
+                          #f ;; locked by someone else
+                          (begin ;; locked by someone dead and gone
+                            (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
+                            (db:steal-lock-db dbh dbfile port))))
+                     (#t #t)
+                     (else #f))))
+    (sqlite3:finalize! dbh) ;; the handle is only needed for the lock query
+    got-lock))
+;; Write a pkt of ptype 'server describing this process into pkts-dir
+;; and return the uuid handed back by write-alist->pkt.
+(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath)
+  (let ((uuid (write-alist->pkt
+               pkts-dir
+               (list (cons 'host host)
+                     (cons 'port port)
+                     (cons 'servkey servkey)
+                     (cons 'pid (current-process-id))
+                     (cons 'ipaddr ipaddr)
+                     (cons 'dbpath dbpath))
+               pktspec: pkt-spec ptype: 'server)))
+    (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid)
+    uuid))
+;; Return the server pkts directory <toppath>/.meta/srvpkts, trying to create
+;; it when it is missing. NB// *toppath* wins over apath when both are set.
+(define (get-pkts-dir #!optional (apath #f))
+  (let ((area (or *toppath* apath)))
+    (assert area
+            "ERROR: get-pkts-dir called without *toppath* set. Exiting.")
+    (let ((pdir (conc area "/.meta/srvpkts")))
+      (if (not (file-exists? pdir))
+          ;; best effort: a failure to create the directory is ignored here
+          (handle-exceptions ;; this exception handler should NOT be needed but ...
+           exn pdir
+           (create-directory pdir #t)))
+      pdir)))
+;; Read every *.pkt file in pktsdir-in (the directory is created when it
+;; does not exist yet) and return the pkts as a list of alists.
+(define (get-all-server-pkts pktsdir-in pktspec)
+  (if (not (file-exists? pktsdir-in))
+      (create-directory pktsdir-in #t))
+  (let loop ((files (glob (conc pktsdir-in "/*.pkt")))
+             (pkts '()))
+    (if (null? files)
+        (reverse pkts)
+        (loop (cdr files)
+              (cons (read-pkt->alist (car files) pktspec: pktspec) pkts)))))
+(define (server-address srv-pkt) ;; => "host:port" built from the host and port entries of the pkt
+  (let ((field (lambda (key) (alist-ref key srv-pkt))))
+    (conc (field 'host) ":" (field 'port))))
+;; Ping the server at host-port (a "host:port" string) over uconn.
+;; Returns 'ack when the server answers with 'ack, #f for any other reply.
+(define (server-ready? uconn host-port key)
+  (let* ((ping-params (list (cons 'cmd 'ping) (cons 'key key)))
+         (payload (list (cons 'cmd 'ping)
+                        (cons 'key key)
+                        (cons 'params ping-params))) ;; cmd and key are repeated inside params
+         (reply (send-receive uconn host-port 'ping payload)))
+    (and (eq? reply 'ack) reply))) ;; yep, likely it is who we want on the other end
+;; (begin (debug:print-info 0 *default-log-port* "server-ready? => "res) #f))))
+; from the pkts return servers associated with dbpath
+;; NOTE: Only one can be alive - have to check on each
+;; in the list of pkts returned
+;; NOTE: matches are returned in reverse order of serv-pkts;
+;; sort by age if the oldest is to be considered first
+(define (get-viable-servers serv-pkts dbpath)
+  (foldl (lambda (matches spkt)
+           ;; keep spkt only when it was written for dbpath
+           (if (equal? dbpath (alist-ref 'dbpath spkt))
+               (cons spkt matches)
+               matches))
+         '()
+         serv-pkts))
+;; Keep only the pkts whose server answers a ping. The pkt file of every
+;; unresponsive server is deleted from the pkts dir as a side effect.
+(define (remove-pkts-if-not-alive uconn serv-pkts)
+  (define (alive? pkt)
+    (server-ready? uconn
+                   (conc (alist-ref 'host pkt) ":" (alist-ref 'port pkt))
+                   (alist-ref 'servkey pkt)))
+  (define (discard! pkt)
+    ;; the pkt file name is built from the Z entry of the pkt
+    (let ((pktpath (conc (get-pkts-dir *toppath*) "/" (alist-ref 'Z pkt) ".pkt")))
+      (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath)
+      (delete-file* pktpath)
+      #f))
+  (filter (lambda (pkt)
+            (or (alive? pkt) (discard! pkt)))
+          serv-pkts))
+;; from viable servers get one that is alive and ready
+;; Returns the first pkt in serv-pkts whose server answers a ping, or #f.
+;; NB// the server is contacted at its ipaddr:port and the Z entry of the
+;; pkt is sent as the key. apath is not used.
+(define (get-the-server uconn apath serv-pkts)
+  (let ((responsive?
+         (lambda (spkt)
+           (let ((host-port (conc (alist-ref 'ipaddr spkt) ":" (alist-ref 'port spkt)))
+                 (srvkey (alist-ref 'Z spkt))) ;; (alist-ref 'srvkey spkt))
+             (server-ready? uconn host-port srvkey)))))
+    (let loop ((tail serv-pkts))
+      (cond
+       ((null? tail) #f)
+       ((responsive? (car tail)) (car tail))
+       (else (loop (cdr tail)))))))
+;; am I the "first" in line server? I.e. my D card is smallest
+;; use Z card as tie breaker
+;; Returns the pkt with the smallest D value or #f when serv-pkts is
+;; empty. On equal D values the pkt with the larger Z string is kept
+;; (the current best when the Z strings are equal too).
+;; dbpath is accepted but not used.
+(define (get-best-candidate serv-pkts dbpath)
+  ;; D value of a pkt as a number; bigger number is younger
+  (define (birth pkt)
+    (string->number (alist-ref 'D pkt)))
+  ;; pick the preferred one of the current best and a candidate
+  (define (prefer best candidate)
+    (let ((candidate-bd (birth candidate))
+          (best-bd (birth best)))
+      (cond
+       ((> best-bd candidate-bd) candidate) ;; best is younger than candidate
+       ((< best-bd candidate-bd) best) ;; candidate is younger than best
+       ((string>=? (alist-ref 'Z best) (alist-ref 'Z candidate)) best) ;; use Z card as tie breaker
+       (else candidate))))
+  (if (null? serv-pkts)
+      #f
+      ;; NB// the first pkt is compared against itself in the first round
+      (let loop ((tail serv-pkts)
+                 (best (car serv-pkts)))
+        (if (null? tail)
+            best
+            (loop (cdr tail) (prefer best (car tail)))))))
+;; if .db/main.db check the pkts: register a pkt for this server, then either lock the db as the server or exit. Returns sdat.
+(define (rmt:wait-for-server pkts-dir db-file server-key)
+ (let* ((sdat *db-serv-info*))
+ (let loop ((start-time (current-seconds))
+ (changed #t)
+ (last-sdat "not this")) ;; placeholder that never equals a servdat
+ (begin ;; let ((sdat #f))
+ (thread-sleep! 0.01)
+ (debug:print-info 0 *default-log-port* "Waiting for server alive signature")
+ (mutex-lock! *heartbeat-mutex*)
+ (set! sdat *db-serv-info*)
+ (mutex-unlock! *heartbeat-mutex*)
+ (if (and sdat
+ (not changed)
+ (> (- (current-seconds) start-time) 2))
+ (let* ((uconn (servdat-uconn sdat)))
+ (servdat-status-set! sdat 'iface-stable)
+ (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server")
+ ;; create a server pkt in *toppath*/.meta/srvpkts
+ ;; TODO:
+ ;; 1. change sdat to struct
+ ;; 2. add uuid to struct
+ ;; 3. update uuid in sdat here
+ ;;
+ (servdat-uuid-set! sdat
+ (register-server
+ pkts-dir *srvpktspec*
+ (get-host-name) ;; pkt host entry
+ (servdat-port sdat) server-key
+ (servdat-host sdat) db-file)) ;; servdat-host goes into the ipaddr entry of the pkt
+ ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key
+ ;; now read pkts and see if we are a contender
+ (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*))
+ (viables (get-viable-servers all-pkts db-file))
+ (alive (remove-pkts-if-not-alive uconn viables))
+ (best-srv (get-best-candidate alive db-file))
+ (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))
+ (i-am-srv (equal? best-srv-key server-key))
+ (delete-pkt (lambda ()
+ (let* ((pktfile (conc (get-pkts-dir *toppath*)
+ "/" (servdat-uuid *db-serv-info*)
+ ".pkt")))
+ (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile)
+ (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit
+ (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv)
+ ;; am I the best-srv, compare server-keys to know
+ (if i-am-srv
+ (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)
+ (begin
+ (debug:print-info 0 *default-log-port* "I'm the server!")
+ (servdat-dbfile-set! sdat db-file)
+ (servdat-status-set! sdat 'db-locked))
+ (begin
+ (debug:print-info 0 *default-log-port* "I'm not the server, exiting.")
+ (bdat-time-to-exit-set! *bdat* #t)
+ (delete-pkt)
+ (thread-sleep! 0.2)
+ (exit)))
+ (begin
+ (debug:print-info 0 *default-log-port*
+ "Keys do not match "best-srv-key", "server-key", exiting.")
+ (bdat-time-to-exit-set! *bdat* #t)
+ (delete-pkt)
+ (thread-sleep! 0.2)
+ (exit)))
+ sdat))
+ (begin ;; sdat not yet contains server info
+ (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
+ (sleep 4)
+ (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
+ (begin
+ (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
+ (exit))
+ (loop start-time
+ (equal? sdat last-sdat) ;; NOTE(review): becomes the changed flag, which looks inverted (#t when sdat is unchanged) - confirm intent
+ sdat))))))))
+;; Register this server (iface:port, serving dbname in area apath) with the main.db server. Returns the reply.
+(define (rmt:register-server sinfo apath iface port server-key dbname)
+  (servdat-conns sinfo) ;; just checking types
+  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+  ;; rmt:send-receive takes (cmd rid params), rid #f addresses main.db. NB// it always works on *db-serv-info* and *toppath*
+  (rmt:send-receive 'register-server #f
+                    `(,iface ;; params: host port servkey pid ipaddr apath dbname
+                      ,port
+                      ,server-key
+                      ,(current-process-id)
+                      ,iface
+                      ,apath ,dbname)))
+(define (rmt:get-count-servers sinfo apath) ;; sends 'get-count-servers for apath to main.db; presumably returns the number of registered servers
+  (servdat-conns sinfo) ;; just checking types
+  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+  ;; rmt:send-receive takes (cmd rid params), rid #f addresses main.db.
+  ;; NB// it always works on *db-serv-info* and *toppath*
+  (rmt:send-receive 'get-count-servers #f `(,apath)))
+(define (rmt:get-servers-info apath) ;; forwards 'get-servers-info for apath to the main.db server
+  (rmt:send-receive 'get-servers-info #f (list apath)))
+;; Ask the main.db server to drop the registration of the server iface:port serving dbname. Returns the reply.
+(define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
+  (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
+  ;; rmt:send-receive takes (cmd rid params), rid #f addresses main.db.
+  ;; NB// it always works on *db-serv-info* and *toppath*
+  (rmt:send-receive 'deregister-server #f
+                    `(,iface ;; params: host port servkey pid ipaddr apath dbname
+                      ,port
+                      ,server-key
+                      ,(current-process-id)
+                      ,iface ,apath ,dbname)))
+(define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100)) ;; returns #t once host and port are stable, exits the process after too many tries
+ ;; wait until *db-serv-info* stops changing
+ (let* ((stime (current-seconds)))
+ (let loop ((last-host #f)
+ (last-port #f)
+ (tries 0))
+ (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*)))
+ (curr-port (and *db-serv-info* (servdat-port *db-serv-info*))))
+ ;; first we verify port and interface, update *db-serv-info* in need be.
+ (cond
+ ((> tries num-tries-allowed)
+ (debug:print 0 *default-log-port* "rmt:wait-for-stable-interface, giving up after "tries" tries.")
+ (exit 1))
+ ((not *db-serv-info*)
+ (thread-sleep! 0.25)
+ (loop curr-host curr-port (+ tries 1)))
+ ((or (not last-host)(not last-port)) ;; nothing seen in the previous round (always the case in the first round)
+ (debug:print 0 *default-log-port* "rmt:wait-for-stable-interface, still no interface, tries="tries)
+ (thread-sleep! 0.25)
+ (loop curr-host curr-port (+ tries 1)))
+ ((or (not (equal? last-host curr-host))
+ (not (equal? last-port curr-port)))
+ (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
+ (thread-sleep! 0.25)
+ (loop curr-host curr-port (+ tries 1)))
+ ((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 1 second has passed
+ (thread-sleep! 0.5)
+ (loop curr-host curr-port (+ tries 1)))
+ (else
+ (rmt:get-signature) ;; sets *my-signature* as side effect
+ (servdat-status-set! *db-serv-info* 'interface-stable)
+ (debug:print 0 *default-log-port*
+ "SERVER STARTED: " curr-host
+ ":" curr-port
+ " AT " (current-seconds) " server signature: " *my-signature*
+ " with "(servdat-trynum *db-serv-info*)" port changes")
+ (flush-output *default-log-port*)
+ #t))))))
+;; run rmt:keep-running in a parallel thread to monitor that the db is being
+;; used and to shutdown after sometime if it is not.
+(define (rmt:keep-running dbname)
+ ;; if none running or if > 20 seconds since
+ ;; server last used then start shutdown
+ ;; This thread waits for the server to come alive
+ (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")
+ (let* ((sinfo *db-serv-info*)
+ (server-start-time (current-seconds))
+ (pkts-dir (get-pkts-dir))
+ (server-key (rmt:get-signature)) ;; This servers key
+ (is-main (equal? (args:get-arg "-db") ".db/main.db"))
+ (last-access 0) ;; refreshed from *db-last-access* in every round of the loop
+ (server-timeout (server:expiration-timeout))
+ (shutdown-server-sequence (lambda (host port)
+ (set! *unclean-shutdown* #f) ;; Should not be needed anymore
+ (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
+ ;; (rmt:server-shutdown host port) -- called in on-exit
+ ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit
+ (exit)))
+ (timed-out? (lambda () ;; #t when the last db access is server-timeout or more seconds ago
+ (<= (+ last-access server-timeout)
+ (current-seconds)))))
+ (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db"))
+ ;; main and run db servers have both got wait logic (could/should merge it)
+ (if is-main
+ (rmt:wait-for-server pkts-dir dbname server-key)
+ (rmt:wait-for-stable-interface))
+ ;; this is our forever loop
+ (let* ((iface (servdat-host *db-serv-info*))
+ (port (servdat-port *db-serv-info*))
+ (uconn (servdat-uconn *db-serv-info*)))
+ (let loop ((count 0)
+ (bad-sync-count 0)
+ (start-time (current-milliseconds)))
+ (if (and (not is-main)
+ (common:low-noise-print 60 "servdat-status"))
+ (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*)))
+ (mutex-lock! *heartbeat-mutex*)
+ ;; set up the database handle
+ (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate
+ (let ((watchdog (bdat-watchdog *bdat*)))
+ (debug:print 0 *default-log-port* "SERVER: dbprep")
+ (db:setup dbname) ;; sets *dbstruct-db* as side effect
+ (servdat-status-set! *db-serv-info* 'db-opened)
+ ;; IFF I'm not main, call into main and register self
+ (if (not is-main)
+ (let ((res (rmt:register-server sinfo
+ *toppath* iface port
+ server-key dbname)))
+ (if res ;; we are the server
+ (servdat-status-set! *db-serv-info* 'have-interface-and-db)
+ ;; now check that the db locker is alive, clear it out if not
+ (let* ((serv-info (rmt:server-info *toppath* dbname)))
+ (match serv-info
+ ((host port servkey pid ipaddr apath dbpath)
+ (if (not (server-ready? uconn (conc host":"port) servkey))
+ (begin
+ (debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.")
+ (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath)
+ (loop (+ count 1) bad-sync-count start-time)))) ;; NOTE(review): re-enters the loop while *heartbeat-mutex* is still locked by this thread - confirm the mutex-lock! above cannot block
+ (else
+ (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info)
+ (exit)))))))
+ (debug:print 0 *default-log-port*
+ "SERVER: running, db "dbname" opened, megatest version: "
+ (common:get-full-version))
+ ))
+ (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t)
+ (mutex-unlock! *heartbeat-mutex*)
+ ;; when things go wrong we don't want to be doing the various
+ ;; queries too often so we strive to run this stuff only every
+ ;; four seconds or so.
+ (let* ((sync-time (- (current-milliseconds) start-time))
+ (rem-time (quotient (- 4000 sync-time) 1000))) ;; whole seconds left of the four second period
+ (if (and (<= rem-time 4)
+ (> rem-time 0))
+ (thread-sleep! rem-time)))
+ ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
+ (set! last-access *db-last-access*)
+ (if (< count 1) ;; 3x3 = 9 secs aprox
+ (loop (+ count 1) bad-sync-count (current-milliseconds))) ;; NB// not a tail call, the code below runs if this ever returns
+ (if (common:low-noise-print 60 "dbstats")
+ (begin
+ (debug:print 0 *default-log-port* "Server stats:")
+ (db:print-current-query-stats)))
+ (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600)))
+ (cond
+ ((not *server-run*)
+ (debug:print-info 0 *default-log-port* "*server-run* set to #f. Shutting down.")
+ (shutdown-server-sequence (get-host-name) port))
+ ((timed-out?)
+ (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+ (shutdown-server-sequence (get-host-name) port))
+ ((and *server-run*
+ (or (not (timed-out?))
+ (if is-main ;; do not exit if there are other servers (keep main open until all others gone)
+ (> (rmt:get-count-servers sinfo *toppath*) 1) ;; NOTE(review): only evaluated when (timed-out?) is true, but that case is taken by the clause above - confirm intent
+ #f)))
+ (if (common:low-noise-print 120 "server continuing")
+ (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
+ (loop 0 bad-sync-count (current-milliseconds)))
+ (else
+ (set! *unclean-shutdown* #f)
+ (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+ (shutdown-server-sequence (get-host-name) port)
+ #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: "
+ (open-send-receive-nn (conc iface":"port) ;; do this here and not in server-shutdown
+ (sexpr->string 'quit))))))))))
+(define (rmt:get-reasonable-hostname)
+  ;; use the value given to -server unless it is missing or "-",
+  ;; in which case fall back to the name of this host
+  (let ((requested (args:get-arg "-server")))
+    (if (and requested (not (equal? requested "-"))) requested (get-host-name))))
+;; Call this to start the actual server: runs the listener (rmt:run) and
+;; the keep-running monitor in two threads and waits for both to finish.
+;; all routes though here end in exit ... (#f is returned otherwise)
+(define (rmt:server-launch dbname)
+  (define (listener-thunk)
+    (debug:print-info 0 *default-log-port* "Server run thread started")
+    (rmt:run (rmt:get-reasonable-hostname)))
+  (define (monitor-thunk)
+    (debug:print-info 0 *default-log-port* "Server monitor thread started")
+    ;; the monitor only does something in processes started with -server
+    (if (args:get-arg "-server")
+        (rmt:keep-running dbname)))
+  (debug:print-info 0 *default-log-port* "Entered rmt:server-launch")
+  (let ((listener (make-thread listener-thunk "Server run"))
+        (monitor (make-thread monitor-thunk "Keep running")))
+    (thread-start! listener)
+    (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor.
+    (thread-start! monitor)
+    (set! *didsomething* #t)
+    (thread-join! listener)
+    (thread-join! monitor))
+  #f)
+;; S E R V E R - D I R E C T C A L L S
+(define (rmt:kill-server run-id) ;; sends 'kill-server with run-id as the only param to main.db
+  (rmt:send-receive 'kill-server #f `(,run-id)))
+(define (rmt:start-server run-id) ;; sends 'start-server with run-id as the only param to main.db
+  (rmt:send-receive 'start-server #f `(,run-id)))
+(define (rmt:server-info apath dbname) ;; sends 'get-server-info for apath and dbname to main.db
+  (rmt:send-receive 'get-server-info #f `(,apath ,dbname)))
Index: ulex-simple/ulex.scm
--- ulex-simple/ulex.scm
+++ ulex-simple/ulex.scm
@@ -24,11 +24,12 @@
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
(module ulex
- (
+ *
+ #;(
;; NOTE: looking for the handler proc - find the run-listener :)
run-listener ;; (run-listener handler-proc [port]) => uconn
@@ -50,34 +51,42 @@
;; for testing only
;; pp-uconn
+ ;; parameters
+ work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct
+ return-method ;; parameter; 'mailbox, 'polling, 'direct
(import scheme
+ chicken.tcp
;; queues
+ simple-exceptions
- tcp6
+ ;; tcp6
+ tcp-server
;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
@@ -94,30 +103,15 @@
(work-proc #f) ;; set by user
(cnum 0) ;; cookie number
(mboxes (make-hash-table)) ;; for the replies
(avail-cmboxes '()) ;; list of ( . ) for re-use
;; threads
- (numthreads 50)
+ (numthreads 10)
(cmd-thread #f)
(work-queue-thread #f)
- )
-;; ;; struct for keeping track of others we are talking to
-;; ;;
-;; (defstruct pdat
-;; (host-port #f)
-;; (conns '()) ;; list of pcon structs, pop one off when calling the peer
-;; )
-;; ;; struct for peer connections, keep track of expiration etc.
-;; ;;
-;; (defstruct pcon
-;; (inp #f)
-;; (oup #f)
-;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
-;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
-;; )
+ (num-threads-running 0)
+ )
;; listener
@@ -156,29 +150,25 @@
(define (run-listener handler-proc #!optional (port-suggestion 4242))
(let* ((uconn (make-udat)))
(udat-work-proc-set! uconn handler-proc)
(if (setup-listener uconn port-suggestion)
- (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
- #;(th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor")))
- (tcp-buffer-size 2048)
- ;; (max-connections 2048)
- (thread-start! th1)
- #;(thread-start! th2)
- (udat-cmd-thread-set! uconn th1)
- #;(udat-work-queue-thread-set! uconn th2)
- (print "cmd loop and process workers started")
- uconn)
+ ((make-tcp-server
+ (udat-socket uconn)
+ (lambda ()
+ (let* ((rdat (deserialize)) ;; '(my-host-port qrykey cmd params)
+ (resp (do-work uconn rdat)))
+ (serialize resp)))))
(assert #f "ERROR: run-listener called without proper setup."))))
(define (wait-and-close uconn)
(thread-join! (udat-cmd-thread uconn))
(tcp-close (udat-socket uconn)))
-;; peers and connections
+;; == << ;; peers and connections
+;; == << ;;======================================================================
(define *send-mutex* (make-mutex))
;; send structured data to recipient
@@ -185,127 +175,42 @@
;; NOTE: qrykey is what was called the "cookie" previously
;; retval tells send to expect and wait for return data (one line) and return it or time out
;; this is for ping where we don't want to necessarily have set up our own server yet.
-;; NOTE: see below for beginnings of code to allow re-use of tcp connections
-;; - I believe (without substantial evidence) that re-using connections will
-;; be beneficial ...
-(define (send udata host-port qrykey cmd params)
- (mutex-lock! *send-mutex*)
+(define (send-receive udata host-port cmd params)
(let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
- (isme #f #;(equal? host-port my-host-port)) ;; calling myself?
+ (isme (equal? host-port my-host-port)) ;; calling myself?
;; dat is a self-contained work block that can be sent or handled locally
- (dat (list my-host-port qrykey cmd params)))
- (if isme
- (ulex-handler udata dat) ;; no transmission needed
- (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
- exn
- #f
+ (dat (list my-host-port 'qrykey cmd params #;(cons (current-seconds)(current-milliseconds)))))
+ (cond
+ (isme (do-work udata dat)) ;; no transmission needed
+ (else
+ (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
+ exn
+ (message exn)
+ (begin
+ ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
(let-values (((inp oup)(tcp-connect host-port)))
(let ((res (if (and inp oup)
(serialize dat oup)
- (deserialize inp)) ;; yes, we always want an ack
+ (close-output-port oup)
+ (deserialize inp))
(print "ERROR: send called but no receiver has been setup. Please call setup first!")
(close-input-port inp)
- (close-output-port oup)
- (mutex-unlock! *send-mutex*)
- res)))))) ;; res will always be 'ack
-;; send a request to the given host-port and register a mailbox in udata
-;; wait for the mailbox data and return it
-(define (send-receive uconn host-port cmd data)
- (cond
- ((member cmd '(ping goodbye)) ;; these are immediate
- (send uconn host-port 'ping cmd data))
- (else
- (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
- (qrykey (car cmbox))
- (mbox (cdr cmbox))
- (mbox-time (current-milliseconds))
- (sres (send uconn host-port qrykey cmd data))) ;; short res
- sres))))
-;; responder side
-;; take a request, rdat, and if not immediate put it in the work queue
-;; Reserved cmds; ack ping goodbye response
-(define (ulex-handler uconn rdat)
- (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
- (match rdat ;; (string-split controldat)
- ((rem-host-port qrykey cmd params)
- ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
- (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
- (case cmd
- ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
- ((ping)
- ;; (print "Got Ping!")
- ;; (add-to-work-queue uconn rdat)
- 'ack)
- (else
- (do-work uconn rdat)))))
- (else
- (print "BAD DATA? controldat=" rdat)
- 'ack) ;; send ack anyway?
- ))
-;; given an already set up uconn start the cmd-loop
-(define (ulex-cmd-loop uconn)
- (let* ((serv-listener (udat-socket uconn)))
- (let loop ((state 'start))
- (let-values (((inp oup)(tcp-accept serv-listener)))
- (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
- (resp (ulex-handler uconn rdat)))
- (if resp (serialize resp oup))
- (close-input-port inp)
- (close-output-port oup))
- (loop state)))))
-;;(define (ulex-cmd-loop uconn)
-;; (let* ((serv-listener (udat-socket uconn))
-;; ;; (old-listener (lambda ()
-;; ;; (let loop ((state 'start))
-;; ;; (let-values (((inp oup)(tcp-accept serv-listener)))
-;; ;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
-;; ;; (resp (ulex-handler uconn rdat)))
-;; ;; (if resp (serialize resp oup))
-;; ;; (close-input-port inp)
-;; ;; (close-output-port oup))
-;; ;; (loop state)))))
-;; (server (make-tcp-server
-;; serv-listener
-;; (lambda ()
-;; (let* ((rdat (deserialize )) ;; '(my-host-port qrykey cmd params)
-;; (resp (ulex-handler uconn rdat)))
-;; (if resp (serialize resp) resp))))))
-;; (server)))
-;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
-;; so that the proc can be dereferenced remotely
-(define (set-work-handler uconn proc)
- (udat-work-proc-set! uconn proc))
+ ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+ res)))))))) ;; res will always be 'ack unless return-method is direct
;; work queues - this is all happening on the listener side
-;; rdat is (rem-host-port qrykey cmd params)
-(define (add-to-work-queue uconn rdat)
- #;(queue-add! (udat-work-queue uconn) rdat)
- (mailbox-send! (udat-work-queue uconn) rdat))
+;; move the logic to return the result somewhere else?
(define (do-work uconn rdat)
(let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
;; put this following into a do-work procedure
(match rdat
((rem-host-port qrykey cmd params)
@@ -313,92 +218,16 @@
(result (proc rem-host-port qrykey cmd params))
(end-time (current-milliseconds))
(run-time (- end-time start-time)))
- (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")
- #f))))
-(define (process-work-queue uconn)
- (let ((wqueue (udat-work-queue uconn))
- (proc (udat-work-proc uconn))
- (numthr (udat-numthreads uconn)))
- (let loop ((thnum 1)
- (threads '()))
- (let ((thlst (cons (make-thread (lambda ()
- (let work-loop ()
- (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
- (do-work uconn rdat))
- (work-loop)))
- (conc "work thread " thnum))
- threads)))
- (if (< thnum numthr)
- (loop (+ thnum 1)
- thlst)
- (begin
- (print "ULEX: Starting "(length thlst)" worker threads.")
- (map thread-start! thlst)
- (print "ULEX: Threads started. Joining all.")
- (map thread-join! thlst)))))))
-;; below was to enable re-use of connections. This seems non-trivial so for
-;; now lets open on each call
-;; ;; given host-port get or create peer struct
-;; ;;
-;; (define (udat-get-peer uconn host-port)
-;; (or (hash-table-ref/default (udat-peers uconn) host-port #f)
-;; ;; no peer, so create pdat and init it
-;; ;; NEED stack of connections, pop and use; inp, oup,
-;; ;; creation_time (remove and create new if over 24hrs old
-;; ;;
-;; (let ((pdat (make-pdat host-port: host-port)))
-;; (hash-table-set! (udat-peers uconn) host-port pdat)
-;; pdat)))
-;; ;; is pcon alive
-;; ;; given host-port and pdat get a pcon
-;; ;;
-;; (define (pdat-get-pcon pdat host-port)
-;; (let loop ((conns (pdat-conns pdat)))
-;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
-;; (init-pcon (make-pcon))
-;; (let* ((conn (pop conns)))
-;; ;; given host-port get a pcon struct
-;; ;;
-;; (define (udat-get-pcon
+ (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
;; misc utils
-(define (make-cookie uconn)
- (let ((newcnum (+ (udat-cnum uconn) 1)))
- (udat-cnum-set! uconn newcnum)
- (conc (udat-host-port uconn) ":"
- newcnum)))
-;; cookie/mboxes
-;; we store each mbox with a cookie ( . )
-(define (get-cmbox uconn)
- (if (null? (udat-avail-cmboxes uconn))
- (let ((cookie (make-cookie uconn))
- (mbox (make-mailbox)))
- (hash-table-set! (udat-mboxes uconn) cookie mbox)
- `(,cookie . ,mbox))
- (let ((cmbox (car (udat-avail-cmboxes uconn))))
- (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
- cmbox)))
-(define (put-cmbox uconn cmbox)
- (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
(define (pp-uconn uconn)
(pp (udat->alist uconn)))
DELETED ulex.scm
Index: ulex.scm
--- ulex.scm
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Copyright 2019, Matthew Welland.
-;; This file is part of Megatest.
-;; Megatest is free software: you can redistribute it and/or modify
-;; it under the terms of the GNU General Public License as published by
-;; the Free Software Foundation, either version 3 of the License, or
-;; (at your option) any later version.
-;; Megatest is distributed in the hope that it will be useful,
-;; but WITHOUT ANY WARRANTY; without even the implied warranty of
-;; GNU General Public License for more details.
-;; You should have received a copy of the GNU General Public License
-;; along with Megatest. If not, see .
-(declare (unit ulex))
-(include "ulex/ulex.scm")
-;; (include "ulex-simple/ulex.scm")
ADDED ulex.scm.template
Index: ulex.scm.template
--- /dev/null
+++ ulex.scm.template
@@ -0,0 +1,23 @@
+;; Copyright 2019, Matthew Welland.
+;; This file is part of Megatest.
+;; Megatest is free software: you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+;; Megatest is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+;; You should have received a copy of the GNU General Public License
+;; along with Megatest. If not, see <https://www.gnu.org/licenses/>.
+(declare (unit ulex))
+(include "ulex-FLAVOR/ulex.scm")
ADDED ulex/dbmgr.scm
Index: ulex/dbmgr.scm
--- /dev/null
+++ ulex/dbmgr.scm
@@ -0,0 +1,1131 @@
+;; Copyright 2022, Matthew Welland.
+;; This file is part of Megatest.
+;; Megatest is free software: you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+;; Megatest is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+;; You should have received a copy of the GNU General Public License
+;; along with Megatest. If not, see <https://www.gnu.org/licenses/>.
+(declare (unit dbmgrmod))
+(declare (uses ulex))
+(declare (uses apimod))
+(declare (uses pkts))
+(declare (uses commonmod))
+(declare (uses dbmod))
+(declare (uses mtargs))
+(declare (uses portloggermod))
+(declare (uses debugprint))
+(module dbmgrmod
+ *
+(import scheme
+ chicken.base
+ chicken.condition
+ chicken.file
+ chicken.format
+ chicken.port
+ chicken.process
+ chicken.process-context
+ chicken.process-context.posix
+ chicken.sort
+ chicken.string
+ chicken.time
+ (prefix sqlite3 sqlite3:)
+ matchable
+ md5
+ message-digest
+ regex
+ s11n
+ srfi-1
+ srfi-18
+ srfi-69
+ system-information
+ typed-records
+ pkts
+ ulex
+ commonmod
+ apimod
+ dbmod
+ debugprint
+ (prefix mtargs args:)
+ portloggermod
+ )
+;; Configurations for server
+;; (tcp-buffer-size 2048)
+;; (max-connections 2048)
+;; info about me as a listener and my connections to db servers
+;; stored (for now) in *db-serv-info*
+(defstruct servdat
+ (host #f) ;; set from the hostn argument in rmt:run
+ (port #f) ;; set in rmt:run to the port reported by the listener (udat-port)
+ (uuid #f) ;; used by rmt:server-shutdown to name this server's pkt file
+ (dbfile #f) ;; rmt:server-shutdown does nothing when this is #f
+ (uconn #f) ;; this is the listener *FOR THIS PROCESS*
+ (mode #f) ;; set to 'non-db by rmt:open-main-connection when it has to start the listener itself
+ (status 'starting)
+ (trynum 0) ;; count the number of ports we've tried
+ (conns (make-hash-table)) ;; full db path, i.e. (db:dbname->path apath dbname) => conndat
+ )
+(define *db-serv-info* (make-servdat)) ;; the servdat record used throughout this module for the current process
+(define (servdat->url sdat) ;; "host:port" string for the listener described by sdat
+  (let ((host (servdat-host sdat)) (port (servdat-port sdat))) (conc host ":" port)))
+;; db servers contact info, one record per db server this process talks to
+(defstruct conndat
+ (apath #f) ;; area path the db belongs to
+ (dbname #f) ;; db name relative to the area, e.g. ".db/main.db"
+ (fullname #f) ;; (db:dbname->path apath dbname); filled in by rmt:open-main-connection
+ (hostport #f) ;; "host:port" string handed to send-receive
+ (ipaddr #f)
+ (port #f)
+ (srvpkt #f) ;; the server pkt alist the record was built from (set by rmt:open-main-connection)
+ (srvkey #f)
+ (lastmsg 0) ;; (current-seconds) at the time the record was created
+ (expires 0)) ;; time in seconds after which the record counts as expired; pushed forward by rmt:send-receive-real
+(define *srvpktspec* ;; pkt spec handed to get-all-server-pkts and register-server for 'server pkts
+ `((server (host . h) ;; each pair is (field name . single letter) - presumably the card letter used by the pkts library; confirm
+ (port . p)
+ (servkey . k)
+ (pid . i)
+ (ipaddr . a)
+ (dbpath . d))))
+;; S U P P O R T F U N C T I O N S
+;; set up the api proc, seems like there should be a better place for this?
+;; (define api-proc (make-parameter conc))
+;; (api-proc api:execute-requests)
+;; do we have a connection to apath dbname and
+;; is it not expired? then return it
+;; else setup a connection
+;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
+(define (rmt:get-conn remdat apath dbname) ;; plain lookup: the conndat stored for apath/dbname, or #f when there is none
+  (hash-table-ref/default
+   (servdat-conns remdat) (db:dbname->path apath dbname) #f))
+(define (rmt:drop-conn remdat apath dbname) ;; forget the conndat stored for apath/dbname, if any
+  (hash-table-delete!
+   (servdat-conns remdat) (db:dbname->path apath dbname)))
+;; Read the server pkts of area apath, keep the ones registered for dbname and
+;; hand them to get-the-server, which returns a responsive one or #f.
+(define (rmt:find-main-server uconn apath dbname)
+  (let ((candidates (get-viable-servers (get-all-server-pkts (get-pkts-dir apath) *srvpktspec*) dbname)))
+    (get-the-server uconn apath candidates)))
+(define *connstart-mutex* (make-mutex)) ;; held by rmt:open-main-connection while it decides whether to launch a main server
+(define *last-main-start* 0) ;; (current-seconds) of the last main server launch from this process; 0 = none yet
+;; Ensures remdat has a connection record for the main db server of area apath; returns #t once it has one that is not expired.
+;; An expired record is dropped and re-created; if no live main server is found one is launched and the search repeats.
+;; connections for other servers happen by requesting from main (see rmt:general-open-connection)
+;; TODO: This is unnecessarily re-creating the record in the hash table
+(define (rmt:open-main-connection remdat apath)
+ (let* ((fullpath (db:dbname->path apath ".db/main.db"))
+ (conns (servdat-conns remdat))
+ (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
+ (start-rmt:run (lambda () ;; start our own listener in a thread, then poll once a second until its uconn shows up
+ (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
+ (thread-start! th1)
+ (thread-sleep! 1)
+ (let loop ((count 0))
+ (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
+ (if (or (not *db-serv-info*)
+ (not (servdat-uconn *db-serv-info*)))
+ (begin
+ (thread-sleep! 1)
+ (loop (+ count 1)))
+ (begin
+ (servdat-mode-set! *db-serv-info* 'non-db)
+ (servdat-uconn *db-serv-info*)))))))
+ (myconn (servdat-uconn *db-serv-info*)))
+ (cond
+ ((not myconn) ;; this process has no listener yet: start one, then start over
+ (start-rmt:run)
+ (rmt:open-main-connection remdat apath))
+ ((and conn ;; conn is NOT a socket, just saying ...
+ (< (current-seconds) (conndat-expires conn)))
+ #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
+ ((and conn
+ (>= (current-seconds)(conndat-expires conn)))
+ (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
+ (rmt:drop-conn remdat apath ".db/main.db") ;; forget the stale record, then start over
+ (rmt:open-main-connection remdat apath))
+ (else
+ ;; Below we will find or create and connect to main
+ (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch")
+ (let* ((dbname (db:run-id->dbname #f))
+ (the-srv (rmt:find-main-server myconn apath dbname))
+ (start-main-srv (lambda () ;; call IF there is no the-srv found
+ (mutex-lock! *connstart-mutex*)
+ (if (> (- (current-seconds) *last-main-start*) 5) ;; more than five seconds since the last attempt to start main server
+ (begin
+ (api:run-server-process apath dbname)
+ (set! *last-main-start* (current-seconds))
+ (thread-sleep! 1))
+ (thread-sleep! 0.25))
+ (mutex-unlock! *connstart-mutex*)
+ (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries
+ )))
+ (if (not the-srv) ;; no responsive main server found: launch one and retry; otherwise record its contact info
+ (start-main-srv)
+ (let* ((srv-addr (server-address the-srv)) ;; "host:port" of the server that answered
+ (ipaddr (alist-ref 'ipaddr the-srv))
+ (port (alist-ref 'port the-srv))
+ (srvkey (alist-ref 'servkey the-srv))
+ (fullpath (db:dbname->path apath dbname))
+ (new-the-srv (make-conndat
+ apath: apath
+ dbname: dbname
+ fullname: fullpath
+ hostport: srv-addr
+ ;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection?
+ ipaddr: ipaddr
+ port: port
+ srvpkt: the-srv
+ srvkey: srvkey ;; generated by rmt:get-signature on the server side
+ lastmsg: (current-seconds)
+ expires: (+ (current-seconds)
+ (server:expiration-timeout)
+ -2) ;; this needs to be gathered during the ping
+ )))
+ (hash-table-set! conns fullpath new-the-srv)))
+ #t)))))
+;; Ensure sinfo (a servdat struct) holds a connection record for apath/dbname; never call this for main.db.
+;; Opens the channel to main first when it is missing, then asks main ('get-server) where the db server is.
+;; num-tries bounds the retries while a server is starting. Returns #t; exits the process when tries run out.
+(define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
+  (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
+  (let* ((mdbname  ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable
+         (fullname (db:dbname->path apath dbname))
+         (conns    (servdat-conns sinfo))
+         (mconn    (rmt:get-conn sinfo apath ".db/main.db"))
+         (dconn    (rmt:get-conn sinfo apath dbname)))
+    #;(if (and mconn
+              (not (debug:print-logger)))
+         (begin
+           (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.")
+           (debug:print-logger rmt:log-to-main)))
+    (cond
+     ((and mconn dconn
+           (< (current-seconds)(conndat-expires dconn)))
+      #t) ;; good to go
+     ((not mconn) ;; no channel open to main? open it...
+      (rmt:open-main-connection sinfo apath)
+      (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+     ((not dconn) ;; no channel open to dbname?
+      (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname))))
+        (case res
+          ((server-started) ;; main had to start the server: give it time, then ask again
+           (if (> num-tries 0)
+               (begin
+                 (thread-sleep! 2)
+                 (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+               (begin
+                 (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname)
+                 (exit 1))))
+          (else
+           (if (list? res) ;; server has been registered and the info was returned. pass it on.
+               (begin
+                 ;; e.g. ("" 53817 "5e34239f48e8973b3813221e54701a01" "24310" "" "/home/matt/data/megatest/tests/simplerun" ".db/1.db")
+                 (match res
+                   ((host port servkey pid ipaddr apath dbname)
+                    (debug:print-info 0 *default-log-port* "got "res)
+                    (hash-table-set! conns
+                                     fullname
+                                     (make-conndat
+                                      apath: apath
+                                      dbname: dbname
+                                      hostport: (conc host":"port)
+                                      ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
+                                      ipaddr: ipaddr
+                                      port: port
+                                      srvkey: servkey
+                                      lastmsg: (current-seconds)
+                                      expires: (+ (current-seconds)
+                                                  (server:expiration-timeout)
+                                                  -2))))
+                   (else
+                    (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
+                 res)
+               (begin
+                 (debug:print-info 0 *default-log-port* "Unexpected result: " res)
+                 res))))))
+     (else ;; dconn exists but has expired: drop the stale record and look the server up again via main
+      (rmt:drop-conn sinfo apath dbname)
+      (rmt:general-open-connection sinfo apath dbname num-tries: num-tries)))
+    #t))
+;; (define *localmode* #t)
+(define *localmode* #f) ;; when true, rmt:send-receive runs requests in-process through api:execute-requests
+(define *dbstruct* (make-dbr:dbstruct)) ;; db struct handed to api:execute-requests when *localmode* is true
+;; Run cmd with params for run id rid in the current area (*toppath*).
+;; In *localmode* the request is executed in-process. Otherwise the main
+;; connection is opened, the connection for rid's db too when rid is not #f,
+;; and the request is sent with rmt:send-receive-real.
+(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
+  (let* ((area   *toppath*)
+         (server *db-serv-info*)
+         (target (db:run-id->dbname rid)))
+    (cond
+     (*localmode* (api:execute-requests *dbstruct* cmd params))
+     (else (rmt:open-main-connection server area)
+           (and rid (rmt:general-open-connection server area target))
+           (rmt:send-receive-real server area target cmd params)))))
+;; Send cmd/params to the server recorded for apath/dbname in sinfo and return its reply (db is at apath/.db/dbname).
+;; Asserts that it is not running in the primordial thread and that the connection record already exists.
+(define (rmt:send-receive-real sinfo apath dbname cmd params)
+ (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
+ (let* ((cdat (rmt:get-conn sinfo apath dbname)))
+ (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened")
+ (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex
+ ;; then send-receive using the ulex layer to host-port stored in cdat
+ (res (send-receive uconn (conndat-hostport cdat) cmd params))
+ #;(th1 (make-thread (lambda ()
+ (set! res (send-receive uconn (conndat-hostport cdat) cmd params)))
+ "send-receive thread")))
+ ;; (thread-start! th1)
+ ;; (thread-join! th1) ;; gratuitous thread stuff is so that mailbox is not used in primordial thread
+ ;; since we accessed the server we can bump the expires time up
+ (conndat-expires-set! cdat (+ (current-seconds)
+ (server:expiration-timeout)
+ -2)) ;; two second margin for network time misalignments etc.
+ res)))
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future.
+;; Purpose - call the main.db server and request a server be started
+;; for the given area path and dbname
+;; Log (debug level 18) one row per command in *db-stats*: call count, total
+;; time and their quotient, with the most frequently called commands first.
+(define (rmt:print-db-stats)
+  (define (count-of cmd) (vector-ref (hash-table-ref *db-stats* cmd) 0))
+  (define row-format "~40a~7-d~9-d~20,2-f")
+  (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
+  (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
+  (for-each (lambda (cmd)
+              (let* ((stats (hash-table-ref *db-stats* cmd)) (calls (vector-ref stats 0)) (total (vector-ref stats 1)))
+                (debug:print 18 *default-log-port* (format #f row-format cmd calls total (/ total calls)))))
+            (sort (hash-table-keys *db-stats*) (lambda (a b) (> (count-of a) (count-of b))))))
+(define (rmt:get-max-query-average run-id) ;; returns (cmd . average) for the run-id command with the largest slot1/slot0 average, or (none . 0)
+ (mutex-lock! *db-stats-mutex*) ;; NOTE(review): the mutex stays locked if anything below raises
+ (let* ((runkey (conc "run-id=" run-id " "))
+ (cmds (filter (lambda (x) ;; keep only the *db-stats* keys that contain runkey
+ (substring-index runkey x))
+ (hash-table-keys *db-stats*)))
+ (res (if (null? cmds)
+ (cons 'none 0)
+ (let loop ((cmd (car cmds))
+ (tal (cdr cmds))
+ (max-cmd (car cmds))
+ (res 0))
+ (let* ((cmd-dat (hash-table-ref *db-stats* cmd))
+ (tot (vector-ref cmd-dat 0)) ;; slot 0 is the call count (it is the divisor on the next line)
+ (curravg (/ (vector-ref cmd-dat 1) (vector-ref cmd-dat 0))) ;; count is never zero by construction
+ (currmax (max res curravg))
+ (newmax-cmd (if (> curravg res) cmd max-cmd)))
+ (if (null? tal)
+ (if (> tot 10) ;; NOTE(review): this tests the count of the LAST command examined, not of the max command - confirm intent
+ (cons newmax-cmd currmax)
+ (cons 'none 0))
+ (loop (car tal)(cdr tal) newmax-cmd currmax)))))))
+ (mutex-unlock! *db-stats-mutex*)
+ res))
+;; Final cleanup for a server process: sync and finalize the db, then remove this server's registration. host and port are used to ensure we remove the proper records
+(define (rmt:server-shutdown host port)
+ (let ((dbfile (servdat-dbfile *db-serv-info*)))
+ (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
+ (if dbfile ;; nothing at all is done when no dbfile was recorded in *db-serv-info*
+ (let* ((am-server (args:get-arg "-server"))
+ (dbfile (args:get-arg "-db")) ;; NOTE(review): shadows the servdat dbfile tested above; presumably #f when "-db" was not given - confirm
+ (apath *toppath*)
+ #;(sinfo *remotedat*)) ;; foundation for future fix
+ (if *dbstruct-db*
+ (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
+ (db (dbr:dbdat-db dbdat))
+ (inmem (dbr:dbdat-db dbdat)) ;; WRONG - same accessor as db above, so db and inmem are the same handle here
+ )
+ ;; do a final sync here
+ (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
+ (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
+ ;; let's finalize here
+ (debug:print-info 0 *default-log-port* "Finalizing db and inmem")
+ (if (sqlite3:database? db)
+ (sqlite3:finalize! db)
+ (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing..."))
+ (if (sqlite3:database? inmem) ;; NOTE(review): with the binding above this finalizes the same handle a second time
+ (sqlite3:finalize! inmem)
+ (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing..."))
+ (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete"))
+ (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do."))
+ (if (not am-server)
+ (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!")
+ (if (string-match ".*/main.db$" dbfile) ;; main.db server: remove our pkt file and release the db lock
+ (let ((pkt-file (conc (get-pkts-dir *toppath*)
+ "/" (servdat-uuid *db-serv-info*)
+ ".pkt")))
+ (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
+ (delete-file* pkt-file)
+ (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port)
+ (db:with-lock-db
+ (servdat-dbfile *db-serv-info*)
+ (lambda (dbh dbfile)
+ (db:release-lock dbh dbfile host port)))) ;; end of the main.db case: the lock for host/port is released here
+ (let* ((sdat *db-serv-info*) ;; we have a run-id server
+ (host (servdat-host sdat))
+ (port (servdat-port sdat))
+ (uuid (servdat-uuid sdat))
+ (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile)))
+ (debug:print-info 0 *default-log-port* "deregistered-server, res="res)
+ (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
+ )))))))
+(define (common:run-sync?) ;; returns the value of the "-server" argument, so it is true only in a server process
+ ;; (and (common:on-homehost?) -- the home host check is currently disabled
+ (args:get-arg "-server"))
+(define *rmt:run-mutex* (make-mutex)) ;; guards the test-and-set of *rmt:run-flag* in rmt:run
+(define *rmt:run-flag* #f) ;; set to #t by the first rmt:run call; later calls only log a warning
+;; Main entry point to start a server (was start-server).
+;;   hostn - host name recorded in *db-serv-info* for this listener
+;; Only the first call does any work; once *rmt:run-flag* is set, later calls
+;; just log a warning and return. If *db-serv-info* already has a listener
+;; (uconn), wait-and-close is called on it. Otherwise a port is obtained from
+;; portlogger, the ulex listener is started with the api handler, its details
+;; are stored in *db-serv-info* and wait-and-close is called on the new
+;; listener. A completion message is logged at the end.
+(define (rmt:run hostn)
+  (mutex-lock! *rmt:run-mutex*)
+  (if *rmt:run-flag*
+      (begin
+        (debug:print-warn 0 *default-log-port* "rmt:run already running.")
+        (mutex-unlock! *rmt:run-mutex*))
+      (begin
+        (set! *rmt:run-flag* #t)
+        (mutex-unlock! *rmt:run-mutex*)
+        (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
+        (if (and *db-serv-info*
+                 (servdat-uconn *db-serv-info*))
+            (wait-and-close (servdat-uconn *db-serv-info*))
+            (let* ((port (portlogger:open-run-close portlogger:find-port))
+                   ;; handler the listener calls for every incoming request
+                   (handler-proc (lambda (rem-host-port qrykey cmd params)
+                                   (set! *db-last-access* (current-seconds))
+                                   (assert (list? params) "FATAL: handler called with non-list params")
+                                   (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params)
+                                   (debug:print 0 *default-log-port* "handler call: "cmd", params="params)
+                                   (api:execute-requests *dbstruct-db* cmd params))))
+              (if (not *db-serv-info*)
+                  (set! *db-serv-info* (make-servdat host: hostn port: port)))
+              (let* ((uconn (run-listener handler-proc port))
+                     (rport (udat-port uconn))) ;; the real port
+                (servdat-host-set! *db-serv-info* hostn)
+                (servdat-port-set! *db-serv-info* rport)
+                (servdat-uconn-set! *db-serv-info* uconn)
+                (wait-and-close uconn)
+                (db:print-current-query-stats))))
+        (let* ((host (servdat-host *db-serv-info*))
+               (port (servdat-port *db-serv-info*))
+               (mode (or (servdat-mode *db-serv-info*)
+                         "non-db")))
+          ;; rmt:server-shutdown and the portlogger release are done in on-exit, not here.
+          ;; The space before "shutdown" keeps the mode from running into the next word.
+          (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode" shutdown complete. Exiting")))))
+;; S E R V E R U T I L I T I E S
+;; Try to take the server lock kept in dbfile for host:port. Only use for main.db - need to re-write some of this :(
+;;   sdat   - servdat whose uconn is used to ping the current lock owner
+;; Returns #t when the lock was obtained, the previous owner's lock record (a list, hence true) when a stale
+;; lock was stolen, and #f when a responsive server already holds the lock or the lock query gave anything else.
+(define (get-lock-db sdat dbfile host port)
+  (assert host "FATAL: get-lock-db called with host not set.")
+  (assert port "FATAL: get-lock-db called with port not set.")
+  (let* ((dbh   (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations
+         (res   (db:get-iam-server-lock dbh dbfile host port)) ;; list => already locked, #t => got the lock
+         (uconn (servdat-uconn sdat)))
+    (match res
+      ((owner_pid owner_host owner_port event_time)
+       (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
+           (set! res #f) ;; locked by someone else who is alive: report failure rather than return their record
+           (begin ;; locked by someone dead and gone
+             (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
+             (db:steal-lock-db dbh dbfile port))))
+      (#t #t) ;; placeholder so that we don't touch res if it is #t
+      (else (set! res #f)))
+    (sqlite3:finalize! dbh)
+    res))
+;; Write a 'server pkt for this process into pkts-dir (fields host, port, servkey, pid, ipaddr, dbpath)
+;; and return what write-alist->pkt returned, which is also logged as the pkt the server is registered in.
+(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath)
+  (let ((pkt-id (write-alist->pkt
+                 pkts-dir
+                 (list (cons 'host host)
+                       (cons 'port port)
+                       (cons 'servkey servkey)
+                       (cons 'pid (current-process-id))
+                       (cons 'ipaddr ipaddr)
+                       (cons 'dbpath dbpath))
+                 pktspec: pkt-spec ptype: 'server)))
+    (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "pkt-id)
+    pkt-id))
+;; Path of the server pkts directory, "<area>/.meta/srvpkts", where <area> is
+;; *toppath* when that is set and apath otherwise. The directory is created when
+;; missing; a failure to create it is ignored and the path is returned anyway.
+(define (get-pkts-dir #!optional (apath #f))
+  (let ((area (or *toppath* apath)))
+    (assert area
+            "ERROR: get-pkts-dir called without *toppath* set. Exiting.")
+    (let ((srvpkts (conc area "/.meta/srvpkts")))
+      (if (not (file-exists? srvpkts))
+          (handle-exceptions ;; this exception handler should NOT be needed but ...
+              exn srvpkts
+            (create-directory srvpkts #t)))
+      srvpkts)))
+;; Read every "*.pkt" file found in pktsdir-in and return the list of
+;; results of read-pkt->alist (called with pktspec) for those files.
+;; pktsdir-in is created first when it does not exist yet.
+(define (get-all-server-pkts pktsdir-in pktspec)
+  (if (not (file-exists? pktsdir-in))
+      (create-directory pktsdir-in #t))
+  (let ((pkt-paths (glob (conc pktsdir-in "/*.pkt")))
+        (read-one  (lambda (path)
+                     (read-pkt->alist path pktspec: pktspec))))
+    (map read-one
+         pkt-paths)))
+(define (server-address srv-pkt) ;; "host:port" built from the pkt's host and port entries
+  (let ((host (alist-ref 'host srv-pkt)) (port (alist-ref 'port srv-pkt)))
+    (conc host ":" port)))
+;; Ping the server at host-port (a "host:port" string) through uconn.
+;; Returns 'ack when the peer answered with 'ack and #f for any other reply.
+(define (server-ready? uconn host-port key)
+  ;; the payload carries cmd and key at top level and again, nested, under params
+  (let* ((ping-args (list (cons 'cmd 'ping) (cons 'key key)))
+         (payload   (list (cons 'cmd 'ping) (cons 'key key) (cons 'params ping-args)))
+         (reply     (send-receive uconn host-port 'ping payload)))
+    (and (eq? reply 'ack)
+         reply)))
+;; (begin (debug:print-info 0 *default-log-port* "server-ready? => "res) #f))))
+;; From serv-pkts keep the pkts whose 'dbpath entry is equal? to dbpath.
+;; The matches are returned in reverse of their order in serv-pkts.
+;; NOTE: Only one of these servers can be alive - callers have to check
+;; each pkt in the returned list.
+;; TODO: sort by age so that the oldest is considered first.
+(define (get-viable-servers serv-pkts dbpath)
+  (let ((for-this-db? (lambda (spkt)
+                        (equal? dbpath (alist-ref 'dbpath spkt)))))
+    ;; fold walks the list left to right and prepends, hence the reversal
+    (fold (lambda (spkt kept)
+            (if (for-this-db? spkt) (cons spkt kept) kept))
+          '()
+          serv-pkts)))
+;; Keep only the pkts in serv-pkts whose server answers a ping. For every pkt
+;; whose server does not answer, the pkt file <pkts-dir>/<Z>.pkt is deleted.
+(define (remove-pkts-if-not-alive uconn serv-pkts)
+  (define (alive? pkt)
+    (server-ready? uconn
+                   (conc (alist-ref 'host pkt) ":" (alist-ref 'port pkt))
+                   (alist-ref 'servkey pkt)))
+  (define (discard-pkt-file! pkt)
+    (let ((pktpath (conc (get-pkts-dir *toppath*) "/" (alist-ref 'Z pkt) ".pkt")))
+      (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath)
+      (delete-file* pktpath)
+      #f))
+  (filter (lambda (pkt)
+            (or (alive? pkt)
+                (discard-pkt-file! pkt)))
+          serv-pkts))
+;; Return the first pkt in serv-pkts whose server answers a ping, or #f when
+;; none does.  Each server is addressed as <ipaddr>:<port> and pinged with
+;; the pkt's Z card as the key.  apath is accepted but not used.
+;; NOTE(review): remove-pkts-if-not-alive probes <host>:<port> with the
+;; servkey field instead - confirm which address/key pair is intended here.
+(define (get-the-server uconn apath serv-pkts)
+  (let try-next ((candidates serv-pkts))
+    (cond
+     ((null? candidates) #f)
+     ((let ((spkt (car candidates)))
+        (server-ready? uconn
+                       (conc (alist-ref 'ipaddr spkt) ":" (alist-ref 'port spkt))
+                       (alist-ref 'Z spkt)))
+      (car candidates))
+     (else (try-next (cdr candidates))))))
+;; Pick the pkt that is first in line to be the server.
+;;
+;; The D card is read as a number and the smallest D wins (a bigger D is
+;; younger).  When two D cards are equal the Z cards are compared as strings
+;; and the current best is kept only if its Z is string>=? the challenger's.
+;; Returns #f for an empty serv-pkts.  dbpath is accepted but not used.
+(define (get-best-candidate serv-pkts dbpath)
+  (define (d-card pkt)
+    (string->number (alist-ref 'D pkt)))
+  ;; decide which of the current best and a challenger stays best
+  (define (pick best challenger)
+    (let* ((challenger-d (d-card challenger))
+           (best-d       (d-card best)))
+      (cond
+       ((> best-d challenger-d) challenger)
+       ((< best-d challenger-d) best)
+       ((string>=? (alist-ref 'Z best) (alist-ref 'Z challenger)) best)
+       (else challenger))))
+  (if (null? serv-pkts)
+      #f
+      ;; the scan starts at the head, so the head is first compared with
+      ;; itself, which leaves it in place
+      (let scan ((remaining serv-pkts)
+                 (best      (car serv-pkts)))
+        (if (null? remaining)
+            best
+            (scan (cdr remaining) (pick best (car remaining)))))))
+;; Startup handshake used by rmt:keep-running when serving .db/main.db: wait for *db-serv-info*, publish a pkt, then claim the db lock or exit.
+(define (rmt:wait-for-server pkts-dir db-file server-key) ;; returns sdat once the db lock is held; every other outcome calls (exit)
+ (let* ((sdat *db-serv-info*))
+ (let loop ((start-time (current-seconds))
+ (changed #t) ;; NOTE(review): later fed with (equal? sdat last-sdat), i.e. #t when sdat did NOT change - confirm the intended sense
+ (last-sdat "not this")) ;; initial placeholder, a string rather than a servdat
+ (begin ;; one polling pass
+ (thread-sleep! 0.01)
+ (debug:print-info 0 *default-log-port* "Waiting for server alive signature")
+ (mutex-lock! *heartbeat-mutex*) ;; re-read the global while holding the heartbeat mutex
+ (set! sdat *db-serv-info*)
+ (mutex-unlock! *heartbeat-mutex*)
+ (if (and sdat ;; proceed only when server info exists, ...
+ (not changed) ;; ... the changed flag is clear, ...
+ (> (- (current-seconds) start-time) 2)) ;; ... and more than 2 seconds have passed since the first pass
+ (let* ((uconn (servdat-uconn sdat)))
+ (servdat-status-set! sdat 'iface-stable)
+ (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server")
+ ;; Publish a server pkt in pkts-dir (the original note names *toppath*/.meta/srvpkts)
+ ;; by calling register-server, and store the value it returns as this
+ ;; server's uuid; delete-pkt below reads the uuid back from *db-serv-info*.
+ ;; TODO (from the original author):
+ ;; 1. change sdat to struct, 2. add uuid to struct,
+ ;; 3. update uuid in sdat here
+ (servdat-uuid-set! sdat
+ (register-server
+ pkts-dir *srvpktspec*
+ (get-host-name)
+ (servdat-port sdat) server-key
+ (servdat-host sdat) db-file))
+ ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key
+ ;; now read the pkts back and see if we are a contender
+ (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*))
+ (viables (get-viable-servers all-pkts db-file)) ;; pkts whose dbpath equals db-file
+ (alive (remove-pkts-if-not-alive uconn viables)) ;; ping each one; pkt files of servers that do not answer are deleted
+ (best-srv (get-best-candidate alive db-file)) ;; smallest D card wins, Z card breaks ties; #f when alive is empty
+ (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))
+ (i-am-srv (equal? best-srv-key server-key))
+ (delete-pkt (lambda ()
+ (let* ((pktfile (conc (get-pkts-dir *toppath*)
+ "/" (servdat-uuid *db-serv-info*)
+ ".pkt")))
+ (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile)
+ (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit
+ (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv)
+ ;; am I the best-srv? compare the server keys to find out
+ (if i-am-srv
+ (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)
+ (begin
+ (debug:print-info 0 *default-log-port* "I'm the server!")
+ (servdat-dbfile-set! sdat db-file)
+ (servdat-status-set! sdat 'db-locked))
+ (begin ;; best candidate, but get-lock-db returned false: remove our pkt and exit
+ (debug:print-info 0 *default-log-port* "I'm not the server, exiting.")
+ (bdat-time-to-exit-set! *bdat* #t)
+ (delete-pkt)
+ (thread-sleep! 0.2)
+ (exit)))
+ (begin ;; the best candidate's key is not ours (or there is no candidate): remove our pkt and exit
+ (debug:print-info 0 *default-log-port*
+ "Keys do not match "best-srv-key", "server-key", exiting.")
+ (bdat-time-to-exit-set! *bdat* #t)
+ (delete-pkt)
+ (thread-sleep! 0.2)
+ (exit)))
+ sdat))
+ (begin ;; not ready yet: no sdat, changed flag set, or not more than 2 seconds elapsed
+ (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
+ (sleep 4)
+ (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
+ (begin
+ (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
+ (exit))
+ (loop start-time
+ (equal? sdat last-sdat)
+ sdat))))))))
+;; Register this server with main.db.
+;; Calls rmt:open-main-connection for apath, then sends a register-server
+;; request whose payload is (iface port server-key pid iface apath dbname),
+;; pid being the current process id; iface appears twice (the original note
+;; labels the slots host ... ipaddr).  Returns the reply of the request.
+;; NOTE(review): that note lists six slots but seven values are sent.
+(define (rmt:register-server sinfo apath iface port server-key dbname)
+  (servdat-conns sinfo) ;; just checking types
+  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+  (let ((payload (list iface port server-key (current-process-id) iface apath dbname)))
+    (rmt:send-receive-real sinfo apath (db:run-id->dbname #f)
+                           'register-server payload)))
+;; Send a get-count-servers request with payload (apath) over the main.db
+;; channel and return the reply (presumably a server count: a caller compares it with 1).
+(define (rmt:get-count-servers sinfo apath)
+  (servdat-conns sinfo) ;; just checking types
+  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+  (rmt:send-receive-real sinfo apath (db:run-id->dbname #f) 'get-count-servers (list apath)))
+(define (rmt:get-servers-info apath) ;; send a get-servers-info request with payload (apath); returns the reply
+  (rmt:send-receive 'get-servers-info #f (list apath)))
+;; Remove a server's registration from main.db.
+;; Calls rmt:open-main-connection for apath, then sends a deregister-server
+;; request with payload (iface port server-key pid iface apath dbname).
+;; Returns the reply of the request.
+;; NOTE(review): pid is always the pid of the calling process, even when the
+;; caller is deregistering some other server - confirm that is intended.
+(define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
+  (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
+  (let ((payload (list iface port server-key (current-process-id) iface apath dbname)))
+    (rmt:send-receive-real db-serv-info apath (db:run-id->dbname #f)
+                           'deregister-server payload)))
+(define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100)) ;; returns #t once host/port are stable; calls (exit 1) after more than num-tries-allowed passes
+ ;; Poll *db-serv-info* until its host and port stop changing between passes.
+ (let* ((stime (current-seconds)))
+ (let loop ((last-host #f)
+ (last-port #f)
+ (tries 0))
+ (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*)))
+ (curr-port (and *db-serv-info* (servdat-port *db-serv-info*))))
+ ;; Compare this pass's host/port with the previous pass's; the first matching clause decides. *db-serv-info* is only read here.
+ (cond
+ ((> tries num-tries-allowed) ;; NOTE(review): the messages below say rmt:keep-running although this is rmt:wait-for-stable-interface
+ (debug:print 0 *default-log-port* "rmt:keep-running, giving up after trying for several minutes.")
+ (exit 1))
+ ((not *db-serv-info*) ;; no server info at all yet
+ (thread-sleep! 0.25)
+ (loop curr-host curr-port (+ tries 1)))
+ ((or (not last-host)(not last-port)) ;; the previous pass had no host or no port to compare against
+ (debug:print 0 *default-log-port* "rmt:keep-running, still no interface, tries="tries)
+ (thread-sleep! 0.25)
+ (loop curr-host curr-port (+ tries 1)))
+ ((or (not (equal? last-host curr-host)) ;; host or port differs from the previous pass
+ (not (equal? last-port curr-port)))
+ (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
+ (thread-sleep! 0.25)
+ (loop curr-host curr-port (+ tries 1)))
+ ((< (- (current-seconds) stime) 1) ;; unchanged, but keep looping until at least 1 second has passed since entry (an older note said 3 seconds)
+ (thread-sleep! 0.5)
+ (loop curr-host curr-port (+ tries 1)))
+ (else
+ (rmt:get-signature) ;; sets *my-signature* as side effect
+ (servdat-status-set! *db-serv-info* 'interface-stable)
+ (debug:print 0 *default-log-port*
+ "SERVER STARTED: " curr-host
+ ":" curr-port
+ " AT " (current-seconds) " server signature: " *my-signature*
+ " with "(servdat-trynum *db-serv-info*)" port changes")
+ (flush-output *default-log-port*)
+ #t))))))
+;; Server monitor loop, run in its own thread by rmt:server-launch: waits for the server to come up, opens dbname,
+;; then calls db:sync-inmem->disk on every pass and shuts the server down (exit) once it has gone unused for the timeout.
+(define (rmt:keep-running dbname)
+ ;; dbname is handed to db:setup, db:sync-inmem->disk and the register/lookup calls below.
+ ;; The timeout comes from server:expiration-timeout and is measured against *db-last-access*.
+ ;; This thread first waits for the server to come alive, then enters the loop.
+ (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")
+ (let* ((sinfo *db-serv-info*)
+ (server-start-time (current-seconds))
+ (pkts-dir (get-pkts-dir))
+ (server-key (rmt:get-signature)) ;; This server's key
+ (is-main (equal? (args:get-arg "-db") ".db/main.db"))
+ (last-access 0) ;; refreshed from *db-last-access* on every pass of the loop
+ (server-timeout (server:expiration-timeout))
+ (shutdown-server-sequence (lambda (host port) ;; host and port are accepted but not used in the body
+ (set! *unclean-shutdown* #f) ;; Should not be needed anymore
+ (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
+ ;; (rmt:server-shutdown host port) -- called in on-exit
+ ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit
+ (exit)))
+ (timed-out? (lambda () ;; #t once last-access + server-timeout is not later than now
+ (<= (+ last-access server-timeout)
+ (current-seconds)))))
+ (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db"))
+ ;; main and run db servers each have their own wait logic (could/should be merged)
+ (if is-main
+ (rmt:wait-for-server pkts-dir dbname server-key)
+ (rmt:wait-for-stable-interface))
+ ;; this is our forever loop; every path through it either loops again or ends in (exit)
+ (let* ((iface (servdat-host *db-serv-info*))
+ (port (servdat-port *db-serv-info*))
+ (uconn (servdat-uconn *db-serv-info*)))
+ (let loop ((count 0)
+ (bad-sync-count 0) ;; passed through unchanged on every loop call
+ (start-time (current-milliseconds)))
+ (if (and (not is-main)
+ (common:low-noise-print 60 "servdat-status"))
+ (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*)))
+ (mutex-lock! *heartbeat-mutex*)
+ ;; set up the database handle (first pass only: *dbstruct-db* is still unset)
+ (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate
+ (let ((watchdog (bdat-watchdog *bdat*)))
+ (debug:print 0 *default-log-port* "SERVER: dbprep")
+ (db:setup dbname) ;; sets *dbstruct-db* as side effect
+ (servdat-status-set! *db-serv-info* 'db-opened)
+ ;; IFF I'm not main, call into main and register self
+ (if (not is-main)
+ (let ((res (rmt:register-server sinfo
+ *toppath* iface port
+ server-key dbname)))
+ (if res ;; we are the server
+ (servdat-status-set! *db-serv-info* 'have-interface-and-db)
+ ;; registration refused: check that the registered db locker is alive, clear it out if not
+ (let* ((serv-info (rmt:server-info *toppath* dbname)))
+ (match serv-info
+ ((host port servkey pid ipaddr apath dbpath) ;; these pattern variables shadow the outer port inside this clause
+ (if (not (server-ready? uconn (conc host":"port) servkey))
+ (begin
+ (debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.")
+ (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath)
+ (loop (+ count 1) bad-sync-count start-time)))) ;; NOTE(review): non-tail call made while *heartbeat-mutex* is still locked; the next pass locks it again - confirm this cannot deadlock
+ (else
+ (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info)
+ (exit)))))))
+ (debug:print 0 *default-log-port*
+ "SERVER: running, db "dbname" opened, megatest version: "
+ (common:get-full-version))
+ ;; start the watchdog
+ ;; is this really needed?
+ #;(if watchdog
+ (if (not (member (thread-state watchdog)
+ '(ready running blocked
+ sleeping dead)))
+ (begin
+ (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")")
+ (thread-start! watchdog))
+ (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")"))
+ (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it."))
+ #;(loop (+ count 1) bad-sync-count start-time)
+ ))
+ (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t)
+ (mutex-unlock! *heartbeat-mutex*)
+ ;; when things go wrong we don't want to be doing the various
+ ;; queries too often so we strive to run this stuff only every
+ ;; four seconds or so (rem-time is in whole seconds, via quotient).
+ (let* ((sync-time (- (current-milliseconds) start-time))
+ (rem-time (quotient (- 4000 sync-time) 1000)))
+ (if (and (<= rem-time 4)
+ (> rem-time 0))
+ (thread-sleep! rem-time)))
+ ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
+ (set! last-access *db-last-access*)
+ (if (< count 1) ;; count = 0: go around once more (non-tail call); the original note reads "3x3 = 9 secs aprox"
+ (loop (+ count 1) bad-sync-count (current-milliseconds)))
+ (if (common:low-noise-print 60 "dbstats")
+ (begin
+ (debug:print 0 *default-log-port* "Server stats:")
+ (db:print-current-query-stats)))
+ (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) ;; computed but not used below
+ (cond
+ ((not *server-run*)
+ (debug:print-info 0 *default-log-port* "*server-run* set to #f. Shutting down.")
+ (shutdown-server-sequence (get-host-name) port))
+ ((timed-out?)
+ (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+ (shutdown-server-sequence (get-host-name) port))
+ ((and *server-run* ;; NOTE(review): the clause above already exits whenever (timed-out?) is true, so the is-main check below looks effectively unreachable - confirm whether main was meant to outlive its timeout
+ (or (not (timed-out?))
+ (if is-main ;; do not exit if there are other servers (keep main open until all others gone)
+ (> (rmt:get-count-servers sinfo *toppath*) 1)
+ #f)))
+ (if (common:low-noise-print 120 "server continuing")
+ (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
+ (loop 0 bad-sync-count (current-milliseconds)))
+ (else
+ (set! *unclean-shutdown* #f)
+ (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+ (shutdown-server-sequence (get-host-name) port)
+ #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: "
+ (open-send-receive-nn (conc iface":"port) ;; do this here and not in server-shutdown
+ (sexpr->string 'quit))))))))))
+;; Host name to serve on: the value of the -server argument, or the result
+;; of get-host-name when -server is absent or given as "-".
+(define (rmt:get-reasonable-hostname)
+  (let ((requested (args:get-arg "-server")))
+    (if (and requested (not (equal? requested "-"))) requested (get-host-name))))
+;; Start the server for dbname: thread "Server run" executes rmt:run on the host from rmt:get-reasonable-hostname,
+;; thread "Keep running" (started 0.252 s later so the server can settle) executes rmt:keep-running when -server is given.
+;; Both threads are joined; per the original note every route through here is expected to end in exit, otherwise #f is returned.
+(define (rmt:server-launch dbname)
+  (debug:print-info 0 *default-log-port* "Entered rmt:server-launch")
+  (let* ((serve   (lambda ()
+                    (debug:print-info 0 *default-log-port* "Server run thread started")
+                    (rmt:run (rmt:get-reasonable-hostname))))
+         (monitor (lambda ()
+                    (debug:print-info 0 *default-log-port* "Server monitor thread started")
+                    (if (args:get-arg "-server")
+                        (rmt:keep-running dbname))))
+         (server-thread  (make-thread serve "Server run"))
+         (monitor-thread (make-thread monitor "Keep running")))
+    (thread-start! server-thread)
+    (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor.
+    (thread-start! monitor-thread)
+    (set! *didsomething* #t)
+    (thread-join! server-thread)
+    (thread-join! monitor-thread))
+  #f)
+;; S E R V E R - D I R E C T C A L L S
+(define (rmt:kill-server run-id) ;; send a kill-server request with payload (run-id); returns the reply
+  (let ((payload (list run-id))) (rmt:send-receive 'kill-server #f payload)))
+(define (rmt:start-server run-id) ;; send a start-server request with payload (run-id); returns the reply
+  (let ((payload (list run-id))) (rmt:send-receive 'start-server #f payload)))
+(define (rmt:server-info apath dbname) ;; send a get-server-info request with payload (apath dbname); returns the reply
+  (let ((payload (list apath dbname))) (rmt:send-receive 'get-server-info #f payload)))
+;; Nanomsg transport
+#;(define (is-port-in-use port-num)
+ (let* ((ret #f))
+ (let-values (((inp oup pid)
+ (process "netstat" (list "-tulpn" ))))
+ (let loop ((inl (read-line inp)))
+ (if (not (eof-object? inl))
+ (begin
+ (if (string-search (regexp (conc ":" port-num)) inl)
+ (begin
+ ;(print "Output: " inl)
+ (set! ret #t))
+ (loop (read-line inp)))))))
+ ret))
+#;(define (open-nn-connection host-port)
+ (let ((req (make-req-socket))
+ (uri (conc "tcp://" host-port)))
+ (nng-dial req uri)
+ (socket-set! req 'nng/recvtimeo 2000)
+ req))
+#;(define (send-receive-nn req msg)
+ (nng-send req msg)
+ (nng-recv req))
+#;(define (close-nn-connection req)
+ (nng-close! req))
+;; ;; open connection to server, send message, close connection
+;; ;;
+;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+;; (let ((req (make-req-socket 'req))
+;; (uri (conc "tcp://" host-port))
+;; (res #f)
+;; ;; (contacts (alist-ref 'contact attrib))
+;; ;; (mode (alist-ref 'mode attrib))
+;; )
+;; (socket-set! req 'nng/recvtimeo 2000)
+;; (handle-exceptions
+;; exn
+;; (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+;; ;; Send notification
+;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" )
+;; #f)
+;; (nng-dial req uri)
+;; ;; (print "Connected to the server " )
+;; (nng-send req msg)
+;; ;; (print "Request Sent")
+;; (let* ((th1 (make-thread (lambda ()
+;; (let ((resp (nng-recv req)))
+;; (nng-close! req)
+;; (set! res (if (equal? resp "ok")
+;; #t
+;; #f))))
+;; "recv thread"))
+;; (th2 (make-thread (lambda ()
+;; (thread-sleep! timeout)
+;; (thread-terminate! th1))
+;; "timer thread")))
+;; (thread-start! th1)
+;; (thread-start! th2)
+;; (thread-join! th1)
+;; res))))
+#;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+ (let ((req (make-req-socket))
+ (uri (conc "tcp://" host-port))
+ (res #f))
+ (handle-exceptions
+ exn
+ (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+ ;; Send notification
+ (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn)
+ #f)
+ (nng-dial req uri)
+ (nng-send req msg)
+ (let* ((th1 (make-thread (lambda ()
+ (let ((resp (nng-recv req)))
+ (nng-close! req)
+ ;; (print resp)
+ (set! res resp)))
+ "recv thread"))
+ (th2 (make-thread (lambda ()
+ (thread-sleep! timeout)
+ (thread-terminate! th1))
+ "timer thread")))
+ (thread-start! th1)
+ (thread-start! th2)
+ (thread-join! th1)
+ res))))
+;; S E R V E R U T I L I T I E S
+;; run ping in separate process, safest way in some cases
+#;(define (server:ping-server ifaceport)
+ (with-input-from-pipe
+ (conc (common:get-megatest-exe) " -ping " ifaceport)
+ (lambda ()
+ (let loop ((inl (read-line))
+ (res "NOREPLY"))
+ (if (eof-object? inl)
+ (case (string->symbol res)
+ ((NOREPLY) #f)
+ ((LOGIN_OK) #t)
+ (else #f))
+ (loop (read-line) inl))))))
+;; NOT USED (well, ok, reference in rpc-transport but otherwise not used).
+#;(define (server:login toppath)
+ (lambda (toppath)
+ (set! *db-last-access* (current-seconds)) ;; might not be needed.
+ (if (equal? *toppath* toppath)
+ #t
+ #f)))
+;; (define server:sync-lock-token "SERVER_SYNC_LOCK")
+;; (define (server:release-sync-lock)
+;; (db:no-sync-del! *no-sync-db* server:sync-lock-token))
+;; (define (server:have-sync-lock?)
+;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token))
+;; (have-lock? (car have-lock-pair))
+;; (lock-time (cdr have-lock-pair))
+;; (lock-age (- (current-seconds) lock-time)))
+;; (cond
+;; (have-lock? #t)
+;; ((>lock-age
+;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180)))
+;; (server:release-sync-lock)
+;; (server:have-sync-lock?))
+;; (else #f))))