Comment: Added ulex as compilation unit/module
SHA1: 59e9724ea34d49f44be47987f374323d
User & Date: matt on 2019-12-16 04:20:15
2019-12-16
22:57 | Added stml2 as compilation unit/module check-in: 78408a15fb user: matt tags: v1.70-refactor01, v1.70-defunct-try
04:20 | Added ulex as compilation unit/module check-in: 59e9724ea3 user: matt tags: v1.70-refactor01, v1.70-defunct-try
04:17 | Pulled in ulex check-in: be8fe269fa user: matt tags: v1.70-defunct-try
2019-12-15
23:03 | Removed unneeded use of mtconfigf and margs in megatest.scm check-in: e99bb6366e user: matt tags: v1.70-refactor01, v1.70-defunct-try
Modified Makefile from [ce31b5458b] to [f1f1b56683].
︙
# removed from MSRCFILES: ftail.scm

# module source files
MSRCFILES = dbmod.scm rmtmod.scm commonmod.scm apimod.scm \
  archivemod.scm clientmod.scm envmod.scm ezstepsmod.scm itemsmod.scm \
  keysmod.scm launchmod.scm odsmod.scm processmod.scm runconfigmod.scm \
  runsmod.scm servermod.scm subrunmod.scm tasksmod.scm testsmod.scm \
  pkts.scm mtargs.scm mtconfigf.scm ducttape-lib.scm ulex.scm \
  megamod.scm

GMSRCFILES = dcommonmod.scm vgmod.scm treemod.scm

# Eggs to install (straightforward ones)
EGGS=matchable readline apropos base64 regex-literals format \
  regex-case test coops trace csv dot-locking posix-utils posix-extras \
︙
# mofiles/ducttape-lib.o : ducttape-lib.scm ducttape/*scm
#	csc -I ducttape -J -c ducttape-lib.scm -o mofiles/ducttape-lib.o

mofiles/%.o %.import.scm : %.scm
	mkdir -p mofiles
	csc $(CSCOPTS) -J -c $< -o mofiles/$*.o
	touch $*.import.scm # ensure it is touched after the .o is made

# a.import.o : a.import.scm a.o
#	csc -unit a.import -c a.import.scm -o $*.o

ADTLSCR=mt_laststep mt_runstep mt_ezstep
HELPERS=$(addprefix $(PREFIX)/bin/,$(ADTLSCR))
DEPLOYHELPERS=$(addprefix deploytarg/,$(ADTLSCR))
︙
dcommon.o : run_records.scm migrate-fix.scm

# special include based modules
mofiles/pkts.o : pkts/pkts.scm
mofiles/mtargs.o : mtargs/mtargs.scm
mofiles/mtconfigf.o : mtconfigf/mtconfigf.scm
mofiles/ulex.o : ulex/ulex.scm

# mofile/ducttape-lib.o : ducttape/ducttape-lib.scm

# Temporary while transitioning to new routine
# runs.o : run-tests-queue-classic.scm run-tests-queue-new.scm

# for the modularized stuff
mofiles/commonmod.o : megatest-fossil-hash.scm
mofiles/dbmod.o : mofiles/commonmod.o mofiles/keysmod.o \
	mofiles/tasksmod.o mofiles/odsmod.o
mofiles/commonmod.o : mofiles/processmod.o
mofiles/rmtmod.o : mofiles/dbmod.o mofiles/commonmod.o \
	mofiles/apimod.o mofiles/ulex.o
mofiles/apimod.o : mofiles/dbmod.o
mofiles/runsmod.o : mofiles/testsmod.o

# Removed from megamod.o dep: mofiles/ftail.o
mofiles/megamod.o : \
	mofiles/rmtmod.o \
	mofiles/commonmod.o \
︙
Modified megatest.scm from [5f51ceacb7] to [5fdbb6ccd1].
︙
;; (include "megatest-version.scm")

;; fake out readline usage of toplevel-command
(define (toplevel-command . a) #f)

(use (prefix sqlite3 sqlite3:) srfi-1 posix regex regex-case srfi-69 (prefix base64 base64:)
     readline apropos json http-client directory-utils typed-records
     http-client srfi-18 extras format)

;; Added for csv stuff - will be removed
;;
(use sparse-vectors)

(require-library mutils)
︙
                        (printf "Sending signal/term to ~A\n" pid)
                        (process-signal pid signal/term))))))
              (process:children #f))
             (original-exit exit-code)))))

;; for some switches always print the command to stderr
;;
(if (args:any-defined? "-run" "-runall" "-remove-runs" "-set-state-status" "-kill-runs" "-kill-rerun")
    (debug:print 0 *default-log-port* (string-intersperse (argv) " ")))

;; some switches imply homehost. Exit here if not on homehost
;;
(let ((homehost-required (list "-cleanup-db" "-server")))
  (if (apply args:any-defined? homehost-required)
      (if (not (common:on-homehost?))
          (for-each
           (lambda (switch)
             (if (args:get-arg switch)
                 (begin
                   (debug:print 0 *default-log-port* "ERROR: you must be on the homehost to run with " switch
                                ", you can move homehost by removing the .homehost file but this will disrupt any runs in progress.")
︙
Modified mtargs/mtargs.scm from [73a7b43ccf] to [e2f1c247b7].
︙
  (if (null? default)
      (hash-table-ref/default arg-hash arg #f)
      (hash-table-ref/default arg-hash arg (car default))))

(define (any-defined? . args)
  (not (null? (filter (lambda (x) x)
                      (map get-arg args)))))

;; (define any any-defined?)

(define (get-arg-from ht arg . default)
  (if (null? default)
      (hash-table-ref/default ht arg #f)
      (hash-table-ref/default ht arg (car default))))

(define (usage . args)
︙
Modified rmtmod.scm from [a90d5e229b] to [0c73ebad14].
︙
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.

;;======================================================================

(declare (unit rmtmod))
(declare (uses ulex))
;; (declare (uses commonmod))
;; (declare (uses dbmod))
;; (declare (uses megamod))

(module rmtmod
        *

(import scheme chicken data-structures extras)
(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable)
;; (import commonmod) ;;; DO NOT ALLOW rmt*scm TO DEPEND ON common*scm!!!!
;; (import dbmod)
;; (import megamod)
(import (prefix ulex ulex:))

;; (include "rmt-inc.scm")
;; (include "common_records.scm")

)
Added ulex.scm version [419292ee51].
;;======================================================================
;; Copyright 2019, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.

;;======================================================================

(declare (unit ulex))

(include "ulex/ulex.scm")
Added ulex/Makefile version [2d6d448a34].
all : example

ulex.so : ulex.scm telemetry/telemetry.so netutil/ulex-netutil.so portlogger/portlogger.so
	chicken-install

telemetry/telemetry.so : telemetry/telemetry.scm
	cd telemetry && chicken-install

example : ulex.so example.scm
	csc example.scm

test : ulex.so
	csi -b tests/run.scm

portlogger/portlogger.so : portlogger/portlogger.scm
	cd portlogger && chicken-install
	csi -s portlogger/test.scm

netutil/ulex-netutil.so: netutil/ulex-netutil.scm
	cd netutil && chicken-install

clean:
	rm -f example *so */*so *.import.* */*.import.*
Added ulex/README version [cbd774a4f8].
ulex: thorny evergreen shrubs, also known as furze or gorse.

Your application needs a database but the moment you build in
dependence on Postgresql or Mysql your application becomes
ponderous. Ulex is a mesh networked sqlite3 database, intended to
provide a nimble alternative. It is designed so that nodes can come and go
while talking to each other and sharing data via sqlite3 in a (mostly)
ACID compliant way. Ulex mitigates issues with Sqlite3 on networked
filesystems (e.g. NFS, Moosefs etc.) such as "lock storms" (*) and
performance bottlenecks.

(*) Network filesystems must provide posix locking across all files
across all clients. This is handled by the file server by a queue. If
the queue gets overwhelmed by lock requests locks may be dropped. This
leads to concurrent access to shared files and in the case of sqlite3
the dreaded "database locked" message that won't go away (to fix you
have to move the file and copy it back). It can also lead to a
corrupted database.

About the name Ulex:

The Ulex plant, also known as Gorse, furze, or Whin, is considered an
invasive pest in places like New Zealand but in spite of its thorny
persistence it is considered quite useful in its native Europe,
providing edible flowers, shelter and food for wildlife, hedges and
fuel for cooking.
Added ulex/TODO version [7ea22366f5].
* move cookie generation to start in (call ...)

* add timestamp from perspective of caller to data passed around

* add standard print for each event

* move portlogger into module (retrieve from Brandon's work)

* Create queues

* Create text line logging interface

* Add registration calls; write, read, transactional, osslow and osfast

* Extend example to cover all aspects

*

*
Added ulex/example-telemetry.scm version [c21fbe6c93].
(use telemetry)

;; @startuml
;; Alice -> Bob: Authentication Request
;; Bob --> Alice: Authentication Response
;; Alice -> Bob: Another authentication Request
;; Alice <-- Bob: Another authentication Response
;;
;; ref over Bob
;; This can be on
;; several lines
;; end ref
;;
;; @enduml

(define *actor-table* (make-hash-table))
(define *actor-next-id* 0)
(define (get-actor-name key)
  (let* ((cast (string-split "Alice,Bob,Carol,Dave,Eve,Frank,Grace,Heidi,Ivan,Judy,Mallory,Nancy,Oscar,Peggy,Rupert,Sybil,Ted,Victor,Wendy" ","))
         (cast-count (length cast))
         (name (hash-table-ref/default *actor-table* key #f)))
    (cond
     (name name)
     (else
      (let* ((id *actor-next-id*)
             (name (list-ref cast (modulo id cast-count)))
             (seq (inexact->exact (floor (/ id cast-count))))
             (newname (conc name (if (> seq 0) (conc "-" seq) ""))))
        (set! *actor-next-id* (add1 *actor-next-id*))
        (hash-table-set! *actor-table* key newname)
        newname)))))

(define (ulex-telemetry-server)
  (let* ((port 12345)
         (seq 0)
         (handler (lambda (msg)
                    (set! seq (add1 seq))
                    (let* ((elapsed-ms (current-milliseconds))
                           (payload (alist-ref 'payload msg))
                           (action (if payload (alist-ref 'action payload)))
                           (from-key (if payload (alist-ref 'from-key payload)))
                           (to-key (if payload (alist-ref 'to-key payload)))
                           )
                      (if (and action from-key to-key)
                          (print (get-actor-name from-key) " -> " (get-actor-name to-key) ": "action))
                      ;;(pp msg)
                      ))))
    (telemetry-server port handler)))

(ulex-telemetry-server)
Added ulex/example.scm version [5d474bbc0a].
;;; dbi: Minimal gasket to postgresql, sqlite3 and mysql
;;;
;; Copyright (C) 2007-2016 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.

(use regex srfi-18 matchable)

(load "ulex.scm")
(import (prefix ulex ulex:))

(create-directory "ulexdb" #t)
(create-directory "pkts" #f)

(define *area* (ulex:make-area
                dbdir: (conc (current-directory) "/ulexdb")
                pktsdir: (conc (current-directory) "/pkts")
                ))
(define (toplevel-command . args) #f)
(use readline)

;; two reserved keys in the ulex registration hash table are:
;; dbinitsql => a list of sql statements to be executed at db creation time
;; dbinitfn => a function of two params; dbh, the sql-de-lite db handle and
;; dbfname, the database filename
;;
(ulex:register-batch
 *area*
 'dbwrite
 `((dbinitsql . ("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, message TEXT, author TEXT, msg_time INTEGER);"))
   (savemsg . "INSERT INTO messages (message,author) VALUES (?,?)")
   ))

(ulex:register-batch
 *area*
 'dbread
 `((dbinitsql . ("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, message TEXT, author TEXT, msg_time INTEGER);"))
   (getnum . "SELECT COUNT(*) FROM messages")
   (getsome . "SELECT * FROM messages LIMIT 10")
   ))

(define (worker mode-in)
  (let* ((start (current-milliseconds))
         (iters-per-sample 10)
         (mode (string->symbol mode-in))
         (max-count (case mode
                      ((all) 60)
                      (else 1000)))
         (num-calls 0)
         (report (lambda ()
                   (let ((delta (- (current-milliseconds) start)))
                     (print "Completed " num-calls " in " delta " for " (/ num-calls (/ delta 1000)) " calls per second")))))
    (if (eq? mode 'repl)
        (begin
          (import extras) ;; might not be needed
          ;; (import csi)
          (import readline)
          (import apropos)
          (import (prefix ulex ulex:))
          (install-history-file (get-environment-variable "HOME") ".example_history") ;; [homedir] [filename] [nlines])
          (current-input-port (make-readline-port "example> "))
          (repl))
        (let loop ((count 0))
          ;; (print "loop count=" count)
          (for-each
           (lambda (dbname)
             ;;(print "TOP OF LAMBDA")
             (case mode
               ((all)
                (let ((start-time (current-milliseconds))
                      (message (conc "Test message #" count "! From pid: " (current-process-id)))
                      (user (current-user-name)))
                  (ulex:call *area* dbname 'savemsg `(,message ,user))
                  (for-each
                   (lambda (n)
                     (print "have this many " (ulex:call *area* dbname 'getnum '()) " records in main.db"))
                   (iota 10))
                  (set! num-calls (+ num-calls 11))
                  ))
               ((ping)
                (let ((srvrs (ulex:get-all-server-pkts *area*)))
                  (for-each
                   (lambda (srv)
                     (print "Pinging " srv)
                     (ulex:ping *area* srv))
                   srvrs)))
               ((fullping)
                (let ((srvrs (ulex:get-all-server-pkts *area*)))
                  (for-each
                   (lambda (srv)
                     (let ((ipaddr (alist-ref 'ipaddr srv))
                           (port (any->number (alist-ref 'port srv))))
                       (print "Full Ping to " srv)
                       (ulex:ping *area* ipaddr port)))
                   srvrs)))
               ((passive)
                (thread-sleep! 10))
               ))
           '("main.db")) ;; "test.db" "run-1.db" "run-2.db" "run-3.db" "run-4.db"))
          #;(thread-sleep! 0.001)
          #;(let ((now (current-milliseconds)))
              (if (and (> now start)
                       (eq? (modulo count iters-per-sample) 0))
                  (begin
                    (print "queries per second: "(* 1000.0 (/ iters-per-sample (- now start))))
                    (set! count 0)
                    (set! start (current-milliseconds)))))
          ;; (print "count: " count " max-count: " max-count)
          (if (< count max-count)
              (loop (+ count 1)))))
    (report)
    (ulex:clear-server-pkt *area*)
    (thread-sleep! 5) ;; let others keep using this server (needs to be built in to ulex)
    ;; (print "Doing stuff")
    ;; (thread-sleep! 10)
    (print "Done doing stuff")))

(define (run-worker)
  (thread-start! (make-thread (lambda ()
                                (thread-sleep! 5)
                                (worker "all")) "worker")))

(define (main . args)
  (if (member (car args) '("repl"))
      (print "NOTE: No exit timer started.")
      (thread-start! (make-thread (lambda ()
                                    (thread-sleep! (* 60 5))
                                    (ulex:clear-server-pkt *area*)
                                    (thread-sleep! 5)
                                    (exit 0)))))
  (print "Launching server")
  (ulex:launch *area*)
  (print "LAUNCHED.")
  (thread-sleep! 0.1) ;; chicken threads bit quirky? need little time for launch thread to get traction?
  (apply worker args)
  )

;;======================================================================
;; Strive for clean exit handling
;;======================================================================

;; Ulex shutdown is handled within Ulex itself.

#;(define (server-exit-procedure)
  (on-exit (lambda ()
             ;; close the databases, ensure the pkt is removed!
             ;; (thread-sleep! 2)
             (ulex:shutdown *area*)
             0)))

;; Copied from the SDL2 examples.
;;
;; Schedule quit! to be automatically called when your program exits normally.
#;(on-exit server-exit-procedure)

;; Install a custom exception handler that will call quit! and then
;; call the original exception handler. This ensures that quit! will
;; be called even if an unhandled exception reaches the top level.
#;(current-exception-handler
 (let ((original-handler (current-exception-handler)))
   (lambda (exception)
     (server-exit-procedure)
     (original-handler exception))))

(if (file-exists? ".examplerc")
    (load ".examplerc"))

(let ((args-in (argv))) ;; command-line-arguments)))
  (let ((args (match
               args-in
               (("csi" "--" args ...) args)
               ((_ args ...) args)
               (else args-in))))
    (if (null? args)
        (begin
          (print "Usage: example [mode]")
          (print " where mode is one of:")
          (print " ping : only do pings between servers")
          (print " fullping : ping with response via processing queue")
          (print " unix : only do unix commands")
          (print " read : only do ping, unix and db reads")
          (print " all : do pint, unix, and db reads and writes")
          (exit))
        (apply main args))))
Added ulex/netutil/testit.scm version [c70a7686ef].
(use ulex-netutil)
(use test)

(test #f #t (not (not (member "127.0.0.1" (get-all-ips)))))
Added ulex/netutil/ulex-netutil.meta version [b9c81401c3].
;; -*- scheme -*-
(
; Your egg's license:
(license "BSD")

; Pick one from the list of categories (see below) for your egg and enter it
; here.
(category db)

(needs foreign )

; A list of eggs required for TESTING ONLY. See the `Tests' section.
(test-depends test)

(author "Brandon Barclay")
(synopsis "Get all IP addresses for all interfaces."))
Added ulex/netutil/ulex-netutil.release-info version [f8b73e2e54].
(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}")
(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}")
(release "0.1")
Added ulex/netutil/ulex-netutil.scm version [326b1a9e82].
;;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.

;;======================================================================
;; ABOUT:
;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;; provides all ipv4 addresses for all interfaces
;;
;;======================================================================

;; get IP addresses from ALL interfaces
(module ulex-netutil
    (get-all-ips get-my-best-address get-all-ips-sorted)

(import scheme chicken data-structures foreign ports regex-case posix)

;; #include <stdio.h>
;; #include <netinet/in.h>
;; #include <string.h>
;; #include <arpa/inet.h>

(foreign-declare "#include \"sys/types.h\"")
(foreign-declare "#include \"sys/socket.h\"")
(foreign-declare "#include \"ifaddrs.h\"")
(foreign-declare "#include \"arpa/inet.h\"")

;; get IP addresses from ALL interfaces
(define get-all-ips
  (foreign-safe-lambda* scheme-object ()
    "

// from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address :

    C_word lst = C_SCHEME_END_OF_LIST, len, str, *a;
// struct ifaddrs *ifa, *i;
// struct sockaddr *sa;

    struct ifaddrs * ifAddrStruct = NULL;
    struct ifaddrs * ifa = NULL;
    void * tmpAddrPtr = NULL;

    if ( getifaddrs(&ifAddrStruct) != 0)
      C_return(C_SCHEME_FALSE);

// for (i = ifa; i != NULL; i = i->ifa_next) {
    for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
        if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is
            // a valid IPv4 address
            tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
            char addressBuffer[INET_ADDRSTRLEN];
            inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
            len = strlen(addressBuffer);
            a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
            str = C_string(&a, len, addressBuffer);
            lst = C_a_pair(&a, str, lst);
        }

// else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is
// // a valid IPv6 address
// tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
// char addressBuffer[INET6_ADDRSTRLEN];
// inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
//// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
// len = strlen(addressBuffer);
// a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
// str = C_string(&a, len, addressBuffer);
// lst = C_a_pair(&a, str, lst);
// }

// else {
// printf(\" not an IPv4 address\\n\");
// }

    }

    freeifaddrs(ifa);
    C_return(lst);

"))

;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
  (let* ((rate (lambda (ipstr)
                 (regex-case ipstr
                   ( "^127\\." _ 0 )
                   ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 )
                   ( else 2 ) ))))
    (< (rate a) (rate b))))

(define (get-my-best-address)
  (let ((all-my-addresses (get-all-ips))
        ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))
        )
    (cond
     ((null? all-my-addresses)
      (get-host-name)) ;; no interfaces?
     ((eq? (length all-my-addresses) 1)
      (car all-my-addresses)) ;; only one to choose from, just go with it
     (else
      (car (sort all-my-addresses ip-pref-less?)))
     ;; (else
     ;; (ip->string (car (filter (lambda (x) ;; take any but 127.
     ;; (not (eq? (u8vector-ref x 0) 127)))
     ;; all-my-addresses))))
     )))

(define (get-all-ips-sorted)
  (sort (get-all-ips) ip-pref-less?))

)
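Two details in `get-all-ips` and `ip-pref-less?` above seem worth a second look. The embedded C finishes its loop with `ifa` equal to NULL and then calls `freeifaddrs(ifa)`, so the list returned by `getifaddrs` appears never to be released, and `ifa->ifa_addr` may be a null pointer for some interfaces. The pattern `^(10\\.0|192\\.168\\.)\\..*` also needs a second dot after `192.168.`, so it likely never matches those addresses. The standalone C sketch below shows one way to handle these points. The names `rank_ipv4` and `list_ipv4` are hypothetical and not part of ulex, and ranking all of `10.` and `192.168.` as 1 is an assumption about the intent.

```c
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Rank an IPv4 string the way ip-pref-less? seems to intend:
 * 0 = loopback, 1 = 10.x or 192.168.x (assumed), 2 = anything else. */
int rank_ipv4(const char *ip) {
    if (strncmp(ip, "127.", 4) == 0) return 0;
    if (strncmp(ip, "10.", 3) == 0) return 1;
    if (strncmp(ip, "192.168.", 8) == 0) return 1;
    return 2;
}

/* Copy up to max IPv4 addresses (as text) into out.
 * Returns the number stored, or -1 if getifaddrs fails. */
int list_ipv4(char out[][INET_ADDRSTRLEN], int max) {
    struct ifaddrs *head = NULL;
    int n = 0;

    if (getifaddrs(&head) != 0) return -1;

    for (struct ifaddrs *ifa = head; ifa != NULL && n < max; ifa = ifa->ifa_next) {
        if (ifa->ifa_addr == NULL) continue;               /* no address on this entry */
        if (ifa->ifa_addr->sa_family != AF_INET) continue; /* IPv4 only */
        const struct sockaddr_in *sin = (const struct sockaddr_in *)ifa->ifa_addr;
        if (inet_ntop(AF_INET, &sin->sin_addr, out[n], INET_ADDRSTRLEN) != NULL) n++;
    }

    freeifaddrs(head); /* free the list head, not the loop cursor */
    return n;
}
```

A caller could declare `char addrs[16][INET_ADDRSTRLEN]`, call `list_ipv4(addrs, 16)`, and sort the results by `rank_ipv4`. Whether loopback should sort first, as the ascending sort in `get-my-best-address` implies, is left as in the check-in.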
Added ulex/netutil/ulex-netutil.setup version [9bb51f1edf].
;; Copyright 2007-2018, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.

;;;; ulex.setup
(standard-extension 'ulex-netutil "0.1")
Added ulex/portlogger/portlogger.meta version [44ef60dd0b].
;; -*- scheme -*-
(
; Your egg's license:
(license "BSD")

; Pick one from the list of categories (see below) for your egg and enter it
; here.
(category db)

(needs foreign )

; A list of eggs required for TESTING ONLY. See the `Tests' section.
(test-depends test sqlite3 regex)

(author "Matthew Welland")
(synopsis "thoughtfully optain tcp port."))
Added ulex/portlogger/portlogger.release-info version [f8b73e2e54].
(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}")
(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}")
(release "0.1")
Added ulex/portlogger/portlogger.scm version [d8f6d5639b].
;;======================================================================
;; P O R T L O G G E R - track ports used on the current machine
;;======================================================================

;;

(module portlogger
    (pl-open-run-close pl-find-port pl-release-port pl-open-db pl-get-prev-used-port pl-get-port-state pl-take-port)
  (import scheme
          posix
          chicken
          data-structures
          ;ports
          extras
          ;files
          ;mailbox
          ;telemetry
          regex
          ;regex-case
          )
  (use (prefix sqlite3 sqlite3:))
  (use posix)
  (use regex)

(define (pl-open-db fname)
  (let* ((avail #t) ;; for now - assume wait on journal not needed (tasks:wait-on-journal fname 5 remove: #t)) ;; wait up to about 10 seconds for the journal to go away
         (exists (file-exists? fname))
         (db (if avail
                 (sqlite3:open-database fname)
                 (begin
                   (system (conc "rm -f " fname))
                   (sqlite3:open-database fname))))
         (handler (sqlite3:make-busy-timeout 136000))
         (canwrite (file-write-access? fname)))
    (sqlite3:set-busy-handler! db handler)
    (sqlite3:execute db "PRAGMA synchronous = 0;")
    (sqlite3:execute
     db
     "CREATE TABLE IF NOT EXISTS ports ( port INTEGER PRIMARY KEY, state TEXT DEFAULT 'not-used', fail_count INTEGER DEFAULT 0, update_time TIMESTAMP DEFAULT (strftime('%s','now')) );")
    db))

(define (pl-open-run-close proc . params)
  (let* ((fname (conc "/tmp/." (current-user-name) "-portlogger.db"))
         (avail #t)) ;; (tasks:wait-on-journal fname 10))) ;; wait up to about 10 seconds for the journal to go away
    ;; (handle-exceptions
    ;; exn
    ;; (begin
    ;; ;; (release-dot-lock fname)
    ;; (debug:print-error 0 *default-log-port* "pl-open-run-close failed. " proc " " params)
    ;; (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
    ;; (debug:print 5 *default-log-port* "exn=" (condition->list exn))
    ;; (if (file-exists? fname)(delete-file fname)) ;; brutally get rid of it
    ;; (print-call-chain (current-error-port)))
    (let* (;; (lock (obtain-dot-lock fname 2 9 10))
           (db (pl-open-db fname))
           (res (apply proc db params)))
      (sqlite3:finalize! db)
      ;; (release-dot-lock fname)
      res))) ;; )

;; (fold-row PROC INIT DATABASE SQL . PARAMETERS)
(define (pl-take-port db portnum)
  (let* ((qry1 "INSERT INTO ports (port,state) VALUES (?,?);")
         (qry2 "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;"))
    (let* ((curr (pl-get-port-state db portnum))
           (res (case (string->symbol (or curr "n/a"))
                  ((released) (sqlite3:execute db qry2 "taken" portnum) 'taken)
                  ((not-tried n/a) (sqlite3:execute db qry1 portnum "taken") 'taken)
                  ((taken) 'already-taken)
                  ((failed) 'failed)
                  (else 'error))))
      ;; (print "res=" res)
      res)))

(define (pl-get-prev-used-port db)
  ;; (handle-exceptions
  ;; exn
  ;; (with-output-to-port (current-error-port)
  ;; (lambda ()
  ;; (print "EXCEPTION: portlogger database probably overloaded or unreadable. If you see this message again remove /tmp/.$USER-portlogger.db")
  ;; (print " message: " ((condition-property-accessor 'exn 'message) exn))
  ;; (print "exn=" (condition->list exn))
  ;; (print-call-chain) ;; (current-error-port))
  ;; (print "Continuing anyway.")
  ;; #f))
  (let ((res (sqlite3:fold-row
              (lambda (var curr)
                (or curr var curr))
              #f
              db
              "SELECT port FROM ports WHERE state='released';")))
    (if res res #f))) ;; )

(define (pl-find-port db acfg #!key (lowport 32768))
  ;;(slite3:with-transaction
  ;; db
  ;; (lambda ()
  (let loop ((numtries 0))
    (let* ((portnum (or (pl-get-prev-used-port db)
                        (+ lowport ;; top of registered ports is 49152 but let's use ports in the registered range
                           (random (- 64000 lowport))))))
      ;; (handle-exceptions
      ;; exn
      ;; (with-output-to-port (current-error-port)
      ;; (lambda ()
      ;; (print "EXCEPTION: portlogger database probably overloaded or unreadable. If you see this message again remove /tmp/.$USER-portlogger.db")
      ;; (print " message: " ((condition-property-accessor 'exn 'message) exn))
      ;; (print "exn=" (condition->list exn))
      ;; (print-call-chain)
      ;; (print "Continuing anyway.")))
      (pl-take-port db portnum) ;; always "take the port"
      (if (pl-is-port-available portnum)
          portnum
          (begin
            (pl-set-port db portnum "taken")
            (loop (add1 numtries)))))))

;; set port to "released", "failed" etc.
;;
(define (pl-set-port db portnum value)
  (sqlite3:execute db "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;") value portnum)

;; set port to "released", "failed" etc.
;;
(define (pl-get-port-state db portnum)
  (let ((res (sqlite3:fold-row ;; get the state of given port or "not-tried"
              (lambda (var curr) ;; function on init/last current
                (or curr var curr))
              #f ;; init
              db
              "SELECT state FROM ports WHERE port=?;"
              portnum))) ;; the parameter to the query
    (if res res #f)))

;; (slite3:exec (slite3:sql db "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;") value portnum))

;; release port
(define (pl-release-port db portnum)
  (sqlite3:execute db "UPDATE ports SET state=?,update_time=strftime('%s','now') WHERE port=?;" "released" portnum)
  (sqlite3:change-count db))

;; set port to failed (attempted to take but got error)
;;
(define (pl-set-failed db portnum)
  (sqlite3:execute db "UPDATE ports SET state='failed',fail_count=fail_count+1,update_time=strftime('%s','now') WHERE port=?;" portnum)
  (sqlite3:change-count db))

;; pulled from mtut - TODO: remove from mtut, find a way *without* using netstat
;;
(define (pl-is-port-available port-num)
  (let-values (((inp oup pid)
                (process "netstat" (list "-tulpn" ))))
    (let loop ((inl (read-line inp)))
      (if (not (eof-object? inl))
          (begin
            (if (string-search (regexp (conc ":" port-num "\\s+")) inl)
                #f
                (loop (read-line inp))))
          #t))))

) ;; end module
Added ulex/portlogger/portlogger.setup version [74cb64d178].
;; Copyright 2007-2018, Matthew Welland.
;;
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;;
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

;;;; portlogger.setup
(standard-extension 'portlogger "0.1")
Added ulex/portlogger/test.scm version [9297af53df].
(use portlogger)
(use test)

(test-begin "portlogger")
(use (prefix sqlite3 sqlite3:))
(define *port* #f)
(define *area* #f)
(test #f #f (begin
              (pl-open-run-close
               (lambda (db b)
                 (pl-get-prev-used-port db))
               *area*)
              #f))
(test #f #f (pl-open-run-close (lambda (db b)(pl-get-port-state db 1234567)) *area*))
(test #f #f (number? (pl-open-run-close (lambda (db b)(pl-take-port db 123456)) *area*)))
(test #f #t (number? (let ((port (pl-open-run-close pl-find-port *area*)))
                       (set! *port* port)
                       port)))
(test #f 1 (pl-open-run-close pl-release-port *port*))
(test #f "released" (pl-open-run-close
                     (lambda (db)
                       (sqlite3:first-result db "select state from ports where port=?" *port*))))

(test-end "portlogger")
Added ulex/run-parallel.sh version [e2d9733578].
#!/bin/bash

CMD=$1

make example

for x in $(seq 1 10);do
  ./example $CMD 2>&1| tee run$x.log &
done

wait
Added ulex/run-rpc.sh version [d48d551831].
#!/bin/bash

csc test-rpc.scm

(./test-rpc 45000 45001 45002 45003 45004 45005|& tee test-rpc-45000.log) &
(./test-rpc 45001 45000 45002 45003 45004 45005|& tee test-rpc-45001.log) &
(./test-rpc 45002 45000 45001 45003 45004 45005|& tee test-rpc-45002.log) &
(./test-rpc 45003 45000 45001 45002 45004 45005|& tee test-rpc-45003.log) &
(./test-rpc 45004 45000 45001 45002 45003 45005|& tee test-rpc-45004.log) &
(./test-rpc 45005 45000 45001 45002 45003 45004|& tee test-rpc-45005.log) &
wait
Added ulex/telemetry/telemetry-test-client.scm version [9f7f7588b5].
(load "telemetry.scm")
(import telemetry)

(print 1)
(telemetry-open "localhost" 12346)
(print 2)
(telemetry-send "goo")
(print 3)
(telemetry-send "goo2")
(print 4)
Added ulex/telemetry/telemetry-test-server.scm version [eaa57ff5ca].
(load "telemetry.scm")
(import telemetry)

(print "before")

(use mailbox)
(use mailbox-threads)
(use srfi-18)

(set! handler-seq 0)
(define (handler msg)
  (set! handler-seq (add1 handler-seq))
  (print "=============")
  (print handler-seq msg))

(telemetry-server 12346 handler)

(print "after")
Added ulex/telemetry/telemetry.meta version [6afdf842f1].
;; -*- scheme -*-
(
; Your egg's license:
(license "BSD")

; Pick one from the list of categories (see below) for your egg and enter it
; here.
(category db)

; A list of eggs dbi depends on. If none, you can omit this declaration
; altogether. If you are making an egg for chicken 3 and you need to use
; procedures from the `files' unit, be sure to include the `files' egg in the
; `needs' section (chicken versions < 3.4.0 don't provide the `files' unit).
; `depends' is an alias to `needs'.
(needs udp mailbox-threads z3 base64 )

; A list of eggs required for TESTING ONLY. See the `Tests' section.
(test-depends test)

(author "Brandon Barclay")
(synopsis "A telemetry send/receive system using udp."))
Added ulex/telemetry/telemetry.release-info version [f8b73e2e54].
(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}")
(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}")
(release "0.1")
Added ulex/telemetry/telemetry.scm version [7663509699].
(module telemetry
    (telemetry-open telemetry-send telemetry-close
     telemetry-server telemetry-show-debugs telemetry-hide-debugs )

(import chicken scheme data-structures
        base64 srfi-18
        z3 udp posix extras ports mailbox mailbox-threads)

(use udp)
(use base64)
(use z3)
(use mailbox-threads)

(define *telemetry:telemetry-log-state* 'startup)
(define *telemetry:telemetry-log-socket* #f)

(define *debug-print-flag* #f)

(define (telemetry-show-debugs)
  (set! *debug-print-flag* #t))

(define (telemetry-hide-debugs)
  (set! *debug-print-flag* #f))

(define (debug-print . args)
  (if *debug-print-flag*
      (apply print "telemetry> " args)))

(define (make-telemetry-server-thread port callback)
  (let* ((thr
          (make-thread
           (lambda ()
             (let* ((s (udp-open-socket)))
               (udp-bind! s #f port)
               ;;(udp-connect! s "localhost" port)
               (let loop ((seq 0))
                 (debug-print "loop seq="seq)
                 (receive (n data from-host from-port) (udp-recvfrom s 640000)
                   (let* ((encapsulated-payload
                           (with-input-from-string
                               (z3:decode-buffer
                                (base64-decode data)) read))
                          (callback-res `( (from-host . ,from-host)
                                           (from-port . ,from-port)
                                           (data-len . ,n)
                                           ,@encapsulated-payload )))
                     (callback callback-res))
                   )
                 (loop (add1 seq)))
               (udp-close-socket s))))))
    (thread-start! thr)
    thr))

(define (telemetry-server port handler-callback)
  (let* ((serv-thread (make-telemetry-server-thread port handler-callback)))
    (print serv-thread)
    (thread-join! serv-thread)))

(define (telemetry-open serverhost serverport)
  (let* ((user (or (get-environment-variable "USER") "unknown"))
         (host (or (get-environment-variable "HOST") "unknown")))
    (set! *telemetry:telemetry-log-state*
          (handle-exceptions
           exn
           (begin
             (debug-print "telemetry-open udp port failure")
             'broken)
           (if (and serverhost serverport user host)
               (let* ((s (udp-open-socket)))
                 ;;(udp-bind! s #f 0)
                 (udp-connect! s serverhost serverport)
                 (set! *telemetry:telemetry-log-socket* s)
                 'open)
               'not-needed)))))

(define (telemetry-close)
  (when (or (member *telemetry:telemetry-log-state* '(broken-or-no-server-preclose open)) *telemetry:telemetry-log-socket*)
    (handle-exceptions
     exn
     (begin
       (define *telemetry:telemetry-log-state* 'closed-fail)
       (debug-print "telemetry-telemetry-log closure failure")
       )
     (begin
       (define *telemetry:telemetry-log-state* 'closed)
       (udp-close-socket *telemetry:telemetry-log-socket*)
       (set! *telemetry:telemetry-log-socket* #f)))))

(define (telemetry-send payload)
  (if (eq? 'open *telemetry:telemetry-log-state*)
      (handle-exceptions
       exn
       (begin
         (debug-print "telemetry-telemetry-log comms failure ; disabled (no server?)")
         (define *telemetry:telemetry-log-state* 'broken-or-no-server-preclose)
         (telemetry-close)
         (define *telemetry:telemetry-log-state* 'broken-or-no-server)
         (set! *telemetry:telemetry-log-socket* #f)
         )
       (if (and *telemetry:telemetry-log-socket* payload)
           (let* ((user (or (get-environment-variable "USER") "unknown"))
                  (host (or (get-environment-variable "HOST") "unknown"))
                  (encapsulated-payload
                   `( (user . ,user)
                      (host . ,host)
                      (pid . ,(current-process-id))
                      (payload . ,payload) ) )
                  (msg
                   (base64-encode
                    (z3:encode-buffer
                     (with-output-to-string (lambda () (pp encapsulated-payload)))))))
             ;;(debug-print "pre-send")
             (let ((res (udp-send *telemetry:telemetry-log-socket* msg)))
               ;;(debug-print "post-send >"res"<")
               res)
             ))))
  )
)
Added ulex/telemetry/telemetry.setup version [547529f8eb].
;; Copyright 2007-2018, Matthew Welland.
;;
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;;
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

;;;; ulex.setup
(standard-extension 'telemetry "0.1")
Added ulex/test-rpc.scm version [307b9d1376].
(use rpc posix srfi-1 pkts)

(define *usage* "Usage: test-rpc") ;; myportnum [port-nums...]")

#;(define *portnums*
  (let ((args (command-line-arguments)))
    (if (null? args)
        (begin
          (print *usage*)
          (exit))
        (map string->number args))))

#;(if (not (null? (filter (lambda (x)(not x)) *portnums*)))
    (begin
      (print "ERROR: portnumbers must all be integers, you gave " *portnums*)
      (exit)))

(define (find-free-port-and-open port)
  (handle-exceptions
   exn
   (begin
     (print "Failed to bind to port " (rpc:default-server-port) ", trying next port")
     (find-free-port-and-open (+ port 1)))
   (rpc:default-server-port port)
   (tcp-read-timeout 120000)
   (tcp-listen (rpc:default-server-port) )
   port))

#;(define *myportnum* (car *portnums*))
#;(define *clients* (cdr *portnums*))

#;(print "Setting up server on " *myportnum*)

;; (rpc:default-server-port *myportnum*)
(define *myportnum* (find-free-port-and-open 20000)) ;; *myportnum*))

;;;; server.scm

(rpc:publish-procedure!
 'foo
 (lambda (x)
   (print "foo: " x)
   (conc "response from " *myportnum*)))

(define *queue* (make-queue))

(rpc:publish-procedure!
 'queue-add
 (lambda (dbfname qrystr raddr rport . params)
   (queue-add! *queue* (list dbfname qrystr raddr rport params))))

;;;; client.scm

(define (call n)
  (let ((port (list-ref *clients* (random (length *clients*)))))
    (print "calling to " port)
    ((rpc:procedure 'foo "localhost" port) n)))

(rpc:publish-procedure!
 'fini
 (lambda () (print "fini") (thread-start! (lambda () (thread-sleep! 3) (print "terminate") (exit))) #f))

(let* ((server-th (make-thread (lambda ()
                                 ((rpc:make-server (tcp-listen *myportnum*)) #t)) ;; (rpc:default-server-port))) #t))
                               "server thread"))
       (timer-th  (make-thread (lambda ()
                                 (thread-sleep! 30)
                                 (print "Node " *myportnum* ", 30 seconds up. Exiting.")
                                 (exit)))))
  (thread-start! server-th)
  (thread-start! timer-th)
  (thread-sleep! 1)
  (do ((i 10000 (sub1 i))) ((zero? i))
    (print "-> " (call (random 100)))))
Added ulex/test-script.scm version [105d29a6a9].
(import trace (prefix ulex ulex:))
(trace-call-sites #t)
;; (trace ulex:receive-message ulex:std-peer-handler ulex:process-db-queries ulex:work-queue-add ulex:call send-message ulex:get-best-server ulex:ping)

(set! *default-error-port* (current-output-port))

(ulex:call *area* "test.db" 'savemsg '("my message" "matt"))

(define *servers* (ulex:get-all-server-pkts *area*))
(define numofrecords (ulex:call *area* "test.db" 'getnum '()))
;; (define bunchofrecords (ulex:call *area* "test.db" 'getsome '()))
Added ulex/tests/faux-mt-callspec.scm version [c85e86d3ff].
(use test (prefix sqlite3 sqlite3:) posix)
(if (file-exists? "ulex.scm")
    (load "ulex.scm")
    (load "../ulex.scm"))

(use trace)
(trace-call-sites #t)

(import ulex ) ;; (import (prefix ulex ulex:))

(test-begin "faux-mtdb")
;; pre-clean
(for-each (lambda (dir)
            (if (directory-exists? dir)
                (system (conc "/bin/rm -rf ./"dir)))
            (system (conc "/bin/mkdir -p ./"dir))
            )
          '("faux-mtdb" "faux-mtdb-pkts"))

(let* ((area (make-area dbdir: "faux-mtdb" pktsdir: "faux-mtdb-pkts"))
       (specfile "tests/mt-spec.sexpr")
       (dbname "faux-mt.db"))
  (launch area)
  (initialize-area-calls-from-specfile area specfile)
  (let* ((target-name "a/b/c/d")
         (insert-result (call area dbname 'new-target (list target-name)))
         (test-target-id (caar (call area dbname 'target-name->target-id (list target-name))))
         (test-target-name (caar (call area dbname 'target-id->target-name (list 1)))))
    (test #f #t insert-result)
    (test #f 1 test-target-id )
    (test #f target-name test-target-name )
    )
  (test #f #t (shutdown area)))

;; thought experiment - read cursors
;; (let* ((cursor (call area dbname 'get-target-names '())))
;;   (let loop ((row (cursor)))
;;     (cond
;;      ((not row) #t)
;;      (else
;;       (print "ROW IS "row)
;;       (loop (cursor))))))

(test-end "faux-mtdb")
Added ulex/tests/mt-spec.sexpr version [05a0658d78].
(
 (dbwrite .
          (
           (dbinitsql .
                      (
                       "create table if not exists targets(id integer primary key,name)"
                       "create table if not exists runs(id integer primary key,target_id,name,path,state,status)"
                       "create table if not exists tests(id integer primary key,run_id,name,path,state,status,host)"
                       "create table if not exists test_steps(id integer primary key,test_id,name,state)"
                       ))
           ( new-target . "insert into targets (name) values(?);")
           ( new-run . "insert into runs (target_id,name,path,state,status) values(?,?,\"/dev/null\",\"NOT STARTED\",\"n/a\")")
           ( new-test . "insert into tests values(?,?,?,\"/dev/null\",\"NOT STARTED\")")
           ( update-one-run_id-state-status . "update runs set state=? status=? where id=?" )
           ( update-one-test_id-state-status . "update tests set state=? status=? where id=?" )
           ( update-matching-tests-state-status . "update tests set state=? status=? where run_id=?, state like ?, status like ?")
           )
          )
 (dbread .
         (
          (get-targets . "select id,name from targets")
          (target-name->target-id . "select id from targets where name=?")
          (target-id->target-name . "select name from targets where id=?")
          (check-test-state-status . "select state,status from tests where id=?")
          )
         )
 )
Added ulex/tests/run.scm version [57deaf9515].
(use test
     (prefix sqlite3 sqlite3:)
     posix
     ulex-netutil
     rpc
     pkts
     mailbox)
;; (use (prefix ulex ulex:))

(if (file-exists? "ulex.scm")
    (load "ulex.scm")
    (load "../ulex.scm"))

(use trace)
(trace-call-sites #t)

(import ulex) ;; (import (prefix ulex ulex:))

(trace
 ;; send-message
 ;; receive-message
 ;; std-peer-handler
 ;; work-queue-add
 ;; deliver-response
 ;; finalize-all-db-handles
 ;; area-dbhandles
 ;; save-dbh
 ;; process-db-queries
 ;; dbdat-dbh
 ;; get-best-server
 ;; calc-server-score
 ;; ping
 ;; full-ping
 ;; register-node
 ;; calc-server-score
 ;; update-known-servers
 ;; request
 ;; get-dbh
 ;; update-stats
 ;; process-db-queries
 ;; deliver-response
 )

(test-begin "misc")
(test #f #t (string? (get-my-best-address)))
(test-end "misc")

;;======================================================================
;; Setup
;;======================================================================

(system "rm -rf testulexdb testpkts")
(create-directory "testulexdb" #t)
(create-directory "testpkts" #t)

(define *area* (make-area dbdir: "testulexdb" pktsdir: "testpkts"))
(define *port* #f)

;;======================================================================
;; Ulex-db
;;======================================================================

(test-begin "ulex-db")
(test #f #t (equal? (area-dbdir *area*) "testulexdb"))
(test #f #t (thread? (thread-start! (make-thread (lambda ()(launch *area*)) "server"))))
(thread-sleep! 1)
(test #f 1 (update-known-servers *area*))
(test #f #t (list? (get-all-server-pkts *area*)))
(test #f (area-myaddr *area*) (cadr (ping *area* (area-myaddr *area*)(area-port *area*))))

(let loop ((count 10))
  (if (null? (get-all-server-pkts *area*))
      (if (> count 0)
          (begin
            (thread-sleep! 1)
            (print "waiting for server pkts")
            (loop (- count 1))))))

(test #f #t (let ((spkts (get-all-server-pkts *area*)))
              (and (list spkts) (> (length spkts) 0))))
(test #f #t (begin (register-batch
                    *area*
                    'dbwrite ;; this is the call type
                    `((dbinitsql . ("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, message TEXT, author TEXT, msg_time INTEGER);"))
                      (savemsg . "INSERT INTO messages (message,author) VALUES (?,?)")
                      ;; (readmsg . "SELECT * FROM messages WHERE author=?;")
                      ))
                   #t))
(test #f #t (calldat? (get-rentry *area* 'dbinitsql)))
(define cdat1 (get-rentry *area* 'dbinitsql))
(test #f #t (list? (get-best-server *area* "test.db" 'savemsg)))
(test #f #t (eq? 'dbwrite (calldat-ctype cdat1)))
(test #f #t (list? (get-rsql *area* 'dbinitsql)))
(test #f #t (dbdat? (open-db *area* "test.db")))
(test #f #t (dbdat? (let ((dbh (get-dbh *area* "test.db")))
                      (save-dbh *area* "test.db" dbh)
                      dbh)))
(test #f #t (dbdat? (let ((dbh (get-dbh *area* "test.db")))
                      dbh)))
;(test #f '(#t "db write submitted" #t) (call *area* "test.db" 'savemsg '("Test message!" "matt")))
(test #f #t (call *area* "test.db" 'savemsg '("Test message!" "matt")))
;;(thread-sleep! 15);; server needs time to process the request (it is non-blocking)
;; (test #f #t (shutdown *area*))
;; (test #f 0 (calc-server-score *area* "test.db" (area-pktid *area*)))
(test #f #t (list? (get-best-server *area* "test.db" (area-pktid *area*))))
(define *best-server* (car (get-best-server *area* "test.db" (area-pktid *area*))))
(pp *best-server*)
(define *server-pkt* (hash-table-ref/default (area-hosts *area*) (area-pktid *area*) #f))
(define *server-ip* (alist-ref 'ipaddr *server-pkt*))
(define *server-port* (any->number (alist-ref 'port *server-pkt*)))
(test #f #t (list? (ping *area* *server-ip* *server-port*)))

(test #f #t (process-db-queries *area* "test.db"))
(test #f #f (process-db-queries *area* "junk.db"))
;; (test #f #t (cadr (full-ping *area* *server-pkt*)))

(test-end "ulex-db")

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(test-begin "faux-mtdb")
;; pre-clean
#;(for-each (lambda (dir)
            (if (directory-exists? dir)
                (system (conc "/bin/rm -rf ./"dir)))
            (system (conc "/bin/mkdir -p ./"dir))
            )
          '("faux-mtdb" "faux-mtdb-pkts"))

(let* ((area *area*) ;; (make-area dbdir: "faux-mtdb" pktsdir: "faux-mtdb-pkts"))
       (specfile "tests/mt-spec.sexpr")
       (dbname "faux-mt.db"))
  ;; (launch area)
  (initialize-area-calls-from-specfile area specfile)
  (let* ((target-name "a/b/c/d")
         (insert-result (call area dbname 'new-target (list target-name)))
         (test-target-id (caar (call area dbname 'target-name->target-id (list target-name))))
         (test-target-name (caar (call area dbname 'target-id->target-name (list 1)))))
    (test #f #t insert-result)
    (test #f 1 test-target-id )
    (test #f target-name test-target-name )
    )
  (test #f #t (list? (get-best-server *area* "test.db" 'savemsg)))
  (thread-sleep! 5)
  (test #f #t (begin (shutdown area) #t)))

(test #f #t (process-db-queries *area* "test.db"))
(test #f #f (process-db-queries *area* "junk.db"))

;; thought experiment - read cursors
;; (let* ((cursor (call area dbname 'get-target-names '())))
;;   (let loop ((row (cursor)))
;;     (cond
;;      ((not row) #t)
;;      (else
;;       (print "ROW IS "row)
;;       (loop (cursor))))))

(test-end "faux-mtdb")

;;======================================================================
;; Portlogger tests
;;======================================================================

;; (test-begin "portlogger")
;;
;; (test #f #f (begin (pl-open-run-close (lambda (db b)(pl-get-prev-used-port db)) *area*) #f))
;; (test #f #f (pl-open-run-close (lambda (db b)(pl-get-port-state db 1234567)) *area*))
;; (test #f #f (number? (pl-open-run-close (lambda (db b)(pl-take-port db 123456)) *area*)))
;; (test #f #t (number? (let ((port (pl-open-run-close pl-find-port *area*)))
;;                        (set! *port* port)
;;                        port)))
;; (test #f 1 (pl-open-run-close pl-release-port *port*))
;; (test #f "released" (pl-open-run-close
;;                      (lambda (db)
;;                        (sqlite3:first-result db "select state from ports where port=?" *port*))))
;;
;; (test-end "portlogger")
Added ulex/ulex.meta version [ccfccf1ce0].
;; -*- scheme -*-
(
; Your egg's license:
(license "BSD")

; Pick one from the list of categories (see below) for your egg and enter it
; here.
(category db)

; A list of eggs dbi depends on. If none, you can omit this declaration
; altogether. If you are making an egg for chicken 3 and you need to use
; procedures from the `files' unit, be sure to include the `files' egg in the
; `needs' section (chicken versions < 3.4.0 don't provide the `files' unit).
; `depends' is an alias to `needs'.
(needs rpc pkts mailbox sqlite3)

; A list of eggs required for TESTING ONLY. See the `Tests' section.
(test-depends test)

(author "Matt Welland")
(synopsis "A distributed mesh-like layer for sqlite3."))
Added ulex/ulex.release-info version [f8b73e2e54].
(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}")
(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}")
(release "0.1")
Added ulex/ulex.scm version [ef093072a2].
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 
801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 
;;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.
;;======================================================================
;; ABOUT:
;;   See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;;   Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================

;; (use rpc pkts mailbox sqlite3)

(module ulex
 *
 ;; #;(
 ;; ;; areas
 ;; make-area
 ;; area->alist
 ;; ;; server
 ;; launch
 ;; update-known-servers
 ;; shutdown
 ;; get-best-server
 ;; ;; client side
 ;; call
 ;; ;; queries, procs and system commands (i.e. workers)
 ;; register
 ;; register-batch
 ;; ;; database - note: most database stuff need not be exposed, these calls may be removed from exports in future
 ;; open-db
 ;; ;; ports
 ;; pl-find-port
 ;; pl-get-prev-used-port
 ;; pl-open-db
 ;; pl-open-run-close
 ;; pl-release-port
 ;; pl-set-port
 ;; pl-take-port
 ;; pl-is-port-available
 ;; pl-get-port-state
 ;; ;; system
 ;; get-normalized-cpu-load
 ;; )

(import scheme posix chicken data-structures ports extras files mailbox)
(import rpc srfi-18 pkts matchable regex
        typed-records srfi-69 srfi-1
        srfi-4 regex-case
        (prefix sqlite3 sqlite3:)
        foreign
        tcp) ;; ulex-netutil)

;;======================================================================
;; D E B U G   H E L P E R S
;;======================================================================

(define (dbg> . args)
  (with-output-to-port (current-error-port)
    (lambda ()
      (apply print "dbg> " args))))

(define (debug-pp . args)
  (if (get-environment-variable "ULEX_DEBUG")
      (with-output-to-port (current-error-port)
        (lambda ()
          (apply pp args)))))

(define *default-debug-port* (current-error-port))

(define (sdbg> fn stage-name stage-start stage-end start-time . message)
  (if (get-environment-variable "ULEX_DEBUG")
      (with-output-to-port *default-debug-port*
        (lambda ()
          (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. "
                 (if start-time
                     (conc "total time " (- (current-milliseconds) start-time) " ms.")
                     "")
                 message
                 )))))

;;======================================================================
;; M A C R O S
;;======================================================================
;; iup callbacks are not dumping the stack, this is a work-around
;;

;; Some of these routines use:
;;
;;     http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html
;;
;; Syntax for defining macros in a simple style similar to function definition,
;; when there is a single pattern for the argument list and there are no keywords.
;;
;; (define-simple-syntax (name arg ...) body ...)
;;

(define-syntax define-simple-syntax
  (syntax-rules ()
    ((_ (name arg ...) body ...)
     (define-syntax name (syntax-rules () ((name arg ...) (begin body ...)))))))

(define-simple-syntax (catch-and-dump proc procname)
  (handle-exceptions
   exn
   (begin
     (print-call-chain (current-error-port))
     (with-output-to-port (current-error-port)
       (lambda ()
         (print ((condition-property-accessor 'exn 'message) exn))
         (print "Callback error in " procname)
         (print "Full condition info:\n" (condition->list exn)))))
   (proc)))

;;======================================================================
;; R E C O R D S
;;======================================================================

;; information about me as a server
;;
(defstruct area
  ;; about this area
  (useportlogger #f)
  (lowport 32768)
  (server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all)
  (conn #f)
  (port #f)
  (myaddr (get-my-best-address))
  pktid ;; get pkt from hosts table if needed
  pktfile
  pktsdir
  dbdir
  (dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one?
  (mutex (make-mutex))
  (rtable (make-hash-table)) ;; registration table of available actions
  (dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve
  ;; about other servers
  (hosts (make-hash-table)) ;; key => hostdat
  (hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime )
  (reqs (make-hash-table)) ;; uri => queue
  ;; work queues
  (wqueues (make-hash-table)) ;; fname => qdat
  (stats (make-hash-table)) ;; fname => totalqueries
  (last-srvup (current-seconds)) ;; last time we updated the known servers
  (cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call
  (ready #f)
  (health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping
  )

;; host stats
;;
(defstruct hostdat
  (pkt #f)
  (dbload (make-hash-table)) ;; "dbfile.db" => queries/min
  (hostload #f) ;; normalized load ( 5min load / numcpus )
  )

;; dbdat
;;
(defstruct dbdat
  (dbh #f)
  (fname #f)
  (write-access #f)
  (sths (make-hash-table)) ;; hash mapping query strings to handles
  )

;; qdat
;;
(defstruct qdat
  (writeq (make-queue))
  (readq (make-queue))
  (rwq (make-queue))
  (logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging
  (osshort (make-queue))
  (oslong (make-queue))
  (misc (make-queue)) ;; used for things like ping-full
  )

;; calldat
;;
(defstruct calldat
  (ctype 'dbwrite)
  (obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc.
  (rtime (current-milliseconds)))

;; make it a global? Well, it is local to area module

(define *pktspec*
  `((server (hostname . h)
            (port . p)
            (pid . i)
            (ipaddr . a)
            )
    (data (hostname . h) ;; sender hostname
          (port . p) ;; sender port
          (ipaddr . a) ;; sender ip
          (hostkey . k) ;; sending host key - store info at server under this key
          (servkey . s) ;; server key - this needs to match at server end or reject the msg
          (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json
          (data .
                d) ;; base64 encoded slln data
          )))

;; work item
;;
(defstruct witem
  (rhost #f) ;; return host
  (ripaddr #f) ;; return ipaddr
  (rport #f) ;; return port
  (servkey #f) ;; the packet representing the client of this workitem, used by final send-message
  (rdat #f) ;; the request - usually an sql query, type is rdat
  (action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort
  (cookie #f) ;; cookie id for response
  (data #f) ;; the data payload, i.e. parameters
  (result #f) ;; the result from processing the data
  (caller #f)) ;; the calling peer according to rpc itself

(define (trim-pktid pktid)
  (if (string? pktid)
      (substring pktid 0 4)
      "nopkt"))

(define (any->number num)
  (cond
   ((number? num) num)
   ((string? num) (string->number num))
   (else num)))

(use trace)
(trace-call-sites #t)

;;======================================================================
;; D A T A B A S E   H A N D L I N G
;;======================================================================

;; look in dbhandles for a db, return it, else return #f
;;
(define (get-dbh acfg fname)
  (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '())))
    (if (null? dbh-lst)
        (begin
          ;; (print "opening db for " fname)
          (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls
        (let ((rem-lst (cdr dbh-lst)))
          ;; (print "re-using saved connection for " fname)
          (hash-table-set! (area-dbhandles acfg) fname rem-lst)
          (car dbh-lst)))))

(define (save-dbh acfg fname dbdat)
  ;; (print "saving dbh for " fname)
  (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '()))))

;; open the database, if never before opened init it. put the handle in the
;; open db's hash table
;; returns: the dbdat
;;
(define (open-db acfg fname)
  (let* ((fullname (conc (area-dbdir acfg) "/" fname))
         (exists (file-exists? fullname))
         (write-access (if exists
                           (file-write-access? fullname)
                           (file-write-access?
                            (area-dbdir acfg))))
         (db (sqlite3:open-database fullname))
         (handler (sqlite3:make-busy-timeout 136000))
         )
    (sqlite3:set-busy-handler! db handler)
    (sqlite3:execute db "PRAGMA synchronous = 0;")
    (if (not exists) ;; need to init the db
        (if write-access
            (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements
              ;; (sqlite3:with-transaction
              ;; db
              ;; (lambda ()
              (if isql
                  (for-each
                   (lambda (sql)
                     (sqlite3:execute db sql))
                   isql)))
            (print "ERROR: no write access to " (area-dbdir acfg))))
    (make-dbdat dbh: db fname: fname write-access: write-access)))

;; This is a low-level command to retrieve or to prepare, save and return a prepared statement
;; you must extract the db handle
;;
(define (get-sth db cache stmt)
  (if (hash-table-exists? cache stmt)
      (begin
        ;; (print "Reusing cached stmt for " stmt)
        (hash-table-ref/default cache stmt #f))
      (let ((sth (sqlite3:prepare db stmt)))
        (hash-table-set! cache stmt sth)
        ;; (print "prepared stmt for " stmt)
        sth)))

;; a little more expensive but does all the tedious dereferencing - only use if you don't already
;; have dbdat and db sitting around
;;
(define (full-get-sth acfg fname stmt)
  (let* ((dbdat (get-dbh acfg fname))
         (db (dbdat-dbh dbdat))
         (sths (dbdat-sths dbdat)))
    (get-sth db sths stmt)))

;; write to a db
;; acfg: area data
;; rdat: request data
;; hdat: (host .
;; port)
;;
;; (define (dbwrite acfg rdat hdat data-in)
;;   (let* ((dbname (car data-in))
;;          (dbdat (get-dbh acfg dbname))
;;          (db (dbdat-dbh dbdat))
;;          (sths (dbdat-sths dbdat))
;;          (stmt (calldat-obj rdat))
;;          (sth (get-sth db sths stmt))
;;          (data (cdr data-in)))
;;     (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data)
;;     (print "dbdat: " (dbdat->alist dbdat))
;;     (apply sqlite3:execute sth data)
;;     (save-dbh acfg dbname dbdat)
;;     #t
;;     ))

(define (finalize-all-db-handles acfg)
  (let* ((dbhandles (area-dbhandles acfg)) ;; dbhandles is hash of fname ==> dbdat
         (num 0))
    (for-each
     (lambda (area-name)
       (print "Closing handles for " area-name)
       (let ((dbdats (hash-table-ref/default dbhandles area-name '())))
         (for-each
          (lambda (dbdat)
            ;; first close all statement handles
            (for-each
             (lambda (sth)
               (sqlite3:finalize! sth)
               (set! num (+ num 1)))
             (hash-table-values (dbdat-sths dbdat)))
            ;; now close the dbh
            (set! num (+ num 1))
            (sqlite3:finalize! (dbdat-dbh dbdat)))
          dbdats)))
     (hash-table-keys dbhandles))
    (print "FINALIZED " num " dbhandles")))

;;======================================================================
;; W O R K   Q U E U E   H A N D L I N G
;;======================================================================

(define (register-db-as-mine acfg dbname)
  (let ((ht (area-dbs acfg)))
    (if (not (hash-table-ref/default ht dbname #f))
        (hash-table-set! ht dbname (random 10000)))))

(define (work-queue-add acfg fname witem)
  (let* ((work-queue-start (current-milliseconds))
         (action (witem-action witem)) ;; NB the action is the index into the rdat actions
         (qdat (or (hash-table-ref/default (area-wqueues acfg) fname #f)
                   (let ((newqdat (make-qdat)))
                     (hash-table-set! (area-wqueues acfg) fname newqdat)
                     newqdat)))
         (rdat (hash-table-ref/default (area-rtable acfg) action #f)))
    (if rdat
        (queue-add!
         (case (calldat-ctype rdat)
           ((dbwrite) (register-db-as-mine acfg fname)(qdat-writeq qdat))
           ((dbread) (register-db-as-mine acfg fname)(qdat-readq qdat))
           ((dbrw) (register-db-as-mine acfg fname)(qdat-rwq qdat))
           ((oslong) (qdat-oslong qdat))
           ((osshort) (qdat-osshort qdat))
           ((full-ping) (qdat-misc qdat))
           (else
            (print "ERROR: no queue for " action ". Adding to dbwrite queue.")
            (qdat-writeq qdat)))
         witem)
        (case action
          ((full-ping)(qdat-misc qdat))
          (else
           (print "ERROR: No action " action " was registered"))))
    (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f)
    #t)) ;; for now, simply return #t to indicate request got to the queue

(define (doqueue acfg q fname dbdat dbh)
  ;; (print "doqueue: " fname)
  (let* ((start-time (current-milliseconds))
         (qlen (queue-length q)))
    (if (> qlen 1)
        (print "Processing queue of length " qlen))
    (let loop ((count 0)
               (responses '()))
      (let ((delta (- (current-milliseconds) start-time)))
        (if (or (queue-empty? q)
                (> delta 400)) ;; stop working on this queue after 400ms have passed
            (list count delta responses) ;; return count, delta and responses list
            (let* ((witem (queue-remove! q))
                   (action (witem-action witem))
                   (rdat (witem-rdat witem))
                   (stmt (calldat-obj rdat))
                   (sth (full-get-sth acfg fname stmt))
                   (ctype (calldat-ctype rdat))
                   (data (witem-data witem))
                   (cookie (witem-cookie witem)))
              ;; do the processing and save the result in witem-result
              (witem-result-set!
               witem
               (case ctype ;; action
                 ((noblockwrite) ;; blind write, no ack of success returned
                  (apply sqlite3:execute sth data)
                  (sqlite3:last-insert-rowid dbh))
                 ((dbwrite) ;; blocking write
                  (apply sqlite3:execute sth data)
                  #t)
                 ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query
                  (apply sqlite3:map-row (lambda x x) sth data))
                 ((full-ping) 'full-ping)
                 (else (print "Not ready for action " action) #f)))
              (loop (add1 count)
                    (if cookie
                        (cons witem responses)
                        responses))))))))

;; do up to 400ms of processing on each queue
;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded
;;
(define (process-db-queries acfg fname)
  (if (hash-table-exists? (area-wqueues acfg) fname)
      (let* ((process-db-queries-start-time (current-milliseconds))
             (qdat (hash-table-ref/default (area-wqueues acfg) fname #f))
             (queue-sym->queue (lambda (queue-sym)
                                 (case queue-sym ;; lookup the queue from qdat given a name (symbol)
                                   ((wqueue) (qdat-writeq qdat))
                                   ((rqueue) (qdat-readq qdat))
                                   ((rwqueue) (qdat-rwq qdat))
                                   ((misc) (qdat-misc qdat))
                                   (else #f))))
             (dbdat (get-dbh acfg fname))
             (dbh (if (dbdat? dbdat)(dbdat-dbh dbdat) #f))
             (nowtime (current-seconds)))
        ;; handle the queues that require a transaction
        ;;
        (map ;;
         (lambda (queue-sym)
           ;; (print "processing queue " queue-sym)
           (let* ((queue (queue-sym->queue queue-sym)))
             (if (not (queue-empty? queue))
                 (let ((responses
                        (sqlite3:with-transaction ;; todo - catch exceptions...
                         dbh
                         (lambda ()
                           (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work!
                             ;; (print "res=" res)
                             (match res
                               ((count delta responses)
                                (update-stats acfg fname queue-sym delta count)
                                (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f)
                                responses) ;; return responses
                               (else
                                (print "ERROR: bad return data from doqueue " res)))
                             )))))
                   ;; having completed the transaction, send the responses.
                   ;; (print "INFO: sending " (length responses) " responses.")
                   (let loop ((responses-left responses))
                     (cond
                      ((null? responses-left) #t)
                      (else
                       (let* ((witem (car responses-left))
                              (response (cdr responses-left)))
                         (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem)
                                                (witem-cookie witem)(witem-result witem)))
                       (loop (cdr responses-left))))))
                 )))
         '(wqueue rwqueue rqueue))

        ;; handle misc queue
        ;;
        ;; (print "processing misc queue")
        (let ((queue (queue-sym->queue 'misc)))
          (doqueue acfg queue fname dbdat dbh))
        ;; ....
        (save-dbh acfg fname dbdat)
        #t ;; just to let the tests know we got here
        )
      #f ;; nothing processed
      ))

;; run all queues in parallel per db but sequentially per queue for that db.
;; - process the queues every 500 or so ms
;; - allow for long running queries to continue but all other activities for that
;;   db will be blocked.
;;
(define (work-queue-processor acfg)
  (let* ((threads (make-hash-table))) ;; fname => thread
    (let loop ((fnames (hash-table-keys (area-wqueues acfg)))
               (target-time (+ (current-milliseconds) 50)))
      ;;(if (not (null? fnames))(print "Processing for these databases: " fnames))
      (for-each
       (lambda (fname)
         ;; (print "processing for " fname)
         ;;(process-db-queries acfg fname))
         (let ((th (hash-table-ref/default threads fname #f)))
           (if (and th (not (member (thread-state th) '(dead terminated))))
               (begin
                 (print "WARNING: worker thread for " fname " is taking a long time.")
                 (print "Thread is in state " (thread-state th)))
               (let ((th1 (make-thread (lambda ()
                                         ;; (catch-and-dump
                                         ;; (lambda ()
                                         ;; (print "Process queries for " fname)
                                         (let ((start-time (current-milliseconds)))
                                           (process-db-queries acfg fname)
                                           ;; (thread-sleep! 0.01) ;; need the thread to take at least some time
                                           (hash-table-delete! threads fname)) ;; no mutexes?
                                         fname)
                                       "th1"))) ;; ))
                 (hash-table-set! threads fname th1)
                 (thread-start! th1)))))
       fnames)
      ;; (thread-sleep!
      ;; 0.1) ;; give the threads some time to process requests
      ;; burn time until 400ms is up
      (let ((now-time (current-milliseconds)))
        (if (< now-time target-time)
            (let ((delta (- target-time now-time)))
              (thread-sleep! (/ delta 1000)))))
      (loop (hash-table-keys (area-wqueues acfg))
            (+ (current-milliseconds) 50)))))

;;======================================================================
;; S T A T S   G A T H E R I N G
;;======================================================================

(defstruct stat
  (qcount-avg 0) ;; coarse running average
  (qtime-avg 0) ;; coarse running average
  (qcount 0) ;; total
  (qtime 0) ;; total
  (last-qcount 0) ;; last
  (last-qtime 0) ;; last
  (dbs '()) ;; list of db files handled by this node
  (when 0)) ;; when the last query happened - seconds

(define (update-stats acfg fname bucket duration numqueries)
  (let* ((key fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough
         (stats (or (hash-table-ref/default (area-stats acfg) key #f)
                    (let ((newstats (make-stat)))
                      (hash-table-set! (area-stats acfg) key newstats)
                      newstats))))
    ;; when the last query happened (used to remove the fname from the active list)
    (stat-when-set! stats (current-seconds))
    ;; last values
    (stat-last-qcount-set! stats numqueries)
    (stat-last-qtime-set! stats duration)
    ;; total over process lifetime
    (stat-qcount-set! stats (+ (stat-qcount stats) numqueries))
    (stat-qtime-set! stats (+ (stat-qtime stats) duration))
    ;; coarse average
    (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2))
    (stat-qtime-avg-set! stats (/ (+ (stat-qtime-avg stats) duration) 2))

    ;; here is where we add the stats for a given dbfile
    (if (not (member fname (stat-dbs stats)))
        (stat-dbs-set! stats (cons fname (stat-dbs stats))))

    ))

;;======================================================================
;; S E R V E R   S T U F F
;;======================================================================

;; this does NOT return!
;;
(define (find-free-port-and-open acfg)
  (let ((port (or (area-port acfg) 3200)))
    (handle-exceptions
     exn
     (begin
       (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port")
       (area-port-set! acfg (+ port 1))
       (find-free-port-and-open acfg))
     (rpc:default-server-port port)
     (area-port-set! acfg port)
     (tcp-read-timeout 120000)
     ;; ((rpc:make-server (tcp-listen port)) #t)
     (tcp-listen (rpc:default-server-port)
                 ))))

;; register this node by putting a packet into the pkts dir.
;; look for other servers
;; contact other servers and compile list of servers
;; there are two types of server
;;   main servers - dashboards, runners and dedicated servers - need pkt
;;   passive servers - test executers, step calls, list-runs - no pkt
;;
(define (register-node acfg hostip port-num)
  ;;(mutex-lock! (area-mutex acfg))
  (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created)
         (best-ip (or hostip (get-my-best-address)))
         (mtdir (area-dbdir acfg))
         (pktdir (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts")))
    (print "Registering node " best-ip ":" port-num)
    (if (not mtdir) ;; require a home for this node to put or find databases
        #f
        (begin
          (if (not (directory? pktdir))(create-directory pktdir))
          ;; server is started, now create pkt if needed
          (print "Starting server in " server-type " mode with port " port-num)
          (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt
              (begin
                (area-pktid-set! acfg
                                 (write-alist->pkt
                                  pktdir
                                  `((hostname . ,(get-host-name))
                                    (ipaddr . ,best-ip)
                                    (port . ,port-num)
                                    (pid . ,(current-process-id)))
                                  pktspec: *pktspec*
                                  ptype: 'server))
                (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt"))))
          (area-port-set! acfg port-num)
          #;(mutex-unlock! (area-mutex acfg))))))

(define *cookie-seqnum* 0)
(define (make-cookie key)
  (set!
   *cookie-seqnum* (add1 *cookie-seqnum*))
  ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*)
  (conc key "-" *cookie-seqnum*)
  )

;; dispatch locally if possible
;;
(define (call-deliver-response acfg ipaddr port cookie data)
  (if (and (equal? (area-myaddr acfg) ipaddr)
           (equal? (area-port acfg) port))
      (deliver-response acfg cookie data)
      ((rpc:procedure 'response ipaddr port) cookie data)))

(define (deliver-response acfg cookie data)
  (let ((deliver-response-start (current-milliseconds)))
    (thread-start! (make-thread
                    (lambda ()
                      (let loop ((tries-left 5))
                        ;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left)
                        ;;(pp (hash-table->alist (area-cookie2mbox acfg)))
                        (let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f)))
                          (cond
                           ((eq? 0 tries-left)
                            (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie)
                            )
                           (mbox
                            ;;(print "got mbox="mbox" got data="data" send.")
                            (mailbox-send! mbox data))
                           (else
                            ;;(print "no mbox yet. look for "cookie)
                            (thread-sleep! (/ (- 6 tries-left) 10))
                            (loop (sub1 tries-left))))))
                      ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data))
                      (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie)
                      )
                    (conc "deliver-response thread for cookie="cookie))))
  #t)

;; action:
;;   immediate - quick actions, no need to put in queues
;;   dbwrite - put in dbwrite queue
;;   dbread - put in dbread queue
;;   oslong - os actions, e.g. du, that could take a long time
;;   osshort - os actions that should be quick, e.g.
;;   df
;;
(define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler
  ;; NOTE: Use rpc:current-peer for getting return address
  (let* ((std-peer-handler-start (current-milliseconds))
         ;; (raw-data (alist-ref 'data dat))
         (rdat (hash-table-ref/default (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action
         (witem (make-witem ripaddr: from-ipaddr
                            ;; rhost: from-host
                            rport: from-port
                            action: action
                            rdat: rdat
                            cookie: cookie
                            servkey: servkey
                            data: params ;; TODO - rename data to params
                            caller: (rpc:current-peer))))
    (if (not (equal? servkey (area-pktid acfg)))
        `(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this
        (let* ((ctype (if rdat
                          (calldat-ctype rdat) ;; is this necessary? these should be identical
                          action)))
          (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f)
          (case ctype
            ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data)))
            ((full-ping) `(#t "ack to full ping" ,(work-queue-add acfg fname witem) ,cookie))
            ((response) `(#t "ack from requestor" ,(deliver-response acfg fname params)))
            ((dbwrite) `(#t "db write submitted" ,(work-queue-add acfg fname witem) ,cookie))
            ((dbread) `(#t "db read submitted" ,(work-queue-add acfg fname witem) ,cookie ))
            ((dbrw) `(#t "db read/write submitted" ,cookie))
            ((osshort) `(#t "os short submitted" ,cookie))
            ((oslong) `(#t "os long submitted" ,cookie))
            (else `(#f "unrecognised action" ,ctype)))))))

;; Call this to start the actual server
;;
;; start_server
;;
;; mode: '
;; handler: proc which takes pktrecieved as argument
;;
(define (start-server acfg)
  (let* ((conn (find-free-port-and-open acfg))
         (port (area-port acfg)))
    (rpc:publish-procedure!
     'delist-db
     (lambda (fname)
       (hash-table-delete! (area-dbs acfg) fname)))
    (rpc:publish-procedure!
     'calling-addr
     (lambda ()
       (rpc:current-peer)))
    (rpc:publish-procedure!
     'ping
     (lambda ()(real-ping acfg)))
    (rpc:publish-procedure!
     'request
     (lambda (from-addr from-port servkey action cookie dbname params)
       (request acfg from-addr from-port servkey action cookie dbname params)))
    (rpc:publish-procedure!
     'response
     (lambda (cookie res-dat)
       (deliver-response acfg cookie res-dat)))
    (area-ready-set! acfg #t)
    (area-conn-set! acfg conn)
    ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t)

(define (launch acfg) ;; #!optional (proc std-peer-handler))
  (print "starting launch")
  (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
  #;(let ((original-handler (current-exception-handler))) ;; is th
      (lambda (exception)
        (server-exit-procedure)
        (original-handler exception)))
  (on-exit (lambda ()
             (shutdown acfg))) ;; (finalize-all-db-handles acfg)))
  ;; set up the rpc handler
  (let* ((th1 (make-thread
               (lambda ()(start-server acfg))
               "server thread"))
         (th2 (make-thread
               (lambda ()
                 (print "th2 starting")
                 (let loop ()
                   (work-queue-processor acfg)
                   (print "work-queue-processor crashed!")
                   (loop)))
               "work queue thread")))
    (thread-start! th1)
    (thread-start! th2)
    (let loop ()
      (thread-sleep! 0.025)
      (if (area-ready acfg)
          #t
          (loop)))
    ;; attempt to fix my address
    (let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)?
      (let loop ((rem-addrs all-addr))
        (if (null? rem-addrs)
            (begin
              (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.")
              (exit 1)) ;; BUG Changeme to raising an exception
            (let* ((addr (car rem-addrs))
                   (good-addr (handle-exceptions
                               exn
                               #f
                               ((rpc:procedure 'calling-addr addr (area-port acfg))))))
              (if good-addr
                  (begin
                    (print "Got good-addr of " good-addr)
                    (area-myaddr-set!
                     acfg good-addr))
                  (loop (cdr rem-addrs)))))))
    (register-node acfg (area-myaddr acfg)(area-port acfg))
    (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg))
    ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
    ))

(define (clear-server-pkt acfg)
  (let ((pktf (area-pktfile acfg)))
    (if pktf (delete-file* pktf))))

(define (shutdown acfg)
  (let (;;(conn (area-conn acfg))
        (pktf (area-pktfile acfg))
        (port (area-port acfg)))
    (if pktf (delete-file* pktf))
    (send-all "imshuttingdown")
    ;; (rpc:close-all-connections!) ;; don't know if this is actually needed
    (finalize-all-db-handles acfg)))

(define (send-all msg)
  #f)

;; given a area record look up all the packets
;;
(define (get-all-server-pkts acfg)
  (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt"))))
    (map
     (lambda (pkt-file)
       (read-pkt->alist pkt-file pktspec: *pktspec*))
     all-pkt-files)))

#;((Z . "9a0212302295a19610d5796fce0370fa130758e9")
   (port . "34827")
   (pid . "28748")
   (hostname . "zeus")
   (T . "server")
   (D . "1549427032.0"))

#;(define (get-my-best-address)
    (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))))
      (cond
       ((null? all-my-addresses)
        (get-host-name)) ;; no interfaces?
       ((eq? (length all-my-addresses) 1)
        (ip->string (car all-my-addresses))) ;; only one to choose from, just go with it
       (else
        (ip->string (car (filter (lambda (x) ;; take any but 127.
                                   (not (eq? (u8vector-ref x 0) 127)))
                                 all-my-addresses)))))))

;; whoami? I am my pkt
;;
(define (whoami? acfg)
  (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f))

;;======================================================================
;; "Client side" operations
;;======================================================================

(define (safe-call call-key host port .
                   params)
  (handle-exceptions
   exn
   (begin
     (print "Call " call-key " to " host ":" port " failed")
     #f)
   (apply (rpc:procedure call-key host port) params)))

;; ;; convert to/from string / sexpr
;; ;;
;; (define (string->sexpr str)
;;   (if (string? str)
;;       (with-input-from-string str read)
;;       str))
;;
;; (define (sexpr->string s)
;;   (with-output-to-string (lambda ()(write s))))

;; is the server alive?
;;
(define (ping acfg host port)
  (let* ((myaddr (area-myaddr acfg))
         (myport (area-port acfg))
         (start-time (current-milliseconds))
         (res (if (and (equal? myaddr host)
                       (equal? myport port))
                  (real-ping acfg)
                  ((rpc:procedure 'ping host port)))))
    (cons (- (current-milliseconds) start-time)
          res)))

;; returns ( ipaddr port alist-fname=>randnum )
(define (real-ping acfg)
  `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg)))

;; is the server alive AND the queues processing?
;;
#;(define (full-ping acfg servpkt)
    (let* ((start-time (current-milliseconds))
           (res (send-message acfg servpkt '(full-ping) 'full-ping)))
      (cons (- (current-milliseconds) start-time)
            res))) ;; (equal? res "got ping"))))

;; look up all pkts and get the server id (the hash), port, host/ip
;; store this info in acfg
;; return the number of responsive servers found
;;
;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself
;;
(define (update-known-servers acfg)
  ;; read all pkts
  ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt
  (let* ((start-time (current-milliseconds))
         (all-pkts (delete-duplicates
                    (append (get-all-server-pkts acfg)
                            (hash-table-values (area-hosts acfg)))))
         (hostshash (area-hosts acfg))
         (my-id (area-pktid acfg))
         (pktsdir (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers
         (numsrvs 0)
         (delpkt (lambda (pktsdir sid)
                   (print "clearing out server " sid)
                   (delete-file* (conc pktsdir "/" sid ".pkt"))
                   (hash-table-delete! hostshash sid))))
    (area-last-srvup-set!
     acfg (current-seconds))
    (for-each
     (lambda (servpkt)
       (if (list? servpkt)
           ;; (pp servpkt)
           (let* ((shost (alist-ref 'ipaddr servpkt))
                  (sport (any->number (alist-ref 'port servpkt)))
                  (res (handle-exceptions
                        exn
                        (begin
                          ;; (print "INFO: bad server on " shost ":" sport)
                          #f)
                        (ping acfg shost sport)))
                  (sid (alist-ref 'Z servpkt)) ;; Z code is our name for the server
                  (url (conc shost ":" sport))
                  )
             #;(if (or (not res)
                       (null? res))
                   (begin
                     (print "STRANGE: ping of " url " gave " res)))
             ;; (print "Got " res " from " shost ":" sport)
             (match res
               ((qduration . payload)
                ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt)
                ;; (if payload
                ;; "Success" "Fail"))
                (match payload
                  ((host port stats)
                   ;; (print "From " host ":" port " got stats: " stats)
                   (if (and host port stats)
                       (let ((url (conc host ":" port)))
                         (hash-table-set! hostshash sid servpkt) ;; store based on host:port
                         (hash-table-set! (area-hoststats acfg) sid stats))
                       (print "missing data from the server, not sure what that means!"))
                   (set! numsrvs (+ numsrvs 1)))
                  (#f
                   (print "Removing pkt " sid " due to #f from server or failed ping")
                   (delpkt pktsdir sid))
                  (else
                   (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)")))
               (else
                ;; here we delete the pkt - can't reach the server, remove it
                ;; however this logic is inadequate. we should mark the server as checked
                ;; and not good, if it happens a second time - then remove the pkt
                ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead
                ;; could be it is simply too busy to reply
                (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0)))
                  (if (> bad-pings 1) ;; two bad pings - remove pkt
                      (begin
                        (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid)
                        (delpkt pktsdir sid))
                      (begin
                        (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet")
                        (hash-table-set!
                         (area-health acfg)
                         url
                         (+ (hash-table-ref/default (area-health acfg) url 0) 1))
                        ))
                  ))))
           ;; servpkt is not actually a pkt?
           (begin
             (print "Bad pkt " servpkt))))
     all-pkts)
    (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs
           " servers, pkts: " (map (lambda (p)
                                     (alist-ref 'Z p))
                                   all-pkts))
    numsrvs))

(defstruct srvstat
  (numfiles 0)   ;; number of db files handled by this server - subtract 1 for the db being currently looked at
  (randnum  #f)  ;; tie breaker number assigned by the server itself - applies only to the db under consideration
  (pkt      #f)) ;; the server pkt

;;(define (srv->srvstat srvpkt)

;; Get the best server for the given dbname and key
;;
;;   NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries.
;;
(define (get-best-server acfg dbname key)
  (let* (;; (servers (hash-table-values (area-hosts acfg)))
         (servers     (area-hosts acfg))
         (skeys       (sort (hash-table-keys servers) string>=?)) ;; a stable listing
         (start-time  (current-milliseconds))
         (srvstats    (make-hash-table))  ;; srvid => srvstat
         (url         (conc (area-myaddr acfg) ":" (area-port acfg))))
    ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys))
    (if (null?
         skeys)
        (if (> (update-known-servers acfg) 0)
            (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter
            (begin
              (print "ERROR: no server found!") ;; since this process is also a server this should never happen
              #f))
        (begin
          ;; (print "in get-best-server with skeys=" skeys)
          (if (> (- (current-seconds) (area-last-srvup acfg)) 10)
              (begin
                (update-known-servers acfg)
                (sdbg> "get-best-server" "update-known-servers" start-time #f #f)))
          ;; for each server look at the list of dbfiles, total number of dbs being handled
          ;; and the rand number, save the best host
          ;; also do a delist-db for each server dbfile not used
          (let* ((best-server       #f)
                 (servers-to-delist (make-hash-table)))
            (for-each
             (lambda (srvid)
               (let* ((server (hash-table-ref/default servers srvid #f))
                      (stats  (hash-table-ref/default (area-hoststats acfg) srvid '(()))))
                 ;; (print "stats: " stats)
                 (if server
                     (let* ((dbweights (car stats))
                            (srvload   (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights)))
                            (dbrec     (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore
                            (randnum   (if dbrec
                                           dbrec ;; (cdr dbrec)
                                           0)))
                       (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server))))))
             skeys)
            (let* ((sorted (sort (hash-table-values srvstats)
                                 (lambda (a b)
                                   (let ((numfiles-a (srvstat-numfiles a))
                                         (numfiles-b (srvstat-numfiles b))
                                         (randnum-a  (srvstat-randnum a))
                                         (randnum-b  (srvstat-randnum b)))
                                     (if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less
                                         #t
                                         (if (and (equal? numfiles-a numfiles-b)
                                                  (< randnum-a randnum-b))
                                             #t
                                             #f))))))
                   (best   (if (null?
                                sorted)
                               (begin
                                 (print "ERROR: should never be null due to self as server.")
                                 #f)
                               (srvstat-pkt (car sorted)))))
              #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv)
                                                             (let ((p (srvstat-pkt srv)))
                                                               (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p)
                                                                     "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")")))
                                                           sorted))
              best))))))

;; send out an "I'm about to exit" notice to all known servers
;;
(define (death-imminent acfg)
  '())

;;======================================================================
;; U L E X  -  T H E   I N T E R E S T I N G   S T U F F ! !
;;======================================================================

;; register a handler
;;   NOTES:
;;     dbinitsql is reserved for a list of sql statements for initializing the db
;;     dbinitfn  is reserved for a db init function, if exists called after dbinitsql
;;
(define (register acfg key obj #!optional (ctype 'dbwrite))
  (let ((ht (area-rtable acfg)))
    (if (hash-table-exists? ht key)
        (print "WARNING: redefinition of entry " key))
    (hash-table-set! ht key (make-calldat obj: obj ctype: ctype))))

;; usage: register-batch acfg ctype '((key1 . sql1) (key2 . sql2) ... )
;; NB// obj is often an sql query
;;
(define (register-batch acfg ctype data)
  (let ((ht (area-rtable acfg)))
    (map (lambda (dat)
           (hash-table-set!
            ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype)))
         data)))

(define (initialize-area-calls-from-specfile area specfile)
  (let* ((callspec (with-input-from-file specfile read )))
    (for-each (lambda (group)
                (register-batch
                 area
                 (car group)
                 (cdr group)))
              callspec)))

;; get-rentry
;;
(define (get-rentry acfg key)
  (hash-table-ref/default (area-rtable acfg) key #f))

(define (get-rsql acfg key)
  (let ((cdat (get-rentry acfg key)))
    (if cdat
        (calldat-obj cdat)
        #f)))

;; blocking call:
;;    client                         server
;;    ------                         ------
;;    call()
;;    send-message()
;;    nmsg-send()
;;                                   nmsg-receive()
;;                                   nmsg-respond(ack,cookie)
;;    ack, cookie
;;    mbox-thread-wait(cookie)
;;                                   nmsg-send(client,cookie,result)
;;    nmsg-respond(ack)
;;    return result
;;
;; reserved action:
;;    'immediate
;;    'dbinitsql
;;
(define (call acfg dbname action params #!optional (count 0))
  (let* ((call-start-time     (current-milliseconds))
         (srv                 (get-best-server acfg dbname action))
         (post-get-start-time (current-milliseconds))
         (rdat                (hash-table-ref/default (area-rtable acfg) action #f))
         (myid                (trim-pktid (area-pktid acfg)))
         (srvid               (trim-pktid (alist-ref 'Z srv)))
         (cookie              (make-cookie myid)))
    (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat)
    (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname)
    (if (and srv rdat) ;; need both to dispatch a request
        (let* ((ripaddr  (alist-ref 'ipaddr srv))
               (rsrvid   (alist-ref 'Z srv))
               (rport    (any->number (alist-ref 'port srv)))
               (res-full (if (and (equal? ripaddr (area-myaddr acfg))
                                  (equal? rport   (area-port acfg)))
                             (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params)
                             (safe-call 'request ripaddr rport
                                        (area-myaddr acfg)
                                        (area-port   acfg)
                                        #;(area-pktid acfg)
                                        rsrvid
                                        action cookie dbname params))))
          ;; (print "res-full: " res-full)
          (match res-full
            ((response-ok response-msg rem ...)
             (let* ((send-message-time (current-milliseconds))
                    ;; (match res-full
                    ;;  ((response-ok response-msg)
                    ;; (response-ok  (car res-full))
                    ;; (response-msg (cadr res-full)
                    )
               ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG
               ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params)
               (sdbg> "call" "send-message" post-get-start-time #f call-start-time)
               (cond
                ((not response-ok) #f)
                ((member response-msg '("db read submitted" "db write submitted"))
                 (let* ((cookie-id (cadddr res-full))
                        (mbox      (make-mailbox))
                        (mbox-time (current-milliseconds)))
                   (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox)
                   (let* ((mbox-timeout-secs   20)
                          (mbox-timeout-result 'MBOX_TIMEOUT)
                          (res                 (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
                          (mbox-receive-time   (current-milliseconds)))
                     (hash-table-delete! (area-cookie2mbox acfg) cookie-id)
                     (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname)
                     ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params)
                     res)))
                (else
                 (print "Unhandled response \""response-msg"\"")
                 #f))
               ;; depending on what action (i.e. ctype) is we will block here waiting for
               ;; all the data (mechanism to be determined)
               ;;
               ;; if res is a "working on it" then wait
               ;;    wait for result
               ;;    mailbox thread wait on
               ;; if res is a "can't help you" then try a different server
               ;; if res is a "ack" (e.g. for one-shot requests) then return res
               ))
            (else
             (if (< count 10)
                 (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv))))
                   (thread-sleep! 1)
                   (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". 
Trying again in 1 second.")
                   (call acfg dbname action params (+ count 1)))
                 (begin
                   (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full)))))))
        (begin
          (if (not rdat)
              (print "ERROR: action " action " not registered.")
              (if (< count 10)
                  (begin
                    (thread-sleep! 1)
                    (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts
                    (print "ERROR: no server found, srv=" srv ", trying again in 1 second")
                    (call acfg dbname action params (+ count 1)))
                  (begin
                    (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up."))
                    #;(error "No server available"))))))))

;;======================================================================
;; U T I L I T I E S
;;======================================================================

;; get a signature for identifying this process
;;
(define (get-process-signature)
  (cons (get-host-name)(current-process-id)))

;;======================================================================
;; S Y S T E M   S T U F F
;;======================================================================

;; get normalized cpu load by reading from /proc/loadavg and
;; /proc/cpuinfo. Returns an alist holding the three load averages,
;; the number of real cpus and the number of threads:
;; '((adj-proc-load . normalized-proc-load) ... etc.)
;; keys: adj-proc-load, adj-core-load, 1m-load, 5m-load, 15m-load,
;; proc, core, phys
;;
(define (get-normalized-cpu-load)
  (let ((res (get-normalized-cpu-load-raw))
        (default `((adj-proc-load . 2) ;; there is no right answer
                   (adj-core-load . 2)
                   (1m-load       . 2)
                   (5m-load       . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong
                   (15m-load      . 0)
                   (proc          . 1)
                   (core          . 1)
                   (phys          . 1)
                   (error         . #t))))
    (cond
     ((and (list? res)
           (> (length res) 2))
      res)
     ((eq? res #f)   default) ;; add messages?
     ((eq?
           res #f) default) ;; this would be the #eof
     (else default))))

(define (get-normalized-cpu-load-raw)
  (let* ((actual-host (get-host-name))) ;; #f is localhost
    (let ((data    (append
                    (with-input-from-file "/proc/loadavg" read-lines)
                    (with-input-from-file "/proc/cpuinfo" read-lines)
                    (list "end")))
          (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$"))
          (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$"))
          (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$"))
          (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$"))
          (max-num (lambda (p n)(max (string->number p) n))))
      ;; (print "data=" data)
      (if (null? data) ;; something went wrong
          #f
          (let loop ((hed      (car data))
                     (tal      (cdr data))
                     (loads    #f)
                     (proc-num 0)  ;; processor includes threads
                     (phys-num 0)  ;; physical chip on motherboard
                     (core-num 0)) ;; core
            ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num)
            (if (null? tal) ;; have all our data, calculate normalized load and return result
                (let* ((act-proc (+ proc-num 1))
                       (act-phys (+ phys-num 1))
                       (act-core (+ core-num 1))
                       (adj-proc-load (/ (car loads) act-proc))
                       (adj-core-load (/ (car loads) act-core))
                       (result
                        (append (list (cons 'adj-proc-load adj-proc-load)
                                      (cons 'adj-core-load adj-core-load))
                                (list (cons '1m-load (car loads))
                                      (cons '5m-load (cadr loads))
                                      (cons '15m-load (caddr loads)))
                                (list (cons 'proc act-proc)
                                      (cons 'core act-core)
                                      (cons 'phys act-phys)))))
                  result)
                (regex-case
                 hed
                 (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num))
                 (proc-rx ( x p ) (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num))
                 (phys-rx ( x p ) (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num))
                 (core-rx ( x c ) (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num)))
                 (else
                  (begin
                    ;; (print "NO MATCH: " hed)
                    (loop (car tal)(cdr tal) loads proc-num phys-num core-num))))))))))

(define (get-host-stats acfg)
  (let ((stats-hash (area-stats acfg)))
    ;; use this opportunity to
    ;; remove references to dbfiles which have not been accessed in a while
    (for-each
     (lambda (dbname)
       (let* ((stats       (hash-table-ref stats-hash dbname))
              (last-access (stat-when stats)))
         (if (and (> last-access 0)                         ;; if zero then there has been no access
                  (> (- (current-seconds) last-access) 10)) ;; not used in ten seconds
             (begin
               (print "Removing " dbname " from stats list")
               (hash-table-delete! stats-hash dbname) ;; remove from stats hash
               (stat-dbs-set! stats (hash-table-keys stats))))))
     (hash-table-keys stats-hash))

    `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum
      ,(map (lambda (dbname) ;; dbname is the db name
              (cons dbname (stat-when (hash-table-ref stats-hash dbname))))
            (hash-table-keys stats-hash))
      (cpuload . ,(get-normalized-cpu-load)))))
#;(stats   . ,(map (lambda (k) ;; create an alist from the stats data
                     (cons k (stat->alist (hash-table-ref (area-stats acfg) k))))
                   (hash-table-keys (area-stats acfg))))

#;(trace
 ;; assv
 ;; cdr
 ;; caar
 ;; ;; cdr
 ;; call
 ;; finalize-all-db-handles
 ;; get-all-server-pkts
 ;; get-normalized-cpu-load
 ;; get-normalized-cpu-load-raw
 ;; launch
 ;; nmsg-send
 ;; process-db-queries
 ;; receive-message
 ;; std-peer-handler
 ;; update-known-servers
 ;; work-queue-processor
 )

;;======================================================================
;; netutil
;;   move this back to ulex-netutil.scm someday?
;;======================================================================

;; #include <stdio.h>
;; #include <netinet/in.h>
;; #include <string.h>
;; #include <arpa/inet.h>

(foreign-declare "#include \"sys/types.h\"")
(foreign-declare "#include \"sys/socket.h\"")
(foreign-declare "#include \"ifaddrs.h\"")
(foreign-declare "#include \"arpa/inet.h\"")

;; get IP addresses from ALL interfaces
(define get-all-ips
  (foreign-safe-lambda* scheme-object ()
    "

// from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address :

    C_word lst = C_SCHEME_END_OF_LIST, len, str, *a;
//    struct ifaddrs *ifa, *i;
//    struct sockaddr *sa;

    struct ifaddrs * ifAddrStruct = NULL;
    struct ifaddrs * ifa = NULL;
    void * tmpAddrPtr = NULL;

    if ( getifaddrs(&ifAddrStruct) != 0)
      C_return(C_SCHEME_FALSE);

//    for (i = ifa; i != NULL; i = i->ifa_next) {
    for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
        // ifa_addr can be NULL for some interfaces, skip those entries
        if (ifa->ifa_addr != NULL && ifa->ifa_addr->sa_family==AF_INET) { // Check it is
            // a valid IPv4 address
            tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
            char addressBuffer[INET_ADDRSTRLEN];
            inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
//            printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
            len = strlen(addressBuffer);
            a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
            str = C_string(&a, len, addressBuffer);
            lst = C_a_pair(&a, str, lst);
        }

//        else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is
//            // a valid IPv6 address
//            tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
//            char addressBuffer[INET6_ADDRSTRLEN];
//            inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
////            printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
//            len = strlen(addressBuffer);
//            a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
//            str = C_string(&a, len, addressBuffer);
//            lst = C_a_pair(&a, str, lst);
//       }

//       else {
//         printf(\" not an IPv4 address\\n\");
//       }

    }

    freeifaddrs(ifAddrStruct); // release the whole list; ifa is NULL once the loop has finished
    C_return(lst);

"))

;; Change this to bias
;; for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
  (let* ((rate (lambda (ipstr)
                 (regex-case ipstr
                   ( "^127\\." _ 0 )
                   ;; private ranges 10.0.x.x and 192.168.x.x; the dot after the group
                   ;; is matched once, so 192.168.x.x addresses are recognised
                   ( "^(10\\.0|192\\.168)\\..*" _ 1 )
                   ( else 2 ) ))))
    (< (rate a) (rate b))))

(define (get-my-best-address)
  (let ((all-my-addresses (get-all-ips))
        ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))
        )
    (cond
     ((null? all-my-addresses)
      (get-host-name))                                  ;; no interfaces?
     ((eq? (length all-my-addresses) 1)
      (car all-my-addresses))                           ;; only one to choose from, just go with it
     (else
      (car (sort all-my-addresses ip-pref-less?)))
     ;; (else
     ;;  (ip->string (car (filter (lambda (x)              ;; take any but 127.
     ;;                             (not (eq? (u8vector-ref x 0) 127)))
     ;;                           all-my-addresses))))
     )))

(define (get-all-ips-sorted)
  (sort (get-all-ips) ip-pref-less?))

)
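As a rough illustration of the ranking idea behind `ip-pref-less?`, the sketch below scores addresses with plain prefix tests rather than `regex-case`, so that it runs without extra eggs. The helper names (`starts-with?`, `rate-address`, `address-less?`, `sort-addresses`) are made up for this example and are not part of ulex. Lower scores sort first, so with the 0/1/2 ratings used above a loopback address ends up at the head of the list.

```scheme
;; Sketch only: all names below are hypothetical, not part of ulex.

;; #t if str begins with prefix
(define (starts-with? str prefix)
  (let ((n (string-length prefix)))
    (and (>= (string-length str) n)
         (string=? (substring str 0 n) prefix))))

;; lower score sorts first, mirroring the 0/1/2 ratings in ip-pref-less?
(define (rate-address ip)
  (cond ((starts-with? ip "127.") 0)              ;; loopback
        ((or (starts-with? ip "10.0.")
             (starts-with? ip "192.168.")) 1)     ;; private ranges named in the source
        (else 2)))                                ;; everything else

(define (address-less? a b)
  (< (rate-address a) (rate-address b)))

;; insert x into an already sorted list
(define (insert-by less? x sorted)
  (cond ((null? sorted) (list x))
        ((less? x (car sorted)) (cons x sorted))
        (else (cons (car sorted) (insert-by less? x (cdr sorted))))))

;; simple insertion sort, used here to avoid depending on a sort library
(define (sort-addresses ips)
  (let loop ((rest ips) (acc '()))
    (if (null? rest)
        acc
        (loop (cdr rest) (insert-by address-less? (car rest) acc)))))
```

Calling `(sort-addresses '("8.8.8.8" "192.168.1.5" "127.0.0.1"))` puts the loopback address first and the public address last. Whether loopback-first is the desired preference for `get-my-best-address` is a design question the source leaves open.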
Added ulex/ulex.setup version [90ae5db29a].
;; Copyright 2007-2018, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.

;;;; ulex.setup
(standard-extension 'ulex "0.1")
Added ulex/ulex_europaeus-branch.jpg version [e0f1589481].
cannot compute difference between binary files
Added ulex/write-cycle.fig version [448d56cbcd].
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | #FIG 3.2 Produced by xfig version 3.2.5-alpha5 Landscape Center Inches Letter 100.00 Single -2 1200 2 0 32 #c5b696 0 33 #eef7fe 0 34 #dbcaa5 0 35 #404040 0 36 #808080 0 37 #bfbfbf 0 38 #dfdfdf 0 39 #8d8e8d 0 40 #a9a9a9 0 41 #555555 0 42 #c6c2c6 0 43 #565151 0 44 #8d8d8d 0 45 #d6d6d6 0 46 #84807d 0 47 #d1d1d1 0 48 #3a3a3a 0 49 #4573a9 0 50 #adadad 0 51 #7b79a4 0 52 #444444 0 53 #73758b 0 54 #f6f6f6 0 55 #414541 0 56 #635dcd 0 57 #bdbdbd 0 58 #515151 0 59 #e6e2e6 0 60 #000049 0 61 #797979 0 62 #303430 0 63 #414141 0 64 #c6b595 6 11775 7350 12750 9000 6 11775 7350 12750 8775 5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 12240.000 7050.000 11790 7650 12240 7800 12690 7650 5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 12240.000 7950.000 11790 8550 12240 8700 12690 8550 1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 12240 7500 450 150 11790 7350 12690 7650 2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 12690 7575 12690 8550 2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 11790 7575 11790 8550 4 0 0 50 -1 0 12 0.0000 4 150 210 
12075 8250 db\001 -6 4 0 0 50 -1 0 12 0.0000 4 150 690 12000 9000 main.db\001 -6 6 7950 6975 9375 7575 2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 7950 6975 9375 6975 9375 7575 7950 7575 7950 6975 4 0 0 50 -1 0 12 0.0000 4 195 1335 8100 7350 send-responses\001 -6 6 450 10950 1425 12600 6 450 10950 1425 12375 5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 915.000 10650.000 465 11250 915 11400 1365 11250 5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 915.000 11550.000 465 12150 915 12300 1365 12150 1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 915 11100 450 150 465 10950 1365 11250 2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 1365 11175 1365 12150 2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 465 11175 465 12150 4 0 0 50 -1 0 12 0.0000 4 150 210 750 11850 db\001 -6 4 0 0 50 -1 0 12 0.0000 4 150 690 675 12600 main.db\001 -6 6 4800 15525 5775 16950 5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 5265.000 15225.000 4815 15825 5265 15975 5715 15825 5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 5265.000 16125.000 4815 16725 5265 16875 5715 16725 1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 5265 15675 450 150 4815 15525 5715 15825 2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 5715 15750 5715 16725 2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 4815 15750 4815 16725 4 0 0 50 -1 0 12 0.0000 4 150 210 5100 16425 db\001 -6 6 8025 12750 9000 14175 5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 8490.000 12450.000 8040 13050 8490 13200 8940 13050 5 1 0 1 -1 -1 0 0 -1 0.000 0 1 0 0 8490.000 13350.000 8040 13950 8490 14100 8940 13950 1 2 0 1 -1 -1 0 0 -1 0.000 1 0.0000 8490 12900 450 150 8040 12750 8940 13050 2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 8940 12975 8940 13950 2 1 0 1 -1 -1 0 0 -1 0.000 0 0 0 0 0 2 8040 12975 8040 13950 4 0 0 50 -1 0 12 0.0000 4 150 210 8325 13650 db\001 -6 1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 2325 12675 645 645 2325 12675 2850 13050 1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 6075 11025 645 645 6075 11025 6600 11400 1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 6600 13950 645 645 6600 13950 7125 14325 1 3 0 1 0 7 50 -1 -1 0.000 1 0.0000 3750 16650 645 645 3750 16650 4275 17025 2 1 0 
1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 0 0 1.00 60.00 120.00 2625 2250 7575 2250 2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 7575 1875 9525 1875 9525 3750 7575 3750 7575 1875 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 0 0 1.00 60.00 120.00 8250 2400 8250 4275 2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 7575 4275 9525 4275 9525 5100 7575 5100 7575 4275 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 0 0 1.00 60.00 120.00 7575 4650 2625 4650 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 3 0 0 1.00 60.00 120.00 9525 4650 10275 4650 10275 5175 2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 9975 5175 10650 5175 10650 6975 9975 6975 9975 5175 2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 9975 6975 10650 6975 10650 7575 9975 7575 9975 6975 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 3 0 0 1.00 60.00 120.00 10650 7125 12000 7125 12150 7350 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 3 0 0 1.00 60.00 120.00 11925 7350 11850 7200 10650 7200 2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 675 1875 2625 1875 2625 5025 675 5025 675 1875 2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 675 5025 2625 5025 2625 6000 675 6000 675 5025 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 0 0 1.00 60.00 120.00 1575 4800 1575 5325 2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 3375 5100 5250 5100 5250 6300 3375 6300 3375 5100 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 4 0 0 1.00 60.00 120.00 7950 7275 6525 7275 6525 5700 5250 5700 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 0 0 1.00 60.00 120.00 3375 5700 2625 5700 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 0 0 1.00 60.00 120.00 1575 6000 1575 6825 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 1 0 2 0 0 1.00 60.00 120.00 9975 7275 9375 7275 2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 450 1500 5775 1500 5775 7800 450 7800 450 1500 2 2 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 5 6075 1500 11325 1500 11325 7800 6075 7800 6075 1500 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 5925 375 5925 9675 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 1800 12225 1275 11775 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 5550 11325 2850 12375 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 5925 13875 2925 
12900 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 3525 16050 2625 13275 2 1 0 1 12 7 50 -1 -1 0.000 0 0 -1 0 0 2 7200 13800 8025 13500 2 1 0 1 1 7 50 -1 -1 0.000 0 0 -1 0 0 2 4350 16425 4800 16200 2 1 0 1 12 7 50 -1 -1 0.000 0 0 -1 0 0 2 6225 11700 6525 13350 2 1 0 1 1 7 50 -1 -1 0.000 0 0 -1 0 0 2 5850 11700 3975 16050 2 1 0 1 0 7 50 -1 -1 0.000 0 0 -1 0 0 2 4350 16575 4800 16350 4 0 0 50 -1 0 12 0.0000 4 150 990 7575 1800 ulex:launch\001 4 0 0 50 -1 0 12 0.0000 4 150 720 750 1800 ulex:call\001 4 0 0 50 -1 0 12 0.0000 4 195 1230 1200 2325 send-message\001 4 0 0 50 -1 0 12 0.0000 4 195 1470 7650 2325 receive-message\001 4 0 0 50 -1 0 12 0.0000 4 195 1410 7725 4575 std-peer-handler\001 4 0 0 50 -1 0 12 0.0000 4 195 2160 3600 4500 '(#t "info msg" <cookie>)\001 4 0 0 50 -1 0 12 0.0000 4 150 450 10725 5625 work\001 4 0 0 50 -1 0 12 0.0000 4 150 525 10725 5880 queue\001 4 0 0 50 -1 0 12 0.0000 4 150 1290 750 5775 mailbox - waits\001 4 0 0 50 -1 0 12 0.0000 4 150 990 3525 5400 ulex:launch\001 4 0 0 50 -1 0 12 0.0000 4 195 1470 3525 6000 receive-message\001 4 0 0 50 -1 0 12 0.0000 4 150 480 1200 6975 result\001 4 0 0 50 -1 0 12 0.0000 4 165 1185 1500 13500 megatest -run\001 4 0 0 50 -1 0 12 0.0000 4 150 900 6375 11925 dashboard\001 4 0 0 50 -1 0 12 0.0000 4 165 1590 6375 14925 megatest -execute\001 4 0 0 50 -1 0 12 0.0000 4 195 2040 3150 17625 megatest -remove-keep\001 4 0 0 50 -1 0 12 0.0000 4 150 375 8250 14400 1.db\001 4 0 0 50 -1 0 12 0.0000 4 150 375 5025 17175 2.db\001 |
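The text labels in write-cycle.fig (ulex:call, send-message, receive-message, std-peer-handler, work queue, send-responses, "mailbox - waits", result) appear to trace the blocking call described in the ulex.scm comments: the client gets an ack carrying a cookie, then waits on a mailbox keyed by that cookie until the server sends the result. The following is a single-threaded stand-in for that cookie bookkeeping, written under stated assumptions: it uses an association list instead of real mailboxes and threads, and `cookie-table`, `expect-reply!`, `deliver-reply!` and `take-reply!` are hypothetical names. Only the `MBOX_TIMEOUT` symbol is taken from `call`.

```scheme
;; Sketch only: a single-threaded model of the cookie -> mailbox table.
;; All names are hypothetical; the real code uses mailboxes and threads.

(define cookie-table '())   ;; alist of (cookie . result-or-'pending)

;; client side: remember that a reply is expected for this cookie
(define (expect-reply! cookie)
  (set! cookie-table (cons (cons cookie 'pending) cookie-table)))

;; server side: hand the result back; #f if nobody is waiting on the cookie
(define (deliver-reply! cookie result)
  (let ((slot (assoc cookie cookie-table)))
    (and slot
         (begin (set-cdr! slot result) #t))))

;; remove the first entry for cookie from an alist
(define (drop-cookie cookie table)
  (cond ((null? table) '())
        ((equal? (caar table) cookie) (cdr table))
        (else (cons (car table) (drop-cookie cookie (cdr table))))))

;; client side: take the reply and forget the cookie. An entry that is
;; still pending is reported as MBOX_TIMEOUT, the timeout value used in call.
(define (take-reply! cookie)
  (let ((slot (assoc cookie cookie-table)))
    (set! cookie-table (drop-cookie cookie cookie-table))
    (cond ((not slot) #f)
          ((eq? (cdr slot) 'pending) 'MBOX_TIMEOUT)
          (else (cdr slot)))))
```

A typical sequence would be `(expect-reply! "c1")`, then `(deliver-reply! "c1" 42)` from the serving side, then `(take-reply! "c1")` to collect the value. Removing the entry on collection mirrors the `hash-table-delete!` on `area-cookie2mbox` that follows `mailbox-receive!` in `call`.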