;;======================================================================
;; Copyright 2019, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit tasksmod))
(declare (uses commonmod))
(declare (uses pgdbmod))
(declare (uses mtconfigf))
(module tasksmod
*
(import scheme (chicken base) (chicken condition) (chicken string) (chicken process) (chicken process-context) (chicken process-context posix) (chicken file) (chicken file posix) system-information )
(import (prefix sqlite3 sqlite3:) typed-records srfi-18 srfi-69 format srfi-1 matchable
regex)
(import commonmod)
(import (prefix mtconfigf configf:))
(import pgdbmod)
(include "common_records.scm")
(include "task_records.scm")
;; (include "tasks-inc.scm")
;;======================================================================
;; Tasks db
;;======================================================================
;; Wait up to approximately n seconds for the journal file "<path>-journal"
;; to go away.
;;
;;   path        - path the journal belongs to; must be a string
;;   n           - maximum number of one-second polls
;;   remove      - when true, delete the journal once the wait has expired
;;   waiting-msg - optional message, logged whenever the remaining poll
;;                 count is a multiple of 30
;;
;; Returns #t when the journal is gone, #f when it is still present after
;; n polls. Any exception raised while polling is logged and #t is returned
;; so callers are never blocked by this check. When path is not a string an
;; error is logged and the result of that logging call is returned.
;;
(define (tasks:wait-on-journal path n #!key (remove #f)(waiting-msg #f))
  (if (not (string? path))
      (debug:print-error 0 *default-log-port* "Called tasks:wait-on-journal with path=" path " (not a string)")
      (let ((fullpath (conc path "-journal")))
        (handle-exceptions
            exn
            (begin
              (print-call-chain (current-error-port))
              (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
              (debug:print 5 *default-log-port* " exn=" (condition->list exn))
              (debug:print 0 *default-log-port* "tasks:wait-on-journal failed. Continuing on, you can ignore this call-chain")
              #t) ;; if stuff goes wrong just allow it to move on
          (let loop ((journal-exists (common:file-exists? fullpath))
                     (count          n)) ;; remaining one-second polls
            (if journal-exists
                (begin
                  ;; Throttle on the countdown, not on the constant n; testing n
                  ;; made the message appear on every poll or never at all.
                  (if (and waiting-msg
                           (= (modulo count 30) 0))
                      (debug:print 0 *default-log-port* waiting-msg))
                  (if (> count 0)
                      (begin
                        (thread-sleep! 1)
                        (loop (common:file-exists? fullpath)
                              (- count 1)))
                      (begin
                        ;; NOTE: this message is logged even when remove is #f
                        (debug:print 0 *default-log-port* "ERROR: removing the journal file " fullpath ", this is not good. Look for disk full, write access and other issues.")
                        ;; quote the path so spaces/metacharacters cannot alter the command
                        (if remove (system (conc "rm -rf " (qs fullpath))))
                        #f)))
                #t))))))
;; Return the directory that holds the task database, creating it when it
;; does not exist yet. The first of these that yields a value is used:
;;   [setup] monitordir, [setup] dbdir, "<linktree>/.db"
;; If the directory cannot be created an error is logged and the process
;; exits with status 1.
(define (tasks:get-task-db-path)
  (let ((dbdir (cond
                ((configf:lookup *configdat* "setup" "monitordir"))
                ((configf:lookup *configdat* "setup" "dbdir"))
                (else (conc (common:get-linktree) "/.db")))))
    (handle-exceptions
        exn
        (begin
          (debug:print-error 0 *default-log-port* "Couldn't create path to " dbdir)
          (exit 1))
      (unless (directory? dbdir)
        (create-directory dbdir #t)))
    dbdir))
;; Open the monitor database, or return the already opened one.
;;
;; The result is cached in *task-db* as a pair (sqlite3-handle . dbpath).
;;
;; Which database gets opened:
;;   *toppath* is a string and writable  ==> open <dbpath>/monitor.db
;;   dbpath is readable                  ==> open <dbpath>/monitor.db
;;   otherwise                           ==> open an in-memory database
;;
;; The monitors, servers and clients tables are created when missing.
;; (The tasks_queue table is no longer created here: it moved to main.db.)
;;
;;   numretries - how many more attempts to make after a failure
;;
;; On an exception the error is logged and, while numretries > 0, the open
;; is retried after a one second pause. Returns #f once retries run out.
;;
(define (tasks:open-db #!key (numretries 4))
  ;; shared logging for both the retry and the give-up path
  (define (log-exception exn)
    (print-call-chain (current-error-port))
    (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
    (debug:print 5 *default-log-port* " exn=" (condition->list exn)))
  (or *task-db*
      (handle-exceptions
          exn
          (begin
            (log-exception exn)
            (if (> numretries 0)
                (begin
                  (thread-sleep! 1)
                  ;; numretries is a keyword parameter: it must be passed by
                  ;; keyword, otherwise the count is never decremented and
                  ;; the retries never stop.
                  (tasks:open-db numretries: (- numretries 1)))
                #f))
        (let* ((dbpath       (db:dbfile-path )) ;; (tasks:get-task-db-path))
               (dbfile       (conc dbpath "/monitor.db"))
               ;; the journal lives next to the db file ("monitor.db-journal"),
               ;; so wait on dbfile, not on the directory
               (avail        (tasks:wait-on-journal dbfile 10)) ;; wait up to about 10 seconds for the journal to go away
               (exists       (common:file-exists? dbpath))
               (write-access (file-writable? dbpath))
               (mdb          (cond ;; what the hek is *toppath* doing here?
                              ((and (string? *toppath*)(file-writable? *toppath*))
                               (sqlite3:open-database dbfile))
                              ((file-readable? dbpath) (sqlite3:open-database dbfile))
                              (else (sqlite3:open-database ":memory:")))) ;; (never-give-up-open-db dbpath))
               (handler      (sqlite3:make-busy-timeout 36000)))
          (if (and exists
                   (not write-access))
              (set! *db-write-access* write-access)) ;; only unset so other db's also can use this control
          (sqlite3:set-busy-handler! mdb handler)
          (db:set-sync mdb) ;; (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;"))
          (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY,
pid INTEGER,
start_time TIMESTAMP,
last_update TIMESTAMP,
hostname TEXT,
username TEXT,
CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
          (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
pid INTEGER,
interface TEXT,
hostname TEXT,
port INTEGER,
pubport INTEGER,
start_time TIMESTAMP,
priority INTEGER,
state TEXT,
mt_version TEXT,
heartbeat TIMESTAMP,
transport TEXT,
run_id INTEGER);")
          ;; CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
          (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
server_id INTEGER,
pid INTEGER,
hostname TEXT,
cmdline TEXT,
login_time TIMESTAMP,
logout_time TIMESTAMP DEFAULT -1,
CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
          (set! *task-db* (cons mdb dbpath))
          *task-db*))))
;;======================================================================
;; Server and client management
;;======================================================================
;; make-vector-record tasks hostinfo id interface port pubport transport pid hostname
;; Field accessors for a hostinfo record held in a plain vector. Slots:
;;   0 id, 1 interface, 2 port, 3 pubport, 4 transport, 5 pid, 6 hostname
(define (tasks:hostinfo-get-id        hostinfo) (vector-ref hostinfo 0))
(define (tasks:hostinfo-get-interface hostinfo) (vector-ref hostinfo 1))
(define (tasks:hostinfo-get-port      hostinfo) (vector-ref hostinfo 2))
(define (tasks:hostinfo-get-pubport   hostinfo) (vector-ref hostinfo 3))
(define (tasks:hostinfo-get-transport hostinfo) (vector-ref hostinfo 4))
(define (tasks:hostinfo-get-pid       hostinfo) (vector-ref hostinfo 5))
(define (tasks:hostinfo-get-hostname  hostinfo) (vector-ref hostinfo 6))
;; True only when the configuration has [server] required set to exactly
;; "yes". run-id is accepted but not consulted.
(define (tasks:need-server run-id)
  (let ((required (configf:lookup *configdat* "server" "required")))
    (and (string? required)
         (string=? required "yes"))))
;; Kill a server process through the external "nbfake" command, then
;; compress that server's log file.
;;
;;   hostname    - host the server runs on; exported as TARGETHOST for nbfake
;;   pid         - process id of the server
;;   kill-switch - extra switch text inserted verbatim into the kill command
;;
;; TARGETHOST and TARGETHOST_LOGF are set for the duration of the call and
;; always unset again before returning. The server log is only looked for
;; (and gzipped) when a "logs" directory exists in the current directory.
;;
(define (tasks:kill-server hostname pid #!key (kill-switch ""))
  (debug:print-info 0 *default-log-port* "Attempting to kill server process " pid " on host " hostname)
  (set-environment-variable! "TARGETHOST" hostname)
  (let* ((have-logdir (directory-exists? "logs"))
         (logdir      (if have-logdir "logs/" ""))
         ;; "" is a true value in Scheme, so the decision has to be made on
         ;; the directory test itself rather than on logdir
         (logfile     (and have-logdir (conc "logs/server-" pid "-" hostname ".log")))
         (gzfile      (and logfile (conc logfile ".gz"))))
    (set-environment-variable! "TARGETHOST_LOGF" (conc logdir "server-kills.log"))
    (system (conc "nbfake kill " kill-switch " " pid))
    (when logfile
      (thread-sleep! 0.5) ;; brief pause before touching the log
      (if (common:file-exists? gzfile) (delete-file gzfile))
      (if (common:file-exists? logfile)
          (system (conc "gzip " (qs logfile)))))
    ;; clean up the environment regardless of whether a log was compressed
    (unset-environment-variable! "TARGETHOST_LOGF")
    (unset-environment-variable! "TARGETHOST")))
;;======================================================================
;; M O N I T O R S
;;======================================================================
;; Delete the monitors row that belongs to this process, identified by the
;; current process id and host name.
(define (tasks:remove-monitor-record mdb)
  (let ((my-pid  (current-process-id))
        (my-host (get-host-name)))
    (sqlite3:execute mdb
                     "DELETE FROM monitors WHERE pid=? AND hostname=?;"
                     my-pid
                     my-host)))
;; Return every row of the monitors table as a list of vectors
;;   #(id pid start-time last-update hostname username)
;; in the order delivered by the query (ascending last_update). Both
;; timestamps are already formatted as text by the query.
(define (tasks:get-monitors mdb)
  (let ((rows-rev '()))
    (sqlite3:for-each-row
     (lambda (first-col . other-cols)
       (set! rows-rev
             (cons (list->vector (cons first-col other-cols)) rows-rev)))
     mdb
     "SELECT id,pid,strftime('%m/%d/%Y %H:%M',datetime(start_time,'unixepoch'),'localtime'),strftime('%m/%d/%Y %H:%M:%S',datetime(last_update,'unixepoch'),'localtime'),hostname,username FROM monitors ORDER BY last_update ASC;")
    ;; rows were accumulated newest-first; restore query order
    (reverse rows-rev)))
;; Render a list of monitor records as a fixed-width text table: one header
;; line followed by one line per monitor, joined with newlines.
(define (tasks:monitors->text-table monitors)
  (define row-format "~4a~8a~20a~20a~10a~10a")
  ;; one table line for a single monitor record
  (define (monitor->line m)
    (format #f row-format
            (tasks:monitor-get-id m)
            (tasks:monitor-get-pid m)
            (tasks:monitor-get-start_time m)
            (tasks:monitor-get-last_update m)
            (tasks:monitor-get-hostname m)
            (tasks:monitor-get-username m)))
  (let ((header (format #f row-format "id" "pid" "start time" "last update" "hostname" "user"))
        (body   (string-intersperse (map monitor->line monitors) "\n")))
    (conc header "\n" body)))
;; Refresh this process's heartbeat and purge dead monitors.
;;
;; Sets last_update to the current time on the monitors row matching the
;; current pid and host name, then deletes every monitors row whose
;; last_update is more than 700 seconds old. A line is printed for each
;; row about to be removed.
;;
;; Deletions are issued one id at a time with a bound parameter, and only
;; when something is actually stale, so a routine heartbeat with nothing
;; to purge no longer runs a DELETE.
(define (tasks:monitors-update mdb)
  (sqlite3:execute mdb "UPDATE monitors SET last_update=strftime('%s','now') WHERE pid=? AND hostname=?;"
                   (current-process-id)
                   (get-host-name))
  (let ((deadlist '()))
    (sqlite3:for-each-row
     (lambda (id pid host last-update delta)
       (print "Going to delete stale record for monitor with pid " pid " on host " host " last updated " delta " seconds ago")
       (set! deadlist (cons id deadlist)))
     mdb
     "SELECT id,pid,hostname,last_update,strftime('%s','now')-last_update AS delta FROM monitors WHERE delta > 700;")
    (for-each
     (lambda (dead-id)
       (sqlite3:execute mdb "DELETE FROM monitors WHERE id=?;" dead-id))
     deadlist)))
;; Insert a monitors row for the current process (pid, host name and user
;; name), with start_time and last_update both set to "now". port is only
;; echoed in the printed message; it is not stored.
(define (tasks:register-monitor db port)
  (let* ((pid      (current-process-id))
         (hostname (get-host-name))
         (username (car (user-information (current-user-id)))))
    (print "Register monitor, pid: " pid ", hostname: " hostname ", port: " port ", username: " username)
    (sqlite3:execute
     db
     "INSERT INTO monitors (pid,start_time,last_update,hostname,username) VALUES (?,strftime('%s','now'),strftime('%s','now'),?,?);"
     pid
     hostname
     username)))
;; Count the monitors owned by the current user that are still alive, i.e.
;; whose last_update falls within the last 300 seconds.
;;
;; The comparison used to be "last_update < now - 300", which counted the
;; monitors that had *stopped* updating instead of the live ones.
;;
;; Returns the count (0 when there are none).
(define (tasks:get-num-alive-monitors mdb)
  (let ((res 0))
    (sqlite3:for-each-row
     (lambda (count)
       (set! res count))
     mdb
     "SELECT count(id) FROM monitors WHERE last_update > (strftime('%s','now') - 300) AND username=?;"
     (car (user-information (current-user-id))))
    res))
;;
;; DISABLED: the "#;" datum comment below removes this whole definition, so
;; tasks:start-monitor is not defined by this file.
;;
;; Intended behaviour as written: unless more than two monitors are already
;; alive, register this process as a monitor and loop forever, processing
;; the task queue when the main db file has changed and refreshing this
;; monitor's last_update once the next-touch time (now + 240 s) has passed.
;;
;; Problems visible in the code, to fix before re-enabling:
;;   - it calls task:register-monitor with one argument; the procedure in
;;     this file is tasks:register-monitor and takes (db port)
;;   - megatestdbpath is not bound here (the let* binds megatestdb)
;;   - last-db-update is never changed inside the loop
;;   - the loop contains no sleep of its own
;;     NOTE(review): unless tasks:process-queue blocks, this would spin
#;(define (tasks:start-monitor db mdb)
(if (> (tasks:get-num-alive-monitors mdb) 2) ;; have two running, no need for more
(debug:print-info 1 *default-log-port* "Not starting monitor, already have more than two running")
(let* ((megatestdb (conc *toppath* "/megatest.db"))
(monitordbf (conc (db:dbfile-path #f) "/monitor.db"))
(last-db-update 0)) ;; (file-modification-time megatestdb)))
(task:register-monitor mdb)
(let loop ((count 0)
(next-touch 0)) ;; next-touch is the time where we need to update last_update
;; if the db has been modified we'd best look at the task queue
(let ((modtime (file-modification-time megatestdbpath )))
(if (> modtime last-db-update)
(tasks:process-queue db)) ;; BROKEN. mdb last-db-update megatestdb next-touch))
;; WARNING: Possible race condition here!!
;; should this update be immediately after the task-get-action call above?
(if (> (current-seconds) next-touch)
(begin
(tasks:monitors-update mdb)
(loop (+ count 1)(+ (current-seconds) 240)))
(loop (+ count 1) next-touch)))))))
;;======================================================================
;; T A S K S Q U E U E
;;
;; NOTE: These operate on the tasks_queue table, which is in main.db
;;
;;======================================================================
;; NOTE: It might be good to add one more layer of checking to ensure
;; that no task gets run in parallel.
;; id INTEGER PRIMARY KEY,
;; action TEXT DEFAULT '',
;; owner TEXT,
;; state TEXT DEFAULT 'new',
;; target TEXT DEFAULT '',
;; name TEXT DEFAULT '',
;; testpatt TEXT DEFAULT '',
;; keylock TEXT,
;; params TEXT,
;; creation_time TIMESTAMP DEFAULT (strftime('%s','now')),
;; execution_time TIMESTAMP);
;;======================================================================
;; S Y N C T O P O S T G R E S Q L
;;======================================================================
;; In the spirit of "dump your junk in the tasks module" I'll put the
;; sync to postgres here for now.
;; Try to register this area in the postgres database automatically. Call
;; this only when looking the area up by path found nothing.
;;
;;   dbh       - database handle passed through to pgdb:add-area
;;   configdat - configuration; [setup] area-name supplies the first name
;;               tried, falling back to (common:get-area-name)
;;   toppath   - area path to record; *toppath* is used when not given
;;
;; Up to three names are attempted, in this order:
;;   1. the area name
;;   2. "<user>_" prepended to the area name
;;   3. the first four characters of the area path signature prepended to
;;      the name from step 2
;; Returns the first true result of pgdb:add-area, or #f when every attempt
;; fails. An exception from pgdb:add-area is logged and counts as a failure.
;;
(define (tasks:set-area dbh configdat #!key (toppath #f))
  ;; one attempt; yields #f instead of raising
  (define (try-add name)
    (handle-exceptions
        exn
        (begin
          (debug:print 0 *default-log-port* "ERROR: cannot create area entry, " ((condition-property-accessor 'exn 'message) exn))
          #f) ;; FIXME: the reason for the exception is not examined
      (pgdb:add-area dbh name (or toppath *toppath*))))
  (let retry ((candidate (or (configf:lookup configdat "setup" "area-name")
                             (common:get-area-name)))
              (stage     'none))
    (cond
     ((try-add candidate))
     ((eq? stage 'none)
      (retry (conc (current-user-name) "_" candidate) 'user))
     ((eq? stage 'user)
      (retry (conc (substring (common:get-area-path-signature) 0 4)
                   candidate)
             'areasig))
     (else #f)))) ;; give up
;; Print one line per entry of run-times in the form
;;   <target><saperator><run-name><saperator><run-time>
;; Each entry is a vector #(run-name run-time target).
(define (task:print-runtime run-times saperator)
  (for-each
   (lambda (entry)
     (print (vector-ref entry 2)
            saperator
            (vector-ref entry 0)
            saperator
            (vector-ref entry 1)))
   run-times))
;; Print run-times as a single bracketed, comma separated line of
;;   {target:<target>, run-name:<run-name>, run-time:<run-time>}
;; objects. Each entry is a vector #(run-name run-time target).
;;
;; An empty list prints "[]"; previously it raised an error by taking the
;; car of the empty list.
;;
;; NOTE: keys and values are emitted unquoted, so the output is not strict
;; JSON, and the final object has no space before "run-name". Both are kept
;; exactly as before so existing consumers see unchanged output.
(define (task:print-runtime-as-json run-times)
  ;; text for one entry; sep is what follows the target value
  (define (entry->text entry sep)
    (conc "{target:" (vector-ref entry 2)
          sep "run-name:" (vector-ref entry 0)
          ", run-time:" (vector-ref entry 1)
          "}"))
  (if (null? run-times)
      (print "[]")
      (let loop ((remaining run-times)
                 (done-rev  '()))
        (if (null? (cdr remaining))
            (print "["
                   (string-intersperse
                    (reverse (cons (entry->text (car remaining) ",") done-rev))
                    ",")
                   "]")
            (loop (cdr remaining)
                  (cons (entry->text (car remaining) ", ") done-rev))))))
;; Print one line per entry of test-times in the form
;;   <test-name><saperator><item-path><saperator><test-time>
;; Each entry is a vector #(test-name item-path test-time); an empty item
;; path is shown as "N/A".
(define (task:print-testtime test-times saperator)
  (for-each
   (lambda (entry)
     (let* ((name     (vector-ref entry 0))
            (duration (vector-ref entry 2))
            (raw-item (vector-ref entry 1))
            (item     (if (zero? (string-length raw-item)) "N/A" raw-item)))
       (print name saperator item saperator duration)))
   test-times))
;; Print test-times as a single bracketed, comma separated line of
;;   {test-name:<name>, item-path:<item>, test-time:<time>}
;; objects. Each entry is a vector #(test-name item-path test-time).
;;
;; An empty list prints "[]"; previously it raised an error by taking the
;; car of the empty list.
;;
;; NOTE: keys and values are emitted unquoted, so the output is not strict
;; JSON. The format is kept exactly as before for existing consumers.
(define (task:print-testtime-as-json test-times)
  (define (entry->text entry)
    (conc "{test-name:" (vector-ref entry 0)
          ", item-path:" (vector-ref entry 1)
          ", test-time:" (vector-ref entry 2)
          "}"))
  (print "[" (string-intersperse (map entry->text test-times) ",") "]"))
;; Apply the configured "PRAGMA synchronous" level to db.
;;
;; The level is read from [setup] synchronous. The historical misspelling
;; [setup] sychronous, the only key consulted before, is still honoured as
;; a fallback. With no setting the level is 0.
;;
;; The value is spliced into the PRAGMA text, so only the levels SQLite
;; documents (0-3, OFF, NORMAL, FULL, EXTRA; case-insensitive) are let
;; through. Anything else is reported and replaced by 0.
(define (db:set-sync db)
  (let* ((allowed    '("0" "1" "2" "3" "OFF" "NORMAL" "FULL" "EXTRA"))
         (configured (or (configf:lookup *configdat* "setup" "synchronous")
                         (configf:lookup *configdat* "setup" "sychronous")))
         (as-text    (and configured (conc configured)))
         (syncprag   (cond
                      ((not as-text) 0)
                      ((any (lambda (ok) (string-ci=? ok as-text)) allowed)
                       as-text)
                      (else
                       (debug:print-error 0 *default-log-port* "Ignoring invalid [setup] synchronous value: " as-text)
                       0))))
    (sqlite3:execute db (conc "PRAGMA synchronous = " syncprag ";"))))
)