Index: Makefile
==================================================================
--- Makefile
+++ Makefile
@@ -29,11 +29,12 @@
MSRCFILES = dbmod.scm rmtmod.scm commonmod.scm apimod.scm \
archivemod.scm clientmod.scm envmod.scm ezstepsmod.scm itemsmod.scm \
keysmod.scm launchmod.scm odsmod.scm processmod.scm runconfigmod.scm \
runsmod.scm servermod.scm subrunmod.scm tasksmod.scm testsmod.scm \
-pkts.scm mtargs.scm mtconfigf.scm ducttape-lib.scm megamod.scm
+pkts.scm mtargs.scm mtconfigf.scm ducttape-lib.scm ulex.scm \
+megamod.scm
GMSRCFILES = dcommonmod.scm vgmod.scm treemod.scm
# Eggs to install (straightforward ones)
@@ -208,21 +209,24 @@
# special include based modules
mofiles/pkts.o : pkts/pkts.scm
mofiles/mtargs.o : mtargs/mtargs.scm
mofiles/mtconfigf.o : mtconfigf/mtconfigf.scm
+mofiles/ulex.o : ulex/ulex.scm
# mofile/ducttape-lib.o : ducttape/ducttape-lib.scm
# Temporary while transitioning to new routine
# runs.o : run-tests-queue-classic.scm run-tests-queue-new.scm
# for the modularized stuff
mofiles/commonmod.o : megatest-fossil-hash.scm
-mofiles/dbmod.o : mofiles/commonmod.o mofiles/keysmod.o mofiles/tasksmod.o mofiles/odsmod.o
+mofiles/dbmod.o : mofiles/commonmod.o mofiles/keysmod.o \
+ mofiles/tasksmod.o mofiles/odsmod.o
mofiles/commonmod.o : mofiles/processmod.o
-mofiles/rmtmod.o : mofiles/dbmod.o mofiles/commonmod.o mofiles/apimod.o
+mofiles/rmtmod.o : mofiles/dbmod.o mofiles/commonmod.o \
+ mofiles/apimod.o mofiles/ulex.o
mofiles/apimod.o : mofiles/dbmod.o
# Removed from megamod.o dep: mofiles/ftail.o
mofiles/megamod.o : \
mofiles/rmtmod.o \
mofiles/commonmod.o \
Index: megatest.scm
==================================================================
--- megatest.scm
+++ megatest.scm
@@ -697,17 +697,17 @@
(process:children #f))
(original-exit exit-code)))))
;; for some switches always print the command to stderr
;;
-(if (args:any? "-run" "-runall" "-remove-runs" "-set-state-status" "-kill-runs" "-kill-rerun")
+(if (args:any-defined? "-run" "-runall" "-remove-runs" "-set-state-status" "-kill-runs" "-kill-rerun")
(debug:print 0 *default-log-port* (string-intersperse (argv) " ")))
;; some switches imply homehost. Exit here if not on homehost
;;
(let ((homehost-required (list "-cleanup-db" "-server")))
- (if (apply args:any? homehost-required)
+ (if (apply args:any-defined? homehost-required)
(if (not (common:on-homehost?))
(for-each
(lambda (switch)
(if (args:get-arg switch)
(begin
Index: mtargs/mtargs.scm
==================================================================
--- mtargs/mtargs.scm
+++ mtargs/mtargs.scm
@@ -40,10 +40,12 @@
(hash-table-ref/default arg-hash arg (car default))))
(define (any-defined? . args)
(not (null? (filter (lambda (x) x)
(map get-arg args)))))
+
+;; (define any any-defined?)
(define (get-arg-from ht arg . default)
(if (null? default)
(hash-table-ref/default ht arg #f)
(hash-table-ref/default ht arg (car default))))
Index: rmtmod.scm
==================================================================
--- rmtmod.scm
+++ rmtmod.scm
@@ -17,10 +17,11 @@
;; along with Megatest. If not, see .
;;======================================================================
(declare (unit rmtmod))
+(declare (uses ulex))
;; (declare (uses commonmod))
;; (declare (uses dbmod))
;; (declare (uses megamod))
(module rmtmod
@@ -30,10 +31,10 @@
(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable)
;; (import commonmod) ;;; DO NOT ALLOW rmt*scm TO DEPEND ON common*scm!!!!
;; (import dbmod)
;; (import megamod)
-(use (prefix ulex ulex:))
+(import (prefix ulex ulex:))
;; (include "rmt-inc.scm")
;; (include "common_records.scm")
)
ADDED ulex.scm
Index: ulex.scm
==================================================================
--- /dev/null
+++ ulex.scm
@@ -0,0 +1,23 @@
+;;======================================================================
+;; Copyright 2019, Matthew Welland.
+;;
+;; This file is part of Megatest.
+;;
+;; Megatest is free software: you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+;;
+;; Megatest is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;; GNU General Public License for more details.
+;;
+;; You should have received a copy of the GNU General Public License
+;; along with Megatest. If not, see .
+
+;;======================================================================
+
+(declare (unit ulex))
+
+(include "ulex/ulex.scm")
ADDED ulex/Makefile
Index: ulex/Makefile
==================================================================
--- /dev/null
+++ ulex/Makefile
@@ -0,0 +1,25 @@
+
+
+all : example
+
+ulex.so : ulex.scm telemetry/telemetry.so netutil/ulex-netutil.so portlogger/portlogger.so
+ chicken-install
+
+telemetry/telemetry.so : telemetry/telemetry.scm
+ cd telemetry && chicken-install
+
+example : ulex.so example.scm
+ csc example.scm
+
+test : ulex.so
+ csi -b tests/run.scm
+
+portlogger/portlogger.so : portlogger/portlogger.scm
+ cd portlogger && chicken-install
+ csi -s portlogger/test.scm
+
+netutil/ulex-netutil.so: netutil/ulex-netutil.scm
+ cd netutil && chicken-install
+
+clean:
+ rm -f example *so */*so *.import.* */*.import.*
ADDED ulex/README
Index: ulex/README
==================================================================
--- /dev/null
+++ ulex/README
@@ -0,0 +1,25 @@
+ulex: thorny evergreen shrubs, also known as furze or gorse.
+
+Your application needs a database but the moment you build in
+dependence on Postgresql or Mysql your application becomes ponderous.
+Ulex is a mesh networked sqlite3 database, intended to provide a
+nimble alternative. It is designed so that nodes can come and go while
+talking to each other and sharing data via sqlite3 in a (mostly) ACID
+compliant way. Ulex mitigates issues with Sqlite3 on networked
+filesystems (e.g. NFS, Moosefs etc.) such as "lock storms" (*) and
+performance bottlenecks.
+
+(*) Network filesystems must provide posix locking across all files
+across all clients. This is handled by the file server by a queue. If
+the queue gets overwhelmed by lock requests locks may be dropped. This
+leads to concurrent access to shared files and in the case of sqlite3
+the dreaded "database locked" message that won't go away (to fix you
+have to move the file and copy it back). It can also lead to a
+corrupted database.
+
+About the name Ulex: The Ulex plant, also known as Gorse, furze, or
+Whin, is considered an invasive pest in places like New Zealand but in
+spite of its thorny persistence it is considered quite useful in its
+native Europe, providing edible flowers, shelter and food for
+wildlife, hedges and fuel for cooking.
+
ADDED ulex/TODO
Index: ulex/TODO
==================================================================
--- /dev/null
+++ ulex/TODO
@@ -0,0 +1,19 @@
+* move cookie generation to start in (call ...)
+* add timestamp from perspective of caller to data passed around
+* add standard print for each event
+* move portlogger into module (retrieve from Brandon's work)
+
+
+
+* Create queues
+
+* Create text line logging interface
+
+* Add registration calls; write, read, transactional, osslow and osfast
+
+* Extend example to cover all aspects
+
+*
+
+*
+
ADDED ulex/example-telemetry.scm
Index: ulex/example-telemetry.scm
==================================================================
--- /dev/null
+++ ulex/example-telemetry.scm
@@ -0,0 +1,57 @@
+(use telemetry)
+
+;; @startuml
+;; Alice -> Bob: Authentication Request
+;; Bob --> Alice: Authentication Response
+
+;; Alice -> Bob: Another authentication Request
+;; Alice <-- Bob: Another authentication Response
+;;
+;; ref over Bob
+;; This can be on
+;; several lines
+;; end ref
+;;
+;; @enduml
+(define *actor-table* (make-hash-table))
+(define *actor-next-id* 0)
+(define (get-actor-name key)
+ (let* ((cast (string-split "Alice,Bob,Carol,Dave,Eve,Frank,Grace,Heidi,Ivan,Judy,Mallory,Nancy,Oscar,Peggy,Rupert,Sybil,Ted,Victor,Wendy" ","))
+ (cast-count (length cast))
+ (name (hash-table-ref/default *actor-table* key #f)))
+ (cond
+ (name name)
+ (else
+ (let* ((id *actor-next-id*)
+ (name (list-ref cast (modulo id cast-count)))
+ (seq (inexact->exact (floor (/ id cast-count))))
+ (newname
+ (conc
+ name
+ (if (> seq 0)
+ (conc "-" seq)
+ ""))))
+ (set! *actor-next-id* (add1 *actor-next-id*))
+ (hash-table-set! *actor-table* key newname)
+ newname)))))
+
+
+(define (ulex-telemetry-server)
+ (let* ((port 12345)
+ (seq 0)
+ (handler (lambda (msg)
+ (set! seq (add1 seq))
+ (let* ((elapsed-ms (current-milliseconds))
+ (payload (alist-ref 'payload msg))
+ (action (if payload (alist-ref 'action payload)))
+ (from-key (if payload (alist-ref 'from-key payload)))
+ (to-key (if payload (alist-ref 'to-key payload)))
+ )
+ (if (and action from-key to-key)
+ (print (get-actor-name from-key) " -> " (get-actor-name to-key) ": "action))
+ ;;(pp msg)
+ ))))
+ (telemetry-server port handler)))
+
+(ulex-telemetry-server)
+
ADDED ulex/example.scm
Index: ulex/example.scm
==================================================================
--- /dev/null
+++ ulex/example.scm
@@ -0,0 +1,201 @@
+;;; dbi: Minimal gasket to postgresql, sqlite3 and mysql
+;;;
+;; Copyright (C) 2007-2016 Matt Welland
+;; Redistribution and use in source and binary forms, with or without
+;; modification, is permitted.
+;;
+;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
+;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
+;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
+;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+;; DAMAGE.
+
+(use regex srfi-18 matchable)
+
+(load "ulex.scm")
+(import (prefix ulex ulex:))
+
+(create-directory "ulexdb" #t)
+(create-directory "pkts" #f)
+
+(define *area* (ulex:make-area
+ dbdir: (conc (current-directory) "/ulexdb")
+ pktsdir: (conc (current-directory) "/pkts")
+ ))
+(define (toplevel-command . args) #f)
+(use readline)
+
+;; two reserved keys in the ulex registration hash table are:
+;; dbinitsql => a list of sql statements to be executed at db creation time
+;; dbinitfn => a function of two params; dbh, the sql-de-lite db handle and
+;; dbfname, the database filename
+;;
+(ulex:register-batch
+ *area*
+ 'dbwrite
+ `((dbinitsql . ("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, message TEXT, author TEXT, msg_time INTEGER);"))
+ (savemsg . "INSERT INTO messages (message,author) VALUES (?,?)")
+ ))
+
+(ulex:register-batch
+ *area*
+ 'dbread
+ `((dbinitsql . ("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, message TEXT, author TEXT, msg_time INTEGER);"))
+ (getnum . "SELECT COUNT(*) FROM messages")
+ (getsome . "SELECT * FROM messages LIMIT 10")
+ ))
+
+(define (worker mode-in)
+ (let* ((start (current-milliseconds))
+ (iters-per-sample 10)
+ (mode (string->symbol mode-in))
+ (max-count (case mode
+ ((all) 60)
+ (else 1000)))
+ (num-calls 0)
+ (report (lambda ()
+ (let ((delta (- (current-milliseconds) start)))
+ (print "Completed " num-calls " in " delta
+ " for " (/ num-calls (/ delta 1000)) " calls per second")))))
+ (if (eq? mode 'repl)
+ (begin
+ (import extras) ;; might not be needed
+ ;; (import csi)
+ (import readline)
+ (import apropos)
+ (import (prefix ulex ulex:))
+ (install-history-file (get-environment-variable "HOME") ".example_history") ;; [homedir] [filename] [nlines])
+ (current-input-port (make-readline-port "example> "))
+ (repl))
+ (let loop ((count 0))
+ ;; (print "loop count=" count)
+ (for-each
+ (lambda (dbname)
+ ;;(print "TOP OF LAMBDA")
+ (case mode
+ ((all)
+ (let ((start-time (current-milliseconds))
+ (message (conc "Test message #" count "! From pid: " (current-process-id)))
+ (user (current-user-name)))
+ (ulex:call *area* dbname 'savemsg `(,message ,user))
+ (for-each (lambda (n)
+ (print "have this many " (ulex:call *area* dbname 'getnum '()) " records in main.db"))
+ (iota 10))
+ (set! num-calls (+ num-calls 11))
+ ))
+
+ ((ping)
+ (let ((srvrs (ulex:get-all-server-pkts *area*)))
+ (for-each
+ (lambda (srv)
+ (print "Pinging " srv)
+ (ulex:ping *area* srv))
+ srvrs)))
+ ((fullping)
+ (let ((srvrs (ulex:get-all-server-pkts *area*)))
+ (for-each
+ (lambda (srv)
+ (let ((ipaddr (alist-ref 'ipaddr srv))
+ (port (any->number (alist-ref 'port srv))))
+ (print "Full Ping to " srv)
+ (ulex:ping *area* ipaddr port)))
+ srvrs)))
+ ((passive)
+ (thread-sleep! 10))
+ ))
+ '("main.db")) ;; "test.db" "run-1.db" "run-2.db" "run-3.db" "run-4.db"))
+ #;(thread-sleep! 0.001)
+ #;(let ((now (current-milliseconds)))
+ (if (and (> now start)
+ (eq? (modulo count iters-per-sample) 0))
+ (begin
+ (print "queries per second: "(* 1000.0 (/ iters-per-sample (- now start))))
+ (set! count 0)
+ (set! start (current-milliseconds)))))
+ ;; (print "count: " count " max-count: " max-count)
+ (if (< count max-count)
+ (loop (+ count 1)))))
+ (report)
+ (ulex:clear-server-pkt *area*)
+ (thread-sleep! 5) ;; let others keep using this server (needs to be built in to ulex)
+ ;; (print "Doing stuff")
+ ;; (thread-sleep! 10)
+ (print "Done doing stuff")))
+
+(define (run-worker)
+ (thread-start!
+ (make-thread (lambda ()
+ (thread-sleep! 5)
+ (worker "all"))
+ "worker")))
+
+(define (main . args)
+ (if (member (car args) '("repl"))
+ (print "NOTE: No exit timer started.")
+ (thread-start! (make-thread (lambda ()
+ (thread-sleep! (* 60 5))
+ (ulex:clear-server-pkt *area*)
+ (thread-sleep! 5)
+ (exit 0)))))
+ (print "Launching server")
+ (ulex:launch *area*)
+ (print "LAUNCHED.")
+ (thread-sleep! 0.1) ;; chicken threads bit quirky? need little time for launch thread to get traction?
+ (apply worker args)
+ )
+
+;;======================================================================
+;; Strive for clean exit handling
+;;======================================================================
+
+;; Ulex shutdown is handled within Ulex itself.
+
+#;(define (server-exit-procedure)
+ (on-exit (lambda ()
+ ;; close the databases, ensure the pkt is removed!
+ ;; (thread-sleep! 2)
+ (ulex:shutdown *area*)
+ 0)))
+
+;; Copied from the SDL2 examples.
+;;
+;; Schedule quit! to be automatically called when your program exits normally.
+#;(on-exit server-exit-procedure)
+
+;; Install a custom exception handler that will call quit! and then
+;; call the original exception handler. This ensures that quit! will
+;; be called even if an unhandled exception reaches the top level.
+#;(current-exception-handler
+ (let ((original-handler (current-exception-handler)))
+ (lambda (exception)
+ (server-exit-procedure)
+ (original-handler exception))))
+
+(if (file-exists? ".examplerc")
+ (load ".examplerc"))
+
+(let ((args-in (argv))) ;; command-line-arguments)))
+ (let ((args (match
+ args-in
+ (("csi" "--" args ...) args)
+ ((_ args ...) args)
+ (else args-in))))
+ (if (null? args)
+ (begin
+ (print "Usage: example [mode]")
+ (print " where mode is one of:")
+ (print " ping : only do pings between servers")
+ (print " fullping : ping with response via processing queue")
+ (print " unix : only do unix commands")
+ (print " read : only do ping, unix and db reads")
+ (print " all : do pint, unix, and db reads and writes")
+ (exit))
+ (apply main args))))
+
ADDED ulex/netutil/testit.scm
Index: ulex/netutil/testit.scm
==================================================================
--- /dev/null
+++ ulex/netutil/testit.scm
@@ -0,0 +1,6 @@
+
+(use ulex-netutil)
+(use test)
+
+(test #f #t (not (not (member "127.0.0.1" (get-all-ips)))))
+
ADDED ulex/netutil/ulex-netutil.meta
Index: ulex/netutil/ulex-netutil.meta
==================================================================
--- /dev/null
+++ ulex/netutil/ulex-netutil.meta
@@ -0,0 +1,16 @@
+;; -*- scheme -*-
+(
+; Your egg's license:
+(license "BSD")
+
+; Pick one from the list of categories (see below) for your egg and enter it
+; here.
+(category db)
+
+(needs foreign )
+
+; A list of eggs required for TESTING ONLY. See the `Tests' section.
+(test-depends test)
+
+(author "Brandon Barclay")
+(synopsis "Get all IP addresses for all interfaces."))
ADDED ulex/netutil/ulex-netutil.release-info
Index: ulex/netutil/ulex-netutil.release-info
==================================================================
--- /dev/null
+++ ulex/netutil/ulex-netutil.release-info
@@ -0,0 +1,3 @@
+(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}")
+(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}")
+(release "0.1")
ADDED ulex/netutil/ulex-netutil.scm
Index: ulex/netutil/ulex-netutil.scm
==================================================================
--- /dev/null
+++ ulex/netutil/ulex-netutil.scm
@@ -0,0 +1,134 @@
+;;; ulex: Distributed sqlite3 db
+;;;
+;; Copyright (C) 2018 Matt Welland
+;; Redistribution and use in source and binary forms, with or without
+;; modification, is permitted.
+;;
+;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
+;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
+;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
+;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+;; DAMAGE.
+
+;;======================================================================
+;; ABOUT:
+;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
+;; NOTES:
+;; provides all ipv4 addresses for all interfaces
+;;
+;;======================================================================
+
+;; get IP addresses from ALL interfaces
+(module ulex-netutil
+ (get-all-ips get-my-best-address get-all-ips-sorted)
+
+(import scheme chicken data-structures foreign ports regex-case posix)
+
+
+;; #include
+;; #include
+;; #include
+;; #include
+
+(foreign-declare "#include \"sys/types.h\"")
+(foreign-declare "#include \"sys/socket.h\"")
+(foreign-declare "#include \"ifaddrs.h\"")
+(foreign-declare "#include \"arpa/inet.h\"")
+
+;; get IP addresses from ALL interfaces
+(define get-all-ips
+ (foreign-safe-lambda* scheme-object ()
+ "
+
+// from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address :
+
+
+ C_word lst = C_SCHEME_END_OF_LIST, len, str, *a;
+// struct ifaddrs *ifa, *i;
+// struct sockaddr *sa;
+
+ struct ifaddrs * ifAddrStruct = NULL;
+ struct ifaddrs * ifa = NULL;
+ void * tmpAddrPtr = NULL;
+
+ if ( getifaddrs(&ifAddrStruct) != 0)
+ C_return(C_SCHEME_FALSE);
+
+// for (i = ifa; i != NULL; i = i->ifa_next) {
+ for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
+ if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is
+ // a valid IPv4 address
+ tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
+ char addressBuffer[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
+// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
+ len = strlen(addressBuffer);
+ a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
+ str = C_string(&a, len, addressBuffer);
+ lst = C_a_pair(&a, str, lst);
+ }
+
+// else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is
+// // a valid IPv6 address
+// tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
+// char addressBuffer[INET6_ADDRSTRLEN];
+// inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
+//// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
+// len = strlen(addressBuffer);
+// a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
+// str = C_string(&a, len, addressBuffer);
+// lst = C_a_pair(&a, str, lst);
+// }
+
+// else {
+// printf(\" not an IPv4 address\\n\");
+// }
+
+ }
+
+ freeifaddrs(ifa);
+ C_return(lst);
+
+"))
+
+;; Change this to bias for addresses with a reasonable broadcast value?
+;;
+(define (ip-pref-less? a b)
+ (let* ((rate (lambda (ipstr)
+ (regex-case ipstr
+ ( "^127\\." _ 0 )
+ ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 )
+ ( else 2 ) ))))
+ (< (rate a) (rate b))))
+
+
+(define (get-my-best-address)
+ (let ((all-my-addresses (get-all-ips))
+ ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))
+ )
+ (cond
+ ((null? all-my-addresses)
+ (get-host-name)) ;; no interfaces?
+ ((eq? (length all-my-addresses) 1)
+ (car all-my-addresses)) ;; only one to choose from, just go with it
+
+ (else
+ (car (sort all-my-addresses ip-pref-less?)))
+ ;; (else
+ ;; (ip->string (car (filter (lambda (x) ;; take any but 127.
+ ;; (not (eq? (u8vector-ref x 0) 127)))
+ ;; all-my-addresses))))
+
+ )))
+
+(define (get-all-ips-sorted)
+ (sort (get-all-ips) ip-pref-less?))
+
+)
ADDED ulex/netutil/ulex-netutil.setup
Index: ulex/netutil/ulex-netutil.setup
==================================================================
--- /dev/null
+++ ulex/netutil/ulex-netutil.setup
@@ -0,0 +1,11 @@
+;; Copyright 2007-2018, Matthew Welland.
+;;
+;; This program is made available under the GNU GPL version 2.0 or
+;; greater. See the accompanying file COPYING for details.
+;;
+;; This program is distributed WITHOUT ANY WARRANTY; without even the
+;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+;; PURPOSE.
+
+;;;; ulex.setup
+(standard-extension 'ulex-netutil "0.1")
ADDED ulex/portlogger/portlogger.meta
Index: ulex/portlogger/portlogger.meta
==================================================================
--- /dev/null
+++ ulex/portlogger/portlogger.meta
@@ -0,0 +1,16 @@
+;; -*- scheme -*-
+(
+; Your egg's license:
+(license "BSD")
+
+; Pick one from the list of categories (see below) for your egg and enter it
+; here.
+(category db)
+
+(needs foreign )
+
+; A list of eggs required for TESTING ONLY. See the `Tests' section.
+(test-depends test sqlite3 regex)
+
+(author "Matthew Welland")
+(synopsis "thoughtfully optain tcp port."))
ADDED ulex/portlogger/portlogger.release-info
Index: ulex/portlogger/portlogger.release-info
==================================================================
--- /dev/null
+++ ulex/portlogger/portlogger.release-info
@@ -0,0 +1,3 @@
+(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}")
+(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}")
+(release "0.1")
ADDED ulex/portlogger/portlogger.scm
Index: ulex/portlogger/portlogger.scm
==================================================================
--- /dev/null
+++ ulex/portlogger/portlogger.scm
@@ -0,0 +1,167 @@
+;;======================================================================
+;; P O R T L O G G E R - track ports used on the current machine
+;;======================================================================
+
+;;
+
+(module portlogger
+ (pl-open-run-close pl-find-port pl-release-port pl-open-db pl-get-prev-used-port pl-get-port-state pl-take-port)
+ (import scheme
+ posix
+ chicken
+ data-structures
+ ;ports
+ extras
+ ;files
+ ;mailbox
+ ;telemetry
+ regex
+ ;regex-case
+
+ )
+ (use (prefix sqlite3 sqlite3:))
+ (use posix)
+ (use regex)
+
+ (define (pl-open-db fname)
+ (let* ((avail #t) ;; for now - assume wait on journal not needed (tasks:wait-on-journal fname 5 remove: #t)) ;; wait up to about 10 seconds for the journal to go away
+ (exists (file-exists? fname))
+ (db (if avail
+ (sqlite3:open-database fname)
+ (begin
+ (system (conc "rm -f " fname))
+ (sqlite3:open-database fname))))
+ (handler (sqlite3:make-busy-timeout 136000))
+ (canwrite (file-write-access? fname)))
+ (sqlite3:set-busy-handler! db handler)
+ (sqlite3:execute db "PRAGMA synchronous = 0;")
+ (sqlite3:execute
+ db
+ "CREATE TABLE IF NOT EXISTS ports (
+ port INTEGER PRIMARY KEY,
+ state TEXT DEFAULT 'not-used',
+ fail_count INTEGER DEFAULT 0,
+ update_time TIMESTAMP DEFAULT (strftime('%s','now')) );")
+ db))
+
+ (define (pl-open-run-close proc . params)
+ (let* ((fname (conc "/tmp/." (current-user-name) "-portlogger.db"))
+ (avail #t)) ;; (tasks:wait-on-journal fname 10))) ;; wait up to about 10 seconds for the journal to go away
+ ;; (handle-exceptions
+ ;; exn
+ ;; (begin
+ ;; ;; (release-dot-lock fname)
+ ;; (debug:print-error 0 *default-log-port* "pl-open-run-close failed. " proc " " params)
+ ;; (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
+ ;; (debug:print 5 *default-log-port* "exn=" (condition->list exn))
+ ;; (if (file-exists? fname)(delete-file fname)) ;; brutally get rid of it
+ ;; (print-call-chain (current-error-port)))
+ (let* (;; (lock (obtain-dot-lock fname 2 9 10))
+ (db (pl-open-db fname))
+ (res (apply proc db params)))
+ (sqlite3:finalize! db)
+ ;; (release-dot-lock fname)
+ res)))
+ ;; )
+
+ ;; (fold-row PROC INIT DATABASE SQL . PARAMETERS)
+ (define (pl-take-port db portnum)
+ (let* ((qry1 "INSERT INTO ports (port,state) VALUES (?,?);")
+ (qry2 "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;"))
+ (let* ((curr (pl-get-port-state db portnum))
+ (res (case (string->symbol (or curr "n/a"))
+ ((released) (sqlite3:execute db qry2 "taken" portnum) 'taken)
+ ((not-tried n/a) (sqlite3:execute db qry1 portnum "taken") 'taken)
+ ((taken) 'already-taken)
+ ((failed) 'failed)
+ (else 'error))))
+ ;; (print "res=" res)
+ res)))
+
+ (define (pl-get-prev-used-port db)
+ ;; (handle-exceptions
+ ;; exn
+ ;; (with-output-to-port (current-error-port)
+ ;; (lambda ()
+ ;; (print "EXCEPTION: portlogger database probably overloaded or unreadable. If you see this message again remove /tmp/.$USER-portlogger.db")
+ ;; (print " message: " ((condition-property-accessor 'exn 'message) exn))
+ ;; (print "exn=" (condition->list exn))
+ ;; (print-call-chain) ;; (current-error-port))
+ ;; (print "Continuing anyway.")
+ ;; #f))
+ (let ((res (sqlite3:fold-row
+ (lambda (var curr)
+ (or curr var curr))
+ #f
+ db "SELECT port FROM ports WHERE state='released';")))
+ (if res res #f)))
+ ;; )
+
+ (define (pl-find-port db acfg #!key (lowport 32768))
+ ;;(slite3:with-transaction
+ ;; db
+ ;; (lambda ()
+ (let loop ((numtries 0))
+ (let* ((portnum (or (pl-get-prev-used-port db)
+ (+ lowport ;; top of registered ports is 49152 but let's use ports in the registered range
+ (random (- 64000 lowport))))))
+ ;; (handle-exceptions
+ ;; exn
+ ;; (with-output-to-port (current-error-port)
+ ;; (lambda ()
+ ;; (print "EXCEPTION: portlogger database probably overloaded or unreadable. If you see this message again remove /tmp/.$USER-portlogger.db")
+ ;; (print " message: " ((condition-property-accessor 'exn 'message) exn))
+ ;; (print "exn=" (condition->list exn))
+ ;; (print-call-chain)
+ ;; (print "Continuing anyway.")))
+ (pl-take-port db portnum) ;; always "take the port"
+ (if (pl-is-port-available portnum)
+ portnum
+ (begin
+ (pl-set-port db portnum "taken")
+ (loop (add1 numtries)))))))
+
+
+ ;; set port to "released", "failed" etc.
+ ;;
+ (define (pl-set-port db portnum value)
+ (sqlite3:execute db "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;") value portnum)
+
+ ;; set port to "released", "failed" etc.
+ ;;
+ (define (pl-get-port-state db portnum)
+ (let ((res (sqlite3:fold-row ;; get the state of given port or "not-tried"
+ (lambda (var curr) ;; function on init/last current
+ (or curr var curr))
+ #f ;; init
+ db "SELECT state FROM ports WHERE port=?;"
+ portnum))) ;; the parameter to the query
+ (if res res #f)))
+
+ ;; (slite3:exec (slite3:sql db "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;") value portnum))
+
+ ;; release port
+ (define (pl-release-port db portnum)
+ (sqlite3:execute db "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;" "released" portnum)
+ (sqlite3:change-count db))
+
+ ;; set port to failed (attempted to take but got error)
+ ;;
+ (define (pl-set-failed db portnum)
+ (sqlite3:execute db "UPDATE ports SET state='failed',fail_count=fail_count+1,update_time=strftime('%s','now') WHERE port=?;" portnum)
+ (sqlite3:change-count db))
+
+ ;; pulled from mtut - TODO: remove from mtut, find a way *without* using netstat
+ ;;
+ (define (pl-is-port-available port-num)
+ (let-values (((inp oup pid)
+ (process "netstat" (list "-tulpn" ))))
+ (let loop ((inl (read-line inp)))
+ (if (not (eof-object? inl))
+ (begin
+ (if (string-search (regexp (conc ":" port-num "\\s+")) inl)
+ #f
+ (loop (read-line inp))))
+ #t))))
+
+ ) ;; end module
ADDED ulex/portlogger/portlogger.setup
Index: ulex/portlogger/portlogger.setup
==================================================================
--- /dev/null
+++ ulex/portlogger/portlogger.setup
@@ -0,0 +1,11 @@
+;; Copyright 2007-2018, Matthew Welland.
+;;
+;; This program is made available under the GNU GPL version 2.0 or
+;; greater. See the accompanying file COPYING for details.
+;;
+;; This program is distributed WITHOUT ANY WARRANTY; without even the
+;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+;; PURPOSE.
+
+;;;; portlogger.setup
+(standard-extension 'portlogger "0.1")
ADDED ulex/portlogger/test.scm
Index: ulex/portlogger/test.scm
==================================================================
--- /dev/null
+++ ulex/portlogger/test.scm
@@ -0,0 +1,25 @@
+(use portlogger)
+(use test)
+
+(test-begin "portlogger")
+(use (prefix sqlite3 sqlite3:))
+
+(define *port* #f)
+(define *area* #f)
+(test #f #f (begin
+ (pl-open-run-close
+ (lambda (db b)
+ (pl-get-prev-used-port db))
+ *area*)
+ #f))
+(test #f #f (pl-open-run-close (lambda (db b)(pl-get-port-state db 1234567)) *area*))
+(test #f #f (number? (pl-open-run-close (lambda (db b)(pl-take-port db 123456)) *area*)))
+(test #f #t (number? (let ((port (pl-open-run-close pl-find-port *area*)))
+ (set! *port* port)
+ port)))
+(test #f 1 (pl-open-run-close pl-release-port *port*))
+(test #f "released" (pl-open-run-close
+ (lambda (db)
+ (sqlite3:first-result db "select state from ports where port=?" *port*))))
+
+(test-end "portlogger")
ADDED ulex/run-parallel.sh
Index: ulex/run-parallel.sh
==================================================================
--- /dev/null
+++ ulex/run-parallel.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+CMD=$1
+
+make example
+
+for x in $(seq 1 10);do
+ ./example $CMD 2>&1| tee run$x.log &
+done
+
+wait
+
ADDED ulex/run-rpc.sh
Index: ulex/run-rpc.sh
==================================================================
--- /dev/null
+++ ulex/run-rpc.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+csc test-rpc.scm
+(./test-rpc 45000 45001 45002 45003 45004 45005|& tee test-rpc-45000.log) &
+(./test-rpc 45001 45000 45002 45003 45004 45005|& tee test-rpc-45001.log) &
+(./test-rpc 45002 45000 45001 45003 45004 45005|& tee test-rpc-45002.log) &
+(./test-rpc 45003 45000 45001 45002 45004 45005|& tee test-rpc-45003.log) &
+(./test-rpc 45004 45000 45001 45002 45003 45005|& tee test-rpc-45004.log) &
+(./test-rpc 45005 45000 45001 45002 45003 45004|& tee test-rpc-45005.log) &
+
+wait
+
ADDED ulex/telemetry/telemetry-test-client.scm
Index: ulex/telemetry/telemetry-test-client.scm
==================================================================
--- /dev/null
+++ ulex/telemetry/telemetry-test-client.scm
@@ -0,0 +1,12 @@
+
+(load "telemetry.scm")
+
+(import telemetry)
+
+(print 1)
+(telemetry-open "localhost" 12346)
+(print 2)
+(telemetry-send "goo")
+(print 3)
+(telemetry-send "goo2")
+(print 4)
ADDED ulex/telemetry/telemetry-test-server.scm
Index: ulex/telemetry/telemetry-test-server.scm
==================================================================
--- /dev/null
+++ ulex/telemetry/telemetry-test-server.scm
@@ -0,0 +1,18 @@
+
+(load "telemetry.scm")
+(import telemetry)
+(print "before")
+(use mailbox)
+(use mailbox-threads)
+(use srfi-18)
+
+(set! handler-seq 0)
+(define (handler msg)
+ (set! handler-seq (add1 handler-seq))
+ (print "=============")
+ (print handler-seq msg))
+
+(telemetry-server 12346 handler)
+
+
+(print "after")
ADDED ulex/telemetry/telemetry.meta
Index: ulex/telemetry/telemetry.meta
==================================================================
--- /dev/null
+++ ulex/telemetry/telemetry.meta
@@ -0,0 +1,21 @@
+;; -*- scheme -*-
+(
+; Your egg's license:
+(license "BSD")
+
+; Pick one from the list of categories (see below) for your egg and enter it
+; here.
+(category db)
+
+; A list of eggs dbi depends on. If none, you can omit this declaration
+; altogether. If you are making an egg for chicken 3 and you need to use
+; procedures from the `files' unit, be sure to include the `files' egg in the
+; `needs' section (chicken versions < 3.4.0 don't provide the `files' unit).
+; `depends' is an alias to `needs'.
+(needs udp mailbox-threads z3 base64 )
+
+; A list of eggs required for TESTING ONLY. See the `Tests' section.
+(test-depends test)
+
+(author "Brandon Barclay")
+(synopsis "A telemetry send/receive system using udp."))
ADDED ulex/telemetry/telemetry.release-info
Index: ulex/telemetry/telemetry.release-info
==================================================================
--- /dev/null
+++ ulex/telemetry/telemetry.release-info
@@ -0,0 +1,3 @@
+(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}")
+(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}")
+(release "0.1")
ADDED ulex/telemetry/telemetry.scm
Index: ulex/telemetry/telemetry.scm
==================================================================
--- /dev/null
+++ ulex/telemetry/telemetry.scm
@@ -0,0 +1,124 @@
+
+(module telemetry
+ (telemetry-open telemetry-send telemetry-close telemetry-server
+ telemetry-show-debugs telemetry-hide-debugs )
+
+ (import chicken scheme data-structures
+ base64 srfi-18
+ z3 udp posix extras ports mailbox mailbox-threads)
+
+ (use udp)
+ (use base64)
+ (use z3)
+ (use mailbox-threads)
+
+ (define *telemetry:telemetry-log-state* 'startup)
+ (define *telemetry:telemetry-log-socket* #f)
+
+ (define *debug-print-flag* #f)
+
+ (define (telemetry-show-debugs)
+ (set! *debug-print-flag* #t))
+
+ (define (telemetry-hide-debugs)
+ (set! *debug-print-flag* #f))
+
+ (define (debug-print . args)
+ (if *debug-print-flag*
+ (apply print "telemetry> " args)))
+
+ (define (make-telemetry-server-thread port callback)
+ (let* ((thr
+ (make-thread
+ (lambda ()
+ (let* ((s (udp-open-socket)))
+ (udp-bind! s #f port)
+ ;;(udp-connect! s "localhost" port)
+ (let loop ((seq 0))
+ (debug-print "loop seq="seq)
+ (receive (n data from-host from-port) (udp-recvfrom s 640000)
+ (let* ((encapsulated-payload
+ (with-input-from-string
+ (z3:decode-buffer
+ (base64-decode data)) read))
+ (callback-res `( (from-host . ,from-host)
+ (from-port . ,from-port)
+ (data-len . ,n)
+ ,@encapsulated-payload )))
+ (callback callback-res))
+
+ )
+ (loop (add1 seq)))
+ (udp-close-socket s))))))
+ (thread-start! thr)
+ thr))
+
+ (define (telemetry-server port handler-callback)
+ (let* ((serv-thread (make-telemetry-server-thread port handler-callback)))
+ (print serv-thread)
+ (thread-join! serv-thread)))
+
+
+ (define (telemetry-open serverhost serverport)
+ (let* ((user (or (get-environment-variable "USER") "unknown"))
+ (host (or (get-environment-variable "HOST") "unknown")))
+ (set! *telemetry:telemetry-log-state*
+ (handle-exceptions
+ exn
+ (begin
+ (debug-print "telemetry-open udp port failure")
+ 'broken)
+ (if (and serverhost serverport user host)
+ (let* ((s (udp-open-socket)))
+ ;;(udp-bind! s #f 0)
+ (udp-connect! s serverhost serverport)
+ (set! *telemetry:telemetry-log-socket* s)
+ 'open)
+ 'not-needed)))))
+
+
+ (define (telemetry-close)
+ (when (or (member *telemetry:telemetry-log-state* '(broken-or-no-server-preclose open)) *telemetry:telemetry-log-socket*)
+ (handle-exceptions
+ exn
+ (begin
+ (define *telemetry:telemetry-log-state* 'closed-fail)
+ (debug-print "telemetry-telemetry-log closure failure")
+ )
+ (begin
+ (define *telemetry:telemetry-log-state* 'closed)
+ (udp-close-socket *telemetry:telemetry-log-socket*)
+ (set! *telemetry:telemetry-log-socket* #f)))))
+
+ (define (telemetry-send payload)
+ (if (eq? 'open *telemetry:telemetry-log-state*)
+ (handle-exceptions
+ exn
+ (begin
+ (debug-print "telemetry-telemetry-log comms failure ; disabled (no server?)")
+ (define *telemetry:telemetry-log-state* 'broken-or-no-server-preclose)
+ (telemetry-close)
+ (define *telemetry:telemetry-log-state* 'broken-or-no-server)
+ (set! *telemetry:telemetry-log-socket* #f)
+ )
+ (if (and *telemetry:telemetry-log-socket* payload)
+ (let* ((user (or (get-environment-variable "USER") "unknown"))
+ (host (or (get-environment-variable "HOST") "unknown"))
+ (encapsulated-payload
+ `( (user . ,user)
+ (host . ,host)
+ (pid . ,(current-process-id))
+ (payload . ,payload) ) )
+ (msg
+ (base64-encode
+ (z3:encode-buffer
+ (with-output-to-string (lambda () (pp encapsulated-payload)))))))
+ ;;(debug-print "pre-send")
+ (let ((res (udp-send *telemetry:telemetry-log-socket* msg)))
+ ;;(debug-print "post-send >"res"<")
+ res)
+
+ )))) )
+
+
+ )
ADDED ulex/telemetry/telemetry.setup
Index: ulex/telemetry/telemetry.setup
==================================================================
--- /dev/null
+++ ulex/telemetry/telemetry.setup
@@ -0,0 +1,11 @@
+;; Copyright 2007-2018, Matthew Welland.
+;;
+;; This program is made available under the GNU GPL version 2.0 or
+;; greater. See the accompanying file COPYING for details.
+;;
+;; This program is distributed WITHOUT ANY WARRANTY; without even the
+;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+;; PURPOSE.
+
+;;;; ulex.setup
+(standard-extension 'telemetry "0.1")
ADDED ulex/test-rpc.scm
Index: ulex/test-rpc.scm
==================================================================
--- /dev/null
+++ ulex/test-rpc.scm
@@ -0,0 +1,75 @@
+(use rpc posix srfi-1 pkts)
+
+(define *usage* "Usage: test-rpc") ;; myportnum [port-nums...]")
+
+#;(define *portnums* (let ((args (command-line-arguments)))
+ (if (null? args)
+ (begin
+ (print *usage*)
+ (exit))
+ (map string->number args))))
+
+#;(if (not (null? (filter (lambda (x)(not x)) *portnums*)))
+ (begin
+ (print "ERROR: portnumbers must all be integers, you gave " *portnums*)
+ (exit)))
+
+
+(define (find-free-port-and-open port)
+ (handle-exceptions
+ exn
+ (begin
+ (print "Failed to bind to port " (rpc:default-server-port) ", trying next port")
+ (find-free-port-and-open (+ port 1)))
+ (rpc:default-server-port port)
+ (tcp-read-timeout 120000)
+ (tcp-listen (rpc:default-server-port) )
+ port))
+
+#;(define *myportnum* (car *portnums*))
+#;(define *clients* (cdr *portnums*))
+
+#;(print "Setting up server on " *myportnum*)
+;; (rpc:default-server-port *myportnum*)
+(define *myportnum* (find-free-port-and-open 20000)) ;; *myportnum*))
+
+
+
+;;;; server.scm
+(rpc:publish-procedure!
+ 'foo
+ (lambda (x)
+ (print "foo: " x)
+ (conc "response from " *myportnum*)))
+
+(define *queue* (make-queue))
+
+(rpc:publish-procedure!
+ 'queue-add
+ (lambda (dbfname qrystr raddr rport . params)
+ (queue-add! *queue* (list dbfname qrystr raddr rport params))))
+
+;;;; client.scm
+(define (call n)
+ (let ((port (list-ref *clients* (random (length *clients*)))))
+ (print "calling to " port)
+ ((rpc:procedure 'foo "localhost" port) n)))
+
+(rpc:publish-procedure!
+ 'fini
+ (lambda () (print "fini") (thread-start! (lambda () (thread-sleep! 3) (print "terminate") (exit))) #f))
+
+(let* ((server-th (make-thread (lambda ()
+ ((rpc:make-server (tcp-listen *myportnum*)) #t)) ;; (rpc:default-server-port))) #t))
+ "server thread"))
+ (timer-th (make-thread (lambda ()
+ (thread-sleep! 30)
+ (print "Node " *myportnum* ", 30 seconds up. Exiting.")
+ (exit)))))
+ (thread-start! server-th)
+ (thread-start! timer-th)
+ (thread-sleep! 1)
+ (do ((i 10000 (sub1 i)))
+ ((zero? i))
+ (print "-> " (call (random 100)))))
+
ADDED ulex/test-script.scm
Index: ulex/test-script.scm
==================================================================
--- /dev/null
+++ ulex/test-script.scm
@@ -0,0 +1,12 @@
+(import trace (prefix ulex ulex:))
+(trace-call-sites #t)
+
+;; (trace ulex:receive-message ulex:std-peer-handler ulex:process-db-queries ulex:work-queue-add ulex:call send-message ulex:get-best-server ulex:ping)
+(set! *default-error-port* (current-output-port))
+
+(ulex:call *area* "test.db" 'savemsg '("my message" "matt"))
+
+(define *servers* (ulex:get-all-server-pkts *area*))
+
+(define numofrecords (ulex:call *area* "test.db" 'getnum '()))
+;; (define bunchofrecords (ulex:call *area* "test.db" 'getsome '()))
ADDED ulex/tests/faux-mt-callspec.scm
Index: ulex/tests/faux-mt-callspec.scm
==================================================================
--- /dev/null
+++ ulex/tests/faux-mt-callspec.scm
@@ -0,0 +1,54 @@
+(use test (prefix sqlite3 sqlite3:) posix)
+(if (file-exists? "ulex.scm")
+ (load "ulex.scm")
+ (load "../ulex.scm"))
+
+(use trace)
+(trace-call-sites #t)
+
+(import ulex ) ;; (import (prefix ulex ulex:))
+
+
+(test-begin "faux-mtdb")
+;; pre-clean
+
+(for-each (lambda (dir)
+ (if (directory-exists? dir)
+ (system (conc "/bin/rm -rf ./"dir)))
+ (system (conc "/bin/mkdir -p ./"dir))
+ )
+ '("faux-mtdb" "faux-mtdb-pkts"))
+
+
+(let* ((area (make-area dbdir: "faux-mtdb" pktsdir: "faux-mtdb-pkts"))
+ (specfile "tests/mt-spec.sexpr")
+ (dbname "faux-mt.db"))
+ (launch area)
+ (initialize-area-calls-from-specfile area specfile)
+
+
+ (let* ((target-name "a/b/c/d")
+ (insert-result (call area dbname 'new-target (list target-name)))
+ (test-target-id (caar (call area dbname 'target-name->target-id (list target-name))))
+ (test-target-name (caar (call area dbname 'target-id->target-name (list 1)))))
+ (test #f #t insert-result)
+ (test #f 1 test-target-id )
+ (test #f target-name test-target-name )
+ )
+
+ (test #f #t (shutdown area)))
+
+;; thought experiment - read cursors
+;; (let* ((cursor (call area dbname 'get-target-names '())))
+;; (let loop ((row (cursor)))
+;; (cond
+;; ((not row) #t)
+;; (else
+;; (print "ROW IS "row)
+;; (loop (cursor))))))
+
+
+(test-end "faux-mtdb")
+
+
+
ADDED ulex/tests/mt-spec.sexpr
Index: ulex/tests/mt-spec.sexpr
==================================================================
--- /dev/null
+++ ulex/tests/mt-spec.sexpr
@@ -0,0 +1,28 @@
+(
+ (dbwrite .
+ (
+ (dbinitsql . (
+ "create table if not exists targets(id integer primary key,name)"
+ "create table if not exists runs(id integer primary key,target_id,name,path,state,status)"
+ "create table if not exists tests(id integer primary key,run_id,name,path,state,status,host)"
+ "create table if not exists test_steps(id integer primary key,test_id,name,state)" ))
+
+ ( new-target . "insert into targets (name) values(?);")
+ ( new-run . "insert into runs (target_id,name,path,state,status) values(?,?,\"/dev/null\",\"NOT STARTED\",\"n/a\")")
+ ( new-test . "insert into tests values(?,?,?,\"/dev/null\",\"NOT STARTED\")")
+ ( update-one-run_id-state-status . "update runs set state=? status=? where id=?" )
+ ( update-one-test_id-state-status . "update tests set state=? status=? where id=?" )
+ ( update-matching-tests-state-status . "update tests set state=? status=? where run_id=?, state like ?, status like ?")
+ )
+ )
+ (dbread .
+ (
+ (get-targets . "select id,name from targets")
+ (target-name->target-id . "select id from targets where name=?")
+ (target-id->target-name . "select name from targets where id=?")
+ (check-test-state-status . "select state,status from tests where id=?")
+ )
+ )
+
+ )
+
ADDED ulex/tests/run.scm
Index: ulex/tests/run.scm
==================================================================
--- /dev/null
+++ ulex/tests/run.scm
@@ -0,0 +1,182 @@
+(use test (prefix sqlite3 sqlite3:) posix ulex-netutil rpc pkts mailbox)
+
+;; (use (prefix ulex ulex:))
+(if (file-exists? "ulex.scm")
+ (load "ulex.scm")
+ (load "../ulex.scm"))
+
+(use trace)
+(trace-call-sites #t)
+
+(import ulex) ;; (import (prefix ulex ulex:))
+
+(trace
+ ;; send-message
+ ;; receive-message
+ ;; std-peer-handler
+ ;; work-queue-add
+ ;; deliver-response
+ ;; finalize-all-db-handles
+ ;; area-dbhandles
+ ;; save-dbh
+ ;; process-db-queries
+
+ ;; dbdat-dbh
+ ;; get-best-server
+ ;; calc-server-score
+ ;; ping
+ ;; full-ping
+ ;; register-node
+ ;; calc-server-score
+ ;; update-known-servers
+ ;; request
+ ;; get-dbh
+ ;; update-stats
+ ;; process-db-queries
+ ;; deliver-response
+ )
+
+(test-begin "misc")
+(test #f #t (string? (get-my-best-address)))
+(test-end "misc")
+
+;;======================================================================
+;; Setup
+;;======================================================================
+
+(system "rm -rf testulexdb testpkts")
+(create-directory "testulexdb" #t)
+(create-directory "testpkts" #t)
+
+(define *area* (make-area dbdir: "testulexdb" pktsdir: "testpkts"))
+
+(define *port* #f)
+
+;;======================================================================
+;; Ulex-db
+;;======================================================================
+
+(test-begin "ulex-db")
+(test #f #t (equal? (area-dbdir *area*) "testulexdb"))
+(test #f #t (thread? (thread-start! (make-thread (lambda ()(launch *area*)) "server"))))
+(thread-sleep! 1)
+(test #f 1 (update-known-servers *area*))
+(test #f #t (list? (get-all-server-pkts *area*)))
+(test #f (area-myaddr *area*) (cadr (ping *area* (area-myaddr *area*)(area-port *area*))))
+
+(let loop ((count 10))
+ (if (null? (get-all-server-pkts *area*))
+ (if (> count 0)
+ (begin
+ (thread-sleep! 1)
+ (print "waiting for server pkts")
+ (loop (- count 1))))))
+(test #f #t (let ((spkts (get-all-server-pkts *area*)))
+ (and (list spkts) (> (length spkts) 0))))
+(test #f #t (begin (register-batch
+ *area*
+ 'dbwrite ;; this is the call type
+ `((dbinitsql . ("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, message TEXT, author TEXT, msg_time INTEGER);"))
+ (savemsg . "INSERT INTO messages (message,author) VALUES (?,?)")
+ ;; (readmsg . "SELECT * FROM messages WHERE author=?;")
+
+ ))
+ #t))
+
+(test #f #t (calldat? (get-rentry *area* 'dbinitsql)))
+(define cdat1 (get-rentry *area* 'dbinitsql))
+(test #f #t (list? (get-best-server *area* "test.db" 'savemsg)))
+(test #f #t (eq? 'dbwrite (calldat-ctype cdat1)))
+(test #f #t (list? (get-rsql *area* 'dbinitsql)))
+(test #f #t (dbdat? (open-db *area* "test.db")))
+
+(test #f #t (dbdat? (let ((dbh (get-dbh *area* "test.db")))
+ (save-dbh *area* "test.db" dbh)
+ dbh)))
+(test #f #t (dbdat? (let ((dbh (get-dbh *area* "test.db")))
+ dbh)))
+
+;(test #f '(#t "db write submitted" #t) (call *area* "test.db" 'savemsg '("Test message!" "matt")))
+(test #f #t (call *area* "test.db" 'savemsg '("Test message!" "matt")))
+;;(thread-sleep! 15);; server needs time to process the request (it is non-blocking)
+;; (test #f #t (shutdown *area*))
+;; (test #f 0 (calc-server-score *area* "test.db" (area-pktid *area*)))
+
+(test #f #t (list? (get-best-server *area* "test.db" (area-pktid *area*))))
+(define *best-server* (car (get-best-server *area* "test.db" (area-pktid *area*))))
+(pp *best-server*)
+(define *server-pkt* (hash-table-ref/default (area-hosts *area*) (area-pktid *area*) #f))
+(define *server-ip* (alist-ref 'ipaddr *server-pkt*))
+(define *server-port* (any->number (alist-ref 'port *server-pkt*)))
+(test #f #t (list? (ping *area* *server-ip* *server-port*)))
+
+(test #f #t (process-db-queries *area* "test.db"))
+(test #f #f (process-db-queries *area* "junk.db"))
+;; (test #f #t (cadr (full-ping *area* *server-pkt*)))
+
+
+(test-end "ulex-db")
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+
+(test-begin "faux-mtdb")
+;; pre-clean
+
+#;(for-each (lambda (dir)
+ (if (directory-exists? dir)
+ (system (conc "/bin/rm -rf ./"dir)))
+ (system (conc "/bin/mkdir -p ./"dir))
+ )
+ '("faux-mtdb" "faux-mtdb-pkts"))
+
+
+(let* ((area *area*) ;; (make-area dbdir: "faux-mtdb" pktsdir: "faux-mtdb-pkts"))
+ (specfile "tests/mt-spec.sexpr")
+ (dbname "faux-mt.db"))
+ ;; (launch area)
+ (initialize-area-calls-from-specfile area specfile)
+ (let* ((target-name "a/b/c/d")
+ (insert-result (call area dbname 'new-target (list target-name)))
+ (test-target-id (caar (call area dbname 'target-name->target-id (list target-name))))
+ (test-target-name (caar (call area dbname 'target-id->target-name (list 1)))))
+ (test #f #t insert-result)
+ (test #f 1 test-target-id )
+ (test #f target-name test-target-name )
+ )
+ (test #f #t (list? (get-best-server *area* "test.db" 'savemsg)))
+ (thread-sleep! 5)
+ (test #f #t (begin (shutdown area) #t)))
+
+(test #f #t (process-db-queries *area* "test.db"))
+(test #f #f (process-db-queries *area* "junk.db"))
+
+;; thought experiment - read cursors
+;; (let* ((cursor (call area dbname 'get-target-names '())))
+;; (let loop ((row (cursor)))
+;; (cond
+;; ((not row) #t)
+ ;; (else
+;; (print "ROW IS "row)
+;; (loop (cursor))))))
+
+
+(test-end "faux-mtdb")
+
+;;======================================================================
+;; Portlogger tests
+;;======================================================================
+
+;; (test-begin "portlogger")
+;;
+;; (test #f #f (begin (pl-open-run-close (lambda (db b)(pl-get-prev-used-port db)) *area*) #f))
+;; (test #f #f (pl-open-run-close (lambda (db b)(pl-get-port-state db 1234567)) *area*))
+;; (test #f #f (number? (pl-open-run-close (lambda (db b)(pl-take-port db 123456)) *area*)))
+;; (test #f #t (number? (let ((port (pl-open-run-close pl-find-port *area*)))
+;; (set! *port* port)
+;; port)))
+;; (test #f 1 (pl-open-run-close pl-release-port *port*))
+;; (test #f "released" (pl-open-run-close
+;; (lambda (db)
+;; (sqlite3:first-result db "select state from ports where port=?" *port*))))
+;;
+;; (test-end "portlogger")
ADDED ulex/ulex.meta
Index: ulex/ulex.meta
==================================================================
--- /dev/null
+++ ulex/ulex.meta
@@ -0,0 +1,21 @@
+;; -*- scheme -*-
+(
+; Your egg's license:
+(license "BSD")
+
+; Pick one from the list of categories (see below) for your egg and enter it
+; here.
+(category db)
+
+; A list of eggs dbi depends on. If none, you can omit this declaration
+; altogether. If you are making an egg for chicken 3 and you need to use
+; procedures from the `files' unit, be sure to include the `files' egg in the
+; `needs' section (chicken versions < 3.4.0 don't provide the `files' unit).
+; `depends' is an alias to `needs'.
+(needs rpc pkts mailbox sqlite3)
+
+; A list of eggs required for TESTING ONLY. See the `Tests' section.
+(test-depends test)
+
+(author "Matt Welland")
+(synopsis "A distributed mesh-like layer for sqlite3."))
ADDED ulex/ulex.release-info
Index: ulex/ulex.release-info
==================================================================
--- /dev/null
+++ ulex/ulex.release-info
@@ -0,0 +1,3 @@
+(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}")
+(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}")
+(release "0.1")
ADDED ulex/ulex.scm
Index: ulex/ulex.scm
==================================================================
--- /dev/null
+++ ulex/ulex.scm
@@ -0,0 +1,1452 @@
+;;; ulex: Distributed sqlite3 db
+;;;
+;; Copyright (C) 2018 Matt Welland
+;; Redistribution and use in source and binary forms, with or without
+;; modification, is permitted.
+;;
+;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
+;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
+;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
+;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+;; DAMAGE.
+
+;;======================================================================
+;; ABOUT:
+;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
+;; NOTES:
+;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
+;;
+;;======================================================================
+
+;; (use rpc pkts mailbox sqlite3)
+
+(module ulex
+ *
+ ;; #;(
+ ;; ;; areas
+ ;; make-area
+ ;; area->alist
+ ;; ;; server
+ ;; launch
+ ;; update-known-servers
+ ;; shutdown
+ ;; get-best-server
+ ;; ;; client side
+ ;; call
+ ;; ;; queries, procs and system commands (i.e. workers)
+ ;; register
+ ;; register-batch
+ ;; ;; database - note: most database stuff need not be exposed, these calls may be removed from exports in future
+ ;; open-db
+ ;; ;; ports
+ ;; pl-find-port
+ ;; pl-get-prev-used-port
+ ;; pl-open-db
+ ;; pl-open-run-close
+ ;; pl-release-port
+ ;; pl-set-port
+ ;; pl-take-port
+ ;; pl-is-port-available
+ ;; pl-get-port-state
+ ;; ;; system
+ ;; get-normalized-cpu-load
+
+ ;; )
+
+(import scheme posix chicken data-structures ports extras files mailbox)
+(import rpc srfi-18 pkts matchable regex
+ typed-records srfi-69 srfi-1
+ srfi-4 regex-case
+ (prefix sqlite3 sqlite3:)
+ foreign
+ tcp) ;; ulex-netutil)
+
+;;======================================================================
+;; D E B U G H E L P E R S
+;;======================================================================
+
+(define (dbg> . args)
+ (with-output-to-port (current-error-port)
+ (lambda ()
+ (apply print "dbg> " args))))
+
+(define (debug-pp . args)
+ (if (get-environment-variable "ULEX_DEBUG")
+ (with-output-to-port (current-error-port)
+ (lambda ()
+ (apply pp args)))))
+
+(define *default-debug-port* (current-error-port))
+
+(define (sdbg> fn stage-name stage-start stage-end start-time . message)
+ (if (get-environment-variable "ULEX_DEBUG")
+ (with-output-to-port *default-debug-port*
+ (lambda ()
+ (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. "
+ (if start-time
+ (conc "total time " (- (current-milliseconds) start-time)
+ " ms.")
+ "")
+ message
+ )))))
+
+;;======================================================================
+;; M A C R O S
+;;======================================================================
+;; iup callbacks are not dumping the stack, this is a work-around
+;;
+
+;; Some of these routines use:
+;;
+;; http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html
+;;
+;; Syntax for defining macros in a simple style similar to function definiton,
+;; when there is a single pattern for the argument list and there are no keywords.
+;;
+;; (define-simple-syntax (name arg ...) body ...)
+;;
+
+(define-syntax define-simple-syntax
+ (syntax-rules ()
+ ((_ (name arg ...) body ...)
+ (define-syntax name (syntax-rules () ((name arg ...) (begin body ...)))))))
+
+(define-simple-syntax (catch-and-dump proc procname)
+ (handle-exceptions
+ exn
+ (begin
+ (print-call-chain (current-error-port))
+ (with-output-to-port (current-error-port)
+ (lambda ()
+ (print ((condition-property-accessor 'exn 'message) exn))
+ (print "Callback error in " procname)
+ (print "Full condition info:\n" (condition->list exn)))))
+ (proc)))
+
+
+;;======================================================================
+;; R E C O R D S
+;;======================================================================
+
+;; information about me as a server
+;;
+(defstruct area
+ ;; about this area
+ (useportlogger #f)
+ (lowport 32768)
+ (server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all)
+ (conn #f)
+ (port #f)
+ (myaddr (get-my-best-address))
+ pktid ;; get pkt from hosts table if needed
+ pktfile
+ pktsdir
+ dbdir
+ (dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one?
+ (mutex (make-mutex))
+ (rtable (make-hash-table)) ;; registration table of available actions
+ (dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve
+ ;; about other servers
+ (hosts (make-hash-table)) ;; key => hostdat
+ (hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime )
+ (reqs (make-hash-table)) ;; uri => queue
+ ;; work queues
+ (wqueues (make-hash-table)) ;; fname => qdat
+ (stats (make-hash-table)) ;; fname => totalqueries
+ (last-srvup (current-seconds)) ;; last time we updated the known servers
+ (cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call
+ (ready #f)
+ (health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping
+ )
+
+;; host stats
+;;
+(defstruct hostdat
+ (pkt #f)
+ (dbload (make-hash-table)) ;; "dbfile.db" => queries/min
+ (hostload #f) ;; normalized load ( 5min load / numcpus )
+ )
+
+;; dbdat
+;;
+(defstruct dbdat
+ (dbh #f)
+ (fname #f)
+ (write-access #f)
+ (sths (make-hash-table)) ;; hash mapping query strings to handles
+ )
+
+;; qdat
+;;
+(defstruct qdat
+ (writeq (make-queue))
+ (readq (make-queue))
+ (rwq (make-queue))
+ (logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging
+ (osshort (make-queue))
+ (oslong (make-queue))
+ (misc (make-queue)) ;; used for things like ping-full
+ )
+
+;; calldat
+;;
+(defstruct calldat
+ (ctype 'dbwrite)
+ (obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc.
+ (rtime (current-milliseconds)))
+
+;; make it a global? Well, it is local to area module
+
+(define *pktspec*
+ `((server (hostname . h)
+ (port . p)
+ (pid . i)
+ (ipaddr . a)
+ )
+ (data (hostname . h) ;; sender hostname
+ (port . p) ;; sender port
+ (ipaddr . a) ;; sender ip
+ (hostkey . k) ;; sending host key - store info at server under this key
+ (servkey . s) ;; server key - this needs to match at server end or reject the msg
+ (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json
+ (data . d) ;; base64 encoded slln data
+ )))
+
+;; work item
+;;
+(defstruct witem
+ (rhost #f) ;; return host
+ (ripaddr #f) ;; return ipaddr
+ (rport #f) ;; return port
+ (servkey #f) ;; the packet representing the client of this workitem, used by final send-message
+ (rdat #f) ;; the request - usually an sql query, type is rdat
+ (action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort
+ (cookie #f) ;; cookie id for response
+ (data #f) ;; the data payload, i.e. parameters
+ (result #f) ;; the result from processing the data
+ (caller #f)) ;; the calling peer according to rpc itself
+
+(define (trim-pktid pktid)
+ (if (string? pktid)
+ (substring pktid 0 4)
+ "nopkt"))
+
+(define (any->number num)
+ (cond
+ ((number? num) num)
+ ((string? num) (string->number num))
+ (else num)))
+
+(use trace)
+(trace-call-sites #t)
+
+;;======================================================================
+;; D A T A B A S E H A N D L I N G
+;;======================================================================
+
+;; look in dbhandles for a db, return it, else return #f
+;;
+(define (get-dbh acfg fname)
+ (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '())))
+ (if (null? dbh-lst)
+ (begin
+ ;; (print "opening db for " fname)
+ (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls
+ (let ((rem-lst (cdr dbh-lst)))
+ ;; (print "re-using saved connection for " fname)
+ (hash-table-set! (area-dbhandles acfg) fname rem-lst)
+ (car dbh-lst)))))
+
+(define (save-dbh acfg fname dbdat)
+ ;; (print "saving dbh for " fname)
+ (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '()))))
+
+;; open the database, if never before opened init it. put the handle in the
+;; open db's hash table
+;; returns: the dbdat
+;;
+(define (open-db acfg fname)
+ (let* ((fullname (conc (area-dbdir acfg) "/" fname))
+ (exists (file-exists? fullname))
+ (write-access (if exists
+ (file-write-access? fullname)
+ (file-write-access? (area-dbdir acfg))))
+ (db (sqlite3:open-database fullname))
+ (handler (sqlite3:make-busy-timeout 136000))
+ )
+ (sqlite3:set-busy-handler! db handler)
+ (sqlite3:execute db "PRAGMA synchronous = 0;")
+ (if (not exists) ;; need to init the db
+ (if write-access
+ (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements
+ ;; (sqlite3:with-transaction
+ ;; db
+ ;; (lambda ()
+ (if isql
+ (for-each
+ (lambda (sql)
+ (sqlite3:execute db sql))
+ isql)))
+ (print "ERROR: no write access to " (area-dbdir acfg))))
+ (make-dbdat dbh: db fname: fname write-access: write-access)))
+
+;; This is a low-level command to retrieve or to prepare, save and return a prepared statment
+;; you must extract the db handle
+;;
+(define (get-sth db cache stmt)
+ (if (hash-table-exists? cache stmt)
+ (begin
+ ;; (print "Reusing cached stmt for " stmt)
+ (hash-table-ref/default cache stmt #f))
+ (let ((sth (sqlite3:prepare db stmt)))
+ (hash-table-set! cache stmt sth)
+ ;; (print "prepared stmt for " stmt)
+ sth)))
+
+;; a little more expensive but does all the tedious deferencing - only use if you don't already
+;; have dbdat and db sitting around
+;;
+(define (full-get-sth acfg fname stmt)
+ (let* ((dbdat (get-dbh acfg fname))
+ (db (dbdat-dbh dbdat))
+ (sths (dbdat-sths dbdat)))
+ (get-sth db sths stmt)))
+
+;; write to a db
+;; acfg: area data
+;; rdat: request data
+;; hdat: (host . port)
+;;
+;; (define (dbwrite acfg rdat hdat data-in)
+;; (let* ((dbname (car data-in))
+;; (dbdat (get-dbh acfg dbname))
+;; (db (dbdat-dbh dbdat))
+;; (sths (dbdat-sths dbdat))
+;; (stmt (calldat-obj rdat))
+;; (sth (get-sth db sths stmt))
+;; (data (cdr data-in)))
+;; (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data)
+;; (print "dbdat: " (dbdat->alist dbdat))
+;; (apply sqlite3:execute sth data)
+;; (save-dbh acfg dbname dbdat)
+;; #t
+;; ))
+
+(define (finalize-all-db-handles acfg)
+ (let* ((dbhandles (area-dbhandles acfg)) ;; dbhandles is hash of fname ==> dbdat
+ (num 0))
+ (for-each
+ (lambda (area-name)
+ (print "Closing handles for " area-name)
+ (let ((dbdats (hash-table-ref/default dbhandles area-name '())))
+ (for-each
+ (lambda (dbdat)
+ ;; first close all statement handles
+ (for-each
+ (lambda (sth)
+ (sqlite3:finalize! sth)
+ (set! num (+ num 1)))
+ (hash-table-values (dbdat-sths dbdat)))
+ ;; now close the dbh
+ (set! num (+ num 1))
+ (sqlite3:finalize! (dbdat-dbh dbdat)))
+ dbdats)))
+ (hash-table-keys dbhandles))
+ (print "FINALIZED " num " dbhandles")))
+
+;;======================================================================
+;; W O R K Q U E U E H A N D L I N G
+;;======================================================================
+
+(define (register-db-as-mine acfg dbname)
+ (let ((ht (area-dbs acfg)))
+ (if (not (hash-table-ref/default ht dbname #f))
+ (hash-table-set! ht dbname (random 10000)))))
+
+(define (work-queue-add acfg fname witem)
+ (let* ((work-queue-start (current-milliseconds))
+ (action (witem-action witem)) ;; NB the action is the index into the rdat actions
+ (qdat (or (hash-table-ref/default (area-wqueues acfg) fname #f)
+ (let ((newqdat (make-qdat)))
+ (hash-table-set! (area-wqueues acfg) fname newqdat)
+ newqdat)))
+ (rdat (hash-table-ref/default (area-rtable acfg) action #f)))
+ (if rdat
+ (queue-add!
+ (case (calldat-ctype rdat)
+ ((dbwrite) (register-db-as-mine acfg fname)(qdat-writeq qdat))
+ ((dbread) (register-db-as-mine acfg fname)(qdat-readq qdat))
+ ((dbrw) (register-db-as-mine acfg fname)(qdat-rwq qdat))
+ ((oslong) (qdat-oslong qdat))
+ ((osshort) (qdat-osshort qdat))
+ ((full-ping) (qdat-misc qdat))
+ (else
+ (print "ERROR: no queue for " action ". Adding to dbwrite queue.")
+ (qdat-writeq qdat)))
+ witem)
+ (case action
+ ((full-ping)(qdat-misc qdat))
+ (else
+ (print "ERROR: No action " action " was registered"))))
+ (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f)
+ #t)) ;; for now, simply return #t to indicate request got to the queue
+
+(define (doqueue acfg q fname dbdat dbh)
+ ;; (print "doqueue: " fname)
+ (let* ((start-time (current-milliseconds))
+ (qlen (queue-length q)))
+ (if (> qlen 1)
+ (print "Processing queue of length " qlen))
+ (let loop ((count 0)
+ (responses '()))
+ (let ((delta (- (current-milliseconds) start-time)))
+ (if (or (queue-empty? q)
+ (> delta 400)) ;; stop working on this queue after 400ms have passed
+ (list count delta responses) ;; return count, delta and responses list
+ (let* ((witem (queue-remove! q))
+ (action (witem-action witem))
+ (rdat (witem-rdat witem))
+ (stmt (calldat-obj rdat))
+ (sth (full-get-sth acfg fname stmt))
+ (ctype (calldat-ctype rdat))
+ (data (witem-data witem))
+ (cookie (witem-cookie witem)))
+ ;; do the processing and save the result in witem-result
+ (witem-result-set!
+ witem
+ (case ctype ;; action
+ ((noblockwrite) ;; blind write, no ack of success returned
+ (apply sqlite3:execute sth data)
+ (sqlite3:last-insert-rowid dbh))
+ ((dbwrite) ;; blocking write
+ (apply sqlite3:execute sth data)
+ #t)
+ ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query
+ (apply sqlite3:map-row (lambda x x) sth data))
+ ((full-ping) 'full-ping)
+ (else (print "Not ready for action " action) #f)))
+ (loop (add1 count)
+ (if cookie
+ (cons witem responses)
+ responses))))))))
+
+;; do up to 400ms of processing on each queue
+;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded
+;;
+(define (process-db-queries acfg fname)
+ (if (hash-table-exists? (area-wqueues acfg) fname)
+ (let* ((process-db-queries-start-time (current-milliseconds))
+ (qdat (hash-table-ref/default (area-wqueues acfg) fname #f))
+ (queue-sym->queue (lambda (queue-sym)
+ (case queue-sym ;; lookup the queue from qdat given a name (symbol)
+ ((wqueue) (qdat-writeq qdat))
+ ((rqueue) (qdat-readq qdat))
+ ((rwqueue) (qdat-rwq qdat))
+ ((misc) (qdat-misc qdat))
+ (else #f))))
+ (dbdat (get-dbh acfg fname))
+ (dbh (if (dbdat? dbdat)(dbdat-dbh dbdat) #f))
+ (nowtime (current-seconds)))
+ ;; handle the queues that require a transaction
+ ;;
+ (map ;;
+ (lambda (queue-sym)
+ ;; (print "processing queue " queue-sym)
+ (let* ((queue (queue-sym->queue queue-sym)))
+ (if (not (queue-empty? queue))
+ (let ((responses
+ (sqlite3:with-transaction ;; todo - catch exceptions...
+ dbh
+ (lambda ()
+ (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work!
+ ;; (print "res=" res)
+ (match res
+ ((count delta responses)
+ (update-stats acfg fname queue-sym delta count)
+ (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f)
+ responses) ;; return responses
+ (else
+ (print "ERROR: bad return data from doqueue " res)))
+ )))))
+ ;; having completed the transaction, send the responses.
+ ;; (print "INFO: sending " (length responses) " responses.")
+ (let loop ((responses-left responses))
+ (cond
+ ((null? responses-left) #t)
+ (else
+ (let* ((witem (car responses-left))
+ (response (cdr responses-left)))
+ (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem)
+ (witem-cookie witem)(witem-result witem)))
+ (loop (cdr responses-left))))))
+ )))
+ '(wqueue rwqueue rqueue))
+
+ ;; handle misc queue
+ ;;
+ ;; (print "processing misc queue")
+ (let ((queue (queue-sym->queue 'misc)))
+ (doqueue acfg queue fname dbdat dbh))
+ ;; ....
+ (save-dbh acfg fname dbdat)
+ #t ;; just to let the tests know we got here
+ )
+ #f ;; nothing processed
+ ))
+
+;; run all queues in parallel per db but sequentially per queue for that db.
+;; - process the queues every 500 or so ms
+;; - allow for long running queries to continue but all other activities for that
+;; db will be blocked.
+;;
+(define (work-queue-processor acfg)
+ (let* ((threads (make-hash-table))) ;; fname => thread
+ (let loop ((fnames (hash-table-keys (area-wqueues acfg)))
+ (target-time (+ (current-milliseconds) 50)))
+ ;;(if (not (null? fnames))(print "Processing for these databases: " fnames))
+ (for-each
+ (lambda (fname)
+ ;; (print "processing for " fname)
+ ;;(process-db-queries acfg fname))
+ (let ((th (hash-table-ref/default threads fname #f)))
+ (if (and th (not (member (thread-state th) '(dead terminated))))
+ (begin
+ (print "WARNING: worker thread for " fname " is taking a long time.")
+ (print "Thread is in state " (thread-state th)))
+ (let ((th1 (make-thread (lambda ()
+ ;; (catch-and-dump
+ ;; (lambda ()
+ ;; (print "Process queries for " fname)
+ (let ((start-time (current-milliseconds)))
+ (process-db-queries acfg fname)
+ ;; (thread-sleep! 0.01) ;; need the thread to take at least some time
+ (hash-table-delete! threads fname)) ;; no mutexes?
+ fname)
+ "th1"))) ;; ))
+ (hash-table-set! threads fname th1)
+ (thread-start! th1)))))
+ fnames)
+ ;; (thread-sleep! 0.1) ;; give the threads some time to process requests
+ ;; burn time until 400ms is up
+ (let ((now-time (current-milliseconds)))
+ (if (< now-time target-time)
+ (let ((delta (- target-time now-time)))
+ (thread-sleep! (/ delta 1000)))))
+ (loop (hash-table-keys (area-wqueues acfg))
+ (+ (current-milliseconds) 50)))))
+
+;;======================================================================
+;; S T A T S G A T H E R I N G
+;;======================================================================
+
+(defstruct stat
+ (qcount-avg 0) ;; coarse running average
+ (qtime-avg 0) ;; coarse running average
+ (qcount 0) ;; total
+ (qtime 0) ;; total
+ (last-qcount 0) ;; last
+ (last-qtime 0) ;; last
+ (dbs '()) ;; list of db files handled by this node
+ (when 0)) ;; when the last query happened - seconds
+
+
+(define (update-stats acfg fname bucket duration numqueries)
+ (let* ((key fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough
+ (stats (or (hash-table-ref/default (area-stats acfg) key #f)
+ (let ((newstats (make-stat)))
+ (hash-table-set! (area-stats acfg) key newstats)
+ newstats))))
+ ;; when the last query happended (used to remove the fname from the active list)
+ (stat-when-set! stats (current-seconds))
+ ;; last values
+ (stat-last-qcount-set! stats numqueries)
+ (stat-last-qtime-set! stats duration)
+ ;; total over process lifetime
+ (stat-qcount-set! stats (+ (stat-qcount stats) numqueries))
+ (stat-qtime-set! stats (+ (stat-qtime stats) duration))
+ ;; coarse average
+ (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2))
+ (stat-qtime-avg-set! stats (/ (+ (stat-qtime-avg stats) duration) 2))
+
+ ;; here is where we add the stats for a given dbfile
+ (if (not (member fname (stat-dbs stats)))
+ (stat-dbs-set! stats (cons fname (stat-dbs stats))))
+
+ ))
+
+;;======================================================================
+;; S E R V E R S T U F F
+;;======================================================================
+
+;; this does NOT return!
+;;
+(define (find-free-port-and-open acfg)
+ (let ((port (or (area-port acfg) 3200)))
+ (handle-exceptions
+ exn
+ (begin
+ (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port")
+ (area-port-set! acfg (+ port 1))
+ (find-free-port-and-open acfg))
+ (rpc:default-server-port port)
+ (area-port-set! acfg port)
+ (tcp-read-timeout 120000)
+ ;; ((rpc:make-server (tcp-listen port)) #t)
+ (tcp-listen (rpc:default-server-port)
+ ))))
+
+;; register this node by putting a packet into the pkts dir.
+;; look for other servers
+;; contact other servers and compile list of servers
+;; there are two types of server
+;; main servers - dashboards, runners and dedicated servers - need pkt
+;; passive servers - test executers, step calls, list-runs - no pkt
+;;
+(define (register-node acfg hostip port-num)
+ ;;(mutex-lock! (area-mutex acfg))
+ (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created)
+ (best-ip (or hostip (get-my-best-address)))
+ (mtdir (area-dbdir acfg))
+ (pktdir (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts")))
+ (print "Registering node " best-ip ":" port-num)
+ (if (not mtdir) ;; require a home for this node to put or find databases
+ #f
+ (begin
+ (if (not (directory? pktdir))(create-directory pktdir))
+ ;; server is started, now create pkt if needed
+ (print "Starting server in " server-type " mode with port " port-num)
+ (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt
+ (begin
+ (area-pktid-set! acfg
+ (write-alist->pkt
+ pktdir
+ `((hostname . ,(get-host-name))
+ (ipaddr . ,best-ip)
+ (port . ,port-num)
+ (pid . ,(current-process-id)))
+ pktspec: *pktspec*
+ ptype: 'server))
+ (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt"))))
+ (area-port-set! acfg port-num)
+ #;(mutex-unlock! (area-mutex acfg))))))
+
+(define *cookie-seqnum* 0)
+(define (make-cookie key)
+ (set! *cookie-seqnum* (add1 *cookie-seqnum*))
+ ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*)
+ (conc key "-" *cookie-seqnum*)
+ )
+
+;; dispatch locally if possible
+;;
+(define (call-deliver-response acfg ipaddr port cookie data)
+ (if (and (equal? (area-myaddr acfg) ipaddr)
+ (equal? (area-port acfg) port))
+ (deliver-response acfg cookie data)
+ ((rpc:procedure 'response ipaddr port) cookie data)))
+
+(define (deliver-response acfg cookie data)
+ (let ((deliver-response-start (current-milliseconds)))
+ (thread-start! (make-thread
+ (lambda ()
+ (let loop ((tries-left 5))
+ ;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left)
+ ;;(pp (hash-table->alist (area-cookie2mbox acfg)))
+ (let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f)))
+ (cond
+ ((eq? 0 tries-left)
+ (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie)
+ )
+ (mbox
+ ;;(print "got mbox="mbox" got data="data" send.")
+ (mailbox-send! mbox data))
+ (else
+ ;;(print "no mbox yet. look for "cookie)
+ (thread-sleep! (/ (- 6 tries-left) 10))
+ (loop (sub1 tries-left))))))
+ ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data))
+ (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie)
+ )
+ (conc "deliver-response thread for cookie="cookie))))
+ #t)
+
+;; action:
+;; immediate - quick actions, no need to put in queues
+;; dbwrite - put in dbwrite queue
+;; dbread - put in dbread queue
+;; oslong - os actions, e.g. du, that could take a long time
+;; osshort - os actions that should be quick, e.g. df
+;;
+(define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler
+ ;; NOTE: Use rpc:current-peer for getting return address
+ (let* ((std-peer-handler-start (current-milliseconds))
+ ;; (raw-data (alist-ref 'data dat))
+ (rdat (hash-table-ref/default
+ (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action
+ (witem (make-witem ripaddr: from-ipaddr ;; rhost: from-host
+ rport: from-port action: action
+ rdat: rdat cookie: cookie
+ servkey: servkey data: params ;; TODO - rename data to params
+ caller: (rpc:current-peer))))
+ (if (not (equal? servkey (area-pktid acfg)))
+ `(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this
+ (let* ((ctype (if rdat
+ (calldat-ctype rdat) ;; is this necessary? these should be identical
+ action)))
+ (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f)
+ (case ctype
+ ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data)))
+ ((full-ping) `(#t "ack to full ping" ,(work-queue-add acfg fname witem) ,cookie))
+ ((response) `(#t "ack from requestor" ,(deliver-response acfg fname params)))
+ ((dbwrite) `(#t "db write submitted" ,(work-queue-add acfg fname witem) ,cookie))
+ ((dbread) `(#t "db read submitted" ,(work-queue-add acfg fname witem) ,cookie ))
+ ((dbrw) `(#t "db read/write submitted" ,cookie))
+ ((osshort) `(#t "os short submitted" ,cookie))
+ ((oslong) `(#t "os long submitted" ,cookie))
+ (else `(#f "unrecognised action" ,ctype)))))))
+
+;; Call this to start the actual server
+;;
+;; start_server
+;;
+;; mode: '
+;; handler: proc which takes pktrecieved as argument
+;;
+
+(define (start-server acfg)
+ (let* ((conn (find-free-port-and-open acfg))
+ (port (area-port acfg)))
+ (rpc:publish-procedure!
+ 'delist-db
+ (lambda (fname)
+ (hash-table-delete! (area-dbs acfg) fname)))
+ (rpc:publish-procedure!
+ 'calling-addr
+ (lambda ()
+ (rpc:current-peer)))
+ (rpc:publish-procedure!
+ 'ping
+ (lambda ()(real-ping acfg)))
+ (rpc:publish-procedure!
+ 'request
+ (lambda (from-addr from-port servkey action cookie dbname params)
+ (request acfg from-addr from-port servkey action cookie dbname params)))
+ (rpc:publish-procedure!
+ 'response
+ (lambda (cookie res-dat)
+ (deliver-response acfg cookie res-dat)))
+ (area-ready-set! acfg #t)
+ (area-conn-set! acfg conn)
+ ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t)
+
+
+(define (launch acfg) ;; #!optional (proc std-peer-handler))
+ (print "starting launch")
+ (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
+ #;(let ((original-handler (current-exception-handler))) ;; is th
+ (lambda (exception)
+ (server-exit-procedure)
+ (original-handler exception)))
+ (on-exit (lambda ()
+ (shutdown acfg))) ;; (finalize-all-db-handles acfg)))
+ ;; set up the rpc handler
+ (let* ((th1 (make-thread
+ (lambda ()(start-server acfg))
+ "server thread"))
+ (th2 (make-thread
+ (lambda ()
+ (print "th2 starting")
+ (let loop ()
+ (work-queue-processor acfg)
+ (print "work-queue-processor crashed!")
+ (loop)))
+ "work queue thread")))
+ (thread-start! th1)
+ (thread-start! th2)
+ (let loop ()
+ (thread-sleep! 0.025)
+ (if (area-ready acfg)
+ #t
+ (loop)))
+ ;; attempt to fix my address
+ (let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)?
+ (let loop ((rem-addrs all-addr))
+ (if (null? rem-addrs)
+ (begin
+ (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.")
+ (exit 1)) ;; BUG Changeme to raising an exception
+
+ (let* ((addr (car rem-addrs))
+ (good-addr (handle-exceptions
+ exn
+ #f
+ ((rpc:procedure 'calling-addr addr (area-port acfg))))))
+ (if good-addr
+ (begin
+ (print "Got good-addr of " good-addr)
+ (area-myaddr-set! acfg good-addr))
+ (loop (cdr rem-addrs)))))))
+ (register-node acfg (area-myaddr acfg)(area-port acfg))
+ (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg))
+ ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
+ ))
+
+(define (clear-server-pkt acfg)
+ (let ((pktf (area-pktfile acfg)))
+ (if pktf (delete-file* pktf))))
+
+(define (shutdown acfg)
+ (let (;;(conn (area-conn acfg))
+ (pktf (area-pktfile acfg))
+ (port (area-port acfg)))
+ (if pktf (delete-file* pktf))
+ (send-all "imshuttingdown")
+ ;; (rpc:close-all-connections!) ;; don't know if this is actually needed
+ (finalize-all-db-handles acfg)))
+
+(define (send-all msg)
+ #f)
+
+;; given a area record look up all the packets
+;;
+(define (get-all-server-pkts acfg)
+ (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt"))))
+ (map (lambda (pkt-file)
+ (read-pkt->alist pkt-file pktspec: *pktspec*))
+ all-pkt-files)))
+
+#;((Z . "9a0212302295a19610d5796fce0370fa130758e9")
+ (port . "34827")
+ (pid . "28748")
+ (hostname . "zeus")
+ (T . "server")
+ (D . "1549427032.0"))
+
+#;(define (get-my-best-address)
+ (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))))
+ (cond
+ ((null? all-my-addresses)
+ (get-host-name)) ;; no interfaces?
+ ((eq? (length all-my-addresses) 1)
+ (ip->string (car all-my-addresses))) ;; only one to choose from, just go with it
+ (else
+ (ip->string (car (filter (lambda (x) ;; take any but 127.
+ (not (eq? (u8vector-ref x 0) 127)))
+ all-my-addresses)))))))
+
+;; whoami? I am my pkt
+;;
+(define (whoami? acfg)
+ (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f))
+
+;;======================================================================
+;; "Client side" operations
+;;======================================================================
+
+(define (safe-call call-key host port . params)
+ (handle-exceptions
+ exn
+ (begin
+ (print "Call " call-key " to " host ":" port " failed")
+ #f)
+ (apply (rpc:procedure call-key host port) params)))
+
+;; ;; convert to/from string / sexpr
+;;
+;; (define (string->sexpr str)
+;; (if (string? str)
+;; (with-input-from-string str read)
+;; str))
+;;
+;; (define (sexpr->string s)
+;; (with-output-to-string (lambda ()(write s))))
+
+;; is the server alive?
+;;
+(define (ping acfg host port)
+ (let* ((myaddr (area-myaddr acfg))
+ (myport (area-port acfg))
+ (start-time (current-milliseconds))
+ (res (if (and (equal? myaddr host)
+ (equal? myport port))
+ (real-ping acfg)
+ ((rpc:procedure 'ping host port)))))
+ (cons (- (current-milliseconds) start-time)
+ res)))
+
+;; returns ( ipaddr port alist-fname=>randnum )
+(define (real-ping acfg)
+ `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg)))
+
+;; is the server alive AND the queues processing?
+;;
+#;(define (full-ping acfg servpkt)
+ (let* ((start-time (current-milliseconds))
+ (res (send-message acfg servpkt '(full-ping) 'full-ping)))
+ (cons (- (current-milliseconds) start-time)
+ res))) ;; (equal? res "got ping"))))
+
+
+;; look up all pkts and get the server id (the hash), port, host/ip
+;; store this info in acfg
+;; return the number of responsive servers found
+;;
+;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself
+;;
+(define (update-known-servers acfg)
+ ;; readll all pkts
+ ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt
+ (let* ((start-time (current-milliseconds))
+ (all-pkts (delete-duplicates
+ (append (get-all-server-pkts acfg)
+ (hash-table-values (area-hosts acfg)))))
+ (hostshash (area-hosts acfg))
+ (my-id (area-pktid acfg))
+ (pktsdir (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers
+ (numsrvs 0)
+ (delpkt (lambda (pktsdir sid)
+ (print "clearing out server " sid)
+ (delete-file* (conc pktsdir "/" sid ".pkt"))
+ (hash-table-delete! hostshash sid))))
+ (area-last-srvup-set! acfg (current-seconds))
+ (for-each
+ (lambda (servpkt)
+ (if (list? servpkt)
+ ;; (pp servpkt)
+ (let* ((shost (alist-ref 'ipaddr servpkt))
+ (sport (any->number (alist-ref 'port servpkt)))
+ (res (handle-exceptions
+ exn
+ (begin
+ ;; (print "INFO: bad server on " shost ":" sport)
+ #f)
+ (ping acfg shost sport)))
+ (sid (alist-ref 'Z servpkt)) ;; Z code is our name for the server
+ (url (conc shost ":" sport))
+ )
+ #;(if (or (not res)
+ (null? res))
+ (begin
+ (print "STRANGE: ping of " url " gave " res)))
+
+ ;; (print "Got " res " from " shost ":" sport)
+ (match res
+ ((qduration . payload)
+ ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt)
+ ;; (if payload
+ ;; "Success" "Fail"))
+ (match payload
+ ((host port stats)
+ ;; (print "From " host ":" port " got stats: " stats)
+ (if (and host port stats)
+ (let ((url (conc host ":" port)))
+ (hash-table-set! hostshash sid servpkt)
+ ;; store based on host:port
+ (hash-table-set! (area-hoststats acfg) sid stats))
+ (print "missing data from the server, not sure what that means!"))
+ (set! numsrvs (+ numsrvs 1)))
+ (#f
+ (print "Removing pkt " sid " due to #f from server or failed ping")
+ (delpkt pktsdir sid))
+ (else
+ (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)")))
+ (else
+ ;; here we delete the pkt - can't reach the server, remove it
+ ;; however this logic is inadequate. we should mark the server as checked
+ ;; and not good, if it happens a second time - then remove the pkt
+ ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead
+ ;; could be it is simply too busy to reply
+ (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0)))
+ (if (> bad-pings 1) ;; two bad pings - remove pkt
+ (begin
+ (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid)
+ (delpkt pktsdir sid))
+ (begin
+ (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet")
+ (hash-table-set! (area-health acfg)
+ url
+ (+ (hash-table-ref/default (area-health acfg) url 0) 1))
+ ))
+ ))))
+ ;; servpkt is not actually a pkt?
+ (begin
+ (print "Bad pkt " servpkt))))
+ all-pkts)
+ (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs
+ " servers, pkts: " (map (lambda (p)
+ (alist-ref 'Z p))
+ all-pkts))
+ numsrvs))
+
+(defstruct srvstat
+ (numfiles 0) ;; number of db files handled by this server - subtract 1 for the db being currently looked at
+ (randnum #f) ;; tie breaker number assigned to by the server itself - applies only to the db under consideration
+ (pkt #f)) ;; the server pkt
+
+;;(define (srv->srvstat srvpkt)
+
+;; Get the server best for given dbname and key
+;;
+;; NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries.
+;;
+(define (get-best-server acfg dbname key)
+ (let* (;; (servers (hash-table-values (area-hosts acfg)))
+ (servers (area-hosts acfg))
+ (skeys (sort (hash-table-keys servers) string>=?)) ;; a stable listing
+ (start-time (current-milliseconds))
+ (srvstats (make-hash-table)) ;; srvid => srvstat
+ (url (conc (area-myaddr acfg) ":" (area-port acfg))))
+ ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys))
+ (if (null? skeys)
+ (if (> (update-known-servers acfg) 0)
+ (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter
+ (begin
+ (print "ERROR: no server found!") ;; since this process is also a server this should never happen
+ #f))
+ (begin
+ ;; (print "in get-best-server with skeys=" skeys)
+ (if (> (- (current-seconds) (area-last-srvup acfg)) 10)
+ (begin
+ (update-known-servers acfg)
+ (sdbg> "get-best-server" "update-known-servers" start-time #f #f)))
+
+ ;; for each server look at the list of dbfiles, total number of dbs being handled
+ ;; and the rand number, save the best host
+ ;; also do a delist-db for each server dbfile not used
+ (let* ((best-server #f)
+ (servers-to-delist (make-hash-table)))
+ (for-each
+ (lambda (srvid)
+ (let* ((server (hash-table-ref/default servers srvid #f))
+ (stats (hash-table-ref/default (area-hoststats acfg) srvid '(()))))
+ ;; (print "stats: " stats)
+ (if server
+ (let* ((dbweights (car stats))
+ (srvload (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights)))
+ (dbrec (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore
+ (randnum (if dbrec
+ dbrec ;; (cdr dbrec)
+ 0)))
+ (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server))))))
+ skeys)
+
+ (let* ((sorted (sort (hash-table-values srvstats)
+ (lambda (a b)
+ (let ((numfiles-a (srvstat-numfiles a))
+ (numfiles-b (srvstat-numfiles b))
+ (randnum-a (srvstat-randnum a))
+ (randnum-b (srvstat-randnum b)))
+ (if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less
+ #t
+ (if (and (equal? numfiles-a numfiles-b)
+ (< randnum-a randnum-b))
+ #t
+ #f))))))
+ (best (if (null? sorted)
+ (begin
+ (print "ERROR: should never be null due to self as server.")
+ #f)
+ (srvstat-pkt (car sorted)))))
+ #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv)
+ (let ((p (srvstat-pkt srv)))
+ (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p)
+ "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")")))
+ sorted))
+ best))))))
+
+ ;; send out an "I'm about to exit notice to all known servers"
+ ;;
+(define (death-imminent acfg)
+ '())
+
+;;======================================================================
+;; U L E X - T H E I N T E R E S T I N G S T U F F ! !
+;;======================================================================
+
+;; register a handler
+;; NOTES:
+;; dbinitsql is reserved for a list of sql statements for initializing the db
+;; dbinitfn is reserved for a db init function, if exists called after dbinitsql
+;;
+(define (register acfg key obj #!optional (ctype 'dbwrite))
+ (let ((ht (area-rtable acfg)))
+ (if (hash-table-exists? ht key)
+ (print "WARNING: redefinition of entry " key))
+ (hash-table-set! ht key (make-calldat obj: obj ctype: ctype))))
+
+;; usage: register-batch acfg '((key1 . sql1) (key2 . sql2) ... )
+;; NB// obj is often an sql query
+;;
+(define (register-batch acfg ctype data)
+ (let ((ht (area-rtable acfg)))
+ (map (lambda (dat)
+ (hash-table-set! ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype)))
+ data)))
+
+(define (initialize-area-calls-from-specfile area specfile)
+ (let* ((callspec (with-input-from-file specfile read )))
+ (for-each (lambda (group)
+ (register-batch
+ area
+ (car group)
+ (cdr group)))
+ callspec)))
+
+;; get-rentry
+;;
+(define (get-rentry acfg key)
+ (hash-table-ref/default (area-rtable acfg) key #f))
+
+(define (get-rsql acfg key)
+ (let ((cdat (get-rentry acfg key)))
+ (if cdat
+ (calldat-obj cdat)
+ #f)))
+
+
+
+;; blocking call:
+;; client server
+;; ------ ------
+;; call()
+;; send-message()
+;; nmsg-send()
+;; nmsg-receive()
+;; nmsg-respond(ack,cookie)
+;; ack, cookie
+;; mbox-thread-wait(cookie)
+;; nmsg-send(client,cookie,result)
+;; nmsg-respond(ack)
+;; return result
+;;
+;; reserved action:
+;; 'immediate
+;; 'dbinitsql
+;;
+(define (call acfg dbname action params #!optional (count 0))
+ (let* ((call-start-time (current-milliseconds))
+ (srv (get-best-server acfg dbname action))
+ (post-get-start-time (current-milliseconds))
+ (rdat (hash-table-ref/default (area-rtable acfg) action #f))
+ (myid (trim-pktid (area-pktid acfg)))
+ (srvid (trim-pktid (alist-ref 'Z srv)))
+ (cookie (make-cookie myid)))
+ (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat)
+ (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname)
+ (if (and srv rdat) ;; need both to dispatch a request
+ (let* ((ripaddr (alist-ref 'ipaddr srv))
+ (rsrvid (alist-ref 'Z srv))
+ (rport (any->number (alist-ref 'port srv)))
+ (res-full (if (and (equal? ripaddr (area-myaddr acfg))
+ (equal? rport (area-port acfg)))
+ (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params)
+ (safe-call 'request ripaddr rport
+ (area-myaddr acfg)
+ (area-port acfg)
+ #;(area-pktid acfg)
+ rsrvid
+ action cookie dbname params))))
+ ;; (print "res-full: " res-full)
+ (match res-full
+ ((response-ok response-msg rem ...)
+ (let* ((send-message-time (current-milliseconds))
+ ;; (match res-full
+ ;; ((response-ok response-msg)
+ ;; (response-ok (car res-full))
+ ;; (response-msg (cadr res-full)
+ )
+ ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG
+ ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params)
+ (sdbg> "call" "send-message" post-get-start-time #f call-start-time)
+ (cond
+ ((not response-ok) #f)
+ ((member response-msg '("db read submitted" "db write submitted"))
+ (let* ((cookie-id (cadddr res-full))
+ (mbox (make-mailbox))
+ (mbox-time (current-milliseconds)))
+ (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox)
+ (let* ((mbox-timeout-secs 20)
+ (mbox-timeout-result 'MBOX_TIMEOUT)
+ (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
+ (mbox-receive-time (current-milliseconds)))
+ (hash-table-delete! (area-cookie2mbox acfg) cookie-id)
+ (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname)
+ ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params)
+ res)))
+ (else
+ (print "Unhandled response \""response-msg"\"")
+ #f))
+ ;; depending on what action (i.e. ctype) is we will block here waiting for
+ ;; all the data (mechanism to be determined)
+ ;;
+ ;; if res is a "working on it" then wait
+ ;; wait for result
+ ;; mailbox thread wait on
+
+ ;; if res is a "can't help you" then try a different server
+ ;; if res is a "ack" (e.g. for one-shot requests) then return res
+ ))
+ (else
+ (if (< count 10)
+ (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv))))
+ (thread-sleep! 1)
+ (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.")
+ (call acfg dbname action params (+ count 1)))
+ (begin
+ (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full)))))))
+ (begin
+ (if (not rdat)
+ (print "ERROR: action " action " not registered.")
+ (if (< count 10)
+ (begin
+ (thread-sleep! 1)
+ (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts
+ (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds")
+ (call acfg dbname action params (+ count 1)))
+ (begin
+ (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up."))
+ #;(error "No server available"))))))))
+
+
+;;======================================================================
+;; U T I L I T I E S
+;;======================================================================
+
+;; get a signature for identifing this process
+;;
+(define (get-process-signature)
+ (cons (get-host-name)(current-process-id)))
+
+;;======================================================================
+;; S Y S T E M S T U F F
+;;======================================================================
+
+;; get normalized cpu load by reading from /proc/loadavg and
+;; /proc/cpuinfo return all three values and the number of real cpus
+;; and the number of threads returns alist '((adj-cpu-load
+;; . normalized-proc-load) ... etc. keys: adj-proc-load,
+;; adj-core-load, 1m-load, 5m-load, 15m-load
+;;
+(define (get-normalized-cpu-load)
+ (let ((res (get-normalized-cpu-load-raw))
+ (default `((adj-proc-load . 2) ;; there is no right answer
+ (adj-core-load . 2)
+ (1m-load . 2)
+ (5m-load . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong
+ (15m-load . 0)
+ (proc . 1)
+ (core . 1)
+ (phys . 1)
+ (error . #t))))
+ (cond
+ ((and (list? res)
+ (> (length res) 2))
+ res)
+ ((eq? res #f) default) ;; add messages?
+ ((eq? res #f) default) ;; this would be the #eof
+ (else default))))
+
+(define (get-normalized-cpu-load-raw)
+ (let* ((actual-host (get-host-name))) ;; #f is localhost
+ (let ((data (append
+ (with-input-from-file "/proc/loadavg" read-lines)
+ (with-input-from-file "/proc/cpuinfo" read-lines)
+ (list "end")))
+ (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$"))
+ (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$"))
+ (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$"))
+ (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$"))
+ (max-num (lambda (p n)(max (string->number p) n))))
+ ;; (print "data=" data)
+ (if (null? data) ;; something went wrong
+ #f
+ (let loop ((hed (car data))
+ (tal (cdr data))
+ (loads #f)
+ (proc-num 0) ;; processor includes threads
+ (phys-num 0) ;; physical chip on motherboard
+ (core-num 0)) ;; core
+ ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num)
+ (if (null? tal) ;; have all our data, calculate normalized load and return result
+ (let* ((act-proc (+ proc-num 1))
+ (act-phys (+ phys-num 1))
+ (act-core (+ core-num 1))
+ (adj-proc-load (/ (car loads) act-proc))
+ (adj-core-load (/ (car loads) act-core))
+ (result
+ (append (list (cons 'adj-proc-load adj-proc-load)
+ (cons 'adj-core-load adj-core-load))
+ (list (cons '1m-load (car loads))
+ (cons '5m-load (cadr loads))
+ (cons '15m-load (caddr loads)))
+ (list (cons 'proc act-proc)
+ (cons 'core act-core)
+ (cons 'phys act-phys)))))
+ result)
+ (regex-case
+ hed
+ (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num))
+ (proc-rx ( x p ) (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num))
+ (phys-rx ( x p ) (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num))
+ (core-rx ( x c ) (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num)))
+ (else
+ (begin
+ ;; (print "NO MATCH: " hed)
+ (loop (car tal)(cdr tal) loads proc-num phys-num core-num))))))))))
+
+(define (get-host-stats acfg)
+ (let ((stats-hash (area-stats acfg)))
+ ;; use this opportunity to remove references to dbfiles which have not been accessed in a while
+ (for-each
+ (lambda (dbname)
+ (let* ((stats (hash-table-ref stats-hash dbname))
+ (last-access (stat-when stats)))
+ (if (and (> last-access 0) ;; if zero then there has been no access
+ (> (- (current-seconds) last-access) 10)) ;; not used in ten seconds
+ (begin
+ (print "Removing " dbname " from stats list")
+ (hash-table-delete! stats-hash dbname) ;; remove from stats hash
+ (stat-dbs-set! stats (hash-table-keys stats))))))
+ (hash-table-keys stats-hash))
+
+ `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum
+ ,(map (lambda (dbname) ;; dbname is the db name
+ (cons dbname (stat-when (hash-table-ref stats-hash dbname))))
+ (hash-table-keys stats-hash))
+ (cpuload . ,(get-normalized-cpu-load)))))
+ #;(stats . ,(map (lambda (k) ;; create an alist from the stats data
+ (cons k (stat->alist (hash-table-ref (area-stats acfg) k))))
+ (hash-table-keys (area-stats acfg))))
+
+#;(trace
+ ;; assv
+ ;; cdr
+ ;; caar
+ ;; ;; cdr
+ ;; call
+ ;; finalize-all-db-handles
+ ;; get-all-server-pkts
+ ;; get-normalized-cpu-load
+ ;; get-normalized-cpu-load-raw
+ ;; launch
+ ;; nmsg-send
+ ;; process-db-queries
+ ;; receive-message
+ ;; std-peer-handler
+ ;; update-known-servers
+ ;; work-queue-processor
+ )
+
+;;======================================================================
+;; netutil
+;; move this back to ulex-netutil.scm someday?
+;;======================================================================
+
+;; #include
+;; #include
+;; #include
+;; #include
+
+(foreign-declare "#include \"sys/types.h\"")
+(foreign-declare "#include \"sys/socket.h\"")
+(foreign-declare "#include \"ifaddrs.h\"")
+(foreign-declare "#include \"arpa/inet.h\"")
+
+;; get IP addresses from ALL interfaces
+(define get-all-ips
+ (foreign-safe-lambda* scheme-object ()
+ "
+
+// from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address :
+
+
+ C_word lst = C_SCHEME_END_OF_LIST, len, str, *a;
+// struct ifaddrs *ifa, *i;
+// struct sockaddr *sa;
+
+ struct ifaddrs * ifAddrStruct = NULL;
+ struct ifaddrs * ifa = NULL;
+ void * tmpAddrPtr = NULL;
+
+ if ( getifaddrs(&ifAddrStruct) != 0)
+ C_return(C_SCHEME_FALSE);
+
+// for (i = ifa; i != NULL; i = i->ifa_next) {
+ for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
+ if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is
+ // a valid IPv4 address
+ tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
+ char addressBuffer[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
+// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
+ len = strlen(addressBuffer);
+ a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
+ str = C_string(&a, len, addressBuffer);
+ lst = C_a_pair(&a, str, lst);
+ }
+
+// else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is
+// // a valid IPv6 address
+// tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
+// char addressBuffer[INET6_ADDRSTRLEN];
+// inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
+//// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
+// len = strlen(addressBuffer);
+// a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
+// str = C_string(&a, len, addressBuffer);
+// lst = C_a_pair(&a, str, lst);
+// }
+
+// else {
+// printf(\" not an IPv4 address\\n\");
+// }
+
+ }
+
+ freeifaddrs(ifa);
+ C_return(lst);
+
+"))
+
+;; Change this to bias for addresses with a reasonable broadcast value?
+;;
+(define (ip-pref-less? a b)
+ (let* ((rate (lambda (ipstr)
+ (regex-case ipstr
+ ( "^127\\." _ 0 )
+ ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 )
+ ( else 2 ) ))))
+ (< (rate a) (rate b))))
+
+
+(define (get-my-best-address)
+ (let ((all-my-addresses (get-all-ips))
+ ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))
+ )
+ (cond
+ ((null? all-my-addresses)
+ (get-host-name)) ;; no interfaces?
+ ((eq? (length all-my-addresses) 1)
+ (car all-my-addresses)) ;; only one to choose from, just go with it
+
+ (else
+ (car (sort all-my-addresses ip-pref-less?)))
+ ;; (else
+ ;; (ip->string (car (filter (lambda (x) ;; take any but 127.
+ ;; (not (eq? (u8vector-ref x 0) 127)))
+ ;; all-my-addresses))))
+
+ )))
+
+(define (get-all-ips-sorted)
+ (sort (get-all-ips) ip-pref-less?))
+
+
+)
ADDED ulex/ulex.setup
Index: ulex/ulex.setup
==================================================================
--- /dev/null
+++ ulex/ulex.setup
@@ -0,0 +1,11 @@
+;; Copyright 2007-2018, Matthew Welland.
+;;
+;; This program is made available under the GNU GPL version 2.0 or
+;; greater. See the accompanying file COPYING for details.
+;;
+;; This program is distributed WITHOUT ANY WARRANTY; without even the
+;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+;; PURPOSE.
+
+;;;; ulex.setup
+(standard-extension 'ulex "0.1")
ADDED ulex/ulex_europaeus-branch.jpg
Index: ulex/ulex_europaeus-branch.jpg
==================================================================
--- /dev/null
+++ ulex/ulex_europaeus-branch.jpg
cannot compute difference between binary files
ADDED ulex/write-cycle.fig
Index: ulex/write-cycle.fig
==================================================================
--- /dev/null
+++ ulex/write-cycle.fig
@@ -0,0 +1,186 @@
+#FIG 3.2 Produced by xfig version 3.2.5-alpha5
+Landscape
+Center
+Inches
+Letter
+100.00
+Single
+-2
+1200 2
+0 32 #c5b696
+0 33 #eef7fe
+0 34 #dbcaa5
+0 35 #404040
+0 36 #808080
+0 37 #bfbfbf
+0 38 #dfdfdf
+0 39 #8d8e8d
+0 40 #a9a9a9
+0 41 #555555
+0 42 #c6c2c6
+0 43 #565151
+0 44 #8d8d8d
+0 45 #d6d6d6
+0 46 #84807d
+0 47 #d1d1d1
+0 48 #3a3a3a
+0 49 #4573a9
+0 50 #adadad
+0 51 #7b79a4
+0 52 #444444
+0 53 #73758b
+0 54 #f6f6f6
+0 55 #414541
+0 56 #635dcd
+0 57 #bdbdbd
+0 58 #515151
+0 59 #e6e2e6
+0 60 #000049
+0 61 #797979
+0 62 #303430
+0 63 #414141
+0 64 #c6b595
+6 11775 7350 12750 9000
+6 11775 7350 12750 8775
+5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 12240.000 7050.000 11790 7650 12240 7800 12690 7650
+5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 12240.000 7950.000 11790 8550 12240 8700 12690 8550
+1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 12240 7500 450 150 11790 7350 12690 7650
+2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2
+ 12690 7575 12690 8550
+2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2
+ 11790 7575 11790 8550
+4 0 0 50 -1 0 12 0.0000 4 150 210 12075 8250 db\001
+-6
+4 0 0 50 -1 0 12 0.0000 4 150 690 12000 9000 main.db\001
+-6
+6 7950 6975 9375 7575
+2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5
+ 7950 6975 9375 6975 9375 7575 7950 7575 7950 6975
+4 0 0 50 -1 0 12 0.0000 4 195 1335 8100 7350 send-responses\001
+-6
+6 450 10950 1425 12600
+6 450 10950 1425 12375
+5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 915.000 10650.000 465 11250 915 11400 1365 11250
+5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 915.000 11550.000 465 12150 915 12300 1365 12150
+1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 915 11100 450 150 465 10950 1365 11250
+2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2
+ 1365 11175 1365 12150
+2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2
+ 465 11175 465 12150
+4 0 0 50 -1 0 12 0.0000 4 150 210 750 11850 db\001
+-6
+4 0 0 50 -1 0 12 0.0000 4 150 690 675 12600 main.db\001
+-6
+6 4800 15525 5775 16950
+5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 5265.000 15225.000 4815 15825 5265 15975 5715 15825
+5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 5265.000 16125.000 4815 16725 5265 16875 5715 16725
+1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 5265 15675 450 150 4815 15525 5715 15825
+2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2
+ 5715 15750 5715 16725
+2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2
+ 4815 15750 4815 16725
+4 0 0 50 -1 0 12 0.0000 4 150 210 5100 16425 db\001
+-6
+6 8025 12750 9000 14175
+5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 8490.000 12450.000 8040 13050 8490 13200 8940 13050
+5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 8490.000 13350.000 8040 13950 8490 14100 8940 13950
+1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 8490 12900 450 150 8040 12750 8940 13050
+2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2
+ 8940 12975 8940 13950
+2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2
+ 8040 12975 8040 13950
+4 0 0 50 -1 0 12 0.0000 4 150 210 8325 13650 db\001
+-6
+1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 2325 12675 645 645 2325 12675 2850 13050
+1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 6075 11025 645 645 6075 11025 6600 11400
+1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 6600 13950 645 645 6600 13950 7125 14325
+1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 3750 16650 645 645 3750 16650 4275 17025
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2
+ 0 0 1.00 60.00 120.00
+ 2625 2250 7575 2250
+2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5
+ 7575 1875 9525 1875 9525 3750 7575 3750 7575 1875
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2
+ 0 0 1.00 60.00 120.00
+ 8250 2400 8250 4275
+2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5
+ 7575 4275 9525 4275 9525 5100 7575 5100 7575 4275
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2
+ 0 0 1.00 60.00 120.00
+ 7575 4650 2625 4650
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 3
+ 0 0 1.00 60.00 120.00
+ 9525 4650 10275 4650 10275 5175
+2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5
+ 9975 5175 10650 5175 10650 6975 9975 6975 9975 5175
+2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5
+ 9975 6975 10650 6975 10650 7575 9975 7575 9975 6975
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 3
+ 0 0 1.00 60.00 120.00
+ 10650 7125 12000 7125 12150 7350
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 3
+ 0 0 1.00 60.00 120.00
+ 11925 7350 11850 7200 10650 7200
+2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5
+ 675 1875 2625 1875 2625 5025 675 5025 675 1875
+2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5
+ 675 5025 2625 5025 2625 6000 675 6000 675 5025
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2
+ 0 0 1.00 60.00 120.00
+ 1575 4800 1575 5325
+2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5
+ 3375 5100 5250 5100 5250 6300 3375 6300 3375 5100
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 4
+ 0 0 1.00 60.00 120.00
+ 7950 7275 6525 7275 6525 5700 5250 5700
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2
+ 0 0 1.00 60.00 120.00
+ 3375 5700 2625 5700
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2
+ 0 0 1.00 60.00 120.00
+ 1575 6000 1575 6825
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2
+ 0 0 1.00 60.00 120.00
+ 9975 7275 9375 7275
+2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5
+ 450 1500 5775 1500 5775 7800 450 7800 450 1500
+2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5
+ 6075 1500 11325 1500 11325 7800 6075 7800 6075 1500
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2
+ 5925 375 5925 9675
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2
+ 1800 12225 1275 11775
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2
+ 5550 11325 2850 12375
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2
+ 5925 13875 2925 12900
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2
+ 3525 16050 2625 13275
+2 1 0 1 12 7 50 -1 -1 0.000 0 0 -1 0 0 2
+ 7200 13800 8025 13500
+2 1 0 1 1 7 50 -1 -1 0.000 0 0 -1 0 0 2
+ 4350 16425 4800 16200
+2 1 0 1 12 7 50 -1 -1 0.000 0 0 -1 0 0 2
+ 6225 11700 6525 13350
+2 1 0 1 1 7 50 -1 -1 0.000 0 0 -1 0 0 2
+ 5850 11700 3975 16050
+2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2
+ 4350 16575 4800 16350
+4 0 0 50 -1 0 12 0.0000 4 150 990 7575 1800 ulex:launch\001
+4 0 0 50 -1 0 12 0.0000 4 150 720 750 1800 ulex:call\001
+4 0 0 50 -1 0 12 0.0000 4 195 1230 1200 2325 send-message\001
+4 0 0 50 -1 0 12 0.0000 4 195 1470 7650 2325 receive-message\001
+4 0 0 50 -1 0 12 0.0000 4 195 1410 7725 4575 std-peer-handler\001
+4 0 0 50 -1 0 12 0.0000 4 195 2160 3600 4500 '(#t "info msg" )\001
+4 0 0 50 -1 0 12 0.0000 4 150 450 10725 5625 work\001
+4 0 0 50 -1 0 12 0.0000 4 150 525 10725 5880 queue\001
+4 0 0 50 -1 0 12 0.0000 4 150 1290 750 5775 mailbox - waits\001
+4 0 0 50 -1 0 12 0.0000 4 150 990 3525 5400 ulex:launch\001
+4 0 0 50 -1 0 12 0.0000 4 195 1470 3525 6000 receive-message\001
+4 0 0 50 -1 0 12 0.0000 4 150 480 1200 6975 result\001
+4 0 0 50 -1 0 12 0.0000 4 165 1185 1500 13500 megatest -run\001
+4 0 0 50 -1 0 12 0.0000 4 150 900 6375 11925 dashboard\001
+4 0 0 50 -1 0 12 0.0000 4 165 1590 6375 14925 megatest -execute\001
+4 0 0 50 -1 0 12 0.0000 4 195 2040 3150 17625 megatest -remove-keep\001
+4 0 0 50 -1 0 12 0.0000 4 150 375 8250 14400 1.db\001
+4 0 0 50 -1 0 12 0.0000 4 150 375 5025 17175 2.db\001