Index: apimod.scm
==================================================================
--- apimod.scm
+++ apimod.scm
@@ -32,10 +32,12 @@
chicken.base
chicken.process-context.posix
chicken.string
chicken.time
chicken.condition
+ chicken.process
+ chicken.random
;; (prefix sqlite3 sqlite3:)
typed-records
srfi-18
srfi-69
@@ -201,11 +203,11 @@
;;===============================================
((get-keys-write) (db:get-keys dbstruct)) ;; force a dummy "write" query to force server; for debug in -repl
;; SERVERS
- ((start-server) (apply server:kind-run params))
+ ;; ((start-server) (apply server:kind-run params))
((kill-server) (set! *server-run* #f))
;; TESTS
;;((test-set-state-status-by-id) (apply mt:test-set-state-status-by-id dbstruct params))
@@ -428,7 +430,8 @@
;; (list "ERROR, not string, list, number or boolean" 1 cmd params res)))))
(db:obj->string res transport: 'http)))
(begin
(debug:print 0 *default-log-port* "Server refused to process request. Sever id mismatch. recived " key " expected: " *server-id* ".\nOther arguments recived: cmd=" cmd " params = " params)
(db:obj->string (conc "Server refused to process request server-id mismatch: " key ", " *server-id*) transport: 'http)))))
+
)
ADDED attic/runs-launch-loop-test.scm
Index: attic/runs-launch-loop-test.scm
==================================================================
--- /dev/null
+++ attic/runs-launch-loop-test.scm
@@ -0,0 +1,76 @@
+;; Copyright 2006-2017, Matthew Welland.
+;;
+;; This file is part of Megatest.
+;;
+;; Megatest is free software: you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+;;
+;; Megatest is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;; GNU General Public License for more details.
+;;
+;; You should have received a copy of the GNU General Public License
+;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
+;;
+(use srfi-69)
+
+;; Pick the next test to look at. While the registry window is full (REGFUL)
+;; the head of REG is revisited; otherwise the next entry of TAL is taken.
+;; N is not used here; it keeps the signature parallel to the other
+;; runs:queue-next-* helpers.
+(define (runs:queue-next-hed tal reg n regful)
+  (car (if regful reg tal)))
+
+;; Compute the remaining-work list for the next iteration:
+;;  - REGFUL: TAL is passed through untouched (the next head comes from REG);
+;;  - TAL has one element left: REG becomes the new tail;
+;;  - otherwise: TAL advances by one element.
+;; N is unused; it keeps the signature parallel to the sibling helpers.
+(define (runs:queue-next-tal tal reg n regful)
+  (cond (regful tal)
+        ((null? (cdr tal)) reg)
+        (else (cdr tal))))
+
+;; Compute the registry list for the next iteration:
+;;  - REGFUL: the head of REG is what runs:queue-next-hed hands out next,
+;;    so it is dropped here;
+;;  - TAL has exactly one element left: runs:queue-next-tal hands REG over
+;;    as the new tail, and the registry is reset to '();
+;;  - otherwise REG is carried forward unchanged.
+;; N is unused; it keeps the signature parallel to the sibling helpers.
+(define (runs:queue-next-reg tal reg n regful)
+  (cond (regful (cdr reg))
+        ;; Structural single-element check: avoids comparing numbers with
+        ;; eq? (unspecified by the Scheme reports) and walking TAL with length.
+        ((and (pair? tal) (null? (cdr tal))) '())
+        (else reg)))
+
+;; Debugging aid: wrap the three queue helpers with the trace egg so that
+;; their invocations can be observed while the simulation below runs.
+(use trace)
+(trace runs:queue-next-hed
+ runs:queue-next-tal
+ runs:queue-next-reg)
+
+
+;; Simulation driver: push TESTS through the runs:queue-next-* helpers.
+;; The first time a test appears as HED it is recorded in TEST-REGISTRY and
+;; appended to REG ("Registering #"); later visits print "Running #".
+;; Once REG holds more than N entries (REGFUL) the helpers draw the next HED
+;; from REG instead of TAL. Iteration ends when a HED is processed while
+;; TAL is empty.
+(define tests '(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20))
+
+(define test-registry (make-hash-table))
+
+(define n 3)
+
+(let loop ((hed (car tests))
+           (tal (cdr tests))
+           (reg '()))
+  (let* ((reglen (length reg))
+         (regful (> reglen n)))
+    ;; Print the values computed above rather than recomputing (length reg).
+    (print "hed=" hed ", length reg=" reglen ", (> lenreg n)=" regful)
+    (cond
+     ((not (hash-table-ref/default test-registry hed #f))
+      (hash-table-set! test-registry hed #t)
+      (print "Registering #" hed)
+      (if (not (null? tal))
+          (loop (runs:queue-next-hed tal reg n regful)
+                (runs:queue-next-tal tal reg n regful)
+                ;; Append HED; when full, drop REG's head (the entry that
+                ;; runs:queue-next-hed hands out as the next HED).
+                (let ((newl (append reg (list hed))))
+                  (if regful
+                      (cdr newl)
+                      newl)))))
+     (else
+      (print "Running #" hed)
+      (if (not (null? tal))
+          (loop (runs:queue-next-hed tal reg n regful)
+                (runs:queue-next-tal tal reg n regful)
+                (runs:queue-next-reg tal reg n regful)))))))
Index: clientmod.scm
==================================================================
--- clientmod.scm
+++ clientmod.scm
@@ -87,88 +87,7 @@
#;(define (client:logout serverdat)
(let ((ok (and (socket? serverdat)
(cdb:logout serverdat *toppath* (client:get-signature)))))
ok))
-#;(define (client:connect iface port)
- (http-transport:client-connect iface port)
- #;(case (server:get-transport)
- ((rpc) (rpc:client-connect iface port))
- ((http) (http:client-connect iface port))
- ((zmq) (zmq:client-connect iface port))
- (else (rpc:client-connect iface port))))
-
-(define (client:setup areapath #!key (remaining-tries 100) (failed-connects 0))
- (client:setup-http areapath remaining-tries: remaining-tries failed-connects: failed-connects)
- #;(case (server:get-transport)
- ((rpc) (rpc-transport:client-setup remaining-tries: remaining-tries failed-connects: failed-connects)) ;;(client:setup-rpc run-id))
- ((http)(client:setup-http areapath remaining-tries: remaining-tries failed-connects: failed-connects))
- (else (rpc-transport:client-setup remaining-tries: remaining-tries failed-connects: failed-connects)))) ;; (client:setup-rpc run-id))))
-
-;; Do all the connection work, look up the transport type and set up the
-;; connection if required.
-;;
-;; There are two scenarios.
-;; 1. We are a test manager and we received *transport-type* and *runremote* via cmdline
-;; 2. We are a run tests, list runs or other interactive process and we must figure out
-;; *transport-type* and *runremote* from the monitor.db
-;;
-;; client:setup
-;;
-;; lookup_server, need to remove *runremote* stuff
-;;
-
-(define (client:setup-http areapath #!key (remaining-tries 100) (failed-connects 0)(area-dat #f))
- (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries)
- (server:start-and-wait areapath)
- (if (<= remaining-tries 0)
- (begin
- (debug:print-error 0 *default-log-port* "failed to start or connect to server")
- (exit 1))
- ;;
- ;; Alternatively here, we can get the list of candidate servers and work our way
- ;; through them searching for a good one.
- ;;
- (let* ((server-dat (server:get-rand-best areapath)) ;; (server:get-first-best areapath))
- (runremote (or area-dat *runremote*)))
- (if (not server-dat) ;; no server found
- (client:setup-http areapath remaining-tries: (- remaining-tries 1))
- (let ((host (cadr server-dat))
- (port (caddr server-dat))
- (server-id (caddr (cddr server-dat))))
- (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
- (if (and (not area-dat)
- (not *runremote*))
- (begin
- (set! *runremote* (make-and-init-remote))
- (let* ((server-info (remote-server-info *runremote*)))
- (if server-info
- (begin
- (remote-server-url-set! *runremote* (server:record->url server-info))
- (remote-server-id-set! *runremote* (server:record->id server-info)))))))
- (if (and host port server-id)
- (let* ((start-res (case *transport-type*
- ((http)(http-transport:client-connect host port server-id))))
- (ping-res (case *transport-type*
- ((http)(rmt:login-no-auto-client-setup start-res)))))
- (if (and start-res
- ping-res)
- (let ((runremote (or area-dat *runremote*))) ;; it might have been generated only a few statements ago
- (remote-conndat-set! runremote start-res) ;; (hash-table-set! runremote run-id start-res)
- (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res))
- start-res)
- (begin ;; login failed but have a server record, clean out the record and try again
- (debug:print-info 0 *default-log-port* "client:setup, login unsuccessful, will attempt to start server ... start-res=" start-res ", server-dat=" server-dat) ;; had runid. Fixes part of Randy;s ticket 1405717332
- (case *transport-type*
- ((http)(http-transport:close-connections)))
- (remote-conndat-set! runremote #f) ;; (hash-table-delete! runremote run-id)
- (thread-sleep! 1)
- (client:setup-http areapath remaining-tries: (- remaining-tries 1))
- )))
- (begin ;; no server registered
- ;; (server:kind-run areapath)
- (server:start-and-wait areapath)
- (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries)
- (thread-sleep! 1) ;; (+ 5 (pseudo-random-integer (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms.
- (client:setup-http areapath remaining-tries: (- remaining-tries 1)))))))))
)
Index: common.scm
==================================================================
--- common.scm
+++ common.scm
@@ -42,416 +42,5 @@
;; (define (exit . code)
;; (if (null? code)
;; (old-exit)
;; (old-exit code)))
-
-(define (common:with-queue-db mtconf proc #!key (use-lt #f)(toppath-in #f))
- (let* ((pktsdirs (common:get-pkts-dirs mtconf use-lt))
- (pktsdir (if pktsdirs (car pktsdirs) #f))
- (toppath (or (configf:lookup mtconf "scratchdat" "toppath")
- toppath-in))
- (pdbpath (or (configf:lookup mtconf "setup" "pdbpath") pktsdir)))
- (cond
- ((not (and pktsdir toppath pdbpath))
- (debug:print 0 *default-log-port* "ERROR: settings are missing in your megatest.config for area management.")
- (debug:print 0 *default-log-port* " you need to have pktsdirs in the [setup] section."))
- ((not (common:file-exists? pktsdir))
- (debug:print 0 *default-log-port* "ERROR: pkts directory not found " pktsdir))
- ((not (equal? (file-owner pktsdir)(current-effective-user-id)))
- (debug:print 0 *default-log-port* "ERROR: directory " pktsdir " is not owned by " (current-effective-user-name)))
- (else
- (let* ((pdb (open-queue-db pdbpath "pkts.db"
- schema: '("CREATE TABLE groups (id INTEGER PRIMARY KEY,groupname TEXT, CONSTRAINT group_constraint UNIQUE (groupname));"))))
- (proc pktsdirs pktsdir pdb)
- (dbi:close pdb))))))
-
-(define (common:load-pkts-to-db mtconf #!key (use-lt #f))
- (common:with-queue-db
- mtconf
- (lambda (pktsdirs pktsdir pdb)
- (for-each
- (lambda (pktsdir) ;; look at all
- (cond
- ((not (common:file-exists? pktsdir))
- (debug:print 0 *default-log-port* "ERROR: packets directory " pktsdir " does not exist."))
- ((not (directory? pktsdir))
- (debug:print 0 *default-log-port* "ERROR: packets directory path " pktsdir " is not a directory."))
- ((not (file-readable? pktsdir))
- (debug:print 0 *default-log-port* "ERROR: packets directory path " pktsdir " is not readable."))
- (else
- (debug:print-info 0 *default-log-port* "Loading packets found in " pktsdir)
- (let ((pkts (glob (conc pktsdir "/*.pkt"))))
- (for-each
- (lambda (pkt)
- (let* ((uuid (cadr (string-match ".*/([0-9a-f]+).pkt" pkt)))
- (exists (lookup-by-uuid pdb uuid #f)))
- (if (not exists)
- (let* ((pktdat (string-intersperse
- (with-input-from-file pkt read-lines)
- "\n"))
- (apkt (pkt->alist pktdat))
- (ptype (alist-ref 'T apkt)))
- (add-to-queue pdb pktdat uuid (or ptype 'cmd) #f 0)
- (debug:print 4 *default-log-port* "Added " uuid " of type " ptype " to queue"))
- (debug:print 4 *default-log-port* "pkt: " uuid " exists, skipping...")
- )))
- pkts)))))
- pktsdirs))
- use-lt: use-lt))
-
-
-;;======================================================================
-;; N A N O M S G C L I E N T
-;;======================================================================
-;;
-;;
-;;
-;; (define (common:send-dboard-main-changed)
-;; (let* ((dashboard-ips (mddb:get-dashboards)))
-;; (for-each
-;; (lambda (ipadr)
-;; (let* ((soc (common:open-nm-req (conc "tcp://" ipadr)))
-;; (msg (conc "main " *toppath*))
-;; (res (common:nm-send-receive-timeout soc msg)))
-;; (if (not res) ;; couldn't reach that dashboard - remove it from db
-;; (print "ERROR: couldn't reach dashboard " ipadr))
-;; res))
-;; dashboard-ips)))
-;;
-;;
-;; ;;======================================================================
-;; ;; D A S H B O A R D D B
-;; ;;======================================================================
-;;
-;; (define (mddb:open-db)
-;; (let* ((db (open-database (conc (get-environment-variable "HOME") "/.dashboard.db"))))
-;; (set-busy-handler! db (busy-timeout 10000))
-;; (for-each
-;; (lambda (qry)
-;; (exec (sql db qry)))
-;; (list
-;; "CREATE TABLE IF NOT EXISTS vars (id INTEGER PRIMARY KEY,key TEXT, val TEXT, CONSTRAINT varsconstraint UNIQUE (key));"
-;; "CREATE TABLE IF NOT EXISTS dashboards (
-;; id INTEGER PRIMARY KEY,
-;; pid INTEGER,
-;; username TEXT,
-;; hostname TEXT,
-;; ipaddr TEXT,
-;; portnum INTEGER,
-;; start_time TIMESTAMP DEFAULT (strftime('%s','now')),
-;; CONSTRAINT hostport UNIQUE (hostname,portnum)
-;; );"
-;; ))
-;; db))
-;;
-;; ;; register a dashboard
-;; ;;
-;; (define (mddb:register-dashboard port)
-;; (let* ((pid (current-process-id))
-;; (hostname (get-host-name))
-;; (ipaddr (server:get-best-guess-address hostname))
-;; (username (current-user-name)) ;; (car userinfo)))
-;; (db (mddb:open-db)))
-;; (print "Register monitor, pid: " pid ", hostname: " hostname ", port: " port ", username: " username)
-;; (exec (sql db "INSERT OR REPLACE INTO dashboards (pid,username,hostname,ipaddr,portnum) VALUES (?,?,?,?,?);")
-;; pid username hostname ipaddr port)
-;; (close-database db)))
-;;
-;; ;; unregister a monitor
-;; ;;
-;; (define (mddb:unregister-dashboard host port)
-;; (let* ((db (mddb:open-db)))
-;; (print "Register unregister monitor, host:port=" host ":" port)
-;; (exec (sql db "DELETE FROM dashboards WHERE hostname=? AND portnum=?;") host port)
-;; (close-database db)))
-;;
-;; ;; get registered dashboards
-;; ;;
-;; (define (mddb:get-dashboards)
-;; (let ((db (mddb:open-db)))
-;; (query fetch-column
-;; (sql db "SELECT ipaddr || ':' || portnum FROM dashboards;"))))
-
-;;======================================================================
-;; faux-lock is deprecated. Please use simple-lock below
-;;
-(define (common:faux-lock keyname #!key (wait-time 8)(allow-lock-steal #t))
- (if (rmt:no-sync-get/default keyname #f) ;; do not be tempted to compare to pid. locking is a one-shot action, if already locked for this pid it doesn't actually count
- (if (> wait-time 0)
- (begin
- (thread-sleep! 1)
- (if (eq? wait-time 1) ;; only one second left, steal the lock
- (begin
- (debug:print-info 0 *default-log-port* "stealing lock for " keyname)
- (common:faux-unlock keyname force: #t)))
- (common:faux-lock keyname wait-time: (- wait-time 1)))
- #f)
- (begin
- (rmt:no-sync-set keyname (conc (current-process-id)))
- (equal? (conc (current-process-id)) (conc (rmt:no-sync-get/default keyname #f))))))
-
-(define (common:faux-unlock keyname #!key (force #f))
- (if (or force (equal? (conc (current-process-id)) (conc (rmt:no-sync-get/default keyname #f))))
- (begin
- (if (rmt:no-sync-get/default keyname #f) (rmt:no-sync-del! keyname))
- #t)
- #f))
-
-(define (std-exit-procedure)
- ;;(common:telemetry-log-close)
- (on-exit (lambda () 0))
- ;;(debug:print-info 13 *default-log-port* "std-exit-procedure called; *time-to-exit*="*time-to-exit*)
- (let ((no-hurry (if (bdat-time-to-exit *bdat*) ;; hurry up
- #f
- (begin
- (bdat-time-to-exit-set! *bdat* #t)
- #t))))
- (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
- (if (and no-hurry (debug:debug-mode 18))
- (rmt:print-db-stats))
- (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds
- (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated
- (if (bdat-task-db *bdat*)
- (let ((db (cdr (bdat-task-db *bdat*))))
- (if (sqlite3:database? db)
- (begin
- (sqlite3:interrupt! db)
- (sqlite3:finalize! db #t)
- ;; (vector-set! (bdat-task-db *bdat*) 0 #f)
- (bdat-task-db-set! *bdat* #f)))))
- (http-client#close-idle-connections!)
- ;; (if (and *runremote*
- ;; (remote-conndat *runremote*))
- ;; (begin
- ;; (http-client#close-all-connections!))) ;; for http-client
- (if (not (eq? *default-log-port* (current-error-port)))
- (close-output-port *default-log-port*))
- (set! *default-log-port* (current-error-port))) "Cleanup db exit thread"))
- (th2 (make-thread (lambda ()
- (debug:print 4 *default-log-port* "Attempting clean exit. Please be patient and wait a few seconds...")
- (if no-hurry
- (begin
- (thread-sleep! 5)) ;; give the clean up few seconds to do it's stuff
- (begin
- (thread-sleep! 2)))
- (debug:print 4 *default-log-port* " ... done")
- )
- "clean exit")))
- (thread-start! th1)
- (thread-start! th2)
- (thread-join! th1)
- )
- )
-
- 0)
-
-
-;; TODO: for multiple areas, we will have multiple watchdogs; and multiple threads to manage
-(define (init-watchdog)
- (set! (bdat-watchdog-set! *bdat*)
- (make-thread
- (lambda ()
- (handle-exceptions
- exn
- (begin
- (print-call-chain)
- (print " message: " ((condition-property-accessor 'exn 'message) exn) ", exn=" exn))
- (common:watchdog)))
- "Watchdog thread"))
- (start-watchdog))
-
-(define (start-watchdog)
- ;;(if (not (args:get-arg "-server"))
- ;; (thread-start! *watchdog*)) ;; if starting a server; wait till we get to running state before kicking off watchdog
- (let* ((no-watchdog-args
- '("-list-runs"
- "-testdata-csv"
- "-list-servers"
- "-server"
- "-adjutant"
- "-list-disks"
- "-list-targets"
- "-show-runconfig"
- ;;"-list-db-targets"
- "-show-runconfig"
- "-show-config"
- "-show-cmdinfo"
- "-cleanup-db"
- ))
- (no-watchdog-argvals (list '("-archive" . "replicate-db")))
- (start-watchdog-specail-arg-val (let loop ((hed (car no-watchdog-argvals))
- (tail (cdr no-watchdog-argvals)))
- ;; (print "hed" hed " arg " (args:get-arg (car hed)) " val:" (cdr hed) " eql" (equal? (args:get-arg (car hed)) (cdr hed)))
- (if (equal? (args:get-arg (car hed)) (cdr hed))
- #f
- (if (null? tail)
- #t
- (loop (car tail) (cdr tail))))))
- (no-watchdog-args-vals (filter (lambda (x) x)
- (map args:get-arg no-watchdog-args)))
- (start-watchdog (and (null? no-watchdog-args-vals) start-watchdog-specail-arg-val)))
- ;(print "no-watchdog-args="no-watchdog-args "no-watchdog-args-vals="no-watchdog-args-vals " start-watchdog-specail-arg-val:" start-watchdog-specail-arg-val " start-watchdog:" start-watchdog)
- (if start-watchdog
- (thread-start! (bdat-watchdog *bdat*)))))
-
-;;======================================================================
-;; TODO: for multiple areas, we will have multiple watchdogs; and multiple threads to manage
-(define (common:watchdog)
- (debug:print-info 13 *default-log-port* "common:watchdog entered.")
- (if (launch:setup)
- (if (common:on-homehost?)
- (let ((dbstruct (db:setup #t)))
- (debug:print-info 13 *default-log-port* "after db:setup with dbstruct=" dbstruct)
- (cond
- ((dbr:dbstruct-read-only dbstruct)
- (debug:print-info 13 *default-log-port* "loading read-only watchdog")
- (common:readonly-watchdog dbstruct))
- (else
- (debug:print-info 13 *default-log-port* "loading writable-watchdog.")
- (let* ((syncer (or (configf:lookup *configdat* "server" "sync-method") "brute-force-sync")))
- (cond
- ((equal? syncer "brute-force-sync")
- (server:writable-watchdog-bruteforce dbstruct))
- ((equal? syncer "delta-sync")
- (server:writable-watchdog-deltasync dbstruct))
- (else
- (debug:print-error 0 *default-log-port* "Unknown server/sync-method specified ("syncer") - valid values are brute-force-sync and delta-sync.")
- (exit 1)))
- ;;(debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] Syncer started (method="syncer")")
- )))
- (debug:print-info 13 *default-log-port* "watchdog done."))
- (debug:print-info 13 *default-log-port* "no need for watchdog on non-homehost"))))
-
-
-;;======================================================================
-;; currently the primary job of the watchdog is to run the sync back to megatest.db from the db in /tmp
-;; if we are on the homehost and we are a server (by definition we are on the homehost if we are a server)
-;;
-(define (common:readonly-watchdog dbstruct)
- (let ((just-testing 0.0501))
- (thread-sleep! just-testing)) ;; (/ 1 20)) ;; 0.051) ;; delay for startup
- (debug:print-info 13 *default-log-port* "common:readonly-watchdog entered.")
- ;; sync megatest.db to /tmp/.../megatst.db
- (let* ((sync-cool-off-duration 3)
- (golden-mtdb (dbr:dbstruct-mtdb dbstruct))
- (golden-mtpath (db:dbdat-get-path golden-mtdb))
- (tmp-mtdb (dbr:dbstruct-tmpdb dbstruct))
- (tmp-mtpath (db:dbdat-get-path tmp-mtdb)))
- (debug:print-info 0 *default-log-port* "Read-only periodic sync thread started.")
- (let loop ((last-sync-time 0))
- (debug:print-info 13 *default-log-port* "loop top tmp-mtpath="tmp-mtpath" golden-mtpath="golden-mtpath)
- (let* ((duration-since-last-sync (- (current-seconds) last-sync-time)))
- (debug:print-info 13 *default-log-port* "duration-since-last-sync="duration-since-last-sync)
- (if (and (not (bdat-time-to-exit *bdat*))
- (< duration-since-last-sync sync-cool-off-duration))
- (thread-sleep! (- sync-cool-off-duration duration-since-last-sync)))
- (if (not (bdat-time-to-exit *bdat*))
- (let ((golden-mtdb-mtime (file-modification-time golden-mtpath))
- (tmp-mtdb-mtime (file-modification-time tmp-mtpath)))
- (if (> golden-mtdb-mtime tmp-mtdb-mtime)
- (if (< golden-mtdb-mtime (- (current-seconds) 3)) ;; file has NOT been touched in past three seconds, this way multiple servers won't fight to sync back
- (let ((res (db:multi-db-sync dbstruct 'old2new)))
- (debug:print-info 13 *default-log-port* "rosync called, " res " records transferred."))))
- (loop (current-seconds)))
- #t)))
- (debug:print-info 0 *default-log-port* "Exiting readonly-watchdog timer, (bdat-time-to-exit *bdat*) = " (bdat-time-to-exit *bdat*)" pid="(current-process-id)" mtpath="golden-mtpath)))
-
-;;======================================================================
-;; from metadat lookup MEGATEST_VERSION
-;;
-(define (common:get-last-run-version) ;; RADT => How does this work in send-receive function??; assume it is the value saved in some DB
- (rmt:get-var "MEGATEST_VERSION"))
-
-(define (common:get-last-run-version-number)
- (string->number
- (substring (common:get-last-run-version) 0 6)))
-
-(define (common:set-last-run-version)
- (rmt:set-var "MEGATEST_VERSION" (common:version-signature)))
-
-;;======================================================================
-;; postive number if megatest version > db version
-;; negative number if megatest version < db version
-(define (common:version-db-delta)
- (- megatest-version (common:get-last-run-version-number)))
-
-(define (common:version-changed?)
- (not (equal? (common:get-last-run-version)
- (common:version-signature))))
-
-(define (common:api-changed?)
- (not (equal? (substring (->string megatest-version) 0 4)
- (substring (conc (common:get-last-run-version)) 0 4))))
-
-;;======================================================================
-;; Move me elsewhere ...
-;; RADT => Why do we meed the version check here, this is called only if version misma
-;;
-(define (common:cleanup-db dbstruct #!key (full #f))
- (apply db:multi-db-sync
- dbstruct
- 'schema
- ;; 'new2old
- 'killservers
- 'adj-target
- ;; 'old2new
- 'new2old
- ;; (if full
- '(dejunk)
- ;; '())
- )
- (if (common:api-changed?)
- (common:set-last-run-version)))
-
-;;======================================================================
-;; Force a megatest cleanup-db if version is changed and skip-version-check not specified
-;; Do NOT check if not on homehost!
-;;
-(define (common:exit-on-version-changed)
- (if (common:on-homehost?)
- (if (common:api-changed?)
- (let* ((mtconf (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.config"))
- (dbfile (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.db"))
- (read-only (not (file-writable? dbfile)))
- (dbstruct (db:setup #t)))
- (debug:print 0 *default-log-port*
- "WARNING: Version mismatch!\n"
- " expected: " (common:version-signature) "\n"
- " got: " (common:get-last-run-version))
- (cond
- ((get-environment-variable "MT_SKIP_DB_MIGRATE") #t)
- ((and (common:file-exists? mtconf) (common:file-exists? dbfile) (not read-only)
- (eq? (current-user-id)(file-owner mtconf))) ;; safe to run -cleanup-db
- (debug:print 0 *default-log-port* " I see you are the owner of megatest.config, attempting to cleanup and reset to new version")
- (handle-exceptions
- exn
- (begin
- (debug:print 0 *default-log-port* "Failed to switch versions. exn=" exn)
- (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
- (print-call-chain (current-error-port))
- (exit 1))
- (common:cleanup-db dbstruct)))
- ((not (common:file-exists? mtconf))
- (debug:print 0 *default-log-port* " megatest.config does not exist in this area. Cannot proceed with megatest version migration.")
- (exit 1))
- ((not (common:file-exists? dbfile))
- (debug:print 0 *default-log-port* " megatest.db does not exist in this area. Cannot proceed with megatest version migration.")
- (exit 1))
- ((not (eq? (current-user-id)(file-owner mtconf)))
- (debug:print 0 *default-log-port* " You do not own megatest.db in this area. Cannot proceed with megatest version migration.")
- (exit 1))
- (read-only
- (debug:print 0 *default-log-port* " You have read-only access to this area. Cannot proceed with megatest version migration.")
- (exit 1))
- (else
- (debug:print 0 *default-log-port* " to switch versions you can run: \"megatest -cleanup-db\"")
- (exit 1)))))))
-;;======================================================================
-;; (begin
-;; (debug:print 0 *default-log-port* "ERROR: cannot migrate version unless on homehost. Exiting.")
-;; (exit 1))))
-
-(define (common:run-sync?)
- (and (common:on-homehost?)
- (args:get-arg "-server")))
-
Index: commonmod.scm
==================================================================
--- commonmod.scm
+++ commonmod.scm
@@ -3660,7 +3660,85 @@
(let ((fields (configf:get-section confdat "fields")))
(string-join
(map (lambda (field)(conc (car field) " " (cadr field)))
fields)
",")))
+
+
+;;======================================================================
+;; N A N O M S G C L I E N T
+;;======================================================================
+;;
+;;
+;;
+;; (define (common:send-dboard-main-changed)
+;; (let* ((dashboard-ips (mddb:get-dashboards)))
+;; (for-each
+;; (lambda (ipadr)
+;; (let* ((soc (common:open-nm-req (conc "tcp://" ipadr)))
+;; (msg (conc "main " *toppath*))
+;; (res (common:nm-send-receive-timeout soc msg)))
+;; (if (not res) ;; couldn't reach that dashboard - remove it from db
+;; (print "ERROR: couldn't reach dashboard " ipadr))
+;; res))
+;; dashboard-ips)))
+;;
+;;
+;; ;;======================================================================
+;; ;; D A S H B O A R D D B
+;; ;;======================================================================
+;;
+;; (define (mddb:open-db)
+;; (let* ((db (open-database (conc (get-environment-variable "HOME") "/.dashboard.db"))))
+;; (set-busy-handler! db (busy-timeout 10000))
+;; (for-each
+;; (lambda (qry)
+;; (exec (sql db qry)))
+;; (list
+;; "CREATE TABLE IF NOT EXISTS vars (id INTEGER PRIMARY KEY,key TEXT, val TEXT, CONSTRAINT varsconstraint UNIQUE (key));"
+;; "CREATE TABLE IF NOT EXISTS dashboards (
+;; id INTEGER PRIMARY KEY,
+;; pid INTEGER,
+;; username TEXT,
+;; hostname TEXT,
+;; ipaddr TEXT,
+;; portnum INTEGER,
+;; start_time TIMESTAMP DEFAULT (strftime('%s','now')),
+;; CONSTRAINT hostport UNIQUE (hostname,portnum)
+;; );"
+;; ))
+;; db))
+;;
+;; ;; register a dashboard
+;; ;;
+;; (define (mddb:register-dashboard port)
+;; (let* ((pid (current-process-id))
+;; (hostname (get-host-name))
+;; (ipaddr (server:get-best-guess-address hostname))
+;; (username (current-user-name)) ;; (car userinfo)))
+;; (db (mddb:open-db)))
+;; (print "Register monitor, pid: " pid ", hostname: " hostname ", port: " port ", username: " username)
+;; (exec (sql db "INSERT OR REPLACE INTO dashboards (pid,username,hostname,ipaddr,portnum) VALUES (?,?,?,?,?);")
+;; pid username hostname ipaddr port)
+;; (close-database db)))
+;;
+;; ;; unregister a monitor
+;; ;;
+;; (define (mddb:unregister-dashboard host port)
+;; (let* ((db (mddb:open-db)))
+;; (print "Register unregister monitor, host:port=" host ":" port)
+;; (exec (sql db "DELETE FROM dashboards WHERE hostname=? AND portnum=?;") host port)
+;; (close-database db)))
+;;
+;; ;; get registered dashboards
+;; ;;
+;; (define (mddb:get-dashboards)
+;; (let ((db (mddb:open-db)))
+;; (query fetch-column
+;; (sql db "SELECT ipaddr || ':' || portnum FROM dashboards;"))))
+
+
+
+
+
)
Index: dbmod.scm
==================================================================
--- dbmod.scm
+++ dbmod.scm
@@ -25,10 +25,12 @@
(declare (uses mtargs))
(declare (uses mtver))
(declare (uses csv-xml))
(declare (uses keysmod))
(declare (uses mtmod))
+(declare (uses pkts))
+(declare (uses dbi))
(module dbmod
*
(import scheme
@@ -69,11 +71,12 @@
configfmod
debugprint
keysmod
mtmod
mtver
-
+ pkts
+ (prefix dbi dbi:)
)
;;======================================================================
;; Database access
;;======================================================================
@@ -5419,7 +5422,70 @@
+;; mt:get-run-stats: returns the result of db:get-run-stats for RUN-ID on DBSTRUCT.
(define (mt:get-run-stats dbstruct run-id)
;; Get run stats from local access, move this ... but where?
(db:get-run-stats dbstruct run-id))
+
+;; Build the serialized reply for a processed query. With spiffy or rpc the
+;; string returned here is simply handed back to the caller; a zmq transport
+;; would need a separate send step. RETURN-ADDR is only used for logging here.
+;;
+(define (server:reply return-addr query-sig success/fail result)
+  (debug:print-info 11 *default-log-port*
+                    "server:reply return-addr=" return-addr ", result=" result)
+  (let ((reply (vector success/fail query-sig result)))
+    ;; Payload layout: #(success/fail query-sig result)
+    (db:obj->string reply)))
+
+
+;; Open the packet queue database for the area described by MTCONF and call
+;; (PROC pktsdirs pktsdir pdb), closing PDB afterwards.
+;;
+;;   mtconf      - configuration consulted via configf:lookup
+;;   proc        - (lambda (pktsdirs pktsdir pdb) ...); PKTSDIR is the first
+;;                 entry of PKTSDIRS, PDB the handle from open-queue-db
+;;   use-lt:     - passed through to common:get-pkts-dirs
+;;   toppath-in: - fallback when [scratchdat] toppath is not configured
+;;
+;; If a precondition fails an error is logged and PROC is not called.
+;; When PROC runs, its value is returned.
+(define (common:with-queue-db mtconf proc #!key (use-lt #f)(toppath-in #f))
+  (let* ((pktsdirs (common:get-pkts-dirs mtconf use-lt))
+         ;; '() is truthy in Scheme, so test for a pair before taking car.
+         (pktsdir  (and (pair? pktsdirs) (car pktsdirs)))
+         (toppath  (or (configf:lookup mtconf "scratchdat" "toppath")
+                       toppath-in))
+         (pdbpath  (or (configf:lookup mtconf "setup" "pdbpath") pktsdir)))
+    (cond
+     ((not (and pktsdir toppath pdbpath))
+      (debug:print 0 *default-log-port* "ERROR: settings are missing in your megatest.config for area management.")
+      (debug:print 0 *default-log-port* " you need to have pktsdirs in the [setup] section."))
+     ((not (common:file-exists? pktsdir))
+      (debug:print 0 *default-log-port* "ERROR: pkts directory not found " pktsdir))
+     ((not (equal? (file-owner pktsdir)(current-effective-user-id)))
+      (debug:print 0 *default-log-port* "ERROR: directory " pktsdir " is not owned by " (current-effective-user-name)))
+     (else
+      (let ((pdb (open-queue-db pdbpath "pkts.db"
+                                schema: '("CREATE TABLE groups (id INTEGER PRIMARY KEY,groupname TEXT, CONSTRAINT group_constraint UNIQUE (groupname));"))))
+        ;; Close the queue db even when PROC exits non-locally (e.g. an
+        ;; exception caught by an outer handler) so the handle is not leaked.
+        (dynamic-wind
+          (lambda () #f)
+          (lambda () (proc pktsdirs pktsdir pdb))
+          (lambda () (dbi:close pdb))))))))
+
+;; Scan every packet directory configured for MTCONF and add each *.pkt file
+;; whose uuid is not yet present (lookup-by-uuid) to the packet queue db.
+;;
+;;   mtconf  - configuration handed to common:with-queue-db
+;;   use-lt: - passed through to common:with-queue-db
+;;
+;; Directories that are missing, not directories, or unreadable are reported
+;; and skipped. The uuid is taken from the file name, which must be lowercase
+;; hex followed by ".pkt"; non-matching names are reported and skipped rather
+;; than causing an error.
+(define (common:load-pkts-to-db mtconf #!key (use-lt #f))
+  (common:with-queue-db
+   mtconf
+   (lambda (pktsdirs pktsdir pdb)
+     (for-each
+      (lambda (dir) ;; look at all configured packet directories
+        (cond
+         ((not (common:file-exists? dir))
+          (debug:print 0 *default-log-port* "ERROR: packets directory " dir " does not exist."))
+         ((not (directory? dir))
+          (debug:print 0 *default-log-port* "ERROR: packets directory path " dir " is not a directory."))
+         ((not (file-readable? dir))
+          (debug:print 0 *default-log-port* "ERROR: packets directory path " dir " is not readable."))
+         (else
+          (debug:print-info 0 *default-log-port* "Loading packets found in " dir)
+          (let ((pkts (glob (conc dir "/*.pkt"))))
+            (for-each
+             (lambda (pkt)
+               ;; glob matches any *.pkt name; only hex-named files carry a uuid.
+               (let ((m (string-match ".*/([0-9a-f]+)\\.pkt" pkt)))
+                 (if (not m)
+                     (debug:print 0 *default-log-port* "WARNING: skipping packet file with unexpected name " pkt)
+                     (let* ((uuid   (cadr m))
+                            (exists (lookup-by-uuid pdb uuid #f)))
+                       (if (not exists)
+                           (let* ((pktdat (string-intersperse
+                                           (with-input-from-file pkt read-lines)
+                                           "\n"))
+                                  (apkt   (pkt->alist pktdat))
+                                  (ptype  (alist-ref 'T apkt)))
+                             (add-to-queue pdb pktdat uuid (or ptype 'cmd) #f 0)
+                             (debug:print 4 *default-log-port* "Added " uuid " of type " ptype " to queue"))
+                           (debug:print 4 *default-log-port* "pkt: " uuid " exists, skipping..."))))))
+             pkts)))))
+      pktsdirs))
+   use-lt: use-lt))
+
)
Index: deps.pdf
==================================================================
--- deps.pdf
+++ deps.pdf
cannot compute difference between binary files
DELETED http-transport.scm
Index: http-transport.scm
==================================================================
--- http-transport.scm
+++ /dev/null
@@ -1,18 +0,0 @@
-
-;; Copyright 2006-2012, Matthew Welland.
-;;
-;; This file is part of Megatest.
-;;
-;; Megatest is free software: you can redistribute it and/or modify
-;; it under the terms of the GNU General Public License as published by
-;; the Free Software Foundation, either version 3 of the License, or
-;; (at your option) any later version.
-;;
-;; Megatest is distributed in the hope that it will be useful,
-;; but WITHOUT ANY WARRANTY; without even the implied warranty of
-;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-;; GNU General Public License for more details.
-;;
-;; You should have received a copy of the GNU General Public License
-;; along with Megatest. If not, see .
-
Index: http-transportmod.scm
==================================================================
--- http-transportmod.scm
+++ http-transportmod.scm
@@ -700,72 +700,10 @@
;; (thread-start! th2)
;; (thread-start! th1)
;; (thread-join! th2))))
-
-
-;; called in megatest.scm, host-port is string hostname:port
-;;
-;; NOTE: This is NOT called directly from clients as not all transports support a client running
-;; in the same process as the server.
-;;
-(define (server:ping host-port-in server-id #!key (do-exit #f))
- (let ((host:port (if (not host-port-in) ;; use read-dotserver to find
- #f ;; (server:check-if-running *toppath*)
- ;; (if (number? host-port-in) ;; we were handed a server-id
- ;; (let ((srec (tasks:get-server-by-id (db:delay-if-busy (tasks:open-db)) host-port-in)))
- ;; ;; (print "srec: " srec " host-port-in: " host-port-in)
- ;; (if srec
- ;; (conc (vector-ref srec 3) ":" (vector-ref srec 4))
- ;; (conc "no such server-id " host-port-in)))
- host-port-in))) ;; )
- (let* ((host-port (if host:port
- (let ((slst (string-split host:port ":")))
- (if (eq? (length slst) 2)
- (list (car slst)(string->number (cadr slst)))
- #f))
- #f)))
-;; (toppath (launch:setup)))
- ;; (print "host-port=" host-port)
- (if (not host-port)
- (begin
- (if host-port-in
- (debug:print 0 *default-log-port* "ERROR: bad host:port"))
- (if do-exit (exit 1))
- #f)
- (let* ((iface (car host-port))
- (port (cadr host-port))
- (server-dat (http-transport:client-connect iface port server-id))
- (login-res (rmt:login-no-auto-client-setup server-dat)))
- (if (and (list? login-res)
- (car login-res))
- (begin
- ;; (print "LOGIN_OK")
- (if do-exit (exit 0))
- #t)
- (begin
- ;; (print "LOGIN_FAILED")
- (if do-exit (exit 1))
- #f)))))))
-
-;; This login does no retries under the hood - it acts a bit like a ping.
-;; Deprecated for nmsg-transport.
-;;
-(define (rmt:login-no-auto-client-setup connection-info)
- (rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*)))
-
-(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
- (let* ((run-id (if run-id run-id 0))
- (res (handle-exceptions
- exn
- #f
- (http-transport:client-api-send-receive run-id connection-info cmd params))))
- (if (and res (vector-ref res 0))
- (vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!!
- #f)))
-
;; Get the transport
(define (server:get-transport)
(if *transport-type*
*transport-type*
@@ -782,8 +720,73 @@
(with-output-to-string
(lambda ()
(write (list (current-directory)
(current-process-id)
(argv)))))))
+
+;; Return this process's client signature.  The first call computes it with
+;; server:mk-signature and caches it in *my-client-signature*; later calls
+;; return the cached value.
+(define (server:get-client-signature)
+  (or *my-client-signature*
+      (begin
+        (set! *my-client-signature* (server:mk-signature))
+        *my-client-signature*)))
+
+;; run ping in separate process, safest way in some cases
+;;
+(define (server:ping-server ifaceport)
+  ;; Run "<megatest-exe> -ping <ifaceport>" in a child process and read all of
+  ;; its output.  Only the final line matters: the ping counts as successful
+  ;; (#t) only if that line is exactly "LOGIN_OK".  No output, or any other
+  ;; final line, yields #f.
+  (let ((cmd (conc (common:get-megatest-exe) " -ping " ifaceport)))
+    (with-input-from-pipe
+     cmd
+     (lambda ()
+       (let scan ((last-line "NOREPLY"))
+         (let ((line (read-line)))
+           (if (eof-object? line)
+               (string=? last-line "LOGIN_OK")
+               (scan line))))))))
+
+
+;;======================================================================
+;; S E R V E R
+;;======================================================================
+
+;; Call this to start the actual server
+;;
+
+;; all routes through here end in exit ...
+;;
+;; start_server
+;;
+(define server:launch
+  ;; Start the server.  run-id and transport-type are accepted for interface
+  ;; compatibility but ignored: the http transport is always launched.
+  (lambda (run-id transport-type)
+    (http-transport:launch)))
+
+;;======================================================================
+;; S E R V E R U T I L I T I E S
+;;======================================================================
+
+;; NOT USED (well, ok, reference in rpc-transport but otherwise not used).
+;;
+(define (server:login toppath)
+  ;; NOTE(review): this does not perform a login.  It returns a one-argument
+  ;; procedure and ignores its own toppath argument.  Calling the returned
+  ;; procedure with a path stamps *db-last-access* and returns #t iff that
+  ;; path is equal? to *toppath*.  Callers (e.g. rpc-transport) should be
+  ;; checked before this is changed to return a boolean directly.
+  (lambda (candidate-path)
+    (set! *db-last-access* (current-seconds)) ;; might not be needed.
+    (equal? *toppath* candidate-path)))
+
+;; (define server:sync-lock-token "SERVER_SYNC_LOCK")
+;; (define (server:release-sync-lock)
+;; (db:no-sync-del! *no-sync-db* server:sync-lock-token))
+;; (define (server:have-sync-lock?)
+;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token))
+;; (have-lock? (car have-lock-pair))
+;; (lock-time (cdr have-lock-pair))
+;; (lock-age (- (current-seconds) lock-time)))
+;; (cond
+;; (have-lock? #t)
+;; ((>lock-age
+;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180)))
+;; (server:release-sync-lock)
+;; (server:have-sync-lock?))
+;; (else #f))))
)
Index: launchmod.scm
==================================================================
--- launchmod.scm
+++ launchmod.scm
@@ -2269,8 +2269,368 @@
(launch:setup force-reread: #t)
;; (launch:cache-config) ;; there are two independent config cache locations, turning this one off for now. MRW.
)) ;; we can safely cache megatest.config since we have a valid runconfig
data))))
+;;======================================================================
+;; TODO: for multiple areas, we will have multiple watchdogs; and multiple threads to manage
+;; Body of the watchdog thread.
+;; Does anything only if launch:setup succeeds and we are on the homehost.
+;; It then opens the db with (db:setup #t):
+;;  - a read-only db gets common:readonly-watchdog;
+;;  - a writable db gets the syncer named by [server] sync-method:
+;;    "brute-force-sync" (the default) or "delta-sync".
+;; Any other sync-method is reported as an error and the process exits 1.
+(define (common:watchdog)
+  (debug:print-info 13 *default-log-port* "common:watchdog entered.")
+  (when (launch:setup)
+    (if (not (common:on-homehost?))
+        (debug:print-info 13 *default-log-port* "no need for watchdog on non-homehost")
+        (let ((dbstruct (db:setup #t)))
+          (debug:print-info 13 *default-log-port* "after db:setup with dbstruct=" dbstruct)
+          (if (dbr:dbstruct-read-only dbstruct)
+              (begin
+                (debug:print-info 13 *default-log-port* "loading read-only watchdog")
+                (common:readonly-watchdog dbstruct))
+              (begin
+                (debug:print-info 13 *default-log-port* "loading writable-watchdog.")
+                (let ((syncer (or (configf:lookup *configdat* "server" "sync-method")
+                                  "brute-force-sync")))
+                  (cond
+                   ((equal? syncer "brute-force-sync")
+                    (server:writable-watchdog-bruteforce dbstruct))
+                   ((equal? syncer "delta-sync")
+                    (server:writable-watchdog-deltasync dbstruct))
+                   (else
+                    (debug:print-error 0 *default-log-port* "Unknown server/sync-method specified ("syncer") - valid values are brute-force-sync and delta-sync.")
+                    (exit 1))))))
+          (debug:print-info 13 *default-log-port* "watchdog done.")))))
+
+;;======================================================================
+;; currently the primary job of the watchdog is to run the sync back to megatest.db from the db in /tmp
+;; if we are on the homehost and we are a server (by definition we are on the homehost if we are a server)
+;;
+(define (common:readonly-watchdog dbstruct)
+  ;; Keep the /tmp working copy of the db refreshed from megatest.db
+  ;; (db:multi-db-sync 'old2new) until (bdat-time-to-exit *bdat*) is set.
+  ;; Each pass waits so that at least 3 seconds separate checks.  Returns
+  ;; after logging an exit message.
+  (thread-sleep! 0.0501) ;; short startup delay
+  (debug:print-info 13 *default-log-port* "common:readonly-watchdog entered.")
+  (let* ((cool-off 3) ;; minimum seconds between sync checks
+         (golden-path (db:dbdat-get-path (dbr:dbstruct-mtdb dbstruct)))
+         (tmp-path (db:dbdat-get-path (dbr:dbstruct-tmpdb dbstruct))))
+    (debug:print-info 0 *default-log-port* "Read-only periodic sync thread started.")
+    (let poll ((last-sync 0))
+      (debug:print-info 13 *default-log-port* "loop top tmp-mtpath="tmp-path" golden-mtpath="golden-path)
+      (let ((elapsed (- (current-seconds) last-sync)))
+        (debug:print-info 13 *default-log-port* "duration-since-last-sync="elapsed)
+        (when (and (not (bdat-time-to-exit *bdat*))
+                   (< elapsed cool-off))
+          (thread-sleep! (- cool-off elapsed)))
+        (cond
+         ((bdat-time-to-exit *bdat*) #t)
+         (else
+          (let ((golden-mtime (file-modification-time golden-path))
+                (tmp-mtime (file-modification-time tmp-path)))
+            ;; Pull only when megatest.db is newer than the /tmp copy and has not
+            ;; been touched for 3 seconds, so multiple servers won't fight to sync.
+            (when (and (> golden-mtime tmp-mtime)
+                       (< golden-mtime (- (current-seconds) 3)))
+              (let ((res (db:multi-db-sync dbstruct 'old2new)))
+                (debug:print-info 13 *default-log-port* "rosync called, " res " records transferred."))))
+          (poll (current-seconds))))))
+    (debug:print-info 0 *default-log-port* "Exiting readonly-watchdog timer, (bdat-time-to-exit *bdat*) = " (bdat-time-to-exit *bdat*)" pid="(current-process-id)" mtpath="golden-path)))
+
+
+;; TODO: for multiple areas, we will have multiple watchdogs; and multiple threads to manage
+(define (init-watchdog)
+  ;; Create the watchdog thread running common:watchdog and store it in *bdat*
+  ;; via bdat-watchdog-set!.  Then let start-watchdog decide whether to start
+  ;; it for this invocation.  The original (set! (bdat-watchdog-set! *bdat*) ...)
+  ;; is a SRFI-17 generalized set! on a setter procedure that has no setter of
+  ;; its own, so it signalled an error instead of storing the thread.
+  ;; Exceptions raised inside common:watchdog are caught and printed (call
+  ;; chain + message) so they do not escape the thread.
+  (bdat-watchdog-set!
+   *bdat*
+   (make-thread
+    (lambda ()
+      (handle-exceptions
+       exn
+       (begin
+         (print-call-chain)
+         (print " message: " ((condition-property-accessor 'exn 'message) exn) ", exn=" exn))
+       (common:watchdog)))
+    "Watchdog thread"))
+  (start-watchdog))
+
+(define (start-watchdog)
+  ;; Start the watchdog thread held in *bdat* unless this invocation must not
+  ;; run it.  That is the case when any flag in no-watchdog-args was given, or
+  ;; when a flag in no-watchdog-argvals was given with exactly the listed value
+  ;; (e.g. "-archive replicate-db").
+  (let* ((no-watchdog-args
+          '("-list-runs" "-testdata-csv" "-list-servers" "-server" "-adjutant"
+            "-list-disks" "-list-targets" "-show-runconfig" "-show-config"
+            "-show-cmdinfo" "-cleanup-db"))
+         (no-watchdog-argvals '(("-archive" . "replicate-db")))
+         (argvals-allow-start
+          (let scan ((pairs no-watchdog-argvals))
+            (cond
+             ((null? pairs) #t)
+             ((equal? (args:get-arg (caar pairs)) (cdar pairs)) #f)
+             (else (scan (cdr pairs))))))
+         (blocking-args-given
+          (filter (lambda (v) v) (map args:get-arg no-watchdog-args))))
+    (if (and (null? blocking-args-given) argvals-allow-start)
+        (thread-start! (bdat-watchdog *bdat*)))))
+
+;; Writable-db watchdog for server.sync-method "delta-sync".
+;; Opens the no-sync db and publishes it in *no-sync-db* for api calls.
+;; When common:run-sync? is true, it loops until (bdat-time-to-exit *bdat*).
+;; Each pass:
+;;  - decides, under *db-multi-sync-mutex*, whether a sync is warranted;
+;;  - if so, runs db:sync-to-megatest.db;
+;;  - sleeps about 6 x 1 second.
+;; The pid is written to <db-tmp-area>/.start-sync and .end-sync around each
+;; sync.  Their mtimes tell whether an earlier sync finished or went stale.
+;; After the loop ends the no-sync db is closed exactly once.  The close and
+;; the exit message must stay outside (let loop ...), and the delay must end
+;; before (loop) is called.  Otherwise (loop) is not a tail call, frames pile
+;; up on every pass, and the db is closed again for every frame on unwind.
+;; NOTE(review): when legacy-sync is #f the no-sync db is never closed here.
+;; NOTE(review): an exception while the pass's let* is evaluated (e.g.
+;; file-modification-time on a missing megatest.db) leaves
+;; *db-multi-sync-mutex* locked.
+(define (server:writable-watchdog-deltasync dbstruct)
+ (thread-sleep! 0.054) ;; delay for startup
+ (let ((legacy-sync (common:run-sync?))
+ (sync-stale-seconds (configf:lookup-number *configdat* "server" "sync-stale-seconds" default: 300))
+ (debug-mode (debug:debug-mode 1))
+ (last-time (current-seconds))
+ (no-sync-db (db:open-no-sync-db))
+ (stmt-cache (dbr:dbstruct-stmt-cache dbstruct))
+ (sync-duration 0) ;; run time of the sync in milliseconds
+ )
+ (set! *no-sync-db* no-sync-db) ;; make the no sync db available to api calls
+ (debug:print-info 2 *default-log-port* "Periodic sync thread started.")
+ (debug:print-info 3 *default-log-port* "watchdog starting. legacy-sync is " legacy-sync" pid="(current-process-id) );; " this-wd-num="this-wd-num)
+ (if (and legacy-sync (not (bdat-time-to-exit *bdat*)))
+ (let* (;;(dbstruct (db:setup))
+ (mtdb (dbr:dbstruct-mtdb dbstruct))
+ (mtpath (db:dbdat-get-path mtdb))
+ (tmp-area (common:get-db-tmp-area))
+ (start-file (conc tmp-area "/.start-sync"))
+ (end-file (conc tmp-area "/.end-sync")))
+ (debug:print-info 0 *default-log-port* "Server running, periodic sync started.")
+ (let loop ()
+ ;; sync for filesystem local db writes
+ ;;
+ (mutex-lock! *db-multi-sync-mutex*)
+ (let* ((need-sync (>= *db-last-access* *db-last-sync*)) ;; no sync since last write
+ (sync-in-progress *db-sync-in-progress*)
+ (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 5))
+ (should-sync (and (not (bdat-time-to-exit *bdat*))
+ (> (- (current-seconds) *db-last-sync*) min-intersync-delay))) ;; sync every five seconds minimum, deprecated logic, can probably be removed
+ (start-time (current-seconds))
+ (cpu-load-adj (alist-ref 'adj-proc-load (common:get-normalized-cpu-load #f)))
+ (mt-mod-time (file-modification-time mtpath))
+ (last-sync-start (if (common:file-exists? start-file)
+ (file-modification-time start-file)
+ 0))
+ (last-sync-end (if (common:file-exists? end-file)
+ (file-modification-time end-file)
+ 10))
+ (sync-period (+ 3 (* cpu-load-adj 30))) ;; as adjusted load increases increase the sync period
+ (recently-synced (and (< (- start-time mt-mod-time) sync-period) ;; not useful if sync didn't modify megatest.db!
+ (< mt-mod-time last-sync-start)))
+ (sync-done (<= last-sync-start last-sync-end))
+ (sync-stale (> start-time (+ last-sync-start sync-stale-seconds)))
+ (will-sync (and (not (bdat-time-to-exit *bdat*)) ;; do not start a sync if we are in the process of exiting
+ (or need-sync should-sync)
+ (or sync-done sync-stale)
+ (not sync-in-progress)
+ (not recently-synced))))
+ (debug:print-info 13 *default-log-port* "WD writable-watchdog top of loop. need-sync="need-sync" sync-in-progress=" sync-in-progress
+ " should-sync="should-sync" start-time="start-time" mt-mod-time="mt-mod-time" recently-synced="recently-synced" will-sync="will-sync
+ " sync-done=" sync-done " sync-period=" sync-period)
+ ;; NOTE(review): sync-period is derived from the cpu load, not from a
+ ;; measured sync time, although the message below says "sync took".
+ (if (and (> sync-period 5)
+ (common:low-noise-print 30 "sync-period"))
+ (debug:print-info 0 *default-log-port* "Increased sync period due to long sync times, sync took: " sync-period " seconds."))
+ ;; (if recently-synced (debug:print-info 0 *default-log-port* "Skipping sync due to recently-synced flag=" recently-synced))
+ ;; (debug:print-info 0 *default-log-port* "need-sync: " need-sync " sync-in-progress: " sync-in-progress " should-sync: " should-sync " will-sync: " will-sync)
+ ;; claim the sync while still holding the mutex
+ (if will-sync (set! *db-sync-in-progress* #t))
+ (mutex-unlock! *db-multi-sync-mutex*)
+ (if will-sync
+ (let (;; (max-sync-duration (configf:lookup-number *configdat* "server" "max-sync-duration")) ;; KEEPING THIS AVAILABLE BUT SHOULD NOT USE, I'M PRETTY SURE IT DOES NOT WORK!
+ (sync-start (current-milliseconds)))
+ (with-output-to-file start-file (lambda ()(print (current-process-id))))
+
+ ;; put lock here
+
+ ;; (if (or (not max-sync-duration)
+ ;; (< sync-duration max-sync-duration)) ;; NOTE: db:sync-to-megatest.db keeps track of time of last sync and syncs incrementally
+ (let ((res (db:sync-to-megatest.db dbstruct no-sync-db: no-sync-db))) ;; did we sync any data? If so need to set the db touched flag to keep the server alive
+ (set! sync-duration (- (current-milliseconds) sync-start))
+ (if (> res 0) ;; some records were transferred, keep the db alive
+ (begin
+ (mutex-lock! *heartbeat-mutex*)
+ (set! *db-last-access* (current-seconds))
+ (mutex-unlock! *heartbeat-mutex*)
+ (debug:print-info 0 *default-log-port* "sync called, " res " records transferred."))
+ (debug:print-info 2 *default-log-port* "sync called but zero records transferred")))))
+;; ;; TODO: factor this next routine out into a function
+;; (with-input-from-pipe ;; this should not block other threads but need to verify this
+;; (conc "megatest -sync-to-megatest.db -m testsuite:" (common:get-area-name) ":" *toppath*)
+;; (lambda ()
+;; (let loop ((inl (read-line))
+;; (res #f))
+;; (if (eof-object? inl)
+;; (begin
+;; (set! sync-duration (- (current-milliseconds) sync-start))
+;; (cond
+;; ((not res)
+;; (debug:print 0 *default-log-port* "ERROR: sync from /tmp db to megatest.db appears to have failed. Recommended that you stop your runs and run \"megatest -cleanup-db\""))
+;; ((> res 0)
+;; (mutex-lock! *heartbeat-mutex*)
+;; (set! *db-last-access* (current-seconds))
+;; (mutex-unlock! *heartbeat-mutex*))))
+;; (let ((num-synced (let ((matches (string-match "^Synced (\\d+).*$" inl)))
+;; (if matches
+;; (string->number (cadr matches))
+;; #f))))
+;; (loop (read-line)
+;; (or num-synced res))))))))))
+ (if will-sync
+ (begin
+ (mutex-lock! *db-multi-sync-mutex*)
+ (set! *db-sync-in-progress* #f)
+ (set! *db-last-sync* start-time)
+ (with-output-to-file end-file (lambda ()(print (current-process-id))))
+
+ ;; release lock here
+
+ (mutex-unlock! *db-multi-sync-mutex*)))
+ (if (and debug-mode
+ (> (- start-time last-time) 60))
+ (begin
+ (set! last-time start-time)
+ (debug:print-info 4 *default-log-port* "timestamp -> " (seconds->time-string (current-seconds)) ", time since start -> " (seconds->hr-min-sec (- (current-seconds) *time-zero*))))))
+
+ ;; keep going unless time to exit: pause up to 6 x 1s (cut short on exit),
+ ;; then start the next pass as a tail call.
+ ;;
+ (if (not (bdat-time-to-exit *bdat*))
+ (begin
+ (let delay-loop ((count 0))
+ ;;(debug:print-info 13 *default-log-port* "delay-loop top; count="count" pid="(current-process-id)" this-wd-num="this-wd-num" *time-to-exit*="(bdat-time-to-exit *bdat*))
+
+ (if (and (not (bdat-time-to-exit *bdat*))
+ (< count 6)) ;; was 11, changing to 4.
+ (begin
+ (thread-sleep! 1)
+ (delay-loop (+ count 1)))))
+ (if (not (bdat-time-to-exit *bdat*)) (loop)))))
+ ;; time to exit, close the no-sync db here (once, after the loop has ended)
+ (db:no-sync-close-db no-sync-db stmt-cache)
+ (if (common:low-noise-print 30)
+ (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " (bdat-time-to-exit *bdat*)" pid="(current-process-id) )))))) ;;" this-wd-num="this-wd-num)))))))
+
+(define (server:writable-watchdog-bruteforce dbstruct)
+  ;; Writable-db watchdog for server.sync-method "brute-force-sync".
+  ;; It is active only in a -server process that was not itself started with
+  ;; -sync-to-megatest.db.  It repeatedly runs the periodic brute-force syncer
+  ;; until (bdat-time-to-exit *bdat*) is set.  It then runs one last sync
+  ;; built with fork-to-background and persist-until-sync.
+  (thread-sleep! 1) ;; startup delay
+  (let* ((periodic-sync (server:get-bruteforce-syncer dbstruct))
+         (last-sync (server:get-bruteforce-syncer dbstruct
+                                                  fork-to-background: #t
+                                                  persist-until-sync: #t)))
+    (unless (or (args:get-arg "-sync-to-megatest.db")
+                (not (args:get-arg "-server")))
+      (let sync-loop ()
+        (periodic-sync)
+        (unless (bdat-time-to-exit *bdat*)
+          (sync-loop)))
+      ;; time to exit: one final sync
+      (last-sync)
+      (when (common:low-noise-print 30)
+        (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = "(bdat-time-to-exit *bdat*)" pid="(current-process-id))))))
+
+;; moving this here as it needs access to db and cannot be in common.
+;;
+
+;; Build and return a zero-argument procedure that copies the /tmp working db
+;; (<db-tmp-area>/megatest.db) back to <toppath>/megatest.db.
+;; The copy pipes "sqlite3 <tmpdb> .dump" into a fresh staging file
+;; (<toppath>/.megatest.db) and then moves the staging file over megatest.db.
+;;
+;; Each call of the returned procedure:
+;;  - tries to take the sync lock file (common:get-sync-lock-filepath);
+;;  - in normal mode sleeps max(off-time, server.minimum-intersync-delay)
+;;    before syncing.  off-time is recomputed after each sync from its
+;;    duration and server.sync-duty-cycle;
+;;  - returns 'sync-completed on success and #f on failure;
+;;  - returns 'parallel-sync-in-progress when another holder has the lock.
+;;    With persist-until-sync it instead retries once per second.
+;;
+;; fork-to-background: run the dump and the final mv through nbfake so the
+;;   caller does not wait for them (used for the last sync at exit).
+;; persist-until-sync: keep retrying until the lock is obtained.
+(define (server:get-bruteforce-syncer dbstruct #!key (fork-to-background #f) (persist-until-sync #f))
+  (let* ((sqlite-exe (or (get-environment-variable "MT_SQLITE3_EXE") ;; defined in cfg.sh
+                         "sqlite3")) ;; fall back to PATH lookup rather than running "#f"
+         (sync-log (or (args:get-arg "-sync-log") (conc *toppath* "/logs/sync-" (current-process-id) "-" (get-host-name) ".log")))
+         (tmp-area (common:get-db-tmp-area))
+         (tmp-db (conc tmp-area "/megatest.db"))
+         (staging-file (conc *toppath* "/.megatest.db"))
+         (mtdbfile (conc *toppath* "/megatest.db"))
+         (lockfile (common:get-sync-lock-filepath))
+         ;; "> log 2>&1" instead of "&> log": system runs /bin/sh, and a POSIX
+         ;; shell (e.g. dash) parses "&>" as "&" followed by ">".  That would
+         ;; background the pipeline and return before the dump has finished.
+         (sync-cmd-core (conc sqlite-exe " " tmp-db " .dump | " sqlite-exe " " staging-file " > " sync-log " 2>&1"))
+         (sync-cmd (if fork-to-background
+                       (conc "/usr/bin/env NBFAKE_LOG="*toppath*"/logs/last-server-sync-"(current-process-id)".log nbfake \""sync-cmd-core" && /bin/mv -f " staging-file " " mtdbfile" \"")
+                       sync-cmd-core))
+         (default-min-intersync-delay 2)
+         (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: default-min-intersync-delay))
+         (default-duty-cycle 0.1)
+         (duty-cycle (configf:lookup-number *configdat* "server" "sync-duty-cycle" default: default-duty-cycle))
+         (last-sync-seconds 10) ;; measured duration of the most recent sync, in seconds
+         ;; off-time chosen so that syncing takes about duty-cycle of wall time:
+         ;;   off = work * (1 - duty-cycle) / duty-cycle
+         (calculate-off-time (lambda (work-duration duty-cycle)
+                               (* (/ (- 1 duty-cycle) duty-cycle) work-duration)))
+         (off-time min-intersync-delay) ;; adjusted in closure below.
+         (do-a-sync
+          (lambda ()
+            (let retry-loop ((num-tries 0))
+              (if (common:simple-file-lock lockfile)
+                  (begin
+                    (cond
+                     ((not (or fork-to-background persist-until-sync))
+                      (debug:print 0 *default-log-port* "INFO: syncer thread sleeping for max of (server.minimum-intersync-delay="min-intersync-delay
+                                   " , off-time="off-time" seconds ]")
+                      (thread-sleep! (max off-time min-intersync-delay)))
+                     (else
+                      (debug:print 0 *default-log-port* "INFO: syncer thread NOT sleeping ; maybe time-to-exit...")))
+                    (if (not (configf:lookup *configdat* "server" "disable-db-snapshot"))
+                        (common:snapshot-file mtdbfile subdir: ".db-snapshot"))
+                    (delete-file* staging-file)
+                    (let* ((start-time (current-milliseconds))
+                           (res (system sync-cmd))
+                           (dbbackupfile (conc mtdbfile ".backup"))
+                           (res2
+                            (cond
+                             ((eq? 0 res)
+                              (handle-exceptions
+                               exn
+                               #f
+                               (if (file-exists? dbbackupfile)
+                                   (delete-file* dbbackupfile))
+                               ;; In fork-to-background mode the nbfake job may still
+                               ;; be writing the staging file and log, and does its own
+                               ;; mv.  Moving the staging file here would install a
+                               ;; partial db.
+                               (if (not fork-to-background)
+                                   (begin
+                                     (if (eq? 0 (file-size sync-log))
+                                         (delete-file* sync-log))
+                                     (system (conc "/bin/mv " staging-file " " mtdbfile))))
+                               (set! last-sync-seconds (/ (- (current-milliseconds) start-time) 1000))
+                               (set! off-time (calculate-off-time
+                                               last-sync-seconds
+                                               (cond
+                                                ((and (number? duty-cycle) (> duty-cycle 0) (< duty-cycle 1))
+                                                 duty-cycle)
+                                                (else
+                                                 (debug:print 0 *default-log-port* "WARNING: ["(common:human-time)"] server.sync-duty-cycle is invalid. Should be a number between 0 and 1, but "duty-cycle" was specified. Using default value: "default-duty-cycle)
+                                                 default-duty-cycle))))
+                               (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec")
+                               (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec ; with duty-cycle of "duty-cycle" off time is now "off-time)
+                               'sync-completed))
+                             (else
+                              (system (conc "/bin/cp "sync-log" "sync-log".fail"))
+                              (debug:print 0 *default-log-port* "ERROR: ["(common:human-time)"] Sync failed. See log at "sync-log".fail")
+                              (if (file-exists? (conc mtdbfile ".backup"))
+                                  (system (conc "/bin/cp "mtdbfile ".backup " mtdbfile)))
+                              #f))))
+                      (common:simple-file-release-lock lockfile)
+                      res2))
+                  ;; lock held by another syncer
+                  (cond
+                   (persist-until-sync
+                    (thread-sleep! 1)
+                    (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; we're in a fork-to-background so we need to succeed. Let's wait a jiffy and and try again. num-tries="num-tries" (waiting for lockfile="lockfile" to disappear)")
+                    (retry-loop (add1 num-tries)))
+                   (else
+                    (thread-sleep! (max off-time (+ last-sync-seconds min-intersync-delay)))
+                    (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; not syncing.")
+                    'parallel-sync-in-progress)))))))
+    do-a-sync))
)
Index: rmtmod.scm
==================================================================
--- rmtmod.scm
+++ rmtmod.scm
@@ -18,10 +18,11 @@
;;======================================================================
(declare (unit rmtmod))
(declare (uses commonmod))
+(declare (uses configfmod))
(declare (uses apimod))
(declare (uses itemsmod))
(declare (uses debugprint))
(declare (uses mtver))
(declare (uses tasksmod))
@@ -43,10 +44,14 @@
chicken.sort
chicken.time
chicken.base
chicken.file
chicken.format
+ chicken.process
+ chicken.file.posix
+ chicken.process-context.posix
+ chicken.process-context
(prefix sqlite3 sqlite3:)
typed-records
srfi-1
srfi-13
@@ -62,11 +67,12 @@
(prefix mtargs args:)
dbmod
http-transportmod
servermod
clientmod
-
+ configfmod
+
)
(defstruct alldat
(areapath #f)
(ulexdat #f)
@@ -1574,7 +1580,409 @@
result)
(let ((newres (rmt:get-prereqs-not-met run-id waitons ref-item-path mode: mode itemmaps: itemmaps)))
(hash-table-set! *pre-reqs-met-cache* key (vector (current-seconds) newres))
newres))))
+;;======================================================================
+;; from metadat lookup MEGATEST_VERSION
+;;
+;; Return the MEGATEST_VERSION value recorded for this run area, fetched with
+;; rmt:get-var.  NOTE(review): what is returned when the var was never set
+;; (presumably #f) is decided by rmt:get-var — confirm.
+(define (common:get-last-run-version)
+  (let ((var-name "MEGATEST_VERSION"))
+    (rmt:get-var var-name)))
+
+(define (common:get-last-run-version-number)
+  ;; Numeric form of the recorded MEGATEST_VERSION: string->number of at most
+  ;; its first 6 characters, e.g. "1.8012-abc" -> 1.8012.
+  ;; Returns #f when no version is recorded.  A version shorter than 6
+  ;; characters is used whole, instead of making substring fail.
+  (let ((ver (common:get-last-run-version)))
+    (and ver
+         (let ((s (conc ver)))
+           (string->number (substring s 0 (min 6 (string-length s))))))))
+
+;; Record the running megatest's version signature as MEGATEST_VERSION.
+(define (common:set-last-run-version)
+  (let ((signature (common:version-signature)))
+    (rmt:set-var "MEGATEST_VERSION" signature)))
+
+;;======================================================================
+;; faux-lock is deprecated. Please use simple-lock below
+;;
+(define (common:faux-lock keyname #!key (wait-time 8)(allow-lock-steal #t))
+  ;; Try to take the no-sync "faux lock" KEYNAME by storing this pid under it.
+  ;; While the key is held, sleep 1 second per attempt for up to WAIT-TIME
+  ;; attempts.  On the last attempt the lock is force-released first, but only
+  ;; when ALLOW-LOCK-STEAL is true; the flag is passed on to each retry.
+  ;; Returns #t if the key holds this pid afterwards, #f otherwise.
+  (if (rmt:no-sync-get/default keyname #f) ;; do not be tempted to compare to pid. locking is a one-shot action, if already locked for this pid it doesn't actually count
+      (if (> wait-time 0)
+          (begin
+            (thread-sleep! 1)
+            (if (and allow-lock-steal
+                     (<= wait-time 1)) ;; last attempt: steal the lock
+                (begin
+                  (debug:print-info 0 *default-log-port* "stealing lock for " keyname)
+                  (common:faux-unlock keyname force: #t)))
+            (common:faux-lock keyname
+                              wait-time: (- wait-time 1)
+                              allow-lock-steal: allow-lock-steal))
+          #f)
+      (begin
+        (rmt:no-sync-set keyname (conc (current-process-id)))
+        ;; read the key back; presumably to catch a concurrent writer — confirm
+        (equal? (conc (current-process-id)) (conc (rmt:no-sync-get/default keyname #f))))))
+
+(define (common:faux-unlock keyname #!key (force #f))
+  ;; Release the faux lock KEYNAME when FORCE is true or the stored value is
+  ;; this process's pid.  The key is deleted only if it currently has a value.
+  ;; Returns #t when the release was allowed, #f otherwise.
+  (let ((my-pid (conc (current-process-id))))
+    (cond
+     ((not (or force
+               (equal? my-pid (conc (rmt:no-sync-get/default keyname #f)))))
+      #f)
+     (else
+      (when (rmt:no-sync-get/default keyname #f)
+        (rmt:no-sync-del! keyname))
+      #t))))
+
+;;======================================================================
+;; positive number if megatest version > db version
+;; negative number if megatest version < db version
+(define (common:version-db-delta)
+  ;; megatest-version minus the numeric version recorded in the db.
+  (let ((db-version (common:get-last-run-version-number)))
+    (- megatest-version db-version)))
+
+(define (common:version-changed?)
+  ;; #t when the recorded MEGATEST_VERSION differs (equal?) from the running
+  ;; version signature.
+  (let* ((recorded (common:get-last-run-version))
+         (running (common:version-signature)))
+    (not (equal? recorded running))))
+
+(define (common:api-changed?)
+  ;; #t when the first four characters of the running megatest-version differ
+  ;; from those of the MEGATEST_VERSION recorded in the db.
+  ;; Values shorter than four characters are compared whole instead of making
+  ;; substring fail.  For example, an unrecorded version (#f, rendered "#f")
+  ;; counts as changed, consistent with common:version-changed?.
+  (define (api-prefix v)
+    (let ((s (conc v)))
+      (substring s 0 (min 4 (string-length s)))))
+  (not (equal? (api-prefix (->string megatest-version))
+               (api-prefix (common:get-last-run-version)))))
+
+;;======================================================================
+;; Move me elsewhere ...
+;; RADT => Why do we need the version check here? This is called only if the version mismatches.
+;;
+(define (common:cleanup-db dbstruct #!key (full #f))
+  ;; Run db:multi-db-sync with the fixed step list
+  ;; schema, killservers, adj-target, new2old, dejunk.
+  ;; Then, if the API version prefix changed, record the current version
+  ;; signature.  NOTE(review): the full: key is accepted but not used.
+  (let ((steps '(schema killservers adj-target new2old dejunk)))
+    (apply db:multi-db-sync dbstruct steps))
+  (when (common:api-changed?)
+    (common:set-last-run-version)))
+
+;; This login does no retries under the hood - it acts a bit like a ping.
+;; Deprecated for nmsg-transport.
+;;
+(define (rmt:login-no-auto-client-setup connection-info)
+  ;; Send a single 'login (run-id 0) with (toppath version client-signature).
+  (let ((login-params (list *toppath* megatest-version *my-client-signature*)))
+    (rmt:send-receive-no-auto-client-setup connection-info 'login 0 login-params)))
+
+(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
+  ;; Single send/receive with no retries and no client setup.
+  ;; run-id defaults to 0.  A successful reply is a vector whose slot 0 is
+  ;; true; slot 1 is then returned.  A transport exception, a false slot 0,
+  ;; or a reply that is not a 2+ element vector all yield #f.  Previously
+  ;; vector-ref on such a reply raised outside the handler.
+  (let* ((run-id (or run-id 0))
+         (res (handle-exceptions
+               exn
+               #f
+               (http-transport:client-api-send-receive run-id connection-info cmd params))))
+    (if (and (vector? res)
+             (> (vector-length res) 1)
+             (vector-ref res 0))
+        (vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!!
+        #f)))
+
+;; Standard shutdown sequence; returns 0.
+;; - Registers a trivial on-exit thunk.  NOTE(review): its purpose is unclear.
+;; - Sets time-to-exit in *bdat*.  no-hurry is #t only if it was not already set.
+;; - Prints db stats when no-hurry and debug mode 18 is on.
+;; - Runs cleanup in thread th1 and joins it: close *dbstruct-db*, finalize
+;;   the task db, close idle http-client connections, and reset
+;;   *default-log-port* to stderr.
+;; NOTE(review): th2 (a 5s or 2s sleep) is started but never joined, so it does
+;; not bound the wait on th1.  The "give it five seconds" comment below is
+;; intent, not enforced behavior.
+(define (std-exit-procedure)
+ ;;(common:telemetry-log-close)
+ (on-exit (lambda () 0))
+ ;;(debug:print-info 13 *default-log-port* "std-exit-procedure called; *time-to-exit*="*time-to-exit*)
+ (let ((no-hurry (if (bdat-time-to-exit *bdat*) ;; hurry up
+ #f
+ (begin
+ (bdat-time-to-exit-set! *bdat* #t)
+ #t))))
+ (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
+ (if (and no-hurry (debug:debug-mode 18))
+ (rmt:print-db-stats))
+ (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds
+ (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated
+ ;; task db is stored as a pair; its cdr is the sqlite3 handle
+ (if (bdat-task-db *bdat*)
+ (let ((db (cdr (bdat-task-db *bdat*))))
+ (if (sqlite3:database? db)
+ (begin
+ (sqlite3:interrupt! db)
+ (sqlite3:finalize! db #t)
+ ;; (vector-set! (bdat-task-db *bdat*) 0 #f)
+ (bdat-task-db-set! *bdat* #f)))))
+ (http-client#close-idle-connections!)
+ ;; (if (and *runremote*
+ ;; (remote-conndat *runremote*))
+ ;; (begin
+ ;; (http-client#close-all-connections!))) ;; for http-client
+ ;; close the log port only if it is not stderr, then fall back to stderr
+ (if (not (eq? *default-log-port* (current-error-port)))
+ (close-output-port *default-log-port*))
+ (set! *default-log-port* (current-error-port))) "Cleanup db exit thread"))
+ ;; th2 only sleeps and logs; it is never joined (see NOTE above)
+ (th2 (make-thread (lambda ()
+ (debug:print 4 *default-log-port* "Attempting clean exit. Please be patient and wait a few seconds...")
+ (if no-hurry
+ (begin
+ (thread-sleep! 5)) ;; give the clean up few seconds to do it's stuff
+ (begin
+ (thread-sleep! 2)))
+ (debug:print 4 *default-log-port* " ... done")
+ )
+ "clean exit")))
+ (thread-start! th1)
+ (thread-start! th2)
+ ;; waits (with no timeout) for the cleanup thread only
+ (thread-join! th1)
+ )
+ )
+
+ 0)
+
+;;======================================================================
+;; Force a megatest cleanup-db if version is changed and skip-version-check not specified
+;; Do NOT check if not on homehost!
+;;
+(define (common:exit-on-version-changed)
+  ;; On the homehost only: if the API version prefix changed
+  ;; (common:api-changed?), try an automatic migration via common:cleanup-db
+  ;; when that looks safe; otherwise explain why not and exit 1.
+  ;; MT_SKIP_DB_MIGRATE set in the environment skips all of this.
+  ;; "Safe" means megatest.config and megatest.db exist under
+  ;; $MT_RUN_AREA_HOME, megatest.db is writable, and the current user owns
+  ;; megatest.config.
+  ;; Note that db:setup is run before any of the checks.
+  (if (common:on-homehost?)
+      (if (common:api-changed?)
+          (let* ((mtconf (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.config"))
+                 (dbfile (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.db"))
+                 (read-only (not (file-writable? dbfile)))
+                 (dbstruct (db:setup #t)))
+            (debug:print 0 *default-log-port*
+                         "WARNING: Version mismatch!\n"
+                         " expected: " (common:version-signature) "\n"
+                         " got: " (common:get-last-run-version))
+            (cond
+             ((get-environment-variable "MT_SKIP_DB_MIGRATE") #t)
+             ((and (common:file-exists? mtconf) (common:file-exists? dbfile) (not read-only)
+                   (eqv? (current-user-id) (file-owner mtconf))) ;; safe to run -cleanup-db
+              (debug:print 0 *default-log-port* " I see you are the owner of megatest.config, attempting to cleanup and reset to new version")
+              (handle-exceptions
+               exn
+               (begin
+                 (debug:print 0 *default-log-port* "Failed to switch versions. exn=" exn)
+                 (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
+                 (print-call-chain (current-error-port))
+                 (exit 1))
+               (common:cleanup-db dbstruct)))
+             ((not (common:file-exists? mtconf))
+              (debug:print 0 *default-log-port* " megatest.config does not exist in this area. Cannot proceed with megatest version migration.")
+              (exit 1))
+             ((not (common:file-exists? dbfile))
+              (debug:print 0 *default-log-port* " megatest.db does not exist in this area. Cannot proceed with megatest version migration.")
+              (exit 1))
+             ;; ownership is checked on megatest.config, so the message names that file
+             ((not (eqv? (current-user-id) (file-owner mtconf)))
+              (debug:print 0 *default-log-port* " You do not own megatest.config in this area. Cannot proceed with megatest version migration.")
+              (exit 1))
+             (read-only
+              (debug:print 0 *default-log-port* " You have read-only access to this area. Cannot proceed with megatest version migration.")
+              (exit 1))
+             (else
+              (debug:print 0 *default-log-port* " to switch versions you can run: \"megatest -cleanup-db\"")
+              (exit 1)))))))
+;;======================================================================
+;; (begin
+;; (debug:print 0 *default-log-port* "ERROR: cannot migrate version unless on homehost. Exiting.")
+;; (exit 1))))
+
+(define (common:run-sync?)
+  ;; Only a -server process on the homehost runs the sync.  The result is the
+  ;; value of the -server argument, or #f.
+  (if (common:on-homehost?)
+      (args:get-arg "-server")
+      #f))
+
+
+
+
+;; this one seems to be the general entry point
+;;
+(define (server:start-and-wait areapath #!key (timeout 60))
+  ;; Wait up to TIMEOUT seconds for a server for AREAPATH and return its url
+  ;; (server:record->url).  Returns #f on timeout; before, server:record->url
+  ;; was called with #f.
+  ;; Servers are polled every 5 seconds.  From the second pass on, a new one
+  ;; is started with server:kind-run whenever server:get-best finds no
+  ;; candidates.
+  (let ((give-up-time (+ (current-seconds) timeout)))
+    (let loop ((server-info (server:check-if-running areapath))
+               (try-num 0))
+      (cond
+       (server-info
+        (server:record->url server-info))
+       ((> (current-seconds) give-up-time)
+        #f) ;; timed out, no server available
+       (else
+        (let ((num-ok (length (server:get-best (server:get-list areapath)))))
+          (if (and (> try-num 0) ;; first time through simply wait a little while then try again
+                   (< num-ok 1)) ;; if there are no decent candidates for servers then try starting a new one
+              (server:kind-run areapath))
+          (thread-sleep! 5)
+          (loop (server:check-if-running areapath)
+                (+ try-num 1))))))))
+
+;; make-and-init-remote: build a new remote record holding the homehost info,
+;; the server record found by server:check-if-running for *toppath* (#f when
+;; *toppath* is unset or no server answers) and the server expiration timeout.
+(define (make-and-init-remote)
+  (make-remote
+   hh-dat:         (common:get-homehost)
+   server-info:    (and *toppath*
+                        (server:check-if-running *toppath*))
+   server-timeout: (server:expiration-timeout)))
+
+
+
+
+;; called in megatest.scm, host-port is string hostname:port
+;;
+;; NOTE: This is NOT called directly from clients as not all transports support a client running
+;; in the same process as the server.
+;;
+;; server:ping: connect to the server at host-port-in and attempt a login.
+;;
+;;   host-port-in - "host:port" string (port must be numeric), or #f
+;;   server-id    - passed through to http-transport:client-connect
+;;   do-exit      - when true, exit the process (0 = login ok, 1 = failure)
+;;                  instead of returning
+;;
+;; Returns #t when the login reply is a non-empty list whose first element is
+;; true, #f otherwise. An unparsable host-port-in is reported and yields #f
+;; (or exit 1 with do-exit).
+;;
+(define (server:ping host-port-in server-id #!key (do-exit #f))
+  (let* ((slst      (and (string? host-port-in)
+                         (string-split host-port-in ":")))
+         ;; (host port-number), or #f unless we got exactly "host:<number>"
+         (host-port (and slst
+                         (= (length slst) 2)
+                         (let ((port (string->number (cadr slst))))
+                           (and port (list (car slst) port))))))
+    (if (not host-port)
+        (begin
+          (if host-port-in
+              (debug:print 0 *default-log-port* "ERROR: bad host:port " host-port-in))
+          (if do-exit (exit 1))
+          #f)
+        (let* ((iface      (car host-port))
+               (port       (cadr host-port))
+               (server-dat (http-transport:client-connect iface port server-id))
+               (login-res  (rmt:login-no-auto-client-setup server-dat)))
+          ;; pair? rather than list?: an empty reply must not reach car
+          (if (and (pair? login-res)
+                   (car login-res))
+              (begin
+                (if do-exit (exit 0))
+                #t)
+              (begin
+                (if do-exit (exit 1))
+                #f))))))
+
+;; ping the given server
+;;
+;; server:check-server: ping the server described by server-record.
+;; Returns the server's url string when it answers, #f otherwise.
+;; Only the http transport is implemented; under any other *transport-type*
+;; the server is reported as not reachable.
+;;
+(define (server:check-server server-record)
+  (let* ((server-url (server:record->url server-record))
+         (server-id  (server:record->id server-record))
+         (res        (case *transport-type*
+                       ((http) (server:ping server-url server-id))
+                       ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server)
+                       ;; without an else the case yields an unspecified (true)
+                       ;; value, which would make every server look alive
+                       (else #f))))
+    (if res
+        server-url
+        #f)))
+
+;; no longer care if multiple servers are started by accident. older servers will drop off in time.
+;;
+;; server:check-if-running: return the first record in the "best" server list
+;; for areapath that answers server:check-server, or #f if none does.
+;; When the list is shorter than (pseudo-random-integer ns), with ns from
+;; server:get-num-servers, #f is returned without pinging anything.
+;;
+(define (server:check-if-running areapath) ;; #!key (numservers "2"))
+  (let* ((ns      (server:get-num-servers))
+         (servers (server:get-best (server:get-list areapath))))
+    (cond
+     ((or (not servers)
+          (null? servers))
+      #f)
+     ((and (list? servers)
+           (< (length servers) (pseudo-random-integer ns))) ;; random value in 0 .. ns-1
+      #f)
+     (else
+      (let try-next ((candidate (car servers))
+                     (remaining (cdr servers)))
+        (cond
+         ((server:check-server candidate) candidate)
+         ((null? remaining)               #f)
+         (else (try-next (car remaining) (cdr remaining)))))))))
+
+;; kind start up of servers: rate-limit server launches per areapath.
+;; If no server answers, the n-th call (counted in *server-kind-run*) only
+;; launches when more than 0 / 20 / 300 / 600 seconds (plus 0-4 s of jitter,
+;; for n = 0 / 1 / 2 / later) have passed since the previous call's timestamp.
+;; The launch itself runs under logs/server-start.lock and touches
+;; logs/server-start-last.
+;;
+;; NOTE(review): the counter and timestamp are updated on every call that finds
+;; no running server, even when the launch is skipped, so frequent polling keeps
+;; pushing the next allowed launch out. Confirm this is intended.
+;;
+(define (server:kind-run areapath)
+  ;; gate on $MT_RUN_AREA_HOME/logs/server-start-last; see
+  ;; server:wait-for-server-start-last-flag for the exact rule
+  (server:wait-for-server-start-last-flag areapath)
+  (if (not (server:check-if-running areapath)) ;; why try if there is already a server running?
+      (let* ((last-run-dat (hash-table-ref/default *server-kind-run* areapath '(0 0))) ;; callnum, whenrun
+             (call-num     (car last-run-dat))
+             (when-run     (cadr last-run-dat))
+             (run-delay    (+ (case call-num
+                                ((0) 0)
+                                ((1) 20)
+                                ((2) 300)
+                                (else 600))
+                              (pseudo-random-integer 5))) ;; add a small random number just in case a lot of jobs hit the work hosts simultaneously
+             (lock-file    (conc areapath "/logs/server-start.lock")))
+        (if (> (- (current-seconds) when-run) run-delay)
+            (let ((start-flag (conc areapath "/logs/server-start-last")))
+              (common:simple-file-lock-and-wait lock-file expire-time: 15)
+              ;; release the lock even if server:run raises; otherwise other
+              ;; launchers are left waiting on a stale lock
+              (dynamic-wind
+                  (lambda () #f)
+                  (lambda ()
+                    (debug:print-info 0 *default-log-port* "server:kind-run: touching " start-flag)
+                    (system (conc "touch " start-flag)) ;; lazy but safe
+                    (server:run areapath)
+                    (thread-sleep! 2)) ;; don't release the lock for at least a few seconds
+                  (lambda ()
+                    (common:simple-file-release-lock lock-file)))))
+        (hash-table-set! *server-kind-run* areapath (list (+ call-num 1) (current-seconds))))))
+
+;; Disabled: the #; datum comment makes the reader skip this whole definition
+;; of client:connect. It is kept for reference only.
+#;(define (client:connect iface port)
+ (http-transport:client-connect iface port)
+ #;(case (server:get-transport)
+ ((rpc) (rpc:client-connect iface port))
+ ((http) (http:client-connect iface port))
+ ((zmq) (zmq:client-connect iface port))
+ (else (rpc:client-connect iface port))))
+
+;; client:setup: connect this process to a server for areapath. Only the http
+;; transport is wired up, so this forwards to client:setup-http with the same
+;; retry settings. (The older dispatch on server:get-transport, covering
+;; rpc and http, is no longer present.)
+(define (client:setup areapath #!key (remaining-tries 100) (failed-connects 0))
+  (client:setup-http areapath
+                     remaining-tries: remaining-tries
+                     failed-connects: failed-connects))
+
+;; Do all the connection work, look up the transport type and set up the
+;; connection if required.
+;;
+;; There are two scenarios.
+;; 1. We are a test manager and we received *transport-type* and *runremote* via cmdline
+;; 2. We are a run-tests, list-runs or other interactive process and we must figure out
+;; *transport-type* and *runremote* from the monitor.db
+;;
+;; client:setup
+;;
+;; lookup_server, need to remove *runremote* stuff
+;;
+;; client:setup-http: make sure a server is up for areapath, pick one with
+;; server:get-rand-best, log in to it and store the connection in the remote
+;; record (area-dat when given, else *runremote*, which is created on demand).
+;;
+;;   remaining-tries - attempts left; at <= 0 an error is printed and the process exits 1
+;;   failed-connects - accepted and passed along on retries; not otherwise used here
+;;   area-dat        - remote record to use instead of *runremote*; kept across retries
+;;
+;; Returns the connection data (start-res) on success; every failed attempt
+;; retries with remaining-tries decremented.
+;;
+(define (client:setup-http areapath #!key (remaining-tries 100) (failed-connects 0)(area-dat #f))
+  ;; one more attempt with the same settings, one fewer try left
+  (define (retry)
+    (client:setup-http areapath
+                       remaining-tries: (- remaining-tries 1)
+                       failed-connects: failed-connects
+                       area-dat:        area-dat))
+  (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries)
+  (server:start-and-wait areapath)
+  (if (<= remaining-tries 0)
+      (begin
+        (debug:print-error 0 *default-log-port* "failed to start or connect to server")
+        (exit 1))
+      ;;
+      ;; Alternatively here, we can get the list of candidate servers and work our way
+      ;; through them searching for a good one.
+      ;;
+      (let ((server-dat (server:get-rand-best areapath))) ;; (server:get-first-best areapath))
+        (if (not server-dat) ;; no server found
+            (retry)
+            (let ((host      (cadr server-dat))
+                  (port      (caddr server-dat))
+                  (server-id (caddr (cddr server-dat))))
+              (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
+              (if (and (not area-dat)
+                       (not *runremote*))
+                  (begin
+                    (set! *runremote* (make-and-init-remote))
+                    (let ((server-info (remote-server-info *runremote*)))
+                      (if server-info
+                          (begin
+                            (remote-server-url-set! *runremote* (server:record->url server-info))
+                            (remote-server-id-set! *runremote* (server:record->id server-info)))))))
+              (if (and host port server-id)
+                  (let* ((start-res (case *transport-type*
+                                      ((http)(http-transport:client-connect host port server-id))))
+                         (ping-res  (case *transport-type*
+                                      ((http)(rmt:login-no-auto-client-setup start-res))))
+                         ;; look the remote up only now: *runremote* may have been
+                         ;; created just above, and both the success and the
+                         ;; failure paths below must see it (a stale #f here made
+                         ;; remote-conndat-set! fail on login failure)
+                         (runremote (or area-dat *runremote*)))
+                    (if (and start-res
+                             ping-res)
+                        (begin
+                          (remote-conndat-set! runremote start-res) ;; (hash-table-set! runremote run-id start-res)
+                          (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res))
+                          start-res)
+                        (begin ;; login failed but have a server record, clean out the record and try again
+                          (debug:print-info 0 *default-log-port* "client:setup, login unsuccessful, will attempt to start server ... start-res=" start-res ", server-dat=" server-dat) ;; had runid. Fixes part of Randy;s ticket 1405717332
+                          (case *transport-type*
+                            ((http)(http-transport:close-connections)))
+                          (remote-conndat-set! runremote #f) ;; (hash-table-delete! runremote run-id)
+                          (thread-sleep! 1)
+                          (retry))))
+                  (begin ;; no server registered
+                    ;; (server:kind-run areapath)
+                    (server:start-and-wait areapath)
+                    (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries)
+                    (thread-sleep! 1) ;; (+ 5 (pseudo-random-integer (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms.
+                    (retry))))))))
)
DELETED runs-launch-loop-test.scm
Index: runs-launch-loop-test.scm
==================================================================
--- runs-launch-loop-test.scm
+++ /dev/null
@@ -1,76 +0,0 @@
-;; Copyright 2006-2017, Matthew Welland.
-;;
-;; This file is part of Megatest.
-;;
-;; Megatest is free software: you can redistribute it and/or modify
-;; it under the terms of the GNU General Public License as published by
-;; the Free Software Foundation, either version 3 of the License, or
-;; (at your option) any later version.
-;;
-;; Megatest is distributed in the hope that it will be useful,
-;; but WITHOUT ANY WARRANTY; without even the implied warranty of
-;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-;; GNU General Public License for more details.
-;;
-;; You should have received a copy of the GNU General Public License
-;; along with Megatest. If not, see .
-;;
-(use srfi-69)
-
-(define (runs:queue-next-hed tal reg n regful)
- (if regful
- (car reg)
- (car tal)))
-
-(define (runs:queue-next-tal tal reg n regful)
- (if regful
- tal
- (let ((newtal (cdr tal)))
- (if (null? newtal)
- reg
- newtal
- ))))
-
-(define (runs:queue-next-reg tal reg n regful)
- (if regful
- (cdr reg)
- (if (eq? (length tal) 1)
- '()
- reg)))
-
-(use trace)
-(trace runs:queue-next-hed
- runs:queue-next-tal
- runs:queue-next-reg)
-
-
-(define tests '(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20))
-
-(define test-registry (make-hash-table))
-
-(define n 3)
-
-(let loop ((hed (car tests))
- (tal (cdr tests))
- (reg '()))
- (let* ((reglen (length reg))
- (regful (> reglen n)))
- (print "hed=" hed ", length reg=" (length reg) ", (> lenreg n)=" (> (length reg) n))
- (let ((newtal (append tal (list hed)))) ;; used if we are not done with this test
- (cond
- ((not (hash-table-ref/default test-registry hed #f))
- (hash-table-set! test-registry hed #t)
- (print "Registering #" hed)
- (if (not (null? tal))
- (loop (runs:queue-next-hed tal reg n regful)
- (runs:queue-next-tal tal reg n regful)
- (let ((newl (append reg (list hed))))
- (if regful
- (cdr newl)
- newl)))))
- (else
- (print "Running #" hed)
- (if (not (null? tal))
- (loop (runs:queue-next-hed tal reg n regful)
- (runs:queue-next-tal tal reg n regful)
- (runs:queue-next-reg tal reg n regful))))))))
Index: server.scm
==================================================================
--- server.scm
+++ server.scm
@@ -47,363 +47,5 @@
;; P K T S S T U F F
;;======================================================================
;; ???
-;;======================================================================
-;; S E R V E R
-;;======================================================================
-
-;; Call this to start the actual server
-;;
-
-;; all routes though here end in exit ...
-;;
-;; start_server
-;;
-(define (server:launch run-id transport-type)
- (case transport-type
- ((http)(http-transport:launch))
- ;;((nmsg)(nmsg-transport:launch run-id))
- ;;((rpc) (rpc-transport:launch run-id))
- (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type))))
-
-;;======================================================================
-;; S E R V E R U T I L I T I E S
-;;======================================================================
-
-;; When using zmq this would send the message back (two step process)
-;; with spiffy or rpc this simply returns the return data to be returned
-;;
-(define (server:reply return-addr query-sig success/fail result)
- (debug:print-info 11 *default-log-port* "server:reply return-addr=" return-addr ", result=" result)
- ;; (send-message pubsock target send-more: #t)
- ;; (send-message pubsock
- (case (server:get-transport)
- ((rpc) (db:obj->string (vector success/fail query-sig result)))
- ((http) (db:obj->string (vector success/fail query-sig result)))
- ((fs) result)
- (else
- (debug:print-error 0 *default-log-port* "unrecognised transport type: " *transport-type*)
- result)))
-
-#;(define (server:record->url servr)
- (handle-exceptions
- exn
- (begin
- (debug:print-info 0 *default-log-port* "Unable to get server url from " servr ", exn=" exn)
- #f)
- (match-let (((mod-time host port start-time server-id pid)
- servr))
- (if (and host port)
- (conc host ":" port)
- #f))))
-
-(define (server:get-client-signature) ;; BB> why is this proc named "get-"? it returns nothing -- set! has not return value.
- (if *my-client-signature* *my-client-signature*
- (let ((sig (server:mk-signature)))
- (set! *my-client-signature* sig)
- *my-client-signature*)))
-
-(define server:try-running server:run) ;; there is no more per-run servers ;; REMOVE ME. BUG.
-
-(define (server:kill servr)
- (handle-exceptions
- exn
- (begin
- (debug:print-info 0 *default-log-port* "Unable to get host and/or port from " servr ", exn=" exn)
- #f)
- (match-let (((mod-time hostname port start-time server-id pid)
- servr))
- (tasks:kill-server hostname pid))))
-
-;; run ping in separate process, safest way in some cases
-;;
-(define (server:ping-server ifaceport)
- (with-input-from-pipe
- (conc (common:get-megatest-exe) " -ping " ifaceport)
- (lambda ()
- (let loop ((inl (read-line))
- (res "NOREPLY"))
- (if (eof-object? inl)
- (case (string->symbol res)
- ((NOREPLY) #f)
- ((LOGIN_OK) #t)
- (else #f))
- (loop (read-line) inl))))))
-
-;; NOT USED (well, ok, reference in rpc-transport but otherwise not used).
-;;
-(define (server:login toppath)
- (lambda (toppath)
- (set! *db-last-access* (current-seconds)) ;; might not be needed.
- (if (equal? *toppath* toppath)
- #t
- #f)))
-
-;; (define server:sync-lock-token "SERVER_SYNC_LOCK")
-;; (define (server:release-sync-lock)
-;; (db:no-sync-del! *no-sync-db* server:sync-lock-token))
-;; (define (server:have-sync-lock?)
-;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token))
-;; (have-lock? (car have-lock-pair))
-;; (lock-time (cdr have-lock-pair))
-;; (lock-age (- (current-seconds) lock-time)))
-;; (cond
-;; (have-lock? #t)
-;; ((>lock-age
-;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180)))
-;; (server:release-sync-lock)
-;; (server:have-sync-lock?))
-;; (else #f))))
-
-;; moving this here as it needs access to db and cannot be in common.
-;;
-
-(define (server:get-bruteforce-syncer dbstruct #!key (fork-to-background #f) (persist-until-sync #f))
- (let* ((sqlite-exe (or (get-environment-variable "MT_SQLITE3_EXE"))) ;; defined in cfg.sh
- (sync-log (or (args:get-arg "-sync-log") (conc *toppath* "/logs/sync-" (current-process-id) "-" (get-host-name) ".log")))
- (tmp-area (common:get-db-tmp-area))
- (tmp-db (conc tmp-area "/megatest.db"))
- (staging-file (conc *toppath* "/.megatest.db"))
- (mtdbfile (conc *toppath* "/megatest.db"))
- (lockfile (common:get-sync-lock-filepath))
- (sync-cmd-core (conc sqlite-exe" " tmp-db " .dump | "sqlite-exe" " staging-file "&>"sync-log))
- (sync-cmd (if fork-to-background
- (conc "/usr/bin/env NBFAKE_LOG="*toppath*"/logs/last-server-sync-"(current-process-id)".log nbfake \""sync-cmd-core" && /bin/mv -f " staging-file " " mtdbfile" \"")
- sync-cmd-core))
- (default-min-intersync-delay 2)
- (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: default-min-intersync-delay))
- (default-duty-cycle 0.1)
- (duty-cycle (configf:lookup-number *configdat* "server" "sync-duty-cycle" default: default-duty-cycle))
- (last-sync-seconds 10) ;; we will adjust this to a measurement and delay last-sync-seconds * (1 - duty-cycle)
- (calculate-off-time (lambda (work-duration duty-cycle)
- (* (/ (- 1 duty-cycle) duty-cycle) last-sync-seconds)))
- (off-time min-intersync-delay) ;; adjusted in closure below.
- (do-a-sync
- (lambda ()
- ;; (BB> "Start do-a-sync with fork-to-background="fork-to-background" persist-until-sync="persist-until-sync)
- (let* ((finalres
- (let retry-loop ((num-tries 0))
- (if (common:simple-file-lock lockfile)
- (begin
- (cond
- ((not (or fork-to-background persist-until-sync))
- (debug:print 0 *default-log-port* "INFO: syncer thread sleeping for max of (server.minimum-intersync-delay="min-intersync-delay
- " , off-time="off-time" seconds ]")
- (thread-sleep! (max off-time min-intersync-delay)))
- (else
- (debug:print 0 *default-log-port* "INFO: syncer thread NOT sleeping ; maybe time-to-exit...")))
-
- (if (not (configf:lookup *configdat* "server" "disable-db-snapshot"))
- (common:snapshot-file mtdbfile subdir: ".db-snapshot"))
- (delete-file* staging-file)
- (let* ((start-time (current-milliseconds))
- (res (system sync-cmd))
- (dbbackupfile (conc mtdbfile ".backup"))
- (res2
- (cond
- ((eq? 0 res )
- (handle-exceptions
- exn
- #f
- (if (file-exists? dbbackupfile)
- (delete-file* dbbackupfile)
- )
- (if (eq? 0 (file-size sync-log))
- (delete-file* sync-log))
- (system (conc "/bin/mv " staging-file " " mtdbfile))
-
- (set! last-sync-seconds (/ (- (current-milliseconds) start-time) 1000))
- (set! off-time (calculate-off-time
- last-sync-seconds
- (cond
- ((and (number? duty-cycle) (> duty-cycle 0) (< duty-cycle 1))
- duty-cycle)
- (else
- (debug:print 0 *default-log-port* "WARNING: ["(common:human-time)"] server.sync-duty-cycle is invalid. Should be a number between 0 and 1, but "duty-cycle" was specified. Using default value: "default-duty-cycle)
- default-duty-cycle))))
-
- (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec")
- (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec ; with duty-cycle of "duty-cycle" off time is now "off-time)
- 'sync-completed))
- (else
- (system (conc "/bin/cp "sync-log" "sync-log".fail"))
- (debug:print 0 *default-log-port* "ERROR: ["(common:human-time)"] Sync failed. See log at "sync-log".fail")
- (if (file-exists? (conc mtdbfile ".backup"))
- (system (conc "/bin/cp "mtdbfile ".backup " mtdbfile)))
- #f))))
- (common:simple-file-release-lock lockfile)
- ;; (BB> "released lockfile: " lockfile)
- ;; (when (common:file-exists? lockfile)
- ;; (BB> "DID NOT ACTUALLY RELEASE LOCKFILE"))
- res2) ;; end let
- );; end begin
- ;; else
- (cond
- (persist-until-sync
- (thread-sleep! 1)
- (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; we're in a fork-to-background so we need to succeed. Let's wait a jiffy and and try again. num-tries="num-tries" (waiting for lockfile="lockfile" to disappear)")
- (retry-loop (add1 num-tries)))
- (else
- (thread-sleep! (max off-time (+ last-sync-seconds min-intersync-delay)))
- (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; not syncing.")
- 'parallel-sync-in-progress))
- ) ;; end if got lockfile
- )
- ))
- ;; (BB> "End do-a-sync with fork-to-background="fork-to-background" persist-until-sync="persist-until-sync" and result="finalres)
- finalres)
- ) ;; end lambda
- ))
- do-a-sync))
-
-(define (server:writable-watchdog-bruteforce dbstruct)
- (thread-sleep! 1) ;; delay for startup
- (let* ((do-a-sync (server:get-bruteforce-syncer dbstruct))
- (final-sync (server:get-bruteforce-syncer dbstruct fork-to-background: #t persist-until-sync: #t)))
- (when (and (not (args:get-arg "-sync-to-megatest.db")) ;; conditions under which we do not run the sync
- (args:get-arg "-server"))
-
- (let loop ()
- (do-a-sync)
- (if (not (bdat-time-to-exit *bdat*)) (loop))) ;; keep going unless time to exit
-
- ;; time to exit, close the no-sync db here
- (final-sync)
-
- (if (common:low-noise-print 30)
- (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = "(bdat-time-to-exit *bdat*)" pid="(current-process-id)
- )))))
-
-(define (server:writable-watchdog-deltasync dbstruct)
- (thread-sleep! 0.054) ;; delay for startup
- (let ((legacy-sync (common:run-sync?))
- (sync-stale-seconds (configf:lookup-number *configdat* "server" "sync-stale-seconds" default: 300))
- (debug-mode (debug:debug-mode 1))
- (last-time (current-seconds))
- (no-sync-db (db:open-no-sync-db))
- (stmt-cache (dbr:dbstruct-stmt-cache dbstruct))
- (sync-duration 0) ;; run time of the sync in milliseconds
- )
- (set! *no-sync-db* no-sync-db) ;; make the no sync db available to api calls
- (debug:print-info 2 *default-log-port* "Periodic sync thread started.")
- (debug:print-info 3 *default-log-port* "watchdog starting. legacy-sync is " legacy-sync" pid="(current-process-id) );; " this-wd-num="this-wd-num)
- (if (and legacy-sync (not (bdat-time-to-exit *bdat*)))
- (let* (;;(dbstruct (db:setup))
- (mtdb (dbr:dbstruct-mtdb dbstruct))
- (mtpath (db:dbdat-get-path mtdb))
- (tmp-area (common:get-db-tmp-area))
- (start-file (conc tmp-area "/.start-sync"))
- (end-file (conc tmp-area "/.end-sync")))
- (debug:print-info 0 *default-log-port* "Server running, periodic sync started.")
- (let loop ()
- ;; sync for filesystem local db writes
- ;;
- (mutex-lock! *db-multi-sync-mutex*)
- (let* ((need-sync (>= *db-last-access* *db-last-sync*)) ;; no sync since last write
- (sync-in-progress *db-sync-in-progress*)
- (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 5))
- (should-sync (and (not (bdat-time-to-exit *bdat*))
- (> (- (current-seconds) *db-last-sync*) min-intersync-delay))) ;; sync every five seconds minimum, deprecated logic, can probably be removed
- (start-time (current-seconds))
- (cpu-load-adj (alist-ref 'adj-proc-load (common:get-normalized-cpu-load #f)))
- (mt-mod-time (file-modification-time mtpath))
- (last-sync-start (if (common:file-exists? start-file)
- (file-modification-time start-file)
- 0))
- (last-sync-end (if (common:file-exists? end-file)
- (file-modification-time end-file)
- 10))
- (sync-period (+ 3 (* cpu-load-adj 30))) ;; as adjusted load increases increase the sync period
- (recently-synced (and (< (- start-time mt-mod-time) sync-period) ;; not useful if sync didn't modify megatest.db!
- (< mt-mod-time last-sync-start)))
- (sync-done (<= last-sync-start last-sync-end))
- (sync-stale (> start-time (+ last-sync-start sync-stale-seconds)))
- (will-sync (and (not (bdat-time-to-exit *bdat*)) ;; do not start a sync if we are in the process of exiting
- (or need-sync should-sync)
- (or sync-done sync-stale)
- (not sync-in-progress)
- (not recently-synced))))
- (debug:print-info 13 *default-log-port* "WD writable-watchdog top of loop. need-sync="need-sync" sync-in-progress=" sync-in-progress
- " should-sync="should-sync" start-time="start-time" mt-mod-time="mt-mod-time" recently-synced="recently-synced" will-sync="will-sync
- " sync-done=" sync-done " sync-period=" sync-period)
- (if (and (> sync-period 5)
- (common:low-noise-print 30 "sync-period"))
- (debug:print-info 0 *default-log-port* "Increased sync period due to long sync times, sync took: " sync-period " seconds."))
- ;; (if recently-synced (debug:print-info 0 *default-log-port* "Skipping sync due to recently-synced flag=" recently-synced))
- ;; (debug:print-info 0 *default-log-port* "need-sync: " need-sync " sync-in-progress: " sync-in-progress " should-sync: " should-sync " will-sync: " will-sync)
- (if will-sync (set! *db-sync-in-progress* #t))
- (mutex-unlock! *db-multi-sync-mutex*)
- (if will-sync
- (let (;; (max-sync-duration (configf:lookup-number *configdat* "server" "max-sync-duration")) ;; KEEPING THIS AVAILABLE BUT SHOULD NOT USE, I'M PRETTY SURE IT DOES NOT WORK!
- (sync-start (current-milliseconds)))
- (with-output-to-file start-file (lambda ()(print (current-process-id))))
-
- ;; put lock here
-
- ;; (if (or (not max-sync-duration)
- ;; (< sync-duration max-sync-duration)) ;; NOTE: db:sync-to-megatest.db keeps track of time of last sync and syncs incrementally
- (let ((res (db:sync-to-megatest.db dbstruct no-sync-db: no-sync-db))) ;; did we sync any data? If so need to set the db touched flag to keep the server alive
- (set! sync-duration (- (current-milliseconds) sync-start))
- (if (> res 0) ;; some records were transferred, keep the db alive
- (begin
- (mutex-lock! *heartbeat-mutex*)
- (set! *db-last-access* (current-seconds))
- (mutex-unlock! *heartbeat-mutex*)
- (debug:print-info 0 *default-log-port* "sync called, " res " records transferred."))
- (debug:print-info 2 *default-log-port* "sync called but zero records transferred")))))
-;; ;; TODO: factor this next routine out into a function
-;; (with-input-from-pipe ;; this should not block other threads but need to verify this
-;; (conc "megatest -sync-to-megatest.db -m testsuite:" (common:get-area-name) ":" *toppath*)
-;; (lambda ()
-;; (let loop ((inl (read-line))
-;; (res #f))
-;; (if (eof-object? inl)
-;; (begin
-;; (set! sync-duration (- (current-milliseconds) sync-start))
-;; (cond
-;; ((not res)
-;; (debug:print 0 *default-log-port* "ERROR: sync from /tmp db to megatest.db appears to have failed. Recommended that you stop your runs and run \"megatest -cleanup-db\""))
-;; ((> res 0)
-;; (mutex-lock! *heartbeat-mutex*)
-;; (set! *db-last-access* (current-seconds))
-;; (mutex-unlock! *heartbeat-mutex*))))
-;; (let ((num-synced (let ((matches (string-match "^Synced (\\d+).*$" inl)))
-;; (if matches
-;; (string->number (cadr matches))
-;; #f))))
-;; (loop (read-line)
-;; (or num-synced res))))))))))
- (if will-sync
- (begin
- (mutex-lock! *db-multi-sync-mutex*)
- (set! *db-sync-in-progress* #f)
- (set! *db-last-sync* start-time)
- (with-output-to-file end-file (lambda ()(print (current-process-id))))
-
- ;; release lock here
-
- (mutex-unlock! *db-multi-sync-mutex*)))
- (if (and debug-mode
- (> (- start-time last-time) 60))
- (begin
- (set! last-time start-time)
- (debug:print-info 4 *default-log-port* "timestamp -> " (seconds->time-string (current-seconds)) ", time since start -> " (seconds->hr-min-sec (- (current-seconds) *time-zero*))))))
-
- ;; keep going unless time to exit
- ;;
- (if (not (bdat-time-to-exit *bdat*))
- (let delay-loop ((count 0))
- ;;(debug:print-info 13 *default-log-port* "delay-loop top; count="count" pid="(current-process-id)" this-wd-num="this-wd-num" *time-to-exit*="(bdat-time-to-exit *bdat*))
-
- (if (and (not (bdat-time-to-exit *bdat*))
- (< count 6)) ;; was 11, changing to 4.
- (begin
- (thread-sleep! 1)
- (delay-loop (+ count 1))))
- (if (not (bdat-time-to-exit *bdat*)) (loop))))
- ;; time to exit, close the no-sync db here
- (db:no-sync-close-db no-sync-db stmt-cache)
- (if (common:low-noise-print 30)
- (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " (bdat-time-to-exit *bdat*)" pid="(current-process-id) ))))))) ;;" this-wd-num="this-wd-num)))))))
-
Index: servermod.scm
==================================================================
--- servermod.scm
+++ servermod.scm
@@ -60,15 +60,10 @@
(define (server:make-server-url hostport)
(if (not hostport)
#f
(conc "http://" (car hostport) ":" (cadr hostport))))
-(define (make-and-init-remote)
- (make-remote hh-dat: (common:get-homehost)
- server-info: (if *toppath* (server:check-if-running *toppath*) #f)
- server-timeout: (server:expiration-timeout)))
-
;;======================================================================
;; logic for getting homehost. Returns (host . at-home)
;; IF *toppath* is not set, wait up to five seconds trying every two seconds
;; (this is to accomodate the watchdog)
;;
@@ -142,37 +137,10 @@
(common:get-homehost)))
(hh (if hh-dat (car hh-dat) #f)))
(common:wait-for-normalized-load maxnormload msg hh)))
-;; kind start up of servers, wait 40 seconds before allowing another server for a given
-;; run-id to be launched
-;;
-(define (server:kind-run areapath)
- ;; look for $MT_RUN_AREA_HOME/logs/server-start-last
- ;; and wait for it to be at least 3 seconds old
- (server:wait-for-server-start-last-flag areapath)
- (if (not (server:check-if-running areapath)) ;; why try if there is already a server running?
- (let* ((last-run-dat (hash-table-ref/default *server-kind-run* areapath '(0 0))) ;; callnum, whenrun
- (call-num (car last-run-dat))
- (when-run (cadr last-run-dat))
- (run-delay (+ (case call-num
- ((0) 0)
- ((1) 20)
- ((2) 300)
- (else 600))
- (pseudo-random-integer 5))) ;; add a small random number just in case a lot of jobs hit the work hosts simultaneously
- (lock-file (conc areapath "/logs/server-start.lock")))
- (if (> (- (current-seconds) when-run) run-delay)
- (let* ((start-flag (conc areapath "/logs/server-start-last")))
- (common:simple-file-lock-and-wait lock-file expire-time: 15)
- (debug:print-info 0 *default-log-port* "server:kind-run: touching " start-flag)
- (system (conc "touch " start-flag)) ;; lazy but safe
- (server:run areapath)
- (thread-sleep! 2) ;; don't release the lock for at least a few seconds
- (common:simple-file-release-lock lock-file)))
- (hash-table-set! *server-kind-run* areapath (list (+ call-num 1)(current-seconds))))))
;; Given a run id start a server process ### NOTE ### > file 2>&1
;; if the run-id is zero and the target-host is set
;; try running on that host
;; incidental: rotate logs in logs/ dir.
;;
@@ -221,44 +189,10 @@
(unsetenv "TARGETHOST_LOGF")
(if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST"))
(thread-join! log-rotate)
(pop-directory)))
-;; no longer care if multiple servers are started by accident. older servers will drop off in time.
-;;
-(define (server:check-if-running areapath) ;; #!key (numservers "2"))
- (let* ((ns (server:get-num-servers))
- (servers (server:get-best (server:get-list areapath))))
- (if (or (and servers
- (null? servers))
- (not servers)
- (and (list? servers)
- (< (length servers) (pseudo-random-integer ns)))) ;; somewhere between 0 and numservers
- #f
- (let loop ((hed (car servers))
- (tal (cdr servers)))
- (let ((res (server:check-server hed)))
- (if res
- hed
- (if (null? tal)
- #f
- (loop (car tal)(cdr tal)))))))))
-
-
-;; ping the given server
-;;
-(define (server:check-server server-record)
- (let* ((server-url (server:record->url server-record))
- (server-id (server:record->id server-record))
- (res (case *transport-type*
- ((http)(server:ping server-url server-id))
- ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server)
- )))
- (if res
- server-url
- #f)))
-
(define (server:record->url servr)
(handle-exceptions
exn
(begin
(debug:print-info 0 *default-log-port* "Unable to get server url from " servr ", exn=" exn)
@@ -296,25 +230,17 @@
(debug:print-info 0 *default-log-port* "Gating server start, last start: "
fmodtime ", delta: " delta ", reftime: " reftime ", all-go=" all-go)
(thread-sleep! reftime)
(server:wait-for-server-start-last-flag areapath)))))))
-
-;; this one seems to be the general entry point
-;;
-(define (server:start-and-wait areapath #!key (timeout 60))
- (let ((give-up-time (+ (current-seconds) timeout)))
- (let loop ((server-info (server:check-if-running areapath))
- (try-num 0))
- (if (or server-info
- (> (current-seconds) give-up-time)) ;; server-url will be #f if no server available.
- (server:record->url server-info)
- (let ((num-ok (length (server:get-best (server:get-list areapath)))))
- (if (and (> try-num 0) ;; first time through simply wait a little while then try again
- (< num-ok 1)) ;; if there are no decent candidates for servers then try starting a new one
- (server:kind-run areapath))
- (thread-sleep! 5)
- (loop (server:check-if-running areapath)
- (+ try-num 1)))))))
+;; server:kill: stop the server described by servr, a six-element record
+;; (mod-time host port start-time server-id pid), by calling
+;; tasks:kill-server with its host and pid. Any error raised on the way,
+;; including a record of the wrong shape, is logged and #f is returned.
+(define (server:kill servr)
+  (handle-exceptions
+      exn
+      (begin
+        (debug:print-info 0 *default-log-port* "Unable to get host and/or port from " servr ", exn=" exn)
+        #f)
+    (match-let (((_mod-time host _port _start-time _server-id pid) servr))
+      (tasks:kill-server host pid))))
)