Index: apimod.scm ================================================================== --- apimod.scm +++ apimod.scm @@ -32,10 +32,12 @@ chicken.base chicken.process-context.posix chicken.string chicken.time chicken.condition + chicken.process + chicken.random ;; (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 @@ -201,11 +203,11 @@ ;;=============================================== ((get-keys-write) (db:get-keys dbstruct)) ;; force a dummy "write" query to force server; for debug in -repl ;; SERVERS - ((start-server) (apply server:kind-run params)) + ;; ((start-server) (apply server:kind-run params)) ((kill-server) (set! *server-run* #f)) ;; TESTS ;;((test-set-state-status-by-id) (apply mt:test-set-state-status-by-id dbstruct params)) @@ -428,7 +430,8 @@ ;; (list "ERROR, not string, list, number or boolean" 1 cmd params res))))) (db:obj->string res transport: 'http))) (begin (debug:print 0 *default-log-port* "Server refused to process request. Sever id mismatch. recived " key " expected: " *server-id* ".\nOther arguments recived: cmd=" cmd " params = " params) (db:obj->string (conc "Server refused to process request server-id mismatch: " key ", " *server-id*) transport: 'http))))) + ) ADDED attic/runs-launch-loop-test.scm Index: attic/runs-launch-loop-test.scm ================================================================== --- /dev/null +++ attic/runs-launch-loop-test.scm @@ -0,0 +1,76 @@ +;; Copyright 2006-2017, Matthew Welland. +;; +;; This file is part of Megatest. +;; +;; Megatest is free software: you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation, either version 3 of the License, or +;; (at your option) any later version. +;; +;; Megatest is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +;; GNU General Public License for more details. +;; +;; You should have received a copy of the GNU General Public License +;; along with Megatest. If not, see . +;; +(use srfi-69) + +(define (runs:queue-next-hed tal reg n regful) + (if regful + (car reg) + (car tal))) + +(define (runs:queue-next-tal tal reg n regful) + (if regful + tal + (let ((newtal (cdr tal))) + (if (null? newtal) + reg + newtal + )))) + +(define (runs:queue-next-reg tal reg n regful) + (if regful + (cdr reg) + (if (eq? (length tal) 1) + '() + reg))) + +(use trace) +(trace runs:queue-next-hed + runs:queue-next-tal + runs:queue-next-reg) + + +(define tests '(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20)) + +(define test-registry (make-hash-table)) + +(define n 3) + +(let loop ((hed (car tests)) + (tal (cdr tests)) + (reg '())) + (let* ((reglen (length reg)) + (regful (> reglen n))) + (print "hed=" hed ", length reg=" (length reg) ", (> lenreg n)=" (> (length reg) n)) + (let ((newtal (append tal (list hed)))) ;; used if we are not done with this test + (cond + ((not (hash-table-ref/default test-registry hed #f)) + (hash-table-set! test-registry hed #t) + (print "Registering #" hed) + (if (not (null? tal)) + (loop (runs:queue-next-hed tal reg n regful) + (runs:queue-next-tal tal reg n regful) + (let ((newl (append reg (list hed)))) + (if regful + (cdr newl) + newl))))) + (else + (print "Running #" hed) + (if (not (null? tal)) + (loop (runs:queue-next-hed tal reg n regful) + (runs:queue-next-tal tal reg n regful) + (runs:queue-next-reg tal reg n regful)))))))) Index: clientmod.scm ================================================================== --- clientmod.scm +++ clientmod.scm @@ -87,88 +87,7 @@ #;(define (client:logout serverdat) (let ((ok (and (socket? 
serverdat) (cdb:logout serverdat *toppath* (client:get-signature))))) ok)) -#;(define (client:connect iface port) - (http-transport:client-connect iface port) - #;(case (server:get-transport) - ((rpc) (rpc:client-connect iface port)) - ((http) (http:client-connect iface port)) - ((zmq) (zmq:client-connect iface port)) - (else (rpc:client-connect iface port)))) - -(define (client:setup areapath #!key (remaining-tries 100) (failed-connects 0)) - (client:setup-http areapath remaining-tries: remaining-tries failed-connects: failed-connects) - #;(case (server:get-transport) - ((rpc) (rpc-transport:client-setup remaining-tries: remaining-tries failed-connects: failed-connects)) ;;(client:setup-rpc run-id)) - ((http)(client:setup-http areapath remaining-tries: remaining-tries failed-connects: failed-connects)) - (else (rpc-transport:client-setup remaining-tries: remaining-tries failed-connects: failed-connects)))) ;; (client:setup-rpc run-id)))) - -;; Do all the connection work, look up the transport type and set up the -;; connection if required. -;; -;; There are two scenarios. -;; 1. We are a test manager and we received *transport-type* and *runremote* via cmdline -;; 2. We are a run tests, list runs or other interactive process and we must figure out -;; *transport-type* and *runremote* from the monitor.db -;; -;; client:setup -;; -;; lookup_server, need to remove *runremote* stuff -;; - -(define (client:setup-http areapath #!key (remaining-tries 100) (failed-connects 0)(area-dat #f)) - (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries) - (server:start-and-wait areapath) - (if (<= remaining-tries 0) - (begin - (debug:print-error 0 *default-log-port* "failed to start or connect to server") - (exit 1)) - ;; - ;; Alternatively here, we can get the list of candidate servers and work our way - ;; through them searching for a good one. 
- ;; - (let* ((server-dat (server:get-rand-best areapath)) ;; (server:get-first-best areapath)) - (runremote (or area-dat *runremote*))) - (if (not server-dat) ;; no server found - (client:setup-http areapath remaining-tries: (- remaining-tries 1)) - (let ((host (cadr server-dat)) - (port (caddr server-dat)) - (server-id (caddr (cddr server-dat)))) - (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) - (if (and (not area-dat) - (not *runremote*)) - (begin - (set! *runremote* (make-and-init-remote)) - (let* ((server-info (remote-server-info *runremote*))) - (if server-info - (begin - (remote-server-url-set! *runremote* (server:record->url server-info)) - (remote-server-id-set! *runremote* (server:record->id server-info))))))) - (if (and host port server-id) - (let* ((start-res (case *transport-type* - ((http)(http-transport:client-connect host port server-id)))) - (ping-res (case *transport-type* - ((http)(rmt:login-no-auto-client-setup start-res))))) - (if (and start-res - ping-res) - (let ((runremote (or area-dat *runremote*))) ;; it might have been generated only a few statements ago - (remote-conndat-set! runremote start-res) ;; (hash-table-set! runremote run-id start-res) - (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) - start-res) - (begin ;; login failed but have a server record, clean out the record and try again - (debug:print-info 0 *default-log-port* "client:setup, login unsuccessful, will attempt to start server ... start-res=" start-res ", server-dat=" server-dat) ;; had runid. Fixes part of Randy;s ticket 1405717332 - (case *transport-type* - ((http)(http-transport:close-connections))) - (remote-conndat-set! runremote #f) ;; (hash-table-delete! runremote run-id) - (thread-sleep! 
1) - (client:setup-http areapath remaining-tries: (- remaining-tries 1)) - ))) - (begin ;; no server registered - ;; (server:kind-run areapath) - (server:start-and-wait areapath) - (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries) - (thread-sleep! 1) ;; (+ 5 (pseudo-random-integer (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. - (client:setup-http areapath remaining-tries: (- remaining-tries 1))))))))) ) Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -42,416 +42,5 @@ ;; (define (exit . code) ;; (if (null? code) ;; (old-exit) ;; (old-exit code))) - -(define (common:with-queue-db mtconf proc #!key (use-lt #f)(toppath-in #f)) - (let* ((pktsdirs (common:get-pkts-dirs mtconf use-lt)) - (pktsdir (if pktsdirs (car pktsdirs) #f)) - (toppath (or (configf:lookup mtconf "scratchdat" "toppath") - toppath-in)) - (pdbpath (or (configf:lookup mtconf "setup" "pdbpath") pktsdir))) - (cond - ((not (and pktsdir toppath pdbpath)) - (debug:print 0 *default-log-port* "ERROR: settings are missing in your megatest.config for area management.") - (debug:print 0 *default-log-port* " you need to have pktsdirs in the [setup] section.")) - ((not (common:file-exists? pktsdir)) - (debug:print 0 *default-log-port* "ERROR: pkts directory not found " pktsdir)) - ((not (equal? 
(file-owner pktsdir)(current-effective-user-id))) - (debug:print 0 *default-log-port* "ERROR: directory " pktsdir " is not owned by " (current-effective-user-name))) - (else - (let* ((pdb (open-queue-db pdbpath "pkts.db" - schema: '("CREATE TABLE groups (id INTEGER PRIMARY KEY,groupname TEXT, CONSTRAINT group_constraint UNIQUE (groupname));")))) - (proc pktsdirs pktsdir pdb) - (dbi:close pdb)))))) - -(define (common:load-pkts-to-db mtconf #!key (use-lt #f)) - (common:with-queue-db - mtconf - (lambda (pktsdirs pktsdir pdb) - (for-each - (lambda (pktsdir) ;; look at all - (cond - ((not (common:file-exists? pktsdir)) - (debug:print 0 *default-log-port* "ERROR: packets directory " pktsdir " does not exist.")) - ((not (directory? pktsdir)) - (debug:print 0 *default-log-port* "ERROR: packets directory path " pktsdir " is not a directory.")) - ((not (file-readable? pktsdir)) - (debug:print 0 *default-log-port* "ERROR: packets directory path " pktsdir " is not readable.")) - (else - (debug:print-info 0 *default-log-port* "Loading packets found in " pktsdir) - (let ((pkts (glob (conc pktsdir "/*.pkt")))) - (for-each - (lambda (pkt) - (let* ((uuid (cadr (string-match ".*/([0-9a-f]+).pkt" pkt))) - (exists (lookup-by-uuid pdb uuid #f))) - (if (not exists) - (let* ((pktdat (string-intersperse - (with-input-from-file pkt read-lines) - "\n")) - (apkt (pkt->alist pktdat)) - (ptype (alist-ref 'T apkt))) - (add-to-queue pdb pktdat uuid (or ptype 'cmd) #f 0) - (debug:print 4 *default-log-port* "Added " uuid " of type " ptype " to queue")) - (debug:print 4 *default-log-port* "pkt: " uuid " exists, skipping...") - ))) - pkts))))) - pktsdirs)) - use-lt: use-lt)) - - -;;====================================================================== -;; N A N O M S G C L I E N T -;;====================================================================== -;; -;; -;; -;; (define (common:send-dboard-main-changed) -;; (let* ((dashboard-ips (mddb:get-dashboards))) -;; (for-each -;; (lambda (ipadr) -;; 
(let* ((soc (common:open-nm-req (conc "tcp://" ipadr))) -;; (msg (conc "main " *toppath*)) -;; (res (common:nm-send-receive-timeout soc msg))) -;; (if (not res) ;; couldn't reach that dashboard - remove it from db -;; (print "ERROR: couldn't reach dashboard " ipadr)) -;; res)) -;; dashboard-ips))) -;; -;; -;; ;;====================================================================== -;; ;; D A S H B O A R D D B -;; ;;====================================================================== -;; -;; (define (mddb:open-db) -;; (let* ((db (open-database (conc (get-environment-variable "HOME") "/.dashboard.db")))) -;; (set-busy-handler! db (busy-timeout 10000)) -;; (for-each -;; (lambda (qry) -;; (exec (sql db qry))) -;; (list -;; "CREATE TABLE IF NOT EXISTS vars (id INTEGER PRIMARY KEY,key TEXT, val TEXT, CONSTRAINT varsconstraint UNIQUE (key));" -;; "CREATE TABLE IF NOT EXISTS dashboards ( -;; id INTEGER PRIMARY KEY, -;; pid INTEGER, -;; username TEXT, -;; hostname TEXT, -;; ipaddr TEXT, -;; portnum INTEGER, -;; start_time TIMESTAMP DEFAULT (strftime('%s','now')), -;; CONSTRAINT hostport UNIQUE (hostname,portnum) -;; );" -;; )) -;; db)) -;; -;; ;; register a dashboard -;; ;; -;; (define (mddb:register-dashboard port) -;; (let* ((pid (current-process-id)) -;; (hostname (get-host-name)) -;; (ipaddr (server:get-best-guess-address hostname)) -;; (username (current-user-name)) ;; (car userinfo))) -;; (db (mddb:open-db))) -;; (print "Register monitor, pid: " pid ", hostname: " hostname ", port: " port ", username: " username) -;; (exec (sql db "INSERT OR REPLACE INTO dashboards (pid,username,hostname,ipaddr,portnum) VALUES (?,?,?,?,?);") -;; pid username hostname ipaddr port) -;; (close-database db))) -;; -;; ;; unregister a monitor -;; ;; -;; (define (mddb:unregister-dashboard host port) -;; (let* ((db (mddb:open-db))) -;; (print "Register unregister monitor, host:port=" host ":" port) -;; (exec (sql db "DELETE FROM dashboards WHERE hostname=? 
AND portnum=?;") host port) -;; (close-database db))) -;; -;; ;; get registered dashboards -;; ;; -;; (define (mddb:get-dashboards) -;; (let ((db (mddb:open-db))) -;; (query fetch-column -;; (sql db "SELECT ipaddr || ':' || portnum FROM dashboards;")))) - -;;====================================================================== -;; faux-lock is deprecated. Please use simple-lock below -;; -(define (common:faux-lock keyname #!key (wait-time 8)(allow-lock-steal #t)) - (if (rmt:no-sync-get/default keyname #f) ;; do not be tempted to compare to pid. locking is a one-shot action, if already locked for this pid it doesn't actually count - (if (> wait-time 0) - (begin - (thread-sleep! 1) - (if (eq? wait-time 1) ;; only one second left, steal the lock - (begin - (debug:print-info 0 *default-log-port* "stealing lock for " keyname) - (common:faux-unlock keyname force: #t))) - (common:faux-lock keyname wait-time: (- wait-time 1))) - #f) - (begin - (rmt:no-sync-set keyname (conc (current-process-id))) - (equal? (conc (current-process-id)) (conc (rmt:no-sync-get/default keyname #f)))))) - -(define (common:faux-unlock keyname #!key (force #f)) - (if (or force (equal? (conc (current-process-id)) (conc (rmt:no-sync-get/default keyname #f)))) - (begin - (if (rmt:no-sync-get/default keyname #f) (rmt:no-sync-del! keyname)) - #t) - #f)) - -(define (std-exit-procedure) - ;;(common:telemetry-log-close) - (on-exit (lambda () 0)) - ;;(debug:print-info 13 *default-log-port* "std-exit-procedure called; *time-to-exit*="*time-to-exit*) - (let ((no-hurry (if (bdat-time-to-exit *bdat*) ;; hurry up - #f - (begin - (bdat-time-to-exit-set! 
*bdat* #t) - #t)))) - (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.") - (if (and no-hurry (debug:debug-mode 18)) - (rmt:print-db-stats)) - (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds - (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated - (if (bdat-task-db *bdat*) - (let ((db (cdr (bdat-task-db *bdat*)))) - (if (sqlite3:database? db) - (begin - (sqlite3:interrupt! db) - (sqlite3:finalize! db #t) - ;; (vector-set! (bdat-task-db *bdat*) 0 #f) - (bdat-task-db-set! *bdat* #f))))) - (http-client#close-idle-connections!) - ;; (if (and *runremote* - ;; (remote-conndat *runremote*)) - ;; (begin - ;; (http-client#close-all-connections!))) ;; for http-client - (if (not (eq? *default-log-port* (current-error-port))) - (close-output-port *default-log-port*)) - (set! *default-log-port* (current-error-port))) "Cleanup db exit thread")) - (th2 (make-thread (lambda () - (debug:print 4 *default-log-port* "Attempting clean exit. Please be patient and wait a few seconds...") - (if no-hurry - (begin - (thread-sleep! 5)) ;; give the clean up few seconds to do it's stuff - (begin - (thread-sleep! 2))) - (debug:print 4 *default-log-port* " ... done") - ) - "clean exit"))) - (thread-start! th1) - (thread-start! th2) - (thread-join! th1) - ) - ) - - 0) - - -;; TODO: for multiple areas, we will have multiple watchdogs; and multiple threads to manage -(define (init-watchdog) - (set! (bdat-watchdog-set! *bdat*) - (make-thread - (lambda () - (handle-exceptions - exn - (begin - (print-call-chain) - (print " message: " ((condition-property-accessor 'exn 'message) exn) ", exn=" exn)) - (common:watchdog))) - "Watchdog thread")) - (start-watchdog)) - -(define (start-watchdog) - ;;(if (not (args:get-arg "-server")) - ;; (thread-start! 
*watchdog*)) ;; if starting a server; wait till we get to running state before kicking off watchdog - (let* ((no-watchdog-args - '("-list-runs" - "-testdata-csv" - "-list-servers" - "-server" - "-adjutant" - "-list-disks" - "-list-targets" - "-show-runconfig" - ;;"-list-db-targets" - "-show-runconfig" - "-show-config" - "-show-cmdinfo" - "-cleanup-db" - )) - (no-watchdog-argvals (list '("-archive" . "replicate-db"))) - (start-watchdog-specail-arg-val (let loop ((hed (car no-watchdog-argvals)) - (tail (cdr no-watchdog-argvals))) - ;; (print "hed" hed " arg " (args:get-arg (car hed)) " val:" (cdr hed) " eql" (equal? (args:get-arg (car hed)) (cdr hed))) - (if (equal? (args:get-arg (car hed)) (cdr hed)) - #f - (if (null? tail) - #t - (loop (car tail) (cdr tail)))))) - (no-watchdog-args-vals (filter (lambda (x) x) - (map args:get-arg no-watchdog-args))) - (start-watchdog (and (null? no-watchdog-args-vals) start-watchdog-specail-arg-val))) - ;(print "no-watchdog-args="no-watchdog-args "no-watchdog-args-vals="no-watchdog-args-vals " start-watchdog-specail-arg-val:" start-watchdog-specail-arg-val " start-watchdog:" start-watchdog) - (if start-watchdog - (thread-start! (bdat-watchdog *bdat*))))) - -;;====================================================================== -;; TODO: for multiple areas, we will have multiple watchdogs; and multiple threads to manage -(define (common:watchdog) - (debug:print-info 13 *default-log-port* "common:watchdog entered.") - (if (launch:setup) - (if (common:on-homehost?) - (let ((dbstruct (db:setup #t))) - (debug:print-info 13 *default-log-port* "after db:setup with dbstruct=" dbstruct) - (cond - ((dbr:dbstruct-read-only dbstruct) - (debug:print-info 13 *default-log-port* "loading read-only watchdog") - (common:readonly-watchdog dbstruct)) - (else - (debug:print-info 13 *default-log-port* "loading writable-watchdog.") - (let* ((syncer (or (configf:lookup *configdat* "server" "sync-method") "brute-force-sync"))) - (cond - ((equal? 
syncer "brute-force-sync") - (server:writable-watchdog-bruteforce dbstruct)) - ((equal? syncer "delta-sync") - (server:writable-watchdog-deltasync dbstruct)) - (else - (debug:print-error 0 *default-log-port* "Unknown server/sync-method specified ("syncer") - valid values are brute-force-sync and delta-sync.") - (exit 1))) - ;;(debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] Syncer started (method="syncer")") - ))) - (debug:print-info 13 *default-log-port* "watchdog done.")) - (debug:print-info 13 *default-log-port* "no need for watchdog on non-homehost")))) - - -;;====================================================================== -;; currently the primary job of the watchdog is to run the sync back to megatest.db from the db in /tmp -;; if we are on the homehost and we are a server (by definition we are on the homehost if we are a server) -;; -(define (common:readonly-watchdog dbstruct) - (let ((just-testing 0.0501)) - (thread-sleep! just-testing)) ;; (/ 1 20)) ;; 0.051) ;; delay for startup - (debug:print-info 13 *default-log-port* "common:readonly-watchdog entered.") - ;; sync megatest.db to /tmp/.../megatst.db - (let* ((sync-cool-off-duration 3) - (golden-mtdb (dbr:dbstruct-mtdb dbstruct)) - (golden-mtpath (db:dbdat-get-path golden-mtdb)) - (tmp-mtdb (dbr:dbstruct-tmpdb dbstruct)) - (tmp-mtpath (db:dbdat-get-path tmp-mtdb))) - (debug:print-info 0 *default-log-port* "Read-only periodic sync thread started.") - (let loop ((last-sync-time 0)) - (debug:print-info 13 *default-log-port* "loop top tmp-mtpath="tmp-mtpath" golden-mtpath="golden-mtpath) - (let* ((duration-since-last-sync (- (current-seconds) last-sync-time))) - (debug:print-info 13 *default-log-port* "duration-since-last-sync="duration-since-last-sync) - (if (and (not (bdat-time-to-exit *bdat*)) - (< duration-since-last-sync sync-cool-off-duration)) - (thread-sleep! 
(- sync-cool-off-duration duration-since-last-sync))) - (if (not (bdat-time-to-exit *bdat*)) - (let ((golden-mtdb-mtime (file-modification-time golden-mtpath)) - (tmp-mtdb-mtime (file-modification-time tmp-mtpath))) - (if (> golden-mtdb-mtime tmp-mtdb-mtime) - (if (< golden-mtdb-mtime (- (current-seconds) 3)) ;; file has NOT been touched in past three seconds, this way multiple servers won't fight to sync back - (let ((res (db:multi-db-sync dbstruct 'old2new))) - (debug:print-info 13 *default-log-port* "rosync called, " res " records transferred.")))) - (loop (current-seconds))) - #t))) - (debug:print-info 0 *default-log-port* "Exiting readonly-watchdog timer, (bdat-time-to-exit *bdat*) = " (bdat-time-to-exit *bdat*)" pid="(current-process-id)" mtpath="golden-mtpath))) - -;;====================================================================== -;; from metadat lookup MEGATEST_VERSION -;; -(define (common:get-last-run-version) ;; RADT => How does this work in send-receive function??; assume it is the value saved in some DB - (rmt:get-var "MEGATEST_VERSION")) - -(define (common:get-last-run-version-number) - (string->number - (substring (common:get-last-run-version) 0 6))) - -(define (common:set-last-run-version) - (rmt:set-var "MEGATEST_VERSION" (common:version-signature))) - -;;====================================================================== -;; postive number if megatest version > db version -;; negative number if megatest version < db version -(define (common:version-db-delta) - (- megatest-version (common:get-last-run-version-number))) - -(define (common:version-changed?) - (not (equal? (common:get-last-run-version) - (common:version-signature)))) - -(define (common:api-changed?) - (not (equal? (substring (->string megatest-version) 0 4) - (substring (conc (common:get-last-run-version)) 0 4)))) - -;;====================================================================== -;; Move me elsewhere ... 
-;; RADT => Why do we meed the version check here, this is called only if version misma -;; -(define (common:cleanup-db dbstruct #!key (full #f)) - (apply db:multi-db-sync - dbstruct - 'schema - ;; 'new2old - 'killservers - 'adj-target - ;; 'old2new - 'new2old - ;; (if full - '(dejunk) - ;; '()) - ) - (if (common:api-changed?) - (common:set-last-run-version))) - -;;====================================================================== -;; Force a megatest cleanup-db if version is changed and skip-version-check not specified -;; Do NOT check if not on homehost! -;; -(define (common:exit-on-version-changed) - (if (common:on-homehost?) - (if (common:api-changed?) - (let* ((mtconf (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.config")) - (dbfile (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.db")) - (read-only (not (file-writable? dbfile))) - (dbstruct (db:setup #t))) - (debug:print 0 *default-log-port* - "WARNING: Version mismatch!\n" - " expected: " (common:version-signature) "\n" - " got: " (common:get-last-run-version)) - (cond - ((get-environment-variable "MT_SKIP_DB_MIGRATE") #t) - ((and (common:file-exists? mtconf) (common:file-exists? dbfile) (not read-only) - (eq? (current-user-id)(file-owner mtconf))) ;; safe to run -cleanup-db - (debug:print 0 *default-log-port* " I see you are the owner of megatest.config, attempting to cleanup and reset to new version") - (handle-exceptions - exn - (begin - (debug:print 0 *default-log-port* "Failed to switch versions. exn=" exn) - (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) - (print-call-chain (current-error-port)) - (exit 1)) - (common:cleanup-db dbstruct))) - ((not (common:file-exists? mtconf)) - (debug:print 0 *default-log-port* " megatest.config does not exist in this area. Cannot proceed with megatest version migration.") - (exit 1)) - ((not (common:file-exists? 
dbfile)) - (debug:print 0 *default-log-port* " megatest.db does not exist in this area. Cannot proceed with megatest version migration.") - (exit 1)) - ((not (eq? (current-user-id)(file-owner mtconf))) - (debug:print 0 *default-log-port* " You do not own megatest.db in this area. Cannot proceed with megatest version migration.") - (exit 1)) - (read-only - (debug:print 0 *default-log-port* " You have read-only access to this area. Cannot proceed with megatest version migration.") - (exit 1)) - (else - (debug:print 0 *default-log-port* " to switch versions you can run: \"megatest -cleanup-db\"") - (exit 1))))))) -;;====================================================================== -;; (begin -;; (debug:print 0 *default-log-port* "ERROR: cannot migrate version unless on homehost. Exiting.") -;; (exit 1)))) - -(define (common:run-sync?) - (and (common:on-homehost?) - (args:get-arg "-server"))) - Index: commonmod.scm ================================================================== --- commonmod.scm +++ commonmod.scm @@ -3660,7 +3660,85 @@ (let ((fields (configf:get-section confdat "fields"))) (string-join (map (lambda (field)(conc (car field) " " (cadr field))) fields) ","))) + + +;;====================================================================== +;; N A N O M S G C L I E N T +;;====================================================================== +;; +;; +;; +;; (define (common:send-dboard-main-changed) +;; (let* ((dashboard-ips (mddb:get-dashboards))) +;; (for-each +;; (lambda (ipadr) +;; (let* ((soc (common:open-nm-req (conc "tcp://" ipadr))) +;; (msg (conc "main " *toppath*)) +;; (res (common:nm-send-receive-timeout soc msg))) +;; (if (not res) ;; couldn't reach that dashboard - remove it from db +;; (print "ERROR: couldn't reach dashboard " ipadr)) +;; res)) +;; dashboard-ips))) +;; +;; +;; ;;====================================================================== +;; ;; D A S H B O A R D D B +;; 
;;====================================================================== +;; +;; (define (mddb:open-db) +;; (let* ((db (open-database (conc (get-environment-variable "HOME") "/.dashboard.db")))) +;; (set-busy-handler! db (busy-timeout 10000)) +;; (for-each +;; (lambda (qry) +;; (exec (sql db qry))) +;; (list +;; "CREATE TABLE IF NOT EXISTS vars (id INTEGER PRIMARY KEY,key TEXT, val TEXT, CONSTRAINT varsconstraint UNIQUE (key));" +;; "CREATE TABLE IF NOT EXISTS dashboards ( +;; id INTEGER PRIMARY KEY, +;; pid INTEGER, +;; username TEXT, +;; hostname TEXT, +;; ipaddr TEXT, +;; portnum INTEGER, +;; start_time TIMESTAMP DEFAULT (strftime('%s','now')), +;; CONSTRAINT hostport UNIQUE (hostname,portnum) +;; );" +;; )) +;; db)) +;; +;; ;; register a dashboard +;; ;; +;; (define (mddb:register-dashboard port) +;; (let* ((pid (current-process-id)) +;; (hostname (get-host-name)) +;; (ipaddr (server:get-best-guess-address hostname)) +;; (username (current-user-name)) ;; (car userinfo))) +;; (db (mddb:open-db))) +;; (print "Register monitor, pid: " pid ", hostname: " hostname ", port: " port ", username: " username) +;; (exec (sql db "INSERT OR REPLACE INTO dashboards (pid,username,hostname,ipaddr,portnum) VALUES (?,?,?,?,?);") +;; pid username hostname ipaddr port) +;; (close-database db))) +;; +;; ;; unregister a monitor +;; ;; +;; (define (mddb:unregister-dashboard host port) +;; (let* ((db (mddb:open-db))) +;; (print "Register unregister monitor, host:port=" host ":" port) +;; (exec (sql db "DELETE FROM dashboards WHERE hostname=? 
AND portnum=?;") host port) +;; (close-database db))) +;; +;; ;; get registered dashboards +;; ;; +;; (define (mddb:get-dashboards) +;; (let ((db (mddb:open-db))) +;; (query fetch-column +;; (sql db "SELECT ipaddr || ':' || portnum FROM dashboards;")))) + + + + + ) Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -25,10 +25,12 @@ (declare (uses mtargs)) (declare (uses mtver)) (declare (uses csv-xml)) (declare (uses keysmod)) (declare (uses mtmod)) +(declare (uses pkts)) +(declare (uses dbi)) (module dbmod * (import scheme @@ -69,11 +71,12 @@ configfmod debugprint keysmod mtmod mtver - + pkts + (prefix dbi dbi:) ) ;;====================================================================== ;; Database access ;;====================================================================== @@ -5419,7 +5422,70 @@ (define (mt:get-run-stats dbstruct run-id) ;; Get run stats from local access, move this ... but where? (db:get-run-stats dbstruct run-id)) + +;; When using zmq this would send the message back (two step process) +;; with spiffy or rpc this simply returns the return data to be returned +;; +(define (server:reply return-addr query-sig success/fail result) + (debug:print-info 11 *default-log-port* "server:reply return-addr=" return-addr ", result=" result) + (db:obj->string (vector success/fail query-sig result))) + + +(define (common:with-queue-db mtconf proc #!key (use-lt #f)(toppath-in #f)) + (let* ((pktsdirs (common:get-pkts-dirs mtconf use-lt)) + (pktsdir (if pktsdirs (car pktsdirs) #f)) + (toppath (or (configf:lookup mtconf "scratchdat" "toppath") + toppath-in)) + (pdbpath (or (configf:lookup mtconf "setup" "pdbpath") pktsdir))) + (cond + ((not (and pktsdir toppath pdbpath)) + (debug:print 0 *default-log-port* "ERROR: settings are missing in your megatest.config for area management.") + (debug:print 0 *default-log-port* " you need to have pktsdirs in the [setup] section.")) + ((not (common:file-exists? 
pktsdir)) + (debug:print 0 *default-log-port* "ERROR: pkts directory not found " pktsdir)) + ((not (equal? (file-owner pktsdir)(current-effective-user-id))) + (debug:print 0 *default-log-port* "ERROR: directory " pktsdir " is not owned by " (current-effective-user-name))) + (else + (let* ((pdb (open-queue-db pdbpath "pkts.db" + schema: '("CREATE TABLE groups (id INTEGER PRIMARY KEY,groupname TEXT, CONSTRAINT group_constraint UNIQUE (groupname));")))) + (proc pktsdirs pktsdir pdb) + (dbi:close pdb)))))) + +(define (common:load-pkts-to-db mtconf #!key (use-lt #f)) + (common:with-queue-db + mtconf + (lambda (pktsdirs pktsdir pdb) + (for-each + (lambda (pktsdir) ;; look at all + (cond + ((not (common:file-exists? pktsdir)) + (debug:print 0 *default-log-port* "ERROR: packets directory " pktsdir " does not exist.")) + ((not (directory? pktsdir)) + (debug:print 0 *default-log-port* "ERROR: packets directory path " pktsdir " is not a directory.")) + ((not (file-readable? pktsdir)) + (debug:print 0 *default-log-port* "ERROR: packets directory path " pktsdir " is not readable.")) + (else + (debug:print-info 0 *default-log-port* "Loading packets found in " pktsdir) + (let ((pkts (glob (conc pktsdir "/*.pkt")))) + (for-each + (lambda (pkt) + (let* ((uuid (cadr (string-match ".*/([0-9a-f]+).pkt" pkt))) + (exists (lookup-by-uuid pdb uuid #f))) + (if (not exists) + (let* ((pktdat (string-intersperse + (with-input-from-file pkt read-lines) + "\n")) + (apkt (pkt->alist pktdat)) + (ptype (alist-ref 'T apkt))) + (add-to-queue pdb pktdat uuid (or ptype 'cmd) #f 0) + (debug:print 4 *default-log-port* "Added " uuid " of type " ptype " to queue")) + (debug:print 4 *default-log-port* "pkt: " uuid " exists, skipping...") + ))) + pkts))))) + pktsdirs)) + use-lt: use-lt)) + ) Index: deps.pdf ================================================================== --- deps.pdf +++ deps.pdf cannot compute difference between binary files DELETED http-transport.scm Index: http-transport.scm 
================================================================== --- http-transport.scm +++ /dev/null @@ -1,18 +0,0 @@ - -;; Copyright 2006-2012, Matthew Welland. -;; -;; This file is part of Megatest. -;; -;; Megatest is free software: you can redistribute it and/or modify -;; it under the terms of the GNU General Public License as published by -;; the Free Software Foundation, either version 3 of the License, or -;; (at your option) any later version. -;; -;; Megatest is distributed in the hope that it will be useful, -;; but WITHOUT ANY WARRANTY; without even the implied warranty of -;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -;; GNU General Public License for more details. -;; -;; You should have received a copy of the GNU General Public License -;; along with Megatest. If not, see . - Index: http-transportmod.scm ================================================================== --- http-transportmod.scm +++ http-transportmod.scm @@ -700,72 +700,10 @@ ;; (thread-start! th2) ;; (thread-start! th1) ;; (thread-join! th2)))) - - -;; called in megatest.scm, host-port is string hostname:port -;; -;; NOTE: This is NOT called directly from clients as not all transports support a client running -;; in the same process as the server. -;; -(define (server:ping host-port-in server-id #!key (do-exit #f)) - (let ((host:port (if (not host-port-in) ;; use read-dotserver to find - #f ;; (server:check-if-running *toppath*) - ;; (if (number? host-port-in) ;; we were handed a server-id - ;; (let ((srec (tasks:get-server-by-id (db:delay-if-busy (tasks:open-db)) host-port-in))) - ;; ;; (print "srec: " srec " host-port-in: " host-port-in) - ;; (if srec - ;; (conc (vector-ref srec 3) ":" (vector-ref srec 4)) - ;; (conc "no such server-id " host-port-in))) - host-port-in))) ;; ) - (let* ((host-port (if host:port - (let ((slst (string-split host:port ":"))) - (if (eq? 
(length slst) 2) - (list (car slst)(string->number (cadr slst))) - #f)) - #f))) -;; (toppath (launch:setup))) - ;; (print "host-port=" host-port) - (if (not host-port) - (begin - (if host-port-in - (debug:print 0 *default-log-port* "ERROR: bad host:port")) - (if do-exit (exit 1)) - #f) - (let* ((iface (car host-port)) - (port (cadr host-port)) - (server-dat (http-transport:client-connect iface port server-id)) - (login-res (rmt:login-no-auto-client-setup server-dat))) - (if (and (list? login-res) - (car login-res)) - (begin - ;; (print "LOGIN_OK") - (if do-exit (exit 0)) - #t) - (begin - ;; (print "LOGIN_FAILED") - (if do-exit (exit 1)) - #f))))))) - -;; This login does no retries under the hood - it acts a bit like a ping. -;; Deprecated for nmsg-transport. -;; -(define (rmt:login-no-auto-client-setup connection-info) - (rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*))) - -(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) - (let* ((run-id (if run-id run-id 0)) - (res (handle-exceptions - exn - #f - (http-transport:client-api-send-receive run-id connection-info cmd params)))) - (if (and res (vector-ref res 0)) - (vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!! - #f))) - ;; Get the transport (define (server:get-transport) (if *transport-type* *transport-type* @@ -782,8 +720,73 @@ (with-output-to-string (lambda () (write (list (current-directory) (current-process-id) (argv))))))) + +(define (server:get-client-signature) + (if *my-client-signature* *my-client-signature* + (let ((sig (server:mk-signature))) + (set! 
*my-client-signature* sig) + *my-client-signature*))) + +;; run ping in separate process, safest way in some cases +;; +(define (server:ping-server ifaceport) + (with-input-from-pipe + (conc (common:get-megatest-exe) " -ping " ifaceport) + (lambda () + (let loop ((inl (read-line)) + (res "NOREPLY")) + (if (eof-object? inl) + (case (string->symbol res) + ((NOREPLY) #f) + ((LOGIN_OK) #t) + (else #f)) + (loop (read-line) inl)))))) + + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +;; Call this to start the actual server +;; + +;; all routes though here end in exit ... +;; +;; start_server +;; +(define (server:launch run-id transport-type) + (http-transport:launch)) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). +;; +(define (server:login toppath) + (lambda (toppath) + (set! *db-last-access* (current-seconds)) ;; might not be needed. + (if (equal? *toppath* toppath) + #t + #f))) + +;; (define server:sync-lock-token "SERVER_SYNC_LOCK") +;; (define (server:release-sync-lock) +;; (db:no-sync-del! *no-sync-db* server:sync-lock-token)) +;; (define (server:have-sync-lock?) +;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token)) +;; (have-lock? (car have-lock-pair)) +;; (lock-time (cdr have-lock-pair)) +;; (lock-age (- (current-seconds) lock-time))) +;; (cond +;; (have-lock? 
#t) +;; ((>lock-age +;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180))) +;; (server:release-sync-lock) +;; (server:have-sync-lock?)) +;; (else #f)))) ) Index: launchmod.scm ================================================================== --- launchmod.scm +++ launchmod.scm @@ -2269,8 +2269,368 @@ (launch:setup force-reread: #t) ;; (launch:cache-config) ;; there are two independent config cache locations, turning this one off for now. MRW. )) ;; we can safely cache megatest.config since we have a valid runconfig data)))) +;;====================================================================== +;; TODO: for multiple areas, we will have multiple watchdogs; and multiple threads to manage +(define (common:watchdog) + (debug:print-info 13 *default-log-port* "common:watchdog entered.") + (if (launch:setup) + (if (common:on-homehost?) + (let ((dbstruct (db:setup #t))) + (debug:print-info 13 *default-log-port* "after db:setup with dbstruct=" dbstruct) + (cond + ((dbr:dbstruct-read-only dbstruct) + (debug:print-info 13 *default-log-port* "loading read-only watchdog") + (common:readonly-watchdog dbstruct)) + (else + (debug:print-info 13 *default-log-port* "loading writable-watchdog.") + (let* ((syncer (or (configf:lookup *configdat* "server" "sync-method") "brute-force-sync"))) + (cond + ((equal? syncer "brute-force-sync") + (server:writable-watchdog-bruteforce dbstruct)) + ((equal? 
syncer "delta-sync") + (server:writable-watchdog-deltasync dbstruct)) + (else + (debug:print-error 0 *default-log-port* "Unknown server/sync-method specified ("syncer") - valid values are brute-force-sync and delta-sync.") + (exit 1))) + ;;(debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] Syncer started (method="syncer")") + ))) + (debug:print-info 13 *default-log-port* "watchdog done.")) + (debug:print-info 13 *default-log-port* "no need for watchdog on non-homehost")))) + +;;====================================================================== +;; currently the primary job of the watchdog is to run the sync back to megatest.db from the db in /tmp +;; if we are on the homehost and we are a server (by definition we are on the homehost if we are a server) +;; +(define (common:readonly-watchdog dbstruct) + (let ((just-testing 0.0501)) + (thread-sleep! just-testing)) ;; (/ 1 20)) ;; 0.051) ;; delay for startup + (debug:print-info 13 *default-log-port* "common:readonly-watchdog entered.") + ;; sync megatest.db to /tmp/.../megatst.db + (let* ((sync-cool-off-duration 3) + (golden-mtdb (dbr:dbstruct-mtdb dbstruct)) + (golden-mtpath (db:dbdat-get-path golden-mtdb)) + (tmp-mtdb (dbr:dbstruct-tmpdb dbstruct)) + (tmp-mtpath (db:dbdat-get-path tmp-mtdb))) + (debug:print-info 0 *default-log-port* "Read-only periodic sync thread started.") + (let loop ((last-sync-time 0)) + (debug:print-info 13 *default-log-port* "loop top tmp-mtpath="tmp-mtpath" golden-mtpath="golden-mtpath) + (let* ((duration-since-last-sync (- (current-seconds) last-sync-time))) + (debug:print-info 13 *default-log-port* "duration-since-last-sync="duration-since-last-sync) + (if (and (not (bdat-time-to-exit *bdat*)) + (< duration-since-last-sync sync-cool-off-duration)) + (thread-sleep! 
(- sync-cool-off-duration duration-since-last-sync))) + (if (not (bdat-time-to-exit *bdat*)) + (let ((golden-mtdb-mtime (file-modification-time golden-mtpath)) + (tmp-mtdb-mtime (file-modification-time tmp-mtpath))) + (if (> golden-mtdb-mtime tmp-mtdb-mtime) + (if (< golden-mtdb-mtime (- (current-seconds) 3)) ;; file has NOT been touched in past three seconds, this way multiple servers won't fight to sync back + (let ((res (db:multi-db-sync dbstruct 'old2new))) + (debug:print-info 13 *default-log-port* "rosync called, " res " records transferred.")))) + (loop (current-seconds))) + #t))) + (debug:print-info 0 *default-log-port* "Exiting readonly-watchdog timer, (bdat-time-to-exit *bdat*) = " (bdat-time-to-exit *bdat*)" pid="(current-process-id)" mtpath="golden-mtpath))) + + +;; TODO: for multiple areas, we will have multiple watchdogs; and multiple threads to manage. init-watchdog builds the "Watchdog thread" (which runs common:watchdog and prints the call chain and message of any exception), stores it in *bdat*, then calls start-watchdog. +(define (init-watchdog) + (bdat-watchdog-set! *bdat* ;; call the record mutator directly; it must not be wrapped in set! + (make-thread + (lambda () + (handle-exceptions + exn + (begin + (print-call-chain) + (print " message: " ((condition-property-accessor 'exn 'message) exn) ", exn=" exn)) + (common:watchdog))) + "Watchdog thread")) + (start-watchdog)) + +(define (start-watchdog) + ;;(if (not (args:get-arg "-server")) + ;; (thread-start! *watchdog*)) ;; if starting a server; wait till we get to running state before kicking off watchdog + (let* ((no-watchdog-args + '("-list-runs" + "-testdata-csv" + "-list-servers" + "-server" + "-adjutant" + "-list-disks" + "-list-targets" + "-show-runconfig" + ;;"-list-db-targets" + "-show-runconfig" + "-show-config" + "-show-cmdinfo" + "-cleanup-db" + )) + (no-watchdog-argvals (list '("-archive" . "replicate-db"))) + (start-watchdog-specail-arg-val (let loop ((hed (car no-watchdog-argvals)) + (tail (cdr no-watchdog-argvals))) + ;; (print "hed" hed " arg " (args:get-arg (car hed)) " val:" (cdr hed) " eql" (equal? (args:get-arg (car hed)) (cdr hed))) + (if (equal? (args:get-arg (car hed)) (cdr hed)) + #f + (if (null? 
tail) + #t + (loop (car tail) (cdr tail)))))) + (no-watchdog-args-vals (filter (lambda (x) x) + (map args:get-arg no-watchdog-args))) + (start-watchdog (and (null? no-watchdog-args-vals) start-watchdog-specail-arg-val))) + ;(print "no-watchdog-args="no-watchdog-args "no-watchdog-args-vals="no-watchdog-args-vals " start-watchdog-specail-arg-val:" start-watchdog-specail-arg-val " start-watchdog:" start-watchdog) + (if start-watchdog + (thread-start! (bdat-watchdog *bdat*))))) + +(define (server:writable-watchdog-deltasync dbstruct) + (thread-sleep! 0.054) ;; delay for startup + (let ((legacy-sync (common:run-sync?)) + (sync-stale-seconds (configf:lookup-number *configdat* "server" "sync-stale-seconds" default: 300)) + (debug-mode (debug:debug-mode 1)) + (last-time (current-seconds)) + (no-sync-db (db:open-no-sync-db)) + (stmt-cache (dbr:dbstruct-stmt-cache dbstruct)) + (sync-duration 0) ;; run time of the sync in milliseconds + ) + (set! *no-sync-db* no-sync-db) ;; make the no sync db available to api calls + (debug:print-info 2 *default-log-port* "Periodic sync thread started.") + (debug:print-info 3 *default-log-port* "watchdog starting. legacy-sync is " legacy-sync" pid="(current-process-id) );; " this-wd-num="this-wd-num) + (if (and legacy-sync (not (bdat-time-to-exit *bdat*))) + (let* (;;(dbstruct (db:setup)) + (mtdb (dbr:dbstruct-mtdb dbstruct)) + (mtpath (db:dbdat-get-path mtdb)) + (tmp-area (common:get-db-tmp-area)) + (start-file (conc tmp-area "/.start-sync")) + (end-file (conc tmp-area "/.end-sync"))) + (debug:print-info 0 *default-log-port* "Server running, periodic sync started.") + (let loop () + ;; sync for filesystem local db writes + ;; + (mutex-lock! 
*db-multi-sync-mutex*) + (let* ((need-sync (>= *db-last-access* *db-last-sync*)) ;; no sync since last write + (sync-in-progress *db-sync-in-progress*) + (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 5)) + (should-sync (and (not (bdat-time-to-exit *bdat*)) + (> (- (current-seconds) *db-last-sync*) min-intersync-delay))) ;; sync every five seconds minimum, deprecated logic, can probably be removed + (start-time (current-seconds)) + (cpu-load-adj (alist-ref 'adj-proc-load (common:get-normalized-cpu-load #f))) + (mt-mod-time (file-modification-time mtpath)) + (last-sync-start (if (common:file-exists? start-file) + (file-modification-time start-file) + 0)) + (last-sync-end (if (common:file-exists? end-file) + (file-modification-time end-file) + 10)) + (sync-period (+ 3 (* cpu-load-adj 30))) ;; as adjusted load increases increase the sync period + (recently-synced (and (< (- start-time mt-mod-time) sync-period) ;; not useful if sync didn't modify megatest.db! + (< mt-mod-time last-sync-start))) + (sync-done (<= last-sync-start last-sync-end)) + (sync-stale (> start-time (+ last-sync-start sync-stale-seconds))) + (will-sync (and (not (bdat-time-to-exit *bdat*)) ;; do not start a sync if we are in the process of exiting + (or need-sync should-sync) + (or sync-done sync-stale) + (not sync-in-progress) + (not recently-synced)))) + (debug:print-info 13 *default-log-port* "WD writable-watchdog top of loop. 
need-sync="need-sync" sync-in-progress=" sync-in-progress + " should-sync="should-sync" start-time="start-time" mt-mod-time="mt-mod-time" recently-synced="recently-synced" will-sync="will-sync + " sync-done=" sync-done " sync-period=" sync-period) + (if (and (> sync-period 5) + (common:low-noise-print 30 "sync-period")) + (debug:print-info 0 *default-log-port* "Increased sync period due to long sync times, sync took: " sync-period " seconds.")) + ;; (if recently-synced (debug:print-info 0 *default-log-port* "Skipping sync due to recently-synced flag=" recently-synced)) + ;; (debug:print-info 0 *default-log-port* "need-sync: " need-sync " sync-in-progress: " sync-in-progress " should-sync: " should-sync " will-sync: " will-sync) + (if will-sync (set! *db-sync-in-progress* #t)) + (mutex-unlock! *db-multi-sync-mutex*) + (if will-sync + (let (;; (max-sync-duration (configf:lookup-number *configdat* "server" "max-sync-duration")) ;; KEEPING THIS AVAILABLE BUT SHOULD NOT USE, I'M PRETTY SURE IT DOES NOT WORK! + (sync-start (current-milliseconds))) + (with-output-to-file start-file (lambda ()(print (current-process-id)))) + + ;; put lock here + + ;; (if (or (not max-sync-duration) + ;; (< sync-duration max-sync-duration)) ;; NOTE: db:sync-to-megatest.db keeps track of time of last sync and syncs incrementally + (let ((res (db:sync-to-megatest.db dbstruct no-sync-db: no-sync-db))) ;; did we sync any data? If so need to set the db touched flag to keep the server alive + (set! sync-duration (- (current-milliseconds) sync-start)) + (if (> res 0) ;; some records were transferred, keep the db alive + (begin + (mutex-lock! *heartbeat-mutex*) + (set! *db-last-access* (current-seconds)) + (mutex-unlock! 
*heartbeat-mutex*) + (debug:print-info 0 *default-log-port* "sync called, " res " records transferred.")) + (debug:print-info 2 *default-log-port* "sync called but zero records transferred"))))) +;; ;; TODO: factor this next routine out into a function +;; (with-input-from-pipe ;; this should not block other threads but need to verify this +;; (conc "megatest -sync-to-megatest.db -m testsuite:" (common:get-area-name) ":" *toppath*) +;; (lambda () +;; (let loop ((inl (read-line)) +;; (res #f)) +;; (if (eof-object? inl) +;; (begin +;; (set! sync-duration (- (current-milliseconds) sync-start)) +;; (cond +;; ((not res) +;; (debug:print 0 *default-log-port* "ERROR: sync from /tmp db to megatest.db appears to have failed. Recommended that you stop your runs and run \"megatest -cleanup-db\"")) +;; ((> res 0) +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *db-last-access* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*)))) +;; (let ((num-synced (let ((matches (string-match "^Synced (\\d+).*$" inl))) +;; (if matches +;; (string->number (cadr matches)) +;; #f)))) +;; (loop (read-line) +;; (or num-synced res)))))))))) + (if will-sync + (begin + (mutex-lock! *db-multi-sync-mutex*) + (set! *db-sync-in-progress* #f) + (set! *db-last-sync* start-time) + (with-output-to-file end-file (lambda ()(print (current-process-id)))) + + ;; release lock here + + (mutex-unlock! *db-multi-sync-mutex*))) + (if (and debug-mode + (> (- start-time last-time) 60)) + (begin + (set! 
last-time start-time) + (debug:print-info 4 *default-log-port* "timestamp -> " (seconds->time-string (current-seconds)) ", time since start -> " (seconds->hr-min-sec (- (current-seconds) *time-zero*)))))) + + ;; keep going unless time to exit + ;; + (if (not (bdat-time-to-exit *bdat*)) + (let delay-loop ((count 0)) + ;;(debug:print-info 13 *default-log-port* "delay-loop top; count="count" pid="(current-process-id)" this-wd-num="this-wd-num" *time-to-exit*="(bdat-time-to-exit *bdat*)) + + (if (and (not (bdat-time-to-exit *bdat*)) + (< count 6)) ;; was 11, changing to 4. + (begin + (thread-sleep! 1) + (delay-loop (+ count 1)))) + (if (not (bdat-time-to-exit *bdat*)) (loop)))) + ;; time to exit, close the no-sync db here + (db:no-sync-close-db no-sync-db stmt-cache) + (if (common:low-noise-print 30) + (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " (bdat-time-to-exit *bdat*)" pid="(current-process-id) ))))))) ;;" this-wd-num="this-wd-num))))))) + +(define (server:writable-watchdog-bruteforce dbstruct) + (thread-sleep! 1) ;; delay for startup + (let* ((do-a-sync (server:get-bruteforce-syncer dbstruct)) + (final-sync (server:get-bruteforce-syncer dbstruct fork-to-background: #t persist-until-sync: #t))) + (when (and (not (args:get-arg "-sync-to-megatest.db")) ;; conditions under which we do not run the sync + (args:get-arg "-server")) + + (let loop () + (do-a-sync) + (if (not (bdat-time-to-exit *bdat*)) (loop))) ;; keep going unless time to exit + + ;; time to exit, close the no-sync db here + (final-sync) + + (if (common:low-noise-print 30) + (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = "(bdat-time-to-exit *bdat*)" pid="(current-process-id) + ))))) + +;; moving this here as it needs access to db and cannot be in common. 
+;; + +(define (server:get-bruteforce-syncer dbstruct #!key (fork-to-background #f) (persist-until-sync #f)) + (let* ((sqlite-exe (or (get-environment-variable "MT_SQLITE3_EXE"))) ;; defined in cfg.sh + (sync-log (or (args:get-arg "-sync-log") (conc *toppath* "/logs/sync-" (current-process-id) "-" (get-host-name) ".log"))) + (tmp-area (common:get-db-tmp-area)) + (tmp-db (conc tmp-area "/megatest.db")) + (staging-file (conc *toppath* "/.megatest.db")) + (mtdbfile (conc *toppath* "/megatest.db")) + (lockfile (common:get-sync-lock-filepath)) + (sync-cmd-core (conc sqlite-exe" " tmp-db " .dump | "sqlite-exe" " staging-file "&>"sync-log)) + (sync-cmd (if fork-to-background + (conc "/usr/bin/env NBFAKE_LOG="*toppath*"/logs/last-server-sync-"(current-process-id)".log nbfake \""sync-cmd-core" && /bin/mv -f " staging-file " " mtdbfile" \"") + sync-cmd-core)) + (default-min-intersync-delay 2) + (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: default-min-intersync-delay)) + (default-duty-cycle 0.1) + (duty-cycle (configf:lookup-number *configdat* "server" "sync-duty-cycle" default: default-duty-cycle)) + (last-sync-seconds 10) ;; we will adjust this to a measurement and delay last-sync-seconds * (1 - duty-cycle) + (calculate-off-time (lambda (work-duration duty-cycle) + (* (/ (- 1 duty-cycle) duty-cycle) last-sync-seconds))) + (off-time min-intersync-delay) ;; adjusted in closure below. + (do-a-sync + (lambda () + ;; (BB> "Start do-a-sync with fork-to-background="fork-to-background" persist-until-sync="persist-until-sync) + (let* ((finalres + (let retry-loop ((num-tries 0)) + (if (common:simple-file-lock lockfile) + (begin + (cond + ((not (or fork-to-background persist-until-sync)) + (debug:print 0 *default-log-port* "INFO: syncer thread sleeping for max of (server.minimum-intersync-delay="min-intersync-delay + " , off-time="off-time" seconds ]") + (thread-sleep! 
(max off-time min-intersync-delay))) + (else + (debug:print 0 *default-log-port* "INFO: syncer thread NOT sleeping ; maybe time-to-exit..."))) + + (if (not (configf:lookup *configdat* "server" "disable-db-snapshot")) + (common:snapshot-file mtdbfile subdir: ".db-snapshot")) + (delete-file* staging-file) + (let* ((start-time (current-milliseconds)) + (res (system sync-cmd)) + (dbbackupfile (conc mtdbfile ".backup")) + (res2 + (cond + ((eq? 0 res ) + (handle-exceptions + exn + #f + (if (file-exists? dbbackupfile) + (delete-file* dbbackupfile) + ) + (if (eq? 0 (file-size sync-log)) + (delete-file* sync-log)) + (system (conc "/bin/mv " staging-file " " mtdbfile)) + + (set! last-sync-seconds (/ (- (current-milliseconds) start-time) 1000)) + (set! off-time (calculate-off-time + last-sync-seconds + (cond + ((and (number? duty-cycle) (> duty-cycle 0) (< duty-cycle 1)) + duty-cycle) + (else + (debug:print 0 *default-log-port* "WARNING: ["(common:human-time)"] server.sync-duty-cycle is invalid. Should be a number between 0 and 1, but "duty-cycle" was specified. Using default value: "default-duty-cycle) + default-duty-cycle)))) + + (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec") + (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec ; with duty-cycle of "duty-cycle" off time is now "off-time) + 'sync-completed)) + (else + (system (conc "/bin/cp "sync-log" "sync-log".fail")) + (debug:print 0 *default-log-port* "ERROR: ["(common:human-time)"] Sync failed. See log at "sync-log".fail") + (if (file-exists? (conc mtdbfile ".backup")) + (system (conc "/bin/cp "mtdbfile ".backup " mtdbfile))) + #f)))) + (common:simple-file-release-lock lockfile) + ;; (BB> "released lockfile: " lockfile) + ;; (when (common:file-exists? 
lockfile) + ;; (BB> "DID NOT ACTUALLY RELEASE LOCKFILE")) + res2) ;; end let + );; end begin + ;; else + (cond + (persist-until-sync + (thread-sleep! 1) + (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; we're in a fork-to-background so we need to succeed. Let's wait a jiffy and and try again. num-tries="num-tries" (waiting for lockfile="lockfile" to disappear)") + (retry-loop (add1 num-tries))) + (else + (thread-sleep! (max off-time (+ last-sync-seconds min-intersync-delay))) + (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; not syncing.") + 'parallel-sync-in-progress)) + ) ;; end if got lockfile + ) + )) + ;; (BB> "End do-a-sync with fork-to-background="fork-to-background" persist-until-sync="persist-until-sync" and result="finalres) + finalres) + ) ;; end lambda + )) + do-a-sync)) ) Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -18,10 +18,11 @@ ;;====================================================================== (declare (unit rmtmod)) (declare (uses commonmod)) +(declare (uses configfmod)) (declare (uses apimod)) (declare (uses itemsmod)) (declare (uses debugprint)) (declare (uses mtver)) (declare (uses tasksmod)) @@ -43,10 +44,14 @@ chicken.sort chicken.time chicken.base chicken.file chicken.format + chicken.process + chicken.file.posix + chicken.process-context.posix + chicken.process-context (prefix sqlite3 sqlite3:) typed-records srfi-1 srfi-13 @@ -62,11 +67,12 @@ (prefix mtargs args:) dbmod http-transportmod servermod clientmod - + configfmod + ) (defstruct alldat (areapath #f) (ulexdat #f) @@ -1574,7 +1580,409 @@ result) (let ((newres (rmt:get-prereqs-not-met run-id waitons ref-item-path mode: mode itemmaps: itemmaps))) (hash-table-set! 
*pre-reqs-met-cache* key (vector (current-seconds) newres)) newres)))) +;;====================================================================== +;; from metadat lookup MEGATEST_VERSION +;; +(define (common:get-last-run-version) ;; RADT => How does this work in send-receive function??; assume it is the value saved in some DB + (rmt:get-var "MEGATEST_VERSION")) + +(define (common:get-last-run-version-number) + (string->number + (substring (common:get-last-run-version) 0 6))) + +(define (common:set-last-run-version) + (rmt:set-var "MEGATEST_VERSION" (common:version-signature))) + +;;====================================================================== +;; faux-lock is deprecated. Please use simple-lock below +;; +(define (common:faux-lock keyname #!key (wait-time 8)(allow-lock-steal #t)) + (if (rmt:no-sync-get/default keyname #f) ;; do not be tempted to compare to pid. locking is a one-shot action, if already locked for this pid it doesn't actually count + (if (> wait-time 0) + (begin + (thread-sleep! 1) + (if (eq? wait-time 1) ;; only one second left, steal the lock + (begin + (debug:print-info 0 *default-log-port* "stealing lock for " keyname) + (common:faux-unlock keyname force: #t))) + (common:faux-lock keyname wait-time: (- wait-time 1))) + #f) + (begin + (rmt:no-sync-set keyname (conc (current-process-id))) + (equal? (conc (current-process-id)) (conc (rmt:no-sync-get/default keyname #f)))))) + +(define (common:faux-unlock keyname #!key (force #f)) + (if (or force (equal? (conc (current-process-id)) (conc (rmt:no-sync-get/default keyname #f)))) + (begin + (if (rmt:no-sync-get/default keyname #f) (rmt:no-sync-del! keyname)) + #t) + #f)) + +;;====================================================================== +;; postive number if megatest version > db version +;; negative number if megatest version < db version +(define (common:version-db-delta) + (- megatest-version (common:get-last-run-version-number))) + +(define (common:version-changed?) + (not (equal? 
(common:get-last-run-version) + (common:version-signature)))) + +(define (common:api-changed?) + (not (equal? (substring (->string megatest-version) 0 4) + (substring (conc (common:get-last-run-version)) 0 4)))) + +;;====================================================================== +;; Move me elsewhere ... +;; RADT => Why do we meed the version check here, this is called only if version misma +;; +(define (common:cleanup-db dbstruct #!key (full #f)) + (apply db:multi-db-sync + dbstruct + 'schema + ;; 'new2old + 'killservers + 'adj-target + ;; 'old2new + 'new2old + ;; (if full + '(dejunk) + ;; '()) + ) + (if (common:api-changed?) + (common:set-last-run-version))) + +;; This login does no retries under the hood - it acts a bit like a ping. +;; Deprecated for nmsg-transport. +;; +(define (rmt:login-no-auto-client-setup connection-info) + (rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*))) + +(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) + (let* ((run-id (if run-id run-id 0)) + (res (handle-exceptions + exn + #f + (http-transport:client-api-send-receive run-id connection-info cmd params)))) + (if (and res (vector-ref res 0)) + (vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!! + #f))) + +(define (std-exit-procedure) + ;;(common:telemetry-log-close) + (on-exit (lambda () 0)) + ;;(debug:print-info 13 *default-log-port* "std-exit-procedure called; *time-to-exit*="*time-to-exit*) + (let ((no-hurry (if (bdat-time-to-exit *bdat*) ;; hurry up + #f + (begin + (bdat-time-to-exit-set! 
*bdat* #t) + #t)))) + (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.") + (if (and no-hurry (debug:debug-mode 18)) + (rmt:print-db-stats)) + (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds + (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated + (if (bdat-task-db *bdat*) + (let ((db (cdr (bdat-task-db *bdat*)))) + (if (sqlite3:database? db) + (begin + (sqlite3:interrupt! db) + (sqlite3:finalize! db #t) + ;; (vector-set! (bdat-task-db *bdat*) 0 #f) + (bdat-task-db-set! *bdat* #f))))) + (http-client#close-idle-connections!) + ;; (if (and *runremote* + ;; (remote-conndat *runremote*)) + ;; (begin + ;; (http-client#close-all-connections!))) ;; for http-client + (if (not (eq? *default-log-port* (current-error-port))) + (close-output-port *default-log-port*)) + (set! *default-log-port* (current-error-port))) "Cleanup db exit thread")) + (th2 (make-thread (lambda () + (debug:print 4 *default-log-port* "Attempting clean exit. Please be patient and wait a few seconds...") + (if no-hurry + (begin + (thread-sleep! 5)) ;; give the clean up few seconds to do it's stuff + (begin + (thread-sleep! 2))) + (debug:print 4 *default-log-port* " ... done") + ) + "clean exit"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + ) + ) + + 0) + +;;====================================================================== +;; Force a megatest cleanup-db if version is changed and skip-version-check not specified +;; Do NOT check if not on homehost! +;; +(define (common:exit-on-version-changed) + (if (common:on-homehost?) + (if (common:api-changed?) + (let* ((mtconf (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.config")) + (dbfile (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.db")) + (read-only (not (file-writable? 
dbfile))) + (dbstruct (db:setup #t))) + (debug:print 0 *default-log-port* + "WARNING: Version mismatch!\n" + " expected: " (common:version-signature) "\n" + " got: " (common:get-last-run-version)) + (cond + ((get-environment-variable "MT_SKIP_DB_MIGRATE") #t) + ((and (common:file-exists? mtconf) (common:file-exists? dbfile) (not read-only) + (eq? (current-user-id)(file-owner mtconf))) ;; safe to run -cleanup-db + (debug:print 0 *default-log-port* " I see you are the owner of megatest.config, attempting to cleanup and reset to new version") + (handle-exceptions + exn + (begin + (debug:print 0 *default-log-port* "Failed to switch versions. exn=" exn) + (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) + (print-call-chain (current-error-port)) + (exit 1)) + (common:cleanup-db dbstruct))) + ((not (common:file-exists? mtconf)) + (debug:print 0 *default-log-port* " megatest.config does not exist in this area. Cannot proceed with megatest version migration.") + (exit 1)) + ((not (common:file-exists? dbfile)) + (debug:print 0 *default-log-port* " megatest.db does not exist in this area. Cannot proceed with megatest version migration.") + (exit 1)) + ((not (eq? (current-user-id)(file-owner mtconf))) + (debug:print 0 *default-log-port* " You do not own megatest.db in this area. Cannot proceed with megatest version migration.") + (exit 1)) + (read-only + (debug:print 0 *default-log-port* " You have read-only access to this area. Cannot proceed with megatest version migration.") + (exit 1)) + (else + (debug:print 0 *default-log-port* " to switch versions you can run: \"megatest -cleanup-db\"") + (exit 1))))))) +;;====================================================================== +;; (begin +;; (debug:print 0 *default-log-port* "ERROR: cannot migrate version unless on homehost. Exiting.") +;; (exit 1)))) + +(define (common:run-sync?) + (and (common:on-homehost?) 
+ (args:get-arg "-server"))) + + + + +;; this one seems to be the general entry point +;; +(define (server:start-and-wait areapath #!key (timeout 60)) + (let ((give-up-time (+ (current-seconds) timeout))) + (let loop ((server-info (server:check-if-running areapath)) + (try-num 0)) + (if (or server-info + (> (current-seconds) give-up-time)) ;; server-url will be #f if no server available. + (server:record->url server-info) + (let ((num-ok (length (server:get-best (server:get-list areapath))))) + (if (and (> try-num 0) ;; first time through simply wait a little while then try again + (< num-ok 1)) ;; if there are no decent candidates for servers then try starting a new one + (server:kind-run areapath)) + (thread-sleep! 5) + (loop (server:check-if-running areapath) + (+ try-num 1))))))) + +(define (make-and-init-remote) + (make-remote hh-dat: (common:get-homehost) + server-info: (if *toppath* (server:check-if-running *toppath*) #f) + server-timeout: (server:expiration-timeout))) + + + + +;; called in megatest.scm, host-port is string hostname:port +;; +;; NOTE: This is NOT called directly from clients as not all transports support a client running +;; in the same process as the server. +;; +(define (server:ping host-port-in server-id #!key (do-exit #f)) + (let ((host:port (if (not host-port-in) ;; use read-dotserver to find + #f ;; (server:check-if-running *toppath*) + ;; (if (number? host-port-in) ;; we were handed a server-id + ;; (let ((srec (tasks:get-server-by-id (db:delay-if-busy (tasks:open-db)) host-port-in))) + ;; ;; (print "srec: " srec " host-port-in: " host-port-in) + ;; (if srec + ;; (conc (vector-ref srec 3) ":" (vector-ref srec 4)) + ;; (conc "no such server-id " host-port-in))) + host-port-in))) ;; ) + (let* ((host-port (if host:port + (let ((slst (string-split host:port ":"))) + (if (eq? 
(length slst) 2) + (list (car slst)(string->number (cadr slst))) + #f)) + #f))) +;; (toppath (launch:setup))) + ;; (print "host-port=" host-port) + (if (not host-port) + (begin + (if host-port-in + (debug:print 0 *default-log-port* "ERROR: bad host:port")) + (if do-exit (exit 1)) + #f) + (let* ((iface (car host-port)) + (port (cadr host-port)) + (server-dat (http-transport:client-connect iface port server-id)) + (login-res (rmt:login-no-auto-client-setup server-dat))) + (if (and (list? login-res) + (car login-res)) + (begin + ;; (print "LOGIN_OK") + (if do-exit (exit 0)) + #t) + (begin + ;; (print "LOGIN_FAILED") + (if do-exit (exit 1)) + #f))))))) + +;; ping the given server +;; +(define (server:check-server server-record) + (let* ((server-url (server:record->url server-record)) + (server-id (server:record->id server-record)) + (res (case *transport-type* + ((http)(server:ping server-url server-id)) + ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server) + ))) + (if res + server-url + #f))) + +;; no longer care if multiple servers are started by accident. older servers will drop off in time. +;; +(define (server:check-if-running areapath) ;; #!key (numservers "2")) + (let* ((ns (server:get-num-servers)) + (servers (server:get-best (server:get-list areapath)))) + (if (or (and servers + (null? servers)) + (not servers) + (and (list? servers) + (< (length servers) (pseudo-random-integer ns)))) ;; somewhere between 0 and numservers + #f + (let loop ((hed (car servers)) + (tal (cdr servers))) + (let ((res (server:check-server hed))) + (if res + hed + (if (null? 
tal) + #f + (loop (car tal)(cdr tal))))))))) + +;; kind start up of servers, wait 40 seconds before allowing another server for a given +;; run-id to be launched +;; +(define (server:kind-run areapath) + ;; look for $MT_RUN_AREA_HOME/logs/server-start-last + ;; and wait for it to be at least 3 seconds old + (server:wait-for-server-start-last-flag areapath) + (if (not (server:check-if-running areapath)) ;; why try if there is already a server running? + (let* ((last-run-dat (hash-table-ref/default *server-kind-run* areapath '(0 0))) ;; callnum, whenrun + (call-num (car last-run-dat)) + (when-run (cadr last-run-dat)) + (run-delay (+ (case call-num + ((0) 0) + ((1) 20) + ((2) 300) + (else 600)) + (pseudo-random-integer 5))) ;; add a small random number just in case a lot of jobs hit the work hosts simultaneously + (lock-file (conc areapath "/logs/server-start.lock"))) + (if (> (- (current-seconds) when-run) run-delay) + (let* ((start-flag (conc areapath "/logs/server-start-last"))) + (common:simple-file-lock-and-wait lock-file expire-time: 15) + (debug:print-info 0 *default-log-port* "server:kind-run: touching " start-flag) + (system (conc "touch " start-flag)) ;; lazy but safe + (server:run areapath) + (thread-sleep! 2) ;; don't release the lock for at least a few seconds + (common:simple-file-release-lock lock-file))) + (hash-table-set! 
*server-kind-run* areapath (list (+ call-num 1)(current-seconds)))))) + +#;(define (client:connect iface port) + (http-transport:client-connect iface port) + #;(case (server:get-transport) + ((rpc) (rpc:client-connect iface port)) + ((http) (http:client-connect iface port)) + ((zmq) (zmq:client-connect iface port)) + (else (rpc:client-connect iface port)))) + +(define (client:setup areapath #!key (remaining-tries 100) (failed-connects 0)) + (client:setup-http areapath remaining-tries: remaining-tries failed-connects: failed-connects) + #;(case (server:get-transport) + ((rpc) (rpc-transport:client-setup remaining-tries: remaining-tries failed-connects: failed-connects)) ;;(client:setup-rpc run-id)) + ((http)(client:setup-http areapath remaining-tries: remaining-tries failed-connects: failed-connects)) + (else (rpc-transport:client-setup remaining-tries: remaining-tries failed-connects: failed-connects)))) ;; (client:setup-rpc run-id)))) + +;; Do all the connection work, look up the transport type and set up the +;; connection if required. +;; +;; There are two scenarios. +;; 1. We are a test manager and we received *transport-type* and *runremote* via cmdline +;; 2. We are a run tests, list runs or other interactive process and we must figure out +;; *transport-type* and *runremote* from the monitor.db +;; +;; client:setup +;; +;; lookup_server, need to remove *runremote* stuff +;; + +(define (client:setup-http areapath #!key (remaining-tries 100) (failed-connects 0)(area-dat #f)) + (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries) + (server:start-and-wait areapath) + (if (<= remaining-tries 0) + (begin + (debug:print-error 0 *default-log-port* "failed to start or connect to server") + (exit 1)) + ;; + ;; Alternatively here, we can get the list of candidate servers and work our way + ;; through them searching for a good one. 
+ ;; + (let* ((server-dat (server:get-rand-best areapath)) ;; (server:get-first-best areapath)) + (runremote (or area-dat *runremote*))) + (if (not server-dat) ;; no server found + (client:setup-http areapath remaining-tries: (- remaining-tries 1)) + (let ((host (cadr server-dat)) + (port (caddr server-dat)) + (server-id (caddr (cddr server-dat)))) + (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) + (if (and (not area-dat) + (not *runremote*)) + (begin + (set! *runremote* (make-and-init-remote)) + (let* ((server-info (remote-server-info *runremote*))) + (if server-info + (begin + (remote-server-url-set! *runremote* (server:record->url server-info)) + (remote-server-id-set! *runremote* (server:record->id server-info))))))) + (if (and host port server-id) + (let* ((start-res (case *transport-type* + ((http)(http-transport:client-connect host port server-id)))) + (ping-res (case *transport-type* + ((http)(rmt:login-no-auto-client-setup start-res))))) + (if (and start-res + ping-res) + (let ((runremote (or area-dat *runremote*))) ;; it might have been generated only a few statements ago + (remote-conndat-set! runremote start-res) ;; (hash-table-set! runremote run-id start-res) + (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) + start-res) + (begin ;; login failed but have a server record, clean out the record and try again + (debug:print-info 0 *default-log-port* "client:setup, login unsuccessful, will attempt to start server ... start-res=" start-res ", server-dat=" server-dat) ;; had runid. Fixes part of Randy;s ticket 1405717332 + (case *transport-type* + ((http)(http-transport:close-connections))) + (remote-conndat-set! runremote #f) ;; (hash-table-delete! runremote run-id) + (thread-sleep! 
1) + (client:setup-http areapath remaining-tries: (- remaining-tries 1)) + ))) + (begin ;; no server registered + ;; (server:kind-run areapath) + (server:start-and-wait areapath) + (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries) + (thread-sleep! 1) ;; (+ 5 (pseudo-random-integer (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. + (client:setup-http areapath remaining-tries: (- remaining-tries 1))))))))) ) DELETED runs-launch-loop-test.scm Index: runs-launch-loop-test.scm ================================================================== --- runs-launch-loop-test.scm +++ /dev/null @@ -1,76 +0,0 @@ -;; Copyright 2006-2017, Matthew Welland. -;; -;; This file is part of Megatest. -;; -;; Megatest is free software: you can redistribute it and/or modify -;; it under the terms of the GNU General Public License as published by -;; the Free Software Foundation, either version 3 of the License, or -;; (at your option) any later version. -;; -;; Megatest is distributed in the hope that it will be useful, -;; but WITHOUT ANY WARRANTY; without even the implied warranty of -;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -;; GNU General Public License for more details. -;; -;; You should have received a copy of the GNU General Public License -;; along with Megatest. If not, see . -;; -(use srfi-69) - -(define (runs:queue-next-hed tal reg n regful) - (if regful - (car reg) - (car tal))) - -(define (runs:queue-next-tal tal reg n regful) - (if regful - tal - (let ((newtal (cdr tal))) - (if (null? newtal) - reg - newtal - )))) - -(define (runs:queue-next-reg tal reg n regful) - (if regful - (cdr reg) - (if (eq? 
(length tal) 1) - '() - reg))) - -(use trace) -(trace runs:queue-next-hed - runs:queue-next-tal - runs:queue-next-reg) - - -(define tests '(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20)) - -(define test-registry (make-hash-table)) - -(define n 3) - -(let loop ((hed (car tests)) - (tal (cdr tests)) - (reg '())) - (let* ((reglen (length reg)) - (regful (> reglen n))) - (print "hed=" hed ", length reg=" (length reg) ", (> lenreg n)=" (> (length reg) n)) - (let ((newtal (append tal (list hed)))) ;; used if we are not done with this test - (cond - ((not (hash-table-ref/default test-registry hed #f)) - (hash-table-set! test-registry hed #t) - (print "Registering #" hed) - (if (not (null? tal)) - (loop (runs:queue-next-hed tal reg n regful) - (runs:queue-next-tal tal reg n regful) - (let ((newl (append reg (list hed)))) - (if regful - (cdr newl) - newl))))) - (else - (print "Running #" hed) - (if (not (null? tal)) - (loop (runs:queue-next-hed tal reg n regful) - (runs:queue-next-tal tal reg n regful) - (runs:queue-next-reg tal reg n regful)))))))) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -47,363 +47,5 @@ ;; P K T S S T U F F ;;====================================================================== ;; ??? -;;====================================================================== -;; S E R V E R -;;====================================================================== - -;; Call this to start the actual server -;; - -;; all routes though here end in exit ... 
-;; -;; start_server -;; -(define (server:launch run-id transport-type) - (case transport-type - ((http)(http-transport:launch)) - ;;((nmsg)(nmsg-transport:launch run-id)) - ;;((rpc) (rpc-transport:launch run-id)) - (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type)))) - -;;====================================================================== -;; S E R V E R U T I L I T I E S -;;====================================================================== - -;; When using zmq this would send the message back (two step process) -;; with spiffy or rpc this simply returns the return data to be returned -;; -(define (server:reply return-addr query-sig success/fail result) - (debug:print-info 11 *default-log-port* "server:reply return-addr=" return-addr ", result=" result) - ;; (send-message pubsock target send-more: #t) - ;; (send-message pubsock - (case (server:get-transport) - ((rpc) (db:obj->string (vector success/fail query-sig result))) - ((http) (db:obj->string (vector success/fail query-sig result))) - ((fs) result) - (else - (debug:print-error 0 *default-log-port* "unrecognised transport type: " *transport-type*) - result))) - -#;(define (server:record->url servr) - (handle-exceptions - exn - (begin - (debug:print-info 0 *default-log-port* "Unable to get server url from " servr ", exn=" exn) - #f) - (match-let (((mod-time host port start-time server-id pid) - servr)) - (if (and host port) - (conc host ":" port) - #f)))) - -(define (server:get-client-signature) ;; BB> why is this proc named "get-"? it returns nothing -- set! has not return value. - (if *my-client-signature* *my-client-signature* - (let ((sig (server:mk-signature))) - (set! *my-client-signature* sig) - *my-client-signature*))) - -(define server:try-running server:run) ;; there is no more per-run servers ;; REMOVE ME. BUG. 
- -(define (server:kill servr) - (handle-exceptions - exn - (begin - (debug:print-info 0 *default-log-port* "Unable to get host and/or port from " servr ", exn=" exn) - #f) - (match-let (((mod-time hostname port start-time server-id pid) - servr)) - (tasks:kill-server hostname pid)))) - -;; run ping in separate process, safest way in some cases -;; -(define (server:ping-server ifaceport) - (with-input-from-pipe - (conc (common:get-megatest-exe) " -ping " ifaceport) - (lambda () - (let loop ((inl (read-line)) - (res "NOREPLY")) - (if (eof-object? inl) - (case (string->symbol res) - ((NOREPLY) #f) - ((LOGIN_OK) #t) - (else #f)) - (loop (read-line) inl)))))) - -;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). -;; -(define (server:login toppath) - (lambda (toppath) - (set! *db-last-access* (current-seconds)) ;; might not be needed. - (if (equal? *toppath* toppath) - #t - #f))) - -;; (define server:sync-lock-token "SERVER_SYNC_LOCK") -;; (define (server:release-sync-lock) -;; (db:no-sync-del! *no-sync-db* server:sync-lock-token)) -;; (define (server:have-sync-lock?) -;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token)) -;; (have-lock? (car have-lock-pair)) -;; (lock-time (cdr have-lock-pair)) -;; (lock-age (- (current-seconds) lock-time))) -;; (cond -;; (have-lock? #t) -;; ((>lock-age -;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180))) -;; (server:release-sync-lock) -;; (server:have-sync-lock?)) -;; (else #f)))) - -;; moving this here as it needs access to db and cannot be in common. 
-;; - -(define (server:get-bruteforce-syncer dbstruct #!key (fork-to-background #f) (persist-until-sync #f)) - (let* ((sqlite-exe (or (get-environment-variable "MT_SQLITE3_EXE"))) ;; defined in cfg.sh - (sync-log (or (args:get-arg "-sync-log") (conc *toppath* "/logs/sync-" (current-process-id) "-" (get-host-name) ".log"))) - (tmp-area (common:get-db-tmp-area)) - (tmp-db (conc tmp-area "/megatest.db")) - (staging-file (conc *toppath* "/.megatest.db")) - (mtdbfile (conc *toppath* "/megatest.db")) - (lockfile (common:get-sync-lock-filepath)) - (sync-cmd-core (conc sqlite-exe" " tmp-db " .dump | "sqlite-exe" " staging-file "&>"sync-log)) - (sync-cmd (if fork-to-background - (conc "/usr/bin/env NBFAKE_LOG="*toppath*"/logs/last-server-sync-"(current-process-id)".log nbfake \""sync-cmd-core" && /bin/mv -f " staging-file " " mtdbfile" \"") - sync-cmd-core)) - (default-min-intersync-delay 2) - (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: default-min-intersync-delay)) - (default-duty-cycle 0.1) - (duty-cycle (configf:lookup-number *configdat* "server" "sync-duty-cycle" default: default-duty-cycle)) - (last-sync-seconds 10) ;; we will adjust this to a measurement and delay last-sync-seconds * (1 - duty-cycle) - (calculate-off-time (lambda (work-duration duty-cycle) - (* (/ (- 1 duty-cycle) duty-cycle) last-sync-seconds))) - (off-time min-intersync-delay) ;; adjusted in closure below. - (do-a-sync - (lambda () - ;; (BB> "Start do-a-sync with fork-to-background="fork-to-background" persist-until-sync="persist-until-sync) - (let* ((finalres - (let retry-loop ((num-tries 0)) - (if (common:simple-file-lock lockfile) - (begin - (cond - ((not (or fork-to-background persist-until-sync)) - (debug:print 0 *default-log-port* "INFO: syncer thread sleeping for max of (server.minimum-intersync-delay="min-intersync-delay - " , off-time="off-time" seconds ]") - (thread-sleep! 
(max off-time min-intersync-delay))) - (else - (debug:print 0 *default-log-port* "INFO: syncer thread NOT sleeping ; maybe time-to-exit..."))) - - (if (not (configf:lookup *configdat* "server" "disable-db-snapshot")) - (common:snapshot-file mtdbfile subdir: ".db-snapshot")) - (delete-file* staging-file) - (let* ((start-time (current-milliseconds)) - (res (system sync-cmd)) - (dbbackupfile (conc mtdbfile ".backup")) - (res2 - (cond - ((eq? 0 res ) - (handle-exceptions - exn - #f - (if (file-exists? dbbackupfile) - (delete-file* dbbackupfile) - ) - (if (eq? 0 (file-size sync-log)) - (delete-file* sync-log)) - (system (conc "/bin/mv " staging-file " " mtdbfile)) - - (set! last-sync-seconds (/ (- (current-milliseconds) start-time) 1000)) - (set! off-time (calculate-off-time - last-sync-seconds - (cond - ((and (number? duty-cycle) (> duty-cycle 0) (< duty-cycle 1)) - duty-cycle) - (else - (debug:print 0 *default-log-port* "WARNING: ["(common:human-time)"] server.sync-duty-cycle is invalid. Should be a number between 0 and 1, but "duty-cycle" was specified. Using default value: "default-duty-cycle) - default-duty-cycle)))) - - (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec") - (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec ; with duty-cycle of "duty-cycle" off time is now "off-time) - 'sync-completed)) - (else - (system (conc "/bin/cp "sync-log" "sync-log".fail")) - (debug:print 0 *default-log-port* "ERROR: ["(common:human-time)"] Sync failed. See log at "sync-log".fail") - (if (file-exists? (conc mtdbfile ".backup")) - (system (conc "/bin/cp "mtdbfile ".backup " mtdbfile))) - #f)))) - (common:simple-file-release-lock lockfile) - ;; (BB> "released lockfile: " lockfile) - ;; (when (common:file-exists? 
lockfile) - ;; (BB> "DID NOT ACTUALLY RELEASE LOCKFILE")) - res2) ;; end let - );; end begin - ;; else - (cond - (persist-until-sync - (thread-sleep! 1) - (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; we're in a fork-to-background so we need to succeed. Let's wait a jiffy and and try again. num-tries="num-tries" (waiting for lockfile="lockfile" to disappear)") - (retry-loop (add1 num-tries))) - (else - (thread-sleep! (max off-time (+ last-sync-seconds min-intersync-delay))) - (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; not syncing.") - 'parallel-sync-in-progress)) - ) ;; end if got lockfile - ) - )) - ;; (BB> "End do-a-sync with fork-to-background="fork-to-background" persist-until-sync="persist-until-sync" and result="finalres) - finalres) - ) ;; end lambda - )) - do-a-sync)) - -(define (server:writable-watchdog-bruteforce dbstruct) - (thread-sleep! 1) ;; delay for startup - (let* ((do-a-sync (server:get-bruteforce-syncer dbstruct)) - (final-sync (server:get-bruteforce-syncer dbstruct fork-to-background: #t persist-until-sync: #t))) - (when (and (not (args:get-arg "-sync-to-megatest.db")) ;; conditions under which we do not run the sync - (args:get-arg "-server")) - - (let loop () - (do-a-sync) - (if (not (bdat-time-to-exit *bdat*)) (loop))) ;; keep going unless time to exit - - ;; time to exit, close the no-sync db here - (final-sync) - - (if (common:low-noise-print 30) - (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = "(bdat-time-to-exit *bdat*)" pid="(current-process-id) - ))))) - -(define (server:writable-watchdog-deltasync dbstruct) - (thread-sleep! 
0.054) ;; delay for startup - (let ((legacy-sync (common:run-sync?)) - (sync-stale-seconds (configf:lookup-number *configdat* "server" "sync-stale-seconds" default: 300)) - (debug-mode (debug:debug-mode 1)) - (last-time (current-seconds)) - (no-sync-db (db:open-no-sync-db)) - (stmt-cache (dbr:dbstruct-stmt-cache dbstruct)) - (sync-duration 0) ;; run time of the sync in milliseconds - ) - (set! *no-sync-db* no-sync-db) ;; make the no sync db available to api calls - (debug:print-info 2 *default-log-port* "Periodic sync thread started.") - (debug:print-info 3 *default-log-port* "watchdog starting. legacy-sync is " legacy-sync" pid="(current-process-id) );; " this-wd-num="this-wd-num) - (if (and legacy-sync (not (bdat-time-to-exit *bdat*))) - (let* (;;(dbstruct (db:setup)) - (mtdb (dbr:dbstruct-mtdb dbstruct)) - (mtpath (db:dbdat-get-path mtdb)) - (tmp-area (common:get-db-tmp-area)) - (start-file (conc tmp-area "/.start-sync")) - (end-file (conc tmp-area "/.end-sync"))) - (debug:print-info 0 *default-log-port* "Server running, periodic sync started.") - (let loop () - ;; sync for filesystem local db writes - ;; - (mutex-lock! *db-multi-sync-mutex*) - (let* ((need-sync (>= *db-last-access* *db-last-sync*)) ;; no sync since last write - (sync-in-progress *db-sync-in-progress*) - (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 5)) - (should-sync (and (not (bdat-time-to-exit *bdat*)) - (> (- (current-seconds) *db-last-sync*) min-intersync-delay))) ;; sync every five seconds minimum, deprecated logic, can probably be removed - (start-time (current-seconds)) - (cpu-load-adj (alist-ref 'adj-proc-load (common:get-normalized-cpu-load #f))) - (mt-mod-time (file-modification-time mtpath)) - (last-sync-start (if (common:file-exists? start-file) - (file-modification-time start-file) - 0)) - (last-sync-end (if (common:file-exists? 
end-file) - (file-modification-time end-file) - 10)) - (sync-period (+ 3 (* cpu-load-adj 30))) ;; as adjusted load increases increase the sync period - (recently-synced (and (< (- start-time mt-mod-time) sync-period) ;; not useful if sync didn't modify megatest.db! - (< mt-mod-time last-sync-start))) - (sync-done (<= last-sync-start last-sync-end)) - (sync-stale (> start-time (+ last-sync-start sync-stale-seconds))) - (will-sync (and (not (bdat-time-to-exit *bdat*)) ;; do not start a sync if we are in the process of exiting - (or need-sync should-sync) - (or sync-done sync-stale) - (not sync-in-progress) - (not recently-synced)))) - (debug:print-info 13 *default-log-port* "WD writable-watchdog top of loop. need-sync="need-sync" sync-in-progress=" sync-in-progress - " should-sync="should-sync" start-time="start-time" mt-mod-time="mt-mod-time" recently-synced="recently-synced" will-sync="will-sync - " sync-done=" sync-done " sync-period=" sync-period) - (if (and (> sync-period 5) - (common:low-noise-print 30 "sync-period")) - (debug:print-info 0 *default-log-port* "Increased sync period due to long sync times, sync took: " sync-period " seconds.")) - ;; (if recently-synced (debug:print-info 0 *default-log-port* "Skipping sync due to recently-synced flag=" recently-synced)) - ;; (debug:print-info 0 *default-log-port* "need-sync: " need-sync " sync-in-progress: " sync-in-progress " should-sync: " should-sync " will-sync: " will-sync) - (if will-sync (set! *db-sync-in-progress* #t)) - (mutex-unlock! *db-multi-sync-mutex*) - (if will-sync - (let (;; (max-sync-duration (configf:lookup-number *configdat* "server" "max-sync-duration")) ;; KEEPING THIS AVAILABLE BUT SHOULD NOT USE, I'M PRETTY SURE IT DOES NOT WORK! 
- (sync-start (current-milliseconds))) - (with-output-to-file start-file (lambda ()(print (current-process-id)))) - - ;; put lock here - - ;; (if (or (not max-sync-duration) - ;; (< sync-duration max-sync-duration)) ;; NOTE: db:sync-to-megatest.db keeps track of time of last sync and syncs incrementally - (let ((res (db:sync-to-megatest.db dbstruct no-sync-db: no-sync-db))) ;; did we sync any data? If so need to set the db touched flag to keep the server alive - (set! sync-duration (- (current-milliseconds) sync-start)) - (if (> res 0) ;; some records were transferred, keep the db alive - (begin - (mutex-lock! *heartbeat-mutex*) - (set! *db-last-access* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*) - (debug:print-info 0 *default-log-port* "sync called, " res " records transferred.")) - (debug:print-info 2 *default-log-port* "sync called but zero records transferred"))))) -;; ;; TODO: factor this next routine out into a function -;; (with-input-from-pipe ;; this should not block other threads but need to verify this -;; (conc "megatest -sync-to-megatest.db -m testsuite:" (common:get-area-name) ":" *toppath*) -;; (lambda () -;; (let loop ((inl (read-line)) -;; (res #f)) -;; (if (eof-object? inl) -;; (begin -;; (set! sync-duration (- (current-milliseconds) sync-start)) -;; (cond -;; ((not res) -;; (debug:print 0 *default-log-port* "ERROR: sync from /tmp db to megatest.db appears to have failed. Recommended that you stop your runs and run \"megatest -cleanup-db\"")) -;; ((> res 0) -;; (mutex-lock! *heartbeat-mutex*) -;; (set! *db-last-access* (current-seconds)) -;; (mutex-unlock! *heartbeat-mutex*)))) -;; (let ((num-synced (let ((matches (string-match "^Synced (\\d+).*$" inl))) -;; (if matches -;; (string->number (cadr matches)) -;; #f)))) -;; (loop (read-line) -;; (or num-synced res)))))))))) - (if will-sync - (begin - (mutex-lock! *db-multi-sync-mutex*) - (set! *db-sync-in-progress* #f) - (set! 
*db-last-sync* start-time) - (with-output-to-file end-file (lambda ()(print (current-process-id)))) - - ;; release lock here - - (mutex-unlock! *db-multi-sync-mutex*))) - (if (and debug-mode - (> (- start-time last-time) 60)) - (begin - (set! last-time start-time) - (debug:print-info 4 *default-log-port* "timestamp -> " (seconds->time-string (current-seconds)) ", time since start -> " (seconds->hr-min-sec (- (current-seconds) *time-zero*)))))) - - ;; keep going unless time to exit - ;; - (if (not (bdat-time-to-exit *bdat*)) - (let delay-loop ((count 0)) - ;;(debug:print-info 13 *default-log-port* "delay-loop top; count="count" pid="(current-process-id)" this-wd-num="this-wd-num" *time-to-exit*="(bdat-time-to-exit *bdat*)) - - (if (and (not (bdat-time-to-exit *bdat*)) - (< count 6)) ;; was 11, changing to 4. - (begin - (thread-sleep! 1) - (delay-loop (+ count 1)))) - (if (not (bdat-time-to-exit *bdat*)) (loop)))) - ;; time to exit, close the no-sync db here - (db:no-sync-close-db no-sync-db stmt-cache) - (if (common:low-noise-print 30) - (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " (bdat-time-to-exit *bdat*)" pid="(current-process-id) ))))))) ;;" this-wd-num="this-wd-num))))))) - Index: servermod.scm ================================================================== --- servermod.scm +++ servermod.scm @@ -60,15 +60,10 @@ (define (server:make-server-url hostport) (if (not hostport) #f (conc "http://" (car hostport) ":" (cadr hostport)))) -(define (make-and-init-remote) - (make-remote hh-dat: (common:get-homehost) - server-info: (if *toppath* (server:check-if-running *toppath*) #f) - server-timeout: (server:expiration-timeout))) - ;;====================================================================== ;; logic for getting homehost. Returns (host . 
at-home) ;; IF *toppath* is not set, wait up to five seconds trying every two seconds ;; (this is to accomodate the watchdog) ;; @@ -142,37 +137,10 @@ (common:get-homehost))) (hh (if hh-dat (car hh-dat) #f))) (common:wait-for-normalized-load maxnormload msg hh))) -;; kind start up of servers, wait 40 seconds before allowing another server for a given -;; run-id to be launched -;; -(define (server:kind-run areapath) - ;; look for $MT_RUN_AREA_HOME/logs/server-start-last - ;; and wait for it to be at least 3 seconds old - (server:wait-for-server-start-last-flag areapath) - (if (not (server:check-if-running areapath)) ;; why try if there is already a server running? - (let* ((last-run-dat (hash-table-ref/default *server-kind-run* areapath '(0 0))) ;; callnum, whenrun - (call-num (car last-run-dat)) - (when-run (cadr last-run-dat)) - (run-delay (+ (case call-num - ((0) 0) - ((1) 20) - ((2) 300) - (else 600)) - (pseudo-random-integer 5))) ;; add a small random number just in case a lot of jobs hit the work hosts simultaneously - (lock-file (conc areapath "/logs/server-start.lock"))) - (if (> (- (current-seconds) when-run) run-delay) - (let* ((start-flag (conc areapath "/logs/server-start-last"))) - (common:simple-file-lock-and-wait lock-file expire-time: 15) - (debug:print-info 0 *default-log-port* "server:kind-run: touching " start-flag) - (system (conc "touch " start-flag)) ;; lazy but safe - (server:run areapath) - (thread-sleep! 2) ;; don't release the lock for at least a few seconds - (common:simple-file-release-lock lock-file))) - (hash-table-set! *server-kind-run* areapath (list (+ call-num 1)(current-seconds)))))) ;; Given a run id start a server process ### NOTE ### > file 2>&1 ;; if the run-id is zero and the target-host is set ;; try running on that host ;; incidental: rotate logs in logs/ dir. ;; @@ -221,44 +189,10 @@ (unsetenv "TARGETHOST_LOGF") (if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST")) (thread-join! 
log-rotate) (pop-directory))) -;; no longer care if multiple servers are started by accident. older servers will drop off in time. -;; -(define (server:check-if-running areapath) ;; #!key (numservers "2")) - (let* ((ns (server:get-num-servers)) - (servers (server:get-best (server:get-list areapath)))) - (if (or (and servers - (null? servers)) - (not servers) - (and (list? servers) - (< (length servers) (pseudo-random-integer ns)))) ;; somewhere between 0 and numservers - #f - (let loop ((hed (car servers)) - (tal (cdr servers))) - (let ((res (server:check-server hed))) - (if res - hed - (if (null? tal) - #f - (loop (car tal)(cdr tal))))))))) - - -;; ping the given server -;; -(define (server:check-server server-record) - (let* ((server-url (server:record->url server-record)) - (server-id (server:record->id server-record)) - (res (case *transport-type* - ((http)(server:ping server-url server-id)) - ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server) - ))) - (if res - server-url - #f))) - (define (server:record->url servr) (handle-exceptions exn (begin (debug:print-info 0 *default-log-port* "Unable to get server url from " servr ", exn=" exn) @@ -296,25 +230,17 @@ (debug:print-info 0 *default-log-port* "Gating server start, last start: " fmodtime ", delta: " delta ", reftime: " reftime ", all-go=" all-go) (thread-sleep! reftime) (server:wait-for-server-start-last-flag areapath))))))) - -;; this one seems to be the general entry point -;; -(define (server:start-and-wait areapath #!key (timeout 60)) - (let ((give-up-time (+ (current-seconds) timeout))) - (let loop ((server-info (server:check-if-running areapath)) - (try-num 0)) - (if (or server-info - (> (current-seconds) give-up-time)) ;; server-url will be #f if no server available. 
- (server:record->url server-info) - (let ((num-ok (length (server:get-best (server:get-list areapath))))) - (if (and (> try-num 0) ;; first time through simply wait a little while then try again - (< num-ok 1)) ;; if there are no decent candidates for servers then try starting a new one - (server:kind-run areapath)) - (thread-sleep! 5) - (loop (server:check-if-running areapath) - (+ try-num 1))))))) +(define (server:kill servr) + (handle-exceptions + exn + (begin + (debug:print-info 0 *default-log-port* "Unable to get host and/or port from " servr ", exn=" exn) + #f) + (match-let (((mod-time hostname port start-time server-id pid) + servr)) + (tasks:kill-server hostname pid)))) )