Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -29,11 +29,12 @@ MSRCFILES = dbmod.scm rmtmod.scm commonmod.scm apimod.scm \ archivemod.scm clientmod.scm envmod.scm ezstepsmod.scm itemsmod.scm \ keysmod.scm launchmod.scm odsmod.scm processmod.scm runconfigmod.scm \ runsmod.scm servermod.scm subrunmod.scm tasksmod.scm testsmod.scm \ -pkts.scm mtargs.scm mtconfigf.scm ducttape-lib.scm megamod.scm +pkts.scm mtargs.scm mtconfigf.scm ducttape-lib.scm ulex.scm \ +megamod.scm GMSRCFILES = dcommonmod.scm vgmod.scm treemod.scm # Eggs to install (straightforward ones) @@ -208,21 +209,24 @@ # special include based modules mofiles/pkts.o : pkts/pkts.scm mofiles/mtargs.o : mtargs/mtargs.scm mofiles/mtconfigf.o : mtconfigf/mtconfigf.scm +mofiles/ulex.o : ulex/ulex.scm # mofile/ducttape-lib.o : ducttape/ducttape-lib.scm # Temporary while transitioning to new routine # runs.o : run-tests-queue-classic.scm run-tests-queue-new.scm # for the modularized stuff mofiles/commonmod.o : megatest-fossil-hash.scm -mofiles/dbmod.o : mofiles/commonmod.o mofiles/keysmod.o mofiles/tasksmod.o mofiles/odsmod.o +mofiles/dbmod.o : mofiles/commonmod.o mofiles/keysmod.o \ + mofiles/tasksmod.o mofiles/odsmod.o mofiles/commonmod.o : mofiles/processmod.o -mofiles/rmtmod.o : mofiles/dbmod.o mofiles/commonmod.o mofiles/apimod.o +mofiles/rmtmod.o : mofiles/dbmod.o mofiles/commonmod.o \ + mofiles/apimod.o mofiles/ulex.o mofiles/apimod.o : mofiles/dbmod.o # Removed from megamod.o dep: mofiles/ftail.o mofiles/megamod.o : \ mofiles/rmtmod.o \ mofiles/commonmod.o \ Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -697,17 +697,17 @@ (process:children #f)) (original-exit exit-code))))) ;; for some switches always print the command to stderr ;; -(if (args:any? "-run" "-runall" "-remove-runs" "-set-state-status" "-kill-runs" "-kill-rerun") +(if (args:any-defined? "-run" "-runall" "-remove-runs" "-set-state-status" "-kill-runs" "-kill-rerun") (debug:print 0 *default-log-port* (string-intersperse (argv) " "))) ;; some switches imply homehost. Exit here if not on homehost ;; (let ((homehost-required (list "-cleanup-db" "-server"))) - (if (apply args:any? homehost-required) + (if (apply args:any-defined? homehost-required) (if (not (common:on-homehost?)) (for-each (lambda (switch) (if (args:get-arg switch) (begin Index: mtargs/mtargs.scm ================================================================== --- mtargs/mtargs.scm +++ mtargs/mtargs.scm @@ -40,10 +40,12 @@ (hash-table-ref/default arg-hash arg (car default)))) (define (any-defined? . args) (not (null? (filter (lambda (x) x) (map get-arg args))))) + +;; (define any any-defined?) (define (get-arg-from ht arg . default) (if (null? default) (hash-table-ref/default ht arg #f) (hash-table-ref/default ht arg (car default)))) Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -17,10 +17,11 @@ ;; along with Megatest. If not, see . ;;====================================================================== (declare (unit rmtmod)) +(declare (uses ulex)) ;; (declare (uses commonmod)) ;; (declare (uses dbmod)) ;; (declare (uses megamod)) (module rmtmod @@ -30,10 +31,10 @@ (import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) ;; (import commonmod) ;;; DO NOT ALLOW rmt*scm TO DEPEND ON common*scm!!!! ;; (import dbmod) ;; (import megamod) -(use (prefix ulex ulex:)) +(import (prefix ulex ulex:)) ;; (include "rmt-inc.scm") ;; (include "common_records.scm") ) ADDED ulex.scm Index: ulex.scm ================================================================== --- /dev/null +++ ulex.scm @@ -0,0 +1,23 @@ +;;====================================================================== +;; Copyright 2019, Matthew Welland. +;; +;; This file is part of Megatest. +;; +;; Megatest is free software: you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation, either version 3 of the License, or +;; (at your option) any later version. +;; +;; Megatest is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU General Public License for more details. +;; +;; You should have received a copy of the GNU General Public License +;; along with Megatest. If not, see . + +;;====================================================================== + +(declare (unit ulex)) + +(include "ulex/ulex.scm") ADDED ulex/Makefile Index: ulex/Makefile ================================================================== --- /dev/null +++ ulex/Makefile @@ -0,0 +1,25 @@ + + +all : example + +ulex.so : ulex.scm telemetry/telemetry.so netutil/ulex-netutil.so portlogger/portlogger.so + chicken-install + +telemetry/telemetry.so : telemetry/telemetry.scm + cd telemetry && chicken-install + +example : ulex.so example.scm + csc example.scm + +test : ulex.so + csi -b tests/run.scm + +portlogger/portlogger.so : portlogger/portlogger.scm + cd portlogger && chicken-install + csi -s portlogger/test.scm + +netutil/ulex-netutil.so: netutil/ulex-netutil.scm + cd netutil && chicken-install + +clean: + rm -f example *so */*so *.import.* */*.import.* ADDED ulex/README Index: ulex/README ================================================================== --- /dev/null +++ ulex/README @@ -0,0 +1,25 @@ +ulex: thorny evergreen shrubs, also known as furze or gorse. + +Your application needs a database but the moment you build in +dependence on Postgresql or Mysql your application becomes ponderous. +Ulex is a mesh networked sqlite3 database, intended to provide a +nimble alternative. It is designed so that nodes can come and go while +talking to each other and sharing data via sqlite3 in a (mostly) ACID +compliant way. Ulex mitigates issues with Sqlite3 on networked +filesystems (e.g. NFS, Moosefs etc.) such as "lock storms" (*) and +performance bottlenecks. + +(*) Network filesystems must provide posix locking across all files +across all clients. This is handled by the file server by a queue. If +the queue gets overwhelmed by lock requests locks may be dropped. This +leads to concurrent access to shared files and in the case of sqlite3 +the dreaded "database locked" message that won't go away (to fix you +have to move the file and copy it back). It can also lead to a +corrupted database. + +About the name Ulex: The Ulex plant, also known as Gorse, furze, or +Whin, is considered an invasive pest in places like New Zealand but in +spite of its thorny persistence it is considered quite useful in its +native Europe, providing edible flowers, shelter and food for +wildlife, hedges and fuel for cooking. + ADDED ulex/TODO Index: ulex/TODO ================================================================== --- /dev/null +++ ulex/TODO @@ -0,0 +1,19 @@ +* move cookie generation to start in (call ...) +* add timestamp from perspective of caller to data passed around +* add standard print for each event +* move portlogger into module (retrieve from Brandon's work) + + + +* Create queues + +* Create text line logging interface + +* Add registration calls; write, read, transactional, osslow and osfast + +* Extend example to cover all aspects + +* + +* + ADDED ulex/example-telemetry.scm Index: ulex/example-telemetry.scm ================================================================== --- /dev/null +++ ulex/example-telemetry.scm @@ -0,0 +1,57 @@ +(use telemetry) + +;; @startuml +;; Alice -> Bob: Authentication Request +;; Bob --> Alice: Authentication Response + +;; Alice -> Bob: Another authentication Request +;; Alice <-- Bob: Another authentication Response +;; +;; ref over Bob +;; This can be on +;; several lines +;; end ref +;; +;; @enduml +(define *actor-table* (make-hash-table)) +(define *actor-next-id* 0) +(define (get-actor-name key) + (let* ((cast (string-split "Alice,Bob,Carol,Dave,Eve,Frank,Grace,Heidi,Ivan,Judy,Mallory,Nancy,Oscar,Peggy,Rupert,Sybil,Ted,Victor,Wendy" ",")) + (cast-count (length cast)) + (name (hash-table-ref/default *actor-table* key #f))) + (cond + (name name) + (else + (let* ((id *actor-next-id*) + (name (list-ref cast (modulo id cast-count))) + (seq (inexact->exact (floor (/ id cast-count)))) + (newname + (conc + name + (if (> seq 0) + (conc "-" seq) + "")))) + (set! *actor-next-id* (add1 *actor-next-id*)) + (hash-table-set! *actor-table* key newname) + newname))))) + + +(define (ulex-telemetry-server) + (let* ((port 12345) + (seq 0) + (handler (lambda (msg) + (set! seq (add1 seq)) + (let* ((elapsed-ms (current-milliseconds)) + (payload (alist-ref 'payload msg)) + (action (if payload (alist-ref 'action payload))) + (from-key (if payload (alist-ref 'from-key payload))) + (to-key (if payload (alist-ref 'to-key payload))) + ) + (if (and action from-key to-key) + (print (get-actor-name from-key) " -> " (get-actor-name to-key) ": "action)) + ;;(pp msg) + )))) + (telemetry-server port handler))) + +(ulex-telemetry-server) + ADDED ulex/example.scm Index: ulex/example.scm ================================================================== --- /dev/null +++ ulex/example.scm @@ -0,0 +1,201 @@ +;;; dbi: Minimal gasket to postgresql, sqlite3 and mysql +;;; +;; Copyright (C) 2007-2016 Matt Welland +;; Redistribution and use in source and binary forms, with or without +;; modification, is permitted. +;; +;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS +;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE +;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +;; DAMAGE. + +(use regex srfi-18 matchable) + +(load "ulex.scm") +(import (prefix ulex ulex:)) + +(create-directory "ulexdb" #t) +(create-directory "pkts" #f) + +(define *area* (ulex:make-area + dbdir: (conc (current-directory) "/ulexdb") + pktsdir: (conc (current-directory) "/pkts") + )) +(define (toplevel-command . args) #f) +(use readline) + +;; two reserved keys in the ulex registration hash table are: +;; dbinitsql => a list of sql statements to be executed at db creation time +;; dbinitfn => a function of two params; dbh, the sql-de-lite db handle and +;; dbfname, the database filename +;; +(ulex:register-batch + *area* + 'dbwrite + `((dbinitsql . ("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, message TEXT, author TEXT, msg_time INTEGER);")) + (savemsg . "INSERT INTO messages (message,author) VALUES (?,?)") + )) + +(ulex:register-batch + *area* + 'dbread + `((dbinitsql . ("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, message TEXT, author TEXT, msg_time INTEGER);")) + (getnum . "SELECT COUNT(*) FROM messages") + (getsome . "SELECT * FROM messages LIMIT 10") + )) + +(define (worker mode-in) + (let* ((start (current-milliseconds)) + (iters-per-sample 10) + (mode (string->symbol mode-in)) + (max-count (case mode + ((all) 60) + (else 1000))) + (num-calls 0) + (report (lambda () + (let ((delta (- (current-milliseconds) start))) + (print "Completed " num-calls " in " delta + " for " (/ num-calls (/ delta 1000)) " calls per second"))))) + (if (eq? mode 'repl) + (begin + (import extras) ;; might not be needed + ;; (import csi) + (import readline) + (import apropos) + (import (prefix ulex ulex:)) + (install-history-file (get-environment-variable "HOME") ".example_history") ;; [homedir] [filename] [nlines]) + (current-input-port (make-readline-port "example> ")) + (repl)) + (let loop ((count 0)) + ;; (print "loop count=" count) + (for-each + (lambda (dbname) + ;;(print "TOP OF LAMBDA") + (case mode + ((all) + (let ((start-time (current-milliseconds)) + (message (conc "Test message #" count "! From pid: " (current-process-id))) + (user (current-user-name))) + (ulex:call *area* dbname 'savemsg `(,message ,user)) + (for-each (lambda (n) + (print "have this many " (ulex:call *area* dbname 'getnum '()) " records in main.db")) + (iota 10)) + (set! num-calls (+ num-calls 11)) + )) + + ((ping) + (let ((srvrs (ulex:get-all-server-pkts *area*))) + (for-each + (lambda (srv) + (print "Pinging " srv) + (ulex:ping *area* srv)) + srvrs))) + ((fullping) + (let ((srvrs (ulex:get-all-server-pkts *area*))) + (for-each + (lambda (srv) + (let ((ipaddr (alist-ref 'ipaddr srv)) + (port (any->number (alist-ref 'port srv)))) + (print "Full Ping to " srv) + (ulex:ping *area* ipaddr port))) + srvrs))) + ((passive) + (thread-sleep! 10)) + )) + '("main.db")) ;; "test.db" "run-1.db" "run-2.db" "run-3.db" "run-4.db")) + #;(thread-sleep! 0.001) + #;(let ((now (current-milliseconds))) + (if (and (> now start) + (eq? (modulo count iters-per-sample) 0)) + (begin + (print "queries per second: "(* 1000.0 (/ iters-per-sample (- now start)))) + (set! count 0) + (set! start (current-milliseconds))))) + ;; (print "count: " count " max-count: " max-count) + (if (< count max-count) + (loop (+ count 1))))) + (report) + (ulex:clear-server-pkt *area*) + (thread-sleep! 5) ;; let others keep using this server (needs to be built in to ulex) + ;; (print "Doing stuff") + ;; (thread-sleep! 10) + (print "Done doing stuff"))) + +(define (run-worker) + (thread-start! + (make-thread (lambda () + (thread-sleep! 5) + (worker "all")) + "worker"))) + +(define (main . args) + (if (member (car args) '("repl")) + (print "NOTE: No exit timer started.") + (thread-start! (make-thread (lambda () + (thread-sleep! (* 60 5)) + (ulex:clear-server-pkt *area*) + (thread-sleep! 5) + (exit 0))))) + (print "Launching server") + (ulex:launch *area*) + (print "LAUNCHED.") + (thread-sleep! 0.1) ;; chicken threads bit quirky? need little time for launch thread to get traction? + (apply worker args) + ) + +;;====================================================================== +;; Strive for clean exit handling +;;====================================================================== + +;; Ulex shutdown is handled within Ulex itself. + +#;(define (server-exit-procedure) + (on-exit (lambda () + ;; close the databases, ensure the pkt is removed! + ;; (thread-sleep! 2) + (ulex:shutdown *area*) + 0))) + +;; Copied from the SDL2 examples. +;; +;; Schedule quit! to be automatically called when your program exits normally. +#;(on-exit server-exit-procedure) + +;; Install a custom exception handler that will call quit! and then +;; call the original exception handler. This ensures that quit! will +;; be called even if an unhandled exception reaches the top level. +#;(current-exception-handler + (let ((original-handler (current-exception-handler))) + (lambda (exception) + (server-exit-procedure) + (original-handler exception)))) + +(if (file-exists? ".examplerc") + (load ".examplerc")) + +(let ((args-in (argv))) ;; command-line-arguments))) + (let ((args (match + args-in + (("csi" "--" args ...) args) + ((_ args ...) args) + (else args-in)))) + (if (null? args) + (begin + (print "Usage: example [mode]") + (print " where mode is one of:") + (print " ping : only do pings between servers") + (print " fullping : ping with response via processing queue") + (print " unix : only do unix commands") + (print " read : only do ping, unix and db reads") + (print " all : do pint, unix, and db reads and writes") + (exit)) + (apply main args)))) + ADDED ulex/netutil/testit.scm Index: ulex/netutil/testit.scm ================================================================== --- /dev/null +++ ulex/netutil/testit.scm @@ -0,0 +1,6 @@ + +(use ulex-netutil) +(use test) + +(test #f #t (not (not (member "127.0.0.1" (get-all-ips))))) + ADDED ulex/netutil/ulex-netutil.meta Index: ulex/netutil/ulex-netutil.meta ================================================================== --- /dev/null +++ ulex/netutil/ulex-netutil.meta @@ -0,0 +1,16 @@ +;; -*- scheme -*- +( +; Your egg's license: +(license "BSD") + +; Pick one from the list of categories (see below) for your egg and enter it +; here. +(category db) + +(needs foreign ) + +; A list of eggs required for TESTING ONLY. See the `Tests' section. +(test-depends test) + +(author "Brandon Barclay") +(synopsis "Get all IP addresses for all interfaces.")) ADDED ulex/netutil/ulex-netutil.release-info Index: ulex/netutil/ulex-netutil.release-info ================================================================== --- /dev/null +++ ulex/netutil/ulex-netutil.release-info @@ -0,0 +1,3 @@ +(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}") +(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}") +(release "0.1") ADDED ulex/netutil/ulex-netutil.scm Index: ulex/netutil/ulex-netutil.scm ================================================================== --- /dev/null +++ ulex/netutil/ulex-netutil.scm @@ -0,0 +1,134 @@ +;;; ulex: Distributed sqlite3 db +;;; +;; Copyright (C) 2018 Matt Welland +;; Redistribution and use in source and binary forms, with or without +;; modification, is permitted. +;; +;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS +;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE +;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +;; DAMAGE. + +;;====================================================================== +;; ABOUT: +;; See README in the distribution at https://www.kiatoa.com/fossils/ulex +;; NOTES: +;; provides all ipv4 addresses for all interfaces +;; +;;====================================================================== + +;; get IP addresses from ALL interfaces +(module ulex-netutil + (get-all-ips get-my-best-address get-all-ips-sorted) + +(import scheme chicken data-structures foreign ports regex-case posix) + + +;; #include +;; #include +;; #include +;; #include + +(foreign-declare "#include \"sys/types.h\"") +(foreign-declare "#include \"sys/socket.h\"") +(foreign-declare "#include \"ifaddrs.h\"") +(foreign-declare "#include \"arpa/inet.h\"") + +;; get IP addresses from ALL interfaces +(define get-all-ips + (foreign-safe-lambda* scheme-object () + " + +// from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address : + + + C_word lst = C_SCHEME_END_OF_LIST, len, str, *a; +// struct ifaddrs *ifa, *i; +// struct sockaddr *sa; + + struct ifaddrs * ifAddrStruct = NULL; + struct ifaddrs * ifa = NULL; + void * tmpAddrPtr = NULL; + + if ( getifaddrs(&ifAddrStruct) != 0) + C_return(C_SCHEME_FALSE); + +// for (i = ifa; i != NULL; i = i->ifa_next) { + for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) { + if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is + // a valid IPv4 address + tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr; + char addressBuffer[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN); +// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); + len = strlen(addressBuffer); + a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); + str = C_string(&a, len, addressBuffer); + lst = C_a_pair(&a, str, lst); + } + +// else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is +// // a valid IPv6 address +// tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; +// char addressBuffer[INET6_ADDRSTRLEN]; +// inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN); +//// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); +// len = strlen(addressBuffer); +// a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); +// str = C_string(&a, len, addressBuffer); +// lst = C_a_pair(&a, str, lst); +// } + +// else { +// printf(\" not an IPv4 address\\n\"); +// } + + } + + freeifaddrs(ifa); + C_return(lst); + +")) + +;; Change this to bias for addresses with a reasonable broadcast value? +;; +(define (ip-pref-less? a b) + (let* ((rate (lambda (ipstr) + (regex-case ipstr + ( "^127\\." _ 0 ) + ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 ) + ( else 2 ) )))) + (< (rate a) (rate b)))) + + +(define (get-my-best-address) + (let ((all-my-addresses (get-all-ips)) + ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))) + ) + (cond + ((null? all-my-addresses) + (get-host-name)) ;; no interfaces? + ((eq? (length all-my-addresses) 1) + (car all-my-addresses)) ;; only one to choose from, just go with it + + (else + (car (sort all-my-addresses ip-pref-less?))) + ;; (else + ;; (ip->string (car (filter (lambda (x) ;; take any but 127. + ;; (not (eq? (u8vector-ref x 0) 127))) + ;; all-my-addresses)))) + + ))) + +(define (get-all-ips-sorted) + (sort (get-all-ips) ip-pref-less?)) + +) ADDED ulex/netutil/ulex-netutil.setup Index: ulex/netutil/ulex-netutil.setup ================================================================== --- /dev/null +++ ulex/netutil/ulex-netutil.setup @@ -0,0 +1,11 @@ +;; Copyright 2007-2018, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +;;;; ulex.setup +(standard-extension 'ulex-netutil "0.1") ADDED ulex/portlogger/portlogger.meta Index: ulex/portlogger/portlogger.meta ================================================================== --- /dev/null +++ ulex/portlogger/portlogger.meta @@ -0,0 +1,16 @@ +;; -*- scheme -*- +( +; Your egg's license: +(license "BSD") + +; Pick one from the list of categories (see below) for your egg and enter it +; here. +(category db) + +(needs foreign ) + +; A list of eggs required for TESTING ONLY. See the `Tests' section. +(test-depends test sqlite3 regex) + +(author "Matthew Welland") +(synopsis "thoughtfully optain tcp port.")) ADDED ulex/portlogger/portlogger.release-info Index: ulex/portlogger/portlogger.release-info ================================================================== --- /dev/null +++ ulex/portlogger/portlogger.release-info @@ -0,0 +1,3 @@ +(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}") +(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}") +(release "0.1") ADDED ulex/portlogger/portlogger.scm Index: ulex/portlogger/portlogger.scm ================================================================== --- /dev/null +++ ulex/portlogger/portlogger.scm @@ -0,0 +1,167 @@ +;;====================================================================== +;; P O R T L O G G E R - track ports used on the current machine +;;====================================================================== + +;; + +(module portlogger + (pl-open-run-close pl-find-port pl-release-port pl-open-db pl-get-prev-used-port pl-get-port-state pl-take-port) + (import scheme + posix + chicken + data-structures + ;ports + extras + ;files + ;mailbox + ;telemetry + regex + ;regex-case + + ) + (use (prefix sqlite3 sqlite3:)) + (use posix) + (use regex) + + (define (pl-open-db fname) + (let* ((avail #t) ;; for now - assume wait on journal not needed (tasks:wait-on-journal fname 5 remove: #t)) ;; wait up to about 10 seconds for the journal to go away + (exists (file-exists? fname)) + (db (if avail + (sqlite3:open-database fname) + (begin + (system (conc "rm -f " fname)) + (sqlite3:open-database fname)))) + (handler (sqlite3:make-busy-timeout 136000)) + (canwrite (file-write-access? fname))) + (sqlite3:set-busy-handler! db handler) + (sqlite3:execute db "PRAGMA synchronous = 0;") + (sqlite3:execute + db + "CREATE TABLE IF NOT EXISTS ports ( + port INTEGER PRIMARY KEY, + state TEXT DEFAULT 'not-used', + fail_count INTEGER DEFAULT 0, + update_time TIMESTAMP DEFAULT (strftime('%s','now')) );") + db)) + + (define (pl-open-run-close proc . params) + (let* ((fname (conc "/tmp/." (current-user-name) "-portlogger.db")) + (avail #t)) ;; (tasks:wait-on-journal fname 10))) ;; wait up to about 10 seconds for the journal to go away + ;; (handle-exceptions + ;; exn + ;; (begin + ;; ;; (release-dot-lock fname) + ;; (debug:print-error 0 *default-log-port* "pl-open-run-close failed. " proc " " params) + ;; (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) + ;; (debug:print 5 *default-log-port* "exn=" (condition->list exn)) + ;; (if (file-exists? fname)(delete-file fname)) ;; brutally get rid of it + ;; (print-call-chain (current-error-port))) + (let* (;; (lock (obtain-dot-lock fname 2 9 10)) + (db (pl-open-db fname)) + (res (apply proc db params))) + (sqlite3:finalize! db) + ;; (release-dot-lock fname) + res))) + ;; ) + + ;; (fold-row PROC INIT DATABASE SQL . PARAMETERS) + (define (pl-take-port db portnum) + (let* ((qry1 "INSERT INTO ports (port,state) VALUES (?,?);") + (qry2 "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;")) + (let* ((curr (pl-get-port-state db portnum)) + (res (case (string->symbol (or curr "n/a")) + ((released) (sqlite3:execute db qry2 "taken" portnum) 'taken) + ((not-tried n/a) (sqlite3:execute db qry1 portnum "taken") 'taken) + ((taken) 'already-taken) + ((failed) 'failed) + (else 'error)))) + ;; (print "res=" res) + res))) + + (define (pl-get-prev-used-port db) + ;; (handle-exceptions + ;; exn + ;; (with-output-to-port (current-error-port) + ;; (lambda () + ;; (print "EXCEPTION: portlogger database probably overloaded or unreadable. If you see this message again remove /tmp/.$USER-portlogger.db") + ;; (print " message: " ((condition-property-accessor 'exn 'message) exn)) + ;; (print "exn=" (condition->list exn)) + ;; (print-call-chain) ;; (current-error-port)) + ;; (print "Continuing anyway.") + ;; #f)) + (let ((res (sqlite3:fold-row + (lambda (var curr) + (or curr var curr)) + #f + db "SELECT port FROM ports WHERE state='released';"))) + (if res res #f))) + ;; ) + + (define (pl-find-port db acfg #!key (lowport 32768)) + ;;(slite3:with-transaction + ;; db + ;; (lambda () + (let loop ((numtries 0)) + (let* ((portnum (or (pl-get-prev-used-port db) + (+ lowport ;; top of registered ports is 49152 but let's use ports in the registered range + (random (- 64000 lowport)))))) + ;; (handle-exceptions + ;; exn + ;; (with-output-to-port (current-error-port) + ;; (lambda () + ;; (print "EXCEPTION: portlogger database probably overloaded or unreadable. If you see this message again remove /tmp/.$USER-portlogger.db") + ;; (print " message: " ((condition-property-accessor 'exn 'message) exn)) + ;; (print "exn=" (condition->list exn)) + ;; (print-call-chain) + ;; (print "Continuing anyway."))) + (pl-take-port db portnum) ;; always "take the port" + (if (pl-is-port-available portnum) + portnum + (begin + (pl-set-port db portnum "taken") + (loop (add1 numtries))))))) + + + ;; set port to "released", "failed" etc. + ;; + (define (pl-set-port db portnum value) + (sqlite3:execute db "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;") value portnum) + + ;; set port to "released", "failed" etc. + ;; + (define (pl-get-port-state db portnum) + (let ((res (sqlite3:fold-row ;; get the state of given port or "not-tried" + (lambda (var curr) ;; function on init/last current + (or curr var curr)) + #f ;; init + db "SELECT state FROM ports WHERE port=?;" + portnum))) ;; the parameter to the query + (if res res #f))) + + ;; (slite3:exec (slite3:sql db "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;") value portnum)) + + ;; release port + (define (pl-release-port db portnum) + (sqlite3:execute db "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;" "released" portnum) + (sqlite3:change-count db)) + + ;; set port to failed (attempted to take but got error) + ;; + (define (pl-set-failed db portnum) + (sqlite3:execute db "UPDATE ports SET state='failed',fail_count=fail_count+1,update_time=strftime('%s','now') WHERE port=?;" portnum) + (sqlite3:change-count db)) + + ;; pulled from mtut - TODO: remove from mtut, find a way *without* using netstat + ;; + (define (pl-is-port-available port-num) + (let-values (((inp oup pid) + (process "netstat" (list "-tulpn" )))) + (let loop ((inl (read-line inp))) + (if (not (eof-object? inl)) + (begin + (if (string-search (regexp (conc ":" port-num "\\s+")) inl) + #f + (loop (read-line inp)))) + #t)))) + + ) ;; end module ADDED ulex/portlogger/portlogger.setup Index: ulex/portlogger/portlogger.setup ================================================================== --- /dev/null +++ ulex/portlogger/portlogger.setup @@ -0,0 +1,11 @@ +;; Copyright 2007-2018, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +;;;; portlogger.setup +(standard-extension 'portlogger "0.1") ADDED ulex/portlogger/test.scm Index: ulex/portlogger/test.scm ================================================================== --- /dev/null +++ ulex/portlogger/test.scm @@ -0,0 +1,25 @@ +(use portlogger) +(use test) + +(test-begin "portlogger") +(use (prefix sqlite3 sqlite3:)) + +(define *port* #f) +(define *area* #f) +(test #f #f (begin + (pl-open-run-close + (lambda (db b) + (pl-get-prev-used-port db)) + *area*) + #f)) +(test #f #f (pl-open-run-close (lambda (db b)(pl-get-port-state db 1234567)) *area*)) +(test #f #f (number? (pl-open-run-close (lambda (db b)(pl-take-port db 123456)) *area*))) +(test #f #t (number? (let ((port (pl-open-run-close pl-find-port *area*))) + (set! *port* port) + port))) +(test #f 1 (pl-open-run-close pl-release-port *port*)) +(test #f "released" (pl-open-run-close + (lambda (db) + (sqlite3:first-result db "select state from ports where port=?" *port*)))) + +(test-end "portlogger") ADDED ulex/run-parallel.sh Index: ulex/run-parallel.sh ================================================================== --- /dev/null +++ ulex/run-parallel.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +CMD=$1 + +make example + +for x in $(seq 1 10);do + ./example $CMD 2>&1| tee run$x.log & +done + +wait + ADDED ulex/run-rpc.sh Index: ulex/run-rpc.sh ================================================================== --- /dev/null +++ ulex/run-rpc.sh @@ -0,0 +1,11 @@ +#!/bin/bash +csc test-rpc.scm +(./test-rpc 45000 45001 45002 45003 45004 45005|& tee test-rpc-45000.log) & +(./test-rpc 45001 45000 45002 45003 45004 45005|& tee test-rpc-45001.log) & +(./test-rpc 45002 45000 45001 45003 45004 45005|& tee test-rpc-45002.log) & +(./test-rpc 45003 45000 45001 45002 45004 45005|& tee test-rpc-45003.log) & +(./test-rpc 45004 45000 45001 45002 45003 45005|& tee test-rpc-45004.log) & +(./test-rpc 45005 45000 45001 45002 45003 45004|& tee test-rpc-45005.log) & + +wait + ADDED ulex/telemetry/telemetry-test-client.scm Index: ulex/telemetry/telemetry-test-client.scm ================================================================== --- /dev/null +++ ulex/telemetry/telemetry-test-client.scm @@ -0,0 +1,12 @@ + +(load "telemetry.scm") + +(import telemetry) + +(print 1) +(telemetry-open "localhost" 12346) +(print 2) +(telemetry-send "goo") +(print 3) +(telemetry-send "goo2") +(print 4) ADDED ulex/telemetry/telemetry-test-server.scm Index: ulex/telemetry/telemetry-test-server.scm ================================================================== --- /dev/null +++ ulex/telemetry/telemetry-test-server.scm @@ -0,0 +1,18 @@ + +(load "telemetry.scm") +(import telemetry) +(print "before") +(use mailbox) +(use mailbox-threads) +(use srfi-18) + +(set! handler-seq 0) +(define (handler msg) + (set! handler-seq (add1 handler-seq)) + (print "=============") + (print handler-seq msg)) + +(telemetry-server 12346 handler) + + +(print "after") ADDED ulex/telemetry/telemetry.meta Index: ulex/telemetry/telemetry.meta ================================================================== --- /dev/null +++ ulex/telemetry/telemetry.meta @@ -0,0 +1,21 @@ +;; -*- scheme -*- +( +; Your egg's license: +(license "BSD") + +; Pick one from the list of categories (see below) for your egg and enter it +; here. +(category db) + +; A list of eggs dbi depends on. If none, you can omit this declaration +; altogether. If you are making an egg for chicken 3 and you need to use +; procedures from the `files' unit, be sure to include the `files' egg in the +; `needs' section (chicken versions < 3.4.0 don't provide the `files' unit). +; `depends' is an alias to `needs'. +(needs udp mailbox-threads z3 base64 ) + +; A list of eggs required for TESTING ONLY. See the `Tests' section. +(test-depends test) + +(author "Brandon Barclay") +(synopsis "A telemetry send/receive system using udp.")) ADDED ulex/telemetry/telemetry.release-info Index: ulex/telemetry/telemetry.release-info ================================================================== --- /dev/null +++ ulex/telemetry/telemetry.release-info @@ -0,0 +1,3 @@ +(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}") +(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}") +(release "0.1") ADDED ulex/telemetry/telemetry.scm Index: ulex/telemetry/telemetry.scm ================================================================== --- /dev/null +++ ulex/telemetry/telemetry.scm @@ -0,0 +1,124 @@ + +(module telemetry + (telemetry-open telemetry-send telemetry-close telemetry-server + telemetry-show-debugs telemetry-hide-debugs ) + + (import chicken scheme data-structures + base64 srfi-18 + z3 udp posix extras ports mailbox mailbox-threads) + + (use udp) + (use base64) + (use z3) + (use mailbox-threads) + + (define *telemetry:telemetry-log-state* 'startup) + (define *telemetry:telemetry-log-socket* #f) + + (define *debug-print-flag* #f) + + (define (telemetry-show-debugs) + (set! *debug-print-flag* #t)) + + (define (telemetry-hide-debugs) + (set! *debug-print-flag* #f)) + + (define (debug-print . args) + (if *debug-print-flag* + (apply print "telemetry> " args))) + + (define (make-telemetry-server-thread port callback) + (let* ((thr + (make-thread + (lambda () + (let* ((s (udp-open-socket))) + (udp-bind! s #f port) + ;;(udp-connect! s "localhost" port) + (let loop ((seq 0)) + (debug-print "loop seq="seq) + (receive (n data from-host from-port) (udp-recvfrom s 640000) + (let* ((encapsulated-payload + (with-input-from-string + (z3:decode-buffer + (base64-decode data)) read)) + (callback-res `( (from-host . ,from-host) + (from-port . ,from-port) + (data-len . ,n) + ,@encapsulated-payload ))) + (callback callback-res)) + + ) + (loop (add1 seq))) + (udp-close-socket s)))))) + (thread-start! thr) + thr)) + + (define (telemetry-server port handler-callback) + (let* ((serv-thread (make-telemetry-server-thread port handler-callback))) + (print serv-thread) + (thread-join! serv-thread))) + + + (define (telemetry-open serverhost serverport) + (let* ((user (or (get-environment-variable "USER") "unknown")) + (host (or (get-environment-variable "HOST") "unknown"))) + (set! *telemetry:telemetry-log-state* + (handle-exceptions + exn + (begin + (debug-print "telemetry-open udp port failure") + 'broken) + (if (and serverhost serverport user host) + (let* ((s (udp-open-socket))) + ;;(udp-bind! s #f 0) + (udp-connect! s serverhost serverport) + (set! *telemetry:telemetry-log-socket* s) + 'open) + 'not-needed))))) + + + (define (telemetry-close) + (when (or (member *telemetry:telemetry-log-state* '(broken-or-no-server-preclose open)) *telemetry:telemetry-log-socket*) + (handle-exceptions + exn + (begin + (define *telemetry:telemetry-log-state* 'closed-fail) + (debug-print "telemetry-telemetry-log closure failure") + ) + (begin + (define *telemetry:telemetry-log-state* 'closed) + (udp-close-socket *telemetry:telemetry-log-socket*) + (set! *telemetry:telemetry-log-socket* #f))))) + + (define (telemetry-send payload) + (if (eq? 'open *telemetry:telemetry-log-state*) + (handle-exceptions + exn + (begin + (debug-print "telemetry-telemetry-log comms failure ; disabled (no server?)") + (define *telemetry:telemetry-log-state* 'broken-or-no-server-preclose) + (telemetry-close) + (define *telemetry:telemetry-log-state* 'broken-or-no-server) + (set! *telemetry:telemetry-log-socket* #f) + ) + (if (and *telemetry:telemetry-log-socket* payload) + (let* ((user (or (get-environment-variable "USER") "unknown")) + (host (or (get-environment-variable "HOST") "unknown")) + (encapsulated-payload + `( (user . ,user) + (host . ,host) + (pid . ,(current-process-id)) + (payload . ,payload) ) ) + (msg + (base64-encode + (z3:encode-buffer + (with-output-to-string (lambda () (pp encapsulated-payload))))))) + ;;(debug-print "pre-send") + (let ((res (udp-send *telemetry:telemetry-log-socket* msg))) + ;;(debug-print "post-send >"res"<") + res) + + )))) ) + + + ) ADDED ulex/telemetry/telemetry.setup Index: ulex/telemetry/telemetry.setup ================================================================== --- /dev/null +++ ulex/telemetry/telemetry.setup @@ -0,0 +1,11 @@ +;; Copyright 2007-2018, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +;;;; ulex.setup +(standard-extension 'telemetry "0.1") ADDED ulex/test-rpc.scm Index: ulex/test-rpc.scm ================================================================== --- /dev/null +++ ulex/test-rpc.scm @@ -0,0 +1,75 @@ +(use rpc posix srfi-1 pkts) + +(define *usage* "Usage: test-rpc") ;; myportnum [port-nums...]") + +#;(define *portnums* (let ((args (command-line-arguments))) + (if (null? args) + (begin + (print *usage*) + (exit)) + (map string->number args)))) + +#;(if (not (null? (filter (lambda (x)(not x)) *portnums*))) + (begin + (print "ERROR: portnumbers must all be integers, you gave " *portnums*) + (exit))) + + +(define (find-free-port-and-open port) + (handle-exceptions + exn + (begin + (print "Failed to bind to port " (rpc:default-server-port) ", trying next port") + (find-free-port-and-open (+ port 1))) + (rpc:default-server-port port) + (tcp-read-timeout 120000) + (tcp-listen (rpc:default-server-port) ) + port)) + +#;(define *myportnum* (car *portnums*)) +#;(define *clients* (cdr *portnums*)) + +#;(print "Setting up server on " *myportnum*) +;; (rpc:default-server-port *myportnum*) +(define *myportnum* (find-free-port-and-open 20000)) ;; *myportnum*)) + + + +;;;; server.scm +(rpc:publish-procedure! + 'foo + (lambda (x) + (print "foo: " x) + (conc "response from " *myportnum*))) + +(define *queue* (make-queue)) + +(rpc:publish-procedure! + 'queue-add + (lambda (dbfname qrystr raddr rport . params) + (queue-add! *queue* (list dbfname qrystr raddr rport params)))) + +;;;; client.scm +(define (call n) + (let ((port (list-ref *clients* (random (length *clients*))))) + (print "calling to " port) + ((rpc:procedure 'foo "localhost" port) n))) + +(rpc:publish-procedure! + 'fini + (lambda () (print "fini") (thread-start! (lambda () (thread-sleep! 3) (print "terminate") (exit))) #f)) + +(let* ((server-th (make-thread (lambda () + ((rpc:make-server (tcp-listen *myportnum*)) #t)) ;; (rpc:default-server-port))) #t)) + "server thread")) + (timer-th (make-thread (lambda () + (thread-sleep! 30) + (print "Node " *myportnum* ", 30 seconds up. Exiting.") + (exit))))) + (thread-start! server-th) + (thread-start! timer-th) + (thread-sleep! 1) + (do ((i 10000 (sub1 i))) + ((zero? i)) + (print "-> " (call (random 100))))) + ADDED ulex/test-script.scm Index: ulex/test-script.scm ================================================================== --- /dev/null +++ ulex/test-script.scm @@ -0,0 +1,12 @@ +(import trace (prefix ulex ulex:)) +(trace-call-sites #t) + +;; (trace ulex:receive-message ulex:std-peer-handler ulex:process-db-queries ulex:work-queue-add ulex:call send-message ulex:get-best-server ulex:ping) +(set! *default-error-port* (current-output-port)) + +(ulex:call *area* "test.db" 'savemsg '("my message" "matt")) + +(define *servers* (ulex:get-all-server-pkts *area*)) + +(define numofrecords (ulex:call *area* "test.db" 'getnum '())) +;; (define bunchofrecords (ulex:call *area* "test.db" 'getsome '())) ADDED ulex/tests/faux-mt-callspec.scm Index: ulex/tests/faux-mt-callspec.scm ================================================================== --- /dev/null +++ ulex/tests/faux-mt-callspec.scm @@ -0,0 +1,54 @@ +(use test (prefix sqlite3 sqlite3:) posix) +(if (file-exists? "ulex.scm") + (load "ulex.scm") + (load "../ulex.scm")) + +(use trace) +(trace-call-sites #t) + +(import ulex ) ;; (import (prefix ulex ulex:)) + + +(test-begin "faux-mtdb") +;; pre-clean + +(for-each (lambda (dir) + (if (directory-exists? dir) + (system (conc "/bin/rm -rf ./"dir))) + (system (conc "/bin/mkdir -p ./"dir)) + ) + '("faux-mtdb" "faux-mtdb-pkts")) + + +(let* ((area (make-area dbdir: "faux-mtdb" pktsdir: "faux-mtdb-pkts")) + (specfile "tests/mt-spec.sexpr") + (dbname "faux-mt.db")) + (launch area) + (initialize-area-calls-from-specfile area specfile) + + + (let* ((target-name "a/b/c/d") + (insert-result (call area dbname 'new-target (list target-name))) + (test-target-id (caar (call area dbname 'target-name->target-id (list target-name)))) + (test-target-name (caar (call area dbname 'target-id->target-name (list 1))))) + (test #f #t insert-result) + (test #f 1 test-target-id ) + (test #f target-name test-target-name ) + ) + + (test #f #t (shutdown area))) + +;; thought experiment - read cursors +;; (let* ((cursor (call area dbname 'get-target-names '()))) +;; (let loop ((row (cursor))) +;; (cond +;; ((not row) #t) +;; (else +;; (print "ROW IS "row) +;; (loop (cursor)))))) + + +(test-end "faux-mtdb") + + + ADDED ulex/tests/mt-spec.sexpr Index: ulex/tests/mt-spec.sexpr ================================================================== --- /dev/null +++ ulex/tests/mt-spec.sexpr @@ -0,0 +1,28 @@ +( + (dbwrite . + ( + (dbinitsql . ( + "create table if not exists targets(id integer primary key,name)" + "create table if not exists runs(id integer primary key,target_id,name,path,state,status)" + "create table if not exists tests(id integer primary key,run_id,name,path,state,status,host)" + "create table if not exists test_steps(id integer primary key,test_id,name,state)" )) + + ( new-target . "insert into targets (name) values(?);") + ( new-run . "insert into runs (target_id,name,path,state,status) values(?,?,\"/dev/null\",\"NOT STARTED\",\"n/a\")") + ( new-test . "insert into tests values(?,?,?,\"/dev/null\",\"NOT STARTED\")") + ( update-one-run_id-state-status . "update runs set state=? status=? where id=?" ) + ( update-one-test_id-state-status . "update tests set state=? status=? where id=?" ) + ( update-matching-tests-state-status . "update tests set state=? status=? where run_id=?, state like ?, status like ?") + ) + ) + (dbread . + ( + (get-targets . "select id,name from targets") + (target-name->target-id . "select id from targets where name=?") + (target-id->target-name . "select name from targets where id=?") + (check-test-state-status . "select state,status from tests where id=?") + ) + ) + + ) + ADDED ulex/tests/run.scm Index: ulex/tests/run.scm ================================================================== --- /dev/null +++ ulex/tests/run.scm @@ -0,0 +1,182 @@ +(use test (prefix sqlite3 sqlite3:) posix ulex-netutil rpc pkts mailbox) + +;; (use (prefix ulex ulex:)) +(if (file-exists? "ulex.scm") + (load "ulex.scm") + (load "../ulex.scm")) + +(use trace) +(trace-call-sites #t) + +(import ulex) ;; (import (prefix ulex ulex:)) + +(trace + ;; send-message + ;; receive-message + ;; std-peer-handler + ;; work-queue-add + ;; deliver-response + ;; finalize-all-db-handles + ;; area-dbhandles + ;; save-dbh + ;; process-db-queries + + ;; dbdat-dbh + ;; get-best-server + ;; calc-server-score + ;; ping + ;; full-ping + ;; register-node + ;; calc-server-score + ;; update-known-servers + ;; request + ;; get-dbh + ;; update-stats + ;; process-db-queries + ;; deliver-response + ) + +(test-begin "misc") +(test #f #t (string? (get-my-best-address))) +(test-end "misc") + +;;====================================================================== +;; Setup +;;====================================================================== + +(system "rm -rf testulexdb testpkts") +(create-directory "testulexdb" #t) +(create-directory "testpkts" #t) + +(define *area* (make-area dbdir: "testulexdb" pktsdir: "testpkts")) + +(define *port* #f) + +;;====================================================================== +;; Ulex-db +;;====================================================================== + +(test-begin "ulex-db") +(test #f #t (equal? (area-dbdir *area*) "testulexdb")) +(test #f #t (thread? (thread-start! (make-thread (lambda ()(launch *area*)) "server")))) +(thread-sleep! 1) +(test #f 1 (update-known-servers *area*)) +(test #f #t (list? (get-all-server-pkts *area*))) +(test #f (area-myaddr *area*) (cadr (ping *area* (area-myaddr *area*)(area-port *area*)))) + +(let loop ((count 10)) + (if (null? (get-all-server-pkts *area*)) + (if (> count 0) + (begin + (thread-sleep! 1) + (print "waiting for server pkts") + (loop (- count 1)))))) +(test #f #t (let ((spkts (get-all-server-pkts *area*))) + (and (list spkts) (> (length spkts) 0)))) +(test #f #t (begin (register-batch + *area* + 'dbwrite ;; this is the call type + `((dbinitsql . ("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, message TEXT, author TEXT, msg_time INTEGER);")) + (savemsg . "INSERT INTO messages (message,author) VALUES (?,?)") + ;; (readmsg . "SELECT * FROM messages WHERE author=?;") + + )) + #t)) + +(test #f #t (calldat? (get-rentry *area* 'dbinitsql))) +(define cdat1 (get-rentry *area* 'dbinitsql)) +(test #f #t (list? (get-best-server *area* "test.db" 'savemsg))) +(test #f #t (eq? 'dbwrite (calldat-ctype cdat1))) +(test #f #t (list? (get-rsql *area* 'dbinitsql))) +(test #f #t (dbdat? (open-db *area* "test.db"))) + +(test #f #t (dbdat? (let ((dbh (get-dbh *area* "test.db"))) + (save-dbh *area* "test.db" dbh) + dbh))) +(test #f #t (dbdat? (let ((dbh (get-dbh *area* "test.db"))) + dbh))) + +;(test #f '(#t "db write submitted" #t) (call *area* "test.db" 'savemsg '("Test message!" "matt"))) +(test #f #t (call *area* "test.db" 'savemsg '("Test message!" "matt"))) +;;(thread-sleep! 15);; server needs time to process the request (it is non-blocking) +;; (test #f #t (shutdown *area*)) +;; (test #f 0 (calc-server-score *area* "test.db" (area-pktid *area*))) + +(test #f #t (list? (get-best-server *area* "test.db" (area-pktid *area*)))) +(define *best-server* (car (get-best-server *area* "test.db" (area-pktid *area*)))) +(pp *best-server*) +(define *server-pkt* (hash-table-ref/default (area-hosts *area*) (area-pktid *area*) #f)) +(define *server-ip* (alist-ref 'ipaddr *server-pkt*)) +(define *server-port* (any->number (alist-ref 'port *server-pkt*))) +(test #f #t (list? (ping *area* *server-ip* *server-port*))) + +(test #f #t (process-db-queries *area* "test.db")) +(test #f #f (process-db-queries *area* "junk.db")) +;; (test #f #t (cadr (full-ping *area* *server-pkt*))) + + +(test-end "ulex-db") + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(test-begin "faux-mtdb") +;; pre-clean + +#;(for-each (lambda (dir) + (if (directory-exists? dir) + (system (conc "/bin/rm -rf ./"dir))) + (system (conc "/bin/mkdir -p ./"dir)) + ) + '("faux-mtdb" "faux-mtdb-pkts")) + + +(let* ((area *area*) ;; (make-area dbdir: "faux-mtdb" pktsdir: "faux-mtdb-pkts")) + (specfile "tests/mt-spec.sexpr") + (dbname "faux-mt.db")) + ;; (launch area) + (initialize-area-calls-from-specfile area specfile) + (let* ((target-name "a/b/c/d") + (insert-result (call area dbname 'new-target (list target-name))) + (test-target-id (caar (call area dbname 'target-name->target-id (list target-name)))) + (test-target-name (caar (call area dbname 'target-id->target-name (list 1))))) + (test #f #t insert-result) + (test #f 1 test-target-id ) + (test #f target-name test-target-name ) + ) + (test #f #t (list? (get-best-server *area* "test.db" 'savemsg))) + (thread-sleep! 5) + (test #f #t (begin (shutdown area) #t))) + +(test #f #t (process-db-queries *area* "test.db")) +(test #f #f (process-db-queries *area* "junk.db")) + +;; thought experiment - read cursors +;; (let* ((cursor (call area dbname 'get-target-names '()))) +;; (let loop ((row (cursor))) +;; (cond +;; ((not row) #t) + ;; (else +;; (print "ROW IS "row) +;; (loop (cursor)))))) + + +(test-end "faux-mtdb") + +;;====================================================================== +;; Portlogger tests +;;====================================================================== + +;; (test-begin "portlogger") +;; +;; (test #f #f (begin (pl-open-run-close (lambda (db b)(pl-get-prev-used-port db)) *area*) #f)) +;; (test #f #f (pl-open-run-close (lambda (db b)(pl-get-port-state db 1234567)) *area*)) +;; (test #f #f (number? (pl-open-run-close (lambda (db b)(pl-take-port db 123456)) *area*))) +;; (test #f #t (number? (let ((port (pl-open-run-close pl-find-port *area*))) +;; (set! *port* port) +;; port))) +;; (test #f 1 (pl-open-run-close pl-release-port *port*)) +;; (test #f "released" (pl-open-run-close +;; (lambda (db) +;; (sqlite3:first-result db "select state from ports where port=?" *port*)))) +;; +;; (test-end "portlogger") ADDED ulex/ulex.meta Index: ulex/ulex.meta ================================================================== --- /dev/null +++ ulex/ulex.meta @@ -0,0 +1,21 @@ +;; -*- scheme -*- +( +; Your egg's license: +(license "BSD") + +; Pick one from the list of categories (see below) for your egg and enter it +; here. +(category db) + +; A list of eggs dbi depends on. If none, you can omit this declaration +; altogether. If you are making an egg for chicken 3 and you need to use +; procedures from the `files' unit, be sure to include the `files' egg in the +; `needs' section (chicken versions < 3.4.0 don't provide the `files' unit). +; `depends' is an alias to `needs'. +(needs rpc pkts mailbox sqlite3) + +; A list of eggs required for TESTING ONLY. See the `Tests' section. +(test-depends test) + +(author "Matt Welland") +(synopsis "A distributed mesh-like layer for sqlite3.")) ADDED ulex/ulex.release-info Index: ulex/ulex.release-info ================================================================== --- /dev/null +++ ulex/ulex.release-info @@ -0,0 +1,3 @@ +(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}") +(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}") +(release "0.1") ADDED ulex/ulex.scm Index: ulex/ulex.scm ================================================================== --- /dev/null +++ ulex/ulex.scm @@ -0,0 +1,1452 @@ +;;; ulex: Distributed sqlite3 db +;;; +;; Copyright (C) 2018 Matt Welland +;; Redistribution and use in source and binary forms, with or without +;; modification, is permitted. +;; +;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS +;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE +;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +;; DAMAGE. + +;;====================================================================== +;; ABOUT: +;; See README in the distribution at https://www.kiatoa.com/fossils/ulex +;; NOTES: +;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity. +;; +;;====================================================================== + +;; (use rpc pkts mailbox sqlite3) + +(module ulex + * + ;; #;( + ;; ;; areas + ;; make-area + ;; area->alist + ;; ;; server + ;; launch + ;; update-known-servers + ;; shutdown + ;; get-best-server + ;; ;; client side + ;; call + ;; ;; queries, procs and system commands (i.e. workers) + ;; register + ;; register-batch + ;; ;; database - note: most database stuff need not be exposed, these calls may be removed from exports in future + ;; open-db + ;; ;; ports + ;; pl-find-port + ;; pl-get-prev-used-port + ;; pl-open-db + ;; pl-open-run-close + ;; pl-release-port + ;; pl-set-port + ;; pl-take-port + ;; pl-is-port-available + ;; pl-get-port-state + ;; ;; system + ;; get-normalized-cpu-load + + ;; ) + +(import scheme posix chicken data-structures ports extras files mailbox) +(import rpc srfi-18 pkts matchable regex + typed-records srfi-69 srfi-1 + srfi-4 regex-case + (prefix sqlite3 sqlite3:) + foreign + tcp) ;; ulex-netutil) + +;;====================================================================== +;; D E B U G H E L P E R S +;;====================================================================== + +(define (dbg> . args) + (with-output-to-port (current-error-port) + (lambda () + (apply print "dbg> " args)))) + +(define (debug-pp . args) + (if (get-environment-variable "ULEX_DEBUG") + (with-output-to-port (current-error-port) + (lambda () + (apply pp args))))) + +(define *default-debug-port* (current-error-port)) + +(define (sdbg> fn stage-name stage-start stage-end start-time . message) + (if (get-environment-variable "ULEX_DEBUG") + (with-output-to-port *default-debug-port* + (lambda () + (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. " + (if start-time + (conc "total time " (- (current-milliseconds) start-time) + " ms.") + "") + message + ))))) + +;;====================================================================== +;; M A C R O S +;;====================================================================== +;; iup callbacks are not dumping the stack, this is a work-around +;; + +;; Some of these routines use: +;; +;; http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html +;; +;; Syntax for defining macros in a simple style similar to function definiton, +;; when there is a single pattern for the argument list and there are no keywords. +;; +;; (define-simple-syntax (name arg ...) body ...) +;; + +(define-syntax define-simple-syntax + (syntax-rules () + ((_ (name arg ...) body ...) + (define-syntax name (syntax-rules () ((name arg ...) (begin body ...))))))) + +(define-simple-syntax (catch-and-dump proc procname) + (handle-exceptions + exn + (begin + (print-call-chain (current-error-port)) + (with-output-to-port (current-error-port) + (lambda () + (print ((condition-property-accessor 'exn 'message) exn)) + (print "Callback error in " procname) + (print "Full condition info:\n" (condition->list exn))))) + (proc))) + + +;;====================================================================== +;; R E C O R D S +;;====================================================================== + +;; information about me as a server +;; +(defstruct area + ;; about this area + (useportlogger #f) + (lowport 32768) + (server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all) + (conn #f) + (port #f) + (myaddr (get-my-best-address)) + pktid ;; get pkt from hosts table if needed + pktfile + pktsdir + dbdir + (dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one? + (mutex (make-mutex)) + (rtable (make-hash-table)) ;; registration table of available actions + (dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve + ;; about other servers + (hosts (make-hash-table)) ;; key => hostdat + (hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime ) + (reqs (make-hash-table)) ;; uri => queue + ;; work queues + (wqueues (make-hash-table)) ;; fname => qdat + (stats (make-hash-table)) ;; fname => totalqueries + (last-srvup (current-seconds)) ;; last time we updated the known servers + (cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call + (ready #f) + (health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping + ) + +;; host stats +;; +(defstruct hostdat + (pkt #f) + (dbload (make-hash-table)) ;; "dbfile.db" => queries/min + (hostload #f) ;; normalized load ( 5min load / numcpus ) + ) + +;; dbdat +;; +(defstruct dbdat + (dbh #f) + (fname #f) + (write-access #f) + (sths (make-hash-table)) ;; hash mapping query strings to handles + ) + +;; qdat +;; +(defstruct qdat + (writeq (make-queue)) + (readq (make-queue)) + (rwq (make-queue)) + (logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging + (osshort (make-queue)) + (oslong (make-queue)) + (misc (make-queue)) ;; used for things like ping-full + ) + +;; calldat +;; +(defstruct calldat + (ctype 'dbwrite) + (obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc. + (rtime (current-milliseconds))) + +;; make it a global? Well, it is local to area module + +(define *pktspec* + `((server (hostname . h) + (port . p) + (pid . i) + (ipaddr . a) + ) + (data (hostname . h) ;; sender hostname + (port . p) ;; sender port + (ipaddr . a) ;; sender ip + (hostkey . k) ;; sending host key - store info at server under this key + (servkey . s) ;; server key - this needs to match at server end or reject the msg + (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json + (data . d) ;; base64 encoded slln data + ))) + +;; work item +;; +(defstruct witem + (rhost #f) ;; return host + (ripaddr #f) ;; return ipaddr + (rport #f) ;; return port + (servkey #f) ;; the packet representing the client of this workitem, used by final send-message + (rdat #f) ;; the request - usually an sql query, type is rdat + (action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort + (cookie #f) ;; cookie id for response + (data #f) ;; the data payload, i.e. parameters + (result #f) ;; the result from processing the data + (caller #f)) ;; the calling peer according to rpc itself + +(define (trim-pktid pktid) + (if (string? pktid) + (substring pktid 0 4) + "nopkt")) + +(define (any->number num) + (cond + ((number? num) num) + ((string? num) (string->number num)) + (else num))) + +(use trace) +(trace-call-sites #t) + +;;====================================================================== +;; D A T A B A S E H A N D L I N G +;;====================================================================== + +;; look in dbhandles for a db, return it, else return #f +;; +(define (get-dbh acfg fname) + (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '()))) + (if (null? dbh-lst) + (begin + ;; (print "opening db for " fname) + (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls + (let ((rem-lst (cdr dbh-lst))) + ;; (print "re-using saved connection for " fname) + (hash-table-set! (area-dbhandles acfg) fname rem-lst) + (car dbh-lst))))) + +(define (save-dbh acfg fname dbdat) + ;; (print "saving dbh for " fname) + (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '())))) + +;; open the database, if never before opened init it. put the handle in the +;; open db's hash table +;; returns: the dbdat +;; +(define (open-db acfg fname) + (let* ((fullname (conc (area-dbdir acfg) "/" fname)) + (exists (file-exists? fullname)) + (write-access (if exists + (file-write-access? fullname) + (file-write-access? (area-dbdir acfg)))) + (db (sqlite3:open-database fullname)) + (handler (sqlite3:make-busy-timeout 136000)) + ) + (sqlite3:set-busy-handler! db handler) + (sqlite3:execute db "PRAGMA synchronous = 0;") + (if (not exists) ;; need to init the db + (if write-access + (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements + ;; (sqlite3:with-transaction + ;; db + ;; (lambda () + (if isql + (for-each + (lambda (sql) + (sqlite3:execute db sql)) + isql))) + (print "ERROR: no write access to " (area-dbdir acfg)))) + (make-dbdat dbh: db fname: fname write-access: write-access))) + +;; This is a low-level command to retrieve or to prepare, save and return a prepared statment +;; you must extract the db handle +;; +(define (get-sth db cache stmt) + (if (hash-table-exists? cache stmt) + (begin + ;; (print "Reusing cached stmt for " stmt) + (hash-table-ref/default cache stmt #f)) + (let ((sth (sqlite3:prepare db stmt))) + (hash-table-set! cache stmt sth) + ;; (print "prepared stmt for " stmt) + sth))) + +;; a little more expensive but does all the tedious deferencing - only use if you don't already +;; have dbdat and db sitting around +;; +(define (full-get-sth acfg fname stmt) + (let* ((dbdat (get-dbh acfg fname)) + (db (dbdat-dbh dbdat)) + (sths (dbdat-sths dbdat))) + (get-sth db sths stmt))) + +;; write to a db +;; acfg: area data +;; rdat: request data +;; hdat: (host . port) +;; +;; (define (dbwrite acfg rdat hdat data-in) +;; (let* ((dbname (car data-in)) +;; (dbdat (get-dbh acfg dbname)) +;; (db (dbdat-dbh dbdat)) +;; (sths (dbdat-sths dbdat)) +;; (stmt (calldat-obj rdat)) +;; (sth (get-sth db sths stmt)) +;; (data (cdr data-in))) +;; (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data) +;; (print "dbdat: " (dbdat->alist dbdat)) +;; (apply sqlite3:execute sth data) +;; (save-dbh acfg dbname dbdat) +;; #t +;; )) + +(define (finalize-all-db-handles acfg) + (let* ((dbhandles (area-dbhandles acfg)) ;; dbhandles is hash of fname ==> dbdat + (num 0)) + (for-each + (lambda (area-name) + (print "Closing handles for " area-name) + (let ((dbdats (hash-table-ref/default dbhandles area-name '()))) + (for-each + (lambda (dbdat) + ;; first close all statement handles + (for-each + (lambda (sth) + (sqlite3:finalize! sth) + (set! num (+ num 1))) + (hash-table-values (dbdat-sths dbdat))) + ;; now close the dbh + (set! num (+ num 1)) + (sqlite3:finalize! (dbdat-dbh dbdat))) + dbdats))) + (hash-table-keys dbhandles)) + (print "FINALIZED " num " dbhandles"))) + +;;====================================================================== +;; W O R K Q U E U E H A N D L I N G +;;====================================================================== + +(define (register-db-as-mine acfg dbname) + (let ((ht (area-dbs acfg))) + (if (not (hash-table-ref/default ht dbname #f)) + (hash-table-set! ht dbname (random 10000))))) + +(define (work-queue-add acfg fname witem) + (let* ((work-queue-start (current-milliseconds)) + (action (witem-action witem)) ;; NB the action is the index into the rdat actions + (qdat (or (hash-table-ref/default (area-wqueues acfg) fname #f) + (let ((newqdat (make-qdat))) + (hash-table-set! (area-wqueues acfg) fname newqdat) + newqdat))) + (rdat (hash-table-ref/default (area-rtable acfg) action #f))) + (if rdat + (queue-add! + (case (calldat-ctype rdat) + ((dbwrite) (register-db-as-mine acfg fname)(qdat-writeq qdat)) + ((dbread) (register-db-as-mine acfg fname)(qdat-readq qdat)) + ((dbrw) (register-db-as-mine acfg fname)(qdat-rwq qdat)) + ((oslong) (qdat-oslong qdat)) + ((osshort) (qdat-osshort qdat)) + ((full-ping) (qdat-misc qdat)) + (else + (print "ERROR: no queue for " action ". Adding to dbwrite queue.") + (qdat-writeq qdat))) + witem) + (case action + ((full-ping)(qdat-misc qdat)) + (else + (print "ERROR: No action " action " was registered")))) + (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f) + #t)) ;; for now, simply return #t to indicate request got to the queue + +(define (doqueue acfg q fname dbdat dbh) + ;; (print "doqueue: " fname) + (let* ((start-time (current-milliseconds)) + (qlen (queue-length q))) + (if (> qlen 1) + (print "Processing queue of length " qlen)) + (let loop ((count 0) + (responses '())) + (let ((delta (- (current-milliseconds) start-time))) + (if (or (queue-empty? q) + (> delta 400)) ;; stop working on this queue after 400ms have passed + (list count delta responses) ;; return count, delta and responses list + (let* ((witem (queue-remove! q)) + (action (witem-action witem)) + (rdat (witem-rdat witem)) + (stmt (calldat-obj rdat)) + (sth (full-get-sth acfg fname stmt)) + (ctype (calldat-ctype rdat)) + (data (witem-data witem)) + (cookie (witem-cookie witem))) + ;; do the processing and save the result in witem-result + (witem-result-set! + witem + (case ctype ;; action + ((noblockwrite) ;; blind write, no ack of success returned + (apply sqlite3:execute sth data) + (sqlite3:last-insert-rowid dbh)) + ((dbwrite) ;; blocking write + (apply sqlite3:execute sth data) + #t) + ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query + (apply sqlite3:map-row (lambda x x) sth data)) + ((full-ping) 'full-ping) + (else (print "Not ready for action " action) #f))) + (loop (add1 count) + (if cookie + (cons witem responses) + responses)))))))) + +;; do up to 400ms of processing on each queue +;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded +;; +(define (process-db-queries acfg fname) + (if (hash-table-exists? (area-wqueues acfg) fname) + (let* ((process-db-queries-start-time (current-milliseconds)) + (qdat (hash-table-ref/default (area-wqueues acfg) fname #f)) + (queue-sym->queue (lambda (queue-sym) + (case queue-sym ;; lookup the queue from qdat given a name (symbol) + ((wqueue) (qdat-writeq qdat)) + ((rqueue) (qdat-readq qdat)) + ((rwqueue) (qdat-rwq qdat)) + ((misc) (qdat-misc qdat)) + (else #f)))) + (dbdat (get-dbh acfg fname)) + (dbh (if (dbdat? dbdat)(dbdat-dbh dbdat) #f)) + (nowtime (current-seconds))) + ;; handle the queues that require a transaction + ;; + (map ;; + (lambda (queue-sym) + ;; (print "processing queue " queue-sym) + (let* ((queue (queue-sym->queue queue-sym))) + (if (not (queue-empty? queue)) + (let ((responses + (sqlite3:with-transaction ;; todo - catch exceptions... + dbh + (lambda () + (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work! + ;; (print "res=" res) + (match res + ((count delta responses) + (update-stats acfg fname queue-sym delta count) + (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f) + responses) ;; return responses + (else + (print "ERROR: bad return data from doqueue " res))) + ))))) + ;; having completed the transaction, send the responses. + ;; (print "INFO: sending " (length responses) " responses.") + (let loop ((responses-left responses)) + (cond + ((null? responses-left) #t) + (else + (let* ((witem (car responses-left)) + (response (cdr responses-left))) + (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem) + (witem-cookie witem)(witem-result witem))) + (loop (cdr responses-left)))))) + ))) + '(wqueue rwqueue rqueue)) + + ;; handle misc queue + ;; + ;; (print "processing misc queue") + (let ((queue (queue-sym->queue 'misc))) + (doqueue acfg queue fname dbdat dbh)) + ;; .... + (save-dbh acfg fname dbdat) + #t ;; just to let the tests know we got here + ) + #f ;; nothing processed + )) + +;; run all queues in parallel per db but sequentially per queue for that db. +;; - process the queues every 500 or so ms +;; - allow for long running queries to continue but all other activities for that +;; db will be blocked. +;; +(define (work-queue-processor acfg) + (let* ((threads (make-hash-table))) ;; fname => thread + (let loop ((fnames (hash-table-keys (area-wqueues acfg))) + (target-time (+ (current-milliseconds) 50))) + ;;(if (not (null? fnames))(print "Processing for these databases: " fnames)) + (for-each + (lambda (fname) + ;; (print "processing for " fname) + ;;(process-db-queries acfg fname)) + (let ((th (hash-table-ref/default threads fname #f))) + (if (and th (not (member (thread-state th) '(dead terminated)))) + (begin + (print "WARNING: worker thread for " fname " is taking a long time.") + (print "Thread is in state " (thread-state th))) + (let ((th1 (make-thread (lambda () + ;; (catch-and-dump + ;; (lambda () + ;; (print "Process queries for " fname) + (let ((start-time (current-milliseconds))) + (process-db-queries acfg fname) + ;; (thread-sleep! 0.01) ;; need the thread to take at least some time + (hash-table-delete! threads fname)) ;; no mutexes? + fname) + "th1"))) ;; )) + (hash-table-set! threads fname th1) + (thread-start! th1))))) + fnames) + ;; (thread-sleep! 0.1) ;; give the threads some time to process requests + ;; burn time until 400ms is up + (let ((now-time (current-milliseconds))) + (if (< now-time target-time) + (let ((delta (- target-time now-time))) + (thread-sleep! (/ delta 1000))))) + (loop (hash-table-keys (area-wqueues acfg)) + (+ (current-milliseconds) 50))))) + +;;====================================================================== +;; S T A T S G A T H E R I N G +;;====================================================================== + +(defstruct stat + (qcount-avg 0) ;; coarse running average + (qtime-avg 0) ;; coarse running average + (qcount 0) ;; total + (qtime 0) ;; total + (last-qcount 0) ;; last + (last-qtime 0) ;; last + (dbs '()) ;; list of db files handled by this node + (when 0)) ;; when the last query happened - seconds + + +(define (update-stats acfg fname bucket duration numqueries) + (let* ((key fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough + (stats (or (hash-table-ref/default (area-stats acfg) key #f) + (let ((newstats (make-stat))) + (hash-table-set! (area-stats acfg) key newstats) + newstats)))) + ;; when the last query happended (used to remove the fname from the active list) + (stat-when-set! stats (current-seconds)) + ;; last values + (stat-last-qcount-set! stats numqueries) + (stat-last-qtime-set! stats duration) + ;; total over process lifetime + (stat-qcount-set! stats (+ (stat-qcount stats) numqueries)) + (stat-qtime-set! stats (+ (stat-qtime stats) duration)) + ;; coarse average + (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2)) + (stat-qtime-avg-set! stats (/ (+ (stat-qtime-avg stats) duration) 2)) + + ;; here is where we add the stats for a given dbfile + (if (not (member fname (stat-dbs stats))) + (stat-dbs-set! stats (cons fname (stat-dbs stats)))) + + )) + +;;====================================================================== +;; S E R V E R S T U F F +;;====================================================================== + +;; this does NOT return! +;; +(define (find-free-port-and-open acfg) + (let ((port (or (area-port acfg) 3200))) + (handle-exceptions + exn + (begin + (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port") + (area-port-set! acfg (+ port 1)) + (find-free-port-and-open acfg)) + (rpc:default-server-port port) + (area-port-set! acfg port) + (tcp-read-timeout 120000) + ;; ((rpc:make-server (tcp-listen port)) #t) + (tcp-listen (rpc:default-server-port) + )))) + +;; register this node by putting a packet into the pkts dir. +;; look for other servers +;; contact other servers and compile list of servers +;; there are two types of server +;; main servers - dashboards, runners and dedicated servers - need pkt +;; passive servers - test executers, step calls, list-runs - no pkt +;; +(define (register-node acfg hostip port-num) + ;;(mutex-lock! (area-mutex acfg)) + (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created) + (best-ip (or hostip (get-my-best-address))) + (mtdir (area-dbdir acfg)) + (pktdir (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts"))) + (print "Registering node " best-ip ":" port-num) + (if (not mtdir) ;; require a home for this node to put or find databases + #f + (begin + (if (not (directory? pktdir))(create-directory pktdir)) + ;; server is started, now create pkt if needed + (print "Starting server in " server-type " mode with port " port-num) + (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt + (begin + (area-pktid-set! acfg + (write-alist->pkt + pktdir + `((hostname . ,(get-host-name)) + (ipaddr . ,best-ip) + (port . ,port-num) + (pid . ,(current-process-id))) + pktspec: *pktspec* + ptype: 'server)) + (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt")))) + (area-port-set! acfg port-num) + #;(mutex-unlock! (area-mutex acfg)))))) + +(define *cookie-seqnum* 0) +(define (make-cookie key) + (set! *cookie-seqnum* (add1 *cookie-seqnum*)) + ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*) + (conc key "-" *cookie-seqnum*) + ) + +;; dispatch locally if possible +;; +(define (call-deliver-response acfg ipaddr port cookie data) + (if (and (equal? (area-myaddr acfg) ipaddr) + (equal? (area-port acfg) port)) + (deliver-response acfg cookie data) + ((rpc:procedure 'response ipaddr port) cookie data))) + +(define (deliver-response acfg cookie data) + (let ((deliver-response-start (current-milliseconds))) + (thread-start! (make-thread + (lambda () + (let loop ((tries-left 5)) + ;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left) + ;;(pp (hash-table->alist (area-cookie2mbox acfg))) + (let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f))) + (cond + ((eq? 0 tries-left) + (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie) + ) + (mbox + ;;(print "got mbox="mbox" got data="data" send.") + (mailbox-send! mbox data)) + (else + ;;(print "no mbox yet. look for "cookie) + (thread-sleep! (/ (- 6 tries-left) 10)) + (loop (sub1 tries-left)))))) + ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data)) + (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie) + ) + (conc "deliver-response thread for cookie="cookie)))) + #t) + +;; action: +;; immediate - quick actions, no need to put in queues +;; dbwrite - put in dbwrite queue +;; dbread - put in dbread queue +;; oslong - os actions, e.g. du, that could take a long time +;; osshort - os actions that should be quick, e.g. df +;; +(define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler + ;; NOTE: Use rpc:current-peer for getting return address + (let* ((std-peer-handler-start (current-milliseconds)) + ;; (raw-data (alist-ref 'data dat)) + (rdat (hash-table-ref/default + (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action + (witem (make-witem ripaddr: from-ipaddr ;; rhost: from-host + rport: from-port action: action + rdat: rdat cookie: cookie + servkey: servkey data: params ;; TODO - rename data to params + caller: (rpc:current-peer)))) + (if (not (equal? servkey (area-pktid acfg))) + `(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this + (let* ((ctype (if rdat + (calldat-ctype rdat) ;; is this necessary? these should be identical + action))) + (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f) + (case ctype + ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data))) + ((full-ping) `(#t "ack to full ping" ,(work-queue-add acfg fname witem) ,cookie)) + ((response) `(#t "ack from requestor" ,(deliver-response acfg fname params))) + ((dbwrite) `(#t "db write submitted" ,(work-queue-add acfg fname witem) ,cookie)) + ((dbread) `(#t "db read submitted" ,(work-queue-add acfg fname witem) ,cookie )) + ((dbrw) `(#t "db read/write submitted" ,cookie)) + ((osshort) `(#t "os short submitted" ,cookie)) + ((oslong) `(#t "os long submitted" ,cookie)) + (else `(#f "unrecognised action" ,ctype))))))) + +;; Call this to start the actual server +;; +;; start_server +;; +;; mode: ' +;; handler: proc which takes pktrecieved as argument +;; + +(define (start-server acfg) + (let* ((conn (find-free-port-and-open acfg)) + (port (area-port acfg))) + (rpc:publish-procedure! + 'delist-db + (lambda (fname) + (hash-table-delete! (area-dbs acfg) fname))) + (rpc:publish-procedure! + 'calling-addr + (lambda () + (rpc:current-peer))) + (rpc:publish-procedure! + 'ping + (lambda ()(real-ping acfg))) + (rpc:publish-procedure! + 'request + (lambda (from-addr from-port servkey action cookie dbname params) + (request acfg from-addr from-port servkey action cookie dbname params))) + (rpc:publish-procedure! + 'response + (lambda (cookie res-dat) + (deliver-response acfg cookie res-dat))) + (area-ready-set! acfg #t) + (area-conn-set! acfg conn) + ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t) + + +(define (launch acfg) ;; #!optional (proc std-peer-handler)) + (print "starting launch") + (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers) + #;(let ((original-handler (current-exception-handler))) ;; is th + (lambda (exception) + (server-exit-procedure) + (original-handler exception))) + (on-exit (lambda () + (shutdown acfg))) ;; (finalize-all-db-handles acfg))) + ;; set up the rpc handler + (let* ((th1 (make-thread + (lambda ()(start-server acfg)) + "server thread")) + (th2 (make-thread + (lambda () + (print "th2 starting") + (let loop () + (work-queue-processor acfg) + (print "work-queue-processor crashed!") + (loop))) + "work queue thread"))) + (thread-start! th1) + (thread-start! th2) + (let loop () + (thread-sleep! 0.025) + (if (area-ready acfg) + #t + (loop))) + ;; attempt to fix my address + (let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)? + (let loop ((rem-addrs all-addr)) + (if (null? rem-addrs) + (begin + (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.") + (exit 1)) ;; BUG Changeme to raising an exception + + (let* ((addr (car rem-addrs)) + (good-addr (handle-exceptions + exn + #f + ((rpc:procedure 'calling-addr addr (area-port acfg)))))) + (if good-addr + (begin + (print "Got good-addr of " good-addr) + (area-myaddr-set! acfg good-addr)) + (loop (cdr rem-addrs))))))) + (register-node acfg (area-myaddr acfg)(area-port acfg)) + (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg)) + ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers) + )) + +(define (clear-server-pkt acfg) + (let ((pktf (area-pktfile acfg))) + (if pktf (delete-file* pktf)))) + +(define (shutdown acfg) + (let (;;(conn (area-conn acfg)) + (pktf (area-pktfile acfg)) + (port (area-port acfg))) + (if pktf (delete-file* pktf)) + (send-all "imshuttingdown") + ;; (rpc:close-all-connections!) ;; don't know if this is actually needed + (finalize-all-db-handles acfg))) + +(define (send-all msg) + #f) + +;; given a area record look up all the packets +;; +(define (get-all-server-pkts acfg) + (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt")))) + (map (lambda (pkt-file) + (read-pkt->alist pkt-file pktspec: *pktspec*)) + all-pkt-files))) + +#;((Z . "9a0212302295a19610d5796fce0370fa130758e9") + (port . "34827") + (pid . "28748") + (hostname . "zeus") + (T . "server") + (D . "1549427032.0")) + +#;(define (get-my-best-address) + (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))) + (cond + ((null? all-my-addresses) + (get-host-name)) ;; no interfaces? + ((eq? (length all-my-addresses) 1) + (ip->string (car all-my-addresses))) ;; only one to choose from, just go with it + (else + (ip->string (car (filter (lambda (x) ;; take any but 127. + (not (eq? (u8vector-ref x 0) 127))) + all-my-addresses))))))) + +;; whoami? I am my pkt +;; +(define (whoami? acfg) + (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f)) + +;;====================================================================== +;; "Client side" operations +;;====================================================================== + +(define (safe-call call-key host port . params) + (handle-exceptions + exn + (begin + (print "Call " call-key " to " host ":" port " failed") + #f) + (apply (rpc:procedure call-key host port) params))) + +;; ;; convert to/from string / sexpr +;; +;; (define (string->sexpr str) +;; (if (string? str) +;; (with-input-from-string str read) +;; str)) +;; +;; (define (sexpr->string s) +;; (with-output-to-string (lambda ()(write s)))) + +;; is the server alive? +;; +(define (ping acfg host port) + (let* ((myaddr (area-myaddr acfg)) + (myport (area-port acfg)) + (start-time (current-milliseconds)) + (res (if (and (equal? myaddr host) + (equal? myport port)) + (real-ping acfg) + ((rpc:procedure 'ping host port))))) + (cons (- (current-milliseconds) start-time) + res))) + +;; returns ( ipaddr port alist-fname=>randnum ) +(define (real-ping acfg) + `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg))) + +;; is the server alive AND the queues processing? +;; +#;(define (full-ping acfg servpkt) + (let* ((start-time (current-milliseconds)) + (res (send-message acfg servpkt '(full-ping) 'full-ping))) + (cons (- (current-milliseconds) start-time) + res))) ;; (equal? res "got ping")))) + + +;; look up all pkts and get the server id (the hash), port, host/ip +;; store this info in acfg +;; return the number of responsive servers found +;; +;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself +;; +(define (update-known-servers acfg) + ;; readll all pkts + ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt + (let* ((start-time (current-milliseconds)) + (all-pkts (delete-duplicates + (append (get-all-server-pkts acfg) + (hash-table-values (area-hosts acfg))))) + (hostshash (area-hosts acfg)) + (my-id (area-pktid acfg)) + (pktsdir (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers + (numsrvs 0) + (delpkt (lambda (pktsdir sid) + (print "clearing out server " sid) + (delete-file* (conc pktsdir "/" sid ".pkt")) + (hash-table-delete! hostshash sid)))) + (area-last-srvup-set! acfg (current-seconds)) + (for-each + (lambda (servpkt) + (if (list? servpkt) + ;; (pp servpkt) + (let* ((shost (alist-ref 'ipaddr servpkt)) + (sport (any->number (alist-ref 'port servpkt))) + (res (handle-exceptions + exn + (begin + ;; (print "INFO: bad server on " shost ":" sport) + #f) + (ping acfg shost sport))) + (sid (alist-ref 'Z servpkt)) ;; Z code is our name for the server + (url (conc shost ":" sport)) + ) + #;(if (or (not res) + (null? res)) + (begin + (print "STRANGE: ping of " url " gave " res))) + + ;; (print "Got " res " from " shost ":" sport) + (match res + ((qduration . payload) + ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt) + ;; (if payload + ;; "Success" "Fail")) + (match payload + ((host port stats) + ;; (print "From " host ":" port " got stats: " stats) + (if (and host port stats) + (let ((url (conc host ":" port))) + (hash-table-set! hostshash sid servpkt) + ;; store based on host:port + (hash-table-set! (area-hoststats acfg) sid stats)) + (print "missing data from the server, not sure what that means!")) + (set! numsrvs (+ numsrvs 1))) + (#f + (print "Removing pkt " sid " due to #f from server or failed ping") + (delpkt pktsdir sid)) + (else + (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)"))) + (else + ;; here we delete the pkt - can't reach the server, remove it + ;; however this logic is inadequate. we should mark the server as checked + ;; and not good, if it happens a second time - then remove the pkt + ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead + ;; could be it is simply too busy to reply + (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0))) + (if (> bad-pings 1) ;; two bad pings - remove pkt + (begin + (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid) + (delpkt pktsdir sid)) + (begin + (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet") + (hash-table-set! (area-health acfg) + url + (+ (hash-table-ref/default (area-health acfg) url 0) 1)) + )) + )))) + ;; servpkt is not actually a pkt? + (begin + (print "Bad pkt " servpkt)))) + all-pkts) + (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs + " servers, pkts: " (map (lambda (p) + (alist-ref 'Z p)) + all-pkts)) + numsrvs)) + +(defstruct srvstat + (numfiles 0) ;; number of db files handled by this server - subtract 1 for the db being currently looked at + (randnum #f) ;; tie breaker number assigned to by the server itself - applies only to the db under consideration + (pkt #f)) ;; the server pkt + +;;(define (srv->srvstat srvpkt) + +;; Get the server best for given dbname and key +;; +;; NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries. +;; +(define (get-best-server acfg dbname key) + (let* (;; (servers (hash-table-values (area-hosts acfg))) + (servers (area-hosts acfg)) + (skeys (sort (hash-table-keys servers) string>=?)) ;; a stable listing + (start-time (current-milliseconds)) + (srvstats (make-hash-table)) ;; srvid => srvstat + (url (conc (area-myaddr acfg) ":" (area-port acfg)))) + ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys)) + (if (null? skeys) + (if (> (update-known-servers acfg) 0) + (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter + (begin + (print "ERROR: no server found!") ;; since this process is also a server this should never happen + #f)) + (begin + ;; (print "in get-best-server with skeys=" skeys) + (if (> (- (current-seconds) (area-last-srvup acfg)) 10) + (begin + (update-known-servers acfg) + (sdbg> "get-best-server" "update-known-servers" start-time #f #f))) + + ;; for each server look at the list of dbfiles, total number of dbs being handled + ;; and the rand number, save the best host + ;; also do a delist-db for each server dbfile not used + (let* ((best-server #f) + (servers-to-delist (make-hash-table))) + (for-each + (lambda (srvid) + (let* ((server (hash-table-ref/default servers srvid #f)) + (stats (hash-table-ref/default (area-hoststats acfg) srvid '(())))) + ;; (print "stats: " stats) + (if server + (let* ((dbweights (car stats)) + (srvload (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights))) + (dbrec (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore + (randnum (if dbrec + dbrec ;; (cdr dbrec) + 0))) + (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server)))))) + skeys) + + (let* ((sorted (sort (hash-table-values srvstats) + (lambda (a b) + (let ((numfiles-a (srvstat-numfiles a)) + (numfiles-b (srvstat-numfiles b)) + (randnum-a (srvstat-randnum a)) + (randnum-b (srvstat-randnum b))) + (if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less + #t + (if (and (equal? numfiles-a numfiles-b) + (< randnum-a randnum-b)) + #t + #f)))))) + (best (if (null? sorted) + (begin + (print "ERROR: should never be null due to self as server.") + #f) + (srvstat-pkt (car sorted))))) + #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv) + (let ((p (srvstat-pkt srv))) + (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p) + "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")"))) + sorted)) + best)))))) + + ;; send out an "I'm about to exit notice to all known servers" + ;; +(define (death-imminent acfg) + '()) + +;;====================================================================== +;; U L E X - T H E I N T E R E S T I N G S T U F F ! ! +;;====================================================================== + +;; register a handler +;; NOTES: +;; dbinitsql is reserved for a list of sql statements for initializing the db +;; dbinitfn is reserved for a db init function, if exists called after dbinitsql +;; +(define (register acfg key obj #!optional (ctype 'dbwrite)) + (let ((ht (area-rtable acfg))) + (if (hash-table-exists? ht key) + (print "WARNING: redefinition of entry " key)) + (hash-table-set! ht key (make-calldat obj: obj ctype: ctype)))) + +;; usage: register-batch acfg '((key1 . sql1) (key2 . sql2) ... ) +;; NB// obj is often an sql query +;; +(define (register-batch acfg ctype data) + (let ((ht (area-rtable acfg))) + (map (lambda (dat) + (hash-table-set! ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype))) + data))) + +(define (initialize-area-calls-from-specfile area specfile) + (let* ((callspec (with-input-from-file specfile read ))) + (for-each (lambda (group) + (register-batch + area + (car group) + (cdr group))) + callspec))) + +;; get-rentry +;; +(define (get-rentry acfg key) + (hash-table-ref/default (area-rtable acfg) key #f)) + +(define (get-rsql acfg key) + (let ((cdat (get-rentry acfg key))) + (if cdat + (calldat-obj cdat) + #f))) + + + +;; blocking call: +;; client server +;; ------ ------ +;; call() +;; send-message() +;; nmsg-send() +;; nmsg-receive() +;; nmsg-respond(ack,cookie) +;; ack, cookie +;; mbox-thread-wait(cookie) +;; nmsg-send(client,cookie,result) +;; nmsg-respond(ack) +;; return result +;; +;; reserved action: +;; 'immediate +;; 'dbinitsql +;; +(define (call acfg dbname action params #!optional (count 0)) + (let* ((call-start-time (current-milliseconds)) + (srv (get-best-server acfg dbname action)) + (post-get-start-time (current-milliseconds)) + (rdat (hash-table-ref/default (area-rtable acfg) action #f)) + (myid (trim-pktid (area-pktid acfg))) + (srvid (trim-pktid (alist-ref 'Z srv))) + (cookie (make-cookie myid))) + (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat) + (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname) + (if (and srv rdat) ;; need both to dispatch a request + (let* ((ripaddr (alist-ref 'ipaddr srv)) + (rsrvid (alist-ref 'Z srv)) + (rport (any->number (alist-ref 'port srv))) + (res-full (if (and (equal? ripaddr (area-myaddr acfg)) + (equal? rport (area-port acfg))) + (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params) + (safe-call 'request ripaddr rport + (area-myaddr acfg) + (area-port acfg) + #;(area-pktid acfg) + rsrvid + action cookie dbname params)))) + ;; (print "res-full: " res-full) + (match res-full + ((response-ok response-msg rem ...) + (let* ((send-message-time (current-milliseconds)) + ;; (match res-full + ;; ((response-ok response-msg) + ;; (response-ok (car res-full)) + ;; (response-msg (cadr res-full) + ) + ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG + ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params) + (sdbg> "call" "send-message" post-get-start-time #f call-start-time) + (cond + ((not response-ok) #f) + ((member response-msg '("db read submitted" "db write submitted")) + (let* ((cookie-id (cadddr res-full)) + (mbox (make-mailbox)) + (mbox-time (current-milliseconds))) + (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox) + (let* ((mbox-timeout-secs 20) + (mbox-timeout-result 'MBOX_TIMEOUT) + (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) + (mbox-receive-time (current-milliseconds))) + (hash-table-delete! (area-cookie2mbox acfg) cookie-id) + (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname) + ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params) + res))) + (else + (print "Unhandled response \""response-msg"\"") + #f)) + ;; depending on what action (i.e. ctype) is we will block here waiting for + ;; all the data (mechanism to be determined) + ;; + ;; if res is a "working on it" then wait + ;; wait for result + ;; mailbox thread wait on + + ;; if res is a "can't help you" then try a different server + ;; if res is a "ack" (e.g. for one-shot requests) then return res + )) + (else + (if (< count 10) + (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv)))) + (thread-sleep! 1) + (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.") + (call acfg dbname action params (+ count 1))) + (begin + (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full))))))) + (begin + (if (not rdat) + (print "ERROR: action " action " not registered.") + (if (< count 10) + (begin + (thread-sleep! 1) + (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts + (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds") + (call acfg dbname action params (+ count 1))) + (begin + (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up.")) + #;(error "No server available")))))))) + + +;;====================================================================== +;; U T I L I T I E S +;;====================================================================== + +;; get a signature for identifing this process +;; +(define (get-process-signature) + (cons (get-host-name)(current-process-id))) + +;;====================================================================== +;; S Y S T E M S T U F F +;;====================================================================== + +;; get normalized cpu load by reading from /proc/loadavg and +;; /proc/cpuinfo return all three values and the number of real cpus +;; and the number of threads returns alist '((adj-cpu-load +;; . normalized-proc-load) ... etc. keys: adj-proc-load, +;; adj-core-load, 1m-load, 5m-load, 15m-load +;; +(define (get-normalized-cpu-load) + (let ((res (get-normalized-cpu-load-raw)) + (default `((adj-proc-load . 2) ;; there is no right answer + (adj-core-load . 2) + (1m-load . 2) + (5m-load . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong + (15m-load . 0) + (proc . 1) + (core . 1) + (phys . 1) + (error . #t)))) + (cond + ((and (list? res) + (> (length res) 2)) + res) + ((eq? res #f) default) ;; add messages? + ((eq? res #f) default) ;; this would be the #eof + (else default)))) + +(define (get-normalized-cpu-load-raw) + (let* ((actual-host (get-host-name))) ;; #f is localhost + (let ((data (append + (with-input-from-file "/proc/loadavg" read-lines) + (with-input-from-file "/proc/cpuinfo" read-lines) + (list "end"))) + (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$")) + (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$")) + (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$")) + (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$")) + (max-num (lambda (p n)(max (string->number p) n)))) + ;; (print "data=" data) + (if (null? data) ;; something went wrong + #f + (let loop ((hed (car data)) + (tal (cdr data)) + (loads #f) + (proc-num 0) ;; processor includes threads + (phys-num 0) ;; physical chip on motherboard + (core-num 0)) ;; core + ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num) + (if (null? tal) ;; have all our data, calculate normalized load and return result + (let* ((act-proc (+ proc-num 1)) + (act-phys (+ phys-num 1)) + (act-core (+ core-num 1)) + (adj-proc-load (/ (car loads) act-proc)) + (adj-core-load (/ (car loads) act-core)) + (result + (append (list (cons 'adj-proc-load adj-proc-load) + (cons 'adj-core-load adj-core-load)) + (list (cons '1m-load (car loads)) + (cons '5m-load (cadr loads)) + (cons '15m-load (caddr loads))) + (list (cons 'proc act-proc) + (cons 'core act-core) + (cons 'phys act-phys))))) + result) + (regex-case + hed + (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num)) + (proc-rx ( x p ) (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num)) + (phys-rx ( x p ) (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num)) + (core-rx ( x c ) (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num))) + (else + (begin + ;; (print "NO MATCH: " hed) + (loop (car tal)(cdr tal) loads proc-num phys-num core-num)))))))))) + +(define (get-host-stats acfg) + (let ((stats-hash (area-stats acfg))) + ;; use this opportunity to remove references to dbfiles which have not been accessed in a while + (for-each + (lambda (dbname) + (let* ((stats (hash-table-ref stats-hash dbname)) + (last-access (stat-when stats))) + (if (and (> last-access 0) ;; if zero then there has been no access + (> (- (current-seconds) last-access) 10)) ;; not used in ten seconds + (begin + (print "Removing " dbname " from stats list") + (hash-table-delete! stats-hash dbname) ;; remove from stats hash + (stat-dbs-set! stats (hash-table-keys stats)))))) + (hash-table-keys stats-hash)) + + `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum + ,(map (lambda (dbname) ;; dbname is the db name + (cons dbname (stat-when (hash-table-ref stats-hash dbname)))) + (hash-table-keys stats-hash)) + (cpuload . ,(get-normalized-cpu-load))))) + #;(stats . ,(map (lambda (k) ;; create an alist from the stats data + (cons k (stat->alist (hash-table-ref (area-stats acfg) k)))) + (hash-table-keys (area-stats acfg)))) + +#;(trace + ;; assv + ;; cdr + ;; caar + ;; ;; cdr + ;; call + ;; finalize-all-db-handles + ;; get-all-server-pkts + ;; get-normalized-cpu-load + ;; get-normalized-cpu-load-raw + ;; launch + ;; nmsg-send + ;; process-db-queries + ;; receive-message + ;; std-peer-handler + ;; update-known-servers + ;; work-queue-processor + ) + +;;====================================================================== +;; netutil +;; move this back to ulex-netutil.scm someday? +;;====================================================================== + +;; #include +;; #include +;; #include +;; #include + +(foreign-declare "#include \"sys/types.h\"") +(foreign-declare "#include \"sys/socket.h\"") +(foreign-declare "#include \"ifaddrs.h\"") +(foreign-declare "#include \"arpa/inet.h\"") + +;; get IP addresses from ALL interfaces +(define get-all-ips + (foreign-safe-lambda* scheme-object () + " + +// from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address : + + + C_word lst = C_SCHEME_END_OF_LIST, len, str, *a; +// struct ifaddrs *ifa, *i; +// struct sockaddr *sa; + + struct ifaddrs * ifAddrStruct = NULL; + struct ifaddrs * ifa = NULL; + void * tmpAddrPtr = NULL; + + if ( getifaddrs(&ifAddrStruct) != 0) + C_return(C_SCHEME_FALSE); + +// for (i = ifa; i != NULL; i = i->ifa_next) { + for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) { + if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is + // a valid IPv4 address + tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr; + char addressBuffer[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN); +// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); + len = strlen(addressBuffer); + a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); + str = C_string(&a, len, addressBuffer); + lst = C_a_pair(&a, str, lst); + } + +// else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is +// // a valid IPv6 address +// tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; +// char addressBuffer[INET6_ADDRSTRLEN]; +// inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN); +//// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); +// len = strlen(addressBuffer); +// a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); +// str = C_string(&a, len, addressBuffer); +// lst = C_a_pair(&a, str, lst); +// } + +// else { +// printf(\" not an IPv4 address\\n\"); +// } + + } + + freeifaddrs(ifa); + C_return(lst); + +")) + +;; Change this to bias for addresses with a reasonable broadcast value? +;; +(define (ip-pref-less? a b) + (let* ((rate (lambda (ipstr) + (regex-case ipstr + ( "^127\\." _ 0 ) + ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 ) + ( else 2 ) )))) + (< (rate a) (rate b)))) + + +(define (get-my-best-address) + (let ((all-my-addresses (get-all-ips)) + ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))) + ) + (cond + ((null? all-my-addresses) + (get-host-name)) ;; no interfaces? + ((eq? (length all-my-addresses) 1) + (car all-my-addresses)) ;; only one to choose from, just go with it + + (else + (car (sort all-my-addresses ip-pref-less?))) + ;; (else + ;; (ip->string (car (filter (lambda (x) ;; take any but 127. + ;; (not (eq? (u8vector-ref x 0) 127))) + ;; all-my-addresses)))) + + ))) + +(define (get-all-ips-sorted) + (sort (get-all-ips) ip-pref-less?)) + + +) ADDED ulex/ulex.setup Index: ulex/ulex.setup ================================================================== --- /dev/null +++ ulex/ulex.setup @@ -0,0 +1,11 @@ +;; Copyright 2007-2018, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +;;;; ulex.setup +(standard-extension 'ulex "0.1") ADDED ulex/ulex_europaeus-branch.jpg Index: ulex/ulex_europaeus-branch.jpg ================================================================== --- /dev/null +++ ulex/ulex_europaeus-branch.jpg cannot compute difference between binary files ADDED ulex/write-cycle.fig Index: ulex/write-cycle.fig ================================================================== --- /dev/null +++ ulex/write-cycle.fig @@ -0,0 +1,186 @@ +#FIG 3.2 Produced by xfig version 3.2.5-alpha5 +Landscape +Center +Inches +Letter +100.00 +Single +-2 +1200 2 +0 32 #c5b696 +0 33 #eef7fe +0 34 #dbcaa5 +0 35 #404040 +0 36 #808080 +0 37 #bfbfbf +0 38 #dfdfdf +0 39 #8d8e8d +0 40 #a9a9a9 +0 41 #555555 +0 42 #c6c2c6 +0 43 #565151 +0 44 #8d8d8d +0 45 #d6d6d6 +0 46 #84807d +0 47 #d1d1d1 +0 48 #3a3a3a +0 49 #4573a9 +0 50 #adadad +0 51 #7b79a4 +0 52 #444444 +0 53 #73758b +0 54 #f6f6f6 +0 55 #414541 +0 56 #635dcd +0 57 #bdbdbd +0 58 #515151 +0 59 #e6e2e6 +0 60 #000049 +0 61 #797979 +0 62 #303430 +0 63 #414141 +0 64 #c6b595 +6 11775 7350 12750 9000 +6 11775 7350 12750 8775 +5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 12240.000 7050.000 11790 7650 12240 7800 12690 7650 +5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 12240.000 7950.000 11790 8550 12240 8700 12690 8550 +1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 12240 7500 450 150 11790 7350 12690 7650 +2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 + 12690 7575 12690 8550 +2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 + 11790 7575 11790 8550 +4 0 0 50 -1 0 12 0.0000 4 150 210 12075 8250 db\001 +-6 +4 0 0 50 -1 0 12 0.0000 4 150 690 12000 9000 main.db\001 +-6 +6 7950 6975 9375 7575 +2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 + 7950 6975 9375 6975 9375 7575 7950 7575 7950 6975 +4 0 0 50 -1 0 12 0.0000 4 195 1335 8100 7350 send-responses\001 +-6 +6 450 10950 1425 12600 +6 450 10950 1425 12375 +5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 915.000 10650.000 465 11250 915 11400 1365 11250 +5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 915.000 11550.000 465 12150 915 12300 1365 12150 +1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 915 11100 450 150 465 10950 1365 11250 +2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 + 1365 11175 1365 12150 +2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 + 465 11175 465 12150 +4 0 0 50 -1 0 12 0.0000 4 150 210 750 11850 db\001 +-6 +4 0 0 50 -1 0 12 0.0000 4 150 690 675 12600 main.db\001 +-6 +6 4800 15525 5775 16950 +5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 5265.000 15225.000 4815 15825 5265 15975 5715 15825 +5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 5265.000 16125.000 4815 16725 5265 16875 5715 16725 +1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 5265 15675 450 150 4815 15525 5715 15825 +2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 + 5715 15750 5715 16725 +2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 + 4815 15750 4815 16725 +4 0 0 50 -1 0 12 0.0000 4 150 210 5100 16425 db\001 +-6 +6 8025 12750 9000 14175 +5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 8490.000 12450.000 8040 13050 8490 13200 8940 13050 +5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 8490.000 13350.000 8040 13950 8490 14100 8940 13950 +1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 8490 12900 450 150 8040 12750 8940 13050 +2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 + 8940 12975 8940 13950 +2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 + 8040 12975 8040 13950 +4 0 0 50 -1 0 12 0.0000 4 150 210 8325 13650 db\001 +-6 +1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 2325 12675 645 645 2325 12675 2850 13050 +1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 6075 11025 645 645 6075 11025 6600 11400 +1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 6600 13950 645 645 6600 13950 7125 14325 +1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 3750 16650 645 645 3750 16650 4275 17025 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 + 0 0 1.00 60.00 120.00 + 2625 2250 7575 2250 +2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 + 7575 1875 9525 1875 9525 3750 7575 3750 7575 1875 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 + 0 0 1.00 60.00 120.00 + 8250 2400 8250 4275 +2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 + 7575 4275 9525 4275 9525 5100 7575 5100 7575 4275 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 + 0 0 1.00 60.00 120.00 + 7575 4650 2625 4650 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 3 + 0 0 1.00 60.00 120.00 + 9525 4650 10275 4650 10275 5175 +2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 + 9975 5175 10650 5175 10650 6975 9975 6975 9975 5175 +2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 + 9975 6975 10650 6975 10650 7575 9975 7575 9975 6975 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 3 + 0 0 1.00 60.00 120.00 + 10650 7125 12000 7125 12150 7350 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 3 + 0 0 1.00 60.00 120.00 + 11925 7350 11850 7200 10650 7200 +2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 + 675 1875 2625 1875 2625 5025 675 5025 675 1875 +2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 + 675 5025 2625 5025 2625 6000 675 6000 675 5025 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 + 0 0 1.00 60.00 120.00 + 1575 4800 1575 5325 +2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 + 3375 5100 5250 5100 5250 6300 3375 6300 3375 5100 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 4 + 0 0 1.00 60.00 120.00 + 7950 7275 6525 7275 6525 5700 5250 5700 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 + 0 0 1.00 60.00 120.00 + 3375 5700 2625 5700 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 + 0 0 1.00 60.00 120.00 + 1575 6000 1575 6825 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 + 0 0 1.00 60.00 120.00 + 9975 7275 9375 7275 +2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 + 450 1500 5775 1500 5775 7800 450 7800 450 1500 +2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 + 6075 1500 11325 1500 11325 7800 6075 7800 6075 1500 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 + 5925 375 5925 9675 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 + 1800 12225 1275 11775 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 + 5550 11325 2850 12375 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 + 5925 13875 2925 12900 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 + 3525 16050 2625 13275 +2 1 0 1 12 7 50 -1 -1 0.000 0 0 -1 0 0 2 + 7200 13800 8025 13500 +2 1 0 1 1 7 50 -1 -1 0.000 0 0 -1 0 0 2 + 4350 16425 4800 16200 +2 1 0 1 12 7 50 -1 -1 0.000 0 0 -1 0 0 2 + 6225 11700 6525 13350 +2 1 0 1 1 7 50 -1 -1 0.000 0 0 -1 0 0 2 + 5850 11700 3975 16050 +2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 + 4350 16575 4800 16350 +4 0 0 50 -1 0 12 0.0000 4 150 990 7575 1800 ulex:launch\001 +4 0 0 50 -1 0 12 0.0000 4 150 720 750 1800 ulex:call\001 +4 0 0 50 -1 0 12 0.0000 4 195 1230 1200 2325 send-message\001 +4 0 0 50 -1 0 12 0.0000 4 195 1470 7650 2325 receive-message\001 +4 0 0 50 -1 0 12 0.0000 4 195 1410 7725 4575 std-peer-handler\001 +4 0 0 50 -1 0 12 0.0000 4 195 2160 3600 4500 '(#t "info msg" )\001 +4 0 0 50 -1 0 12 0.0000 4 150 450 10725 5625 work\001 +4 0 0 50 -1 0 12 0.0000 4 150 525 10725 5880 queue\001 +4 0 0 50 -1 0 12 0.0000 4 150 1290 750 5775 mailbox - waits\001 +4 0 0 50 -1 0 12 0.0000 4 150 990 3525 5400 ulex:launch\001 +4 0 0 50 -1 0 12 0.0000 4 195 1470 3525 6000 receive-message\001 +4 0 0 50 -1 0 12 0.0000 4 150 480 1200 6975 result\001 +4 0 0 50 -1 0 12 0.0000 4 165 1185 1500 13500 megatest -run\001 +4 0 0 50 -1 0 12 0.0000 4 150 900 6375 11925 dashboard\001 +4 0 0 50 -1 0 12 0.0000 4 165 1590 6375 14925 megatest -execute\001 +4 0 0 50 -1 0 12 0.0000 4 195 2040 3150 17625 megatest -remove-keep\001 +4 0 0 50 -1 0 12 0.0000 4 150 375 8250 14400 1.db\001 +4 0 0 50 -1 0 12 0.0000 4 150 375 5025 17175 2.db\001