Index: Makefile
==================================================================
--- Makefile
+++ Makefile
@@ -173,11 +173,10 @@
mofiles/mutils.o : mutils/mutils.scm
mofiles/cookie.o : stml2/cookie.scm
mofiles/stml2.o : stml2/stml2.scm
# for the modularized stuff
-
mofiles/commonmod.o : megatest-fossil-hash.scm mofiles/stml2.o \
mofiles/mtargs.o mofiles/pkts.o mofiles/mtconfigf.o \
mofiles/processmod.o
mofiles/pgdbmod.o : mofiles/commonmod.o
mofiles/dbmod.o : mofiles/commonmod.o mofiles/keysmod.o \
@@ -191,10 +190,13 @@
mofiles/dbmod.o mofiles/pgdbmod.o mofiles/launchmod.o \
mofiles/subrunmod.o
mofiles/servermod.o : mofiles/commonmod.o mofiles/dbmod.o
mofiles/testsmod.o : mofiles/servermod.o mofiles/dbmod.o
mofiles/launchmod.o : mofiles/subrunmod.o mofiles/testsmod.o
+
+# special cases where an upstream .import file is needed to compile a module
+mofiles/rmtmod.o : ulex.import.o
# Removed from megamod.o dep: mofiles/ftail.o
mofiles/megamod.o : \
mofiles/rmtmod.o \
mofiles/commonmod.o \
Index: commonmod.scm
==================================================================
--- commonmod.scm
+++ commonmod.scm
@@ -20,11 +20,10 @@
(declare (unit commonmod))
(declare (uses mtargs))
;; (declare (uses stml2))
(declare (uses mtconfigf))
-(declare (uses ulex))
(declare (uses pkts))
(module commonmod
*
(import scheme chicken data-structures extras)
@@ -41,11 +40,10 @@
z3
directory-utils
sparse-vectors)
(import pkts)
-(import ulex)
(import (prefix mtconfigf configf:))
(import (prefix mtargs args:))
(include "common_records.scm")
(include "megatest-fossil-hash.scm")
@@ -1436,11 +1434,11 @@
csv->test-data
;; MISC
sync-inmem->db
- ;; TESTMETA
+ ;; TESTMETAl
testmeta-add-record
testmeta-update-field
;; TASKS
tasks-add
@@ -1448,19 +1446,19 @@
))
;;======================================================================
;; ALLDATA
;;======================================================================
-;;
+
;; attempt to consolidate a bunch of global information into one struct to toss around
(defstruct alldat
;; misc
(denoise (make-hash-table))
(areapath #f) ;; i.e. toppath
(mtconfig #f)
(log-port #f)
- (areadat #f) ;; i.e. runremote
+ (ulexdat #f) ;; connection to the databases
(rmt-mutex (make-mutex))
(db-sync-mutex (make-mutex))
(db-with-db-mutex (make-mutex))
(read-only-queries api:read-only-queries)
(write-queries api:write-queries)
Index: dashboard.scm
==================================================================
--- dashboard.scm
+++ dashboard.scm
@@ -55,10 +55,11 @@
(declare (uses stml2.import))
(declare (uses ducttape-lib.import))
(declare (uses mtconfigf.import))
(declare (uses gutilsmod))
+(declare (uses servermod))
(declare (uses megamod))
(declare (uses commonmod))
(declare (uses rmtmod))
(declare (uses runsmod))
(declare (uses dbmod))
@@ -67,10 +68,11 @@
(declare (uses dcommonmod))
(declare (uses tasksmod))
(import gutilsmod)
+(import servermod)
(import megamod)
(import commonmod)
(import rmtmod)
(import runsmod)
(import dbmod)
@@ -1433,10 +1435,95 @@
(for-each (lambda (graph-cell)
(let* ((graph-dat (hash-table-ref (dboard:tabdat-graph-cell-table tabdat) graph-cell)))
(dboard:graph-dat-flag-set! graph-dat #f)))
(hash-table-keys (dboard:tabdat-graph-cell-table tabdat)))))))
))))
+
+;;======================================================================
+;; SYSTEM MONITOR - Track relevant Megatest processes
+;;======================================================================
+;;
+;; A gui for launching tests
+;;
+(define (dashboard:sysmon commondat tabdat #!key (tab-num #f))
+ (let* ((drawing (vg:drawing-new))
+ (sysmon-tab-updater (lambda ()
+ (debug:catch-and-dump
+ (lambda ()
+ (let ((tabdat (dboard:common-get-tabdat commondat tab-num: tab-num)))
+ (if tabdat
+ (let ((last-data-update (dboard:tabdat-last-data-update tabdat))
+ (now-time (current-seconds)))
+ (dashboard:sysmon-canvas-updater commondat tabdat tab-num) ;; redraws canvas on user input (I think...)
+ ))))
+ "dashboard:sysmon-tab-updater")))
+ (key-listboxes #f) ;;
+ (update-keyvals (lambda ()
+ (dboard:target-updater tabdat))))
+ (dboard:tabdat-drawing-set! tabdat drawing)
+ (dboard:commondat-add-updater commondat sysmon-tab-updater tab-num: tab-num)
+ (iup:vbox
+ (let* ((cnv-obj (iup:canvas
+ ;; #:size "250x250" ;; "500x400"
+ #:expand "YES"
+ #:scrollbar "YES"
+ #:posx "0.5"
+ #:posy "0.5"
+ #:action (make-canvas-action
+ (lambda (c xadj yadj)
+ (debug:catch-and-dump
+ (lambda ()
+ (if (not (dboard:tabdat-cnv tabdat))
+ (let ((cnv (dboard:tabdat-cnv tabdat)))
+ (dboard:tabdat-cnv-set! tabdat c)
+ (vg:drawing-cnv-set! (dboard:tabdat-drawing tabdat)
+ (dboard:tabdat-cnv tabdat))))
+ (let ((drawing (dboard:tabdat-drawing tabdat))
+ (old-xadj (dboard:tabdat-xadj tabdat))
+ (old-yadj (dboard:tabdat-yadj tabdat)))
+ (if (not (and (eq? xadj old-xadj)(eq? yadj old-yadj)))
+ (begin
+ ;; (print "xadj: " xadj " yadj: " yadj "changed: "(eq? xadj old-xadj) " " (eq? yadj old-yadj))
+ (dboard:tabdat-view-changed-set! tabdat #t)
+ (dboard:tabdat-xadj-set! tabdat (* -2000 (- xadj 0.5)))
+ (dboard:tabdat-yadj-set! tabdat (* 2000 (- yadj 0.5)))
+ ))))
+ "iup:canvas action")))
+ #:wheel-cb (lambda (obj step x y dir) ;; dir is 4 for up and 5 for down. I think.
+ (debug:catch-and-dump
+ (lambda ()
+ (let* ((drawing (dboard:tabdat-drawing tabdat))
+ (scalex (vg:drawing-scalex drawing)))
+ (dboard:tabdat-view-changed-set! tabdat #t)
+ ;; (print "step: " step " x: " x " y: " y " dir: " dir " scalex: " scalex)
+ (vg:drawing-scalex-set! drawing
+ (+ scalex
+ (if (> step 0)
+ (* scalex 0.02)
+ (* scalex -0.02))))))
+ "wheel-cb"))
+ )))
+ cnv-obj))))
+
+;; run times canvas updater
+;;
+(define (dashboard:sysmon-canvas-updater commondat tabdat tab-num)
+ (let ((cnv (dboard:tabdat-cnv tabdat))
+ (dwg (dboard:tabdat-drawing tabdat))
+ (mtx (dboard:tabdat-runs-mutex tabdat))
+ (vch (dboard:tabdat-view-changed tabdat)))
+ (if (and cnv dwg vch)
+ (begin
+ (vg:drawing-xoff-set! dwg (dboard:tabdat-xadj tabdat))
+ (vg:drawing-yoff-set! dwg (dboard:tabdat-yadj tabdat))
+ (mutex-lock! mtx)
+ (canvas-clear! cnv)
+ (vg:draw dwg tabdat)
+ (mutex-unlock! mtx)
+ (dboard:tabdat-view-changed-set! tabdat #f)))))
+
+
;;======================================================================
;; R U N
;;======================================================================
;;
@@ -2205,10 +2292,11 @@
(let* ((stats-dat (dboard:tabdat-make-data))
(runs-dat (dboard:tabdat-make-data))
(onerun-dat (dboard:tabdat-make-data)) ;; name for run-summary structure
(runcontrols-dat (dboard:tabdat-make-data))
(runtimes-dat (dboard:tabdat-make-data))
+ (sysmon-dat (dboard:tabdat-make-data))
(nruns (dboard:tabdat-numruns runs-dat))
(ntests (dboard:tabdat-num-tests runs-dat))
(keynames (dboard:tabdat-dbkeys runs-dat))
(nkeys (length keynames))
(runsvec (make-vector nruns))
@@ -2423,18 +2511,20 @@
(dashboard:runs-summary commondat onerun-dat tab-num: 2)
;; (dashboard:new-view db data new-view-dat tab-num: 3)
(dashboard:run-controls commondat runcontrols-dat tab-num: 3)
(dashboard:run-times commondat runtimes-dat tab-num: 4)
;; ;; (dashboard:runs-summary commondat onerun-dat tab-num: 4)
+ (dashboard:sysmon commondat sysmon-dat tab-num: 5)
additional-views
)))
;; (set! (iup:callback tabs tabchange-cb:) (lambda (a b c)(print "SWITCHED TO TAB: " a " " b " " c)))
(iup:attribute-set! tabs "TABTITLE0" "Summary")
(iup:attribute-set! tabs "TABTITLE1" "Runs")
(iup:attribute-set! tabs "TABTITLE2" "Run Summary")
(iup:attribute-set! tabs "TABTITLE3" "Run Control")
(iup:attribute-set! tabs "TABTITLE4" "Run Times")
+ (iup:attribute-set! tabs "TABTITLE5" "Sysmon")
;; (iup:attribute-set! tabs "TABTITLE3" "New View")
;; (iup:attribute-set! tabs "TABTITLE4" "Run Control")
;; set the tab names for user added tabs
(for-each
@@ -2449,10 +2539,11 @@
(dboard:common-set-tabdat! commondat 0 stats-dat)
(dboard:common-set-tabdat! commondat 1 runs-dat)
(dboard:common-set-tabdat! commondat 2 onerun-dat)
(dboard:common-set-tabdat! commondat 3 runcontrols-dat)
(dboard:common-set-tabdat! commondat 4 runtimes-dat)
+ (dboard:common-set-tabdat! commondat 5 sysmon-dat)
(iup:vbox
tabs
;; controls
))))
Index: dcommonmod.scm
==================================================================
--- dcommonmod.scm
+++ dcommonmod.scm
@@ -40,11 +40,11 @@
matchable (prefix iup iup:)
canvas-draw
;; blindly copied from megamod
(prefix base64 base64:)
(prefix dbi dbi:)
- (prefix nanomsg nmsg:)
+ ;; (prefix nanomsg nmsg:)
(prefix sqlite3 sqlite3:)
call-with-environment-variables
csv
csv-xml
data-structures
Index: docs/code/Makefile
==================================================================
--- docs/code/Makefile
+++ docs/code/Makefile
@@ -1,3 +1,12 @@
+
+all : module-hierarchy.pdf
+
+deps.data : ../../*scm
+ cd ../..;csi -bq utils/deps.scm > docs/code/deps.data
+
+module-hierarchy.dot : preamble.dot deps.data postamble.dot
+ cat preamble.dot deps.data postamble.dot > module-hierarchy.dot
+
module-hierarchy.pdf : module-hierarchy.dot
dot -Tpdf module-hierarchy.dot -o module-hierarchy.pdf
Index: docs/code/module-hierarchy.dot
==================================================================
--- docs/code/module-hierarchy.dot
+++ docs/code/module-hierarchy.dot
@@ -23,38 +23,53 @@
subgraph cluster_megatest {
label="megatest";
rmtmod [label="rmt mod"];
- // httpmod [label="http-transportmod"];
- // commonmod
// archivemod.scm
+"sqlite3" -> "archivemod";
"commonmod" -> "archivemod";
// clientmod.scm
+"sqlite3" -> "clientmod";
"commonmod" -> "clientmod";
// configfmod.scm
+"sqlite3" -> "configfmod";
// subrunmod.scm
+"sqlite3" -> "subrunmod";
"commonmod" -> "subrunmod";
+"mtconfigf" -> "subrunmod";
// ezstepsmod.scm
+"sqlite3" -> "ezstepsmod";
"commonmod" -> "ezstepsmod";
// itemsmod.scm
+"sqlite3" -> "itemsmod";
"commonmod" -> "itemsmod";
// gutilsmod.scm
+"iup" -> "gutilsmod";
+"canvas-draw" -> "gutilsmod";
// testsmod.scm
+"sqlite3" -> "testsmod";
"commonmod" -> "testsmod";
"servermod" -> "testsmod";
"itemsmod" -> "testsmod";
"dbmod" -> "testsmod";
+"mtconfigf" -> "testsmod";
+"mtargs" -> "testsmod";
// runconfigmod.scm
+"sqlite3" -> "runconfigmod";
"commonmod" -> "runconfigmod";
// keysmod.scm
+"sqlite3" -> "keysmod";
"srfi-13" -> "keysmod";
// launchmod.scm
+"sqlite3" -> "launchmod";
"commonmod" -> "launchmod";
// megamod.scm
+"mtconfigf" -> "megamod";
"spiffy" -> "megamod";
"stml2" -> "megamod";
+"mtargs" -> "megamod";
"commonmod" -> "megamod";
"keysmod" -> "megamod";
"pgdbmod" -> "megamod";
"tasksmod" -> "megamod";
"dbmod" -> "megamod";
@@ -68,21 +83,29 @@
"servermod" -> "megamod";
"subrunmod" -> "megamod";
"itemsmod" -> "megamod";
"runsmod" -> "megamod";
// odsmod.scm
+"sqlite3" -> "odsmod";
"commonmod" -> "odsmod";
// envmod.scm
+"sqlite3" -> "envmod";
"commonmod" -> "envmod";
// http-transportmod.scm
"commonmod" -> "http-transportmod";
"stml2" -> "http-transportmod";
"apimod" -> "http-transportmod";
"dbmod" -> "http-transportmod";
"testsmod" -> "http-transportmod";
+"mtargs" -> "http-transportmod";
+"mtconfigf" -> "http-transportmod";
// processmod.scm
+"sqlite3" -> "processmod";
// mtmod.scm
+"sqlite3" -> "mtmod";
+"mtargs" -> "mtmod";
+"mtconfigf" -> "mtmod";
"commonmod" -> "mtmod";
"dbmod" -> "mtmod";
"pgdbmod" -> "mtmod";
"rmtmod" -> "mtmod";
"servermod" -> "mtmod";
@@ -89,58 +112,91 @@
"stml2" -> "mtmod";
"subrunmod" -> "mtmod";
"tasksmod" -> "mtmod";
"testsmod" -> "mtmod";
// pgdbmod.scm
+"mtconfigf" -> "pgdbmod";
+"mtargs" -> "pgdbmod";
"commonmod" -> "pgdbmod";
// runsmod.scm
+"base64" -> "runsmod";
"commonmod" -> "runsmod";
"dbmod" -> "runsmod";
"itemsmod" -> "runsmod";
"mtmod" -> "runsmod";
"pgdbmod" -> "runsmod";
+"mtargs" -> "runsmod";
+"mtconfigf" -> "runsmod";
"rmtmod" -> "runsmod";
"servermod" -> "runsmod";
"stml2" -> "runsmod";
"subrunmod" -> "runsmod";
"tasksmod" -> "runsmod";
"testsmod" -> "runsmod";
// apimod.scm
+"sqlite3" -> "apimod";
"commonmod" -> "apimod";
"dbmod" -> "apimod";
"servermod" -> "apimod";
// dbmod.scm
+"sqlite3" -> "dbmod";
"commonmod" -> "dbmod";
"keysmod" -> "dbmod";
"files" -> "dbmod";
"tasksmod" -> "dbmod";
"odsmod" -> "dbmod";
+"mtargs" -> "dbmod";
+"mtconfigf" -> "dbmod";
// dcommonmod.scm
+"sqlite3" -> "dcommonmod";
+"mtconfigf" -> "dcommonmod";
"gutilsmod" -> "dcommonmod";
"commonmod" -> "dcommonmod";
+"servermod" -> "dcommonmod";
"testsmod" -> "dcommonmod";
"megamod" -> "dcommonmod";
+"subrunmod" -> "dcommonmod";
+"runsmod" -> "dcommonmod";
+"rmtmod" -> "dcommonmod";
+"dbmod" -> "dcommonmod";
"canvas-draw" -> "dcommonmod";
"canvas-draw-iup" -> "dcommonmod";
+"iup" -> "dcommonmod";
+"mtargs" -> "dcommonmod";
// tasksmod.scm
+"sqlite3" -> "tasksmod";
"commonmod" -> "tasksmod";
+"mtconfigf" -> "tasksmod";
"pgdbmod" -> "tasksmod";
// template-mod.scm
+"sqlite3" -> "template-mod";
"commonmod" -> "template-mod";
// servermod.scm
+"sqlite3" -> "servermod";
"commonmod" -> "servermod";
"dbmod" -> "servermod";
"tasksmod" -> "servermod";
+"mtargs" -> "servermod";
+"mtconfigf" -> "servermod";
// treemod.scm
+"iup" -> "treemod";
// commonmod.scm
+"sqlite3" -> "commonmod";
"pkts" -> "commonmod";
+"mtconfigf" -> "commonmod";
+"mtargs" -> "commonmod";
// vgmod.scm
+"sqlite3" -> "vgmod";
// rmtmod.scm
+"sqlite3" -> "rmtmod";
+"ulex" -> "rmtmod";
"commonmod" -> "rmtmod";
"itemsmod" -> "rmtmod";
"apimod" -> "rmtmod";
"dbmod" -> "rmtmod";
}
-
+
+{ rank=same; eggs; sqlite3, spiffy, files, base64, iup, "canvas-draw" }
+
}
ADDED docs/code/postamble.dot
Index: docs/code/postamble.dot
==================================================================
--- /dev/null
+++ docs/code/postamble.dot
@@ -0,0 +1,7 @@
+
+}
+
+{ rank=same; eggs; sqlite3, spiffy, files, base64, iup, "canvas-draw" }
+
+}
+
ADDED docs/code/preamble.dot
Index: docs/code/preamble.dot
==================================================================
--- /dev/null
+++ docs/code/preamble.dot
@@ -0,0 +1,27 @@
+// Copyright 2006-2017, Matthew Welland.
+//
+// This file is part of Megatest.
+//
+// Megatest is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Megatest is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Megatest. If not, see .
+//
+digraph megatest_code_hierarchy {
+ ranksep=0.05;
+ // rankdir=LR
+
+ node [shape=box,style=filled,fontname="clear",fontsize="10"];
+
+ subgraph cluster_megatest {
+ label="megatest";
+
+ rmtmod [label="rmt mod"];
Index: megamod.scm
==================================================================
--- megamod.scm
+++ megamod.scm
@@ -47,11 +47,11 @@
(import scheme chicken data-structures extras)
(use
(prefix base64 base64:)
(prefix dbi dbi:)
- (prefix nanomsg nmsg:)
+;; (prefix nanomsg nmsg:)
(prefix sqlite3 sqlite3:)
call-with-environment-variables
csv
csv-xml
data-structures
Index: megatest.scm
==================================================================
--- megatest.scm
+++ megatest.scm
@@ -20,13 +20,13 @@
;; (include "megatest-version.scm")
;; fake out readline usage of toplevel-command
(define (toplevel-command . a) #f)
-(use (prefix sqlite3 sqlite3:) srfi-1 posix regex regex-case srfi-69 (prefix base64 base64:)
- readline apropos json http-client directory-utils typed-records
- http-client srfi-18 extras format)
+(use (prefix sqlite3 sqlite3:) srfi-1 posix regex regex-case
+ srfi-69 (prefix base64 base64:) readline apropos json http-client
+ directory-utils typed-records http-client srfi-18 extras format tcp6)
;; Added for csv stuff - will be removed
;;
(use sparse-vectors)
Index: rmtmod.scm
==================================================================
--- rmtmod.scm
+++ rmtmod.scm
@@ -37,12 +37,10 @@
(import commonmod)
(import itemsmod)
(import apimod)
(import dbmod)
-;; (include "rmt-inc.scm")
-
;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;
;; generate entries for ~/.megatestrc with the following
@@ -72,14 +70,44 @@
cinfo
(if (server:check-if-running areapath)
(client:setup areapath)
#f))))
-(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id
+;; return the handle struct for sending queries to a specific database
+;; - initializes the connection object if this is the first access
+;; - finds the "captain" and asks who to talk to for the given dbfname
+;; - establishes the connection to the current dbowner
+;;
+(define (rmt:connect alldat dbfname dbtype)
+ (let* ((ulexdat (or (alldat-ulexdat alldat)
+ (rmt:setup-ulex alldat))))
+ (ulex:connect ulexdat dbfname dbtype)))
+;; setup the remote calls
+(define (rmt:setup-ulex alldat)
+ (let* ((udata (ulex:setup))) ;; establish connection to ulex
+ (alldat-ulexdat-set! alldat udata)
+ ;; register all needed procs
+ (ulex:register-handler udata 'ping common:get-full-version) ;; override ping with get-full-version
+ (ulex:register-handler udata 'login common:get-full-version) ;; force setup of the connection
+ udata))
+
+;; set up a connection to the current owner of the dbfile associated with rid
+;; then send the query to that dbfile owner and wait for a response.
+;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected
- (rmt:open-qry-close-locally cmd 0 params))
+ (let* ((alldat *alldat*)
+ (areapath (alldat-areapath alldat))
+ (dbtype (if (or (not rid)(< rid 1)) ;; this is the criteria for "main.db"
+ 'main 'runs))
+ (dbfname (if (eq? dbtype 'main)
+ "main.db"
+ (conc rid ".db")))
+ (dbfile (conc areapath "/.db/" dbfname))
+ (ulexconn (rmt:connect alldat dbfname dbtype)))
+ (rmt:open-qry-close-locally cmd 0 params)))
+
;;
;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd))
;; ;; #;(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected
;; ;;
;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd))
Index: ulex/ulex.scm
==================================================================
--- ulex/ulex.scm
+++ ulex/ulex.scm
@@ -23,1409 +23,114 @@
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
-;; (use rpc pkts mailbox sqlite3)
-
+(use mailbox)
+
(module ulex
*
- ;; #;(
- ;; ;; areas
- ;; make-area
- ;; area->alist
- ;; ;; server
- ;; launch
- ;; update-known-servers
- ;; shutdown
- ;; get-best-server
- ;; ;; client side
- ;; call
- ;; ;; queries, procs and system commands (i.e. workers)
- ;; register
- ;; register-batch
- ;; ;; database - note: most database stuff need not be exposed, these calls may be removed from exports in future
- ;; open-db
- ;; ;; ports
- ;; pl-find-port
- ;; pl-get-prev-used-port
- ;; pl-open-db
- ;; pl-open-run-close
- ;; pl-release-port
- ;; pl-set-port
- ;; pl-take-port
- ;; pl-is-port-available
- ;; pl-get-port-state
- ;; ;; system
- ;; get-normalized-cpu-load
-
- ;; )
(import scheme posix chicken data-structures ports extras files mailbox)
-(import rpc srfi-18 pkts matchable regex
+(import srfi-18 pkts matchable regex
typed-records srfi-69 srfi-1
srfi-4 regex-case
(prefix sqlite3 sqlite3:)
foreign
- tcp) ;; ulex-netutil)
-
-;;======================================================================
-;; D E B U G H E L P E R S
-;;======================================================================
-
-(define (dbg> . args)
- (with-output-to-port (current-error-port)
- (lambda ()
- (apply print "dbg> " args))))
-
-(define (debug-pp . args)
- (if (get-environment-variable "ULEX_DEBUG")
- (with-output-to-port (current-error-port)
- (lambda ()
- (apply pp args)))))
-
-(define *default-debug-port* (current-error-port))
-
-(define (sdbg> fn stage-name stage-start stage-end start-time . message)
- (if (get-environment-variable "ULEX_DEBUG")
- (with-output-to-port *default-debug-port*
- (lambda ()
- (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. "
- (if start-time
- (conc "total time " (- (current-milliseconds) start-time)
- " ms.")
- "")
- message
- )))))
-
-;;======================================================================
-;; M A C R O S
-;;======================================================================
-;; iup callbacks are not dumping the stack, this is a work-around
-;;
-
-;; Some of these routines use:
-;;
-;; http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html
-;;
-;; Syntax for defining macros in a simple style similar to function definiton,
-;; when there is a single pattern for the argument list and there are no keywords.
-;;
-;; (define-simple-syntax (name arg ...) body ...)
-;;
-
-(define-syntax define-simple-syntax
- (syntax-rules ()
- ((_ (name arg ...) body ...)
- (define-syntax name (syntax-rules () ((name arg ...) (begin body ...)))))))
-
-(define-simple-syntax (catch-and-dump proc procname)
- (handle-exceptions
- exn
- (begin
- (print-call-chain (current-error-port))
- (with-output-to-port (current-error-port)
- (lambda ()
- (print ((condition-property-accessor 'exn 'message) exn))
- (print "Callback error in " procname)
- (print "Full condition info:\n" (condition->list exn)))))
- (proc)))
-
-
-;;======================================================================
-;; R E C O R D S
-;;======================================================================
-
-;; information about me as a server
-;;
-(defstruct area
- ;; about this area
- (useportlogger #f)
- (lowport 32768)
- (server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all)
- (conn #f)
- (port #f)
- (myaddr (get-my-best-address))
- pktid ;; get pkt from hosts table if needed
- pktfile
- pktsdir
- dbdir
- (dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one?
- (mutex (make-mutex))
- (rtable (make-hash-table)) ;; registration table of available actions
- (dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve
- ;; about other servers
- (hosts (make-hash-table)) ;; key => hostdat
- (hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime )
- (reqs (make-hash-table)) ;; uri => queue
- ;; work queues
- (wqueues (make-hash-table)) ;; fname => qdat
- (stats (make-hash-table)) ;; fname => totalqueries
- (last-srvup (current-seconds)) ;; last time we updated the known servers
- (cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call
- (ready #f)
- (health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping
- )
-
-;; host stats
-;;
-(defstruct hostdat
- (pkt #f)
- (dbload (make-hash-table)) ;; "dbfile.db" => queries/min
- (hostload #f) ;; normalized load ( 5min load / numcpus )
- )
-
-;; dbdat
-;;
-(defstruct dbdat
- (dbh #f)
- (fname #f)
- (write-access #f)
- (sths (make-hash-table)) ;; hash mapping query strings to handles
- )
-
-;; qdat
-;;
-(defstruct qdat
- (writeq (make-queue))
- (readq (make-queue))
- (rwq (make-queue))
- (logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging
- (osshort (make-queue))
- (oslong (make-queue))
- (misc (make-queue)) ;; used for things like ping-full
- )
-
-;; calldat
-;;
-(defstruct calldat
- (ctype 'dbwrite)
- (obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc.
- (rtime (current-milliseconds)))
-
-;; make it a global? Well, it is local to area module
-
-(define *pktspec*
- `((server (hostname . h)
- (port . p)
- (pid . i)
- (ipaddr . a)
- )
- (data (hostname . h) ;; sender hostname
- (port . p) ;; sender port
- (ipaddr . a) ;; sender ip
- (hostkey . k) ;; sending host key - store info at server under this key
- (servkey . s) ;; server key - this needs to match at server end or reject the msg
- (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json
- (data . d) ;; base64 encoded slln data
- )))
-
-;; work item
-;;
-(defstruct witem
- (rhost #f) ;; return host
- (ripaddr #f) ;; return ipaddr
- (rport #f) ;; return port
- (servkey #f) ;; the packet representing the client of this workitem, used by final send-message
- (rdat #f) ;; the request - usually an sql query, type is rdat
- (action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort
- (cookie #f) ;; cookie id for response
- (data #f) ;; the data payload, i.e. parameters
- (result #f) ;; the result from processing the data
- (caller #f)) ;; the calling peer according to rpc itself
-
-(define (trim-pktid pktid)
- (if (string? pktid)
- (substring pktid 0 4)
- "nopkt"))
-
-(define (any->number num)
- (cond
- ((number? num) num)
- ((string? num) (string->number num))
- (else num)))
-
-(use trace)
-(trace-call-sites #t)
-
-;;======================================================================
-;; D A T A B A S E H A N D L I N G
-;;======================================================================
-
-;; look in dbhandles for a db, return it, else return #f
-;;
-(define (get-dbh acfg fname)
- (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '())))
- (if (null? dbh-lst)
- (begin
- ;; (print "opening db for " fname)
- (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls
- (let ((rem-lst (cdr dbh-lst)))
- ;; (print "re-using saved connection for " fname)
- (hash-table-set! (area-dbhandles acfg) fname rem-lst)
- (car dbh-lst)))))
-
-(define (save-dbh acfg fname dbdat)
- ;; (print "saving dbh for " fname)
- (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '()))))
-
-;; open the database, if never before opened init it. put the handle in the
-;; open db's hash table
-;; returns: the dbdat
-;;
-(define (open-db acfg fname)
- (let* ((fullname (conc (area-dbdir acfg) "/" fname))
- (exists (file-exists? fullname))
- (write-access (if exists
- (file-write-access? fullname)
- (file-write-access? (area-dbdir acfg))))
- (db (sqlite3:open-database fullname))
- (handler (sqlite3:make-busy-timeout 136000))
- )
- (sqlite3:set-busy-handler! db handler)
- (sqlite3:execute db "PRAGMA synchronous = 0;")
- (if (not exists) ;; need to init the db
- (if write-access
- (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements
- ;; (sqlite3:with-transaction
- ;; db
- ;; (lambda ()
- (if isql
- (for-each
- (lambda (sql)
- (sqlite3:execute db sql))
- isql)))
- (print "ERROR: no write access to " (area-dbdir acfg))))
- (make-dbdat dbh: db fname: fname write-access: write-access)))
-
-;; This is a low-level command to retrieve or to prepare, save and return a prepared statment
-;; you must extract the db handle
-;;
-(define (get-sth db cache stmt)
- (if (hash-table-exists? cache stmt)
- (begin
- ;; (print "Reusing cached stmt for " stmt)
- (hash-table-ref/default cache stmt #f))
- (let ((sth (sqlite3:prepare db stmt)))
- (hash-table-set! cache stmt sth)
- ;; (print "prepared stmt for " stmt)
- sth)))
-
-;; a little more expensive but does all the tedious deferencing - only use if you don't already
-;; have dbdat and db sitting around
-;;
-(define (full-get-sth acfg fname stmt)
- (let* ((dbdat (get-dbh acfg fname))
- (db (dbdat-dbh dbdat))
- (sths (dbdat-sths dbdat)))
- (get-sth db sths stmt)))
-
-;; write to a db
-;; acfg: area data
-;; rdat: request data
-;; hdat: (host . port)
-;;
-;; (define (dbwrite acfg rdat hdat data-in)
-;; (let* ((dbname (car data-in))
-;; (dbdat (get-dbh acfg dbname))
-;; (db (dbdat-dbh dbdat))
-;; (sths (dbdat-sths dbdat))
-;; (stmt (calldat-obj rdat))
-;; (sth (get-sth db sths stmt))
-;; (data (cdr data-in)))
-;; (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data)
-;; (print "dbdat: " (dbdat->alist dbdat))
-;; (apply sqlite3:execute sth data)
-;; (save-dbh acfg dbname dbdat)
-;; #t
-;; ))
-
-(define (finalize-all-db-handles acfg)
- (let* ((dbhandles (area-dbhandles acfg)) ;; dbhandles is hash of fname ==> dbdat
- (num 0))
- (for-each
- (lambda (area-name)
- (print "Closing handles for " area-name)
- (let ((dbdats (hash-table-ref/default dbhandles area-name '())))
- (for-each
- (lambda (dbdat)
- ;; first close all statement handles
- (for-each
- (lambda (sth)
- (sqlite3:finalize! sth)
- (set! num (+ num 1)))
- (hash-table-values (dbdat-sths dbdat)))
- ;; now close the dbh
- (set! num (+ num 1))
- (sqlite3:finalize! (dbdat-dbh dbdat)))
- dbdats)))
- (hash-table-keys dbhandles))
- (print "FINALIZED " num " dbhandles")))
-
-;;======================================================================
-;; W O R K Q U E U E H A N D L I N G
-;;======================================================================
-
-(define (register-db-as-mine acfg dbname)
- (let ((ht (area-dbs acfg)))
- (if (not (hash-table-ref/default ht dbname #f))
- (hash-table-set! ht dbname (random 10000)))))
-
-(define (work-queue-add acfg fname witem)
- (let* ((work-queue-start (current-milliseconds))
- (action (witem-action witem)) ;; NB the action is the index into the rdat actions
- (qdat (or (hash-table-ref/default (area-wqueues acfg) fname #f)
- (let ((newqdat (make-qdat)))
- (hash-table-set! (area-wqueues acfg) fname newqdat)
- newqdat)))
- (rdat (hash-table-ref/default (area-rtable acfg) action #f)))
- (if rdat
- (queue-add!
- (case (calldat-ctype rdat)
- ((dbwrite) (register-db-as-mine acfg fname)(qdat-writeq qdat))
- ((dbread) (register-db-as-mine acfg fname)(qdat-readq qdat))
- ((dbrw) (register-db-as-mine acfg fname)(qdat-rwq qdat))
- ((oslong) (qdat-oslong qdat))
- ((osshort) (qdat-osshort qdat))
- ((full-ping) (qdat-misc qdat))
- (else
- (print "ERROR: no queue for " action ". Adding to dbwrite queue.")
- (qdat-writeq qdat)))
- witem)
- (case action
- ((full-ping)(qdat-misc qdat))
- (else
- (print "ERROR: No action " action " was registered"))))
- (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f)
- #t)) ;; for now, simply return #t to indicate request got to the queue
-
-(define (doqueue acfg q fname dbdat dbh)
- ;; (print "doqueue: " fname)
- (let* ((start-time (current-milliseconds))
- (qlen (queue-length q)))
- (if (> qlen 1)
- (print "Processing queue of length " qlen))
- (let loop ((count 0)
- (responses '()))
- (let ((delta (- (current-milliseconds) start-time)))
- (if (or (queue-empty? q)
- (> delta 400)) ;; stop working on this queue after 400ms have passed
- (list count delta responses) ;; return count, delta and responses list
- (let* ((witem (queue-remove! q))
- (action (witem-action witem))
- (rdat (witem-rdat witem))
- (stmt (calldat-obj rdat))
- (sth (full-get-sth acfg fname stmt))
- (ctype (calldat-ctype rdat))
- (data (witem-data witem))
- (cookie (witem-cookie witem)))
- ;; do the processing and save the result in witem-result
- (witem-result-set!
- witem
- (case ctype ;; action
- ((noblockwrite) ;; blind write, no ack of success returned
- (apply sqlite3:execute sth data)
- (sqlite3:last-insert-rowid dbh))
- ((dbwrite) ;; blocking write
- (apply sqlite3:execute sth data)
- #t)
- ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query
- (apply sqlite3:map-row (lambda x x) sth data))
- ((full-ping) 'full-ping)
- (else (print "Not ready for action " action) #f)))
- (loop (add1 count)
- (if cookie
- (cons witem responses)
- responses))))))))
-
-;; do up to 400ms of processing on each queue
-;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded
-;;
-(define (process-db-queries acfg fname)
- (if (hash-table-exists? (area-wqueues acfg) fname)
- (let* ((process-db-queries-start-time (current-milliseconds))
- (qdat (hash-table-ref/default (area-wqueues acfg) fname #f))
- (queue-sym->queue (lambda (queue-sym)
- (case queue-sym ;; lookup the queue from qdat given a name (symbol)
- ((wqueue) (qdat-writeq qdat))
- ((rqueue) (qdat-readq qdat))
- ((rwqueue) (qdat-rwq qdat))
- ((misc) (qdat-misc qdat))
- (else #f))))
- (dbdat (get-dbh acfg fname))
- (dbh (if (dbdat? dbdat)(dbdat-dbh dbdat) #f))
- (nowtime (current-seconds)))
- ;; handle the queues that require a transaction
- ;;
- (map ;;
- (lambda (queue-sym)
- ;; (print "processing queue " queue-sym)
- (let* ((queue (queue-sym->queue queue-sym)))
- (if (not (queue-empty? queue))
- (let ((responses
- (sqlite3:with-transaction ;; todo - catch exceptions...
- dbh
- (lambda ()
- (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work!
- ;; (print "res=" res)
- (match res
- ((count delta responses)
- (update-stats acfg fname queue-sym delta count)
- (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f)
- responses) ;; return responses
- (else
- (print "ERROR: bad return data from doqueue " res)))
- )))))
- ;; having completed the transaction, send the responses.
- ;; (print "INFO: sending " (length responses) " responses.")
- (let loop ((responses-left responses))
- (cond
- ((null? responses-left) #t)
- (else
- (let* ((witem (car responses-left))
- (response (cdr responses-left)))
- (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem)
- (witem-cookie witem)(witem-result witem)))
- (loop (cdr responses-left))))))
- )))
- '(wqueue rwqueue rqueue))
-
- ;; handle misc queue
- ;;
- ;; (print "processing misc queue")
- (let ((queue (queue-sym->queue 'misc)))
- (doqueue acfg queue fname dbdat dbh))
- ;; ....
- (save-dbh acfg fname dbdat)
- #t ;; just to let the tests know we got here
- )
- #f ;; nothing processed
- ))
-
-;; run all queues in parallel per db but sequentially per queue for that db.
-;; - process the queues every 500 or so ms
-;; - allow for long running queries to continue but all other activities for that
-;; db will be blocked.
-;;
-(define (work-queue-processor acfg)
- (let* ((threads (make-hash-table))) ;; fname => thread
- (let loop ((fnames (hash-table-keys (area-wqueues acfg)))
- (target-time (+ (current-milliseconds) 50)))
- ;;(if (not (null? fnames))(print "Processing for these databases: " fnames))
- (for-each
- (lambda (fname)
- ;; (print "processing for " fname)
- ;;(process-db-queries acfg fname))
- (let ((th (hash-table-ref/default threads fname #f)))
- (if (and th (not (member (thread-state th) '(dead terminated))))
- (begin
- (print "WARNING: worker thread for " fname " is taking a long time.")
- (print "Thread is in state " (thread-state th)))
- (let ((th1 (make-thread (lambda ()
- ;; (catch-and-dump
- ;; (lambda ()
- ;; (print "Process queries for " fname)
- (let ((start-time (current-milliseconds)))
- (process-db-queries acfg fname)
- ;; (thread-sleep! 0.01) ;; need the thread to take at least some time
- (hash-table-delete! threads fname)) ;; no mutexes?
- fname)
- "th1"))) ;; ))
- (hash-table-set! threads fname th1)
- (thread-start! th1)))))
- fnames)
- ;; (thread-sleep! 0.1) ;; give the threads some time to process requests
- ;; burn time until 400ms is up
- (let ((now-time (current-milliseconds)))
- (if (< now-time target-time)
- (let ((delta (- target-time now-time)))
- (thread-sleep! (/ delta 1000)))))
- (loop (hash-table-keys (area-wqueues acfg))
- (+ (current-milliseconds) 50)))))
-
-;;======================================================================
-;; S T A T S G A T H E R I N G
-;;======================================================================
-
-(defstruct stat
- (qcount-avg 0) ;; coarse running average
- (qtime-avg 0) ;; coarse running average
- (qcount 0) ;; total
- (qtime 0) ;; total
- (last-qcount 0) ;; last
- (last-qtime 0) ;; last
- (dbs '()) ;; list of db files handled by this node
- (when 0)) ;; when the last query happened - seconds
-
-
-(define (update-stats acfg fname bucket duration numqueries)
- (let* ((key fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough
- (stats (or (hash-table-ref/default (area-stats acfg) key #f)
- (let ((newstats (make-stat)))
- (hash-table-set! (area-stats acfg) key newstats)
- newstats))))
- ;; when the last query happended (used to remove the fname from the active list)
- (stat-when-set! stats (current-seconds))
- ;; last values
- (stat-last-qcount-set! stats numqueries)
- (stat-last-qtime-set! stats duration)
- ;; total over process lifetime
- (stat-qcount-set! stats (+ (stat-qcount stats) numqueries))
- (stat-qtime-set! stats (+ (stat-qtime stats) duration))
- ;; coarse average
- (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2))
- (stat-qtime-avg-set! stats (/ (+ (stat-qtime-avg stats) duration) 2))
-
- ;; here is where we add the stats for a given dbfile
- (if (not (member fname (stat-dbs stats)))
- (stat-dbs-set! stats (cons fname (stat-dbs stats))))
-
- ))
-
-;;======================================================================
-;; S E R V E R S T U F F
-;;======================================================================
-
-;; this does NOT return!
-;;
-(define (find-free-port-and-open acfg)
- (let ((port (or (area-port acfg) 3200)))
- (handle-exceptions
- exn
- (begin
- (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port")
- (area-port-set! acfg (+ port 1))
- (find-free-port-and-open acfg))
- (rpc:default-server-port port)
- (area-port-set! acfg port)
- (tcp-read-timeout 120000)
- ;; ((rpc:make-server (tcp-listen port)) #t)
- (tcp-listen (rpc:default-server-port)
- ))))
-
-;; register this node by putting a packet into the pkts dir.
-;; look for other servers
-;; contact other servers and compile list of servers
-;; there are two types of server
-;; main servers - dashboards, runners and dedicated servers - need pkt
-;; passive servers - test executers, step calls, list-runs - no pkt
-;;
-(define (register-node acfg hostip port-num)
- ;;(mutex-lock! (area-mutex acfg))
- (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created)
- (best-ip (or hostip (get-my-best-address)))
- (mtdir (area-dbdir acfg))
- (pktdir (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts")))
- (print "Registering node " best-ip ":" port-num)
- (if (not mtdir) ;; require a home for this node to put or find databases
- #f
- (begin
- (if (not (directory? pktdir))(create-directory pktdir))
- ;; server is started, now create pkt if needed
- (print "Starting server in " server-type " mode with port " port-num)
- (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt
- (begin
- (area-pktid-set! acfg
- (write-alist->pkt
- pktdir
- `((hostname . ,(get-host-name))
- (ipaddr . ,best-ip)
- (port . ,port-num)
- (pid . ,(current-process-id)))
- pktspec: *pktspec*
- ptype: 'server))
- (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt"))))
- (area-port-set! acfg port-num)
- #;(mutex-unlock! (area-mutex acfg))))))
-
-(define *cookie-seqnum* 0)
-(define (make-cookie key)
- (set! *cookie-seqnum* (add1 *cookie-seqnum*))
- ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*)
- (conc key "-" *cookie-seqnum*)
- )
-
-;; dispatch locally if possible
-;;
-(define (call-deliver-response acfg ipaddr port cookie data)
- (if (and (equal? (area-myaddr acfg) ipaddr)
- (equal? (area-port acfg) port))
- (deliver-response acfg cookie data)
- ((rpc:procedure 'response ipaddr port) cookie data)))
-
-(define (deliver-response acfg cookie data)
- (let ((deliver-response-start (current-milliseconds)))
- (thread-start! (make-thread
- (lambda ()
- (let loop ((tries-left 5))
- ;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left)
- ;;(pp (hash-table->alist (area-cookie2mbox acfg)))
- (let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f)))
- (cond
- ((eq? 0 tries-left)
- (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie)
- )
- (mbox
- ;;(print "got mbox="mbox" got data="data" send.")
- (mailbox-send! mbox data))
- (else
- ;;(print "no mbox yet. look for "cookie)
- (thread-sleep! (/ (- 6 tries-left) 10))
- (loop (sub1 tries-left))))))
- ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data))
- (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie)
- )
- (conc "deliver-response thread for cookie="cookie))))
- #t)
-
-;; action:
-;; immediate - quick actions, no need to put in queues
-;; dbwrite - put in dbwrite queue
-;; dbread - put in dbread queue
-;; oslong - os actions, e.g. du, that could take a long time
-;; osshort - os actions that should be quick, e.g. df
-;;
-(define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler
- ;; NOTE: Use rpc:current-peer for getting return address
- (let* ((std-peer-handler-start (current-milliseconds))
- ;; (raw-data (alist-ref 'data dat))
- (rdat (hash-table-ref/default
- (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action
- (witem (make-witem ripaddr: from-ipaddr ;; rhost: from-host
- rport: from-port action: action
- rdat: rdat cookie: cookie
- servkey: servkey data: params ;; TODO - rename data to params
- caller: (rpc:current-peer))))
- (if (not (equal? servkey (area-pktid acfg)))
- `(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this
- (let* ((ctype (if rdat
- (calldat-ctype rdat) ;; is this necessary? these should be identical
- action)))
- (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f)
- (case ctype
- ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data)))
- ((full-ping) `(#t "ack to full ping" ,(work-queue-add acfg fname witem) ,cookie))
- ((response) `(#t "ack from requestor" ,(deliver-response acfg fname params)))
- ((dbwrite) `(#t "db write submitted" ,(work-queue-add acfg fname witem) ,cookie))
- ((dbread) `(#t "db read submitted" ,(work-queue-add acfg fname witem) ,cookie ))
- ((dbrw) `(#t "db read/write submitted" ,cookie))
- ((osshort) `(#t "os short submitted" ,cookie))
- ((oslong) `(#t "os long submitted" ,cookie))
- (else `(#f "unrecognised action" ,ctype)))))))
-
-;; Call this to start the actual server
-;;
-;; start_server
-;;
-;; mode: '
-;; handler: proc which takes pktrecieved as argument
-;;
-
-(define (start-server acfg)
- (let* ((conn (find-free-port-and-open acfg))
- (port (area-port acfg)))
- (rpc:publish-procedure!
- 'delist-db
- (lambda (fname)
- (hash-table-delete! (area-dbs acfg) fname)))
- (rpc:publish-procedure!
- 'calling-addr
- (lambda ()
- (rpc:current-peer)))
- (rpc:publish-procedure!
- 'ping
- (lambda ()(real-ping acfg)))
- (rpc:publish-procedure!
- 'request
- (lambda (from-addr from-port servkey action cookie dbname params)
- (request acfg from-addr from-port servkey action cookie dbname params)))
- (rpc:publish-procedure!
- 'response
- (lambda (cookie res-dat)
- (deliver-response acfg cookie res-dat)))
- (area-ready-set! acfg #t)
- (area-conn-set! acfg conn)
- ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t)
-
-
-(define (launch acfg) ;; #!optional (proc std-peer-handler))
- (print "starting launch")
- (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
- #;(let ((original-handler (current-exception-handler))) ;; is th
- (lambda (exception)
- (server-exit-procedure)
- (original-handler exception)))
- (on-exit (lambda ()
- (shutdown acfg))) ;; (finalize-all-db-handles acfg)))
- ;; set up the rpc handler
- (let* ((th1 (make-thread
- (lambda ()(start-server acfg))
- "server thread"))
- (th2 (make-thread
- (lambda ()
- (print "th2 starting")
- (let loop ()
- (work-queue-processor acfg)
- (print "work-queue-processor crashed!")
- (loop)))
- "work queue thread")))
- (thread-start! th1)
- (thread-start! th2)
- (let loop ()
- (thread-sleep! 0.025)
- (if (area-ready acfg)
- #t
- (loop)))
- ;; attempt to fix my address
- (let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)?
- (let loop ((rem-addrs all-addr))
- (if (null? rem-addrs)
- (begin
- (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.")
- (exit 1)) ;; BUG Changeme to raising an exception
-
- (let* ((addr (car rem-addrs))
- (good-addr (handle-exceptions
- exn
- #f
- ((rpc:procedure 'calling-addr addr (area-port acfg))))))
- (if good-addr
- (begin
- (print "Got good-addr of " good-addr)
- (area-myaddr-set! acfg good-addr))
- (loop (cdr rem-addrs)))))))
- (register-node acfg (area-myaddr acfg)(area-port acfg))
- (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg))
- ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
- ))
-
-(define (clear-server-pkt acfg)
- (let ((pktf (area-pktfile acfg)))
- (if pktf (delete-file* pktf))))
-
-(define (shutdown acfg)
- (let (;;(conn (area-conn acfg))
- (pktf (area-pktfile acfg))
- (port (area-port acfg)))
- (if pktf (delete-file* pktf))
- (send-all "imshuttingdown")
- ;; (rpc:close-all-connections!) ;; don't know if this is actually needed
- (finalize-all-db-handles acfg)))
-
-(define (send-all msg)
- #f)
-
-;; given a area record look up all the packets
-;;
-(define (get-all-server-pkts acfg)
- (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt"))))
- (map (lambda (pkt-file)
- (read-pkt->alist pkt-file pktspec: *pktspec*))
- all-pkt-files)))
-
-#;((Z . "9a0212302295a19610d5796fce0370fa130758e9")
- (port . "34827")
- (pid . "28748")
- (hostname . "zeus")
- (T . "server")
- (D . "1549427032.0"))
-
-#;(define (get-my-best-address)
- (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))))
- (cond
- ((null? all-my-addresses)
- (get-host-name)) ;; no interfaces?
- ((eq? (length all-my-addresses) 1)
- (ip->string (car all-my-addresses))) ;; only one to choose from, just go with it
- (else
- (ip->string (car (filter (lambda (x) ;; take any but 127.
- (not (eq? (u8vector-ref x 0) 127)))
- all-my-addresses)))))))
-
-;; whoami? I am my pkt
-;;
-(define (whoami? acfg)
- (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f))
-
-;;======================================================================
-;; "Client side" operations
-;;======================================================================
-
-(define (safe-call call-key host port . params)
- (handle-exceptions
- exn
- (begin
- (print "Call " call-key " to " host ":" port " failed")
- #f)
- (apply (rpc:procedure call-key host port) params)))
-
-;; ;; convert to/from string / sexpr
-;;
-;; (define (string->sexpr str)
-;; (if (string? str)
-;; (with-input-from-string str read)
-;; str))
-;;
-;; (define (sexpr->string s)
-;; (with-output-to-string (lambda ()(write s))))
-
-;; is the server alive?
-;;
-(define (ping acfg host port)
- (let* ((myaddr (area-myaddr acfg))
- (myport (area-port acfg))
- (start-time (current-milliseconds))
- (res (if (and (equal? myaddr host)
- (equal? myport port))
- (real-ping acfg)
- ((rpc:procedure 'ping host port)))))
- (cons (- (current-milliseconds) start-time)
- res)))
-
-;; returns ( ipaddr port alist-fname=>randnum )
-(define (real-ping acfg)
- `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg)))
-
-;; is the server alive AND the queues processing?
-;;
-#;(define (full-ping acfg servpkt)
- (let* ((start-time (current-milliseconds))
- (res (send-message acfg servpkt '(full-ping) 'full-ping)))
- (cons (- (current-milliseconds) start-time)
- res))) ;; (equal? res "got ping"))))
-
-
-;; look up all pkts and get the server id (the hash), port, host/ip
-;; store this info in acfg
-;; return the number of responsive servers found
-;;
-;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself
-;;
-(define (update-known-servers acfg)
- ;; readll all pkts
- ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt
- (let* ((start-time (current-milliseconds))
- (all-pkts (delete-duplicates
- (append (get-all-server-pkts acfg)
- (hash-table-values (area-hosts acfg)))))
- (hostshash (area-hosts acfg))
- (my-id (area-pktid acfg))
- (pktsdir (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers
- (numsrvs 0)
- (delpkt (lambda (pktsdir sid)
- (print "clearing out server " sid)
- (delete-file* (conc pktsdir "/" sid ".pkt"))
- (hash-table-delete! hostshash sid))))
- (area-last-srvup-set! acfg (current-seconds))
- (for-each
- (lambda (servpkt)
- (if (list? servpkt)
- ;; (pp servpkt)
- (let* ((shost (alist-ref 'ipaddr servpkt))
- (sport (any->number (alist-ref 'port servpkt)))
- (res (handle-exceptions
- exn
- (begin
- ;; (print "INFO: bad server on " shost ":" sport)
- #f)
- (ping acfg shost sport)))
- (sid (alist-ref 'Z servpkt)) ;; Z code is our name for the server
- (url (conc shost ":" sport))
- )
- #;(if (or (not res)
- (null? res))
- (begin
- (print "STRANGE: ping of " url " gave " res)))
-
- ;; (print "Got " res " from " shost ":" sport)
- (match res
- ((qduration . payload)
- ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt)
- ;; (if payload
- ;; "Success" "Fail"))
- (match payload
- ((host port stats)
- ;; (print "From " host ":" port " got stats: " stats)
- (if (and host port stats)
- (let ((url (conc host ":" port)))
- (hash-table-set! hostshash sid servpkt)
- ;; store based on host:port
- (hash-table-set! (area-hoststats acfg) sid stats))
- (print "missing data from the server, not sure what that means!"))
- (set! numsrvs (+ numsrvs 1)))
- (#f
- (print "Removing pkt " sid " due to #f from server or failed ping")
- (delpkt pktsdir sid))
- (else
- (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)")))
- (else
- ;; here we delete the pkt - can't reach the server, remove it
- ;; however this logic is inadequate. we should mark the server as checked
- ;; and not good, if it happens a second time - then remove the pkt
- ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead
- ;; could be it is simply too busy to reply
- (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0)))
- (if (> bad-pings 1) ;; two bad pings - remove pkt
- (begin
- (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid)
- (delpkt pktsdir sid))
- (begin
- (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet")
- (hash-table-set! (area-health acfg)
- url
- (+ (hash-table-ref/default (area-health acfg) url 0) 1))
- ))
- ))))
- ;; servpkt is not actually a pkt?
- (begin
- (print "Bad pkt " servpkt))))
- all-pkts)
- (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs
- " servers, pkts: " (map (lambda (p)
- (alist-ref 'Z p))
- all-pkts))
- numsrvs))
-
-(defstruct srvstat
- (numfiles 0) ;; number of db files handled by this server - subtract 1 for the db being currently looked at
- (randnum #f) ;; tie breaker number assigned to by the server itself - applies only to the db under consideration
- (pkt #f)) ;; the server pkt
-
-;;(define (srv->srvstat srvpkt)
-
-;; Get the server best for given dbname and key
-;;
-;; NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries.
-;;
-(define (get-best-server acfg dbname key)
- (let* (;; (servers (hash-table-values (area-hosts acfg)))
- (servers (area-hosts acfg))
- (skeys (sort (hash-table-keys servers) string>=?)) ;; a stable listing
- (start-time (current-milliseconds))
- (srvstats (make-hash-table)) ;; srvid => srvstat
- (url (conc (area-myaddr acfg) ":" (area-port acfg))))
- ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys))
- (if (null? skeys)
- (if (> (update-known-servers acfg) 0)
- (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter
- (begin
- (print "ERROR: no server found!") ;; since this process is also a server this should never happen
- #f))
- (begin
- ;; (print "in get-best-server with skeys=" skeys)
- (if (> (- (current-seconds) (area-last-srvup acfg)) 10)
- (begin
- (update-known-servers acfg)
- (sdbg> "get-best-server" "update-known-servers" start-time #f #f)))
-
- ;; for each server look at the list of dbfiles, total number of dbs being handled
- ;; and the rand number, save the best host
- ;; also do a delist-db for each server dbfile not used
- (let* ((best-server #f)
- (servers-to-delist (make-hash-table)))
- (for-each
- (lambda (srvid)
- (let* ((server (hash-table-ref/default servers srvid #f))
- (stats (hash-table-ref/default (area-hoststats acfg) srvid '(()))))
- ;; (print "stats: " stats)
- (if server
- (let* ((dbweights (car stats))
- (srvload (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights)))
- (dbrec (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore
- (randnum (if dbrec
- dbrec ;; (cdr dbrec)
- 0)))
- (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server))))))
- skeys)
-
- (let* ((sorted (sort (hash-table-values srvstats)
- (lambda (a b)
- (let ((numfiles-a (srvstat-numfiles a))
- (numfiles-b (srvstat-numfiles b))
- (randnum-a (srvstat-randnum a))
- (randnum-b (srvstat-randnum b)))
- (if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less
- #t
- (if (and (equal? numfiles-a numfiles-b)
- (< randnum-a randnum-b))
- #t
- #f))))))
- (best (if (null? sorted)
- (begin
- (print "ERROR: should never be null due to self as server.")
- #f)
- (srvstat-pkt (car sorted)))))
- #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv)
- (let ((p (srvstat-pkt srv)))
- (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p)
- "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")")))
- sorted))
- best))))))
-
- ;; send out an "I'm about to exit notice to all known servers"
- ;;
-(define (death-imminent acfg)
- '())
-
-;;======================================================================
-;; U L E X - T H E I N T E R E S T I N G S T U F F ! !
-;;======================================================================
-
-;; register a handler
-;; NOTES:
-;; dbinitsql is reserved for a list of sql statements for initializing the db
-;; dbinitfn is reserved for a db init function, if exists called after dbinitsql
-;;
-(define (register acfg key obj #!optional (ctype 'dbwrite))
- (let ((ht (area-rtable acfg)))
- (if (hash-table-exists? ht key)
- (print "WARNING: redefinition of entry " key))
- (hash-table-set! ht key (make-calldat obj: obj ctype: ctype))))
-
-;; usage: register-batch acfg '((key1 . sql1) (key2 . sql2) ... )
-;; NB// obj is often an sql query
-;;
-(define (register-batch acfg ctype data)
- (let ((ht (area-rtable acfg)))
- (map (lambda (dat)
- (hash-table-set! ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype)))
- data)))
-
-(define (initialize-area-calls-from-specfile area specfile)
- (let* ((callspec (with-input-from-file specfile read )))
- (for-each (lambda (group)
- (register-batch
- area
- (car group)
- (cdr group)))
- callspec)))
-
-;; get-rentry
-;;
-(define (get-rentry acfg key)
- (hash-table-ref/default (area-rtable acfg) key #f))
-
-(define (get-rsql acfg key)
- (let ((cdat (get-rentry acfg key)))
- (if cdat
- (calldat-obj cdat)
- #f)))
-
-
-
-;; blocking call:
-;; client server
-;; ------ ------
-;; call()
-;; send-message()
-;; nmsg-send()
-;; nmsg-receive()
-;; nmsg-respond(ack,cookie)
-;; ack, cookie
-;; mbox-thread-wait(cookie)
-;; nmsg-send(client,cookie,result)
-;; nmsg-respond(ack)
-;; return result
-;;
-;; reserved action:
-;; 'immediate
-;; 'dbinitsql
-;;
-(define (call acfg dbname action params #!optional (count 0))
- (let* ((call-start-time (current-milliseconds))
- (srv (get-best-server acfg dbname action))
- (post-get-start-time (current-milliseconds))
- (rdat (hash-table-ref/default (area-rtable acfg) action #f))
- (myid (trim-pktid (area-pktid acfg)))
- (srvid (trim-pktid (alist-ref 'Z srv)))
- (cookie (make-cookie myid)))
- (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat)
- (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname)
- (if (and srv rdat) ;; need both to dispatch a request
- (let* ((ripaddr (alist-ref 'ipaddr srv))
- (rsrvid (alist-ref 'Z srv))
- (rport (any->number (alist-ref 'port srv)))
- (res-full (if (and (equal? ripaddr (area-myaddr acfg))
- (equal? rport (area-port acfg)))
- (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params)
- (safe-call 'request ripaddr rport
- (area-myaddr acfg)
- (area-port acfg)
- #;(area-pktid acfg)
- rsrvid
- action cookie dbname params))))
- ;; (print "res-full: " res-full)
- (match res-full
- ((response-ok response-msg rem ...)
- (let* ((send-message-time (current-milliseconds))
- ;; (match res-full
- ;; ((response-ok response-msg)
- ;; (response-ok (car res-full))
- ;; (response-msg (cadr res-full)
- )
- ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG
- ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params)
- (sdbg> "call" "send-message" post-get-start-time #f call-start-time)
- (cond
- ((not response-ok) #f)
- ((member response-msg '("db read submitted" "db write submitted"))
- (let* ((cookie-id (cadddr res-full))
- (mbox (make-mailbox))
- (mbox-time (current-milliseconds)))
- (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox)
- (let* ((mbox-timeout-secs 20)
- (mbox-timeout-result 'MBOX_TIMEOUT)
- (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
- (mbox-receive-time (current-milliseconds)))
- (hash-table-delete! (area-cookie2mbox acfg) cookie-id)
- (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname)
- ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params)
- res)))
- (else
- (print "Unhandled response \""response-msg"\"")
- #f))
- ;; depending on what action (i.e. ctype) is we will block here waiting for
- ;; all the data (mechanism to be determined)
- ;;
- ;; if res is a "working on it" then wait
- ;; wait for result
- ;; mailbox thread wait on
-
- ;; if res is a "can't help you" then try a different server
- ;; if res is a "ack" (e.g. for one-shot requests) then return res
- ))
- (else
- (if (< count 10)
- (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv))))
- (thread-sleep! 1)
- (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.")
- (call acfg dbname action params (+ count 1)))
- (begin
- (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full)))))))
- (begin
- (if (not rdat)
- (print "ERROR: action " action " not registered.")
- (if (< count 10)
- (begin
- (thread-sleep! 1)
- (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts
- (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds")
- (call acfg dbname action params (+ count 1)))
- (begin
- (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up."))
- #;(error "No server available"))))))))
-
-
-;;======================================================================
-;; U T I L I T I E S
-;;======================================================================
-
-;; get a signature for identifing this process
-;;
-(define (get-process-signature)
- (cons (get-host-name)(current-process-id)))
-
-;;======================================================================
-;; S Y S T E M S T U F F
-;;======================================================================
-
-;; get normalized cpu load by reading from /proc/loadavg and
-;; /proc/cpuinfo return all three values and the number of real cpus
-;; and the number of threads returns alist '((adj-cpu-load
-;; . normalized-proc-load) ... etc. keys: adj-proc-load,
-;; adj-core-load, 1m-load, 5m-load, 15m-load
-;;
-(define (get-normalized-cpu-load)
- (let ((res (get-normalized-cpu-load-raw))
- (default `((adj-proc-load . 2) ;; there is no right answer
- (adj-core-load . 2)
- (1m-load . 2)
- (5m-load . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong
- (15m-load . 0)
- (proc . 1)
- (core . 1)
- (phys . 1)
- (error . #t))))
- (cond
- ((and (list? res)
- (> (length res) 2))
- res)
- ((eq? res #f) default) ;; add messages?
- ((eq? res #f) default) ;; this would be the #eof
- (else default))))
-
-(define (get-normalized-cpu-load-raw)
- (let* ((actual-host (get-host-name))) ;; #f is localhost
- (let ((data (append
- (with-input-from-file "/proc/loadavg" read-lines)
- (with-input-from-file "/proc/cpuinfo" read-lines)
- (list "end")))
- (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$"))
- (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$"))
- (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$"))
- (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$"))
- (max-num (lambda (p n)(max (string->number p) n))))
- ;; (print "data=" data)
- (if (null? data) ;; something went wrong
- #f
- (let loop ((hed (car data))
- (tal (cdr data))
- (loads #f)
- (proc-num 0) ;; processor includes threads
- (phys-num 0) ;; physical chip on motherboard
- (core-num 0)) ;; core
- ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num)
- (if (null? tal) ;; have all our data, calculate normalized load and return result
- (let* ((act-proc (+ proc-num 1))
- (act-phys (+ phys-num 1))
- (act-core (+ core-num 1))
- (adj-proc-load (/ (car loads) act-proc))
- (adj-core-load (/ (car loads) act-core))
- (result
- (append (list (cons 'adj-proc-load adj-proc-load)
- (cons 'adj-core-load adj-core-load))
- (list (cons '1m-load (car loads))
- (cons '5m-load (cadr loads))
- (cons '15m-load (caddr loads)))
- (list (cons 'proc act-proc)
- (cons 'core act-core)
- (cons 'phys act-phys)))))
- result)
- (regex-case
- hed
- (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num))
- (proc-rx ( x p ) (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num))
- (phys-rx ( x p ) (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num))
- (core-rx ( x c ) (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num)))
- (else
- (begin
- ;; (print "NO MATCH: " hed)
- (loop (car tal)(cdr tal) loads proc-num phys-num core-num))))))))))
-
-(define (get-host-stats acfg)
- (let ((stats-hash (area-stats acfg)))
- ;; use this opportunity to remove references to dbfiles which have not been accessed in a while
- (for-each
- (lambda (dbname)
- (let* ((stats (hash-table-ref stats-hash dbname))
- (last-access (stat-when stats)))
- (if (and (> last-access 0) ;; if zero then there has been no access
- (> (- (current-seconds) last-access) 10)) ;; not used in ten seconds
- (begin
- (print "Removing " dbname " from stats list")
- (hash-table-delete! stats-hash dbname) ;; remove from stats hash
- (stat-dbs-set! stats (hash-table-keys stats))))))
- (hash-table-keys stats-hash))
-
- `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum
- ,(map (lambda (dbname) ;; dbname is the db name
- (cons dbname (stat-when (hash-table-ref stats-hash dbname))))
- (hash-table-keys stats-hash))
- (cpuload . ,(get-normalized-cpu-load)))))
- #;(stats . ,(map (lambda (k) ;; create an alist from the stats data
- (cons k (stat->alist (hash-table-ref (area-stats acfg) k))))
- (hash-table-keys (area-stats acfg))))
-
-#;(trace
- ;; assv
- ;; cdr
- ;; caar
- ;; ;; cdr
- ;; call
- ;; finalize-all-db-handles
- ;; get-all-server-pkts
- ;; get-normalized-cpu-load
- ;; get-normalized-cpu-load-raw
- ;; launch
- ;; nmsg-send
- ;; process-db-queries
- ;; receive-message
- ;; std-peer-handler
- ;; update-known-servers
- ;; work-queue-processor
- )
-
-;;======================================================================
-;; netutil
-;; move this back to ulex-netutil.scm someday?
-;;======================================================================
-
-;; #include
-;; #include
-;; #include
-;; #include
-
-(foreign-declare "#include \"sys/types.h\"")
-(foreign-declare "#include \"sys/socket.h\"")
-(foreign-declare "#include \"ifaddrs.h\"")
-(foreign-declare "#include \"arpa/inet.h\"")
-
-;; get IP addresses from ALL interfaces
-(define get-all-ips
- (foreign-safe-lambda* scheme-object ()
- "
-
-// from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address :
-
-
- C_word lst = C_SCHEME_END_OF_LIST, len, str, *a;
-// struct ifaddrs *ifa, *i;
-// struct sockaddr *sa;
-
- struct ifaddrs * ifAddrStruct = NULL;
- struct ifaddrs * ifa = NULL;
- void * tmpAddrPtr = NULL;
-
- if ( getifaddrs(&ifAddrStruct) != 0)
- C_return(C_SCHEME_FALSE);
-
-// for (i = ifa; i != NULL; i = i->ifa_next) {
- for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
- if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is
- // a valid IPv4 address
- tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
- char addressBuffer[INET_ADDRSTRLEN];
- inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
-// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
- len = strlen(addressBuffer);
- a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
- str = C_string(&a, len, addressBuffer);
- lst = C_a_pair(&a, str, lst);
- }
-
-// else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is
-// // a valid IPv6 address
-// tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
-// char addressBuffer[INET6_ADDRSTRLEN];
-// inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
-//// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
-// len = strlen(addressBuffer);
-// a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
-// str = C_string(&a, len, addressBuffer);
-// lst = C_a_pair(&a, str, lst);
-// }
-
-// else {
-// printf(\" not an IPv4 address\\n\");
-// }
-
- }
-
- freeifaddrs(ifa);
- C_return(lst);
-
-"))
+ tcp6
+ ;; ulex-netutil
+ hostinfo
+ )
+
+;;======================================================================
+;; KEY FUNCTIONS - THESE ARE TOO BE EXPOSED AND USED
+;;======================================================================
+
+;; connection setup and management functions
+
+;; This is the basic setup command. Must always be
+;; called before connecting to a db using connect.
+;;
+;; find or become the captain
+;; setup and return a ulex object
+;;
+(define (setup)
+ (let* ((udata (make-udat))
+ (cpkts (get-all-captain-pkts udata)) ;; read captain pkts
+ (captn (get-winning-pkt cpkts)))
+ ;; check to see if our own server is started and start one if not
+ (if (not (udat-serv-listener udata))(start-server-find-port udata))
+ (if captn
+ (let* ((port (alist-ref 'port captn))
+ (host (alist-ref 'host captn))
+ (ipaddr (alist-ref 'ipaddr captn))
+ (pid (alist-ref 'pid captn))
+ (Z (alist-ref 'Z captn)))
+ (udat-captain-address-set! udata ipaddr)
+ (udat-captain-host-set! udata host)
+ (udat-captain-port-set! udata port)
+ (udat-captain-pid-set! udata pid)
+ (let-values (((success pingtime)(ping udata (conc ipaddr ":" port))))
+ (if success
+ udata
+ (begin
+ (print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
+ (remove-captain-pkt udata captn)
+ (setup)))))
+ (begin
+ (setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread
+ (setup)))))
+
+;; connect to a specific dbfile
+(define (connect udata dbfname dbtype)
+ udata)
+
+;; returns: success pingtime
+;;
+;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
+;;
+(define (ping udata host-port)
+ (let* ((start (current-milliseconds))
+ (cookie (make-cookie udata))
+ (dbs (udat-my-dbs udata))
+ (msg (string-intersperse dbs " "))
+ (res (send udata host-port 'ping cookie msg retval: #t))
+ (delta (- (current-milliseconds) start)))
+ (values (equal? res cookie) delta)))
+
+;; returns: success pingtime
+;;
+;; NOTE: causes all references to this worker to be wiped out in the callee (ususally the captain)
+;;
+(define (goodbye-ping udata host-port)
+ (let* ((start (current-milliseconds))
+ (cookie (make-cookie udata))
+ (dbs (udat-my-dbs udata))
+ (res (send udata host-port 'goodbye cookie "nomsg" retval: #t))
+ (delta (- (current-milliseconds) start)))
+ (values (equal? res cookie) delta)))
+
+(define (goodbye-captain udata)
+ (let* ((host-port (udat-captain-host-port udata)))
+ (if host-port
+ (goodbye-ping udata host-port)
+ (values #f -1))))
+
+;;======================================================================
+;; network utilities
+;;======================================================================
+
+(define (rate-ip ipaddr)
+ (regex-case ipaddr
+ ( "^127\\..*" _ 0 )
+ ( "^(10\\.0|192\\.168)\\..*" _ 1 )
+ ( else 2 ) ))
;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
- (let* ((rate (lambda (ipstr)
- (regex-case ipstr
- ( "^127\\." _ 0 )
- ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 )
- ( else 2 ) ))))
- (< (rate a) (rate b))))
+ (> (rate-ip a) (rate-ip b)))
(define (get-my-best-address)
(let ((all-my-addresses (get-all-ips))
;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))
@@ -1446,7 +151,1872 @@
)))
(define (get-all-ips-sorted)
(sort (get-all-ips) ip-pref-less?))
+(define (get-all-ips)
+ (map ip->string (vector->list
+ (hostinfo-addresses
+ (host-information (current-hostname))))))
+
+;; make it a global? Well, it is local to area module
+
+(define *captain-pktspec*
+ `((captain (host . h)
+ (port . p)
+ (pid . i)
+ (ipaddr . a)
+ )
+ #;(data (hostname . h) ;; sender hostname
+ (port . p) ;; sender port
+ (ipaddr . a) ;; sender ip
+ (hostkey . k) ;; sending host key - store info at server under this key
+ (servkey . s) ;; server key - this needs to match at server end or reject the msg
+ (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json
+ (data . d) ;; base64 encoded slln data
+ )))
+
+;; struct for keeping track of our world
+
+(defstruct udat
+ ;; captain info
+ (captain-address #f)
+ (captain-host #f)
+ (captain-port #f)
+ (captain-pid #f)
+ (ulex-dir (conc (get-environment-variable "HOME") "/.ulex"))
+ (cpkts-dir (conc (get-environment-variable "HOME") "/.ulex/pkts"))
+ (cpkt-spec *captain-pktspec*)
+ ;; this processes info
+ (my-cpkt-key #f) ;; put Z card here when I create a pkt for myself as captain
+ (my-address #f)
+ (my-hostname #f)
+ (my-port #f)
+ (my-pid (current-process-id))
+ (my-dbs '())
+ ;; server and handler thread
+ (serv-listener #f) ;; this processes server info
+ (handler-thread #f)
+ (mboxes (make-hash-table)) ;; key => mbox
+ ;; other servers
+ (peers (make-hash-table)) ;; host-port => peer record
+ (dbowners (make-hash-table)) ;; dbfile => host-port
+ (handlers (make-hash-table)) ;; dbfile => proc
+ (outgoing-conns (make-hash-table)) ;; host:port -> conn
+ (work-queue (make-queue)) ;; most stuff goes here
+ ;; (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping)
+ (busy #f) ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
+ ;; app info
+ (appname #f)
+ (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
+ ;; cookies
+ (cnum 0) ;; cookie num
+ )
+
+(define (udat-my-host-port udata)
+ (if (and (udat-my-address udata)(udat-my-port udata))
+ (conc (udat-my-address udata) ":" (udat-my-port udata))
+ #f))
+
+(define (udat-captain-host-port udata)
+ (if (and (udat-captain-address udata)(udat-captain-port udata))
+ (conc (udat-captain-address udata) ":" (udat-captain-port udata))
+ #f))
+
+(define (udat-get-peer udata host-port)
+ (hash-table-ref/default (udat-peers udata) host-port #f))
+
+;; struct for keeping track of others we are talking to
+
+(defstruct peer
+ (addr-port #f)
+ (hostname #f)
+ (pid #f)
+ ;; (inp #f)
+ ;; (oup #f)
+ (dbs '()) ;; list of databases this peer is currently handling
+ )
+
+(defstruct work
+ (peer-dat #f)
+ (handlerkey #f)
+ (qrykey #f)
+ (data #f)
+ (start (current-milliseconds)))
+
+;;======================================================================
+;; Captain functions
+;;======================================================================
+
+;; NB// This needs to be started in a thread
+;;
+;; setup to be a captain
+;; - start server
+;; - create pkt
+;; - start server port handler
+;;
+(define (setup-as-captain udata)
+ (if (start-server-find-port udata) ;; puts the server in udata
+ (if (create-captain-pkt udata)
+ (let* ((th (make-thread (lambda ()
+ (ulex-handler-loop udata)) "Captain handler")))
+ (udat-handler-thread-set! udata th)
+ (thread-start! th))
+ #f)
+ #f))
+
+;; given a pkts dir read
+;;
+(define (get-all-captain-pkts udata)
+ (let* ((pktsdir (let ((d (udat-cpkts-dir udata)))
+ (if (file-exists? d)
+ d
+ (begin
+ (create-directory d #t)
+ d))))
+ (all-pkt-files (glob (conc pktsdir "/*.pkt")))
+ (pkt-spec (udat-cpkt-spec udata)))
+ (map (lambda (pkt-file)
+ (read-pkt->alist pkt-file pktspec: pkt-spec))
+ all-pkt-files)))
+
+;; sort by D then Z, return one, choose the oldest then
+;; differentiate if needed using the Z key
+;;l
+(define (get-winning-pkt pkts)
+ (if (null? pkts)
+ #f
+ (car (sort pkts (lambda (a b)
+ (let ((ad (string->number (alist-ref 'D a)))
+ (bd (string->number (alist-ref 'D b))))
+ (if (eq? a b)
+ (let ((az (alist-ref 'Z a))
+ (bz (alist-ref 'Z b)))
+ (string>=? az bz))
+ (> ad bd))))))))
+
+;; put the host, ip, port and pid into a pkt in
+;; the captain pkts dir
+;; - assumes user has already fired up a server
+;; which will be in the udata struct
+;;
+(define (create-captain-pkt udata)
+ (if (not (udat-serv-listener udata))
+ (begin
+ (print "ERROR: create-captain-pkt called with out a listener")
+ #f)
+ (let* ((pktdat `((port . ,(udat-my-port udata))
+ (host . ,(udat-my-hostname udata))
+ (ipaddr . ,(udat-my-address udata))
+ (pid . ,(udat-my-pid udata))))
+ (pktdir (udat-cpkts-dir udata))
+ (pktspec (udat-cpkt-spec udata))
+ )
+ (udat-my-cpkt-key-set!
+ udata
+ (write-alist->pkt
+ pktdir
+ pktdat
+ pktspec: pktspec
+ ptype: 'captain))
+ (udat-my-cpkt-key udata))))
+
+;; remove pkt associated with captn (the Z key .pkt)
+;;
+(define (remove-captain-pkt udata captn)
+ (let ((Z (alist-ref 'Z captn))
+ (cpktdir (udat-cpkts-dir udata)))
+ (delete-file* (conc cpktdir "/" Z ".pkt"))))
+
+;; call all known peers and tell them to delete their info on the captain
+;; thus forcing them to re-read pkts and connect to a new captain
+;; call this when the captain needs to exit and if an older captain is
+;; detected. Due to delays in sending file meta data in NFS multiple
+;; captains can be initiated in a "Storm of Captains", book soon to be
+;; on Amazon
+;;
+(define (drop-captain udata)
+ (let* ((peers (hash-table-keys (udat-peers udata)))
+ (cookie (make-cookie udata)))
+ (for-each
+ (lambda (host-port)
+ (send udata host-port 'dropcaptain cookie "nomsg" retval: #t))
+ peers)))
+
+;;======================================================================
+;; server primitives
+;;======================================================================
+
+(define (make-cookie udata)
+ (let ((newcnum (+ (udat-cnum udata))))
+ (udat-cnum-set! udata newcnum)
+ (conc (udat-my-address udata) ":"
+ (udat-my-port udata) "-"
+ (udat-my-pid udata) "-"
+ newcnum)))
+
+;; create a tcp listener and return a populated udat struct with
+;; my port, address, hostname, pid etc.
+;; return #f if fail to find a port to allocate.
+;;
+(define (start-server-find-port udata #!optional (port 4242))
+ (handle-exceptions
+ exn
+ (if (< port 65535)(start-server-find-port udata (+ port 1)) #f)
+ (connect-server udata port)))
+
+(define (connect-server udata port)
+ ;; (tcp-listener-socket LISTENER)(socket-name so)
+ ;; sockaddr-address, sockaddr-port, sockaddr->string
+ (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
+ (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
+ (udat-my-address-set! udata addr)
+ (udat-my-port-set! udata port)
+ (udat-my-hostname-set! udata (get-host-name))
+ (udat-serv-listener-set! udata tlsn)
+ udata))
+
+(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
+ (let* ((pdat (or (udat-get-peer udata host-port)
+ (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
+ exn
+ #f
+ (let ((npdat (make-peer addr-port: host-port)))
+ (if hostname (peer-hostname-set! npdat hostname))
+ (if pid (peer-pid-set! npdat pid))
+ npdat)))))
+ pdat))
+
+;; send structured data to recipient
+;;
+;; NOTE: qrykey is what was called the "cookie" previously
+;;
+;; retval tells send to expect and wait for return data (one line) and return it or time out
+;; this is for ping where we don't want to necessarily have set up our own server yet.
+;;
+(define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(retval #f))
+ (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
+ exn
+ #f
+ (let-values (((inp oup)(tcp-connect host-port)))
+ ;;
+ ;; CONTROL LINE:
+ ;; handlerkey host:port pid qrykey params ...
+ ;;
+ (let ((res
+ (if (and inp oup)
+ (let* ((myhost (udat-my-address udata))
+ (myport (udat-my-port udata))
+ (dat (conc
+ handler " "
+ (udat-my-address udata) ":" (udat-my-port udata) " "
+ ;; (udat-my-hostname udata) " "
+ (udat-my-pid udata) " "
+ qrykey
+ (if (null? params) "" (conc " " (string-intersperse params " "))))))
+ (if (and myhost myport)
+ (begin
+ (write-line dat oup)
+ (write-line data oup)
+ ;; (print "Sent dat: " dat " data: " data)
+ (if retval
+ (read-line inp)
+ #t))
+ (begin
+ (print "ERROR: send called but no receiver has been setup. Please call setup first!")
+ #f))
+ ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE!
+ ;; (there is a listener for handling that)
+ )
+ #f))) ;; #f means failed to connect and send
+ (close-input-port inp)
+ (close-output-port oup)
+ res))))
+
+;; send a request to the given host-port and register a mailbox in udata
+;; wait for the mailbox data and return it
+;;
+(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20))
+ (let ((mbox (make-mailbox))
+ (mbox-time (current-milliseconds))
+ (mboxes (udat-mboxes udata)))
+ (hash-table-set! mboxes qrykey mbox)
+ (if (send udata host-port handler qrykey data hostname: hostname pid: pid params: params)
+ (let* ((mbox-timeout-secs timeout)
+ (mbox-timeout-result 'MBOX_TIMEOUT)
+ (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
+ (mbox-receive-time (current-milliseconds)))
+ (hash-table-delete! mboxes qrykey)
+ (if (eq? res 'MBOX_TIMEOUT)
+ #f
+ res))
+ #f))) ;; #f means failed to communicate
+
+;;
+(define (ulex-handler udata controldat data)
+ (print "controldat: " controldat " data: " data)
+ (match (string-split controldat)
+ ((handlerkey host-port pid qrykey params ...)
+ (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
+ (case (string->symbol handlerkey)
+ ((ack)(print "Got ack!"))
+ ((ping) ;; special case - return result immediately on the same connection
+ (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
+ (val (if proc (proc) "gotping"))
+ (peer (make-peer addr-port: host-port pid: pid))
+ (dbshash (udat-dbowners udata)))
+ (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
+ (for-each (lambda (dbfile)
+ (hash-table-set! dbshash dbfile host-port))
+ params) ;; register each db in the dbshash
+ (if (not (hash-table-exists? (udat-peers udata) host-port))
+ (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
+ qrykey)) ;; End of ping
+ ((goodbye)
+ ;; remove all traces of the caller in db ownership etc.
+ (let* ((peer (hash-table-ref/default (udat-peers udata) host-port #f))
+ (dbs (if peer (peer-dbs peer) '()))
+ (dbshash (udat-dbowners udata)))
+ (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
+ (hash-table-delete! (udat-peers udata) host-port)
+ qrykey))
+ ((dropcaptain)
+ ;; remove all traces of the captain
+ (udat-captain-address-set! udata #f)
+ (udat-captain-host-set! udata #f)
+ (udat-captain-port-set! udata #f)
+ (udat-captain-pid-set! udata #f)
+ qrykey)
+ ((rucaptain) ;; remote is asking if I'm the captain
+ (if (udat-my-cpkt-key udata) "yes" "no"))
+ ((whoowns) ;; given a db name who do I send my queries to
+ ;; look up the file in handlers, if have an entry ping them to be sure
+ ;; they are still alive and then return that host:port.
+ ;; if no handler found or if the ping fails pick from peers the oldest that
+ ;; is managing the fewest dbs
+ #f)
+ (else
+ (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)
+ #f)))
+ (else
+ (print "BAD DATA? controldat=" controldat " data=" data)
+ #f)));; handles the incoming messages and dispatches to queues
+
+;;
+(define (ulex-handler-loop udata)
+ (let* ((serv-listener (udat-serv-listener udata)))
+ ;; data comes as two lines
+ ;; handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
+ ;; data
+ (let loop ((state 'start))
+ (let-values (((inp oup)(tcp-accept serv-listener)))
+ (let* ((controldat (read-line inp))
+ (data (read-line inp))
+ (resp (ulex-handler udata controldat data)))
+ (if resp (write-line resp oup))
+ (close-input-port inp)
+ (close-output-port oup))
+ (loop state)))))
+
+;; add a proc to the handler list
+(define (register-handler udata key proc)
+ (hash-table-set! (udat-handlers udata) key proc))
+
+
+;;======================================================================
+;; work queues
+;;======================================================================
+
+(define (add-to-work-queue udata peer-dat handlerkey qrykey data)
+ (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data)))
+ (if (udat-busy udata)
+ (queue-add! (udat-work-queue udata) wdat)
+ (process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat
+ ))
+
+(define (do-work udata wdat)
+ #f)
+
+(define (process-work udata #!optional wdat)
+ (if wdat (do-work udata wdat)) ;; process wdat
+ (let ((wqueue (udat-work-queue udata)))
+ (if (not (queue-empty? wqueue))
+ (let loop ((wd (queue-remove! wqueue)))
+ (do-work udata wd)
+ (if (not (queue-empty? wqueue))
+ (loop (queue-remove! wqueue)))))))
+
+;;======================================================================
+;; Generic db handling
+;; setup a inmem db instance
+;; open connection to on-disk db
+;; sync on-disk db to inmem
+;; get lock in on-disk db for dbowner of this db
+;; put sync-proc, init-proc, on-disk handle, inmem handle in dbconn stuct
+;; return the stuct
+;;======================================================================
+
+(defstruct dbconn
+ (inmem #f)
+ (conn #f)
+ (sync #f) ;; sync proc
+ (init #f) ;; init proc
+ (lastsync (current-seconds))
+ )
+
+(defstruct dbinfo
+ (initproc #f)
+ (syncproc #f))
+
+;; open inmem and disk database
+;; init with initproc
+;; return db struct
+;;
+;; appname; megatest, ulex or something else.
+;;
+(define (setup-db-connection udata fname-in appname dbtype)
+ (let* ((is-ulex (eq? appname 'ulex))
+ (dbinf (if is-ulex ;; ulex is a built-in special case
+ (make-dbinfo initproc: ulexdb-init syncproc: ulexdb-sync)
+ (hash-table-ref/default (udat-dbtypes udata) dbtype #f)))
+ (initproc (dbinfo-initproc dbinf))
+ (syncproc (dbinfo-syncproc dbinf))
+ (fname (if is-ulex
+ (conc (udat-ulex-dir udata) "/ulex.db")
+ fname-in))
+ (inmem-db (open-and-initdb udata #f 'inmem (dbinfo-initproc dbinf)))
+ (disk-db (open-and-initdb udata fname 'disk (dbinfo-initproc dbinf))))
+ (make-dbconn inmem: inmem-db conn: disk-db sync: syncproc init: initproc)))
+
+;; dest='inmem or 'disk
+;;
+(define (open-and-initdb udata filename dest init-proc)
+ (let* ((inmem (eq? dest 'inmem))
+ (dbfile (if inmem
+ ":INMEM:"
+ filename))
+ (dbexists (if inmem #t (file-exists? dbfile)))
+ (db (sqlite3:open-database dbfile)))
+ (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000))
+ (if (not dbexists)
+ (init-proc db))
+ db))
+
+
+;;======================================================================
+;; Ulex db
+;;======================================================================
+
+(define (ulexdb-init db inmem)
+ (sqlite3:with-transaction
+ db
+ (lambda ()
+ (for-each
+ (lambda (stmt)
+ (if stmt (sqlite3:execute db stmt)))
+ `("CREATE TABLE IF NOT EXISTS processes
+ (id INTEGER PRIMARY KEY,
+ host TEXT NOT NULL,
+ ipadr TEXT NOT NULL,
+ port INTEGER NOT NULL,
+ pid INTEGER NOT NULL,
+ regtime INTEGER DEFAULT (strftime('%s','now')),
+ last_update INTEGER DEFAULT (strftime('%s','now')));"
+ (if inmem
+ "CREATE TRIGGER IF NOT EXISTS update_proces_trigger AFTER UPDATE ON processes
+ FOR EACH ROW
+ BEGIN
+ UPDATE processes SET last_update=(strftime('%s','now'))
+ WHERE id=old.id;
+ END;"
+ #f))))))
+
+;; open databases, do initial sync
+(define (ulexdb-sync dbconndat udata)
+ #f)
+
+
+) ;; END OF ULEX
+
+
+;;; ;;======================================================================
+;;; ;; D E B U G H E L P E R S
+;;; ;;======================================================================
+;;;
+;;; (define (dbg> . args)
+;;; (with-output-to-port (current-error-port)
+;;; (lambda ()
+;;; (apply print "dbg> " args))))
+;;;
+;;; (define (debug-pp . args)
+;;; (if (get-environment-variable "ULEX_DEBUG")
+;;; (with-output-to-port (current-error-port)
+;;; (lambda ()
+;;; (apply pp args)))))
+;;;
+;;; (define *default-debug-port* (current-error-port))
+;;;
+;;; (define (sdbg> fn stage-name stage-start stage-end start-time . message)
+;;; (if (get-environment-variable "ULEX_DEBUG")
+;;; (with-output-to-port *default-debug-port*
+;;; (lambda ()
+;;; (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. "
+;;; (if start-time
+;;; (conc "total time " (- (current-milliseconds) start-time)
+;;; " ms.")
+;;; "")
+;;; message
+;;; )))))
+
+;;======================================================================
+;; M A C R O S
+;;======================================================================
+;; iup callbacks are not dumping the stack, this is a work-around
+;;
+
+;; Some of these routines use:
+;;
+;; http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html
+;;
+;; Syntax for defining macros in a simple style similar to function definiton,
+;; when there is a single pattern for the argument list and there are no keywords.
+;;
+;; (define-simple-syntax (name arg ...) body ...)
+;;
+;;
+;; (define-syntax define-simple-syntax
+;; (syntax-rules ()
+;; ((_ (name arg ...) body ...)
+;; (define-syntax name (syntax-rules () ((name arg ...) (begin body ...)))))))
+;;
+;; (define-simple-syntax (catch-and-dump proc procname)
+;; (handle-exceptions
+;; exn
+;; (begin
+;; (print-call-chain (current-error-port))
+;; (with-output-to-port (current-error-port)
+;; (lambda ()
+;; (print ((condition-property-accessor 'exn 'message) exn))
+;; (print "Callback error in " procname)
+;; (print "Full condition info:\n" (condition->list exn)))))
+;; (proc)))
+;;
+;;
+;;======================================================================
+;; R E C O R D S
+;;======================================================================
-)
+;;; ;; information about me as a server
+;;; ;;
+;;; (defstruct area
+;;; ;; about this area
+;;; (useportlogger #f)
+;;; (lowport 32768)
+;;; (server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all)
+;;; (conn #f)
+;;; (port #f)
+;;; (myaddr (get-my-best-address))
+;;; pktid ;; get pkt from hosts table if needed
+;;; pktfile
+;;; pktsdir
+;;; dbdir
+;;; (dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one?
+;;; (mutex (make-mutex))
+;;; (rtable (make-hash-table)) ;; registration table of available actions
+;;; (dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve
+;;; ;; about other servers
+;;; (hosts (make-hash-table)) ;; key => hostdat
+;;; (hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime )
+;;; (reqs (make-hash-table)) ;; uri => queue
+;;; ;; work queues
+;;; (wqueues (make-hash-table)) ;; fname => qdat
+;;; (stats (make-hash-table)) ;; fname => totalqueries
+;;; (last-srvup (current-seconds)) ;; last time we updated the known servers
+;;; (cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call
+;;; (ready #f)
+;;; (health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping
+;;; )
+;;;
+;;; ;; host stats
+;;; ;;
+;;; (defstruct hostdat
+;;; (pkt #f)
+;;; (dbload (make-hash-table)) ;; "dbfile.db" => queries/min
+;;; (hostload #f) ;; normalized load ( 5min load / numcpus )
+;;; )
+;;;
+;;; ;; dbdat
+;;; ;;
+;;; (defstruct dbdat
+;;; (dbh #f)
+;;; (fname #f)
+;;; (write-access #f)
+;;; (sths (make-hash-table)) ;; hash mapping query strings to handles
+;;; )
+;;;
+;;; ;; qdat
+;;; ;;
+;;; (defstruct qdat
+;;; (writeq (make-queue))
+;;; (readq (make-queue))
+;;; (rwq (make-queue))
+;;; (logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging
+;;; (osshort (make-queue))
+;;; (oslong (make-queue))
+;;; (misc (make-queue)) ;; used for things like ping-full
+;;; )
+;;;
+;;; ;; calldat
+;;; ;;
+;;; (defstruct calldat
+;;; (ctype 'dbwrite)
+;;; (obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc.
+;;; (rtime (current-milliseconds)))
+;;;
+;;; ;; make it a global? Well, it is local to area module
+;;;
+;;; (define *pktspec*
+;;; `((server (hostname . h)
+;;; (port . p)
+;;; (pid . i)
+;;; (ipaddr . a)
+;;; )
+;;; (data (hostname . h) ;; sender hostname
+;;; (port . p) ;; sender port
+;;; (ipaddr . a) ;; sender ip
+;;; (hostkey . k) ;; sending host key - store info at server under this key
+;;; (servkey . s) ;; server key - this needs to match at server end or reject the msg
+;;; (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json
+;;; (data . d) ;; base64 encoded slln data
+;;; )))
+;;;
+;;; ;; work item
+;;; ;;
+;;; (defstruct witem
+;;; (rhost #f) ;; return host
+;;; (ripaddr #f) ;; return ipaddr
+;;; (rport #f) ;; return port
+;;; (servkey #f) ;; the packet representing the client of this workitem, used by final send-message
+;;; (rdat #f) ;; the request - usually an sql query, type is rdat
+;;; (action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort
+;;; (cookie #f) ;; cookie id for response
+;;; (data #f) ;; the data payload, i.e. parameters
+;;; (result #f) ;; the result from processing the data
+;;; (caller #f)) ;; the calling peer according to rpc itself
+;;;
+;;; (define (trim-pktid pktid)
+;;; (if (string? pktid)
+;;; (substring pktid 0 4)
+;;; "nopkt"))
+;;;
+;;; (define (any->number num)
+;;; (cond
+;;; ((number? num) num)
+;;; ((string? num) (string->number num))
+;;; (else num)))
+;;;
+;;; (use trace)
+;;; (trace-call-sites #t)
+;;;
+;;; ;;======================================================================
+;;; ;; D A T A B A S E H A N D L I N G
+;;; ;;======================================================================
+;;;
+;;; ;; look in dbhandles for a db, return it, else return #f
+;;; ;;
+;;; (define (get-dbh acfg fname)
+;;; (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '())))
+;;; (if (null? dbh-lst)
+;;; (begin
+;;; ;; (print "opening db for " fname)
+;;; (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls
+;;; (let ((rem-lst (cdr dbh-lst)))
+;;; ;; (print "re-using saved connection for " fname)
+;;; (hash-table-set! (area-dbhandles acfg) fname rem-lst)
+;;; (car dbh-lst)))))
+;;;
+;;; (define (save-dbh acfg fname dbdat)
+;;; ;; (print "saving dbh for " fname)
+;;; (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '()))))
+;;;
+;;; ;; open the database, if never before opened init it. put the handle in the
+;;; ;; open db's hash table
+;;; ;; returns: the dbdat
+;;; ;;
+;;; (define (open-db acfg fname)
+;;; (let* ((fullname (conc (area-dbdir acfg) "/" fname))
+;;; (exists (file-exists? fullname))
+;;; (write-access (if exists
+;;; (file-write-access? fullname)
+;;; (file-write-access? (area-dbdir acfg))))
+;;; (db (sqlite3:open-database fullname))
+;;; (handler (sqlite3:make-busy-timeout 136000))
+;;; )
+;;; (sqlite3:set-busy-handler! db handler)
+;;; (sqlite3:execute db "PRAGMA synchronous = 0;")
+;;; (if (not exists) ;; need to init the db
+;;; (if write-access
+;;; (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements
+;;; ;; (sqlite3:with-transaction
+;;; ;; db
+;;; ;; (lambda ()
+;;; (if isql
+;;; (for-each
+;;; (lambda (sql)
+;;; (sqlite3:execute db sql))
+;;; isql)))
+;;; (print "ERROR: no write access to " (area-dbdir acfg))))
+;;; (make-dbdat dbh: db fname: fname write-access: write-access)))
+;;;
+;;; ;; This is a low-level command to retrieve or to prepare, save and return a prepared statment
+;;; ;; you must extract the db handle
+;;; ;;
+;;; (define (get-sth db cache stmt)
+;;; (if (hash-table-exists? cache stmt)
+;;; (begin
+;;; ;; (print "Reusing cached stmt for " stmt)
+;;; (hash-table-ref/default cache stmt #f))
+;;; (let ((sth (sqlite3:prepare db stmt)))
+;;; (hash-table-set! cache stmt sth)
+;;; ;; (print "prepared stmt for " stmt)
+;;; sth)))
+;;;
+;;; ;; a little more expensive but does all the tedious deferencing - only use if you don't already
+;;; ;; have dbdat and db sitting around
+;;; ;;
+;;; (define (full-get-sth acfg fname stmt)
+;;; (let* ((dbdat (get-dbh acfg fname))
+;;; (db (dbdat-dbh dbdat))
+;;; (sths (dbdat-sths dbdat)))
+;;; (get-sth db sths stmt)))
+;;;
+;;; ;; write to a db
+;;; ;; acfg: area data
+;;; ;; rdat: request data
+;;; ;; hdat: (host . port)
+;;; ;;
+;;; ;; (define (dbwrite acfg rdat hdat data-in)
+;;; ;; (let* ((dbname (car data-in))
+;;; ;; (dbdat (get-dbh acfg dbname))
+;;; ;; (db (dbdat-dbh dbdat))
+;;; ;; (sths (dbdat-sths dbdat))
+;;; ;; (stmt (calldat-obj rdat))
+;;; ;; (sth (get-sth db sths stmt))
+;;; ;; (data (cdr data-in)))
+;;; ;; (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data)
+;;; ;; (print "dbdat: " (dbdat->alist dbdat))
+;;; ;; (apply sqlite3:execute sth data)
+;;; ;; (save-dbh acfg dbname dbdat)
+;;; ;; #t
+;;; ;; ))
+;;;
+;;; (define (finalize-all-db-handles acfg)
+;;; (let* ((dbhandles (area-dbhandles acfg)) ;; dbhandles is hash of fname ==> dbdat
+;;; (num 0))
+;;; (for-each
+;;; (lambda (area-name)
+;;; (print "Closing handles for " area-name)
+;;; (let ((dbdats (hash-table-ref/default dbhandles area-name '())))
+;;; (for-each
+;;; (lambda (dbdat)
+;;; ;; first close all statement handles
+;;; (for-each
+;;; (lambda (sth)
+;;; (sqlite3:finalize! sth)
+;;; (set! num (+ num 1)))
+;;; (hash-table-values (dbdat-sths dbdat)))
+;;; ;; now close the dbh
+;;; (set! num (+ num 1))
+;;; (sqlite3:finalize! (dbdat-dbh dbdat)))
+;;; dbdats)))
+;;; (hash-table-keys dbhandles))
+;;; (print "FINALIZED " num " dbhandles")))
+;;;
+;;; ;;======================================================================
+;;; ;; W O R K Q U E U E H A N D L I N G
+;;; ;;======================================================================
+;;;
+;;; (define (register-db-as-mine acfg dbname)
+;;; (let ((ht (area-dbs acfg)))
+;;; (if (not (hash-table-ref/default ht dbname #f))
+;;; (hash-table-set! ht dbname (random 10000)))))
+;;;
+;;; (define (work-queue-add acfg fname witem)
+;;; (let* ((work-queue-start (current-milliseconds))
+;;; (action (witem-action witem)) ;; NB the action is the index into the rdat actions
+;;; (qdat (or (hash-table-ref/default (area-wqueues acfg) fname #f)
+;;; (let ((newqdat (make-qdat)))
+;;; (hash-table-set! (area-wqueues acfg) fname newqdat)
+;;; newqdat)))
+;;; (rdat (hash-table-ref/default (area-rtable acfg) action #f)))
+;;; (if rdat
+;;; (queue-add!
+;;; (case (calldat-ctype rdat)
+;;; ((dbwrite) (register-db-as-mine acfg fname)(qdat-writeq qdat))
+;;; ((dbread) (register-db-as-mine acfg fname)(qdat-readq qdat))
+;;; ((dbrw) (register-db-as-mine acfg fname)(qdat-rwq qdat))
+;;; ((oslong) (qdat-oslong qdat))
+;;; ((osshort) (qdat-osshort qdat))
+;;; ((full-ping) (qdat-misc qdat))
+;;; (else
+;;; (print "ERROR: no queue for " action ". Adding to dbwrite queue.")
+;;; (qdat-writeq qdat)))
+;;; witem)
+;;; (case action
+;;; ((full-ping)(qdat-misc qdat))
+;;; (else
+;;; (print "ERROR: No action " action " was registered"))))
+;;; (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f)
+;;; #t)) ;; for now, simply return #t to indicate request got to the queue
+;;;
+;;; (define (doqueue acfg q fname dbdat dbh)
+;;; ;; (print "doqueue: " fname)
+;;; (let* ((start-time (current-milliseconds))
+;;; (qlen (queue-length q)))
+;;; (if (> qlen 1)
+;;; (print "Processing queue of length " qlen))
+;;; (let loop ((count 0)
+;;; (responses '()))
+;;; (let ((delta (- (current-milliseconds) start-time)))
+;;; (if (or (queue-empty? q)
+;;; (> delta 400)) ;; stop working on this queue after 400ms have passed
+;;; (list count delta responses) ;; return count, delta and responses list
+;;; (let* ((witem (queue-remove! q))
+;;; (action (witem-action witem))
+;;; (rdat (witem-rdat witem))
+;;; (stmt (calldat-obj rdat))
+;;; (sth (full-get-sth acfg fname stmt))
+;;; (ctype (calldat-ctype rdat))
+;;; (data (witem-data witem))
+;;; (cookie (witem-cookie witem)))
+;;; ;; do the processing and save the result in witem-result
+;;; (witem-result-set!
+;;; witem
+;;; (case ctype ;; action
+;;; ((noblockwrite) ;; blind write, no ack of success returned
+;;; (apply sqlite3:execute sth data)
+;;; (sqlite3:last-insert-rowid dbh))
+;;; ((dbwrite) ;; blocking write
+;;; (apply sqlite3:execute sth data)
+;;; #t)
+;;; ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query
+;;; (apply sqlite3:map-row (lambda x x) sth data))
+;;; ((full-ping) 'full-ping)
+;;; (else (print "Not ready for action " action) #f)))
+;;; (loop (add1 count)
+;;; (if cookie
+;;; (cons witem responses)
+;;; responses))))))))
+;;;
+;;; ;; do up to 400ms of processing on each queue
+;;; ;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded
+;;; ;;
+;;; (define (process-db-queries acfg fname)
+;;; (if (hash-table-exists? (area-wqueues acfg) fname)
+;;; (let* ((process-db-queries-start-time (current-milliseconds))
+;;; (qdat (hash-table-ref/default (area-wqueues acfg) fname #f))
+;;; (queue-sym->queue (lambda (queue-sym)
+;;; (case queue-sym ;; lookup the queue from qdat given a name (symbol)
+;;; ((wqueue) (qdat-writeq qdat))
+;;; ((rqueue) (qdat-readq qdat))
+;;; ((rwqueue) (qdat-rwq qdat))
+;;; ((misc) (qdat-misc qdat))
+;;; (else #f))))
+;;; (dbdat (get-dbh acfg fname))
+;;; (dbh (if (dbdat? dbdat)(dbdat-dbh dbdat) #f))
+;;; (nowtime (current-seconds)))
+;;; ;; handle the queues that require a transaction
+;;; ;;
+;;; (map ;;
+;;; (lambda (queue-sym)
+;;; ;; (print "processing queue " queue-sym)
+;;; (let* ((queue (queue-sym->queue queue-sym)))
+;;; (if (not (queue-empty? queue))
+;;; (let ((responses
+;;; (sqlite3:with-transaction ;; todo - catch exceptions...
+;;; dbh
+;;; (lambda ()
+;;; (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work!
+;;; ;; (print "res=" res)
+;;; (match res
+;;; ((count delta responses)
+;;; (update-stats acfg fname queue-sym delta count)
+;;; (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f)
+;;; responses) ;; return responses
+;;; (else
+;;; (print "ERROR: bad return data from doqueue " res)))
+;;; )))))
+;;; ;; having completed the transaction, send the responses.
+;;; ;; (print "INFO: sending " (length responses) " responses.")
+;;; (let loop ((responses-left responses))
+;;; (cond
+;;; ((null? responses-left) #t)
+;;; (else
+;;; (let* ((witem (car responses-left))
+;;; (response (cdr responses-left)))
+;;; (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem)
+;;; (witem-cookie witem)(witem-result witem)))
+;;; (loop (cdr responses-left))))))
+;;; )))
+;;; '(wqueue rwqueue rqueue))
+;;;
+;;; ;; handle misc queue
+;;; ;;
+;;; ;; (print "processing misc queue")
+;;; (let ((queue (queue-sym->queue 'misc)))
+;;; (doqueue acfg queue fname dbdat dbh))
+;;; ;; ....
+;;; (save-dbh acfg fname dbdat)
+;;; #t ;; just to let the tests know we got here
+;;; )
+;;; #f ;; nothing processed
+;;; ))
+;;;
+;;; ;; run all queues in parallel per db but sequentially per queue for that db.
+;;; ;; - process the queues every 500 or so ms
+;;; ;; - allow for long running queries to continue but all other activities for that
+;;; ;; db will be blocked.
+;;; ;;
+;;; (define (work-queue-processor acfg)
+;;; (let* ((threads (make-hash-table))) ;; fname => thread
+;;; (let loop ((fnames (hash-table-keys (area-wqueues acfg)))
+;;; (target-time (+ (current-milliseconds) 50)))
+;;; ;;(if (not (null? fnames))(print "Processing for these databases: " fnames))
+;;; (for-each
+;;; (lambda (fname)
+;;; ;; (print "processing for " fname)
+;;; ;;(process-db-queries acfg fname))
+;;; (let ((th (hash-table-ref/default threads fname #f)))
+;;; (if (and th (not (member (thread-state th) '(dead terminated))))
+;;; (begin
+;;; (print "WARNING: worker thread for " fname " is taking a long time.")
+;;; (print "Thread is in state " (thread-state th)))
+;;; (let ((th1 (make-thread (lambda ()
+;;; ;; (catch-and-dump
+;;; ;; (lambda ()
+;;; ;; (print "Process queries for " fname)
+;;; (let ((start-time (current-milliseconds)))
+;;; (process-db-queries acfg fname)
+;;; ;; (thread-sleep! 0.01) ;; need the thread to take at least some time
+;;; (hash-table-delete! threads fname)) ;; no mutexes?
+;;; fname)
+;;; "th1"))) ;; ))
+;;; (hash-table-set! threads fname th1)
+;;; (thread-start! th1)))))
+;;; fnames)
+;;; ;; (thread-sleep! 0.1) ;; give the threads some time to process requests
+;;; ;; burn time until 400ms is up
+;;; (let ((now-time (current-milliseconds)))
+;;; (if (< now-time target-time)
+;;; (let ((delta (- target-time now-time)))
+;;; (thread-sleep! (/ delta 1000)))))
+;;; (loop (hash-table-keys (area-wqueues acfg))
+;;; (+ (current-milliseconds) 50)))))
+;;;
+;;; ;;======================================================================
+;;; ;; S T A T S G A T H E R I N G
+;;; ;;======================================================================
+;;;
+;;; (defstruct stat
+;;; (qcount-avg 0) ;; coarse running average
+;;; (qtime-avg 0) ;; coarse running average
+;;; (qcount 0) ;; total
+;;; (qtime 0) ;; total
+;;; (last-qcount 0) ;; last
+;;; (last-qtime 0) ;; last
+;;; (dbs '()) ;; list of db files handled by this node
+;;; (when 0)) ;; when the last query happened - seconds
+;;;
+;;;
+;;; (define (update-stats acfg fname bucket duration numqueries)
+;;; (let* ((key fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough
+;;; (stats (or (hash-table-ref/default (area-stats acfg) key #f)
+;;; (let ((newstats (make-stat)))
+;;; (hash-table-set! (area-stats acfg) key newstats)
+;;; newstats))))
+;;; ;; when the last query happended (used to remove the fname from the active list)
+;;; (stat-when-set! stats (current-seconds))
+;;; ;; last values
+;;; (stat-last-qcount-set! stats numqueries)
+;;; (stat-last-qtime-set! stats duration)
+;;; ;; total over process lifetime
+;;; (stat-qcount-set! stats (+ (stat-qcount stats) numqueries))
+;;; (stat-qtime-set! stats (+ (stat-qtime stats) duration))
+;;; ;; coarse average
+;;; (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2))
+;;; (stat-qtime-avg-set! stats (/ (+ (stat-qtime-avg stats) duration) 2))
+;;;
+;;; ;; here is where we add the stats for a given dbfile
+;;; (if (not (member fname (stat-dbs stats)))
+;;; (stat-dbs-set! stats (cons fname (stat-dbs stats))))
+;;;
+;;; ))
+;;;
+;;; ;;======================================================================
+;;; ;; S E R V E R S T U F F
+;;; ;;======================================================================
+;;;
+;;; ;; this does NOT return!
+;;; ;;
+;;; (define (find-free-port-and-open acfg)
+;;; (let ((port (or (area-port acfg) 3200)))
+;;; (handle-exceptions
+;;; exn
+;;; (begin
+;;; (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port")
+;;; (area-port-set! acfg (+ port 1))
+;;; (find-free-port-and-open acfg))
+;;; (rpc:default-server-port port)
+;;; (area-port-set! acfg port)
+;;; (tcp-read-timeout 120000)
+;;; ;; ((rpc:make-server (tcp-listen port)) #t)
+;;; (tcp-listen (rpc:default-server-port)
+;;; ))))
+;;;
+;;; ;; register this node by putting a packet into the pkts dir.
+;;; ;; look for other servers
+;;; ;; contact other servers and compile list of servers
+;;; ;; there are two types of server
+;;; ;; main servers - dashboards, runners and dedicated servers - need pkt
+;;; ;; passive servers - test executers, step calls, list-runs - no pkt
+;;; ;;
+;;; (define (register-node acfg hostip port-num)
+;;; ;;(mutex-lock! (area-mutex acfg))
+;;; (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created)
+;;; (best-ip (or hostip (get-my-best-address)))
+;;; (mtdir (area-dbdir acfg))
+;;; (pktdir (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts")))
+;;; (print "Registering node " best-ip ":" port-num)
+;;; (if (not mtdir) ;; require a home for this node to put or find databases
+;;; #f
+;;; (begin
+;;; (if (not (directory? pktdir))(create-directory pktdir))
+;;; ;; server is started, now create pkt if needed
+;;; (print "Starting server in " server-type " mode with port " port-num)
+;;; (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt
+;;; (begin
+;;; (area-pktid-set! acfg
+;;; (write-alist->pkt
+;;; pktdir
+;;; `((hostname . ,(get-host-name))
+;;; (ipaddr . ,best-ip)
+;;; (port . ,port-num)
+;;; (pid . ,(current-process-id)))
+;;; pktspec: *pktspec*
+;;; ptype: 'server))
+;;; (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt"))))
+;;; (area-port-set! acfg port-num)
+;;; #;(mutex-unlock! (area-mutex acfg))))))
+;;;
+;;; (define *cookie-seqnum* 0)
+;;; (define (make-cookie key)
+;;; (set! *cookie-seqnum* (add1 *cookie-seqnum*))
+;;; ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*)
+;;; (conc key "-" *cookie-seqnum*)
+;;; )
+;;;
+;;; ;; dispatch locally if possible
+;;; ;;
+;;; (define (call-deliver-response acfg ipaddr port cookie data)
+;;; (if (and (equal? (area-myaddr acfg) ipaddr)
+;;; (equal? (area-port acfg) port))
+;;; (deliver-response acfg cookie data)
+;;; ((rpc:procedure 'response ipaddr port) cookie data)))
+;;;
+;;; (define (deliver-response acfg cookie data)
+;;; (let ((deliver-response-start (current-milliseconds)))
+;;; (thread-start! (make-thread
+;;; (lambda ()
+;;; (let loop ((tries-left 5))
+;;; ;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left)
+;;; ;;(pp (hash-table->alist (area-cookie2mbox acfg)))
+;;; (let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f)))
+;;; (cond
+;;; ((eq? 0 tries-left)
+;;; (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie)
+;;; )
+;;; (mbox
+;;; ;;(print "got mbox="mbox" got data="data" send.")
+;;; (mailbox-send! mbox data))
+;;; (else
+;;; ;;(print "no mbox yet. look for "cookie)
+;;; (thread-sleep! (/ (- 6 tries-left) 10))
+;;; (loop (sub1 tries-left))))))
+;;; ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data))
+;;; (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie)
+;;; )
+;;; (conc "deliver-response thread for cookie="cookie))))
+;;; #t)
+;;;
+;;; ;; action:
+;;; ;; immediate - quick actions, no need to put in queues
+;;; ;; dbwrite - put in dbwrite queue
+;;; ;; dbread - put in dbread queue
+;;; ;; oslong - os actions, e.g. du, that could take a long time
+;;; ;; osshort - os actions that should be quick, e.g. df
+;;; ;;
+;;; (define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler
+;;; ;; NOTE: Use rpc:current-peer for getting return address
+;;; (let* ((std-peer-handler-start (current-milliseconds))
+;;; ;; (raw-data (alist-ref 'data dat))
+;;; (rdat (hash-table-ref/default
+;;; (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action
+;;; (witem (make-witem ripaddr: from-ipaddr ;; rhost: from-host
+;;; rport: from-port action: action
+;;; rdat: rdat cookie: cookie
+;;; servkey: servkey data: params ;; TODO - rename data to params
+;;; caller: (rpc:current-peer))))
+;;; (if (not (equal? servkey (area-pktid acfg)))
+;;; `(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this
+;;; (let* ((ctype (if rdat
+;;; (calldat-ctype rdat) ;; is this necessary? these should be identical
+;;; action)))
+;;; (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f)
+;;; (case ctype
+;;; ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data)))
+;;; ((full-ping) `(#t "ack to full ping" ,(work-queue-add acfg fname witem) ,cookie))
+;;; ((response) `(#t "ack from requestor" ,(deliver-response acfg fname params)))
+;;; ((dbwrite) `(#t "db write submitted" ,(work-queue-add acfg fname witem) ,cookie))
+;;; ((dbread) `(#t "db read submitted" ,(work-queue-add acfg fname witem) ,cookie ))
+;;; ((dbrw) `(#t "db read/write submitted" ,cookie))
+;;; ((osshort) `(#t "os short submitted" ,cookie))
+;;; ((oslong) `(#t "os long submitted" ,cookie))
+;;; (else `(#f "unrecognised action" ,ctype)))))))
+;;;
+;;; ;; Call this to start the actual server
+;;; ;;
+;;; ;; start_server
+;;; ;;
+;;; ;; mode: '
+;;; ;; handler: proc which takes pktrecieved as argument
+;;; ;;
+;;;
+;;; (define (start-server acfg)
+;;; (let* ((conn (find-free-port-and-open acfg))
+;;; (port (area-port acfg)))
+;;; (rpc:publish-procedure!
+;;; 'delist-db
+;;; (lambda (fname)
+;;; (hash-table-delete! (area-dbs acfg) fname)))
+;;; (rpc:publish-procedure!
+;;; 'calling-addr
+;;; (lambda ()
+;;; (rpc:current-peer)))
+;;; (rpc:publish-procedure!
+;;; 'ping
+;;; (lambda ()(real-ping acfg)))
+;;; (rpc:publish-procedure!
+;;; 'request
+;;; (lambda (from-addr from-port servkey action cookie dbname params)
+;;; (request acfg from-addr from-port servkey action cookie dbname params)))
+;;; (rpc:publish-procedure!
+;;; 'response
+;;; (lambda (cookie res-dat)
+;;; (deliver-response acfg cookie res-dat)))
+;;; (area-ready-set! acfg #t)
+;;; (area-conn-set! acfg conn)
+;;; ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t)
+;;;
+;;;
+;;; (define (launch acfg) ;; #!optional (proc std-peer-handler))
+;;; (print "starting launch")
+;;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
+;;; #;(let ((original-handler (current-exception-handler))) ;; is th
+;;; (lambda (exception)
+;;; (server-exit-procedure)
+;;; (original-handler exception)))
+;;; (on-exit (lambda ()
+;;; (shutdown acfg))) ;; (finalize-all-db-handles acfg)))
+;;; ;; set up the rpc handler
+;;; (let* ((th1 (make-thread
+;;; (lambda ()(start-server acfg))
+;;; "server thread"))
+;;; (th2 (make-thread
+;;; (lambda ()
+;;; (print "th2 starting")
+;;; (let loop ()
+;;; (work-queue-processor acfg)
+;;; (print "work-queue-processor crashed!")
+;;; (loop)))
+;;; "work queue thread")))
+;;; (thread-start! th1)
+;;; (thread-start! th2)
+;;; (let loop ()
+;;; (thread-sleep! 0.025)
+;;; (if (area-ready acfg)
+;;; #t
+;;; (loop)))
+;;; ;; attempt to fix my address
+;;; (let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)?
+;;; (let loop ((rem-addrs all-addr))
+;;; (if (null? rem-addrs)
+;;; (begin
+;;; (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.")
+;;; (exit 1)) ;; BUG Changeme to raising an exception
+;;;
+;;; (let* ((addr (car rem-addrs))
+;;; (good-addr (handle-exceptions
+;;; exn
+;;; #f
+;;; ((rpc:procedure 'calling-addr addr (area-port acfg))))))
+;;; (if good-addr
+;;; (begin
+;;; (print "Got good-addr of " good-addr)
+;;; (area-myaddr-set! acfg good-addr))
+;;; (loop (cdr rem-addrs)))))))
+;;; (register-node acfg (area-myaddr acfg)(area-port acfg))
+;;; (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg))
+;;; ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
+;;; ))
+;;;
+;;; (define (clear-server-pkt acfg)
+;;; (let ((pktf (area-pktfile acfg)))
+;;; (if pktf (delete-file* pktf))))
+;;;
+;;; (define (shutdown acfg)
+;;; (let (;;(conn (area-conn acfg))
+;;; (pktf (area-pktfile acfg))
+;;; (port (area-port acfg)))
+;;; (if pktf (delete-file* pktf))
+;;; (send-all "imshuttingdown")
+;;; ;; (rpc:close-all-connections!) ;; don't know if this is actually needed
+;;; (finalize-all-db-handles acfg)))
+;;;
+;;; (define (send-all msg)
+;;; #f)
+;;;
+;;; ;; given a area record look up all the packets
+;;; ;;
+;;; (define (get-all-server-pkts acfg)
+;;; (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt"))))
+;;; (map (lambda (pkt-file)
+;;; (read-pkt->alist pkt-file pktspec: *pktspec*))
+;;; all-pkt-files)))
+;;;
+;;; #;((Z . "9a0212302295a19610d5796fce0370fa130758e9")
+;;; (port . "34827")
+;;; (pid . "28748")
+;;; (hostname . "zeus")
+;;; (T . "server")
+;;; (D . "1549427032.0"))
+;;;
+;;; #;(define (get-my-best-address)
+;;; (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))))
+;;; (cond
+;;; ((null? all-my-addresses)
+;;; (get-host-name)) ;; no interfaces?
+;;; ((eq? (length all-my-addresses) 1)
+;;; (ip->string (car all-my-addresses))) ;; only one to choose from, just go with it
+;;; (else
+;;; (ip->string (car (filter (lambda (x) ;; take any but 127.
+;;; (not (eq? (u8vector-ref x 0) 127)))
+;;; all-my-addresses)))))))
+;;;
+;;; ;; whoami? I am my pkt
+;;; ;;
+;;; (define (whoami? acfg)
+;;; (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f))
+;;;
+;;; ;;======================================================================
+;;; ;; "Client side" operations
+;;; ;;======================================================================
+;;;
+;;; (define (safe-call call-key host port . params)
+;;; (handle-exceptions
+;;; exn
+;;; (begin
+;;; (print "Call " call-key " to " host ":" port " failed")
+;;; #f)
+;;; (apply (rpc:procedure call-key host port) params)))
+;;;
+;;; ;; ;; convert to/from string / sexpr
+;;; ;;
+;;; ;; (define (string->sexpr str)
+;;; ;; (if (string? str)
+;;; ;; (with-input-from-string str read)
+;;; ;; str))
+;;; ;;
+;;; ;; (define (sexpr->string s)
+;;; ;; (with-output-to-string (lambda ()(write s))))
+;;;
+;;; ;; is the server alive?
+;;; ;;
+;;; (define (ping acfg host port)
+;;; (let* ((myaddr (area-myaddr acfg))
+;;; (myport (area-port acfg))
+;;; (start-time (current-milliseconds))
+;;; (res (if (and (equal? myaddr host)
+;;; (equal? myport port))
+;;; (real-ping acfg)
+;;; ((rpc:procedure 'ping host port)))))
+;;; (cons (- (current-milliseconds) start-time)
+;;; res)))
+;;;
+;;; ;; returns ( ipaddr port alist-fname=>randnum )
+;;; (define (real-ping acfg)
+;;; `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg)))
+;;;
+;;; ;; is the server alive AND the queues processing?
+;;; ;;
+;;; #;(define (full-ping acfg servpkt)
+;;; (let* ((start-time (current-milliseconds))
+;;; (res (send-message acfg servpkt '(full-ping) 'full-ping)))
+;;; (cons (- (current-milliseconds) start-time)
+;;; res))) ;; (equal? res "got ping"))))
+;;;
+;;;
+;;; ;; look up all pkts and get the server id (the hash), port, host/ip
+;;; ;; store this info in acfg
+;;; ;; return the number of responsive servers found
+;;; ;;
+;;; ;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself
+;;; ;;
+;;; (define (update-known-servers acfg)
+;;; ;; readll all pkts
+;;; ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt
+;;; (let* ((start-time (current-milliseconds))
+;;; (all-pkts (delete-duplicates
+;;; (append (get-all-server-pkts acfg)
+;;; (hash-table-values (area-hosts acfg)))))
+;;; (hostshash (area-hosts acfg))
+;;; (my-id (area-pktid acfg))
+;;; (pktsdir (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers
+;;; (numsrvs 0)
+;;; (delpkt (lambda (pktsdir sid)
+;;; (print "clearing out server " sid)
+;;; (delete-file* (conc pktsdir "/" sid ".pkt"))
+;;; (hash-table-delete! hostshash sid))))
+;;; (area-last-srvup-set! acfg (current-seconds))
+;;; (for-each
+;;; (lambda (servpkt)
+;;; (if (list? servpkt)
+;;; ;; (pp servpkt)
+;;; (let* ((shost (alist-ref 'ipaddr servpkt))
+;;; (sport (any->number (alist-ref 'port servpkt)))
+;;; (res (handle-exceptions
+;;; exn
+;;; (begin
+;;; ;; (print "INFO: bad server on " shost ":" sport)
+;;; #f)
+;;; (ping acfg shost sport)))
+;;; (sid (alist-ref 'Z servpkt)) ;; Z code is our name for the server
+;;; (url (conc shost ":" sport))
+;;; )
+;;; #;(if (or (not res)
+;;; (null? res))
+;;; (begin
+;;; (print "STRANGE: ping of " url " gave " res)))
+;;;
+;;; ;; (print "Got " res " from " shost ":" sport)
+;;; (match res
+;;; ((qduration . payload)
+;;; ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt)
+;;; ;; (if payload
+;;; ;; "Success" "Fail"))
+;;; (match payload
+;;; ((host port stats)
+;;; ;; (print "From " host ":" port " got stats: " stats)
+;;; (if (and host port stats)
+;;; (let ((url (conc host ":" port)))
+;;; (hash-table-set! hostshash sid servpkt)
+;;; ;; store based on host:port
+;;; (hash-table-set! (area-hoststats acfg) sid stats))
+;;; (print "missing data from the server, not sure what that means!"))
+;;; (set! numsrvs (+ numsrvs 1)))
+;;; (#f
+;;; (print "Removing pkt " sid " due to #f from server or failed ping")
+;;; (delpkt pktsdir sid))
+;;; (else
+;;; (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)")))
+;;; (else
+;;; ;; here we delete the pkt - can't reach the server, remove it
+;;; ;; however this logic is inadequate. we should mark the server as checked
+;;; ;; and not good, if it happens a second time - then remove the pkt
+;;; ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead
+;;; ;; could be it is simply too busy to reply
+;;; (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0)))
+;;; (if (> bad-pings 1) ;; two bad pings - remove pkt
+;;; (begin
+;;; (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid)
+;;; (delpkt pktsdir sid))
+;;; (begin
+;;; (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet")
+;;; (hash-table-set! (area-health acfg)
+;;; url
+;;; (+ (hash-table-ref/default (area-health acfg) url 0) 1))
+;;; ))
+;;; ))))
+;;; ;; servpkt is not actually a pkt?
+;;; (begin
+;;; (print "Bad pkt " servpkt))))
+;;; all-pkts)
+;;; (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs
+;;; " servers, pkts: " (map (lambda (p)
+;;; (alist-ref 'Z p))
+;;; all-pkts))
+;;; numsrvs))
+;;;
+;;; (defstruct srvstat
+;;; (numfiles 0) ;; number of db files handled by this server - subtract 1 for the db being currently looked at
+;;; (randnum #f) ;; tie breaker number assigned to by the server itself - applies only to the db under consideration
+;;; (pkt #f)) ;; the server pkt
+;;;
+;;; ;;(define (srv->srvstat srvpkt)
+;;;
+;;; ;; Get the server best for given dbname and key
+;;; ;;
+;;; ;; NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries.
+;;; ;;
+;;; (define (get-best-server acfg dbname key)
+;;; (let* (;; (servers (hash-table-values (area-hosts acfg)))
+;;; (servers (area-hosts acfg))
+;;; (skeys (sort (hash-table-keys servers) string>=?)) ;; a stable listing
+;;; (start-time (current-milliseconds))
+;;; (srvstats (make-hash-table)) ;; srvid => srvstat
+;;; (url (conc (area-myaddr acfg) ":" (area-port acfg))))
+;;; ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys))
+;;; (if (null? skeys)
+;;; (if (> (update-known-servers acfg) 0)
+;;; (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter
+;;; (begin
+;;; (print "ERROR: no server found!") ;; since this process is also a server this should never happen
+;;; #f))
+;;; (begin
+;;; ;; (print "in get-best-server with skeys=" skeys)
+;;; (if (> (- (current-seconds) (area-last-srvup acfg)) 10)
+;;; (begin
+;;; (update-known-servers acfg)
+;;; (sdbg> "get-best-server" "update-known-servers" start-time #f #f)))
+;;;
+;;; ;; for each server look at the list of dbfiles, total number of dbs being handled
+;;; ;; and the rand number, save the best host
+;;; ;; also do a delist-db for each server dbfile not used
+;;; (let* ((best-server #f)
+;;; (servers-to-delist (make-hash-table)))
+;;; (for-each
+;;; (lambda (srvid)
+;;; (let* ((server (hash-table-ref/default servers srvid #f))
+;;; (stats (hash-table-ref/default (area-hoststats acfg) srvid '(()))))
+;;; ;; (print "stats: " stats)
+;;; (if server
+;;; (let* ((dbweights (car stats))
+;;; (srvload (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights)))
+;;; (dbrec (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore
+;;; (randnum (if dbrec
+;;; dbrec ;; (cdr dbrec)
+;;; 0)))
+;;; (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server))))))
+;;; skeys)
+;;;
+;;; (let* ((sorted (sort (hash-table-values srvstats)
+;;; (lambda (a b)
+;;; (let ((numfiles-a (srvstat-numfiles a))
+;;; (numfiles-b (srvstat-numfiles b))
+;;; (randnum-a (srvstat-randnum a))
+;;; (randnum-b (srvstat-randnum b)))
+;;; (if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less
+;;; #t
+;;; (if (and (equal? numfiles-a numfiles-b)
+;;; (< randnum-a randnum-b))
+;;; #t
+;;; #f))))))
+;;; (best (if (null? sorted)
+;;; (begin
+;;; (print "ERROR: should never be null due to self as server.")
+;;; #f)
+;;; (srvstat-pkt (car sorted)))))
+;;; #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv)
+;;; (let ((p (srvstat-pkt srv)))
+;;; (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p)
+;;; "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")")))
+;;; sorted))
+;;; best))))))
+;;;
+;;; ;; send out an "I'm about to exit notice to all known servers"
+;;; ;;
+;;; (define (death-imminent acfg)
+;;; '())
+;;;
+;;; ;;======================================================================
+;;; ;; U L E X - T H E I N T E R E S T I N G S T U F F ! !
+;;; ;;======================================================================
+;;;
+;;; ;; register a handler
+;;; ;; NOTES:
+;;; ;; dbinitsql is reserved for a list of sql statements for initializing the db
+;;; ;; dbinitfn is reserved for a db init function, if exists called after dbinitsql
+;;; ;;
+;;; (define (register acfg key obj #!optional (ctype 'dbwrite))
+;;; (let ((ht (area-rtable acfg)))
+;;; (if (hash-table-exists? ht key)
+;;; (print "WARNING: redefinition of entry " key))
+;;; (hash-table-set! ht key (make-calldat obj: obj ctype: ctype))))
+;;;
+;;; ;; usage: register-batch acfg '((key1 . sql1) (key2 . sql2) ... )
+;;; ;; NB// obj is often an sql query
+;;; ;;
+;;; (define (register-batch acfg ctype data)
+;;; (let ((ht (area-rtable acfg)))
+;;; (map (lambda (dat)
+;;; (hash-table-set! ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype)))
+;;; data)))
+;;;
+;;; (define (initialize-area-calls-from-specfile area specfile)
+;;; (let* ((callspec (with-input-from-file specfile read )))
+;;; (for-each (lambda (group)
+;;; (register-batch
+;;; area
+;;; (car group)
+;;; (cdr group)))
+;;; callspec)))
+;;;
+;;; ;; get-rentry
+;;; ;;
+;;; (define (get-rentry acfg key)
+;;; (hash-table-ref/default (area-rtable acfg) key #f))
+;;;
+;;; (define (get-rsql acfg key)
+;;; (let ((cdat (get-rentry acfg key)))
+;;; (if cdat
+;;; (calldat-obj cdat)
+;;; #f)))
+;;;
+;;;
+;;;
+;;; ;; blocking call:
+;;; ;; client server
+;;; ;; ------ ------
+;;; ;; call()
+;;; ;; send-message()
+;;; ;; nmsg-send()
+;;; ;; nmsg-receive()
+;;; ;; nmsg-respond(ack,cookie)
+;;; ;; ack, cookie
+;;; ;; mbox-thread-wait(cookie)
+;;; ;; nmsg-send(client,cookie,result)
+;;; ;; nmsg-respond(ack)
+;;; ;; return result
+;;; ;;
+;;; ;; reserved action:
+;;; ;; 'immediate
+;;; ;; 'dbinitsql
+;;; ;;
+;;; (define (call acfg dbname action params #!optional (count 0))
+;;; (let* ((call-start-time (current-milliseconds))
+;;; (srv (get-best-server acfg dbname action))
+;;; (post-get-start-time (current-milliseconds))
+;;; (rdat (hash-table-ref/default (area-rtable acfg) action #f))
+;;; (myid (trim-pktid (area-pktid acfg)))
+;;; (srvid (trim-pktid (alist-ref 'Z srv)))
+;;; (cookie (make-cookie myid)))
+;;; (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat)
+;;; (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname)
+;;; (if (and srv rdat) ;; need both to dispatch a request
+;;; (let* ((ripaddr (alist-ref 'ipaddr srv))
+;;; (rsrvid (alist-ref 'Z srv))
+;;; (rport (any->number (alist-ref 'port srv)))
+;;; (res-full (if (and (equal? ripaddr (area-myaddr acfg))
+;;; (equal? rport (area-port acfg)))
+;;; (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params)
+;;; (safe-call 'request ripaddr rport
+;;; (area-myaddr acfg)
+;;; (area-port acfg)
+;;; #;(area-pktid acfg)
+;;; rsrvid
+;;; action cookie dbname params))))
+;;; ;; (print "res-full: " res-full)
+;;; (match res-full
+;;; ((response-ok response-msg rem ...)
+;;; (let* ((send-message-time (current-milliseconds))
+;;; ;; (match res-full
+;;; ;; ((response-ok response-msg)
+;;; ;; (response-ok (car res-full))
+;;; ;; (response-msg (cadr res-full)
+;;; )
+;;; ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG
+;;; ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params)
+;;; (sdbg> "call" "send-message" post-get-start-time #f call-start-time)
+;;; (cond
+;;; ((not response-ok) #f)
+;;; ((member response-msg '("db read submitted" "db write submitted"))
+;;; (let* ((cookie-id (cadddr res-full))
+;;; (mbox (make-mailbox))
+;;; (mbox-time (current-milliseconds)))
+;;; (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox)
+;;; (let* ((mbox-timeout-secs 20)
+;;; (mbox-timeout-result 'MBOX_TIMEOUT)
+;;; (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
+;;; (mbox-receive-time (current-milliseconds)))
+;;; (hash-table-delete! (area-cookie2mbox acfg) cookie-id)
+;;; (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname)
+;;; ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params)
+;;; res)))
+;;; (else
+;;; (print "Unhandled response \""response-msg"\"")
+;;; #f))
+;;; ;; depending on what action (i.e. ctype) is we will block here waiting for
+;;; ;; all the data (mechanism to be determined)
+;;; ;;
+;;; ;; if res is a "working on it" then wait
+;;; ;; wait for result
+;;; ;; mailbox thread wait on
+;;;
+;;; ;; if res is a "can't help you" then try a different server
+;;; ;; if res is a "ack" (e.g. for one-shot requests) then return res
+;;; ))
+;;; (else
+;;; (if (< count 10)
+;;; (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv))))
+;;; (thread-sleep! 1)
+;;; (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.")
+;;; (call acfg dbname action params (+ count 1)))
+;;; (begin
+;;; (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full)))))))
+;;; (begin
+;;; (if (not rdat)
+;;; (print "ERROR: action " action " not registered.")
+;;; (if (< count 10)
+;;; (begin
+;;; (thread-sleep! 1)
+;;; (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts
+;;; (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds")
+;;; (call acfg dbname action params (+ count 1)))
+;;; (begin
+;;; (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up."))
+;;; #;(error "No server available"))))))))
+;;;
+;;;
+;;; ;;======================================================================
+;;; ;; U T I L I T I E S
+;;; ;;======================================================================
+;;;
+;;; ;; get a signature for identifing this process
+;;; ;;
+;;; (define (get-process-signature)
+;;; (cons (get-host-name)(current-process-id)))
+;;;
+;;; ;;======================================================================
+;;; ;; S Y S T E M S T U F F
+;;; ;;======================================================================
+;;;
+;;; ;; get normalized cpu load by reading from /proc/loadavg and
+;;; ;; /proc/cpuinfo return all three values and the number of real cpus
+;;; ;; and the number of threads returns alist '((adj-cpu-load
+;;; ;; . normalized-proc-load) ... etc. keys: adj-proc-load,
+;;; ;; adj-core-load, 1m-load, 5m-load, 15m-load
+;;; ;;
+;;; (define (get-normalized-cpu-load)
+;;; (let ((res (get-normalized-cpu-load-raw))
+;;; (default `((adj-proc-load . 2) ;; there is no right answer
+;;; (adj-core-load . 2)
+;;; (1m-load . 2)
+;;; (5m-load . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong
+;;; (15m-load . 0)
+;;; (proc . 1)
+;;; (core . 1)
+;;; (phys . 1)
+;;; (error . #t))))
+;;; (cond
+;;; ((and (list? res)
+;;; (> (length res) 2))
+;;; res)
+;;; ((eq? res #f) default) ;; add messages?
+;;; ((eq? res #f) default) ;; this would be the #eof
+;;; (else default))))
+;;;
+;;; (define (get-normalized-cpu-load-raw)
+;;; (let* ((actual-host (get-host-name))) ;; #f is localhost
+;;; (let ((data (append
+;;; (with-input-from-file "/proc/loadavg" read-lines)
+;;; (with-input-from-file "/proc/cpuinfo" read-lines)
+;;; (list "end")))
+;;; (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$"))
+;;; (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$"))
+;;; (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$"))
+;;; (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$"))
+;;; (max-num (lambda (p n)(max (string->number p) n))))
+;;; ;; (print "data=" data)
+;;; (if (null? data) ;; something went wrong
+;;; #f
+;;; (let loop ((hed (car data))
+;;; (tal (cdr data))
+;;; (loads #f)
+;;; (proc-num 0) ;; processor includes threads
+;;; (phys-num 0) ;; physical chip on motherboard
+;;; (core-num 0)) ;; core
+;;; ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num)
+;;; (if (null? tal) ;; have all our data, calculate normalized load and return result
+;;; (let* ((act-proc (+ proc-num 1))
+;;; (act-phys (+ phys-num 1))
+;;; (act-core (+ core-num 1))
+;;; (adj-proc-load (/ (car loads) act-proc))
+;;; (adj-core-load (/ (car loads) act-core))
+;;; (result
+;;; (append (list (cons 'adj-proc-load adj-proc-load)
+;;; (cons 'adj-core-load adj-core-load))
+;;; (list (cons '1m-load (car loads))
+;;; (cons '5m-load (cadr loads))
+;;; (cons '15m-load (caddr loads)))
+;;; (list (cons 'proc act-proc)
+;;; (cons 'core act-core)
+;;; (cons 'phys act-phys)))))
+;;; result)
+;;; (regex-case
+;;; hed
+;;; (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num))
+;;; (proc-rx ( x p ) (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num))
+;;; (phys-rx ( x p ) (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num))
+;;; (core-rx ( x c ) (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num)))
+;;; (else
+;;; (begin
+;;; ;; (print "NO MATCH: " hed)
+;;; (loop (car tal)(cdr tal) loads proc-num phys-num core-num))))))))))
+;;;
+;;; (define (get-host-stats acfg)
+;;; (let ((stats-hash (area-stats acfg)))
+;;; ;; use this opportunity to remove references to dbfiles which have not been accessed in a while
+;;; (for-each
+;;; (lambda (dbname)
+;;; (let* ((stats (hash-table-ref stats-hash dbname))
+;;; (last-access (stat-when stats)))
+;;; (if (and (> last-access 0) ;; if zero then there has been no access
+;;; (> (- (current-seconds) last-access) 10)) ;; not used in ten seconds
+;;; (begin
+;;; (print "Removing " dbname " from stats list")
+;;; (hash-table-delete! stats-hash dbname) ;; remove from stats hash
+;;; (stat-dbs-set! stats (hash-table-keys stats))))))
+;;; (hash-table-keys stats-hash))
+;;;
+;;; `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum
+;;; ,(map (lambda (dbname) ;; dbname is the db name
+;;; (cons dbname (stat-when (hash-table-ref stats-hash dbname))))
+;;; (hash-table-keys stats-hash))
+;;; (cpuload . ,(get-normalized-cpu-load)))))
+;;; #;(stats . ,(map (lambda (k) ;; create an alist from the stats data
+;;; (cons k (stat->alist (hash-table-ref (area-stats acfg) k))))
+;;; (hash-table-keys (area-stats acfg))))
+;;;
+;;; #;(trace
+;;; ;; assv
+;;; ;; cdr
+;;; ;; caar
+;;; ;; ;; cdr
+;;; ;; call
+;;; ;; finalize-all-db-handles
+;;; ;; get-all-server-pkts
+;;; ;; get-normalized-cpu-load
+;;; ;; get-normalized-cpu-load-raw
+;;; ;; launch
+;;; ;; nmsg-send
+;;; ;; process-db-queries
+;;; ;; receive-message
+;;; ;; std-peer-handler
+;;; ;; update-known-servers
+;;; ;; work-queue-processor
+;;; )
+;;;
+;;; ;;======================================================================
+;;; ;; netutil
+;;; ;; move this back to ulex-netutil.scm someday?
+;;; ;;======================================================================
+;;;
+;;; ;; #include
+;;; ;; #include
+;;; ;; #include
+;;; ;; #include
+;;;
+;;; (foreign-declare "#include \"sys/types.h\"")
+;;; (foreign-declare "#include \"sys/socket.h\"")
+;;; (foreign-declare "#include \"ifaddrs.h\"")
+;;; (foreign-declare "#include \"arpa/inet.h\"")
+;;;
+;;; ;; get IP addresses from ALL interfaces
+;;; (define get-all-ips
+;;; (foreign-safe-lambda* scheme-object ()
+;;; "
+;;;
+;;; // from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address :
+;;;
+;;;
+;;; C_word lst = C_SCHEME_END_OF_LIST, len, str, *a;
+;;; // struct ifaddrs *ifa, *i;
+;;; // struct sockaddr *sa;
+;;;
+;;; struct ifaddrs * ifAddrStruct = NULL;
+;;; struct ifaddrs * ifa = NULL;
+;;; void * tmpAddrPtr = NULL;
+;;;
+;;; if ( getifaddrs(&ifAddrStruct) != 0)
+;;; C_return(C_SCHEME_FALSE);
+;;;
+;;; // for (i = ifa; i != NULL; i = i->ifa_next) {
+;;; for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
+;;; if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is
+;;; // a valid IPv4 address
+;;; tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
+;;; char addressBuffer[INET_ADDRSTRLEN];
+;;; inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
+;;; // printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
+;;; len = strlen(addressBuffer);
+;;; a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
+;;; str = C_string(&a, len, addressBuffer);
+;;; lst = C_a_pair(&a, str, lst);
+;;; }
+;;;
+;;; // else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is
+;;; // // a valid IPv6 address
+;;; // tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
+;;; // char addressBuffer[INET6_ADDRSTRLEN];
+;;; // inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
+;;; //// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
+;;; // len = strlen(addressBuffer);
+;;; // a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
+;;; // str = C_string(&a, len, addressBuffer);
+;;; // lst = C_a_pair(&a, str, lst);
+;;; // }
+;;;
+;;; // else {
+;;; // printf(\" not an IPv4 address\\n\");
+;;; // }
+;;;
+;;; }
+;;;
+;;; freeifaddrs(ifa);
+;;; C_return(lst);
+;;;
+;;; "))
+;;;
+;;; ;; Change this to bias for addresses with a reasonable broadcast value?
+;;; ;;
+;;; (define (ip-pref-less? a b)
+;;; (let* ((rate (lambda (ipstr)
+;;; (regex-case ipstr
+;;; ( "^127\\." _ 0 )
+;;; ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 )
+;;; ( else 2 ) ))))
+;;; (< (rate a) (rate b))))
+;;;
+;;;
+;;; (define (get-my-best-address)
+;;; (let ((all-my-addresses (get-all-ips))
+;;; ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))
+;;; )
+;;; (cond
+;;; ((null? all-my-addresses)
+;;; (get-host-name)) ;; no interfaces?
+;;; ((eq? (length all-my-addresses) 1)
+;;; (car all-my-addresses)) ;; only one to choose from, just go with it
+;;;
+;;; (else
+;;; (car (sort all-my-addresses ip-pref-less?)))
+;;; ;; (else
+;;; ;; (ip->string (car (filter (lambda (x) ;; take any but 127.
+;;; ;; (not (eq? (u8vector-ref x 0) 127)))
+;;; ;; all-my-addresses))))
+;;;
+;;; )))
+;;;
+;;; (define (get-all-ips-sorted)
+;;; (sort (get-all-ips) ip-pref-less?))
+;;;
+;;;
+
Index: utils/deps.scm
==================================================================
--- utils/deps.scm
+++ utils/deps.scm
@@ -17,16 +17,21 @@
(if (eof-object? l)
data
(begin
(regex-case
l
- ("^\\s*\\(import\\s+([^\\s]+)\\).*" (x md )
- (print "\"" md "\" -> \"" modname "\";")))
+ ("^\\s*\\((import|use)\\s+([^\\s]+)\\).*"
+ (x junk md )(print "\"" md "\" -> \"" modname "\";"))
+ ;; now get entries with prefix
+ ("^\\s*\\((import|use)\\s+\\(prefix\\s+([^\\s]+)\\s+.*\\).*"
+ (x junk md ) (print "\"" md "\" -> \"" modname "\";"))
+ )
(loop (read-line)))))))))
(define (do-all-mod-files)
- (let ((modfiles (get-files "*mod.scm")))
+ (let ((modfiles (append ;; '("megatest.scm" "dashboard.scm")
+ (get-files "*mod.scm"))))
(for-each
(lambda (mfile)
(print "// " mfile)
(get-deps mfile))
modfiles)))