Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -173,11 +173,10 @@ mofiles/mutils.o : mutils/mutils.scm mofiles/cookie.o : stml2/cookie.scm mofiles/stml2.o : stml2/stml2.scm # for the modularized stuff - mofiles/commonmod.o : megatest-fossil-hash.scm mofiles/stml2.o \ mofiles/mtargs.o mofiles/pkts.o mofiles/mtconfigf.o \ mofiles/processmod.o mofiles/pgdbmod.o : mofiles/commonmod.o mofiles/dbmod.o : mofiles/commonmod.o mofiles/keysmod.o \ @@ -191,10 +190,13 @@ mofiles/dbmod.o mofiles/pgdbmod.o mofiles/launchmod.o \ mofiles/subrunmod.o mofiles/servermod.o : mofiles/commonmod.o mofiles/dbmod.o mofiles/testsmod.o : mofiles/servermod.o mofiles/dbmod.o mofiles/launchmod.o : mofiles/subrunmod.o mofiles/testsmod.o + +# special cases where an upstream .import file is needed to compile a module +mofiles/rmtmod.o : ulex.import.o # Removed from megamod.o dep: mofiles/ftail.o mofiles/megamod.o : \ mofiles/rmtmod.o \ mofiles/commonmod.o \ Index: archivemod.scm ================================================================== --- archivemod.scm +++ archivemod.scm @@ -22,12 +22,12 @@ (declare (uses commonmod)) (module archivemod * -(import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) +(import scheme (chicken base)) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 format (chicken port) srfi-1 matchable) (import commonmod) ;; (use (prefix ulex ulex:)) (include "common_records.scm") ADDED autoload/autoload.egg Index: autoload/autoload.egg ================================================================== --- /dev/null +++ autoload/autoload.egg @@ -0,0 +1,5 @@ +((license "BSD") + (category lang-exts) + (author "Alex Shinn") + (synopsis "Load modules lazily") + (components (extension autoload))) ADDED autoload/autoload.meta Index: autoload/autoload.meta ================================================================== --- /dev/null +++ autoload/autoload.meta @@ -0,0 +1,9 @@ +;;; autoload.meta -*- Hen -*- + +((egg "autoload.egg") + (synopsis "Load modules lazily") + (category lang-exts) + (license "BSD") + (author "Alex Shinn") + (doc-from-wiki) + (files "autoload.meta" "autoload.scm" "autoload.release-info" "autoload.setup")) ADDED autoload/autoload.scm Index: autoload/autoload.scm ================================================================== --- /dev/null +++ autoload/autoload.scm @@ -0,0 +1,93 @@ +;;;; autoload.scm -- load modules lazily +;; +;; Copyright (c) 2005-2009 Alex Shinn +;; All rights reserved. +;; +;; BSD-style license: http://www.debian.org/misc/bsd.license + +;; Provides an Emacs-style autoload facility which takes the basic form +;; +;; (autoload unit procedure-name ...) +;; +;; such that the first time procedure-name is called, it will perform a +;; runtime require of 'unit and then apply the procedure from the newly +;; loaded unit to the args it was passed. Subsequent calls to +;; procedure-name will thereafter refer to the new procedure and will +;; thus not incur any overhead. +;; +;; You may also specify an alias for the procedure, and a default +;; procedure if the library can't be loaded: +;; +;; (autoload unit (name alias default) ...) +;; +;; In this case, although the procedure name from the unit is "name," +;; the form defines the autoload procedure as "alias." +;; +;; If the library can't be loaded then an error is signalled, unless +;; default is given, in which case the values are passed to that. +;; +;; Examples: +;; +;; ;; load iconv procedures lazily +;; (autoload iconv iconv iconv-open) +;; +;; ;; load some sqlite procedures lazily with "-" names +;; (autoload sqlite (sqlite:open sqlite-open) +;; (sqlite:execute sqlite-execute)) +;; +;; ;; load md5 library, falling back on slower scheme version +;; (autoload scheme-md5 (md5:digest scheme-md5:digest)) +;; (autoload md5 (md5:digest #f scheme-md5:digest)) + +(module autoload (autoload) + +(import scheme (chicken base)) + +(define-syntax autoload + (er-macro-transformer + (lambda (expr rename compare) + (let ((module (cadr expr)) + (procs (cddr expr)) + (_import (rename 'import)) + (_define (rename 'define)) + (_let (rename 'let)) + (_set! (rename 'set!)) + (_begin (rename 'begin)) + (_apply (rename 'apply)) + (_args (rename 'args)) + (_tmp (rename 'tmp)) + (_eval (rename 'eval)) + (_condition-case (rename 'condition-case))) + `(,_begin + ,@(map + (lambda (x) + (let* ((x (if (pair? x) x (list x))) + (name (car x)) + (full-name + (string->symbol + (string-append (symbol->string module) "#" + (symbol->string name)))) + (alias (or (and (pair? (cdr x)) (cadr x)) name)) + (default (and (pair? (cdr x)) (pair? (cddr x)) (caddr x)))) + (if default + `(,_define (,alias . ,_args) + (,_let ((,_tmp (,_condition-case + (,_begin + (,_eval + (begin (require-library ,module) + #f)) + (,_eval ',full-name)) + (exn () ,default)))) + (,_set! ,alias ,_tmp) + (,_apply ,_tmp ,_args))) + `(,_define (,alias . ,_args) + (,_let ((,_tmp (,_begin + (,_eval + (begin (require-library ,module) + #f)) + (,_eval ',full-name)))) + (,_set! ,alias ,_tmp) + (,_apply ,_tmp ,_args)))))) + procs)))))) + +) ADDED autoload/autoload.setup Index: autoload/autoload.setup ================================================================== --- /dev/null +++ autoload/autoload.setup @@ -0,0 +1,7 @@ + +(compile -s -O2 -j autoload autoload.scm) +(compile -s -O2 autoload.import.scm) + +(install-extension + 'autoload '("autoload.so" "autoload.import.so") + '((version 3.0) (syntax))) Index: clientmod.scm ================================================================== --- clientmod.scm +++ clientmod.scm @@ -22,12 +22,12 @@ (declare (uses commonmod)) (module clientmod * -(import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) +(import scheme (chicken base)) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 format (chicken port) srfi-1 matchable) (import commonmod) ;; (use (prefix ulex ulex:)) (include "common_records.scm") Index: common_records.scm ================================================================== --- common_records.scm +++ common_records.scm @@ -17,8 +17,8 @@ ;; along with Megatest. If not, see . ;; ;;====================================================================== ;; (use trace) -(use typed-records) +(import typed-records) ;; moved to commonmod Index: commonmod.scm ================================================================== --- commonmod.scm +++ commonmod.scm @@ -20,32 +20,48 @@ (declare (unit commonmod)) (declare (uses mtargs)) ;; (declare (uses stml2)) (declare (uses mtconfigf)) -(declare (uses ulex)) (declare (uses pkts)) (module commonmod * -(import scheme chicken data-structures extras) +(import scheme (chicken base) + (chicken process) + (chicken format) + (chicken process-context) + (chicken process-context posix) + (chicken string) + (chicken io) + (chicken pretty-print) + (chicken file) + (chicken file posix) + (chicken pathname) + (chicken time) + (chicken sort) + (chicken condition) + (chicken time posix) -(use (prefix sqlite3 sqlite3:) posix typed-records srfi-18 - srfi-1 files format srfi-13 matchable - srfi-69 ports +) + +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 + srfi-1 (chicken file) format srfi-13 matchable + srfi-69 (chicken port) (prefix base64 base64:) regex-case regex hostinfo srfi-4 (prefix dbi dbi:) stack md5 message-digest z3 directory-utils - sparse-vectors) + system-information + ;;sparse-vectors +) (import pkts) -(import ulex) (import (prefix mtconfigf configf:)) (import (prefix mtargs args:)) (include "common_records.scm") (include "megatest-fossil-hash.scm") @@ -239,11 +255,11 @@ (debug:check-verbosity *verbosity* debugstr) ;; if we were handed a bad verbosity rule then we will override it with 1 and continue (if (not *verbosity*)(set! *verbosity* 1)) (if (or dmode ;; (args:get-arg "-debug") (not (get-environment-variable "MT_DEBUG_MODE"))) - (setenv "MT_DEBUG_MODE" (if (list? *verbosity*) + (set-environment-variable! "MT_DEBUG_MODE" (if (list? *verbosity*) (string-intersperse (map conc *verbosity*) ",") (conc *verbosity*)))))) (define (debug:print n e . params) (if (debug:debug-mode n) @@ -598,11 +614,11 @@ (if (null? dirs) #f (let loop ((hed (car dirs)) (tal (cdr dirs))) (let ((res (or (and (directory? hed) - (file-write-access? hed) + (file-writable? hed) hed) (handle-exceptions exn (begin (debug:print-info 0 *default-log-port* "could not create " hed ", this might cause problems down the road.") @@ -674,11 +690,11 @@ (define (common:directory-writable? path-string) (handle-exceptions exn #f (if (and (directory-exists? path-string) - (file-write-access? path-string)) + (file-writable? path-string)) path-string #f))) ;;====================================================================== ;; T I M E A N D D A T E ;;====================================================================== @@ -1091,11 +1107,11 @@ (if (and (string? val) (string? key)) (handle-exceptions exn (debug:print-error 0 *default-log-port* "bad value for setenv, key=" key ", value=" val) - (setenv key val)) + (set-environment-variable! key val)) (debug:print-error 0 *default-log-port* "bad value for setenv, key=" key ", value=" val)))) (define home (getenv "HOME")) (define user (getenv "USER")) @@ -1107,11 +1123,11 @@ (val (cdr vardat))) (if (not (equal? (get-environment-variable var) val)) (handle-exceptions exn (debug:print-error 0 *default-log-port* "Failed to set " var " to " val) - (setenv var val))))) + (set-environment-variable! var val))))) all-vars)) ;; returns list of fd count, socket count (define (get-file-descriptor-count #!key (pid (current-process-id ))) (list @@ -1436,11 +1452,11 @@ csv->test-data ;; MISC sync-inmem->db - ;; TESTMETA + ;; TESTMETAl testmeta-add-record testmeta-update-field ;; TASKS tasks-add @@ -1448,19 +1464,19 @@ )) ;;====================================================================== ;; ALLDATA ;;====================================================================== -;; + ;; attempt to consolidate a bunch of global information into one struct to toss around (defstruct alldat ;; misc (denoise (make-hash-table)) (areapath #f) ;; i.e. toppath (mtconfig #f) (log-port #f) - (areadat #f) ;; i.e. runremote + (ulexdat #f) ;; connection to the databases (rmt-mutex (make-mutex)) (db-sync-mutex (make-mutex)) (db-with-db-mutex (make-mutex)) (read-only-queries api:read-only-queries) (write-queries api:write-queries) @@ -1662,14 +1678,14 @@ -(use posix-extras pathname-expand files) +(import pathname-expand (chicken file)) ;; this plugs a hole in posix-extras in recent chicken versions > 4.9) -(let-values (( (chicken-release-number chicken-major-version) +#;(let-values (( (chicken-release-number chicken-major-version) (apply values (map string->number (take (string-split (chicken-version) ".") 2))))) @@ -1677,11 +1693,12 @@ (or (> chicken-release-number 4) (and (eq? 4 chicken-release-number) (> chicken-major-version 9))))) (if resolve-pathname-broken? (define ##sys#expand-home-path pathname-expand)))) -(define (realpath x) (resolve-pathname (pathname-expand (or x "/dev/null")) )) +;;(define (realpath x) (resolve-pathname (pathname-expand (or x "/dev/null")) )) +(define (realpath x) (pathname-expand (or x "/dev/null")) ) (define (common:get-this-exe-fullpath #!key (argv (argv))) (let* ((this-script (cond ((and (> (length argv) 2) @@ -2007,11 +2024,11 @@ (debug:print 0 *default-log-port* "ERROR: ["(common:human-time)"] Failed to read .homehost file after trying five times. Giving up and exiting, message: " ((condition-property-accessor 'exn 'message) exn)) (exit 1))) (let ((hhf (conc *toppath* "/.homehost"))) (if (common:file-exists? hhf) (with-input-from-file hhf read-line) - (if (file-write-access? *toppath*) + (if (file-writable? *toppath*) (begin (with-output-to-file hhf (lambda () (print bestadrs))) (begin @@ -2215,11 +2232,11 @@ ;; e.g. key is host and dtype is normalized-load ;; (define (common:get-cached-info key dtype #!key (age 5)) (let* ((fullpath (conc *toppath* "/logs/" key "-" dtype ".log"))) (if (and (file-exists? fullpath) - (file-read-access? fullpath)) + (file-readable? fullpath)) (handle-exceptions exn #f (debug:print 2 *default-log-port* "reading file " fullpath) (let ((real-age (- (current-seconds)(file-change-time fullpath)))) @@ -2486,11 +2503,11 @@ (freespc (cond ((not (directory? dirpath)) (if (common:low-noise-print 300 "disks not a dir " disk-num) (debug:print 0 *default-log-port* "WARNING: disk " disk-num " at path \"" dirpath "\" is not a directory - ignoring it.")) -1) - ((not (file-write-access? dirpath)) + ((not (file-writable? dirpath)) (if (common:low-noise-print 300 "disks not writeable " disk-num) (debug:print 0 *default-log-port* "WARNING: disk " disk-num " at path \"" dirpath "\" is not writeable - ignoring it.")) -1) ((not (eq? (string-ref dirpath 0) #\/)) (if (common:low-noise-print 300 "disks not a proper path " disk-num) @@ -2501,11 +2518,11 @@ (free-inodes (cond ((not (directory? dirpath)) (if (common:low-noise-print 300 "disks not a dir " disk-num) (debug:print 0 *default-log-port* "WARNING: disk " disk-num " at path \"" dirpath "\" is not a directory - ignoring it.")) -1) - ((not (file-write-access? dirpath)) + ((not (file-writable? dirpath)) (if (common:low-noise-print 300 "disks not writeable " disk-num) (debug:print 0 *default-log-port* "WARNING: disk " disk-num " at path \"" dirpath "\" is not writeable - ignoring it.")) -1) ((not (eq? (string-ref dirpath 0) #\/)) (if (common:low-noise-print 300 "disks not a proper path " disk-num) @@ -2683,11 +2700,11 @@ (val (cadr p)) (prv (get-environment-variable var))) (set! res (cons (list var prv) res)) (if val (safe-setenv var (->string val)) - (unsetenv var)))) + (unset-environment-variable! var)))) lst) res) '())) @@ -2707,17 +2724,17 @@ x)) envvars)))) (define (common:with-orig-env proc) (let ((current-env (get-environment-variables))) - (for-each (lambda (x) (unsetenv (car x))) current-env) - (for-each (lambda (x) (setenv (car x) (cdr x))) *common:orig-env*) + (for-each (lambda (x) (unset-environment-variable! (car x))) current-env) + (for-each (lambda (x) (set-environment-variable! (car x) (cdr x))) *common:orig-env*) (let ((rv (cond ((string? proc)(system proc)) (proc (proc))))) - (for-each (lambda (x) (unsetenv (car x))) *common:orig-env*) - (for-each (lambda (x) (setenv (car x) (cdr x))) current-env) + (for-each (lambda (x) (unset-environment-variable! (car x))) *common:orig-env*) + (for-each (lambda (x) (set-environment-variable! (car x) (cdr x))) current-env) rv))) (define (common:without-vars proc . var-patts) (let ((vars (make-hash-table))) (for-each @@ -2726,20 +2743,20 @@ (lambda (var-patt) (if (string-match var-patt (car vardat)) (let ((var (car vardat)) (val (cdr vardat))) (hash-table-set! vars var val) - (unsetenv var)))) + (unset-environment-variable! var)))) var-patts)) (get-environment-variables)) (cond ((string? proc)(system proc)) (proc (proc))) (hash-table-for-each vars (lambda (var val) - (setenv var val))) + (set-environment-variable! var val))) vars)) ;;====================================================================== ;; C O L O R S @@ -3062,11 +3079,11 @@ (cond ((not (common:file-exists? pktsdir)) (debug:print 0 *default-log-port* "ERROR: packets directory " pktsdir " does not exist.")) ((not (directory? pktsdir)) (debug:print 0 *default-log-port* "ERROR: packets directory path " pktsdir " is not a directory.")) - ((not (file-read-access? pktsdir)) + ((not (file-readable? pktsdir)) (debug:print 0 *default-log-port* "ERROR: packets directory path " pktsdir " is not readable.")) (else (debug:print-info 0 *default-log-port* "Loading packets found in " pktsdir) (let ((pkts (glob (conc pktsdir "/*.pkt")))) (for-each @@ -3290,26 +3307,26 @@ (new-val (let ((tmp (cdr env-pair))) (if (list? tmp) (car tmp) tmp))) (current-val (get-environment-variable env-var)) (restore-thunk (cond - ((not current-val) (lambda () (unsetenv env-var))) + ((not current-val) (lambda () (unset-environment-variable! env-var))) ((not (string? new-val)) #f) ((eq? current-val new-val) #f) (else - (lambda () (setenv env-var current-val)))))) + (lambda () (set-environment-variable! env-var current-val)))))) ;;(when (not (string? new-val)) ;; (debug:print 0 *default-log-port* " PROBLEM: not a string: "new-val"\n from env-alist:\n"delta-env-alist) ;; (pp delta-env-alist) ;; (exit 1)) (cond ((not new-val) ;; modify env here - (unsetenv env-var)) + (unset-environment-variable! env-var)) ((string? new-val) - (setenv env-var new-val))) + (set-environment-variable! env-var new-val))) restore-thunk)) delta-env-alist)))) (let ((rv (thunk))) (for-each (lambda (x) (x)) restore-thunks) ;; restore env to original state rv))) Index: configfmod.scm ================================================================== --- configfmod.scm +++ configfmod.scm @@ -22,13 +22,13 @@ ;; (declare (uses commonmod)) (module configfmod * -(import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 - srfi-69 format ports srfi-1 matchable regex) +(import scheme (chicken base)) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 + srfi-69 format (chicken port) srfi-1 matchable regex) ;; (import commonmod) ;; (use (prefix ulex ulex:)) (include "common_records.scm") Index: dashboard.scm ================================================================== --- dashboard.scm +++ dashboard.scm @@ -55,10 +55,11 @@ (declare (uses stml2.import)) (declare (uses ducttape-lib.import)) (declare (uses mtconfigf.import)) (declare (uses gutilsmod)) +(declare (uses servermod)) (declare (uses megamod)) (declare (uses commonmod)) (declare (uses rmtmod)) (declare (uses runsmod)) (declare (uses dbmod)) @@ -67,10 +68,11 @@ (declare (uses dcommonmod)) (declare (uses tasksmod)) (import gutilsmod) +(import servermod) (import megamod) (import commonmod) (import rmtmod) (import runsmod) (import dbmod) @@ -1433,10 +1435,95 @@ (for-each (lambda (graph-cell) (let* ((graph-dat (hash-table-ref (dboard:tabdat-graph-cell-table tabdat) graph-cell))) (dboard:graph-dat-flag-set! graph-dat #f))) (hash-table-keys (dboard:tabdat-graph-cell-table tabdat))))))) )))) + +;;====================================================================== +;; SYSTEM MONITOR - Track relevant Megatest processes +;;====================================================================== +;; +;; A gui for launching tests +;; +(define (dashboard:sysmon commondat tabdat #!key (tab-num #f)) + (let* ((drawing (vg:drawing-new)) + (sysmon-tab-updater (lambda () + (debug:catch-and-dump + (lambda () + (let ((tabdat (dboard:common-get-tabdat commondat tab-num: tab-num))) + (if tabdat + (let ((last-data-update (dboard:tabdat-last-data-update tabdat)) + (now-time (current-seconds))) + (dashboard:sysmon-canvas-updater commondat tabdat tab-num) ;; redraws canvas on user input (I think...) + )))) + "dashboard:sysmon-tab-updater"))) + (key-listboxes #f) ;; + (update-keyvals (lambda () + (dboard:target-updater tabdat)))) + (dboard:tabdat-drawing-set! tabdat drawing) + (dboard:commondat-add-updater commondat sysmon-tab-updater tab-num: tab-num) + (iup:vbox + (let* ((cnv-obj (iup:canvas + ;; #:size "250x250" ;; "500x400" + #:expand "YES" + #:scrollbar "YES" + #:posx "0.5" + #:posy "0.5" + #:action (make-canvas-action + (lambda (c xadj yadj) + (debug:catch-and-dump + (lambda () + (if (not (dboard:tabdat-cnv tabdat)) + (let ((cnv (dboard:tabdat-cnv tabdat))) + (dboard:tabdat-cnv-set! tabdat c) + (vg:drawing-cnv-set! (dboard:tabdat-drawing tabdat) + (dboard:tabdat-cnv tabdat)))) + (let ((drawing (dboard:tabdat-drawing tabdat)) + (old-xadj (dboard:tabdat-xadj tabdat)) + (old-yadj (dboard:tabdat-yadj tabdat))) + (if (not (and (eq? xadj old-xadj)(eq? yadj old-yadj))) + (begin + ;; (print "xadj: " xadj " yadj: " yadj "changed: "(eq? xadj old-xadj) " " (eq? yadj old-yadj)) + (dboard:tabdat-view-changed-set! tabdat #t) + (dboard:tabdat-xadj-set! tabdat (* -2000 (- xadj 0.5))) + (dboard:tabdat-yadj-set! tabdat (* 2000 (- yadj 0.5))) + )))) + "iup:canvas action"))) + #:wheel-cb (lambda (obj step x y dir) ;; dir is 4 for up and 5 for down. I think. + (debug:catch-and-dump + (lambda () + (let* ((drawing (dboard:tabdat-drawing tabdat)) + (scalex (vg:drawing-scalex drawing))) + (dboard:tabdat-view-changed-set! tabdat #t) + ;; (print "step: " step " x: " x " y: " y " dir: " dir " scalex: " scalex) + (vg:drawing-scalex-set! drawing + (+ scalex + (if (> step 0) + (* scalex 0.02) + (* scalex -0.02)))))) + "wheel-cb")) + ))) + cnv-obj)))) + +;; run times canvas updater +;; +(define (dashboard:sysmon-canvas-updater commondat tabdat tab-num) + (let ((cnv (dboard:tabdat-cnv tabdat)) + (dwg (dboard:tabdat-drawing tabdat)) + (mtx (dboard:tabdat-runs-mutex tabdat)) + (vch (dboard:tabdat-view-changed tabdat))) + (if (and cnv dwg vch) + (begin + (vg:drawing-xoff-set! dwg (dboard:tabdat-xadj tabdat)) + (vg:drawing-yoff-set! dwg (dboard:tabdat-yadj tabdat)) + (mutex-lock! mtx) + (canvas-clear! cnv) + (vg:draw dwg tabdat) + (mutex-unlock! mtx) + (dboard:tabdat-view-changed-set! tabdat #f))))) + + ;;====================================================================== ;; R U N ;;====================================================================== ;; @@ -2205,10 +2292,11 @@ (let* ((stats-dat (dboard:tabdat-make-data)) (runs-dat (dboard:tabdat-make-data)) (onerun-dat (dboard:tabdat-make-data)) ;; name for run-summary structure (runcontrols-dat (dboard:tabdat-make-data)) (runtimes-dat (dboard:tabdat-make-data)) + (sysmon-dat (dboard:tabdat-make-data)) (nruns (dboard:tabdat-numruns runs-dat)) (ntests (dboard:tabdat-num-tests runs-dat)) (keynames (dboard:tabdat-dbkeys runs-dat)) (nkeys (length keynames)) (runsvec (make-vector nruns)) @@ -2423,18 +2511,20 @@ (dashboard:runs-summary commondat onerun-dat tab-num: 2) ;; (dashboard:new-view db data new-view-dat tab-num: 3) (dashboard:run-controls commondat runcontrols-dat tab-num: 3) (dashboard:run-times commondat runtimes-dat tab-num: 4) ;; ;; (dashboard:runs-summary commondat onerun-dat tab-num: 4) + (dashboard:sysmon commondat sysmon-dat tab-num: 5) additional-views ))) ;; (set! (iup:callback tabs tabchange-cb:) (lambda (a b c)(print "SWITCHED TO TAB: " a " " b " " c))) (iup:attribute-set! tabs "TABTITLE0" "Summary") (iup:attribute-set! tabs "TABTITLE1" "Runs") (iup:attribute-set! tabs "TABTITLE2" "Run Summary") (iup:attribute-set! tabs "TABTITLE3" "Run Control") (iup:attribute-set! tabs "TABTITLE4" "Run Times") + (iup:attribute-set! tabs "TABTITLE5" "Sysmon") ;; (iup:attribute-set! tabs "TABTITLE3" "New View") ;; (iup:attribute-set! tabs "TABTITLE4" "Run Control") ;; set the tab names for user added tabs (for-each @@ -2449,10 +2539,11 @@ (dboard:common-set-tabdat! commondat 0 stats-dat) (dboard:common-set-tabdat! commondat 1 runs-dat) (dboard:common-set-tabdat! commondat 2 onerun-dat) (dboard:common-set-tabdat! commondat 3 runcontrols-dat) (dboard:common-set-tabdat! commondat 4 runtimes-dat) + (dboard:common-set-tabdat! commondat 5 sysmon-dat) (iup:vbox tabs ;; controls )))) ADDED dbi/dbi.egg Index: dbi/dbi.egg ================================================================== --- /dev/null +++ dbi/dbi.egg @@ -0,0 +1,5 @@ +((license "BSD") + (category db) + (dependencies autoload sql-null) + (test-dependencies test) + (components (extension dbi))) ADDED dbi/dbi.meta Index: dbi/dbi.meta ================================================================== --- /dev/null +++ dbi/dbi.meta @@ -0,0 +1,21 @@ +;; -*- scheme -*- +( +; Your egg's license: +(license "BSD") + +; Pick one from the list of categories (see below) for your egg and enter it +; here. +(category db) + +; A list of eggs dbi depends on. If none, you can omit this declaration +; altogether. If you are making an egg for chicken 3 and you need to use +; procedures from the `files' unit, be sure to include the `files' egg in the +; `needs' section (chicken versions < 3.4.0 don't provide the `files' unit). +; `depends' is an alias to `needs'. +(needs (autoload "3.0") sql-null) + +; A list of eggs required for TESTING ONLY. See the `Tests' section. +(test-depends test) + +(author "Matt Welland") +(synopsis "An abstract database interface.")) ADDED dbi/dbi.release-info Index: dbi/dbi.release-info ================================================================== --- /dev/null +++ dbi/dbi.release-info @@ -0,0 +1,7 @@ +(repo fossil "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}") +(uri zip "http://www.kiatoa.com/cgi-bin/fossils/{egg-name}/zip/{egg-name}.zip?uuid={egg-release}") +(release "0.5") +(release "0.4") +(release "0.3") +(release "0.2") +(release "0.1") ADDED dbi/dbi.scm Index: dbi/dbi.scm ================================================================== --- /dev/null +++ dbi/dbi.scm @@ -0,0 +1,483 @@ +;;; dbi: Minimal gasket to postgresql, sqlite3 and mysql +;;; +;; Copyright (C) 2007-2018 Matt Welland +;; Copyright (C) 2016 Peter Bex +;; Redistribution and use in source and binary forms, with or without +;; modification, is permitted. +;; +;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS +;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE +;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +;; DAMAGE. + +;; ONLY A LOWEST COMMON DEMOMINATOR IS SUPPORTED! + +;; d = db handle +;; t = statement handle +;; s = statement +;; l = proc +;; p = params +;; +;; sqlite3 postgres dbi +;; prepare: (prepare d s) n/a prepare (sqlite3, pg) +;; for-each (for-each-row l d s . p) (query-for-each l s d) for-each-row +;; for-each (for-each-row l t . p) n/a NOT YET +;; exec (exec d s . p) (query-tuples s d) +;; exec (exec t . p) n/a + +;; set to 'pg or 'sqlite3 +;; (define dbi:type 'sqlite3) ;; or 'pg +;; (dbi:open 'sqlite3 (list (cons 'dbname fullname))) + +;;====================================================================== +;; D B I +;;====================================================================== +(module dbi + (open db-dbtype db-conn for-each-row get-one get-one-row get-rows + exec close escape-string mk-db now database? with-transaction fold-row + prepare map-row convert prepare-exec get-res + + ;; TODO: These don't really belong here. Also, the naming is not + ;; consistent with the usual Scheme conventions. + pgdatetime-get-year pgdatetime-get-month pgdatetime-get-day + pgdatetime-get-hour pgdatetime-get-minute pgdatetime-get-second + pgdatetime-get-microsecond + pgdatetime-set-year! pgdatetime-set-month! pgdatetime-set-day! + pgdatetime-set-hour! pgdatetime-set-minute! pgdatetime-set-second! + pgdatetime-set-microsecond! + + lazy-bool) + +(import (chicken base) (chicken process) (chicken file) (chicken time) (chicken string) (chicken format) (chicken time posix) scheme srfi-1 srfi-13) +(import (chicken condition) autoload sql-null) + +(define-record-type db + (make-db dbtype dbconn) + db? + (dbtype db-dbtype db-dbtype-set!) + (dbconn db-conn db-conn-set!)) + +(define (missing-egg type eggname) + (lambda _ + (error (printf + "Cannot access ~A databases. Please install the ~S egg and try again." type eggname)))) + +;; (define (sqlite3:statement? h) #f) ;; dummy - hope it gets clobbered if sqlite3 gets loaded + +;; TODO: Make a convenience macro for this? +(define sqlite3-missing (missing-egg 'sqlite3 "sqlite3")) +(autoload sqlite3 + (open-database sqlite3:open-database sqlite3-missing) + (for-each-row sqlite3:for-each-row sqlite3-missing) + (execute sqlite3:execute sqlite3-missing) + (with-transaction sqlite3:with-transaction sqlite3-missing) + (finalize! sqlite3:finalize! sqlite3-missing) + (make-busy-timeout sqlite3:make-busy-timeout sqlite3-missing) + (set-busy-handler! sqlite3:set-busy-handler! sqlite3-missing) + (database? sqlite3:database? sqlite3-missing) + (prepare sqlite3:prepare sqlite3-missing) + (fold-row sqlite3:fold-row sqlite3-missing) + (map-row sqlite3:map-row sqlite3-missing) + (statement? sqlite3:statement? sqlite3-missing)) + +(define sql-de-lite-missing (missing-egg 'sql-de-lite "sql-de-lite")) +(autoload sql-de-lite + (open-database sql:open-database sql-de-lite-missing) + (close-database sql:close-database sql-de-lite-missing) + (for-each-row sql:for-each-row sql-de-lite-missing) + (fold-rows sql:fold-rows sql-de-lite-missing) + (exec sql:exec sql-de-lite-missing) + (fetch-value sql:fetch-value sql-de-lite-missing) + (with-transaction sql:with-transaction sql-de-lite-missing) + (finalize! sql:finalize! sql-de-lite-missing) + (make-busy-timeout sql:make-busy-timeout sql-de-lite-missing) + (set-busy-handler! sql:set-busy-handler! sql-de-lite-missing) + (query sql:query sql-de-lite-missing) + (sql sql:sql sql-de-lite-missing)) + +(define pg-missing (missing-egg 'pg "postgresql")) +(autoload postgresql + (connect pg:connect pg-missing) + (row-for-each pg:row-for-each pg-missing) + (with-transaction pg:with-transaction pg-missing) + (query pg:query pg-missing) + ;;(escape-string pg:escape-string pg-missing) + (disconnect pg:disconnect pg-missing) + (connection? pg:connection? pg-missing) + (row-fold pg:row-fold pg-missing) + (row-map pg:row-map pg-missing) + (affected-rows pg:affected-rows pg-missing) + (result? pg:result? pg-missing)) + +(define mysql-missing (missing-egg 'mysql "mysql-client")) +(autoload mysql-client + (make-mysql-connection mysql:make-connection mysql-missing) + (mysql-null? mysql:mysql-null? mysql-missing)) + +(define (open dbtype dbinit) + (make-db + dbtype + (case dbtype + ((sqlite3) (sqlite3:open-database (alist-ref 'dbname dbinit))) + ((sql-de-lite) (sql:open-database (alist-ref 'dbname dbinit))) + ((pg) (pg:connect dbinit)) + ((mysql) (mysql:make-connection (alist-ref 'host dbinit) + (alist-ref 'user dbinit) + (alist-ref 'password dbinit) + (alist-ref 'dbname dbinit) + port: (alist-ref 'port dbinit))) + (else (error "Unsupported dbtype " dbtype))))) + +(define (convert dbh) + (cond + ((database? dbh) dbh) + ((sqlite3:database? dbh) (make-db 'sqlite3 dbh)) + ((pg:connection? dbh) (make-db 'pg dbh)) + ((not mysql:mysql-null?) (make-db 'mysql dbh)) + (else (error "Unsupported database handle " dbh)))) + +(define (for-each-row proc dbh stmt . params) + (let ((dbtype (db-dbtype dbh)) + (conn (db-conn dbh))) + (case dbtype + ((sqlite3) (sqlite3:for-each-row + (lambda (first . remaining) + (let ((tuple (list->vector (cons first remaining)))) + (proc tuple))) + conn + (apply sqlparam stmt params))) + ((sql-de-lite)(apply sql:query (sql:for-each-row + (lambda (row) + (proc (list->vector row)))) + (sql:sql conn stmt) + params)) + ((pg) (pg:row-for-each + (lambda (tuple) + (proc (list->vector tuple))) + (pg:query conn (apply sqlparam stmt params)))) + ((mysql) (let* ((replaced-sql (apply sqlparam stmt params)) + (fetcher (conn replaced-sql))) + (fetcher (lambda (tuple) + (proc (list->vector tuple)))))) + (else (error "Unsupported dbtype " dbtype))))) + +;; common idiom is to seek a single value, #f if no match +;; NOTE: wish to return first found. Do the set only if not set +(define (get-one dbh stmt . params) + (let ((dbtype (db-dbtype dbh)) + (conn (db-conn dbh))) + (case dbtype + ((sql-de-lite) + (apply sql:query sql:fetch-value (sql:sql conn stmt) params)) + (else + (let ((res #f)) + (apply for-each-row + (lambda (row) + (if (not res) + (set! res (vector-ref row 0)))) + dbh + stmt + params) + res))))) + +;; common idiom is to seek a single value, #f if no match +;; NOTE: wish to return first found. Do the set only if not set +(define (get-one-row dbh stmt . params) + (let ((res #f)) + (apply for-each-row + (lambda (row) + (if (not res) + (set! res row))) + dbh + stmt + params) + res)) + +;; common idiom is to seek a list of rows, '() if no match +(define (get-rows dbh stmt . params) + (let ((res '())) + (apply for-each-row + (lambda (row) + (set! res (cons row res))) + dbh + stmt + params) + (reverse res))) + +(define (exec dbh stmt . params) + (let ((dbtype (db-dbtype dbh)) + (conn (db-conn dbh)) + (junk #f)) + (case dbtype + ((sqlite3) (apply sqlite3:execute conn stmt params)) + ((sql-de-lite)(apply sql:exec (sql:sql conn stmt) params)) + ((pg) (pg:query conn (apply sqlparam stmt params))) + ((mysql) (conn (apply sqlparam stmt params))) + (else (error "Unsupported dbtype " dbtype))))) + +(define (with-transaction dbh proc) + (let ((dbtype (db-dbtype dbh)) + (conn (db-conn dbh))) + (case dbtype + ((sql-de-lite)(sql:with-transaction conn proc)) + ((sqlite3) (sqlite3:with-transaction + conn + (lambda () (proc)))) + ((pg) (pg:with-transaction + conn (lambda () (proc)))) + ((mysql) + (conn "START TRANSACTION") + (conn proc) + (conn "COMMIT")) + (else (error "Unsupported dbtype " dbtype))))) + +(define (prepare dbh stmt) + (let ((dbtype (db-dbtype dbh)) + (conn (db-conn dbh))) + (case dbtype + ((sql-de-lite) dbh) ;; nop? + ((sqlite3) (sqlite3:prepare conn stmt)) + ((pg) (exec dbh stmt) (cons (cons dbh (cadr (string-split stmt))) '())) + ((mysql) (print "WIP")) + (else (error "Unsupported dbtype" dbtype))))) + +(define (fold-row proc init dbh stmt . params) ;; expecting (proc init/prev res) + (let ((dbtype (db-dbtype dbh)) + (conn (db-conn dbh))) + (case dbtype + ((sql-de-lite) (apply sql:query (sql:fold-rows proc init) + (sql:sql conn stmt) params)) + ((sqlite3) (let ((newproc (lambda (prev . rem) + (proc rem prev)))) + (apply sqlite3:fold-row newproc init conn stmt params))) ;; (fold-row PROC INIT DATABASE SQL . PARAMETERS) + ((pg) (pg:row-fold proc init (exec dbh stmt params))) + ((mysql) (fold proc '() (get-rows dbh stmt))) + (else (error "Unsupported dbtype" dbtype))))) + +(define (map-row proc init dbh stmt . params) + (let ((dbtype (db-dbtype dbh)) + (conn (db-conn dbh))) + (case dbtype + ((sqlite3) (apply sqlite3:map-row proc conn stmt params)) + ((pg) (pg:row-map proc (exec dbh stmt params))) + ((mysql) (map proc (get-rows dbh stmt))) + (else (error "Unsupported dbtype" dbtype))))) + +(define (prepare-exec stmth . params) + (if (sqlite3:statement? stmth) + (apply sqlite3:execute stmth params)) + (if (pair? stmth) + (let* ((dbh (car (car stmth))) + (dbtype (db-dbtype dbh)) + (conn (db-conn dbh)) + (stmth-name (string->symbol (cdr (car stmth))))) + (apply pg:query conn stmth-name params)))) + +(define (get-res handle option) + (if (pg:result? handle) + (case option + ((affected-rows) (pg:affected-rows handle))))) + +(define (close dbh) + (cond + ((database? dbh) + (let ((dbtype (db-dbtype dbh)) + (conn (db-conn dbh))) + (case dbtype + ((sql-de-lite) (sql:close-database conn)) + ((sqlite3) (sqlite3:finalize! conn)) + ((pg) (pg:disconnect conn)) + ((mysql) (void)) ; The mysql-client egg doesn't support closing... + (else (error "Unsupported dbtype " dbtype))))) + ((pair? dbh) + (let ((stmt (conc "DEALLOCATE " (cdr (car dbh)) ";"))) + (exec (car (car dbh)) stmt))) + ((sqlite3:statement? dbh) ;; do this last so that *IF* it is a proper dbh it will be closed above and the sqlite3:statement? will not be called + (sqlite3:finalize! dbh)) + + )) + +;;====================================================================== +;; D B M I S C +;;====================================================================== + +(define (escape-string str) + (let ((parts (split-string str "'"))) + (string-intersperse parts "''"))) +;; (pg:escape-string val))) + +;; convert values to appropriate strings +;; +(define (sqlparam-val->string val) + (cond + ((list? val)(string-intersperse (map conc val) ",")) ;; (a b c) => a,b,c + ((string? val)(string-append "'" (escape-string val) "'")) + ((sql-null? val) "NULL") + ((number? val)(number->string val)) + ((symbol? val)(sqlparam-val->string (symbol->string val))) + ((boolean? val) + (if val "TRUE" "FALSE")) ;; should this be "TRUE" or 1? + ;; should this be "FALSE" or 0 or NULL? + ((vector? val) ;; 'tis a date NB// 5/29/2011 - this is badly borked BUGGY! + (sqlparam-val->string (time->string (seconds->local-time (current-seconds))))) + (else + (error "sqlparam: unknown type for value: " val) + ""))) + +;; (sqlparam "INSERT INTO foo(name,age) VALUES(?,?);" "bob" 20) +;; NB// 1. values only!! +;; 2. terminating semicolon required (used as part of logic) +;; +;; a=? 1 (number) => a=1 +;; a=? 1 (string) => a='1' +;; a=? #f => a=FALSE +;; a=? a (symbol) => a=a +;; +(define (sqlparam query . args) + (let* ((query-parts (string-split query "?")) + (num-parts (length query-parts)) + (num-args (length args))) + (if (not (= (+ num-args 1) num-parts)) + (error "ERROR, sqlparam: wrong number of arguments or missing semicolon, " num-args " for query " query) + (if (= num-args 0) query + (let loop ((section (car query-parts)) + (tail (cdr query-parts)) + (result "") + (arg (car args)) + (argtail (cdr args))) + (let* ((valstr (sqlparam-val->string arg)) + (newresult (string-append result section valstr))) + (if (null? argtail) ;; we are done + (string-append newresult (car tail)) + (loop + (car tail) + (cdr tail) + newresult + (car argtail) + (cdr argtail))))))))) + +;; a poorly written but non-broken split-string +;; +(define (split-string strng delim) + (if (eq? (string-length strng) 0) (list strng) + (let loop ((head (make-string 1 (car (string->list strng)))) + (tail (cdr (string->list strng))) + (dest '()) + (temp "")) + (cond ((equal? head delim) + (set! dest (append dest (list temp))) + (set! temp "")) + ((null? head) + (set! dest (append dest (list temp)))) + (else (set! temp (string-append temp head)))) ;; end if + (cond ((null? tail) + (set! dest (append dest (list temp))) dest) + (else (loop (make-string 1 (car tail)) (cdr tail) dest temp)))))) + +(define (database? dbh) + (if (db? dbh) + (let ((dbtype (db-dbtype dbh)) + (conn (db-conn dbh))) + (case dbtype + ((sqlite3) (if (sqlite3:database? conn) #t #f)) + ((sql-de-lite) #t) ;; don't know how to test for database + ((pg) (if (pg:connection? conn) #t #f)) + ((mysql) #t) + (else (error "Unsupported dbtype " dbtype)))) #f)) + +;;====================================================================== +;; Convienence routines +;;====================================================================== + +;; make a db from a list of statements or open it if it already exists +(define (mk-db path file stmts) + (let* ((fname (conc path "/" file)) + (dbexists (file-exists? fname)) + (dbh (if dbexists (open 'sqlite3 (list (cons 'dbname fname))) #f))) + (if (not dbexists) + (begin + (system (conc "mkdir -p " path)) ;; create the path + (set! dbh (open 'sqlite3 (list (cons 'dbname fname)))) + (for-each + (lambda (sqry) + (exec dbh sqry)) + stmts))) + (sqlite3:set-busy-handler! + (db-conn dbh) (sqlite3:make-busy-timeout 1000000)) + dbh)) + +(define (now dbh) + (let ((dbtype (db-dbtype dbh))) + (case dbtype + ((sqlite3) "datetime('now')") + ;; Standard SQL + (else "now()")))) + +(define (make-pgdatetime)(make-vector 7)) +(define (pgdatetime-get-year vec) (vector-ref vec 0)) +(define (pgdatetime-get-month vec) (vector-ref vec 1)) +(define (pgdatetime-get-day vec) (vector-ref vec 2)) +(define (pgdatetime-get-hour vec) (vector-ref vec 3)) +(define (pgdatetime-get-minute vec) (vector-ref vec 4)) +(define (pgdatetime-get-second vec) (vector-ref vec 5)) +(define (pgdatetime-get-microsecond vec) (vector-ref vec 6)) +(define (pgdatetime-set-year! vec val)(vector-set! vec 0 val)) +(define (pgdatetime-set-month! vec val)(vector-set! vec 1 val)) +(define (pgdatetime-set-day! vec val)(vector-set! vec 2 val)) +(define (pgdatetime-set-hour! vec val)(vector-set! vec 3 val)) +(define (pgdatetime-set-minute! vec val)(vector-set! vec 4 val)) +(define (pgdatetime-set-second! vec val)(vector-set! vec 5 val)) +(define (pgdatetime-set-microsecond! vec val)(vector-set! vec 6 val)) + +;; takes postgres date or timestamp +(define (pg-date->string pgdate) + (conc (pgdatetime-get-month pgdate) "/" + (pgdatetime-get-day pgdate) "/" + (pgdatetime-get-year pgdate))) + +;; takes postgres date or timestamp +(define (pg-datetime->string pgdate) + (conc (pgdatetime-get-month pgdate) "/" + (pgdatetime-get-day pgdate) "/" + (pgdatetime-get-year pgdate) " " + (pgdatetime-get-hour pgdate) ":" + (pgdatetime-get-minute pgdate)`)) + + + +;; map to 0 or 1 from a range of values +;; #f => 0 +;; #t => 1 +;; "0" => 0 +;; "1" => 1 +;; FALSE => 0 +;; TRUE => 1 +;; anything else => 1 +(define (lazy-bool val) + (case val + ((#f) 0) + ((#t) 1) + ((0) 0) + ((1) 1) + (else + (cond + ((string? val) + (let ((nval (string->number val))) + (if nval + (lazy-bool nval) + (cond + ((string=? val "FALSE") 0) + ((string=? val "TRUE") 1) + (else 1))))) + ((symbol? val) + (lazy-bool (symbol->string val))) + (else 1))))) +) ADDED dbi/dbi.setup Index: dbi/dbi.setup ================================================================== --- /dev/null +++ dbi/dbi.setup @@ -0,0 +1,11 @@ +;; Copyright 2007-2018, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +;;;; dbi.setup +(standard-extension 'dbi "0.5") ADDED dbi/example.scm Index: dbi/example.scm ================================================================== --- /dev/null +++ dbi/example.scm @@ -0,0 +1,69 @@ +;;; dbi: Minimal gasket to postgresql, sqlite3 and mysql +;;; +;; Copyright (C) 2007-2016 Matt Welland +;; Redistribution and use in source and binary forms, with or without +;; modification, is permitted. +;; +;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS +;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE +;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +;; DAMAGE. + +;; WARNING: This example is basically useless, I'll rewrite it one of these days .... + +(require-library margs dbi) + +(define help "help me") + +(define remargs (args:get-args + (argv) + (list "-inf") + (list "-h") + args:arg-hash + 0)) + +;; define DBPATH in setup.scm +(include "setup.scm") + +(define (ftf:mk-db) + (let* ((fname (conc DBPATH "/ftfplan.db")) + (dbexists (file-exists? fname)) + (dbh (if dbexists (dbi:open 'sqlite3 (list (cons 'dbname fname))) #f))) + (if (not dbexists) + (begin + ;; (print "fullname: " fullname) + (system (conc "mkdir -p " DBPATH)) ;; create the path + (set! dbh (dbi:open 'sqlite3 (list (cons 'dbname fname)))) + (for-each + (lambda (sqry) + ;; (print sqry) + (dbi:exec dbh sqry)) + ;; types: 0 text, 1 jpg, 2 png, 3 svg, 4 spreadsheet, 5 audio, 6 video :: better specs to come... + (list + "CREATE TABLE pics (id INTEGER PRIMARY KEY,name TEXT,dat_id INTEGER,thumb_dat_id INTEGER,created_on INTEGER,owner_id INTEGER);" + "CREATE TABLE dats (id INTEGER PRIMARY KEY,md5sum TEXT,dat BLOB,type INTEGER);" + ;; on every modification a new tiddlers entry is created. When displaying the tiddlers do: + ;; select where created_on < somedate order by created_on desc limit 1 + "CREATE TABLE tiddlers (id INTEGER PRIMARY KEY,wiki_id INTEGER,name TEXT,rev INTEGER,dat_id INTEGER,created_on INTEGER,owner_id INTEGER);" + ;; rev and tag only utilized when user sets a tag. All results from a select as above for tiddlers are set to the tag + "CREATE TABLE revs (id INTEGER PRIMARY KEY,tag TEXT);" + ;; wikis is here for when postgresql support is added or if a sub wiki is created. + "CREATE TABLE wikis (id INTEGER PRIMARY KEY,key_name TEXT,title TEXT,created_on INTEGER);")) + )) + dbh)) + +(define db (ftf:mk-db)) + +(dbi:exec db "INSERT INTO pics (name,owner_id) VALUES ('bob',1);") +(dbi:for-each-row (lambda (row)(print "Name: " (vector-ref row 0) ", owner_id: " (vector-ref row 1))) + db + "SELECT name,owner_id FROM pics;") + Index: dcommonmod.scm ================================================================== --- dcommonmod.scm +++ dcommonmod.scm @@ -40,11 +40,11 @@ matchable (prefix iup iup:) canvas-draw ;; blindly copied from megamod (prefix base64 base64:) (prefix dbi dbi:) - (prefix nanomsg nmsg:) + ;; (prefix nanomsg nmsg:) (prefix sqlite3 sqlite3:) call-with-environment-variables csv csv-xml data-structures Index: docs/code/Makefile ================================================================== --- docs/code/Makefile +++ docs/code/Makefile @@ -1,3 +1,12 @@ + +all : module-hierarchy.pdf + +deps.data : ../../*scm + cd ../..;csi -bq utils/deps.scm > docs/code/deps.data + +module-hierarchy.dot : preamble.dot deps.data postamble.dot + cat preamble.dot deps.data postamble.dot > module-hierarchy.dot + module-hierarchy.pdf : module-hierarchy.dot dot -Tpdf module-hierarchy.dot -o module-hierarchy.pdf Index: docs/code/module-hierarchy.dot ================================================================== --- docs/code/module-hierarchy.dot +++ docs/code/module-hierarchy.dot @@ -23,38 +23,50 @@ subgraph cluster_megatest { label="megatest"; rmtmod [label="rmt mod"]; - // httpmod [label="http-transportmod"]; - // commonmod -// archivemod.scm -"commonmod" -> "archivemod"; -// clientmod.scm -"commonmod" -> "clientmod"; -// configfmod.scm -// subrunmod.scm -"commonmod" -> "subrunmod"; -// ezstepsmod.scm -"commonmod" -> "ezstepsmod"; -// itemsmod.scm -"commonmod" -> "itemsmod"; -// gutilsmod.scm +// template-mod.scm +"sqlite3" -> "template-mod"; +"commonmod" -> "template-mod"; +// http-transportmod.scm +"commonmod" -> "http-transportmod"; +"stml2" -> "http-transportmod"; +"apimod" -> "http-transportmod"; +"dbmod" -> "http-transportmod"; +"testsmod" -> "http-transportmod"; +"mtargs" -> "http-transportmod"; +"mtconfigf" -> "http-transportmod"; +// apimod.scm +"sqlite3" -> "apimod"; +"commonmod" -> "apimod"; +"dbmod" -> "apimod"; +"servermod" -> "apimod"; +// launchmod.scm +"sqlite3" -> "launchmod"; +"commonmod" -> "launchmod"; +// keysmod.scm +"sqlite3" -> "keysmod"; +"srfi-13" -> "keysmod"; // testsmod.scm +"sqlite3" -> "testsmod"; "commonmod" -> "testsmod"; "servermod" -> "testsmod"; "itemsmod" -> "testsmod"; "dbmod" -> "testsmod"; -// runconfigmod.scm -"commonmod" -> "runconfigmod"; -// keysmod.scm -"srfi-13" -> "keysmod"; -// launchmod.scm -"commonmod" -> "launchmod"; +"mtconfigf" -> "testsmod"; +"mtargs" -> "testsmod"; +// vgmod.scm +"sqlite3" -> "vgmod"; +// clientmod.scm +"sqlite3" -> "clientmod"; +"commonmod" -> "clientmod"; // megamod.scm +"mtconfigf" -> "megamod"; "spiffy" -> "megamod"; "stml2" -> "megamod"; +"mtargs" -> "megamod"; "commonmod" -> "megamod"; "keysmod" -> "megamod"; "pgdbmod" -> "megamod"; "tasksmod" -> "megamod"; "dbmod" -> "megamod"; @@ -67,80 +79,122 @@ "testsmod" -> "megamod"; "servermod" -> "megamod"; "subrunmod" -> "megamod"; "itemsmod" -> "megamod"; "runsmod" -> "megamod"; +// subrunmod.scm +"sqlite3" -> "subrunmod"; +"commonmod" -> "subrunmod"; +"mtconfigf" -> "subrunmod"; +// tasksmod.scm +"sqlite3" -> "tasksmod"; +"commonmod" -> "tasksmod"; +"mtconfigf" -> "tasksmod"; +"pgdbmod" -> "tasksmod"; +// dbmod.scm +"sqlite3" -> "dbmod"; +"commonmod" -> "dbmod"; +"keysmod" -> "dbmod"; +"files" -> "dbmod"; +"tasksmod" -> "dbmod"; +"odsmod" -> "dbmod"; +"mtargs" -> "dbmod"; +"mtconfigf" -> "dbmod"; +// servermod.scm +"sqlite3" -> "servermod"; +"commonmod" -> "servermod"; +"dbmod" -> "servermod"; +"tasksmod" -> "servermod"; +"mtargs" -> "servermod"; +"mtconfigf" -> "servermod"; +// ezstepsmod.scm +"sqlite3" -> "ezstepsmod"; +"commonmod" -> "ezstepsmod"; // odsmod.scm +"sqlite3" -> "odsmod"; "commonmod" -> "odsmod"; -// envmod.scm -"commonmod" -> "envmod"; -// http-transportmod.scm -"commonmod" -> "http-transportmod"; -"stml2" -> "http-transportmod"; -"apimod" -> "http-transportmod"; -"dbmod" -> "http-transportmod"; -"testsmod" -> "http-transportmod"; -// processmod.scm // mtmod.scm +"sqlite3" -> "mtmod"; +"mtargs" -> "mtmod"; +"mtconfigf" -> "mtmod"; "commonmod" -> "mtmod"; "dbmod" -> "mtmod"; "pgdbmod" -> "mtmod"; "rmtmod" -> "mtmod"; "servermod" -> "mtmod"; "stml2" -> "mtmod"; "subrunmod" -> "mtmod"; "tasksmod" -> "mtmod"; "testsmod" -> "mtmod"; -// pgdbmod.scm -"commonmod" -> "pgdbmod"; +// treemod.scm +"iup" -> "treemod"; +// itemsmod.scm +"sqlite3" -> "itemsmod"; +"commonmod" -> "itemsmod"; // runsmod.scm +"base64" -> "runsmod"; "commonmod" -> "runsmod"; "dbmod" -> "runsmod"; "itemsmod" -> "runsmod"; "mtmod" -> "runsmod"; "pgdbmod" -> "runsmod"; +"mtargs" -> "runsmod"; +"mtconfigf" -> "runsmod"; "rmtmod" -> "runsmod"; "servermod" -> "runsmod"; "stml2" -> "runsmod"; "subrunmod" -> "runsmod"; "tasksmod" -> "runsmod"; "testsmod" -> "runsmod"; -// apimod.scm -"commonmod" -> "apimod"; -"dbmod" -> "apimod"; -"servermod" -> "apimod"; -// dbmod.scm -"commonmod" -> "dbmod"; -"keysmod" -> "dbmod"; -"files" -> "dbmod"; -"tasksmod" -> "dbmod"; -"odsmod" -> "dbmod"; -// dcommonmod.scm -"gutilsmod" -> "dcommonmod"; -"commonmod" -> "dcommonmod"; -"testsmod" -> "dcommonmod"; -"megamod" -> "dcommonmod"; -"canvas-draw" -> "dcommonmod"; -"canvas-draw-iup" -> "dcommonmod"; -// tasksmod.scm -"commonmod" -> "tasksmod"; -"pgdbmod" -> "tasksmod"; -// template-mod.scm -"commonmod" -> "template-mod"; -// servermod.scm -"commonmod" -> "servermod"; -"dbmod" -> "servermod"; -"tasksmod" -> "servermod"; -// treemod.scm -// commonmod.scm -"pkts" -> "commonmod"; -// vgmod.scm +// processmod.scm +"sqlite3" -> "processmod"; // rmtmod.scm +"sqlite3" -> "rmtmod"; +"ulex" -> "rmtmod"; "commonmod" -> "rmtmod"; "itemsmod" -> "rmtmod"; "apimod" -> "rmtmod"; "dbmod" -> "rmtmod"; +// commonmod.scm +"sqlite3" -> "commonmod"; +"pkts" -> "commonmod"; +"mtconfigf" -> "commonmod"; +"mtargs" -> "commonmod"; +// pgdbmod.scm +"mtconfigf" -> "pgdbmod"; +"mtargs" -> "pgdbmod"; +"commonmod" -> "pgdbmod"; +// envmod.scm +"sqlite3" -> "envmod"; +"commonmod" -> "envmod"; +// configfmod.scm +"sqlite3" -> "configfmod"; +// dcommonmod.scm +"sqlite3" -> "dcommonmod"; +"mtconfigf" -> "dcommonmod"; +"gutilsmod" -> "dcommonmod"; +"commonmod" -> "dcommonmod"; +"servermod" -> "dcommonmod"; +"testsmod" -> "dcommonmod"; +"megamod" -> "dcommonmod"; +"subrunmod" -> "dcommonmod"; +"runsmod" -> "dcommonmod"; +"rmtmod" -> "dcommonmod"; +"dbmod" -> "dcommonmod"; +"canvas-draw" -> "dcommonmod"; +"canvas-draw-iup" -> "dcommonmod"; +"iup" -> "dcommonmod"; +"mtargs" -> "dcommonmod"; +// gutilsmod.scm +"iup" -> "gutilsmod"; +"canvas-draw" -> "gutilsmod"; +// runconfigmod.scm +"sqlite3" -> "runconfigmod"; +"commonmod" -> "runconfigmod"; +// archivemod.scm +"sqlite3" -> "archivemod"; +"commonmod" -> "archivemod"; } } ADDED docs/code/postamble.dot Index: docs/code/postamble.dot ================================================================== --- /dev/null +++ docs/code/postamble.dot @@ -0,0 +1,5 @@ + +} + +} + ADDED docs/code/preamble.dot Index: docs/code/preamble.dot ================================================================== --- /dev/null +++ docs/code/preamble.dot @@ -0,0 +1,27 @@ +// Copyright 2006-2017, Matthew Welland. +// +// This file is part of Megatest. +// +// Megatest is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Megatest is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Megatest. If not, see . +// +digraph megatest_code_hierarchy { + ranksep=0.05; + // rankdir=LR + + node [shape=box,style=filled,fontname="clear",fontsize="10"]; + + subgraph cluster_megatest { + label="megatest"; + + rmtmod [label="rmt mod"]; Index: envmod.scm ================================================================== --- envmod.scm +++ envmod.scm @@ -22,12 +22,12 @@ (declare (uses commonmod)) (module envmod * -(import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) +(import scheme (chicken base)) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 format (chicken port) srfi-1 matchable) (import commonmod) ;; (use (prefix ulex ulex:)) (include "common_records.scm") Index: ezstepsmod.scm ================================================================== --- ezstepsmod.scm +++ ezstepsmod.scm @@ -22,12 +22,12 @@ (declare (uses commonmod)) (module ezstepsmod * -(import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) +(import scheme (chicken base)) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 format (chicken port) srfi-1 matchable) (import commonmod) ;; (use (prefix ulex ulex:)) (include "common_records.scm") Index: http-transportmod.scm ================================================================== --- http-transportmod.scm +++ http-transportmod.scm @@ -26,11 +26,11 @@ (declare (uses testsmod)) (module apimod * -(import scheme chicken data-structures extras posix files +(import scheme (chicken base) (chicken file) srfi-13 srfi-18 spiffy http-client spiffy-directory-listing spiffy-request-vars tcp ) (import commonmod) Index: itemsmod.scm ================================================================== --- itemsmod.scm +++ itemsmod.scm @@ -23,12 +23,12 @@ (declare (uses mtconfigf)) (module itemsmod * -(import scheme chicken data-structures extras) -(use (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) +(import scheme (chicken base)) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 format (chicken port) srfi-1 matchable) (import commonmod) (import(prefix mtconfigf configf:)) ;; (use (prefix ulex ulex:)) ;; (include "common_records.scm") Index: keysmod.scm ================================================================== --- keysmod.scm +++ keysmod.scm @@ -21,12 +21,12 @@ (declare (unit keysmod)) (module keysmod * -(import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) +(import scheme (chicken base) ) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 format (chicken port) srfi-1 matchable) ;; (import commonmod) ;; (use (prefix ulex ulex:)) (import srfi-13) Index: megamod.scm ================================================================== --- megamod.scm +++ megamod.scm @@ -47,11 +47,11 @@ (import scheme chicken data-structures extras) (use (prefix base64 base64:) (prefix dbi dbi:) - (prefix nanomsg nmsg:) +;; (prefix nanomsg nmsg:) (prefix sqlite3 sqlite3:) call-with-environment-variables csv csv-xml data-structures Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -22,11 +22,11 @@ ;; fake out readline usage of toplevel-command (define (toplevel-command . a) #f) (use (prefix sqlite3 sqlite3:) srfi-1 posix regex regex-case srfi-69 (prefix base64 base64:) readline apropos json http-client directory-utils typed-records - http-client srfi-18 extras format) + http-client srfi-18 extras format tcp6) ;; Added for csv stuff - will be removed ;; (use sparse-vectors) Index: mtargs/mtargs.scm ================================================================== --- mtargs/mtargs.scm +++ mtargs/mtargs.scm @@ -26,12 +26,12 @@ print-args any-defined? help ) -(import scheme chicken data-structures extras posix ports files) -(use srfi-69 srfi-1) +(import scheme (chicken base) (chicken port) (chicken file) (chicken process-context)) +(import srfi-69 srfi-1) (define arg-hash (make-hash-table)) (define help "") (define (get-arg arg . default) Index: mtconfigf/mtconfigf.scm ================================================================== --- mtconfigf/mtconfigf.scm +++ mtconfigf/mtconfigf.scm @@ -71,15 +71,16 @@ ;; misc realpath find-chicken-lib ) -(import scheme chicken data-structures extras ports files) -(use posix typed-records srfi-18 pathname-expand posix-extras) -(use regex regex-case srfi-69 srfi-1 directory-utils extras srfi-13 ) -(use srfi-69) -(import posix) +(import scheme (chicken base) (chicken string) (chicken file) (chicken port)) +(import typed-records srfi-18 pathname-expand) +(import regex regex-case srfi-69 srfi-1 directory-utils srfi-13 ) +(import (chicken io) (chicken condition) (chicken process-context)) +(import (chicken process) (chicken pathname) (chicken pretty-print) (chicken time)) +(import srfi-69 (chicken platform) (chicken sort)) ;; stub debug printers overridden by set-debug-printers (define (debug:print n e . args) (apply print args)) (define (debug:print-info n e . args) @@ -92,19 +93,19 @@ ;; FROM common.scm ;; ;; this plugs a hole in posix-extras in recent chicken versions > 4.9) -(let-values (( (chicken-release-number chicken-major-version) - (apply values - (map string->number - (take - (string-split (chicken-version) ".") - 2))))) - (if (or (> chicken-release-number 4) - (and (eq? 4 chicken-release-number) (> chicken-major-version 9))) - (define ##sys#expand-home-path pathname-expand))) +;;;(let-values (( (chicken-release-number chicken-major-version) +;;; (apply values +;;; (map string->number +;;; (take +;;; (string-split (chicken-version) ".") +;;; 2))))) +;;; (if (or (> chicken-release-number 4) +;;; (and (eq? 4 chicken-release-number) (> chicken-major-version 9))) +;;; (define ##sys#expand-home-path pathname-expand))) ;;(define (set-verbosity v)(debug:set-verbosity v)) (define *default-log-port* (current-error-port)) @@ -216,11 +217,11 @@ (if (and (string? val) (string? key)) (handle-exceptions exn (debug:print-error 0 *default-log-port* "bad value for setenv, key=" key ", value=" val) - (setenv key val)) + (set-environment-variable! key val)) (debug:print-error 0 *default-log-port* "bad value for setenv, key=" key ", value=" val)))) ;; accept an alist or hash table containing envvar/env value pairs (value of #f causes unset) ;; execute thunk in context of environment modified as per this list ;; restore env to prior state then return value of eval'd thunk. @@ -237,26 +238,26 @@ (new-val (let ((tmp (cdr env-pair))) (if (list? tmp) (car tmp) tmp))) (current-val (get-environment-variable env-var)) (restore-thunk (cond - ((not current-val) (lambda () (unsetenv env-var))) + ((not current-val) (lambda () (unset-environment-variable! env-var))) ((not (string? new-val)) #f) ((eq? current-val new-val) #f) (else - (lambda () (setenv env-var current-val)))))) + (lambda () (set-environment-variable! env-var current-val)))))) ;;(when (not (string? new-val)) ;; (debug:print 0 *default-log-port* " PROBLEM: not a string: "new-val"\n from env-alist:\n"delta-env-alist) ;; (pp delta-env-alist) ;; (exit 1)) (cond ((not new-val) ;; modify env here - (unsetenv env-var)) + (unset-environment-variable! env-var)) ((string? new-val) - (setenv env-var new-val))) + (set-environment-variable! env-var new-val))) restore-thunk)) delta-env-alist)))) (let ((rv (thunk))) (for-each (lambda (x) (x)) restore-thunks) ;; restore env to original state rv))) @@ -683,11 +684,11 @@ (configf:script-rx ( x include-script params);; handle-exceptions ;; exn ;; (begin ;; (debug:print '(0 2 9) #f "INFO: include from script " include-script " failed.") ;; (loop (configf:read-line inp res (calc-allow-system allow-system curr-section-name sections) settings) curr-section-name #f #f)) - (if (and (safe-file-exists? include-script)(file-execute-access? include-script)) + (if (and (safe-file-exists? include-script)(file-executable? include-script)) (let* ((local-allow-system (calc-allow-system allow-system curr-section-name sections)) (env-delta (cfgdat->env-alist curr-section-name res local-allow-system)) (new-inp-port (with-env-vars env-delta @@ -819,11 +820,11 @@ (let* ((curr-dir (current-directory)) (configinfo (find-config fname toppath: given-toppath)) (toppath (car configinfo)) (configfile (cadr configinfo))) (if toppath (change-directory toppath)) - (if (and toppath pathenvvar)(setenv pathenvvar toppath)) + (if (and toppath pathenvvar)(set-environment-variable! pathenvvar toppath)) (let ((configdat (if configfile (read-config configfile #f #t environ-patt: environ-patt post-section-procs: (if set-fields (list (cons "^fields$" set-fields) ) '()) #f keep-filenames: keep-filenames)))) @@ -1051,11 +1052,11 @@ ;; returns (list dat msg) (define (read-refdb refdb-path) (let ((sheets-file (conc refdb-path "/sheet-names.cfg"))) (if (not (safe-file-exists? sheets-file)) (list #f (conc "ERROR: no refdb found at " refdb-path)) - (if (not (file-read-access? sheets-file)) + (if (not (file-readable? sheets-file)) (list #f (conc "ERROR: refdb file not readable at " refdb-path)) (let* ((sheets (with-input-from-file sheets-file (lambda () (let loop ((inl (read-line)) (res '())) Index: odsmod.scm ================================================================== --- odsmod.scm +++ odsmod.scm @@ -22,11 +22,11 @@ (declare (uses commonmod)) (module odsmod * -(import scheme chicken data-structures extras csv-xml regex) +(import scheme (chicken base) (chicken string) (chicken port) (chicken io) (chicken file) csv-xml regex) (import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable srfi-13) (import commonmod) ;; (use (prefix ulex ulex:)) ADDED pathname-expand/pathname-expand.egg Index: pathname-expand/pathname-expand.egg ================================================================== --- /dev/null +++ pathname-expand/pathname-expand.egg @@ -0,0 +1,4 @@ +((license "BSD") + (category os) + (author "The CHICKEN team") + (components (extension pathname-expand))) ADDED pathname-expand/pathname-expand.meta Index: pathname-expand/pathname-expand.meta ================================================================== --- /dev/null +++ pathname-expand/pathname-expand.meta @@ -0,0 +1,8 @@ +;; -*-scheme-*- +((synopsis "Pathname expansion") + (license "BSD") + (category os) + (doc-from-wiki) + ;; No tests; this is very hard to do in a cross-platform way without + ;; writing a reimplementation of the functionality in our tests... + (author "The CHICKEN team")) ADDED pathname-expand/pathname-expand.scm Index: pathname-expand/pathname-expand.scm ================================================================== --- /dev/null +++ pathname-expand/pathname-expand.scm @@ -0,0 +1,112 @@ +;;; Pathname expansion, to replace the deprecated core functionality. +; +; Copyright (c) 2014, The CHICKEN Team +; All rights reserved. +; +; Redistribution and use in source and binary forms, with or without +; modification, are permitted provided that the following conditions +; are met: +; +; Redistributions of source code must retain the above copyright +; notice, this list of conditions and the following disclaimer. +; +; Redistributions in binary form must reproduce the above copyright +; notice, this list of conditions and the following disclaimer in +; the documentation and/or other materials provided with the +; distribution. +; +; Neither the name of the author nor the names of its contributors +; may be used to endorse or promote products derived from this +; software without specific prior written permission. +; +; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +; "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +; LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +; FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +; COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +; INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +; (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +; SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +; HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +; STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +; OF THE POSSIBILITY OF SUCH DAMAGE. + +(module pathname-expand + (pathname-expand) + +(import (chicken base) (chicken pathname) (chicken condition) (chicken platform) scheme) +(import (chicken fixnum) (chicken process-context) (chicken process-context posix)) +(import srfi-13 (chicken file) ) + +;; Expand pathname starting with "~", and/or apply base directory to +;; relative pathname +; +; Inspired by Gambit's "path-expand" procedure. + +(define pathname-expand + (let* ((home + ;; Effective uid might be changed at runtime so this has to + ;; be a lambda, but we could try to cache the result on uid. + (lambda () + (cond-expand + ((and windows (not cygwin)) + (or (get-environment-variable "USERPROFILE") + (get-environment-variable "HOME") + ".")) + (else + (let ((info (user-information (current-effective-user-id)))) + (list-ref info 5)))))) + (slash + (cond-expand + ((and windows (not cygwin)) '(#\\ #\/)) + (else '(#\/)))) + (ts (string-append "~" (string (car slash)))) + (tts (string-append "~" ts))) + (lambda (path #!optional (base (current-directory))) + (if (absolute-pathname? path) + path + (let ((len (string-length path))) + (cond + ((or (string=? "~~" path) + (and (fx>= len 3) (string=? tts (substring path 0 3)))) + ;; Repository-path + (let ((rp (repository-path))) + (if rp + (string-append rp (substring path 2 len)) + (signal + (make-composite-condition + (make-property-condition + 'exn 'location 'pathname-expand + 'message "No repository path defined" + 'arguments (list path)) + (make-property-condition 'pathname-expand) + (make-property-condition 'repository-path)))))) + ((or (string=? "~" path) + (and (fx> len 2) (string=? ts (substring path 0 2)))) + ;; Current user's home dir + (string-append (home) (substring path 1 len))) + ((and (fx> len 0) (char=? #\~ (string-ref path 0))) + ;; Arbitrary user's home dir + (let ((rest (substring path 1 len))) + (if (and (fx> len 1) (memq (string-ref path 1) slash)) + (string-append (home) rest) + (let* ((p (string-index path (lambda (c) (memq c slash)))) + (user (substring path 1 (or p len))) + (info (user-information user))) + (if info + (let ((dir (list-ref info 5))) + (if p + (make-pathname dir (substring path p)) + dir)) + (signal + (make-composite-condition + (make-property-condition + 'exn 'location 'pathname-expand + 'message "Cannot expand homedir for user" + 'arguments (list path)) + (make-property-condition 'pathname-expand) + (make-property-condition 'username)))))))) + (else (make-pathname base path)))))))) + +) ADDED pathname-expand/pathname-expand.setup Index: pathname-expand/pathname-expand.setup ================================================================== --- /dev/null +++ pathname-expand/pathname-expand.setup @@ -0,0 +1,2 @@ +;; -*-scheme-*- +(standard-extension 'pathname-expand 0.1) Index: pkts/pkts.scm ================================================================== --- pkts/pkts.scm +++ pkts/pkts.scm @@ -162,12 +162,13 @@ ;; utility procs increment-string ;; used to get indexes for strings in ref pkts make-report ;; make a .dot file ) -(import chicken scheme data-structures posix srfi-1 regex srfi-13 srfi-69 ports extras) -(use crypt sha1 message-digest (prefix dbi dbi:) typed-records) +(import (chicken base) scheme (chicken process) (chicken time posix) (chicken io) (chicken file)) +(import chicken.process-context.posix (chicken string) (chicken time) (chicken sort) (chicken file posix) (chicken condition) srfi-1 regex srfi-13 srfi-69 (chicken port) ) +(import crypt sha1 message-digest (prefix dbi dbi:) typed-records) ;;====================================================================== ;; DATA MANIPULATION UTILS ;;====================================================================== @@ -695,11 +696,11 @@ (cond ((not (file-exists? pktsdir)) (print "ERROR: packets directory " pktsdir " does not exist.")) ((not (directory? pktsdir)) (print "ERROR: packets directory path " pktsdir " is not a directory.")) - ((not (file-read-access? pktsdir)) + ((not (file-readable? pktsdir)) (print "ERROR: packets directory path " pktsdir " is not readable.")) (else ;; (print "INFO: Loading packets found in " pktsdir) (let ((pkts (glob (conc pktsdir "/*.pkt")))) (for-each Index: processmod.scm ================================================================== --- processmod.scm +++ processmod.scm @@ -21,13 +21,13 @@ (declare (unit processmod)) (module processmod * -(import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 - format ports srfi-1 matchable regex directory-utils) +(import scheme (chicken base) ) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 + format (chicken port) srfi-1 matchable regex directory-utils) ;;====================================================================== ;; supporting functions ;;====================================================================== Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -37,12 +37,10 @@ (import commonmod) (import itemsmod) (import apimod) (import dbmod) -;; (include "rmt-inc.scm") - ;; ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! ;; ;; generate entries for ~/.megatestrc with the following @@ -72,14 +70,45 @@ cinfo (if (server:check-if-running areapath) (client:setup areapath) #f)))) -(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id +;; return the handle struct for sending queries to a specific database +;; - initializes the connection object if this is the first access +;; - finds the "captain" and asks who to talk to for the given dbfname +;; - establishes the connection to the current dbowner +;; +(define (rmt:connect alldat dbfname dbtype) + (let* ((ulexdat (or (alldat-ulexdat alldat) + (rmt:setup-ulex alldat)))) + (ulex:connect ulexdat dbfname dbtype))) +;; setup the remote calls +(define (rmt:setup-ulex alldat) + (let* ((new-ulexdat (ulex:setup))) ;; establish connection to ulex + (alldat-ulexdat-set! alldat new-ulexdat) + (let ((udata (alldat-ulexdat alldat))) + ;; register all needed procs + (ulex:register-handler udata 'ping common:get-full-version) + (ulex:register-handler udata 'login common:get-full-version) ;; force setup of the connection + new-ulexdat))) + +;; set up a connection to the current owner of the dbfile associated with rid +;; then send the query to that dbfile owner and wait for a response. +;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected - (rmt:open-qry-close-locally cmd 0 params)) + (let* ((alldat *alldat*) + (areapath (alldat-areapath alldat)) + (dbtype (if (or (not rid)(< rid 1)) ;; this is the criteria for "main.db" + 'main 'runs)) + (dbfname (if (eq? dbtype 'main) + "main.db" + (conc rid ".db"))) + (dbfile (conc areapath "/.db/" dbfname)) + (ulexconn (rmt:connect alldat dbfname dbtype))) + (rmt:open-qry-close-locally cmd 0 params))) + ;; ;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd)) ;; ;; #;(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected ;; ;; ;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd)) Index: runconfigmod.scm ================================================================== --- runconfigmod.scm +++ runconfigmod.scm @@ -22,12 +22,12 @@ (declare (uses commonmod)) (module runconfigmod * -(import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) +(import scheme (chicken base) ) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 format (chicken port) srfi-1 matchable) (import commonmod) ;; (use (prefix ulex ulex:)) (include "common_records.scm") Index: stml2/cookie.scm ================================================================== --- stml2/cookie.scm +++ stml2/cookie.scm @@ -45,11 +45,11 @@ ;; (declare (unit cookie)) (module cookie * -(import chicken scheme data-structures extras srfi-13 ports posix) +(import (chicken base) scheme queues srfi-13 (chicken port) (chicken io)(chicken file) (chicken format) (chicken string) (chicken time posix)) (require-extension srfi-1 srfi-13 srfi-14 regex) ;; (use srfi-1 srfi-13 srfi-14 regex) ;; (declare (export parse-cookie-string construct-cookie-string)) Index: stml2/stml2.scm ================================================================== --- stml2/stml2.scm +++ stml2/stml2.scm @@ -12,14 +12,14 @@ ;; (declare (unit stml)) (module stml2 * -(import chicken scheme data-structures extras srfi-13 ports posix srfi-69 files srfi-1) +(import (chicken base) scheme queues srfi-13 (chicken port) (chicken io) (chicken file) srfi-69 srfi-1 (chicken condition)) (import cookie) -(use (prefix dbi dbi:) (prefix crypt c:) typed-records) +(import (prefix dbi dbi:) (prefix crypt c:) typed-records) ;; (declare (uses misc-stml)) (use regex) ;; The (usually global) sdat contains everything about the session Index: subrunmod.scm ================================================================== --- subrunmod.scm +++ subrunmod.scm @@ -24,13 +24,13 @@ (declare (uses mtconfigf)) (module subrunmod * -(import scheme chicken data-structures extras) -(use (prefix sqlite3 sqlite3:) posix typed-records srfi-18 - srfi-69 format ports srfi-1 matchable irregex +(import scheme (chicken base)) +(use (prefix sqlite3 sqlite3:) typed-records srfi-18 + srfi-69 format (chicken port) srfi-1 matchable irregex call-with-environment-variables) (import commonmod) (import (prefix mtconfigf configf:)) Index: template-mod.scm ================================================================== --- template-mod.scm +++ template-mod.scm @@ -22,12 +22,12 @@ (declare (uses commonmod)) (module rmtmod * -(import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) +(import scheme (chicken base)) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 format (chicken port) srfi-1 matchable) (import commonmod) ;; (use (prefix ulex ulex:)) (include "common_records.scm") Index: treemod.scm ================================================================== --- treemod.scm +++ treemod.scm @@ -21,11 +21,11 @@ (declare (unit treemod)) (module treemod * -(import scheme chicken data-structures extras) +(import scheme (chicken base)) (import (prefix iup iup:)) ;; (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) ;; (import commonmod) ;;; DO NOT ALLOW rmt*scm TO DEPEND ON common*scm!!!! ;; (include "common_records.scm") ) Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -24,1408 +24,94 @@ ;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity. ;; ;;====================================================================== ;; (use rpc pkts mailbox sqlite3) - -(module ulex - * - ;; #;( - ;; ;; areas - ;; make-area - ;; area->alist - ;; ;; server - ;; launch - ;; update-known-servers - ;; shutdown - ;; get-best-server - ;; ;; client side - ;; call - ;; ;; queries, procs and system commands (i.e. workers) - ;; register - ;; register-batch - ;; ;; database - note: most database stuff need not be exposed, these calls may be removed from exports in future - ;; open-db - ;; ;; ports - ;; pl-find-port - ;; pl-get-prev-used-port - ;; pl-open-db - ;; pl-open-run-close - ;; pl-release-port - ;; pl-set-port - ;; pl-take-port - ;; pl-is-port-available - ;; pl-get-port-state - ;; ;; system - ;; get-normalized-cpu-load - - ;; ) - -(import scheme posix chicken data-structures ports extras files mailbox) -(import rpc srfi-18 pkts matchable regex - typed-records srfi-69 srfi-1 - srfi-4 regex-case - (prefix sqlite3 sqlite3:) - foreign - tcp) ;; ulex-netutil) - -;;====================================================================== -;; D E B U G H E L P E R S -;;====================================================================== - -(define (dbg> . args) - (with-output-to-port (current-error-port) - (lambda () - (apply print "dbg> " args)))) - -(define (debug-pp . args) - (if (get-environment-variable "ULEX_DEBUG") - (with-output-to-port (current-error-port) - (lambda () - (apply pp args))))) - -(define *default-debug-port* (current-error-port)) - -(define (sdbg> fn stage-name stage-start stage-end start-time . message) - (if (get-environment-variable "ULEX_DEBUG") - (with-output-to-port *default-debug-port* - (lambda () - (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. " - (if start-time - (conc "total time " (- (current-milliseconds) start-time) - " ms.") - "") - message - ))))) - -;;====================================================================== -;; M A C R O S -;;====================================================================== -;; iup callbacks are not dumping the stack, this is a work-around -;; - -;; Some of these routines use: -;; -;; http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html -;; -;; Syntax for defining macros in a simple style similar to function definiton, -;; when there is a single pattern for the argument list and there are no keywords. -;; -;; (define-simple-syntax (name arg ...) body ...) -;; - -(define-syntax define-simple-syntax - (syntax-rules () - ((_ (name arg ...) body ...) - (define-syntax name (syntax-rules () ((name arg ...) (begin body ...))))))) - -(define-simple-syntax (catch-and-dump proc procname) - (handle-exceptions - exn - (begin - (print-call-chain (current-error-port)) - (with-output-to-port (current-error-port) - (lambda () - (print ((condition-property-accessor 'exn 'message) exn)) - (print "Callback error in " procname) - (print "Full condition info:\n" (condition->list exn))))) - (proc))) - - -;;====================================================================== -;; R E C O R D S -;;====================================================================== - -;; information about me as a server -;; -(defstruct area - ;; about this area - (useportlogger #f) - (lowport 32768) - (server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all) - (conn #f) - (port #f) - (myaddr (get-my-best-address)) - pktid ;; get pkt from hosts table if needed - pktfile - pktsdir - dbdir - (dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one? - (mutex (make-mutex)) - (rtable (make-hash-table)) ;; registration table of available actions - (dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve - ;; about other servers - (hosts (make-hash-table)) ;; key => hostdat - (hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime ) - (reqs (make-hash-table)) ;; uri => queue - ;; work queues - (wqueues (make-hash-table)) ;; fname => qdat - (stats (make-hash-table)) ;; fname => totalqueries - (last-srvup (current-seconds)) ;; last time we updated the known servers - (cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call - (ready #f) - (health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping - ) - -;; host stats -;; -(defstruct hostdat - (pkt #f) - (dbload (make-hash-table)) ;; "dbfile.db" => queries/min - (hostload #f) ;; normalized load ( 5min load / numcpus ) - ) - -;; dbdat -;; -(defstruct dbdat - (dbh #f) - (fname #f) - (write-access #f) - (sths (make-hash-table)) ;; hash mapping query strings to handles - ) - -;; qdat -;; -(defstruct qdat - (writeq (make-queue)) - (readq (make-queue)) - (rwq (make-queue)) - (logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging - (osshort (make-queue)) - (oslong (make-queue)) - (misc (make-queue)) ;; used for things like ping-full - ) - -;; calldat -;; -(defstruct calldat - (ctype 'dbwrite) - (obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc. - (rtime (current-milliseconds))) - -;; make it a global? Well, it is local to area module - -(define *pktspec* - `((server (hostname . h) - (port . p) - (pid . i) - (ipaddr . a) - ) - (data (hostname . h) ;; sender hostname - (port . p) ;; sender port - (ipaddr . a) ;; sender ip - (hostkey . k) ;; sending host key - store info at server under this key - (servkey . s) ;; server key - this needs to match at server end or reject the msg - (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json - (data . d) ;; base64 encoded slln data - ))) - -;; work item -;; -(defstruct witem - (rhost #f) ;; return host - (ripaddr #f) ;; return ipaddr - (rport #f) ;; return port - (servkey #f) ;; the packet representing the client of this workitem, used by final send-message - (rdat #f) ;; the request - usually an sql query, type is rdat - (action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort - (cookie #f) ;; cookie id for response - (data #f) ;; the data payload, i.e. parameters - (result #f) ;; the result from processing the data - (caller #f)) ;; the calling peer according to rpc itself - -(define (trim-pktid pktid) - (if (string? pktid) - (substring pktid 0 4) - "nopkt")) - -(define (any->number num) - (cond - ((number? num) num) - ((string? num) (string->number num)) - (else num))) - -(use trace) -(trace-call-sites #t) - -;;====================================================================== -;; D A T A B A S E H A N D L I N G -;;====================================================================== - -;; look in dbhandles for a db, return it, else return #f -;; -(define (get-dbh acfg fname) - (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '()))) - (if (null? dbh-lst) - (begin - ;; (print "opening db for " fname) - (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls - (let ((rem-lst (cdr dbh-lst))) - ;; (print "re-using saved connection for " fname) - (hash-table-set! (area-dbhandles acfg) fname rem-lst) - (car dbh-lst))))) - -(define (save-dbh acfg fname dbdat) - ;; (print "saving dbh for " fname) - (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '())))) - -;; open the database, if never before opened init it. put the handle in the -;; open db's hash table -;; returns: the dbdat -;; -(define (open-db acfg fname) - (let* ((fullname (conc (area-dbdir acfg) "/" fname)) - (exists (file-exists? fullname)) - (write-access (if exists - (file-write-access? fullname) - (file-write-access? (area-dbdir acfg)))) - (db (sqlite3:open-database fullname)) - (handler (sqlite3:make-busy-timeout 136000)) - ) - (sqlite3:set-busy-handler! db handler) - (sqlite3:execute db "PRAGMA synchronous = 0;") - (if (not exists) ;; need to init the db - (if write-access - (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements - ;; (sqlite3:with-transaction - ;; db - ;; (lambda () - (if isql - (for-each - (lambda (sql) - (sqlite3:execute db sql)) - isql))) - (print "ERROR: no write access to " (area-dbdir acfg)))) - (make-dbdat dbh: db fname: fname write-access: write-access))) - -;; This is a low-level command to retrieve or to prepare, save and return a prepared statment -;; you must extract the db handle -;; -(define (get-sth db cache stmt) - (if (hash-table-exists? cache stmt) - (begin - ;; (print "Reusing cached stmt for " stmt) - (hash-table-ref/default cache stmt #f)) - (let ((sth (sqlite3:prepare db stmt))) - (hash-table-set! cache stmt sth) - ;; (print "prepared stmt for " stmt) - sth))) - -;; a little more expensive but does all the tedious deferencing - only use if you don't already -;; have dbdat and db sitting around -;; -(define (full-get-sth acfg fname stmt) - (let* ((dbdat (get-dbh acfg fname)) - (db (dbdat-dbh dbdat)) - (sths (dbdat-sths dbdat))) - (get-sth db sths stmt))) - -;; write to a db -;; acfg: area data -;; rdat: request data -;; hdat: (host . port) -;; -;; (define (dbwrite acfg rdat hdat data-in) -;; (let* ((dbname (car data-in)) -;; (dbdat (get-dbh acfg dbname)) -;; (db (dbdat-dbh dbdat)) -;; (sths (dbdat-sths dbdat)) -;; (stmt (calldat-obj rdat)) -;; (sth (get-sth db sths stmt)) -;; (data (cdr data-in))) -;; (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data) -;; (print "dbdat: " (dbdat->alist dbdat)) -;; (apply sqlite3:execute sth data) -;; (save-dbh acfg dbname dbdat) -;; #t -;; )) - -(define (finalize-all-db-handles acfg) - (let* ((dbhandles (area-dbhandles acfg)) ;; dbhandles is hash of fname ==> dbdat - (num 0)) - (for-each - (lambda (area-name) - (print "Closing handles for " area-name) - (let ((dbdats (hash-table-ref/default dbhandles area-name '()))) - (for-each - (lambda (dbdat) - ;; first close all statement handles - (for-each - (lambda (sth) - (sqlite3:finalize! sth) - (set! num (+ num 1))) - (hash-table-values (dbdat-sths dbdat))) - ;; now close the dbh - (set! num (+ num 1)) - (sqlite3:finalize! (dbdat-dbh dbdat))) - dbdats))) - (hash-table-keys dbhandles)) - (print "FINALIZED " num " dbhandles"))) - -;;====================================================================== -;; W O R K Q U E U E H A N D L I N G -;;====================================================================== - -(define (register-db-as-mine acfg dbname) - (let ((ht (area-dbs acfg))) - (if (not (hash-table-ref/default ht dbname #f)) - (hash-table-set! ht dbname (random 10000))))) - -(define (work-queue-add acfg fname witem) - (let* ((work-queue-start (current-milliseconds)) - (action (witem-action witem)) ;; NB the action is the index into the rdat actions - (qdat (or (hash-table-ref/default (area-wqueues acfg) fname #f) - (let ((newqdat (make-qdat))) - (hash-table-set! (area-wqueues acfg) fname newqdat) - newqdat))) - (rdat (hash-table-ref/default (area-rtable acfg) action #f))) - (if rdat - (queue-add! - (case (calldat-ctype rdat) - ((dbwrite) (register-db-as-mine acfg fname)(qdat-writeq qdat)) - ((dbread) (register-db-as-mine acfg fname)(qdat-readq qdat)) - ((dbrw) (register-db-as-mine acfg fname)(qdat-rwq qdat)) - ((oslong) (qdat-oslong qdat)) - ((osshort) (qdat-osshort qdat)) - ((full-ping) (qdat-misc qdat)) - (else - (print "ERROR: no queue for " action ". Adding to dbwrite queue.") - (qdat-writeq qdat))) - witem) - (case action - ((full-ping)(qdat-misc qdat)) - (else - (print "ERROR: No action " action " was registered")))) - (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f) - #t)) ;; for now, simply return #t to indicate request got to the queue - -(define (doqueue acfg q fname dbdat dbh) - ;; (print "doqueue: " fname) - (let* ((start-time (current-milliseconds)) - (qlen (queue-length q))) - (if (> qlen 1) - (print "Processing queue of length " qlen)) - (let loop ((count 0) - (responses '())) - (let ((delta (- (current-milliseconds) start-time))) - (if (or (queue-empty? q) - (> delta 400)) ;; stop working on this queue after 400ms have passed - (list count delta responses) ;; return count, delta and responses list - (let* ((witem (queue-remove! q)) - (action (witem-action witem)) - (rdat (witem-rdat witem)) - (stmt (calldat-obj rdat)) - (sth (full-get-sth acfg fname stmt)) - (ctype (calldat-ctype rdat)) - (data (witem-data witem)) - (cookie (witem-cookie witem))) - ;; do the processing and save the result in witem-result - (witem-result-set! - witem - (case ctype ;; action - ((noblockwrite) ;; blind write, no ack of success returned - (apply sqlite3:execute sth data) - (sqlite3:last-insert-rowid dbh)) - ((dbwrite) ;; blocking write - (apply sqlite3:execute sth data) - #t) - ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query - (apply sqlite3:map-row (lambda x x) sth data)) - ((full-ping) 'full-ping) - (else (print "Not ready for action " action) #f))) - (loop (add1 count) - (if cookie - (cons witem responses) - responses)))))))) - -;; do up to 400ms of processing on each queue -;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded -;; -(define (process-db-queries acfg fname) - (if (hash-table-exists? (area-wqueues acfg) fname) - (let* ((process-db-queries-start-time (current-milliseconds)) - (qdat (hash-table-ref/default (area-wqueues acfg) fname #f)) - (queue-sym->queue (lambda (queue-sym) - (case queue-sym ;; lookup the queue from qdat given a name (symbol) - ((wqueue) (qdat-writeq qdat)) - ((rqueue) (qdat-readq qdat)) - ((rwqueue) (qdat-rwq qdat)) - ((misc) (qdat-misc qdat)) - (else #f)))) - (dbdat (get-dbh acfg fname)) - (dbh (if (dbdat? dbdat)(dbdat-dbh dbdat) #f)) - (nowtime (current-seconds))) - ;; handle the queues that require a transaction - ;; - (map ;; - (lambda (queue-sym) - ;; (print "processing queue " queue-sym) - (let* ((queue (queue-sym->queue queue-sym))) - (if (not (queue-empty? queue)) - (let ((responses - (sqlite3:with-transaction ;; todo - catch exceptions... - dbh - (lambda () - (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work! - ;; (print "res=" res) - (match res - ((count delta responses) - (update-stats acfg fname queue-sym delta count) - (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f) - responses) ;; return responses - (else - (print "ERROR: bad return data from doqueue " res))) - ))))) - ;; having completed the transaction, send the responses. - ;; (print "INFO: sending " (length responses) " responses.") - (let loop ((responses-left responses)) - (cond - ((null? responses-left) #t) - (else - (let* ((witem (car responses-left)) - (response (cdr responses-left))) - (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem) - (witem-cookie witem)(witem-result witem))) - (loop (cdr responses-left)))))) - ))) - '(wqueue rwqueue rqueue)) - - ;; handle misc queue - ;; - ;; (print "processing misc queue") - (let ((queue (queue-sym->queue 'misc))) - (doqueue acfg queue fname dbdat dbh)) - ;; .... - (save-dbh acfg fname dbdat) - #t ;; just to let the tests know we got here - ) - #f ;; nothing processed - )) - -;; run all queues in parallel per db but sequentially per queue for that db. -;; - process the queues every 500 or so ms -;; - allow for long running queries to continue but all other activities for that -;; db will be blocked. -;; -(define (work-queue-processor acfg) - (let* ((threads (make-hash-table))) ;; fname => thread - (let loop ((fnames (hash-table-keys (area-wqueues acfg))) - (target-time (+ (current-milliseconds) 50))) - ;;(if (not (null? fnames))(print "Processing for these databases: " fnames)) - (for-each - (lambda (fname) - ;; (print "processing for " fname) - ;;(process-db-queries acfg fname)) - (let ((th (hash-table-ref/default threads fname #f))) - (if (and th (not (member (thread-state th) '(dead terminated)))) - (begin - (print "WARNING: worker thread for " fname " is taking a long time.") - (print "Thread is in state " (thread-state th))) - (let ((th1 (make-thread (lambda () - ;; (catch-and-dump - ;; (lambda () - ;; (print "Process queries for " fname) - (let ((start-time (current-milliseconds))) - (process-db-queries acfg fname) - ;; (thread-sleep! 0.01) ;; need the thread to take at least some time - (hash-table-delete! threads fname)) ;; no mutexes? - fname) - "th1"))) ;; )) - (hash-table-set! threads fname th1) - (thread-start! th1))))) - fnames) - ;; (thread-sleep! 0.1) ;; give the threads some time to process requests - ;; burn time until 400ms is up - (let ((now-time (current-milliseconds))) - (if (< now-time target-time) - (let ((delta (- target-time now-time))) - (thread-sleep! (/ delta 1000))))) - (loop (hash-table-keys (area-wqueues acfg)) - (+ (current-milliseconds) 50))))) - -;;====================================================================== -;; S T A T S G A T H E R I N G -;;====================================================================== - -(defstruct stat - (qcount-avg 0) ;; coarse running average - (qtime-avg 0) ;; coarse running average - (qcount 0) ;; total - (qtime 0) ;; total - (last-qcount 0) ;; last - (last-qtime 0) ;; last - (dbs '()) ;; list of db files handled by this node - (when 0)) ;; when the last query happened - seconds - - -(define (update-stats acfg fname bucket duration numqueries) - (let* ((key fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough - (stats (or (hash-table-ref/default (area-stats acfg) key #f) - (let ((newstats (make-stat))) - (hash-table-set! (area-stats acfg) key newstats) - newstats)))) - ;; when the last query happended (used to remove the fname from the active list) - (stat-when-set! stats (current-seconds)) - ;; last values - (stat-last-qcount-set! stats numqueries) - (stat-last-qtime-set! stats duration) - ;; total over process lifetime - (stat-qcount-set! stats (+ (stat-qcount stats) numqueries)) - (stat-qtime-set! stats (+ (stat-qtime stats) duration)) - ;; coarse average - (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2)) - (stat-qtime-avg-set! stats (/ (+ (stat-qtime-avg stats) duration) 2)) - - ;; here is where we add the stats for a given dbfile - (if (not (member fname (stat-dbs stats))) - (stat-dbs-set! stats (cons fname (stat-dbs stats)))) - - )) - -;;====================================================================== -;; S E R V E R S T U F F -;;====================================================================== - -;; this does NOT return! -;; -(define (find-free-port-and-open acfg) - (let ((port (or (area-port acfg) 3200))) - (handle-exceptions - exn - (begin - (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port") - (area-port-set! acfg (+ port 1)) - (find-free-port-and-open acfg)) - (rpc:default-server-port port) - (area-port-set! acfg port) - (tcp-read-timeout 120000) - ;; ((rpc:make-server (tcp-listen port)) #t) - (tcp-listen (rpc:default-server-port) - )))) - -;; register this node by putting a packet into the pkts dir. -;; look for other servers -;; contact other servers and compile list of servers -;; there are two types of server -;; main servers - dashboards, runners and dedicated servers - need pkt -;; passive servers - test executers, step calls, list-runs - no pkt -;; -(define (register-node acfg hostip port-num) - ;;(mutex-lock! (area-mutex acfg)) - (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created) - (best-ip (or hostip (get-my-best-address))) - (mtdir (area-dbdir acfg)) - (pktdir (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts"))) - (print "Registering node " best-ip ":" port-num) - (if (not mtdir) ;; require a home for this node to put or find databases - #f - (begin - (if (not (directory? pktdir))(create-directory pktdir)) - ;; server is started, now create pkt if needed - (print "Starting server in " server-type " mode with port " port-num) - (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt - (begin - (area-pktid-set! acfg - (write-alist->pkt - pktdir - `((hostname . ,(get-host-name)) - (ipaddr . ,best-ip) - (port . ,port-num) - (pid . ,(current-process-id))) - pktspec: *pktspec* - ptype: 'server)) - (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt")))) - (area-port-set! acfg port-num) - #;(mutex-unlock! (area-mutex acfg)))))) - -(define *cookie-seqnum* 0) -(define (make-cookie key) - (set! *cookie-seqnum* (add1 *cookie-seqnum*)) - ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*) - (conc key "-" *cookie-seqnum*) - ) - -;; dispatch locally if possible -;; -(define (call-deliver-response acfg ipaddr port cookie data) - (if (and (equal? (area-myaddr acfg) ipaddr) - (equal? (area-port acfg) port)) - (deliver-response acfg cookie data) - ((rpc:procedure 'response ipaddr port) cookie data))) - -(define (deliver-response acfg cookie data) - (let ((deliver-response-start (current-milliseconds))) - (thread-start! (make-thread - (lambda () - (let loop ((tries-left 5)) - ;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left) - ;;(pp (hash-table->alist (area-cookie2mbox acfg))) - (let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f))) - (cond - ((eq? 0 tries-left) - (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie) - ) - (mbox - ;;(print "got mbox="mbox" got data="data" send.") - (mailbox-send! mbox data)) - (else - ;;(print "no mbox yet. look for "cookie) - (thread-sleep! (/ (- 6 tries-left) 10)) - (loop (sub1 tries-left)))))) - ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data)) - (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie) - ) - (conc "deliver-response thread for cookie="cookie)))) - #t) - -;; action: -;; immediate - quick actions, no need to put in queues -;; dbwrite - put in dbwrite queue -;; dbread - put in dbread queue -;; oslong - os actions, e.g. du, that could take a long time -;; osshort - os actions that should be quick, e.g. df -;; -(define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler - ;; NOTE: Use rpc:current-peer for getting return address - (let* ((std-peer-handler-start (current-milliseconds)) - ;; (raw-data (alist-ref 'data dat)) - (rdat (hash-table-ref/default - (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action - (witem (make-witem ripaddr: from-ipaddr ;; rhost: from-host - rport: from-port action: action - rdat: rdat cookie: cookie - servkey: servkey data: params ;; TODO - rename data to params - caller: (rpc:current-peer)))) - (if (not (equal? servkey (area-pktid acfg))) - `(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this - (let* ((ctype (if rdat - (calldat-ctype rdat) ;; is this necessary? these should be identical - action))) - (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f) - (case ctype - ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data))) - ((full-ping) `(#t "ack to full ping" ,(work-queue-add acfg fname witem) ,cookie)) - ((response) `(#t "ack from requestor" ,(deliver-response acfg fname params))) - ((dbwrite) `(#t "db write submitted" ,(work-queue-add acfg fname witem) ,cookie)) - ((dbread) `(#t "db read submitted" ,(work-queue-add acfg fname witem) ,cookie )) - ((dbrw) `(#t "db read/write submitted" ,cookie)) - ((osshort) `(#t "os short submitted" ,cookie)) - ((oslong) `(#t "os long submitted" ,cookie)) - (else `(#f "unrecognised action" ,ctype))))))) - -;; Call this to start the actual server -;; -;; start_server -;; -;; mode: ' -;; handler: proc which takes pktrecieved as argument -;; - -(define (start-server acfg) - (let* ((conn (find-free-port-and-open acfg)) - (port (area-port acfg))) - (rpc:publish-procedure! - 'delist-db - (lambda (fname) - (hash-table-delete! (area-dbs acfg) fname))) - (rpc:publish-procedure! - 'calling-addr - (lambda () - (rpc:current-peer))) - (rpc:publish-procedure! - 'ping - (lambda ()(real-ping acfg))) - (rpc:publish-procedure! - 'request - (lambda (from-addr from-port servkey action cookie dbname params) - (request acfg from-addr from-port servkey action cookie dbname params))) - (rpc:publish-procedure! - 'response - (lambda (cookie res-dat) - (deliver-response acfg cookie res-dat))) - (area-ready-set! acfg #t) - (area-conn-set! acfg conn) - ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t) - - -(define (launch acfg) ;; #!optional (proc std-peer-handler)) - (print "starting launch") - (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers) - #;(let ((original-handler (current-exception-handler))) ;; is th - (lambda (exception) - (server-exit-procedure) - (original-handler exception))) - (on-exit (lambda () - (shutdown acfg))) ;; (finalize-all-db-handles acfg))) - ;; set up the rpc handler - (let* ((th1 (make-thread - (lambda ()(start-server acfg)) - "server thread")) - (th2 (make-thread - (lambda () - (print "th2 starting") - (let loop () - (work-queue-processor acfg) - (print "work-queue-processor crashed!") - (loop))) - "work queue thread"))) - (thread-start! th1) - (thread-start! th2) - (let loop () - (thread-sleep! 0.025) - (if (area-ready acfg) - #t - (loop))) - ;; attempt to fix my address - (let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)? - (let loop ((rem-addrs all-addr)) - (if (null? rem-addrs) - (begin - (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.") - (exit 1)) ;; BUG Changeme to raising an exception - - (let* ((addr (car rem-addrs)) - (good-addr (handle-exceptions - exn - #f - ((rpc:procedure 'calling-addr addr (area-port acfg)))))) - (if good-addr - (begin - (print "Got good-addr of " good-addr) - (area-myaddr-set! acfg good-addr)) - (loop (cdr rem-addrs))))))) - (register-node acfg (area-myaddr acfg)(area-port acfg)) - (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg)) - ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers) - )) - -(define (clear-server-pkt acfg) - (let ((pktf (area-pktfile acfg))) - (if pktf (delete-file* pktf)))) - -(define (shutdown acfg) - (let (;;(conn (area-conn acfg)) - (pktf (area-pktfile acfg)) - (port (area-port acfg))) - (if pktf (delete-file* pktf)) - (send-all "imshuttingdown") - ;; (rpc:close-all-connections!) ;; don't know if this is actually needed - (finalize-all-db-handles acfg))) - -(define (send-all msg) - #f) - -;; given a area record look up all the packets -;; -(define (get-all-server-pkts acfg) - (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt")))) - (map (lambda (pkt-file) - (read-pkt->alist pkt-file pktspec: *pktspec*)) - all-pkt-files))) - -#;((Z . "9a0212302295a19610d5796fce0370fa130758e9") - (port . "34827") - (pid . "28748") - (hostname . "zeus") - (T . "server") - (D . "1549427032.0")) - -#;(define (get-my-best-address) - (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))) - (cond - ((null? all-my-addresses) - (get-host-name)) ;; no interfaces? - ((eq? (length all-my-addresses) 1) - (ip->string (car all-my-addresses))) ;; only one to choose from, just go with it - (else - (ip->string (car (filter (lambda (x) ;; take any but 127. - (not (eq? (u8vector-ref x 0) 127))) - all-my-addresses))))))) - -;; whoami? I am my pkt -;; -(define (whoami? acfg) - (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f)) - -;;====================================================================== -;; "Client side" operations -;;====================================================================== - -(define (safe-call call-key host port . params) - (handle-exceptions - exn - (begin - (print "Call " call-key " to " host ":" port " failed") - #f) - (apply (rpc:procedure call-key host port) params))) - -;; ;; convert to/from string / sexpr -;; -;; (define (string->sexpr str) -;; (if (string? str) -;; (with-input-from-string str read) -;; str)) -;; -;; (define (sexpr->string s) -;; (with-output-to-string (lambda ()(write s)))) - -;; is the server alive? -;; -(define (ping acfg host port) - (let* ((myaddr (area-myaddr acfg)) - (myport (area-port acfg)) - (start-time (current-milliseconds)) - (res (if (and (equal? myaddr host) - (equal? myport port)) - (real-ping acfg) - ((rpc:procedure 'ping host port))))) - (cons (- (current-milliseconds) start-time) - res))) - -;; returns ( ipaddr port alist-fname=>randnum ) -(define (real-ping acfg) - `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg))) - -;; is the server alive AND the queues processing? -;; -#;(define (full-ping acfg servpkt) - (let* ((start-time (current-milliseconds)) - (res (send-message acfg servpkt '(full-ping) 'full-ping))) - (cons (- (current-milliseconds) start-time) - res))) ;; (equal? res "got ping")))) - - -;; look up all pkts and get the server id (the hash), port, host/ip -;; store this info in acfg -;; return the number of responsive servers found -;; -;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself -;; -(define (update-known-servers acfg) - ;; readll all pkts - ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt - (let* ((start-time (current-milliseconds)) - (all-pkts (delete-duplicates - (append (get-all-server-pkts acfg) - (hash-table-values (area-hosts acfg))))) - (hostshash (area-hosts acfg)) - (my-id (area-pktid acfg)) - (pktsdir (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers - (numsrvs 0) - (delpkt (lambda (pktsdir sid) - (print "clearing out server " sid) - (delete-file* (conc pktsdir "/" sid ".pkt")) - (hash-table-delete! hostshash sid)))) - (area-last-srvup-set! acfg (current-seconds)) - (for-each - (lambda (servpkt) - (if (list? servpkt) - ;; (pp servpkt) - (let* ((shost (alist-ref 'ipaddr servpkt)) - (sport (any->number (alist-ref 'port servpkt))) - (res (handle-exceptions - exn - (begin - ;; (print "INFO: bad server on " shost ":" sport) - #f) - (ping acfg shost sport))) - (sid (alist-ref 'Z servpkt)) ;; Z code is our name for the server - (url (conc shost ":" sport)) - ) - #;(if (or (not res) - (null? res)) - (begin - (print "STRANGE: ping of " url " gave " res))) - - ;; (print "Got " res " from " shost ":" sport) - (match res - ((qduration . payload) - ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt) - ;; (if payload - ;; "Success" "Fail")) - (match payload - ((host port stats) - ;; (print "From " host ":" port " got stats: " stats) - (if (and host port stats) - (let ((url (conc host ":" port))) - (hash-table-set! hostshash sid servpkt) - ;; store based on host:port - (hash-table-set! (area-hoststats acfg) sid stats)) - (print "missing data from the server, not sure what that means!")) - (set! numsrvs (+ numsrvs 1))) - (#f - (print "Removing pkt " sid " due to #f from server or failed ping") - (delpkt pktsdir sid)) - (else - (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)"))) - (else - ;; here we delete the pkt - can't reach the server, remove it - ;; however this logic is inadequate. we should mark the server as checked - ;; and not good, if it happens a second time - then remove the pkt - ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead - ;; could be it is simply too busy to reply - (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0))) - (if (> bad-pings 1) ;; two bad pings - remove pkt - (begin - (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid) - (delpkt pktsdir sid)) - (begin - (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet") - (hash-table-set! (area-health acfg) - url - (+ (hash-table-ref/default (area-health acfg) url 0) 1)) - )) - )))) - ;; servpkt is not actually a pkt? - (begin - (print "Bad pkt " servpkt)))) - all-pkts) - (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs - " servers, pkts: " (map (lambda (p) - (alist-ref 'Z p)) - all-pkts)) - numsrvs)) - -(defstruct srvstat - (numfiles 0) ;; number of db files handled by this server - subtract 1 for the db being currently looked at - (randnum #f) ;; tie breaker number assigned to by the server itself - applies only to the db under consideration - (pkt #f)) ;; the server pkt - -;;(define (srv->srvstat srvpkt) - -;; Get the server best for given dbname and key -;; -;; NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries. -;; -(define (get-best-server acfg dbname key) - (let* (;; (servers (hash-table-values (area-hosts acfg))) - (servers (area-hosts acfg)) - (skeys (sort (hash-table-keys servers) string>=?)) ;; a stable listing - (start-time (current-milliseconds)) - (srvstats (make-hash-table)) ;; srvid => srvstat - (url (conc (area-myaddr acfg) ":" (area-port acfg)))) - ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys)) - (if (null? skeys) - (if (> (update-known-servers acfg) 0) - (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter - (begin - (print "ERROR: no server found!") ;; since this process is also a server this should never happen - #f)) - (begin - ;; (print "in get-best-server with skeys=" skeys) - (if (> (- (current-seconds) (area-last-srvup acfg)) 10) - (begin - (update-known-servers acfg) - (sdbg> "get-best-server" "update-known-servers" start-time #f #f))) - - ;; for each server look at the list of dbfiles, total number of dbs being handled - ;; and the rand number, save the best host - ;; also do a delist-db for each server dbfile not used - (let* ((best-server #f) - (servers-to-delist (make-hash-table))) - (for-each - (lambda (srvid) - (let* ((server (hash-table-ref/default servers srvid #f)) - (stats (hash-table-ref/default (area-hoststats acfg) srvid '(())))) - ;; (print "stats: " stats) - (if server - (let* ((dbweights (car stats)) - (srvload (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights))) - (dbrec (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore - (randnum (if dbrec - dbrec ;; (cdr dbrec) - 0))) - (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server)))))) - skeys) - - (let* ((sorted (sort (hash-table-values srvstats) - (lambda (a b) - (let ((numfiles-a (srvstat-numfiles a)) - (numfiles-b (srvstat-numfiles b)) - (randnum-a (srvstat-randnum a)) - (randnum-b (srvstat-randnum b))) - (if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less - #t - (if (and (equal? numfiles-a numfiles-b) - (< randnum-a randnum-b)) - #t - #f)))))) - (best (if (null? sorted) - (begin - (print "ERROR: should never be null due to self as server.") - #f) - (srvstat-pkt (car sorted))))) - #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv) - (let ((p (srvstat-pkt srv))) - (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p) - "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")"))) - sorted)) - best)))))) - - ;; send out an "I'm about to exit notice to all known servers" - ;; -(define (death-imminent acfg) - '()) - -;;====================================================================== -;; U L E X - T H E I N T E R E S T I N G S T U F F ! ! -;;====================================================================== - -;; register a handler -;; NOTES: -;; dbinitsql is reserved for a list of sql statements for initializing the db -;; dbinitfn is reserved for a db init function, if exists called after dbinitsql -;; -(define (register acfg key obj #!optional (ctype 'dbwrite)) - (let ((ht (area-rtable acfg))) - (if (hash-table-exists? ht key) - (print "WARNING: redefinition of entry " key)) - (hash-table-set! ht key (make-calldat obj: obj ctype: ctype)))) - -;; usage: register-batch acfg '((key1 . sql1) (key2 . sql2) ... ) -;; NB// obj is often an sql query -;; -(define (register-batch acfg ctype data) - (let ((ht (area-rtable acfg))) - (map (lambda (dat) - (hash-table-set! ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype))) - data))) - -(define (initialize-area-calls-from-specfile area specfile) - (let* ((callspec (with-input-from-file specfile read ))) - (for-each (lambda (group) - (register-batch - area - (car group) - (cdr group))) - callspec))) - -;; get-rentry -;; -(define (get-rentry acfg key) - (hash-table-ref/default (area-rtable acfg) key #f)) - -(define (get-rsql acfg key) - (let ((cdat (get-rentry acfg key))) - (if cdat - (calldat-obj cdat) - #f))) - - - -;; blocking call: -;; client server -;; ------ ------ -;; call() -;; send-message() -;; nmsg-send() -;; nmsg-receive() -;; nmsg-respond(ack,cookie) -;; ack, cookie -;; mbox-thread-wait(cookie) -;; nmsg-send(client,cookie,result) -;; nmsg-respond(ack) -;; return result -;; -;; reserved action: -;; 'immediate -;; 'dbinitsql -;; -(define (call acfg dbname action params #!optional (count 0)) - (let* ((call-start-time (current-milliseconds)) - (srv (get-best-server acfg dbname action)) - (post-get-start-time (current-milliseconds)) - (rdat (hash-table-ref/default (area-rtable acfg) action #f)) - (myid (trim-pktid (area-pktid acfg))) - (srvid (trim-pktid (alist-ref 'Z srv))) - (cookie (make-cookie myid))) - (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat) - (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname) - (if (and srv rdat) ;; need both to dispatch a request - (let* ((ripaddr (alist-ref 'ipaddr srv)) - (rsrvid (alist-ref 'Z srv)) - (rport (any->number (alist-ref 'port srv))) - (res-full (if (and (equal? ripaddr (area-myaddr acfg)) - (equal? rport (area-port acfg))) - (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params) - (safe-call 'request ripaddr rport - (area-myaddr acfg) - (area-port acfg) - #;(area-pktid acfg) - rsrvid - action cookie dbname params)))) - ;; (print "res-full: " res-full) - (match res-full - ((response-ok response-msg rem ...) - (let* ((send-message-time (current-milliseconds)) - ;; (match res-full - ;; ((response-ok response-msg) - ;; (response-ok (car res-full)) - ;; (response-msg (cadr res-full) - ) - ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG - ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params) - (sdbg> "call" "send-message" post-get-start-time #f call-start-time) - (cond - ((not response-ok) #f) - ((member response-msg '("db read submitted" "db write submitted")) - (let* ((cookie-id (cadddr res-full)) - (mbox (make-mailbox)) - (mbox-time (current-milliseconds))) - (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox) - (let* ((mbox-timeout-secs 20) - (mbox-timeout-result 'MBOX_TIMEOUT) - (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) - (mbox-receive-time (current-milliseconds))) - (hash-table-delete! (area-cookie2mbox acfg) cookie-id) - (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname) - ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params) - res))) - (else - (print "Unhandled response \""response-msg"\"") - #f)) - ;; depending on what action (i.e. ctype) is we will block here waiting for - ;; all the data (mechanism to be determined) - ;; - ;; if res is a "working on it" then wait - ;; wait for result - ;; mailbox thread wait on - - ;; if res is a "can't help you" then try a different server - ;; if res is a "ack" (e.g. for one-shot requests) then return res - )) - (else - (if (< count 10) - (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv)))) - (thread-sleep! 1) - (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.") - (call acfg dbname action params (+ count 1))) - (begin - (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full))))))) - (begin - (if (not rdat) - (print "ERROR: action " action " not registered.") - (if (< count 10) - (begin - (thread-sleep! 1) - (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts - (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds") - (call acfg dbname action params (+ count 1))) - (begin - (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up.")) - #;(error "No server available")))))))) - - -;;====================================================================== -;; U T I L I T I E S -;;====================================================================== - -;; get a signature for identifing this process -;; -(define (get-process-signature) - (cons (get-host-name)(current-process-id))) - -;;====================================================================== -;; S Y S T E M S T U F F -;;====================================================================== - -;; get normalized cpu load by reading from /proc/loadavg and -;; /proc/cpuinfo return all three values and the number of real cpus -;; and the number of threads returns alist '((adj-cpu-load -;; . normalized-proc-load) ... etc. keys: adj-proc-load, -;; adj-core-load, 1m-load, 5m-load, 15m-load -;; -(define (get-normalized-cpu-load) - (let ((res (get-normalized-cpu-load-raw)) - (default `((adj-proc-load . 2) ;; there is no right answer - (adj-core-load . 2) - (1m-load . 2) - (5m-load . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong - (15m-load . 0) - (proc . 1) - (core . 1) - (phys . 1) - (error . #t)))) - (cond - ((and (list? res) - (> (length res) 2)) - res) - ((eq? res #f) default) ;; add messages? - ((eq? res #f) default) ;; this would be the #eof - (else default)))) - -(define (get-normalized-cpu-load-raw) - (let* ((actual-host (get-host-name))) ;; #f is localhost - (let ((data (append - (with-input-from-file "/proc/loadavg" read-lines) - (with-input-from-file "/proc/cpuinfo" read-lines) - (list "end"))) - (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$")) - (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$")) - (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$")) - (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$")) - (max-num (lambda (p n)(max (string->number p) n)))) - ;; (print "data=" data) - (if (null? data) ;; something went wrong - #f - (let loop ((hed (car data)) - (tal (cdr data)) - (loads #f) - (proc-num 0) ;; processor includes threads - (phys-num 0) ;; physical chip on motherboard - (core-num 0)) ;; core - ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num) - (if (null? tal) ;; have all our data, calculate normalized load and return result - (let* ((act-proc (+ proc-num 1)) - (act-phys (+ phys-num 1)) - (act-core (+ core-num 1)) - (adj-proc-load (/ (car loads) act-proc)) - (adj-core-load (/ (car loads) act-core)) - (result - (append (list (cons 'adj-proc-load adj-proc-load) - (cons 'adj-core-load adj-core-load)) - (list (cons '1m-load (car loads)) - (cons '5m-load (cadr loads)) - (cons '15m-load (caddr loads))) - (list (cons 'proc act-proc) - (cons 'core act-core) - (cons 'phys act-phys))))) - result) - (regex-case - hed - (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num)) - (proc-rx ( x p ) (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num)) - (phys-rx ( x p ) (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num)) - (core-rx ( x c ) (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num))) - (else - (begin - ;; (print "NO MATCH: " hed) - (loop (car tal)(cdr tal) loads proc-num phys-num core-num)))))))))) - -(define (get-host-stats acfg) - (let ((stats-hash (area-stats acfg))) - ;; use this opportunity to remove references to dbfiles which have not been accessed in a while - (for-each - (lambda (dbname) - (let* ((stats (hash-table-ref stats-hash dbname)) - (last-access (stat-when stats))) - (if (and (> last-access 0) ;; if zero then there has been no access - (> (- (current-seconds) last-access) 10)) ;; not used in ten seconds - (begin - (print "Removing " dbname " from stats list") - (hash-table-delete! stats-hash dbname) ;; remove from stats hash - (stat-dbs-set! stats (hash-table-keys stats)))))) - (hash-table-keys stats-hash)) - - `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum - ,(map (lambda (dbname) ;; dbname is the db name - (cons dbname (stat-when (hash-table-ref stats-hash dbname)))) - (hash-table-keys stats-hash)) - (cpuload . ,(get-normalized-cpu-load))))) - #;(stats . ,(map (lambda (k) ;; create an alist from the stats data - (cons k (stat->alist (hash-table-ref (area-stats acfg) k)))) - (hash-table-keys (area-stats acfg)))) - -#;(trace - ;; assv - ;; cdr - ;; caar - ;; ;; cdr - ;; call - ;; finalize-all-db-handles - ;; get-all-server-pkts - ;; get-normalized-cpu-load - ;; get-normalized-cpu-load-raw - ;; launch - ;; nmsg-send - ;; process-db-queries - ;; receive-message - ;; std-peer-handler - ;; update-known-servers - ;; work-queue-processor - ) - -;;====================================================================== -;; netutil -;; move this back to ulex-netutil.scm someday? -;;====================================================================== - -;; #include -;; #include -;; #include -;; #include - -(foreign-declare "#include \"sys/types.h\"") -(foreign-declare "#include \"sys/socket.h\"") -(foreign-declare "#include \"ifaddrs.h\"") -(foreign-declare "#include \"arpa/inet.h\"") - -;; get IP addresses from ALL interfaces -(define get-all-ips - (foreign-safe-lambda* scheme-object () - " - -// from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address : - - - C_word lst = C_SCHEME_END_OF_LIST, len, str, *a; -// struct ifaddrs *ifa, *i; -// struct sockaddr *sa; - - struct ifaddrs * ifAddrStruct = NULL; - struct ifaddrs * ifa = NULL; - void * tmpAddrPtr = NULL; - - if ( getifaddrs(&ifAddrStruct) != 0) - C_return(C_SCHEME_FALSE); - -// for (i = ifa; i != NULL; i = i->ifa_next) { - for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) { - if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is - // a valid IPv4 address - tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr; - char addressBuffer[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN); -// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); - len = strlen(addressBuffer); - a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); - str = C_string(&a, len, addressBuffer); - lst = C_a_pair(&a, str, lst); - } - -// else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is -// // a valid IPv6 address -// tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; -// char addressBuffer[INET6_ADDRSTRLEN]; -// inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN); -//// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); -// len = strlen(addressBuffer); -// a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); -// str = C_string(&a, len, addressBuffer); -// lst = C_a_pair(&a, str, lst); -// } - -// else { -// printf(\" not an IPv4 address\\n\"); -// } - - } - - freeifaddrs(ifa); - C_return(lst); - -")) + +(module ulex + * + +(import scheme posix-groups (chicken base) queues (chicken port) (chicken io) (chicken file) mailbox system-information) +(import srfi-18 pkts matchable regex + typed-records srfi-69 srfi-1 + srfi-4 regex-case + (prefix sqlite3 sqlite3:) + (chicken foreign) + (chicken sort) + (chicken process-context posix) + (chicken process-context) + tcp6 + ;; ulex-netutil + hostinfo + (chicken file posix) + (chicken random) + (chicken pretty-print) + (chicken string) + (chicken time) + (chicken condition) + (chicken tcp)) ;; ulex-netutil) + +;;====================================================================== +;; KEY FUNCTIONS - THESE ARE TOO BE EXPOSED AND USED +;;====================================================================== + +;; connection setup and management functions + +;; This is the basic setup command. Must always be +;; called before connecting to a db using connect. +;; +;; find or become the captain +;; setup and return a ulex object +;; +(define (setup) + (let* ((udata (make-udat)) + (cpkts (get-all-captain-pkts udata)) ;; read captain pkts + (captn (get-winning-pkt cpkts))) + (if captn + (let* ((port (alist-ref 'port captn)) + (host (alist-ref 'host captn)) + (ipaddr (alist-ref 'ipaddr captn)) + (pid (alist-ref 'pid captn)) + (Z (alist-ref 'Z captn))) + (udat-captain-address-set! udata ipaddr) + (udat-captain-host-set! udata host) + (udat-captain-port-set! udata port) + (udat-captain-pid-set! udata pid) + (if (ping udata (conc ipaddr ":" port)) + udata + (begin + (remove-captain-pkt udata captn) + (setup)))) + (begin + (setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread + (setup))) + )) + +;; connect to a specific dbfile +(define (connect udata dbfname dbtype) + udata) + +(define (ping udata host-port) + (let ((cookie (make-cookie udata))) + (send udata host-port 'ping "just pinging" (conc (current-seconds))) + ;; (mailbox-rec + )) + +;;====================================================================== +;; network utilities +;;====================================================================== + +(define (rate-ip ipaddr) + (regex-case ipaddr + ( "^127\\..*" _ 0 ) + ( "^(10\\.0|192\\.168)\\..*" _ 1 ) + ( else 2 ) )) ;; Change this to bias for addresses with a reasonable broadcast value? ;; (define (ip-pref-less? a b) - (let* ((rate (lambda (ipstr) - (regex-case ipstr - ( "^127\\." _ 0 ) - ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 ) - ( else 2 ) )))) - (< (rate a) (rate b)))) + (> (rate-ip a) (rate-ip b))) (define (get-my-best-address) (let ((all-my-addresses (get-all-ips)) ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))) @@ -1446,7 +132,1800 @@ ))) (define (get-all-ips-sorted) (sort (get-all-ips) ip-pref-less?)) +(define (get-all-ips) + (map ip->string (vector->list + (hostinfo-addresses + (host-information (get-host-name)))))) + +;; make it a global? Well, it is local to area module + +(define *captain-pktspec* + `((captain (host . h) + (port . p) + (pid . i) + (ipaddr . a) + ) + #;(data (hostname . h) ;; sender hostname + (port . p) ;; sender port + (ipaddr . a) ;; sender ip + (hostkey . k) ;; sending host key - store info at server under this key + (servkey . s) ;; server key - this needs to match at server end or reject the msg + (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json + (data . d) ;; base64 encoded slln data + ))) + +;; struct for keeping track of our world +;;(use trace) +;;(trace-call-sites #t) + +(defstruct udat + ;; captain info + (captain-address #f) + (captain-host #f) + (captain-port #f) + (captain-pid #f) + (ulex-dir (conc (get-environment-variable "HOME") "/.ulex")) + (cpkts-dir (conc (get-environment-variable "HOME") "/.ulex/pkts")) + (cpkt-spec *captain-pktspec*) + ;; this processes info + (my-cpkt-key #f) ;; put Z card here when I create a pkt for myself as captain + (my-address #f) + (my-hostname #f) + (my-port #f) + (my-pid (current-process-id)) + ;; server and handler thread + (serv-listener #f) ;; this processes server info + (handler-thread #f) + (handlers (make-hash-table)) + (outgoing-conns (make-hash-table)) ;; host:port -> conn + (mboxes (make-hash-table)) ;; key => mbox + (work-queue (make-queue)) ;; most stuff goes here + (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping) + (busy #f) ;; is either of the queues busy + ;; app info + (appname #f) + (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ] + ;; cookies + (cnum 0) ;; cookie num + ) + +;; struct for keeping track of others we are talking to + +(defstruct peer + (addr-port #f) + (hostname #f) + (pid #f) + (inp #f) ;; input port from the peer + (oup #f) ;; output port to the peer + (owns '()) ;; list of databases this peer is currently handling + ) + +(defstruct work + (peer-dat #f) + (handlerkey #f) + (qrykey #f) + (data #f) + (start (current-milliseconds))) + +;;====================================================================== +;; Captain pkt functions +;;====================================================================== + +;; given a pkts dir read + ;;(hash-table-set! ht dbname (pseudo-random-integer 10000))))) +;; +(define (get-all-captain-pkts udata) + (let* ((pktsdir (let ((d (udat-cpkts-dir udata))) + (if (file-exists? d) + d + (begin + (create-directory d #t) + d)))) + (all-pkt-files (glob (conc pktsdir "/*.pkt"))) + (pkt-spec (udat-cpkt-spec udata))) + (map (lambda (pkt-file) + (read-pkt->alist pkt-file pktspec: pkt-spec)) + all-pkt-files))) + +;; sort by D then Z, return one, choose the oldest then +;; differentiate if needed using the Z key +;;l +(define (get-winning-pkt pkts) + (if (null? pkts) + #f + (car (sort pkts (lambda (a b) + (let ((ad (string->number (alist-ref 'D a))) + (bd (string->number (alist-ref 'D b)))) + (if (eq? a b) + (let ((az (alist-ref 'Z a)) + (bz (alist-ref 'Z b))) + (string>=? az bz)) + (> ad bd)))))))) + +;; remove pkt associated with captn (the Z key .pkt) +;; +(define (remove-captain-pkt udata captn) + (let ((Z (alist-ref 'Z captn)) + (cpktdir (udat-cpkts-dir udata))) + (delete-file* (conc cpktdir "/" Z ".pkt")))) + + +;;====================================================================== +;; server primitives +;;====================================================================== + +(define (make-cookie udata) + (let ((newcnum (+ (udat-cnum udata)))) + (udat-cnum-set! udata newcnum) + (conc (udat-my-address udata) ":" + (udat-my-port udata) "-" + (udat-my-pid udata) "-" + newcnum))) + +;; create a tcp listener and return a populated udat struct with +;; my port, address, hostname, pid etc. +;; return #f if fail to find a port to allocate. +;; +(define (start-server-find-port udata #!optional (port 4242)) + (handle-exceptions + exn + (if (< port 65535)(start-server-find-port udata (+ port 1)) #f) + (connect-server udata port))) + +(define (connect-server udata port) + ;; (tcp-listener-socket LISTENER)(socket-name so) + ;; sockaddr-address, sockaddr-port, sockaddr->string + (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) + (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) + (udat-my-address-set! udata addr) + (udat-my-port-set! udata port) + (udat-my-hostname-set! udata (get-host-name)) + (udat-serv-listener-set! udata tlsn) + udata)) + +;; put the host, ip, port and pid into a pkt in +;; the captain pkts dir +;; - assumes user has already fired up a server +;; which will be in the udata struct +;; +(define (create-captain-pkt udata) + (if (not (udat-serv-listener udata)) + (begin + (print "ERROR: create-captain-pkt called with out a listener") + #f) + (let* ((pktdat `((port . ,(udat-my-port udata)) + (host . ,(udat-my-hostname udata)) + (ipaddr . ,(udat-my-address udata)) + (pid . ,(udat-my-pid udata)))) + (pktdir (udat-cpkts-dir udata)) + (pktspec (udat-cpkt-spec udata)) + ) + (udat-my-cpkt-key-set! + udata + (write-alist->pkt + pktdir + pktdat + pktspec: pktspec + ptype: 'captain)) + (udat-my-cpkt-key udata)))) + +;; NB// This needs to be started in a thread +;; +;; setup to be a captain +;; - start server +;; - create pkt +;; - start server port handler +;; +(define (setup-as-captain udata) + (if (start-server-find-port udata) ;; puts the server in udata + (if (create-captain-pkt udata) + (let* ((th (make-thread (lambda () + (ulex-handler udata)) "Captain handler"))) + (udat-handler-thread-set! udata th) + (thread-start! th)) + #f) + #f)) + +(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f)) + (let* ((pdat (or (hash-table-ref/default (udat-outgoing-conns udata) host-port #f) + (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC + exn + #f + (let ((npdat (make-peer addr-port: host-port))) + (if hostname (peer-hostname-set! npdat hostname)) + (if pid (peer-pid-set! npdat pid)) + (let-values (((ninp noup)(tcp-connect host-port))) + (peer-inp-set! npdat ninp) + (peer-oup-set! npdat noup)) + (hash-table-set! (udat-outgoing-conns udata) host-port npdat) + npdat))))) + pdat)) + +(define (get-peer-ports udata host-port #!optional (hostname #f)(pid #f)) + (let ((pdat (get-peer-dat udata host-port hostname pid))) + (if pdat + (values (peer-inp pdat)(peer-oup pdat)) + (values #f #f)))) + +;; send structured data to recipient +;; +;; NOTE: qrykey is what was called the "cookie" previously +;; +(define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())) + (let-values (((inp oup)(get-peer-ports udata host-port hostname pid))) + ;; CONTROL LINE: (note: removed the hostname - I don't think it adds much value + ;; + ;; handlerkey host:port pid qrykey params ... + ;; + (if (and inp oup) + (begin + (write-line (conc + handler " " + (udat-my-address udata) ":" (udat-my-port udata) " " + ;; (udat-my-hostname udata) " " + (udat-my-pid udata) " " + qrykey + (if (null? params) "" (conc " " (string-intersperse params " ")))) + oup) + (write-line data oup) + #t + ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE! + ;; (there is a listener for handling that) + ) + #f))) ;; #f means failed to connect and send + +;; send a request to the given host-port and register a mailbox in udata +;; wait for the mailbox data and return it +;; +(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())) + (let ((mbox (make-mailbox)) + (mbox-time (current-milliseconds)) + (mboxes (udat-mboxes udata))) + (hash-table-set! mboxes qrykey mbox) + (if (send udata host-port handler qrykey data hostname: hostname pid: pid params: params) + (let* ((mbox-timeout-secs 20) + (mbox-timeout-result 'MBOX_TIMEOUT) + (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) + (mbox-receive-time (current-milliseconds))) + (hash-table-delete! mboxes qrykey) + res) + #f))) ;; #f means failed to communicate + +(define (add-to-work-queue udata peer-dat handlerkey qrykey data) + (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data))) + (if (udat-busy udata) + (queue-add! (udat-work-queue udata) wdat) + (process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat + )) + +(define (do-work udata wdat) + #f) + +(define (process-work udata #!optional wdat) + (if wdat (do-work udata wdat)) ;; process wdat + (let ((wqueue (udat-work-queue udata))) + (if (not (queue-empty? wqueue)) + (let loop ((wd (queue-remove! wqueue))) + (do-work udata wd) + (if (not (queue-empty? wqueue)) + (loop (queue-remove! wqueue))))))) + +;; send back ack - this is tcp we are talking about, do we really need an ack? +;; +;; NOTE: No need to send back host:port of self - that is locked in by qrykey +;; +(define (send-ack udata host-port qrykey) ;; #!optional (hostname #f)(pid #f)) + (send udata host-port "ack" qrykey qrykey)) ;; we must send a second line - for the ack let it be the qrykey + +;; +;; +(define (ulex-handler udata) + (let* ((serv-listener (udat-serv-listener udata))) + (let-values (((inp oup)(tcp-accept serv-listener))) + ;; data comes as two lines + ;; handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db] + ;; data + (let loop ((state 'start)) + (let* ((controldat (read-line inp)) + (data (read-line inp))) + (match (string-split controldat) + ((handlerkey host:port pid qrykey params ...) + (print "handlerkey: " handlerkey " host:port: " host:port " pid: " pid " qrykey: " qrykey " params: " params) + (case (string->symbol handlerkey) + ((ack)(print "Got ack!")) + ((ping) + (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) + (val (if proc (proc) "gotping"))) + (send udata host:port "version" qrykey val))) + ((rucaptain) + (send udata host:port "iamcaptain" qrykey (if (udat-my-cpkt-key udata) + "yes" + "no"))) + (else + ;; (send-ack udata host:port qrykey) + (add-to-work-queue udata (get-peer-dat udata host:port) handlerkey qrykey data)))) + (else (print "BAD DATA? controldat=" controldat " data=" data)))) + (loop state))))) + +;; add a proc to the handler list +(define (register-handler udata key proc) + (hash-table-set! (udat-handlers udata) key proc)) + + +;;====================================================================== +;; Generic db handling +;; setup a inmem db instance +;; open connection to on-disk db +;; sync on-disk db to inmem +;; get lock in on-disk db for dbowner of this db +;; put sync-proc, init-proc, on-disk handle, inmem handle in dbconn stuct +;; return the stuct +;;====================================================================== + +(defstruct dbconn + (inmem #f) + (conn #f) + (sync #f) ;; sync proc + (init #f) ;; init proc + (lastsync (current-seconds)) + ) + +(defstruct dbinfo + (initproc #f) + (syncproc #f)) + +;; open inmem and disk database +;; init with initproc +;; return db struct +;; +;; appname; megatest, ulex or something else. +;; +(define (setup-db-connection udata fname-in appname dbtype) + (let* ((is-ulex (eq? appname 'ulex)) + (dbinf (if is-ulex ;; ulex is a built-in special case + (make-dbinfo initproc: ulexdb-init syncproc: ulexdb-sync) + (hash-table-ref/default (udat-dbtypes udata) dbtype #f))) + (initproc (dbinfo-initproc dbinf)) + (syncproc (dbinfo-syncproc dbinf)) + (fname (if is-ulex + (conc (udat-ulex-dir udata) "/ulex.db") + fname-in)) + (inmem-db (open-and-initdb udata #f 'inmem (dbinfo-initproc dbinf))) + (disk-db (open-and-initdb udata fname 'disk (dbinfo-initproc dbinf)))) + (make-dbconn inmem: inmem-db conn: disk-db sync: syncproc init: initproc))) + +;; dest='inmem or 'disk +;; +(define (open-and-initdb udata filename dest init-proc) + (let* ((inmem (eq? dest 'inmem)) + (dbfile (if inmem + ":INMEM:" + filename)) + (dbexists (if inmem #t (file-exists? dbfile))) + (db (sqlite3:open-database dbfile))) + (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000)) + (if (not dbexists) + (init-proc db)) + db)) + + +;;====================================================================== +;; Ulex db +;;====================================================================== + +(define (ulexdb-init db inmem) + (sqlite3:with-transaction + db + (lambda () + (for-each + (lambda (stmt) + (if stmt (sqlite3:execute db stmt))) + `("CREATE TABLE IF NOT EXISTS processes + (id INTEGER PRIMARY KEY, + host TEXT NOT NULL, + ipadr TEXT NOT NULL, + port INTEGER NOT NULL, + pid INTEGER NOT NULL, + regtime INTEGER DEFAULT (strftime('%s','now')), + last_update INTEGER DEFAULT (strftime('%s','now')));" + (if inmem + "CREATE TRIGGER IF NOT EXISTS update_proces_trigger AFTER UPDATE ON processes + FOR EACH ROW + BEGIN + UPDATE processes SET last_update=(strftime('%s','now')) + WHERE id=old.id; + END;" + #f)))))) + +;; open databases, do initial sync +(define (ulexdb-sync dbconndat udata) + #f) + + +) ;; END OF ULEX + + +;;; ;;====================================================================== +;;; ;; D E B U G H E L P E R S +;;; ;;====================================================================== +;;; +;;; (define (dbg> . args) +;;; (with-output-to-port (current-error-port) +;;; (lambda () +;;; (apply print "dbg> " args)))) +;;; +;;; (define (debug-pp . args) +;;; (if (get-environment-variable "ULEX_DEBUG") +;;; (with-output-to-port (current-error-port) +;;; (lambda () +;;; (apply pp args))))) +;;; +;;; (define *default-debug-port* (current-error-port)) +;;; +;;; (define (sdbg> fn stage-name stage-start stage-end start-time . message) +;;; (if (get-environment-variable "ULEX_DEBUG") +;;; (with-output-to-port *default-debug-port* +;;; (lambda () +;;; (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. " +;;; (if start-time +;;; (conc "total time " (- (current-milliseconds) start-time) +;;; " ms.") +;;; "") +;;; message +;;; ))))) + +;;====================================================================== +;; M A C R O S +;;====================================================================== +;; iup callbacks are not dumping the stack, this is a work-around +;; + +;; Some of these routines use: +;; +;; http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html +;; +;; Syntax for defining macros in a simple style similar to function definiton, +;; when there is a single pattern for the argument list and there are no keywords. +;; +;; (define-simple-syntax (name arg ...) body ...) +;; +;; +;; (define-syntax define-simple-syntax +;; (syntax-rules () +;; ((_ (name arg ...) body ...) +;; (define-syntax name (syntax-rules () ((name arg ...) (begin body ...))))))) +;; +;; (define-simple-syntax (catch-and-dump proc procname) +;; (handle-exceptions +;; exn +;; (begin +;; (print-call-chain (current-error-port)) +;; (with-output-to-port (current-error-port) +;; (lambda () +;; (print ((condition-property-accessor 'exn 'message) exn)) +;; (print "Callback error in " procname) +;; (print "Full condition info:\n" (condition->list exn))))) +;; (proc))) +;; +;; +;;====================================================================== +;; R E C O R D S +;;====================================================================== -) +;;; ;; information about me as a server +;;; ;; +;;; (defstruct area +;;; ;; about this area +;;; (useportlogger #f) +;;; (lowport 32768) +;;; (server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all) +;;; (conn #f) +;;; (port #f) +;;; (myaddr (get-my-best-address)) +;;; pktid ;; get pkt from hosts table if needed +;;; pktfile +;;; pktsdir +;;; dbdir +;;; (dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one? +;;; (mutex (make-mutex)) +;;; (rtable (make-hash-table)) ;; registration table of available actions +;;; (dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve +;;; ;; about other servers +;;; (hosts (make-hash-table)) ;; key => hostdat +;;; (hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime ) +;;; (reqs (make-hash-table)) ;; uri => queue +;;; ;; work queues +;;; (wqueues (make-hash-table)) ;; fname => qdat +;;; (stats (make-hash-table)) ;; fname => totalqueries +;;; (last-srvup (current-seconds)) ;; last time we updated the known servers +;;; (cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call +;;; (ready #f) +;;; (health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping +;;; ) +;;; +;;; ;; host stats +;;; ;; +;;; (defstruct hostdat +;;; (pkt #f) +;;; (dbload (make-hash-table)) ;; "dbfile.db" => queries/min +;;; (hostload #f) ;; normalized load ( 5min load / numcpus ) +;;; ) +;;; +;;; ;; dbdat +;;; ;; +;;; (defstruct dbdat +;;; (dbh #f) +;;; (fname #f) +;;; (write-access #f) +;;; (sths (make-hash-table)) ;; hash mapping query strings to handles +;;; ) +;;; +;;; ;; qdat +;;; ;; +;;; (defstruct qdat +;;; (writeq (make-queue)) +;;; (readq (make-queue)) +;;; (rwq (make-queue)) +;;; (logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging +;;; (osshort (make-queue)) +;;; (oslong (make-queue)) +;;; (misc (make-queue)) ;; used for things like ping-full +;;; ) +;;; +;;; ;; calldat +;;; ;; +;;; (defstruct calldat +;;; (ctype 'dbwrite) +;;; (obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc. +;;; (rtime (current-milliseconds))) +;;; +;;; ;; make it a global? Well, it is local to area module +;;; +;;; (define *pktspec* +;;; `((server (hostname . h) +;;; (port . p) +;;; (pid . i) +;;; (ipaddr . a) +;;; ) +;;; (data (hostname . h) ;; sender hostname +;;; (port . p) ;; sender port +;;; (ipaddr . a) ;; sender ip +;;; (hostkey . k) ;; sending host key - store info at server under this key +;;; (servkey . s) ;; server key - this needs to match at server end or reject the msg +;;; (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json +;;; (data . d) ;; base64 encoded slln data +;;; ))) +;;; +;;; ;; work item +;;; ;; +;;; (defstruct witem +;;; (rhost #f) ;; return host +;;; (ripaddr #f) ;; return ipaddr +;;; (rport #f) ;; return port +;;; (servkey #f) ;; the packet representing the client of this workitem, used by final send-message +;;; (rdat #f) ;; the request - usually an sql query, type is rdat +;;; (action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort +;;; (cookie #f) ;; cookie id for response +;;; (data #f) ;; the data payload, i.e. parameters +;;; (result #f) ;; the result from processing the data +;;; (caller #f)) ;; the calling peer according to rpc itself +;;; +;;; (define (trim-pktid pktid) +;;; (if (string? pktid) +;;; (substring pktid 0 4) +;;; "nopkt")) +;;; +;;; (define (any->number num) +;;; (cond +;;; ((number? num) num) +;;; ((string? num) (string->number num)) +;;; (else num))) +;;; +;;; (use trace) +;;; (trace-call-sites #t) +;;; +;;; ;;====================================================================== +;;; ;; D A T A B A S E H A N D L I N G +;;; ;;====================================================================== +;;; +;;; ;; look in dbhandles for a db, return it, else return #f +;;; ;; +;;; (define (get-dbh acfg fname) +;;; (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '()))) +;;; (if (null? dbh-lst) +;;; (begin +;;; ;; (print "opening db for " fname) +;;; (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls +;;; (let ((rem-lst (cdr dbh-lst))) +;;; ;; (print "re-using saved connection for " fname) +;;; (hash-table-set! (area-dbhandles acfg) fname rem-lst) +;;; (car dbh-lst))))) +;;; +;;; (define (save-dbh acfg fname dbdat) +;;; ;; (print "saving dbh for " fname) +;;; (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '())))) +;;; +;;; ;; open the database, if never before opened init it. put the handle in the +;;; ;; open db's hash table +;;; ;; returns: the dbdat +;;; ;; +;;; (define (open-db acfg fname) +;;; (let* ((fullname (conc (area-dbdir acfg) "/" fname)) +;;; (exists (file-exists? fullname)) +;;; (write-access (if exists +;;; (file-write-access? fullname) +;;; (file-write-access? (area-dbdir acfg)))) +;;; (db (sqlite3:open-database fullname)) +;;; (handler (sqlite3:make-busy-timeout 136000)) +;;; ) +;;; (sqlite3:set-busy-handler! db handler) +;;; (sqlite3:execute db "PRAGMA synchronous = 0;") +;;; (if (not exists) ;; need to init the db +;;; (if write-access +;;; (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements +;;; ;; (sqlite3:with-transaction +;;; ;; db +;;; ;; (lambda () +;;; (if isql +;;; (for-each +;;; (lambda (sql) +;;; (sqlite3:execute db sql)) +;;; isql))) +;;; (print "ERROR: no write access to " (area-dbdir acfg)))) +;;; (make-dbdat dbh: db fname: fname write-access: write-access))) +;;; +;;; ;; This is a low-level command to retrieve or to prepare, save and return a prepared statment +;;; ;; you must extract the db handle +;;; ;; +;;; (define (get-sth db cache stmt) +;;; (if (hash-table-exists? cache stmt) +;;; (begin +;;; ;; (print "Reusing cached stmt for " stmt) +;;; (hash-table-ref/default cache stmt #f)) +;;; (let ((sth (sqlite3:prepare db stmt))) +;;; (hash-table-set! cache stmt sth) +;;; ;; (print "prepared stmt for " stmt) +;;; sth))) +;;; +;;; ;; a little more expensive but does all the tedious deferencing - only use if you don't already +;;; ;; have dbdat and db sitting around +;;; ;; +;;; (define (full-get-sth acfg fname stmt) +;;; (let* ((dbdat (get-dbh acfg fname)) +;;; (db (dbdat-dbh dbdat)) +;;; (sths (dbdat-sths dbdat))) +;;; (get-sth db sths stmt))) +;;; +;;; ;; write to a db +;;; ;; acfg: area data +;;; ;; rdat: request data +;;; ;; hdat: (host . port) +;;; ;; +;;; ;; (define (dbwrite acfg rdat hdat data-in) +;;; ;; (let* ((dbname (car data-in)) +;;; ;; (dbdat (get-dbh acfg dbname)) +;;; ;; (db (dbdat-dbh dbdat)) +;;; ;; (sths (dbdat-sths dbdat)) +;;; ;; (stmt (calldat-obj rdat)) +;;; ;; (sth (get-sth db sths stmt)) +;;; ;; (data (cdr data-in))) +;;; ;; (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data) +;;; ;; (print "dbdat: " (dbdat->alist dbdat)) +;;; ;; (apply sqlite3:execute sth data) +;;; ;; (save-dbh acfg dbname dbdat) +;;; ;; #t +;;; ;; )) +;;; +;;; (define (finalize-all-db-handles acfg) +;;; (let* ((dbhandles (area-dbhandles acfg)) ;; dbhandles is hash of fname ==> dbdat +;;; (num 0)) +;;; (for-each +;;; (lambda (area-name) +;;; (print "Closing handles for " area-name) +;;; (let ((dbdats (hash-table-ref/default dbhandles area-name '()))) +;;; (for-each +;;; (lambda (dbdat) +;;; ;; first close all statement handles +;;; (for-each +;;; (lambda (sth) +;;; (sqlite3:finalize! sth) +;;; (set! num (+ num 1))) +;;; (hash-table-values (dbdat-sths dbdat))) +;;; ;; now close the dbh +;;; (set! num (+ num 1)) +;;; (sqlite3:finalize! (dbdat-dbh dbdat))) +;;; dbdats))) +;;; (hash-table-keys dbhandles)) +;;; (print "FINALIZED " num " dbhandles"))) +;;; +;;; ;;====================================================================== +;;; ;; W O R K Q U E U E H A N D L I N G +;;; ;;====================================================================== +;;; +;;; (define (register-db-as-mine acfg dbname) +;;; (let ((ht (area-dbs acfg))) +;;; (if (not (hash-table-ref/default ht dbname #f)) +;;; (hash-table-set! ht dbname (random 10000))))) +;;; +;;; (define (work-queue-add acfg fname witem) +;;; (let* ((work-queue-start (current-milliseconds)) +;;; (action (witem-action witem)) ;; NB the action is the index into the rdat actions +;;; (qdat (or (hash-table-ref/default (area-wqueues acfg) fname #f) +;;; (let ((newqdat (make-qdat))) +;;; (hash-table-set! (area-wqueues acfg) fname newqdat) +;;; newqdat))) +;;; (rdat (hash-table-ref/default (area-rtable acfg) action #f))) +;;; (if rdat +;;; (queue-add! +;;; (case (calldat-ctype rdat) +;;; ((dbwrite) (register-db-as-mine acfg fname)(qdat-writeq qdat)) +;;; ((dbread) (register-db-as-mine acfg fname)(qdat-readq qdat)) +;;; ((dbrw) (register-db-as-mine acfg fname)(qdat-rwq qdat)) +;;; ((oslong) (qdat-oslong qdat)) +;;; ((osshort) (qdat-osshort qdat)) +;;; ((full-ping) (qdat-misc qdat)) +;;; (else +;;; (print "ERROR: no queue for " action ". Adding to dbwrite queue.") +;;; (qdat-writeq qdat))) +;;; witem) +;;; (case action +;;; ((full-ping)(qdat-misc qdat)) +;;; (else +;;; (print "ERROR: No action " action " was registered")))) +;;; (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f) +;;; #t)) ;; for now, simply return #t to indicate request got to the queue +;;; +;;; (define (doqueue acfg q fname dbdat dbh) +;;; ;; (print "doqueue: " fname) +;;; (let* ((start-time (current-milliseconds)) +;;; (qlen (queue-length q))) +;;; (if (> qlen 1) +;;; (print "Processing queue of length " qlen)) +;;; (let loop ((count 0) +;;; (responses '())) +;;; (let ((delta (- (current-milliseconds) start-time))) +;;; (if (or (queue-empty? q) +;;; (> delta 400)) ;; stop working on this queue after 400ms have passed +;;; (list count delta responses) ;; return count, delta and responses list +;;; (let* ((witem (queue-remove! q)) +;;; (action (witem-action witem)) +;;; (rdat (witem-rdat witem)) +;;; (stmt (calldat-obj rdat)) +;;; (sth (full-get-sth acfg fname stmt)) +;;; (ctype (calldat-ctype rdat)) +;;; (data (witem-data witem)) +;;; (cookie (witem-cookie witem))) +;;; ;; do the processing and save the result in witem-result +;;; (witem-result-set! +;;; witem +;;; (case ctype ;; action +;;; ((noblockwrite) ;; blind write, no ack of success returned +;;; (apply sqlite3:execute sth data) +;;; (sqlite3:last-insert-rowid dbh)) +;;; ((dbwrite) ;; blocking write +;;; (apply sqlite3:execute sth data) +;;; #t) +;;; ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query +;;; (apply sqlite3:map-row (lambda x x) sth data)) +;;; ((full-ping) 'full-ping) +;;; (else (print "Not ready for action " action) #f))) +;;; (loop (add1 count) +;;; (if cookie +;;; (cons witem responses) +;;; responses)))))))) +;;; +;;; ;; do up to 400ms of processing on each queue +;;; ;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded +;;; ;; +;;; (define (process-db-queries acfg fname) +;;; (if (hash-table-exists? (area-wqueues acfg) fname) +;;; (let* ((process-db-queries-start-time (current-milliseconds)) +;;; (qdat (hash-table-ref/default (area-wqueues acfg) fname #f)) +;;; (queue-sym->queue (lambda (queue-sym) +;;; (case queue-sym ;; lookup the queue from qdat given a name (symbol) +;;; ((wqueue) (qdat-writeq qdat)) +;;; ((rqueue) (qdat-readq qdat)) +;;; ((rwqueue) (qdat-rwq qdat)) +;;; ((misc) (qdat-misc qdat)) +;;; (else #f)))) +;;; (dbdat (get-dbh acfg fname)) +;;; (dbh (if (dbdat? dbdat)(dbdat-dbh dbdat) #f)) +;;; (nowtime (current-seconds))) +;;; ;; handle the queues that require a transaction +;;; ;; +;;; (map ;; +;;; (lambda (queue-sym) +;;; ;; (print "processing queue " queue-sym) +;;; (let* ((queue (queue-sym->queue queue-sym))) +;;; (if (not (queue-empty? queue)) +;;; (let ((responses +;;; (sqlite3:with-transaction ;; todo - catch exceptions... +;;; dbh +;;; (lambda () +;;; (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work! +;;; ;; (print "res=" res) +;;; (match res +;;; ((count delta responses) +;;; (update-stats acfg fname queue-sym delta count) +;;; (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f) +;;; responses) ;; return responses +;;; (else +;;; (print "ERROR: bad return data from doqueue " res))) +;;; ))))) +;;; ;; having completed the transaction, send the responses. +;;; ;; (print "INFO: sending " (length responses) " responses.") +;;; (let loop ((responses-left responses)) +;;; (cond +;;; ((null? responses-left) #t) +;;; (else +;;; (let* ((witem (car responses-left)) +;;; (response (cdr responses-left))) +;;; (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem) +;;; (witem-cookie witem)(witem-result witem))) +;;; (loop (cdr responses-left)))))) +;;; ))) +;;; '(wqueue rwqueue rqueue)) +;;; +;;; ;; handle misc queue +;;; ;; +;;; ;; (print "processing misc queue") +;;; (let ((queue (queue-sym->queue 'misc))) +;;; (doqueue acfg queue fname dbdat dbh)) +;;; ;; .... +;;; (save-dbh acfg fname dbdat) +;;; #t ;; just to let the tests know we got here +;;; ) +;;; #f ;; nothing processed +;;; )) +;;; +;;; ;; run all queues in parallel per db but sequentially per queue for that db. +;;; ;; - process the queues every 500 or so ms +;;; ;; - allow for long running queries to continue but all other activities for that +;;; ;; db will be blocked. +;;; ;; +;;; (define (work-queue-processor acfg) +;;; (let* ((threads (make-hash-table))) ;; fname => thread +;;; (let loop ((fnames (hash-table-keys (area-wqueues acfg))) +;;; (target-time (+ (current-milliseconds) 50))) +;;; ;;(if (not (null? fnames))(print "Processing for these databases: " fnames)) +;;; (for-each +;;; (lambda (fname) +;;; ;; (print "processing for " fname) +;;; ;;(process-db-queries acfg fname)) +;;; (let ((th (hash-table-ref/default threads fname #f))) +;;; (if (and th (not (member (thread-state th) '(dead terminated)))) +;;; (begin +;;; (print "WARNING: worker thread for " fname " is taking a long time.") +;;; (print "Thread is in state " (thread-state th))) +;;; (let ((th1 (make-thread (lambda () +;;; ;; (catch-and-dump +;;; ;; (lambda () +;;; ;; (print "Process queries for " fname) +;;; (let ((start-time (current-milliseconds))) +;;; (process-db-queries acfg fname) +;;; ;; (thread-sleep! 0.01) ;; need the thread to take at least some time +;;; (hash-table-delete! threads fname)) ;; no mutexes? +;;; fname) +;;; "th1"))) ;; )) +;;; (hash-table-set! threads fname th1) +;;; (thread-start! th1))))) +;;; fnames) +;;; ;; (thread-sleep! 0.1) ;; give the threads some time to process requests +;;; ;; burn time until 400ms is up +;;; (let ((now-time (current-milliseconds))) +;;; (if (< now-time target-time) +;;; (let ((delta (- target-time now-time))) +;;; (thread-sleep! (/ delta 1000))))) +;;; (loop (hash-table-keys (area-wqueues acfg)) +;;; (+ (current-milliseconds) 50))))) +;;; +;;; ;;====================================================================== +;;; ;; S T A T S G A T H E R I N G +;;; ;;====================================================================== +;;; +;;; (defstruct stat +;;; (qcount-avg 0) ;; coarse running average +;;; (qtime-avg 0) ;; coarse running average +;;; (qcount 0) ;; total +;;; (qtime 0) ;; total +;;; (last-qcount 0) ;; last +;;; (last-qtime 0) ;; last +;;; (dbs '()) ;; list of db files handled by this node +;;; (when 0)) ;; when the last query happened - seconds +;;; +;;; +;;; (define (update-stats acfg fname bucket duration numqueries) +;;; (let* ((key fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough +;;; (stats (or (hash-table-ref/default (area-stats acfg) key #f) +;;; (let ((newstats (make-stat))) +;;; (hash-table-set! (area-stats acfg) key newstats) +;;; newstats)))) +;;; ;; when the last query happended (used to remove the fname from the active list) +;;; (stat-when-set! stats (current-seconds)) +;;; ;; last values +;;; (stat-last-qcount-set! stats numqueries) +;;; (stat-last-qtime-set! stats duration) +;;; ;; total over process lifetime +;;; (stat-qcount-set! stats (+ (stat-qcount stats) numqueries)) +;;; (stat-qtime-set! stats (+ (stat-qtime stats) duration)) +;;; ;; coarse average +;;; (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2)) +;;; (stat-qtime-avg-set! stats (/ (+ (stat-qtime-avg stats) duration) 2)) +;;; +;;; ;; here is where we add the stats for a given dbfile +;;; (if (not (member fname (stat-dbs stats))) +;;; (stat-dbs-set! stats (cons fname (stat-dbs stats)))) +;;; +;;; )) +;;; +;;; ;;====================================================================== +;;; ;; S E R V E R S T U F F +;;; ;;====================================================================== +;;; +;;; ;; this does NOT return! +;;; ;; +;;; (define (find-free-port-and-open acfg) +;;; (let ((port (or (area-port acfg) 3200))) +;;; (handle-exceptions +;;; exn +;;; (begin +;;; (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port") +;;; (area-port-set! acfg (+ port 1)) +;;; (find-free-port-and-open acfg)) +;;; (rpc:default-server-port port) +;;; (area-port-set! acfg port) +;;; (tcp-read-timeout 120000) +;;; ;; ((rpc:make-server (tcp-listen port)) #t) +;;; (tcp-listen (rpc:default-server-port) +;;; )))) +;;; +;;; ;; register this node by putting a packet into the pkts dir. +;;; ;; look for other servers +;;; ;; contact other servers and compile list of servers +;;; ;; there are two types of server +;;; ;; main servers - dashboards, runners and dedicated servers - need pkt +;;; ;; passive servers - test executers, step calls, list-runs - no pkt +;;; ;; +;;; (define (register-node acfg hostip port-num) +;;; ;;(mutex-lock! (area-mutex acfg)) +;;; (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created) +;;; (best-ip (or hostip (get-my-best-address))) +;;; (mtdir (area-dbdir acfg)) +;;; (pktdir (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts"))) +;;; (print "Registering node " best-ip ":" port-num) +;;; (if (not mtdir) ;; require a home for this node to put or find databases +;;; #f +;;; (begin +;;; (if (not (directory? pktdir))(create-directory pktdir)) +;;; ;; server is started, now create pkt if needed +;;; (print "Starting server in " server-type " mode with port " port-num) +;;; (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt +;;; (begin +;;; (area-pktid-set! acfg +;;; (write-alist->pkt +;;; pktdir +;;; `((hostname . ,(get-host-name)) +;;; (ipaddr . ,best-ip) +;;; (port . ,port-num) +;;; (pid . ,(current-process-id))) +;;; pktspec: *pktspec* +;;; ptype: 'server)) +;;; (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt")))) +;;; (area-port-set! acfg port-num) +;;; #;(mutex-unlock! (area-mutex acfg)))))) +;;; +;;; (define *cookie-seqnum* 0) +;;; (define (make-cookie key) +;;; (set! *cookie-seqnum* (add1 *cookie-seqnum*)) +;;; ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*) +;;; (conc key "-" *cookie-seqnum*) +;;; ) +;;; +;;; ;; dispatch locally if possible +;;; ;; +;;; (define (call-deliver-response acfg ipaddr port cookie data) +;;; (if (and (equal? (area-myaddr acfg) ipaddr) +;;; (equal? (area-port acfg) port)) +;;; (deliver-response acfg cookie data) +;;; ((rpc:procedure 'response ipaddr port) cookie data))) +;;; +;;; (define (deliver-response acfg cookie data) +;;; (let ((deliver-response-start (current-milliseconds))) +;;; (thread-start! (make-thread +;;; (lambda () +;;; (let loop ((tries-left 5)) +;;; ;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left) +;;; ;;(pp (hash-table->alist (area-cookie2mbox acfg))) +;;; (let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f))) +;;; (cond +;;; ((eq? 0 tries-left) +;;; (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie) +;;; ) +;;; (mbox +;;; ;;(print "got mbox="mbox" got data="data" send.") +;;; (mailbox-send! mbox data)) +;;; (else +;;; ;;(print "no mbox yet. look for "cookie) +;;; (thread-sleep! (/ (- 6 tries-left) 10)) +;;; (loop (sub1 tries-left)))))) +;;; ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data)) +;;; (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie) +;;; ) +;;; (conc "deliver-response thread for cookie="cookie)))) +;;; #t) +;;; +;;; ;; action: +;;; ;; immediate - quick actions, no need to put in queues +;;; ;; dbwrite - put in dbwrite queue +;;; ;; dbread - put in dbread queue +;;; ;; oslong - os actions, e.g. du, that could take a long time +;;; ;; osshort - os actions that should be quick, e.g. df +;;; ;; +;;; (define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler +;;; ;; NOTE: Use rpc:current-peer for getting return address +;;; (let* ((std-peer-handler-start (current-milliseconds)) +;;; ;; (raw-data (alist-ref 'data dat)) +;;; (rdat (hash-table-ref/default +;;; (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action +;;; (witem (make-witem ripaddr: from-ipaddr ;; rhost: from-host +;;; rport: from-port action: action +;;; rdat: rdat cookie: cookie +;;; servkey: servkey data: params ;; TODO - rename data to params +;;; caller: (rpc:current-peer)))) +;;; (if (not (equal? servkey (area-pktid acfg))) +;;; `(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this +;;; (let* ((ctype (if rdat +;;; (calldat-ctype rdat) ;; is this necessary? these should be identical +;;; action))) +;;; (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f) +;;; (case ctype +;;; ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data))) +;;; ((full-ping) `(#t "ack to full ping" ,(work-queue-add acfg fname witem) ,cookie)) +;;; ((response) `(#t "ack from requestor" ,(deliver-response acfg fname params))) +;;; ((dbwrite) `(#t "db write submitted" ,(work-queue-add acfg fname witem) ,cookie)) +;;; ((dbread) `(#t "db read submitted" ,(work-queue-add acfg fname witem) ,cookie )) +;;; ((dbrw) `(#t "db read/write submitted" ,cookie)) +;;; ((osshort) `(#t "os short submitted" ,cookie)) +;;; ((oslong) `(#t "os long submitted" ,cookie)) +;;; (else `(#f "unrecognised action" ,ctype))))))) +;;; +;;; ;; Call this to start the actual server +;;; ;; +;;; ;; start_server +;;; ;; +;;; ;; mode: ' +;;; ;; handler: proc which takes pktrecieved as argument +;;; ;; +;;; +;;; (define (start-server acfg) +;;; (let* ((conn (find-free-port-and-open acfg)) +;;; (port (area-port acfg))) +;;; (rpc:publish-procedure! +;;; 'delist-db +;;; (lambda (fname) +;;; (hash-table-delete! (area-dbs acfg) fname))) +;;; (rpc:publish-procedure! +;;; 'calling-addr +;;; (lambda () +;;; (rpc:current-peer))) +;;; (rpc:publish-procedure! +;;; 'ping +;;; (lambda ()(real-ping acfg))) +;;; (rpc:publish-procedure! +;;; 'request +;;; (lambda (from-addr from-port servkey action cookie dbname params) +;;; (request acfg from-addr from-port servkey action cookie dbname params))) +;;; (rpc:publish-procedure! +;;; 'response +;;; (lambda (cookie res-dat) +;;; (deliver-response acfg cookie res-dat))) +;;; (area-ready-set! acfg #t) +;;; (area-conn-set! acfg conn) +;;; ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t) +;;; +;;; +;;; (define (launch acfg) ;; #!optional (proc std-peer-handler)) +;;; (print "starting launch") +;;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers) +;;; #;(let ((original-handler (current-exception-handler))) ;; is th +;;; (lambda (exception) +;;; (server-exit-procedure) +;;; (original-handler exception))) +;;; (on-exit (lambda () +;;; (shutdown acfg))) ;; (finalize-all-db-handles acfg))) +;;; ;; set up the rpc handler +;;; (let* ((th1 (make-thread +;;; (lambda ()(start-server acfg)) +;;; "server thread")) +;;; (th2 (make-thread +;;; (lambda () +;;; (print "th2 starting") +;;; (let loop () +;;; (work-queue-processor acfg) +;;; (print "work-queue-processor crashed!") +;;; (loop))) +;;; "work queue thread"))) +;;; (thread-start! th1) +;;; (thread-start! th2) +;;; (let loop () +;;; (thread-sleep! 0.025) +;;; (if (area-ready acfg) +;;; #t +;;; (loop))) +;;; ;; attempt to fix my address +;;; (let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)? +;;; (let loop ((rem-addrs all-addr)) +;;; (if (null? rem-addrs) +;;; (begin +;;; (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.") +;;; (exit 1)) ;; BUG Changeme to raising an exception +;;; +;;; (let* ((addr (car rem-addrs)) +;;; (good-addr (handle-exceptions +;;; exn +;;; #f +;;; ((rpc:procedure 'calling-addr addr (area-port acfg)))))) +;;; (if good-addr +;;; (begin +;;; (print "Got good-addr of " good-addr) +;;; (area-myaddr-set! acfg good-addr)) +;;; (loop (cdr rem-addrs))))))) +;;; (register-node acfg (area-myaddr acfg)(area-port acfg)) +;;; (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg)) +;;; ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers) +;;; )) +;;; +;;; (define (clear-server-pkt acfg) +;;; (let ((pktf (area-pktfile acfg))) +;;; (if pktf (delete-file* pktf)))) +;;; +;;; (define (shutdown acfg) +;;; (let (;;(conn (area-conn acfg)) +;;; (pktf (area-pktfile acfg)) +;;; (port (area-port acfg))) +;;; (if pktf (delete-file* pktf)) +;;; (send-all "imshuttingdown") +;;; ;; (rpc:close-all-connections!) ;; don't know if this is actually needed +;;; (finalize-all-db-handles acfg))) +;;; +;;; (define (send-all msg) +;;; #f) +;;; +;;; ;; given a area record look up all the packets +;;; ;; +;;; (define (get-all-server-pkts acfg) +;;; (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt")))) +;;; (map (lambda (pkt-file) +;;; (read-pkt->alist pkt-file pktspec: *pktspec*)) +;;; all-pkt-files))) +;;; +;;; #;((Z . "9a0212302295a19610d5796fce0370fa130758e9") +;;; (port . "34827") +;;; (pid . "28748") +;;; (hostname . "zeus") +;;; (T . "server") +;;; (D . "1549427032.0")) +;;; +;;; #;(define (get-my-best-address) +;;; (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))) +;;; (cond +;;; ((null? all-my-addresses) +;;; (get-host-name)) ;; no interfaces? +;;; ((eq? (length all-my-addresses) 1) +;;; (ip->string (car all-my-addresses))) ;; only one to choose from, just go with it +;;; (else +;;; (ip->string (car (filter (lambda (x) ;; take any but 127. +;;; (not (eq? (u8vector-ref x 0) 127))) +;;; all-my-addresses))))))) +;;; +;;; ;; whoami? I am my pkt +;;; ;; +;;; (define (whoami? acfg) +;;; (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f)) +;;; +;;; ;;====================================================================== +;;; ;; "Client side" operations +;;; ;;====================================================================== +;;; +;;; (define (safe-call call-key host port . params) +;;; (handle-exceptions +;;; exn +;;; (begin +;;; (print "Call " call-key " to " host ":" port " failed") +;;; #f) +;;; (apply (rpc:procedure call-key host port) params))) +;;; +;;; ;; ;; convert to/from string / sexpr +;;; ;; +;;; ;; (define (string->sexpr str) +;;; ;; (if (string? str) +;;; ;; (with-input-from-string str read) +;;; ;; str)) +;;; ;; +;;; ;; (define (sexpr->string s) +;;; ;; (with-output-to-string (lambda ()(write s)))) +;;; +;;; ;; is the server alive? +;;; ;; +;;; (define (ping acfg host port) +;;; (let* ((myaddr (area-myaddr acfg)) +;;; (myport (area-port acfg)) +;;; (start-time (current-milliseconds)) +;;; (res (if (and (equal? myaddr host) +;;; (equal? myport port)) +;;; (real-ping acfg) +;;; ((rpc:procedure 'ping host port))))) +;;; (cons (- (current-milliseconds) start-time) +;;; res))) +;;; +;;; ;; returns ( ipaddr port alist-fname=>randnum ) +;;; (define (real-ping acfg) +;;; `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg))) +;;; +;;; ;; is the server alive AND the queues processing? +;;; ;; +;;; #;(define (full-ping acfg servpkt) +;;; (let* ((start-time (current-milliseconds)) +;;; (res (send-message acfg servpkt '(full-ping) 'full-ping))) +;;; (cons (- (current-milliseconds) start-time) +;;; res))) ;; (equal? res "got ping")))) +;;; +;;; +;;; ;; look up all pkts and get the server id (the hash), port, host/ip +;;; ;; store this info in acfg +;;; ;; return the number of responsive servers found +;;; ;; +;;; ;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself +;;; ;; +;;; (define (update-known-servers acfg) +;;; ;; readll all pkts +;;; ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt +;;; (let* ((start-time (current-milliseconds)) +;;; (all-pkts (delete-duplicates +;;; (append (get-all-server-pkts acfg) +;;; (hash-table-values (area-hosts acfg))))) +;;; (hostshash (area-hosts acfg)) +;;; (my-id (area-pktid acfg)) +;;; (pktsdir (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers +;;; (numsrvs 0) +;;; (delpkt (lambda (pktsdir sid) +;;; (print "clearing out server " sid) +;;; (delete-file* (conc pktsdir "/" sid ".pkt")) +;;; (hash-table-delete! hostshash sid)))) +;;; (area-last-srvup-set! acfg (current-seconds)) +;;; (for-each +;;; (lambda (servpkt) +;;; (if (list? servpkt) +;;; ;; (pp servpkt) +;;; (let* ((shost (alist-ref 'ipaddr servpkt)) +;;; (sport (any->number (alist-ref 'port servpkt))) +;;; (res (handle-exceptions +;;; exn +;;; (begin +;;; ;; (print "INFO: bad server on " shost ":" sport) +;;; #f) +;;; (ping acfg shost sport))) +;;; (sid (alist-ref 'Z servpkt)) ;; Z code is our name for the server +;;; (url (conc shost ":" sport)) +;;; ) +;;; #;(if (or (not res) +;;; (null? res)) +;;; (begin +;;; (print "STRANGE: ping of " url " gave " res))) +;;; +;;; ;; (print "Got " res " from " shost ":" sport) +;;; (match res +;;; ((qduration . payload) +;;; ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt) +;;; ;; (if payload +;;; ;; "Success" "Fail")) +;;; (match payload +;;; ((host port stats) +;;; ;; (print "From " host ":" port " got stats: " stats) +;;; (if (and host port stats) +;;; (let ((url (conc host ":" port))) +;;; (hash-table-set! hostshash sid servpkt) +;;; ;; store based on host:port +;;; (hash-table-set! (area-hoststats acfg) sid stats)) +;;; (print "missing data from the server, not sure what that means!")) +;;; (set! numsrvs (+ numsrvs 1))) +;;; (#f +;;; (print "Removing pkt " sid " due to #f from server or failed ping") +;;; (delpkt pktsdir sid)) +;;; (else +;;; (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)"))) +;;; (else +;;; ;; here we delete the pkt - can't reach the server, remove it +;;; ;; however this logic is inadequate. we should mark the server as checked +;;; ;; and not good, if it happens a second time - then remove the pkt +;;; ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead +;;; ;; could be it is simply too busy to reply +;;; (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0))) +;;; (if (> bad-pings 1) ;; two bad pings - remove pkt +;;; (begin +;;; (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid) +;;; (delpkt pktsdir sid)) +;;; (begin +;;; (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet") +;;; (hash-table-set! (area-health acfg) +;;; url +;;; (+ (hash-table-ref/default (area-health acfg) url 0) 1)) +;;; )) +;;; )))) +;;; ;; servpkt is not actually a pkt? +;;; (begin +;;; (print "Bad pkt " servpkt)))) +;;; all-pkts) +;;; (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs +;;; " servers, pkts: " (map (lambda (p) +;;; (alist-ref 'Z p)) +;;; all-pkts)) +;;; numsrvs)) +;;; +;;; (defstruct srvstat +;;; (numfiles 0) ;; number of db files handled by this server - subtract 1 for the db being currently looked at +;;; (randnum #f) ;; tie breaker number assigned to by the server itself - applies only to the db under consideration +;;; (pkt #f)) ;; the server pkt +;;; +;;; ;;(define (srv->srvstat srvpkt) +;;; +;;; ;; Get the server best for given dbname and key +;;; ;; +;;; ;; NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries. +;;; ;; +;;; (define (get-best-server acfg dbname key) +;;; (let* (;; (servers (hash-table-values (area-hosts acfg))) +;;; (servers (area-hosts acfg)) +;;; (skeys (sort (hash-table-keys servers) string>=?)) ;; a stable listing +;;; (start-time (current-milliseconds)) +;;; (srvstats (make-hash-table)) ;; srvid => srvstat +;;; (url (conc (area-myaddr acfg) ":" (area-port acfg)))) +;;; ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys)) +;;; (if (null? skeys) +;;; (if (> (update-known-servers acfg) 0) +;;; (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter +;;; (begin +;;; (print "ERROR: no server found!") ;; since this process is also a server this should never happen +;;; #f)) +;;; (begin +;;; ;; (print "in get-best-server with skeys=" skeys) +;;; (if (> (- (current-seconds) (area-last-srvup acfg)) 10) +;;; (begin +;;; (update-known-servers acfg) +;;; (sdbg> "get-best-server" "update-known-servers" start-time #f #f))) +;;; +;;; ;; for each server look at the list of dbfiles, total number of dbs being handled +;;; ;; and the rand number, save the best host +;;; ;; also do a delist-db for each server dbfile not used +;;; (let* ((best-server #f) +;;; (servers-to-delist (make-hash-table))) +;;; (for-each +;;; (lambda (srvid) +;;; (let* ((server (hash-table-ref/default servers srvid #f)) +;;; (stats (hash-table-ref/default (area-hoststats acfg) srvid '(())))) +;;; ;; (print "stats: " stats) +;;; (if server +;;; (let* ((dbweights (car stats)) +;;; (srvload (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights))) +;;; (dbrec (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore +;;; (randnum (if dbrec +;;; dbrec ;; (cdr dbrec) +;;; 0))) +;;; (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server)))))) +;;; skeys) +;;; +;;; (let* ((sorted (sort (hash-table-values srvstats) +;;; (lambda (a b) +;;; (let ((numfiles-a (srvstat-numfiles a)) +;;; (numfiles-b (srvstat-numfiles b)) +;;; (randnum-a (srvstat-randnum a)) +;;; (randnum-b (srvstat-randnum b))) +;;; (if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less +;;; #t +;;; (if (and (equal? numfiles-a numfiles-b) +;;; (< randnum-a randnum-b)) +;;; #t +;;; #f)))))) +;;; (best (if (null? sorted) +;;; (begin +;;; (print "ERROR: should never be null due to self as server.") +;;; #f) +;;; (srvstat-pkt (car sorted))))) +;;; #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv) +;;; (let ((p (srvstat-pkt srv))) +;;; (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p) +;;; "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")"))) +;;; sorted)) +;;; best)))))) +;;; +;;; ;; send out an "I'm about to exit notice to all known servers" +;;; ;; +;;; (define (death-imminent acfg) +;;; '()) +;;; +;;; ;;====================================================================== +;;; ;; U L E X - T H E I N T E R E S T I N G S T U F F ! ! +;;; ;;====================================================================== +;;; +;;; ;; register a handler +;;; ;; NOTES: +;;; ;; dbinitsql is reserved for a list of sql statements for initializing the db +;;; ;; dbinitfn is reserved for a db init function, if exists called after dbinitsql +;;; ;; +;;; (define (register acfg key obj #!optional (ctype 'dbwrite)) +;;; (let ((ht (area-rtable acfg))) +;;; (if (hash-table-exists? ht key) +;;; (print "WARNING: redefinition of entry " key)) +;;; (hash-table-set! ht key (make-calldat obj: obj ctype: ctype)))) +;;; +;;; ;; usage: register-batch acfg '((key1 . sql1) (key2 . sql2) ... ) +;;; ;; NB// obj is often an sql query +;;; ;; +;;; (define (register-batch acfg ctype data) +;;; (let ((ht (area-rtable acfg))) +;;; (map (lambda (dat) +;;; (hash-table-set! ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype))) +;;; data))) +;;; +;;; (define (initialize-area-calls-from-specfile area specfile) +;;; (let* ((callspec (with-input-from-file specfile read ))) +;;; (for-each (lambda (group) +;;; (register-batch +;;; area +;;; (car group) +;;; (cdr group))) +;;; callspec))) +;;; +;;; ;; get-rentry +;;; ;; +;;; (define (get-rentry acfg key) +;;; (hash-table-ref/default (area-rtable acfg) key #f)) +;;; +;;; (define (get-rsql acfg key) +;;; (let ((cdat (get-rentry acfg key))) +;;; (if cdat +;;; (calldat-obj cdat) +;;; #f))) +;;; +;;; +;;; +;;; ;; blocking call: +;;; ;; client server +;;; ;; ------ ------ +;;; ;; call() +;;; ;; send-message() +;;; ;; nmsg-send() +;;; ;; nmsg-receive() +;;; ;; nmsg-respond(ack,cookie) +;;; ;; ack, cookie +;;; ;; mbox-thread-wait(cookie) +;;; ;; nmsg-send(client,cookie,result) +;;; ;; nmsg-respond(ack) +;;; ;; return result +;;; ;; +;;; ;; reserved action: +;;; ;; 'immediate +;;; ;; 'dbinitsql +;;; ;; +;;; (define (call acfg dbname action params #!optional (count 0)) +;;; (let* ((call-start-time (current-milliseconds)) +;;; (srv (get-best-server acfg dbname action)) +;;; (post-get-start-time (current-milliseconds)) +;;; (rdat (hash-table-ref/default (area-rtable acfg) action #f)) +;;; (myid (trim-pktid (area-pktid acfg))) +;;; (srvid (trim-pktid (alist-ref 'Z srv))) +;;; (cookie (make-cookie myid))) +;;; (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat) +;;; (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname) +;;; (if (and srv rdat) ;; need both to dispatch a request +;;; (let* ((ripaddr (alist-ref 'ipaddr srv)) +;;; (rsrvid (alist-ref 'Z srv)) +;;; (rport (any->number (alist-ref 'port srv))) +;;; (res-full (if (and (equal? ripaddr (area-myaddr acfg)) +;;; (equal? rport (area-port acfg))) +;;; (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params) +;;; (safe-call 'request ripaddr rport +;;; (area-myaddr acfg) +;;; (area-port acfg) +;;; #;(area-pktid acfg) +;;; rsrvid +;;; action cookie dbname params)))) +;;; ;; (print "res-full: " res-full) +;;; (match res-full +;;; ((response-ok response-msg rem ...) +;;; (let* ((send-message-time (current-milliseconds)) +;;; ;; (match res-full +;;; ;; ((response-ok response-msg) +;;; ;; (response-ok (car res-full)) +;;; ;; (response-msg (cadr res-full) +;;; ) +;;; ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG +;;; ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params) +;;; (sdbg> "call" "send-message" post-get-start-time #f call-start-time) +;;; (cond +;;; ((not response-ok) #f) +;;; ((member response-msg '("db read submitted" "db write submitted")) +;;; (let* ((cookie-id (cadddr res-full)) +;;; (mbox (make-mailbox)) +;;; (mbox-time (current-milliseconds))) +;;; (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox) +;;; (let* ((mbox-timeout-secs 20) +;;; (mbox-timeout-result 'MBOX_TIMEOUT) +;;; (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) +;;; (mbox-receive-time (current-milliseconds))) +;;; (hash-table-delete! (area-cookie2mbox acfg) cookie-id) +;;; (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname) +;;; ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params) +;;; res))) +;;; (else +;;; (print "Unhandled response \""response-msg"\"") +;;; #f)) +;;; ;; depending on what action (i.e. ctype) is we will block here waiting for +;;; ;; all the data (mechanism to be determined) +;;; ;; +;;; ;; if res is a "working on it" then wait +;;; ;; wait for result +;;; ;; mailbox thread wait on +;;; +;;; ;; if res is a "can't help you" then try a different server +;;; ;; if res is a "ack" (e.g. for one-shot requests) then return res +;;; )) +;;; (else +;;; (if (< count 10) +;;; (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv)))) +;;; (thread-sleep! 1) +;;; (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.") +;;; (call acfg dbname action params (+ count 1))) +;;; (begin +;;; (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full))))))) +;;; (begin +;;; (if (not rdat) +;;; (print "ERROR: action " action " not registered.") +;;; (if (< count 10) +;;; (begin +;;; (thread-sleep! 1) +;;; (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts +;;; (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds") +;;; (call acfg dbname action params (+ count 1))) +;;; (begin +;;; (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up.")) +;;; #;(error "No server available")))))))) +;;; +;;; +;;; ;;====================================================================== +;;; ;; U T I L I T I E S +;;; ;;====================================================================== +;;; +;;; ;; get a signature for identifing this process +;;; ;; +;;; (define (get-process-signature) +;;; (cons (get-host-name)(current-process-id))) +;;; +;;; ;;====================================================================== +;;; ;; S Y S T E M S T U F F +;;; ;;====================================================================== +;;; +;;; ;; get normalized cpu load by reading from /proc/loadavg and +;;; ;; /proc/cpuinfo return all three values and the number of real cpus +;;; ;; and the number of threads returns alist '((adj-cpu-load +;;; ;; . normalized-proc-load) ... etc. keys: adj-proc-load, +;;; ;; adj-core-load, 1m-load, 5m-load, 15m-load +;;; ;; +;;; (define (get-normalized-cpu-load) +;;; (let ((res (get-normalized-cpu-load-raw)) +;;; (default `((adj-proc-load . 2) ;; there is no right answer +;;; (adj-core-load . 2) +;;; (1m-load . 2) +;;; (5m-load . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong +;;; (15m-load . 0) +;;; (proc . 1) +;;; (core . 1) +;;; (phys . 1) +;;; (error . #t)))) +;;; (cond +;;; ((and (list? res) +;;; (> (length res) 2)) +;;; res) +;;; ((eq? res #f) default) ;; add messages? +;;; ((eq? res #f) default) ;; this would be the #eof +;;; (else default)))) +;;; +;;; (define (get-normalized-cpu-load-raw) +;;; (let* ((actual-host (get-host-name))) ;; #f is localhost +;;; (let ((data (append +;;; (with-input-from-file "/proc/loadavg" read-lines) +;;; (with-input-from-file "/proc/cpuinfo" read-lines) +;;; (list "end"))) +;;; (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$")) +;;; (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$")) +;;; (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$")) +;;; (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$")) +;;; (max-num (lambda (p n)(max (string->number p) n)))) +;;; ;; (print "data=" data) +;;; (if (null? data) ;; something went wrong +;;; #f +;;; (let loop ((hed (car data)) +;;; (tal (cdr data)) +;;; (loads #f) +;;; (proc-num 0) ;; processor includes threads +;;; (phys-num 0) ;; physical chip on motherboard +;;; (core-num 0)) ;; core +;;; ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num) +;;; (if (null? tal) ;; have all our data, calculate normalized load and return result +;;; (let* ((act-proc (+ proc-num 1)) +;;; (act-phys (+ phys-num 1)) +;;; (act-core (+ core-num 1)) +;;; (adj-proc-load (/ (car loads) act-proc)) +;;; (adj-core-load (/ (car loads) act-core)) +;;; (result +;;; (append (list (cons 'adj-proc-load adj-proc-load) +;;; (cons 'adj-core-load adj-core-load)) +;;; (list (cons '1m-load (car loads)) +;;; (cons '5m-load (cadr loads)) +;;; (cons '15m-load (caddr loads))) +;;; (list (cons 'proc act-proc) +;;; (cons 'core act-core) +;;; (cons 'phys act-phys))))) +;;; result) +;;; (regex-case +;;; hed +;;; (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num)) +;;; (proc-rx ( x p ) (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num)) +;;; (phys-rx ( x p ) (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num)) +;;; (core-rx ( x c ) (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num))) +;;; (else +;;; (begin +;;; ;; (print "NO MATCH: " hed) +;;; (loop (car tal)(cdr tal) loads proc-num phys-num core-num)))))))))) +;;; +;;; (define (get-host-stats acfg) +;;; (let ((stats-hash (area-stats acfg))) +;;; ;; use this opportunity to remove references to dbfiles which have not been accessed in a while +;;; (for-each +;;; (lambda (dbname) +;;; (let* ((stats (hash-table-ref stats-hash dbname)) +;;; (last-access (stat-when stats))) +;;; (if (and (> last-access 0) ;; if zero then there has been no access +;;; (> (- (current-seconds) last-access) 10)) ;; not used in ten seconds +;;; (begin +;;; (print "Removing " dbname " from stats list") +;;; (hash-table-delete! stats-hash dbname) ;; remove from stats hash +;;; (stat-dbs-set! stats (hash-table-keys stats)))))) +;;; (hash-table-keys stats-hash)) +;;; +;;; `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum +;;; ,(map (lambda (dbname) ;; dbname is the db name +;;; (cons dbname (stat-when (hash-table-ref stats-hash dbname)))) +;;; (hash-table-keys stats-hash)) +;;; (cpuload . ,(get-normalized-cpu-load))))) +;;; #;(stats . ,(map (lambda (k) ;; create an alist from the stats data +;;; (cons k (stat->alist (hash-table-ref (area-stats acfg) k)))) +;;; (hash-table-keys (area-stats acfg)))) +;;; +;;; #;(trace +;;; ;; assv +;;; ;; cdr +;;; ;; caar +;;; ;; ;; cdr +;;; ;; call +;;; ;; finalize-all-db-handles +;;; ;; get-all-server-pkts +;;; ;; get-normalized-cpu-load +;;; ;; get-normalized-cpu-load-raw +;;; ;; launch +;;; ;; nmsg-send +;;; ;; process-db-queries +;;; ;; receive-message +;;; ;; std-peer-handler +;;; ;; update-known-servers +;;; ;; work-queue-processor +;;; ) +;;; +;;; ;;====================================================================== +;;; ;; netutil +;;; ;; move this back to ulex-netutil.scm someday? +;;; ;;====================================================================== +;;; +;;; ;; #include +;;; ;; #include +;;; ;; #include +;;; ;; #include +;;; +;;; (foreign-declare "#include \"sys/types.h\"") +;;; (foreign-declare "#include \"sys/socket.h\"") +;;; (foreign-declare "#include \"ifaddrs.h\"") +;;; (foreign-declare "#include \"arpa/inet.h\"") +;;; +;;; ;; get IP addresses from ALL interfaces +;;; (define get-all-ips +;;; (foreign-safe-lambda* scheme-object () +;;; " +;;; +;;; // from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address : +;;; +;;; +;;; C_word lst = C_SCHEME_END_OF_LIST, len, str, *a; +;;; // struct ifaddrs *ifa, *i; +;;; // struct sockaddr *sa; +;;; +;;; struct ifaddrs * ifAddrStruct = NULL; +;;; struct ifaddrs * ifa = NULL; +;;; void * tmpAddrPtr = NULL; +;;; +;;; if ( getifaddrs(&ifAddrStruct) != 0) +;;; C_return(C_SCHEME_FALSE); +;;; +;;; // for (i = ifa; i != NULL; i = i->ifa_next) { +;;; for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) { +;;; if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is +;;; // a valid IPv4 address +;;; tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr; +;;; char addressBuffer[INET_ADDRSTRLEN]; +;;; inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN); +;;; // printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); +;;; len = strlen(addressBuffer); +;;; a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); +;;; str = C_string(&a, len, addressBuffer); +;;; lst = C_a_pair(&a, str, lst); +;;; } +;;; +;;; // else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is +;;; // // a valid IPv6 address +;;; // tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; +;;; // char addressBuffer[INET6_ADDRSTRLEN]; +;;; // inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN); +;;; //// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); +;;; // len = strlen(addressBuffer); +;;; // a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); +;;; // str = C_string(&a, len, addressBuffer); +;;; // lst = C_a_pair(&a, str, lst); +;;; // } +;;; +;;; // else { +;;; // printf(\" not an IPv4 address\\n\"); +;;; // } +;;; +;;; } +;;; +;;; freeifaddrs(ifa); +;;; C_return(lst); +;;; +;;; ")) +;;; +;;; ;; Change this to bias for addresses with a reasonable broadcast value? +;;; ;; +;;; (define (ip-pref-less? a b) +;;; (let* ((rate (lambda (ipstr) +;;; (regex-case ipstr +;;; ( "^127\\." _ 0 ) +;;; ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 ) +;;; ( else 2 ) )))) +;;; (< (rate a) (rate b)))) +;;; +;;; +;;; (define (get-my-best-address) +;;; (let ((all-my-addresses (get-all-ips)) +;;; ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))) +;;; ) +;;; (cond +;;; ((null? all-my-addresses) +;;; (get-host-name)) ;; no interfaces? +;;; ((eq? (length all-my-addresses) 1) +;;; (car all-my-addresses)) ;; only one to choose from, just go with it +;;; +;;; (else +;;; (car (sort all-my-addresses ip-pref-less?))) +;;; ;; (else +;;; ;; (ip->string (car (filter (lambda (x) ;; take any but 127. +;;; ;; (not (eq? (u8vector-ref x 0) 127))) +;;; ;; all-my-addresses)))) +;;; +;;; ))) +;;; +;;; (define (get-all-ips-sorted) +;;; (sort (get-all-ips) ip-pref-less?)) +;;; +;;; + Index: utils/deps.scm ================================================================== --- utils/deps.scm +++ utils/deps.scm @@ -17,16 +17,21 @@ (if (eof-object? l) data (begin (regex-case l - ("^\\s*\\(import\\s+([^\\s]+)\\).*" (x md ) - (print "\"" md "\" -> \"" modname "\";"))) + ("^\\s*\\((import|use)\\s+([^\\s]+)\\).*" + (x junk md )(print "\"" md "\" -> \"" modname "\";")) + ;; now get entries with prefix + ("^\\s*\\((import|use)\\s+\\(prefix\\s+([^\\s]+)\\s+.*\\).*" + (x junk md ) (print "\"" md "\" -> \"" modname "\";")) + ) (loop (read-line))))))))) (define (do-all-mod-files) - (let ((modfiles (get-files "*mod.scm"))) + (let ((modfiles (append ;; '("megatest.scm" "dashboard.scm") + (get-files "*mod.scm")))) (for-each (lambda (mfile) (print "// " mfile) (get-deps mfile)) modfiles))) Index: vgmod.scm ================================================================== --- vgmod.scm +++ vgmod.scm @@ -22,11 +22,11 @@ ;; (declare (uses commonmod)) (module vgmod * -(import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) +(import scheme (chicken base)) +(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 format (chicken port) srfi-1 matchable) ;; (import commonmod) ;; (use (prefix ulex ulex:)) )