;;======================================================================
;; Copyright 2019, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit dbmod))
(declare (uses commonmod))
(declare (uses configfmod))
(module dbmod
*
(import scheme chicken data-structures extras)
(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable stack regex)
(import commonmod)
(import configfmod)
(import files)
;; (use (prefix ulex ulex:))
(include "common_records.scm")
;; ;; legacy handling of structure for managing db's. Refactor this into dbr:?
(define (db:dbdat-get-db dbdat)
(if (pair? dbdat)
(car dbdat)
dbdat))
;; Make the dbstruct, setup up auxillary db's and call for main db at least once
;;
;; called in http-transport and replicated in rmt.scm for *local* access.
;;
(define (db:setup do-sync alldat #!key (areapath #f))
(let* ((log-port (alldat-log-port alldat)))
(cond
((alldat-dbstack alldat) alldat) ;; already initialized
((not (alldat-areapath alldat)) ;; no top path yet? Just exit
(debug:print-info 13 log-port "in db:setup, area-path not set; give up and exit.")
(exit 1))
(else ;;(common:on-homehost?)
(debug:print-info 13 log-port "db:setup entered (first time, not cached.)")
(debug:print-info 13 log-port "Begin db:open-db")
(db:open-db alldat areapath: areapath do-sync: do-sync)
(debug:print-info 13 log-port "Done db:open-db")
;; (set! *dbstruct-db* dbstruct)
alldat))))
;; This routine creates the db if not already present. It is only called if the db is not already opened
;;
(define (db:open-db alldat #!key (areapath #f)(do-sync #t)) ;; TODO: actually use areapath
(let ((log-port (alldat-log-port alldat))
(tmpdb-stack (alldat-dbstack alldat))) ;; RA => Returns the first reference in alldat
(if (stack? tmpdb-stack)
(db:get-db tmpdb-stack) ;; get previously opened db (will create new db handle if all in the stack are already used
(let* ((max-stale-tmp (configf:lookup-number *configdat* "server" "filling-db-max-stale-seconds" default: 10))
(dbpath (common:get-db-tmp-area alldat)) ;; path to tmp db area
(dbexists (common:file-exists? dbpath))
(tmpdbfname (conc dbpath "/megatest.db"))
(dbfexists (common:file-exists? tmpdbfname)) ;; (conc dbpath "/megatest.db")))
(mtdbexists (common:file-exists? (conc *toppath* "/megatest.db")))
(mtdbmodtime (if mtdbexists (common:lazy-sqlite-db-modification-time (conc *toppath* "/megatest.db")) #f))
(tmpdbmodtime (if dbfexists (common:lazy-sqlite-db-modification-time tmpdbfname) #f))
(mtdb (db:open-megatest-db))
(mtdbpath (db:dbdat-get-path mtdb))
(tmpdb (db:open-megatest-db path: dbpath)) ;; lock-create-open dbpath db:initialize-main-db))
(refndb (db:open-megatest-db path: dbpath name: "megatest_ref.db"))
(write-access (file-write-access? mtdbpath))
;;(mtdbmodtime (if mtdbexists
;;(common:lazy-sqlite-db-modification-time mtdbpath)
;;#f)) ; moving this before db:open-megatest-db is
;;called. if wal mode is on -WAL and -shm file get
;;created with causing the tmpdbmodtime timestamp
;;always greater than mtdbmodtime (tmpdbmodtime (if
;;dbfexists (common:lazy-sqlite-db-modification-time
;;tmpdbfname) #f))
;;if wal mode is on -WAL and -shm file get created when
;;db:open-megatest-db is called. modtimedelta will
;;always be < 10 so db in tmp not get synced
;;(tmpdbmodtime (if dbfexists (db:get-last-update-time
;;(car tmpdb)) #f)) (fmt (file-modification-time
;;tmpdbfname))
(modtimedelta (and mtdbmodtime tmpdbmodtime (- mtdbmodtime tmpdbmodtime))))
(handle-exceptions
exn
(let ((call-chain (get-call-chain))
(msg ((condition-property-accessor 'exn 'message) exn)))
(debug:print 0 log-port "ERROR: attempted to drop triggers on MTRA/megatest.db but failed. Error is " msg)
(set! write-access #f)) ;; if we failed to drop the triggers then we probably don't have write access
(when write-access
(sqlite3:execute (car mtdb) "drop trigger if exists update_tests_trigger")
(sqlite3:execute (car mtdb) "drop trigger if exists update_runs_trigger")))
;;(print "mtdbmodtime " mtdbmodtime " tmpdbmodtime "
;;tmpdbmodtime " mtdbpath " mtdbpath " " (conc *toppath*
;;"/megatest.db")) (debug:print-info 13 log-port
;;"db:open-db>> mtdbpath="mtdbpath" mtdbexists="mtdbexists"
;;and write-access="write-access)
(if (and dbexists (not write-access))
(begin
(set! *db-write-access* #f)
(alldat-read-only-set! alldat #t)))
(alldat-mtdb-set! alldat mtdb)
(alldat-tmpdb-set! alldat tmpdb)
(alldat-dbstack-set! alldat (make-stack)) ;; why a stack?
(stack-push! (alldat-dbstack alldat) tmpdb) ;; olddb is already a (cons db path)
(alldat-refndb-set! alldat refndb)
;; (mutex-unlock! *rundb-mutex*)
(if (and (or (not dbfexists)
(and modtimedelta
(> modtimedelta max-stale-tmp))) ;; if db in tmp is over ten seconds older than the file in MTRA then do a sync back
do-sync)
(begin
(debug:print 1 log-port "filling db " (db:dbdat-get-path tmpdb) " with data \n from " (db:dbdat-get-path mtdb) " mod time delta: " modtimedelta)
(db:sync-tables (db:sync-all-tables-list alldat) #f mtdb refndb tmpdb)
;touch tmp db to avoid wal mode wierdness
(set! (file-modification-time tmpdbfname) (current-seconds))
(debug:print-info 13 log-port "db:sync-all-tables-list done.")
)
(debug:print 4 log-port " db, " (db:dbdat-get-path tmpdb) " already exists or fresh enough, not propogating data from\n " (db:dbdat-get-path mtdb) " mod time delta: " modtimedelta) )
;; (db:multi-db-sync alldat 'old2new)) ;; migrate data from megatest.db automatically
tmpdb))))
;; Get/open a database
;; if run-id => get run specific db
;; if #f => get main db
;; if db already open - return inmem
;; if db not open, open inmem, rundb and sync then return inmem
;; inuse gets set automatically for rundb's
;;
(define (db:get-db alldat) ;; run-id)
(if (stack? (alldat-dbstack alldat))
(if (stack-empty? (alldat-dbstack alldat))
(let ((newdb (db:open-megatest-db path: (common:get-db-tmp-area alldat))))
;; (stack-push! (alldat-dbstack alldat) newdb)
newdb)
(stack-pop! (alldat-dbstack alldat)))
(db:open-db alldat)))
(define (db:sync-all-tables-list alldat)
(append (db:sync-main-list alldat)
db:sync-tests-only))
;; just tests, test_steps and test_data tables
(define db:sync-tests-only
(list
;; (list "strs"
;; '("id" #f)
;; '("str" #f))
(list "tests"
'("id" #f)
'("run_id" #f)
'("testname" #f)
'("host" #f)
'("cpuload" #f)
'("diskfree" #f)
'("uname" #f)
'("rundir" #f)
'("shortdir" #f)
'("item_path" #f)
'("state" #f)
'("status" #f)
'("attemptnum" #f)
'("final_logf" #f)
'("logdat" #f)
'("run_duration" #f)
'("comment" #f)
'("event_time" #f)
'("fail_count" #f)
'("pass_count" #f)
'("archived" #f)
'("last_update" #f))
(list "test_steps"
'("id" #f)
'("test_id" #f)
'("stepname" #f)
'("state" #f)
'("status" #f)
'("event_time" #f)
'("comment" #f)
'("logfile" #f)
'("last_update" #f))
(list "test_data"
'("id" #f)
'("test_id" #f)
'("category" #f)
'("variable" #f)
'("value" #f)
'("expected" #f)
'("tol" #f)
'("units" #f)
'("comment" #f)
'("status" #f)
'("type" #f)
'("last_update" #f))))
;; needs db to get keys, this is for syncing all tables
;;
(define (db:sync-main-list alldat)
(let ((keys (db:get-keys alldat)))
(list
(list "keys"
'("id" #f)
'("fieldname" #f)
'("fieldtype" #f))
(list "metadat" '("var" #f) '("val" #f))
(append (list "runs"
'("id" #f))
(map (lambda (k)(list k #f))
(append keys
(list "runname" "state" "status" "owner" "event_time" "comment" "fail_count" "pass_count" "contour" "last_update"))))
(list "test_meta"
'("id" #f)
'("testname" #f)
'("owner" #f)
'("description" #f)
'("reviewed" #f)
'("iterated" #f)
'("avg_runtime" #f)
'("avg_disk" #f)
'("tags" #f)
'("jobgroup" #f)))))
;; why get the keys from the db? why not get from the *configdat*
;; using keys:config-get-fields?
(define (db:get-keys alldat)
(if (alldat-db-keys alldat)
(alldat-db-keys alldat)
(let ((res '()))
(db:with-db alldat #f #f
(lambda (db)
(sqlite3:for-each-row
(lambda (key)
(set! res (cons key res)))
db
"SELECT fieldname FROM keys ORDER BY id DESC;")))
(alldat-db-keys-set! alldat res)
res)))
;; (db:with-db alldat run-id sqlite3:exec "select blah fgrom blaz;")
;; r/w is a flag to indicate if the db is modified by this query #t = yes, #f = no
;;
(define (db:with-db alldat run-id r/w proc . params)
(let* ((have-struct (alldat? alldat))
(dbdat (if have-struct
(db:get-db alldat)
#f))
(db (if have-struct
(db:dbdat-get-db dbdat)
alldat))
(use-mutex (> (alldat-api-process-request-count alldat) 25))
(db-with-db-mutex (alldat-db-with-db-mutex alldat))
(log-port (alldat-log-port alldat)))
(if (and use-mutex
(common:low-noise-print 120 "over-50-parallel-api-requests"))
(debug:print-info 0 log-port (alldat-api-process-request-count alldat) " parallel api requests being processed in process " (current-process-id) ", throttling access"))
(if (common:low-noise-print 600 (conc "parallel-api-requests" (alldat-max-api-process-requests alldat)))
(debug:print-info 2 log-port "Parallel api request count: " (alldat-api-process-request-count alldat) " max parallel requests: " (alldat-max-api-process-requests alldat)))
(handle-exceptions
exn
(begin
(print-call-chain (current-error-port))
(debug:print-error 0 log-port "sqlite3 issue in db:with-db, alldat=" alldat ", run-id=" run-id ", proc=" proc ", params=" params " error: " ((condition-property-accessor 'exn 'message) exn))
;; there is no recovering at this time. exit
(exit 50))
(if use-mutex (mutex-lock! db-with-db-mutex))
(let ((res (apply proc db params)))
(if use-mutex (mutex-unlock! db-with-db-mutex))
(if dbdat (stack-push! (alldat-dbstack alldat) dbdat))
res))))
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;; db's are dbdat's
;;
;; if last-update specified ("field-name" . time-in-seconds)
;; then sync only records where field-name >= time-in-seconds
;; IFF field-name exists
;;
(define (db:sync-tables alldat tbls last-update fromdb todb . slave-dbs)
(handle-exceptions
exn
(begin
(debug:print 0 *default-log-port* "EXCEPTION: database probably overloaded or unreadable in db:sync-tables.")
(print-call-chain (current-error-port))
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 5 *default-log-port* "exn=" (condition->list exn))
(debug:print 0 *default-log-port* " status: " ((condition-property-accessor 'sqlite3 'status) exn))
(debug:print 0 *default-log-port* " src db: " (db:dbdat-get-path fromdb))
(for-each (lambda (dbdat)
(let ((dbpath (db:dbdat-get-path dbdat)))
(debug:print 0 *default-log-port* " dbpath: " dbpath)
(if (not (db:repair-db dbdat))
(begin
(debug:print-error 0 *default-log-port* "Failed to rebuild " dbpath ", exiting now.")
(exit)))))
(cons todb slave-dbs))
0)
;; this is the work to be done
(cond
((not fromdb) (debug:print 3 *default-log-port* "WARNING: db:sync-tables called with fromdb missing")
-1)
((not todb) (debug:print 3 *default-log-port* "WARNING: db:sync-tables called with todb missing")
-2)
((not (sqlite3:database? (db:dbdat-get-db fromdb)))
(debug:print-error 0 *default-log-port* "db:sync-tables called with fromdb not a database " fromdb)
-3)
((not (sqlite3:database? (db:dbdat-get-db todb)))
(debug:print-error 0 *default-log-port* "db:sync-tables called with todb not a database " todb)
-4)
((not (file-write-access? (db:dbdat-get-path todb)))
(debug:print-error 0 *default-log-port* "db:sync-tables called with todb not a read-only database " todb)
-5)
((not (null? (let ((readonly-slave-dbs
(filter
(lambda (dbdat)
(not (file-write-access? (db:dbdat-get-path todb))))
slave-dbs)))
(for-each
(lambda (bad-dbdat)
(debug:print-error
0 *default-log-port* "db:sync-tables called with todb not a read-only database " bad-dbdat))
readonly-slave-dbs)
readonly-slave-dbs))) -6)
(else
(let ((stmts (make-hash-table)) ;; table-field => stmt
(all-stmts '()) ;; ( ( stmt1 value1 ) ( stml2 value2 ))
(numrecs (make-hash-table))
(start-time (current-milliseconds))
(tot-count 0))
(for-each ;; table
(lambda (tabledat)
(let* ((tablename (car tabledat))
(fields (cdr tabledat))
(has-last-update (member "last_update" fields))
(use-last-update (cond
((and has-last-update
(member "last_update" fields))
#t) ;; if given a number, just use it for all fields
((number? last-update) #f) ;; if not matched first entry then ignore last-update for this table
((and (pair? last-update)
(member (car last-update) ;; last-update field name
(map car fields))) #t)
(last-update
(debug:print 0 *default-log-port* "ERROR: parameter last-update for db:sync-tables must be a pair or a number, received: " last-update) ;; found in fields
#f)
(else
#f)))
(last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for
(if (number? last-update)
last-update
(cdr last-update))
#f))
(last-update-field (if use-last-update
(if (number? last-update)
"last_update"
(car last-update))
#f))
(num-fields (length fields))
(field->num (make-hash-table))
(num->field (apply vector (map car fields))) ;; BBHERE
(full-sel (conc "SELECT " (string-intersperse (map car fields) ",")
" FROM " tablename (if use-last-update ;; apply last-update criteria
(conc " WHERE " last-update-field " >= " last-update-value)
"")
";"))
(full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) "
" VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );"))
(fromdat '())
(fromdats '())
(totrecords 0)
(batch-len (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100")))
(todat (make-hash-table))
(count 0)
(delay-handicap (string->number (or (configf:lookup *configdat* "sync" "delay-handicap") "0")))
)
;; set up the field->num table
(for-each
(lambda (field)
(hash-table-set! field->num field count)
(set! count (+ count 1)))
fields)
;; read the source table
(sqlite3:for-each-row
(lambda (a . b)
(set! fromdat (cons (apply vector a b) fromdat))
(if (> (length fromdat) batch-len)
(begin
(set! fromdats (cons fromdat fromdats))
(set! fromdat '())
(set! totrecords (+ totrecords 1)))))
(db:dbdat-get-db fromdb)
full-sel)
;; tack on remaining records in fromdat
(if (not (null? fromdat))
(set! fromdats (cons fromdat fromdats)))
(if (common:low-noise-print 120 "sync-records")
(debug:print-info 4 *default-log-port* "found " totrecords " records to sync"))
;; read the target table; BBHERE
(sqlite3:for-each-row
(lambda (a . b)
(hash-table-set! todat a (apply vector a b)))
(db:dbdat-get-db todb)
full-sel)
(when (and delay-handicap (> delay-handicap 0))
(debug:print-info 0 *default-log-port* "imposing synthetic sync delay of "delay-handicap" seconds since sync/delay-handicap is configured")
(thread-sleep! delay-handicap)
(debug:print-info 0 *default-log-port* "synthetic sync delay of "delay-handicap" seconds completed")
)
;; first pass implementation, just insert all changed rows
(for-each
(lambda (targdb)
(let* ((db (db:dbdat-get-db targdb))
(stmth (sqlite3:prepare db full-ins)))
(for-each
(lambda (fromdat-lst)
(sqlite3:with-transaction
db
(lambda ()
(for-each ;;
(lambda (fromrow)
(let* ((a (vector-ref fromrow 0))
(curr (hash-table-ref/default todat a #f))
(same #t))
(let loop ((i 0))
(if (or (not curr)
(not (equal? (vector-ref fromrow i)(vector-ref curr i))))
(set! same #f))
(if (and same
(< i (- num-fields 1)))
(loop (+ i 1))))
(if (not same)
(begin
(apply sqlite3:execute stmth (vector->list fromrow))
(hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))))))
fromdat-lst))
))
fromdats)
(sqlite3:finalize! stmth)))
(append (list todb) slave-dbs))))
tbls)
(let* ((runtime (- (current-milliseconds) start-time))
(should-print (or (debug:debug-mode 12)
(common:low-noise-print 120 "db sync" (> runtime 500))))) ;; low and high sync times treated as separate.
(if should-print (debug:print 3 *default-log-port* "INFO: db sync, total run time " runtime " ms"))
(for-each
(lambda (dat)
(let ((tblname (car dat))
(count (cdr dat)))
(set! tot-count (+ tot-count count))
(if (> count 0)
(if should-print (debug:print 0 *default-log-port* (format #f " ~10a ~5a" tblname count))))))
(sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b))))))
tot-count)))))
;; return #f to indicate the dbdat should be closed/reopened
;; else return dbdat
;;
(define (db:repair-db dbdat #!key (numtries 1))
(let* ((dbpath (db:dbdat-get-path dbdat))
(dbdir (pathname-directory dbpath))
(fname (pathname-strip-directory dbpath)))
(debug:print-info 0 *default-log-port* "Checking db " dbpath " for errors.")
(cond
((not (file-write-access? dbdir))
(debug:print 0 *default-log-port* "WARNING: can't write to " dbdir ", can't fix " fname)
#f)
;; handle special cases, megatest.db and monitor.db
;;
;; NOPE: apply this same approach to all db files
;;
(else ;; ((equal? fname "megatest.db") ;; this file can be regenerated if needed
(handle-exceptions
exn
(begin
;; (db:move-and-recreate-db dbdat)
(if (> numtries 0)
(db:repair-db dbdat numtries: (- numtries 1))
#f)
(debug:print 0 *default-log-port* "FATAL: file " dbpath " was found corrupted, an attempt to fix has been made but you must start over.")
(debug:print 0 *default-log-port*
" check the following:\n"
" 1. full directories, look in ~/ /tmp and " dbdir "\n"
" 2. write access to " dbdir "\n\n"
" if the automatic recovery failed you may be able to recover data by doing \""
(if (member fname '("megatest.db" "monitor.db"))
"megatest -cleanup-db"
"megatest -import-megatest.db;megatest -cleanup-db")
"\"\n")
(exit) ;; we can not safely continue when a db was corrupted - even if fixed.
)
;; test read/write access to the database
(let ((db (sqlite3:open-database dbpath)))
(cond
((equal? fname "megatest.db")
(sqlite3:execute db "DELETE FROM tests WHERE state='DELETED';"))
((equal? fname "main.db")
(sqlite3:execute db "DELETE FROM runs WHERE state='deleted';"))
((string-match "\\d.db" fname)
(sqlite3:execute db "UPDATE tests SET state='DELETED' WHERE state='DELETED';"))
((equal? fname "monitor.db")
(sqlite3:execute "DELETE FROM servers WHERE state LIKE 'defunct%';"))
(else
(sqlite3:execute db "vacuum;")))
(sqlite3:finalize! db)
#t))))))
)