;; Copyright 2006-2017, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;
;; (require-extension (srfi 18) extras tcp s11n)
;;
;; (use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest
;; directory-utils posix-extras matchable)
;;
;; (use spiffy uri-common intarweb http-client spiffy-request-vars)
;;
;; (declare (unit server))
;;
;; (declare (uses common))
;; (declare (uses db))
;; (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
;; ;; (declare (uses synchash))
;; (declare (uses http-transport))
;; ;;(declare (uses rpc-transport))
;; (declare (uses launch))
;; ;; (declare (uses daemon))
;;
;; (include "common_records.scm")
;; (include "db_records.scm")
;;======================================================================
;; P K T S S T U F F
;;======================================================================
;; ???
;;======================================================================
;; P K T S S T U F F
;;======================================================================
;; ???
;;======================================================================
;; S E R V E R
;;======================================================================
;; Start the actual server for the requested transport.
;;
;; Per the original author, all routes through here end in exit.
;;
;; run-id         - accepted for interface compatibility; not used here
;; transport-type - symbol naming the transport; only 'http is handled
;;
(define (server:launch run-id transport-type)
  (if (eqv? transport-type 'http)
      (http-transport:launch)
      ;; the other transports are currently disabled:
      ;;   nmsg -> (nmsg-transport:launch run-id)
      ;;   rpc  -> (rpc-transport:launch run-id)
      (debug:print-error 0 *default-log-port* "unknown server type " transport-type)))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; Package a query result for return to the client.
;;
;; When using zmq this would send the message back (a two step process);
;; with spiffy or rpc the packaged data is simply returned to the caller.
;;
;; return-addr  - only used in the debug trace
;; query-sig    - query signature, bundled into the reply
;; success/fail - status flag, bundled into the reply
;; result       - the payload
;;
;; Returns (db:obj->string (vector success/fail query-sig result)) for the
;; rpc and http transports and the bare result for fs. An unrecognised
;; transport is reported and the bare result is returned.
;;
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 *default-log-port* "server:reply return-addr=" return-addr ", result=" result)
  ;; (send-message pubsock target send-more: #t)
  ;; (send-message pubsock
  (let ((transport (server:get-transport)))
    (case transport
      ((rpc http) (db:obj->string (vector success/fail query-sig result)))
      ((fs) result)
      (else
       ;; report the value that was actually dispatched on
       (debug:print-error 0 *default-log-port* "unrecognised transport type: " transport)
       result))))
;; Build a "host:port" string from a server record.
;;
;; servr - expected to be a six element list:
;;         (mod-time host port start-time server-id pid)
;;
;; Returns #f when host or port is #f. If servr cannot be destructured
;; (or anything else raises) the problem is logged and #f is returned.
;;
(define (server:record->url servr)
  (handle-exceptions
   exn
   (begin
     (debug:print-info 0 *default-log-port* "Unable to get server url from " servr ", exn=" exn)
     #f)
   ;; only host and port are needed; the other four fields are ignored
   (match-let (((_ host port _ _ _) servr))
     (and host port (conc host ":" port)))))
;; Return this client's signature, creating it with server:mk-signature and
;; caching it in *my-client-signature* on first use.
;;
(define (server:get-client-signature)
  (unless *my-client-signature*
    (set! *my-client-signature* (server:mk-signature)))
  *my-client-signature*)
;; This one seems to be the general entry point.
;;
;; Wait for a server to be running for areapath, trying to start one when
;; there are no decent candidates.
;;
;; areapath - area the server serves
;; timeout  - (keyword, default 60) seconds after which waiting is abandoned
;;
;; Returns the value of server:record->url for the running server, or #f
;; if no server was found before the timeout expired.
;;
(define (server:start-and-wait areapath #!key (timeout 60))
  (let ((give-up-time (+ (current-seconds) timeout)))
    (let loop ((server-info (server:check-if-running areapath))
               (try-num 0))
      (cond
       (server-info
        (server:record->url server-info))
       ((> (current-seconds) give-up-time)
        ;; return #f directly rather than pushing #f through
        ;; server:record->url and relying on its exception handler
        (debug:print-info 0 *default-log-port* "Timed out after " timeout " seconds waiting for a server in " areapath)
        #f)
       (else
        (let ((num-ok (length (server:get-best (server:get-list areapath)))))
          (if (and (> try-num 0) ;; first time through simply wait a little while then try again
                   (< num-ok 1)) ;; if there are no decent candidates for servers then try starting a new one
              (server:kind-run areapath))
          (thread-sleep! 5)
          (loop (server:check-if-running areapath)
                (+ try-num 1))))))))
;; Legacy alias: binds the old name server:try-running to whatever
;; server:run is bound to at this point in the file.
(define server:try-running server:run) ;; there are no more per-run servers ;; REMOVE ME. BUG.
;; Kill the server described by a server record.
;;
;; servr - expected to be a six element list:
;;         (mod-time hostname port start-time server-id pid)
;;
;; Returns the value of tasks:kill-server. If servr cannot be destructured
;; (or anything else raises) the problem is logged and #f is returned.
;;
(define (server:kill servr)
  (handle-exceptions
   exn
   (begin
     (debug:print-info 0 *default-log-port* "Unable to get host and/or port from " servr ", exn=" exn)
     #f)
   ;; only the hostname and pid fields are needed
   (match-let (((_ hostname _ _ _ pid) servr))
     (tasks:kill-server hostname pid))))
;; Run ping in a separate process, the safest way in some cases.
;;
;; ifaceport - appended to "<megatest exe> -ping " to form the command
;;
;; Returns #t only when the last line printed by the ping process is
;; exactly LOGIN_OK; any other last line, or no output at all, gives #f.
;;
(define (server:ping-server ifaceport)
  (with-input-from-pipe
   (conc (common:get-megatest-exe) " -ping " ifaceport)
   (lambda ()
     ;; remember only the most recent line; it decides the outcome at eof
     (let next-line ((line      (read-line))
                     (last-line "NOREPLY"))
       (if (eof-object? line)
           (string=? last-line "LOGIN_OK")
           (next-line (read-line) line))))))
;; NOT USED (well, ok, referenced in rpc-transport but otherwise not used).
;;
;; Check that the area path presented by a client matches this server's
;; *toppath*. As a side effect *db-last-access* is set to the current time.
;;
;; toppath - area path presented by the client
;;
;; Returns #t when toppath equals *toppath*, otherwise #f.
;;
;; NOTE: the body used to be wrapped in an extra (lambda (toppath) ...), so
;; calling this procedure only returned a closure (always true) and never
;; ran the check; the check is now performed directly.
;;
(define (server:login toppath)
  (set! *db-last-access* (current-seconds)) ;; might not be needed.
  (equal? *toppath* toppath))
;; (define server:sync-lock-token "SERVER_SYNC_LOCK")
;; (define (server:release-sync-lock)
;; (db:no-sync-del! *no-sync-db* server:sync-lock-token))
;; (define (server:have-sync-lock?)
;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token))
;; (have-lock? (car have-lock-pair))
;; (lock-time (cdr have-lock-pair))
;; (lock-age (- (current-seconds) lock-time)))
;; (cond
;; (have-lock? #t)
;; ((>lock-age
;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180)))
;; (server:release-sync-lock)
;; (server:have-sync-lock?))
;; (else #f))))
;; Moved here as it needs access to db and cannot be in common.
;;
;; Build and return a thunk that performs one "brute force" sync: the db in
;; the tmp area is dumped with sqlite3 into a staging file which is then
;; moved over <toppath>/megatest.db.
;;
;; dbstruct           - accepted for interface compatibility; not used here
;; fork-to-background - (keyword) wrap the sync command with nbfake and do
;;                      not sleep before syncing
;; persist-until-sync - (keyword) when another sync holds the lock file,
;;                      retry every second instead of giving up
;;
;; The returned thunk yields:
;;   'sync-completed            the sync command and follow-up steps succeeded
;;   'parallel-sync-in-progress the lock file was held by someone else
;;   #f                         the sync command failed or a follow-up step raised
;;
(define (server:get-bruteforce-syncer dbstruct #!key (fork-to-background #f) (persist-until-sync #f))
  (let* ((sqlite-exe (or (get-environment-variable "MT_SQLITE3_EXE") ;; defined in cfg.sh
                         "sqlite3"))                                   ;; fall back to PATH lookup instead of "#f"
         (sync-log (or (args:get-arg "-sync-log")
                       (conc *toppath* "/logs/sync-" (current-process-id) "-" (get-host-name) ".log")))
         (tmp-area (common:get-db-tmp-area))
         (tmp-db (conc tmp-area "/megatest.db"))
         (staging-file (conc *toppath* "/.megatest.db"))
         (mtdbfile (conc *toppath* "/megatest.db"))
         (lockfile (common:get-sync-lock-filepath))
         ;; ">file 2>&1" is the portable spelling of bash's "&>file"; a plain
         ;; POSIX sh would read "&>" as "run in background, then truncate file"
         (sync-cmd-core (conc sqlite-exe " " tmp-db " .dump | " sqlite-exe " " staging-file " >" sync-log " 2>&1"))
         (sync-cmd (if fork-to-background
                       (conc "/usr/bin/env NBFAKE_LOG=" *toppath* "/logs/last-server-sync-" (current-process-id) ".log nbfake \"" sync-cmd-core " && /bin/mv -f " staging-file " " mtdbfile " \"")
                       sync-cmd-core))
         (default-min-intersync-delay 2)
         (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: default-min-intersync-delay))
         (default-duty-cycle 0.1)
         (duty-cycle (configf:lookup-number *configdat* "server" "sync-duty-cycle" default: default-duty-cycle))
         (last-sync-seconds 10) ;; replaced by the measured duration after each successful sync
         ;; off time that makes work-duration the duty-cycle fraction of a full on+off period
         (calculate-off-time (lambda (work-duration duty-cycle)
                               (* (/ (- 1 duty-cycle) duty-cycle) work-duration)))
         (off-time min-intersync-delay) ;; adjusted in closure below.
         (do-a-sync
          (lambda ()
            (let retry-loop ((num-tries 0))
              (if (common:simple-file-lock lockfile)
                  (begin
                    (cond
                     ((not (or fork-to-background persist-until-sync))
                      (debug:print 0 *default-log-port* "INFO: syncer thread sleeping for max of (server.minimum-intersync-delay=" min-intersync-delay
                                   " , off-time=" off-time " seconds ]")
                      (thread-sleep! (max off-time min-intersync-delay)))
                     (else
                      (debug:print 0 *default-log-port* "INFO: syncer thread NOT sleeping ; maybe time-to-exit...")))
                    (if (not (configf:lookup *configdat* "server" "disable-db-snapshot"))
                        (common:snapshot-file mtdbfile subdir: ".db-snapshot"))
                    (delete-file* staging-file)
                    (let* ((start-time (current-milliseconds))
                           (res (system sync-cmd))
                           (dbbackupfile (conc mtdbfile ".backup"))
                           (res2
                            (cond
                             ((eq? 0 res)
                              ;; any failure in the follow-up steps turns the result into #f
                              (handle-exceptions
                               exn
                               #f
                               (if (file-exists? dbbackupfile)
                                   (delete-file* dbbackupfile))
                               (if (eq? 0 (file-size sync-log))
                                   (delete-file* sync-log))
                               (system (conc "/bin/mv " staging-file " " mtdbfile))
                               (set! last-sync-seconds (/ (- (current-milliseconds) start-time) 1000))
                               (let ((effective-duty-cycle
                                      (cond
                                       ((and (number? duty-cycle) (> duty-cycle 0) (< duty-cycle 1))
                                        duty-cycle)
                                       (else
                                        (debug:print 0 *default-log-port* "WARNING: [" (common:human-time) "] server.sync-duty-cycle is invalid. Should be a number between 0 and 1, but " duty-cycle " was specified. Using default value: " default-duty-cycle)
                                        default-duty-cycle))))
                                 (set! off-time (calculate-off-time last-sync-seconds effective-duty-cycle))
                                 (debug:print 1 *default-log-port* "INFO: [" (common:human-time) "] pid=" (current-process-id) " SYNC took " last-sync-seconds " sec ; with duty-cycle of " effective-duty-cycle " off time is now " off-time)
                                 'sync-completed)))
                             (else
                              (system (conc "/bin/cp " sync-log " " sync-log ".fail"))
                              (debug:print 0 *default-log-port* "ERROR: [" (common:human-time) "] Sync failed. See log at " sync-log ".fail")
                              (if (file-exists? dbbackupfile)
                                  (system (conc "/bin/cp " dbbackupfile " " mtdbfile)))
                              #f))))
                      (common:simple-file-release-lock lockfile)
                      res2))
                  ;; could not get the lock file: somebody else is syncing
                  (cond
                   (persist-until-sync
                    (thread-sleep! 1)
                    (debug:print 1 *default-log-port* "INFO: [" (common:human-time) "] pid=" (current-process-id) " other SYNC in progress; we're in a fork-to-background so we need to succeed.  Let's wait a jiffy and and try again. num-tries=" num-tries " (waiting for lockfile=" lockfile " to disappear)")
                    (retry-loop (add1 num-tries)))
                   (else
                    (thread-sleep! (max off-time (+ last-sync-seconds min-intersync-delay)))
                    (debug:print 1 *default-log-port* "INFO: [" (common:human-time) "] pid=" (current-process-id) " other SYNC in progress; not syncing.")
                    'parallel-sync-in-progress)))))))
    do-a-sync))
;; Watchdog that keeps running brute force syncs until *time-to-exit* is
;; set, then runs one last sync built with fork-to-background and
;; persist-until-sync enabled.
;;
;; The sync loop only runs when "-server" was given and
;; "-sync-to-megatest.db" was not.
;;
;; dbstruct - handed through to server:get-bruteforce-syncer
;;
(define (server:writable-watchdog-bruteforce dbstruct)
  (thread-sleep! 1) ;; delay for startup
  (let* ((periodic-sync (server:get-bruteforce-syncer dbstruct))
         (closing-sync  (server:get-bruteforce-syncer dbstruct fork-to-background: #t persist-until-sync: #t))
         ;; conditions under which we do run the sync
         (sync-wanted   (and (not (args:get-arg "-sync-to-megatest.db"))
                             (args:get-arg "-server"))))
    (if sync-wanted
        (begin
          ;; always sync at least once, then keep going unless time to exit
          (let again ()
            (periodic-sync)
            (unless *time-to-exit* (again)))
          ;; time to exit
          (closing-sync)
          (if (common:low-noise-print 30)
              (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id)))))))
;; Watchdog that periodically calls db:sync-to-megatest.db until
;; *time-to-exit* is set.
;;
;; Opens the no-sync db and publishes it in *no-sync-db*. The sync loop only
;; runs when (common:run-sync?) is true and *time-to-exit* is not already
;; set; when the loop finishes the no-sync db is closed.
;;
;; dbstruct - handed to db:sync-to-megatest.db; its mtdb path and statement
;;            cache are read here
;;
;; NOTE: the close of the no-sync db and the exit message used to sit inside
;; the body of `loop`, after a non-tail call to (loop). Every pass therefore
;; left a pending continuation behind and, at exit, the close and the message
;; ran once per pass ever executed. The loop is now properly tail recursive
;; and the cleanup runs exactly once, after the loop.
;;
;; NOTE(review): when the loop is not entered, the no-sync db opened here is
;; not closed by this procedure.
;;
(define (server:writable-watchdog-deltasync dbstruct)
  (thread-sleep! 0.054) ;; delay for startup
  (let ((legacy-sync (common:run-sync?))
        (sync-stale-seconds (configf:lookup-number *configdat* "server" "sync-stale-seconds" default: 300))
        (debug-mode (debug:debug-mode 1))
        (last-time (current-seconds))
        (no-sync-db (db:open-no-sync-db))
        (stmt-cache (dbr:dbstruct-stmt-cache dbstruct))
        (sync-duration 0)) ;; run time of the sync in milliseconds; assigned below but not read in this procedure
    (set! *no-sync-db* no-sync-db) ;; make the no sync db available to api calls
    (debug:print-info 2 *default-log-port* "Periodic sync thread started.")
    (debug:print-info 3 *default-log-port* "watchdog starting. legacy-sync is " legacy-sync" pid="(current-process-id) )
    (if (and legacy-sync (not *time-to-exit*))
        (let* ((mtdb (dbr:dbstruct-mtdb dbstruct))
               (mtpath (db:dbdat-get-path mtdb))
               (tmp-area (common:get-db-tmp-area))
               (start-file (conc tmp-area "/.start-sync")) ;; rewritten just before each sync
               (end-file (conc tmp-area "/.end-sync")))    ;; rewritten just after each sync
          (debug:print-info 0 *default-log-port* "Server running, periodic sync started.")
          (let loop ()
            ;; sync for filesystem local db writes
            ;;
            ;; the decision whether to sync is taken while holding *db-multi-sync-mutex*
            (mutex-lock! *db-multi-sync-mutex*)
            (let* ((need-sync (>= *db-last-access* *db-last-sync*)) ;; no sync since last write
                   (sync-in-progress *db-sync-in-progress*)
                   (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 5))
                   (should-sync (and (not *time-to-exit*)
                                     (> (- (current-seconds) *db-last-sync*) min-intersync-delay))) ;; sync every five seconds minimum, deprecated logic, can probably be removed
                   (start-time (current-seconds))
                   (cpu-load-adj (alist-ref 'adj-proc-load (common:get-normalized-cpu-load #f)))
                   (mt-mod-time (file-modification-time mtpath))
                   (last-sync-start (if (common:file-exists? start-file)
                                        (file-modification-time start-file)
                                        0))
                   (last-sync-end (if (common:file-exists? end-file)
                                      (file-modification-time end-file)
                                      10))
                   (sync-period (+ 3 (* cpu-load-adj 30))) ;; as adjusted load increases increase the sync period
                   (recently-synced (and (< (- start-time mt-mod-time) sync-period) ;; not useful if sync didn't modify megatest.db!
                                         (< mt-mod-time last-sync-start)))
                   (sync-done (<= last-sync-start last-sync-end))
                   (sync-stale (> start-time (+ last-sync-start sync-stale-seconds)))
                   (will-sync (and (not *time-to-exit*) ;; do not start a sync if we are in the process of exiting
                                   (or need-sync should-sync)
                                   (or sync-done sync-stale)
                                   (not sync-in-progress)
                                   (not recently-synced))))
              (debug:print-info 13 *default-log-port* "WD writable-watchdog top of loop.  need-sync="need-sync" sync-in-progress=" sync-in-progress
                                " should-sync="should-sync" start-time="start-time" mt-mod-time="mt-mod-time" recently-synced="recently-synced" will-sync="will-sync
                                " sync-done=" sync-done " sync-period=" sync-period)
              (if (and (> sync-period 5)
                       (common:low-noise-print 30 "sync-period"))
                  (debug:print-info 0 *default-log-port* "Increased sync period due to long sync times, sync took: " sync-period " seconds."))
              ;; (if recently-synced (debug:print-info 0 *default-log-port* "Skipping sync due to recently-synced flag=" recently-synced))
              ;; (debug:print-info 0 *default-log-port* "need-sync: " need-sync " sync-in-progress: " sync-in-progress " should-sync: " should-sync " will-sync: " will-sync)
              (if will-sync (set! *db-sync-in-progress* #t))
              (mutex-unlock! *db-multi-sync-mutex*)
              (if will-sync
                  (let ((sync-start (current-milliseconds)))
                    (with-output-to-file start-file (lambda () (print (current-process-id))))
                    ;; put lock here
                    ;; NOTE: db:sync-to-megatest.db keeps track of time of last sync and syncs incrementally
                    ;; (an older, commented-out variant shelled out to
                    ;;  "megatest -sync-to-megatest.db" and parsed its "Synced N" output)
                    (let ((res (db:sync-to-megatest.db dbstruct no-sync-db: no-sync-db))) ;; did we sync any data? If so need to set the db touched flag to keep the server alive
                      (set! sync-duration (- (current-milliseconds) sync-start))
                      (if (> res 0) ;; some records were transferred, keep the db alive
                          (begin
                            (mutex-lock! *heartbeat-mutex*)
                            (set! *db-last-access* (current-seconds))
                            (mutex-unlock! *heartbeat-mutex*)
                            (debug:print-info 0 *default-log-port* "sync called, " res " records transferred."))
                          (debug:print-info 2 *default-log-port* "sync called but zero records transferred")))))
              (if will-sync
                  (begin
                    (mutex-lock! *db-multi-sync-mutex*)
                    (set! *db-sync-in-progress* #f)
                    (set! *db-last-sync* start-time)
                    (with-output-to-file end-file (lambda () (print (current-process-id))))
                    ;; release lock here
                    (mutex-unlock! *db-multi-sync-mutex*)))
              (if (and debug-mode
                       (> (- start-time last-time) 60))
                  (begin
                    (set! last-time start-time)
                    (debug:print-info 4 *default-log-port* "timestamp -> " (seconds->time-string (current-seconds)) ", time since start -> " (seconds->hr-min-sec (- (current-seconds) *time-zero*))))))
            ;; keep going unless time to exit: wait up to six one-second
            ;; ticks (bailing out early on *time-to-exit*), then start the
            ;; next pass with a proper tail call
            (if (not *time-to-exit*)
                (let delay-loop ((count 0))
                  (cond
                   ((and (not *time-to-exit*)
                         (< count 6))
                    (thread-sleep! 1)
                    (delay-loop (+ count 1)))
                   ((not *time-to-exit*)
                    (loop))))))
          ;; time to exit, close the no-sync db here (once, after the loop)
          (db:no-sync-close-db no-sync-db stmt-cache)
          (if (common:low-noise-print 30)
              (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id) ))))))