;;======================================================================
;; Copyright 2017, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit dbmod))
(declare (uses dbfile))
(declare (uses commonmod))
(declare (uses debugprint))
(module dbmod
*
(import scheme
chicken
data-structures
extras
files
(prefix sqlite3 sqlite3:)
matchable
posix
typed-records
srfi-1
srfi-18
srfi-69
commonmod
dbfile
debugprint
)
;; Map a run-id to the name of its db file, e.g. "1.db" or "main.db".
;; NOTE: only the bare file name is returned, never a path.
;;
(define (dbmod:run-id->dbfname run-id)
  (let ((dbnum (dbfile:run-id->dbnum run-id)))
    (conc dbnum ".db")))
;; Return the "<areapath>/.mtdb" directory for dbstruct.
;; The directory is created when it does not exist yet and areapath is writable.
;;
(define (dbmod:get-dbdir dbstruct)
  (let* ((area (dbr:dbstruct-areapath dbstruct))
         (mtdb (conc area "/.mtdb")))
    (when (and (file-write-access? area)
               (not (file-exists? mtdb)))
      (create-directory mtdb))
    mtdb))
;; Return the full path "<dbdir>/<dbfname>" of the db file that holds run-id.
;;
;; dbmod:get-dbdir takes exactly one argument (the dbstruct); passing run-id
;; as a second argument, as was done before, is an arity error at call time.
;;
(define (dbmod:run-id->full-dbfname dbstruct run-id)
  (conc (dbmod:get-dbdir dbstruct) "/" (dbmod:run-id->dbfname run-id)))
;;======================================================================
;; Read-only inmem cached direct from disk method
;;======================================================================
;; Cache of the dbstructs opened by dbmod:nfs-get-dbstruct, keyed by db file name.
(define *dbmod:nfs-db-handles* (make-hash-table)) ;; dbfname -> dbstruct
;; called in rmt.scm nfs-transport-handler
;;
;; Return the dbstruct cached in *dbmod:nfs-db-handles* for the db file of
;; run-id. On the first request the db is opened with dbmod:open-dbmoddb
;; (syncdir 'fromdisk) and the new dbstruct is stored in the cache.
;;
(define (dbmod:nfs-get-dbstruct run-id keys init-proc areapath)
  (assert areapath "FATAL: dbmod:nfs-get-dbstruct called without areapath set.")
  (let ((fname (dbmod:run-id->dbfname run-id)))
    (or (hash-table-ref/default *dbmod:nfs-db-handles* fname #f)
        (let ((opened (dbmod:open-dbmoddb areapath run-id fname init-proc keys syncdir: 'fromdisk)))
          (hash-table-set! *dbmod:nfs-db-handles* fname opened)
          opened))))
;;======================================================================
;; The inmem one-db file per server method goes in here
;;======================================================================
;; Run proc against the db handle held by dbstruct and return its result.
;;
;;   w/r    - #t = db modified by query, #f = db NOT modified by query
;;            (NOTE: this flag used to be r/w, it is now w/r)
;;   proc   - called as (proc dbdat dbh . params)
;;   params - list of extra arguments handed to proc
;;
;; Before proc runs, the dbstruct's sync-proc is called if more than three
;; seconds have passed since the recorded last-update time.
;;
(define (dbmod:with-db dbstruct run-id w/r proc params)
  (let* ((use-mutex (or (and w/r ;; use the mutex on queries that modify the db and for sync to disk
                             (> *api-process-request-count* 5)) ;; when writes are happening throttle more
                        (> *api-process-request-count* 50)))
         (dbdat     (dbmod:open-db dbstruct run-id (dbfile:db-init-proc)))
         (dbh       (dbr:dbdat-dbh dbdat))) ;; this will be the inmem handle
    ;; trigger a sync if the last one was more than 3 seconds ago
    (let* ((last-update (dbr:dbstruct-last-update dbstruct))
           (sync-proc   (dbr:dbstruct-sync-proc dbstruct))
           (curr-secs   (current-seconds)))
      (if (> (- curr-secs last-update) 3)
          (begin
            (sync-proc last-update)
            ;; MOVE THIS CALL TO INSIDE THE sync-proc CALL
            (dbr:dbstruct-last-update-set! dbstruct curr-secs))))
    (assert (sqlite3:database? dbh) "FATAL: bad db handle in dbmod:with-db")
    (if use-mutex
        ;; dynamic-wind guarantees the mutex is released even when proc raises
        ;; an exception that is handled further up; previously the mutex
        ;; stayed locked in that case and every later caller blocked forever.
        (dynamic-wind
            (lambda () (mutex-lock! *db-with-db-mutex*))
            (lambda () (apply proc dbdat dbh params))
            (lambda () (mutex-unlock! *db-with-db-mutex*)))
        (apply proc dbdat dbh params))))
;; Variadic front end for dbmod:with-db: the arguments following proc are
;; collected into a list and passed on as the params argument.
;;
(define (db:with-db dbstruct run-id w/r proc . proc-args)
  (dbmod:with-db dbstruct run-id w/r proc proc-args))
;; Open the cache db and return its handle.
;;
;; With dbfullname the db is opened from that file via dbmod:safely-open-db
;; (write access requested); without it a ":memory:" db is opened.
;; Either way a busy timeout of 3600 is installed and init-proc is applied
;; to the handle before it is returned.
;;
(define (dbmod:open-inmem-db init-proc #!optional (dbfullname #f))
  (let ((cache-db (if dbfullname
                      (dbmod:safely-open-db dbfullname init-proc #t)
                      (sqlite3:open-database ":memory:"))))
    (sqlite3:set-busy-handler! cache-db (sqlite3:make-busy-timeout 3600))
    (init-proc cache-db)
    cache-db))
;; Return the dbdat record of dbstruct, building and storing it on first use
;; from the dbstruct's dbfile and inmem handle.
;; run-id and dbinit are accepted but not used here.
;;
(define (dbmod:open-db dbstruct run-id dbinit)
  (let ((cached (dbr:dbstruct-dbdat dbstruct)))
    (if cached
        cached
        (let ((fresh (make-dbr:dbdat
                      dbfile: (dbr:dbstruct-dbfile dbstruct)
                      dbh:    (dbr:dbstruct-inmem dbstruct))))
          (dbr:dbstruct-dbdat-set! dbstruct fresh)
          fresh))))
;; Decide from (dbfile:cache-method) and (dbfile:sync-method) whether a handle
;; to the on-disk db is needed. Returns #t or #f.
;;
;; Unknown settings are logged and answered with #f. The unknown sync-method
;; branch previously returned whatever debug:print returned instead of #f.
;;
(define (dbmod:need-on-disk-db-handle)
  (case (dbfile:cache-method)
    ((none tmp) #t)
    ((inmem)
     (case (dbfile:sync-method)
       ((original) #t)
       ((attach)   #t) ;; we need it to force creation of the on-disk file - FIXME
       (else
        (debug:print 0 *default-log-port* "Unknown dbfile:sync-method setting: "
                     (dbfile:sync-method))
        #f)))
    (else
     (debug:print 0 *default-log-port* "Unknown dbfile:cache-method setting: "
                  (dbfile:cache-method))
     #f)))
;; Open the sqlite3 db file dbfullname while holding the simple file lock
;; "<dbfullname>.lock" and return the handle.
;; A busy timeout of 136000 is installed; init-proc is applied to the handle
;; only when write-access is true.
;;
(define (dbmod:safely-open-db dbfullname init-proc write-access)
  (let ((lockfile (conc dbfullname ".lock")))
    (dbfile:with-simple-file-lock
     lockfile
     (lambda ()
       (let ((dbh (sqlite3:open-database dbfullname)))
         (sqlite3:set-busy-handler! dbh (sqlite3:make-busy-timeout 136000))
         (when write-access
           (init-proc dbh))
         dbh)))))
;; Apply proc to params until it returns a true value, sleeping one second
;; between attempts. After the first attempt at most `tries` retries are made.
;; Returns the true value from proc, or #f (after printing a message) when
;; all attempts failed.
;;
(define (db:keep-trying-until-true proc params tries)
  (let loop ((tries tries))
    (or (apply proc params)
        (cond
         ((> tries 0)
          (thread-sleep! 1)
          (loop (- tries 1)))
         (else
          ;; (debug:print-info 0 *default-log-port* "proc never returned true, params="params)
          (print"db:keep-trying-until-true proc never returned true, proc = " proc " params =" params " tries = " tries)
          #f)))))
;; Flag set to #t by the sync thread created in dbmod:open-dbmoddb while it
;; is running, and reset to #f when that thread is done.
(define *sync-in-progress* #f)
;; Open the cache ("inmem") db and the on-disk db and populate the cache db
;; with the data from disk.
;;
;; Updates fields in dbstruct
;; Returns dbstruct
;;
;;  * This routine creates the db if not found
;;  * Probably can get rid of the dbstruct-in
;;  * The cache db is a ":memory:" db when (dbcache-mode) is 'inmem, otherwise
;;    a file under /tmp/<user>/ named after areapath, pid and dbfname.
;;  * syncdir is accepted for compatibility; it is not used in this routine.
;;
;; Changes relative to the earlier version:
;;  * a failed open now exits with status 1 (a bare (exit) reports success)
;;    and the message names the full db path (dbfname-in may be #f)
;;  * *sync-in-progress* is set before the sync thread is started, so further
;;    sync requests are rejected while the thread is still waiting for
;;    *db-with-db-mutex* instead of piling up additional threads
;;
(define (dbmod:open-dbmoddb areapath run-id dbfname-in init-proc keys
                            #!key (dbstruct-in #f)
                            (syncdir 'todisk))
  (let* ((dbstruct     (or dbstruct-in (make-dbr:dbstruct areapath: areapath)))
         (dbfname      (or dbfname-in (dbmod:run-id->dbfname run-id)))
         (dbpath       (dbmod:get-dbdir dbstruct)) ;; directory where all the .db files are kept
         (dbfullname   (conc dbpath "/" dbfname))
         (tmpdir       (conc "/tmp/" (current-user-name)))
         (tmpdb        (let* ((fname (conc tmpdir "/" (string-translate areapath "/" ".") "-" (current-process-id) "-" dbfname)))
                         (if (not (file-exists? tmpdir)) (create-directory tmpdir))
                         ;; check if tmpdb already exists, either delete it or
                         ;; add something to the name
                         fname))
         (inmem        (dbmod:open-inmem-db init-proc
                                            (if (eq? (dbcache-mode) 'inmem)
                                                #f
                                                tmpdb)))
         (write-access (file-write-access? dbpath))
         (db           (dbmod:safely-open-db dbfullname init-proc write-access))
         (tables       (db:sync-all-tables-list keys)))
    (if (not (and (sqlite3:database? inmem)
                  (sqlite3:database? db)))
        (begin
          (debug:print 0 *default-log-port* "ERROR: Failed to properly open " dbfullname ", exiting immediately.")
          (exit 1)))
    (dbr:dbstruct-inmem-set!    dbstruct inmem)
    (dbr:dbstruct-ondiskdb-set! dbstruct db)
    (dbr:dbstruct-dbfile-set!   dbstruct dbfullname)
    (dbr:dbstruct-dbfname-set!  dbstruct dbfname)
    (dbr:dbstruct-sync-proc-set!
     dbstruct
     (lambda (last-update)
       (if *sync-in-progress*
           (debug:print 3 *default-log-port* "WARNING: overlapping calls to sync to disk")
           (begin
             (set! *sync-in-progress* #t) ;; claim the sync before the thread exists
             (thread-start!
              (make-thread
               (lambda ()
                 (mutex-lock! *db-with-db-mutex*) ;; this mutex is used when overloaded or during a query that modifies the db
                 ;; the sync itself is delegated to a separate megatest process
                 ;; NOTE(review): the paths are not shell-quoted - confirm they can never contain spaces
                 (system (conc "megatest -db2db -from "tmpdb" -to "dbfullname))
                 (mutex-unlock! *db-with-db-mutex*)
                 (thread-sleep! 0.5) ;; ensure at least 1/2 second down time between sync calls
                 (set! *sync-in-progress* #f))))))))
    (dbmod:sync-gasket tables #f inmem db dbfullname 'fromdest keys) ;; load into inmem
    (dbr:dbstruct-last-update-set! dbstruct (current-seconds)) ;; should this be offset back in time by one second?
    dbstruct))
;; (if (eq? syncdir 'todisk) ;; sync to disk normally, sync from in dashboard
;; (dbmod:sync-tables tables last-update inmem db)
;; (dbmod:sync-tables tables last-update db inmem))))
;; Dispatch a sync between the cache db (inmem) and the destination db (dbh,
;; file dbfname) according to (dbfile:sync-method).
;;
;; direction: 'todest   - copy inmem -> dbh ('todisk is accepted as a synonym)
;;            'fromdest - copy dbh -> inmem (any other value behaves the same)
;;
;; The table-by-table path used to recognise only 'todisk, so callers passing
;; the documented 'todest had their data copied in the opposite direction.
;;
(define (dbmod:sync-gasket tables last-update inmem dbh dbfname direction keys)
  (assert (sqlite3:database? inmem) "FATAL: sync-gasket: inmem is not a db")
  (assert (sqlite3:database? dbh)   "FATAL: sync-gasket: dbh is not a db")
  (debug:print-info 0 *default-log-port* "Db sync using "(dbfile:sync-method)" method")
  (case (dbfile:sync-method)
    ((none) #f)
    ((attach)
     (dbmod:attach-sync tables inmem dbfname direction))
    ((newsync) ;; DON'T USE THIS ONE. IT IS BORKED
     (dbmod:new-sync tables inmem dbh dbfname direction))
    (else
     (case direction
       ((todisk todest)
        (dbmod:sync-tables tables last-update keys inmem dbh))
       (else
        (dbmod:sync-tables tables last-update keys dbh inmem))))))
;; Finalize the on-disk db handle stored in dbstruct.
;; No final sync to disk is performed here (still to do: do-sync).
;;
(define (dbmod:close-db dbstruct)
  (let ((ondisk (dbr:dbstruct-ondiskdb dbstruct)))
    (sqlite3:finalize! ondisk)))
;;======================================================================
;; Sync db
;;======================================================================
;; Decide whether the last-update criterion applies to a table. Returns #t or #f.
;;
;;   #t - has-last-update is true and "last_update" is a member of fields
;;   #f - otherwise, when last-update is a number
;;   #t - last-update is a pair whose car is among (map car fields), else #f
;;   #f - any other true last-update value (an error is logged)
;;   #f - last-update is #f
;;
(define (dbmod:calc-use-last-update has-last-update fields last-update)
  (cond
   ((and has-last-update
         (member "last_update" fields))
    #t)
   ((number? last-update) #f)
   ((pair? last-update)
    (if (member (car last-update) ;; last-update field name
                (map car fields))
        #t
        #f))
   (last-update ;; neither a pair nor a number
    (debug:print 0 *default-log-port* "ERROR: parameter last-update for db:sync-tables must be a pair or a number, received: " last-update)
    #f)
   (else #f)))
;; Sync every table listed in tbls from fromdb to todb with dbmod:sync-table.
;; Returns the total number of rows inserted or updated.
;;
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;; dbs are sqlite3 db handles
;;
;; Use (db:sync-all-tables-list keys) to get the tbls input
;;
;; keys is used to build the key-field list for the "runs" table.
;; last-update is accepted for interface compatibility; this implementation
;; does not use it, every row of every table is compared.
;;
(define (dbmod:sync-tables tbls last-update keys fromdb todb)
  (assert (sqlite3:database? fromdb) "FATAL: dbmod:sync-tables called with fromdb not a database" fromdb)
  (assert (sqlite3:database? todb)   "FATAL: dbmod:sync-tables called with todb not a database" todb)
  (let ((specials   `(("keys"       "fieldname")
                      ("metadat"    "var")
                      ,(cons "runs" (cons "runname" keys))
                      ("tests"      "run_id" "testname" "item_path")
                      ("test_meta"  "testname")
                      ("test_steps" "test_id" "stepname" "state")
                      ("test_data"  "test_id" "category" "variable"))) ;; table -> key fields
        (start-time (current-milliseconds))
        (tot-count  0))
    (for-each ;; table
     (lambda (tabledat)
       (let* ((count (match tabledat
                       ((tablename . fields)
                        (debug:print-info 0 *default-log-port* "Syncing table "tablename)
                        (dbmod:sync-table tablename fields fromdb todb specials))
                       (else
                        (debug:print-warn 0 *default-log-port* "Bad tabledat entry: "tabledat)
                        0))))
         (set! tot-count (+ tot-count count))))
     tbls)
    (debug:print-info 0 *default-log-port* "dbmod:sync-tables completed in "(- (current-milliseconds) start-time)"ms")
    tot-count))
;; Copy one table from from-db to to-db, row by row, matching rows on id.
;;
;;   tablename - name of the table
;;   fields    - list of ( "fieldname" ... ) entries, only the car is used
;;   specials  - alist tablename -> key field names, used to remove a
;;               conflicting row from to-db when an insert/update fails
;;
;; Rows whose id is missing in to-db are inserted; for rows present in both,
;; every field except "id" and "last_update" is compared and updated when
;; different. Returns the number of inserts plus the number of field updates.
;;
;; Changes relative to the earlier version:
;;  * ids present in to-db are kept in a hash table instead of being searched
;;    with `member` for every source row (was quadratic in the row count)
;;  * clean-up-qry no longer raises a secondary error from inside the
;;    exception handler when the table has no key fields in specials or the
;;    source row cannot be read; the retry then reports the original failure
;;  * the transaction thunks return #t explicitly instead of relying on the
;;    unspecified value of for-each (with-transaction rolls back on #f)
;;
(define (dbmod:sync-table tablename fields from-db to-db specials)
  (let* ((key-fields     (alist-ref tablename specials equal?))
         (field-names    (map car fields))
         (fields-sans-lu (filter (lambda (x)
                                   (not (member x '("id" "last_update"))))
                                 field-names))
         ;; values of the given fields for row id, as a list, or #f if no such row
         (get-vals       (lambda (db id fields)
                           (debug:print-info 0 *default-log-port* "get-vals: fields="fields", id="id)
                           (let* ((qry (conc "SELECT "(string-intersperse fields ",")" FROM "tablename" WHERE id=?;"))
                                  (res #f))
                             (sqlite3:for-each-row
                              (lambda tuple
                                (set! res tuple))
                              db qry id)
                             res)))
         ;; delete from to-db the row that has the same key values as row from-id in from-db
         (clean-up-qry   (lambda (from-id)
                           (debug:print-info 0 *default-log-port* "key-fields="key-fields", from-id="from-id)
                           (if (pair? key-fields)
                               (let* ((vals (get-vals from-db from-id key-fields))
                                      (qry  (conc "DELETE FROM "tablename" WHERE "(string-intersperse key-fields "=? AND ")"=?;")))
                                 (debug:print-info 0 *default-log-port* "qry: "qry", vals="vals)
                                 (if vals
                                     (apply sqlite3:execute to-db qry vals))))))
         (get-ids        (lambda (db)
                           (sqlite3:fold-row (lambda (res id)
                                               (cons id res))
                                             '()
                                             db
                                             (conc "SELECT id FROM "tablename";"))))
         (get-val        (lambda (db fieldname id)
                           (let* ((res #f)
                                  (sql (conc "SELECT "fieldname" FROM "tablename" WHERE id=?;")))
                             (sqlite3:for-each-row
                              (lambda (val)
                                (set! res val))
                              db
                              sql
                              id)
                             res)))
         (get-row        (lambda (db id)
                           (let* ((res #f))
                             (sqlite3:for-each-row
                              (lambda tuple
                                (set! res tuple))
                              db
                              (conc "SELECT " (string-intersperse fields-sans-lu ",")
                                    " FROM "tablename" WHERE id=?;")
                              id)
                             res)))
         (ins-row        (lambda (db id row)
                           (let* ((qry  (conc "INSERT INTO "tablename" (id,"
                                              (string-intersperse fields-sans-lu ",")
                                              ") VALUES ("id","
                                              (string-intersperse
                                               (make-list (length fields-sans-lu) "?")
                                               ",")
                                              ");"))
                                  (proc (lambda ()
                                          (apply sqlite3:execute db qry row))))
                             (handle-exceptions ;; on exception do the cleanup qry then try one more time
                                 exn
                                 (begin
                                   (clean-up-qry id)
                                   (proc))
                               (proc)))))
         (num-inserts    0)
         (num-updates    0))
    (sqlite3:with-transaction
     from-db
     (lambda ()
       (let* ((from-ids (get-ids from-db)))
         (sqlite3:with-transaction
          to-db
          (lambda ()
            (let ((to-id-set (make-hash-table))) ;; id -> #t for every row already in to-db
              (for-each (lambda (id)
                          (hash-table-set! to-id-set id #t))
                        (get-ids to-db))
              (for-each ;; from-id
               (lambda (from-id)
                 (if (hash-table-ref/default to-id-set from-id #f)
                     (for-each ;; case where record exists, do one by one the fields if different
                      (lambda (fieldname)
                        (let* ((from-val (get-val from-db fieldname from-id))
                               (dest-val (get-val to-db   fieldname from-id)))
                          (if (not (equal? from-val dest-val))
                              (let* ((qry-proc (lambda ()
                                                 (sqlite3:execute to-db (conc "UPDATE "tablename" SET "fieldname"=? WHERE id=?;")
                                                                  from-val from-id))))
                                (handle-exceptions ;; try to remove the offending record and re-try once the update
                                    exn
                                    (begin
                                      (clean-up-qry from-id)
                                      (qry-proc))
                                  (qry-proc))
                                (set! num-updates (+ num-updates 1))))))
                      fields-sans-lu)
                     (let ((row (get-row from-db from-id))) ;; need to insert the row
                       (set! num-inserts (+ num-inserts 1))
                       (ins-row to-db from-id row))))
               from-ids)
              #t)))))) ;; a true result commits both transactions
    (+ num-inserts num-updates)))
;; (for-each ;; table
;; (lambda (tabledat)
;; (let* ((tablename (car tabledat))
;; (fields (cdr tabledat))
;; (has-last-update (member "last_update" fields))
;; (use-last-update (dbmod:calc-use-last-update has-last-update fields last-update))
;; (last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for
;; (if (number? last-update)
;; last-update
;; (cdr last-update))
;; #f))
;; (last-update-field (if use-last-update
;; (if (number? last-update)
;; "last_update"
;; (car last-update))
;; #f))
;; (num-fields (length fields))
;; (field->num (make-hash-table))
;; (num->field (apply vector (map car fields))) ;; BBHERE
;; (full-sel (conc "SELECT " (string-intersperse (map car fields) ",")
;; " FROM " tablename (if use-last-update ;; apply last-update criteria
;; (conc " WHERE " last-update-field " >= " last-update-value)
;; "")
;; ";"))
;; (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) "
;; " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );"))
;; (fromdat '())
;; (fromdats '())
;; (totrecords 0)
;; (batch-len 10000000) ;; (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100")))
;; (todat (make-hash-table))
;; (count 0)
;; (field-names (map car fields)))
;;
;; (debug:print-info 0 *default-log-port* "Syncing table "tablename)
;;
;; ;; set up the field->num table
;; (for-each
;; (lambda (field)
;; (hash-table-set! field->num field count)
;; (set! count (+ count 1)))
;; fields)
;;
;; ;; read the source table
;; ;; store a list of all rows in the table in fromdat, up to batch-len.
;; ;; Then add fromdat to the fromdats list, clear fromdat and repeat.
;; (sqlite3:for-each-row
;; (lambda (a . b)
;; (set! fromdat (cons (apply vector a b) fromdat))
;; (if (> (length fromdat) batch-len)
;; (begin
;; (set! fromdats (cons fromdat fromdats))
;; (set! fromdat '())
;; (set! totrecords (+ totrecords 1)))))
;; fromdb
;; full-sel)
;;
;; (debug:print-info 0 *default-log-port* "Have "totrecords" records to update.")
;; ;; Count less than batch-len as a record
;; (if (> (length fromdat) 0)
;; (set! totrecords (+ totrecords 1)))
;;
;; ;; tack on remaining records in fromdat
;; (if (not (null? fromdat))
;; (set! fromdats (cons fromdat fromdats)))
;;
;; (sqlite3:for-each-row
;; (lambda (a . b)
;; (hash-table-set! todat a (apply vector a b)))
;; todb
;; full-sel)
;;
;; ;; first pass implementation, just insert all changed rows
;;
;; (let* ((db todb)
;; (has-last-update (member "last_update" field-names))
;; (drp-trigger (if has-last-update
;; (db:drop-trigger db tablename)
;; #f))
;; (is-trigger-dropped (if has-last-update
;; (db:is-trigger-dropped db tablename)
;; #f))
;; (stmth (sqlite3:prepare db full-ins))
;; (changed-rows 0))
;; (for-each
;; (lambda (fromdat-lst)
;; (mutex-lock! *db-transaction-mutex*)
;; (sqlite3:with-transaction
;; db
;; (lambda ()
;; (for-each ;;
;; (lambda (fromrow)
;; (let* ((a (vector-ref fromrow 0))
;; (curr (hash-table-ref/default todat a #f))
;; (same #t))
;; (let loop ((i 0))
;; (if (or (not curr)
;; (not (equal? (vector-ref fromrow i)(vector-ref curr i))))
;; (set! same #f))
;; (if (and same
;; (< i (- num-fields 1)))
;; (loop (+ i 1))))
;; (if (not same)
;; (begin
;; (apply sqlite3:execute stmth (vector->list fromrow))
;; (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))
;; (set! changed-rows (+ changed-rows 1))))))
;; fromdat-lst)))
;; (mutex-unlock! *db-transaction-mutex*))
;; fromdats)
;;
;; (sqlite3:finalize! stmth)
;; (if (member "last_update" field-names)
;; (db:create-trigger db tablename)))
;; ))
;; tbls)
;; (let* ((runtime (- (current-milliseconds) start-time))
;; (should-print (or ;; (debug:debug-mode 12)
;; (common:low-noise-print 120 "db sync")
;; (> runtime 500)))) ;; low and high sync times treated as separate.
;; (for-each
;; (lambda (dat)
;; (let ((tblname (car dat))
;; (count (cdr dat)))
;; (set! tot-count (+ tot-count count))))
;; (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b))))))
;; Return #t when pragma_table_info reports a column named "last_update"
;; for tablename in dbh, #f otherwise.
;;
(define (has-last-update dbh tablename)
  (let ((qry (conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;")))
    (sqlite3:fold-row
     (lambda (found name)
       (or found (equal? name "last_update")))
     #f
     dbh
     qry)))
;; Sync tables between dbh and the db file destdbfile by ATTACHing the file
;; to dbh as "auxdb" and running INSERT/UPDATE statements across the two.
;;
;; tables is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;;
;; direction = 'todest copies dbh -> destdbfile, anything else (e.g. 'fromdest)
;;             copies destdbfile -> dbh
;; mode and no-update are accepted but not used by this implementation.
;;
;; Returns -1 after a sync; when dbh has a transaction in flight the sync is
;; skipped and only a message is logged.
;;
;; Changes relative to the earlier version:
;;  * *db-transaction-mutex* is released and auxdb is detached even when a
;;    statement raises; before, a failure left the mutex locked and auxdb
;;    attached, which makes the next ATTACH fail as well
;;  * unused locals removed
;;
(define (dbmod:attach-sync tables dbh destdbfile direction #!key
                           (mode 'full)
                           (no-update '("keys")))
  (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
  (if (not (sqlite3:auto-committing? dbh))
      (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
      (let* ((table-names (map car tables))
             (start-time  (current-milliseconds)))
        (assert (file-exists? destdbfile) "FATAL: sync called with non-existant file, "destdbfile)
        (debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb")
        (sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))
        (dynamic-wind
            (lambda () #f)
            (lambda ()
              (for-each
               (lambda (table)
                 (let* ((tbldat           (alist-ref table tables equal?))
                        (fields           (map car tbldat))
                        (no-id-fields     (filter (lambda (x) (not (equal? x "id"))) fields))
                        (no-id-fields-str (string-intersperse no-id-fields ","))
                        (dir              (eq? direction 'todest))
                        (fromdb           (if dir "" "auxdb."))
                        (todb             (if dir "auxdb." ""))
                        ;; rows missing in the target
                        (stmt1            (conc "INSERT OR IGNORE INTO "todb table
                                                " SELECT * FROM "fromdb table";"))
                        ;; rows that are newer in the source
                        ;; NOTE(review): the UPDATE has no WHERE clause of its own; confirm what
                        ;; happens to target rows for which the sub-select returns no row
                        (stmt8            (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id"
                                                (if (member "last_update" fields)
                                                    (conc " AND "fromdb table".last_update > "todb table".last_update);")
                                                    ");")))
                        (start-ms         (current-milliseconds)))
                   (dynamic-wind
                       (lambda () (mutex-lock! *db-transaction-mutex*))
                       (lambda ()
                         (sqlite3:with-transaction
                          dbh
                          (lambda ()
                            (debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using INSERT OR UPDATE")
                            (sqlite3:execute dbh stmt1) ;; get all new rows
                            (if (member "last_update" fields)
                                (sqlite3:execute dbh stmt8)) ;; get all updated rows
                            #t)) ;; a true result commits the transaction
                         (debug:print 0 *default-log-port* "Synced table "table
                                      " in "(- (current-milliseconds) start-ms)"ms"))
                       (lambda () (mutex-unlock! *db-transaction-mutex*)))))
               table-names))
            (lambda ()
              (sqlite3:execute dbh "DETACH auxdb;")))
        (debug:print-info 0 *default-log-port* "Total sync time: "(- (current-milliseconds) start-time)"ms")
        -1)))
;; FAILED ATTEMPTS
;; (if (not (has-last-update dbh table))
;; (sqlite3:execute dbh (conc "ALTER TABLE "table" ADD COLUMN last_update INTEGER;")))
;; (if (not (has-last-update dbh (conc "auxdb."table)))
;; (sqlite3:execute dbh (conc "ALTER TABLE auxdb."table" ADD COLUMN last_update INTEGER;")))
;; (stmt2 (conc "INSERT OR REPLACE INTO "todb table
;; " SELECT * FROM "fromdb table" WHERE "
;; fromdb table".last_update > "
;; todb table".last_update;"))
;; (stmt3 (conc "INSERT OR REPLACE INTO "todb"."table
;; " SELECT * FROM "fromdb table";"))
;; (stmt4 (conc "DELETE FROM "todb table" WHERE "fromdb
;; table ".last_update > "todb table".last_update;"))
;; (stmt5 (conc "DELETE FROM "todb table";"))
;; (stmt6 (conc "INSERT OR REPLACE INTO "todb table" ("fields-str") SELECT "fields-str" FROM "fromdb table";"))
;; (stmt7 (conc "UPDATE "todb table" SET "set-str (if (member "last_update" fields)
;; (conc " WHERE "fromdb table".last_update > "todb table".last_update;")
;; ";")))
;; prefix is "" or "auxdb."
;;
;; (define (dbmod:last-update-patch dbh prefix)
;; (let ((
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;;
;; direction = fromdest, todest
;; mode = 'full, 'incr
;;
;; Idea: youngest in dest is last_update time
;;
;; Unfinished two-handle sync; the caller in dbmod:sync-gasket marks it as
;; not to be used. As written it only runs "SELECT MAX(last_update)" on dbh1
;; for every table and copies no data.
;;
;; NOTE(review): open problems visible in this body:
;;  * the final "DETACH auxdb;" runs although nothing is ATTACHed in this
;;    procedure - presumably this raises an sqlite error; confirm
;;  * stmt2 contains the text no-id-fields-str literally instead of the
;;    value of that variable; stmt2 and stmt3 are built but never executed
;;  * set-str concatenates the db handles themselves into a string
;;  * mode, fields-str, fromdb and todb are otherwise unused
;;
(define (dbmod:new-sync tables dbh1 dbh2 destdbfile direction #!key
(mode 'full))
(debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
;; refuse to start while dbh1 is inside a transaction
(if (not (sqlite3:auto-committing? dbh1))
(debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
(let* ((table-names (map car tables))
(dest-exists (file-exists? destdbfile)))
(assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
(for-each
(lambda (table)
(let* ((tbldat (alist-ref table tables equal?))
(fields (map car tbldat))
(no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
(questionmarks (string-intersperse (make-list (length no-id-fields) "?") ","))
(fields-str (string-intersperse fields ","))
(no-id-fields-str (string-intersperse no-id-fields ","))
(dir (eq? direction 'todest))
(fromdb (if dir dbh1 dbh2))
(todb (if dir dbh2 dbh1))
(set-str (string-intersperse
(map (lambda (field)
(conc fromdb field"="todb field))
fields)
","))
;; (stmt1 (conc "INSERT OR IGNORE INTO "todb table
;; " SELECT * FROM "fromdb table";"))
;; (stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table " WHERE "todb table".id="fromdb table".id"
;; (if (member "last_update" fields)
;; (conc " AND "fromdb table".last_update > "todb table".last_update);")
;; ");")))
(stmt1 (conc "SELECT MAX(last_update) FROM "table";")) ;; use the highest last_update as your time reference
(stmt2 (conc "SELECT no-id-fields-str FROM "table" WHERE last_update>?;"))
(stmt3 (conc "UPDATE "table" SET ("no-id-fields-str") = ("questionmarks") WHERE id=?;"))
(start-ms (current-milliseconds)))
(debug:print 0 *default-log-port* "stmt3="stmt3)
;; re-check per table that no transaction has been opened on dbh1 meanwhile
(if (sqlite3:auto-committing? dbh1)
(begin
(sqlite3:with-transaction
dbh1
(lambda ()
(sqlite3:execute dbh1 stmt1) ;; get all new rows
#;(if (member "last_update" fields)
(sqlite3:execute dbh1 stmt8)) ;; get all updated rows
;; (sqlite3:execute dbh stmt5)
;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up
;; (sqlite3:execute dbh stmt6)
))
(debug:print 0 *default-log-port* "Synced table "table
" in "(- (current-milliseconds) start-ms)"ms"))
(debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight."))))
table-names)
(sqlite3:execute dbh1 "DETACH auxdb;"))))
;;======================================================================
;; Moved from dbfile
;;======================================================================
;; Wait up to approximately n seconds for the sqlite journal file
;; "<path>-journal" to go away.
;;
;;   remove      - when true, the journal is removed with "rm -rf" once the
;;                 wait has timed out
;;   waiting-msg - when given, logged while waiting, each time the remaining
;;                 count is a multiple of 30
;;
;; Returns #t when the journal is gone (or when an exception occurred, which
;; is logged and otherwise ignored), #f when the wait timed out.
;; A path that is not a string is only logged as an error.
;;
;; The message throttle used to test the constant n instead of the remaining
;; count, so the message was logged either on every iteration or never.
;;
(define (tasks:wait-on-journal path n #!key (remove #f)(waiting-msg #f))
  (if (not (string? path))
      (debug:print-error 0 *default-log-port* "Called tasks:wait-on-journal with path=" path " (not a string)")
      (let ((fullpath (conc path "-journal")))
        (handle-exceptions
            exn
            (begin
              (print-call-chain (current-error-port))
              (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
              (debug:print 5 *default-log-port* " exn=" (condition->list exn))
              (debug:print 0 *default-log-port* "tasks:wait-on-journal failed. Continuing on, you can ignore this call-chain")
              #t) ;; if stuff goes wrong just allow it to move on
          (let loop ((journal-exists (file-exists? fullpath))
                     (count          n)) ;; seconds left to wait
            (if journal-exists
                (begin
                  (if (and waiting-msg
                           (eq? (modulo count 30) 0))
                      (debug:print 0 *default-log-port* waiting-msg))
                  (if (> count 0)
                      (begin
                        (thread-sleep! 1)
                        (loop (file-exists? fullpath)
                              (- count 1)))
                      (begin
                        (debug:print 0 *default-log-port* "ERROR: removing the journal file " fullpath ", this is not good. Look for disk full, write access and other issues.")
                        (if remove (system (conc "rm -rf " fullpath)))
                        #f)))
                #t))))))
;;======================================================================
;; M E T A G E T A N D S E T V A R S
;;======================================================================
;; Look up var in the metadat table.
;; Returns the value as a number when it is a string that string->number can
;; convert, otherwise the stored value as is; #f when var is not found.
;;
(define (db:get-var dbstruct var)
  (let ((raw #f))
    (db:with-db
     dbstruct #f #f ;; for the moment vars are only stored in main.db
     (lambda (dbdat db)
       (sqlite3:for-each-row
        (lambda (val)
          (set! raw val))
        db
        "SELECT val FROM metadat WHERE var=?;" var)
       ;; convert to number if can
       (or (and (string? raw)
                (string->number raw))
           raw)))))
;; Add one to the value stored for var in the metadat table.
;;
(define (db:inc-var dbstruct var)
  (db:with-db
   dbstruct #f #t
   (lambda (dbdat dbh)
     (sqlite3:execute dbh "UPDATE metadat SET val=val+1 WHERE var=?;" var))))
;; Subtract one from the value stored for var in the metadat table.
;;
(define (db:dec-var dbstruct var)
  (db:with-db
   dbstruct #f #t
   (lambda (dbdat dbh)
     (sqlite3:execute dbh "UPDATE metadat SET val=val-1 WHERE var=?;" var))))
;; This was part of db:get-var. It was used to estimate the load on
;; the database files.
;;
;; scale by 10, average with current value.
;; (set! *global-delta* (/ (+ *global-delta* (* (- (current-milliseconds) start-ms)
;; (if throttle throttle 0.01)))
;; 2))
;; (if (> (abs (- *last-global-delta-printed* *global-delta*)) 0.08) ;; don't print all the time, only if it changes a bit
;; (begin
;; (debug:print-info 4 *default-log-port* "launch throttle factor=" *global-delta*)
;; (set! *last-global-delta-printed* *global-delta*)))
;; Store val under var in the metadat table, replacing any existing entry.
;;
(define (db:set-var dbstruct var val)
  (db:with-db
   dbstruct #f #t
   (lambda (dbdat db)
     (let ((stmth (db:get-cache-stmth dbdat db "INSERT OR REPLACE INTO metadat (var,val) VALUES (?,?);")))
       (sqlite3:execute stmth var val)))))
;; Add val to the value stored for var in the metadat table.
;;
(define (db:add-var dbstruct var val)
  (db:with-db
   dbstruct #f #t
   (lambda (dbdat db)
     (let ((stmth (db:get-cache-stmth dbdat db "UPDATE metadat SET val=val+? WHERE var=?;")))
       (sqlite3:execute stmth val var)))))
;; Remove the entry for var from the metadat table.
;;
(define (db:del-var dbstruct var)
  (db:with-db
   dbstruct #f #t
   (lambda (dbdat db)
     (let ((stmth (db:get-cache-stmth dbdat db "DELETE FROM metadat WHERE var=?;")))
       (sqlite3:execute stmth var)))))
;; Find tests of run-id that look dead and return them as
;;   (list incompleted oldlaunched toplevels)
;; where every entry is (test-id run-dir uname testname item-path run-id).
;;
;;   RUNNING          older than run_duration + running-deadtime
;;   REMOTEHOSTSTART  older than run_duration + remotehoststart-deadtime
;;   LAUNCHED         older than 86400 seconds (one day)
;;
;; A test with uname "n/a" and an empty item-path is treated as a toplevel
;; test and goes into toplevels; other RUNNING/REMOTEHOSTSTART tests go into
;; incompleted and other LAUNCHED tests into oldlaunched.
;;
;; The REMOTEHOSTSTART log messages used to be copies of the RUNNING ones
;; (wrong state name and wrong deadtime value); they now report the state
;; and limit that actually matched.
;;
(define (db:get-toplevels-and-incompletes dbstruct run-id running-deadtime remotehoststart-deadtime)
  (let* ((toplevels   '())
         (oldlaunched '())
         (incompleted '())
         (toplevel?   (lambda (uname item-path)
                        (and (equal? uname "n/a")
                             (equal? item-path "")))))
    (db:with-db
     dbstruct run-id #t ;; not a write but problemtic
     (lambda (dbdat db)
       (let* ((stmth1 (db:get-cache-stmth
                       dbdat db
                       "SELECT id,rundir,uname,testname,item_path,event_time,run_duration FROM tests
WHERE run_id=? AND (strftime('%s','now') - event_time) > (run_duration + ?)
AND state IN ('RUNNING');"))
              (stmth2 (db:get-cache-stmth
                       dbdat db
                       "SELECT id,rundir,uname,testname,item_path,event_time,run_duration FROM tests
WHERE run_id=? AND (strftime('%s','now') - event_time) > (run_duration + ?)
AND state IN ('REMOTEHOSTSTART');"))
              (stmth3 (db:get-cache-stmth
                       dbdat db
                       "SELECT id,rundir,uname,testname,item_path FROM tests
WHERE run_id=? AND (strftime('%s','now') - event_time) > 86400
AND state IN ('LAUNCHED');")))
         ;; in RUNNING or REMOTEHOSTSTART for more than 10 minutes
         ;;
         ;; HOWEVER: this code in run:test seems to work fine
         ;;              (> (- (current-seconds)(+ (db:test-get-event_time testdat)
         ;;                                        (db:test-get-run_duration testdat)))
         ;;                    600)
         (sqlite3:for-each-row
          (lambda (test-id run-dir uname testname item-path event-time run-duration)
            (let ((rec (list test-id run-dir uname testname item-path run-id)))
              (if (toplevel? uname item-path)
                  ;; what to do with toplevel? call rollup?
                  (begin
                    (set! toplevels (cons rec toplevels))
                    (debug:print-info 0 *default-log-port* "Found old toplevel test in RUNNING state, test-id=" test-id))
                  (begin
                    (set! incompleted (cons rec incompleted))
                    (debug:print-info 0 *default-log-port* "Found old test in RUNNING state, test-id="
                                      test-id" exceeded running-deadtime "running-deadtime" now="(current-seconds)
                                      " event-time="event-time" run-duration="run-duration)))))
          stmth1
          run-id running-deadtime) ;; default time 720 seconds
         (sqlite3:for-each-row
          (lambda (test-id run-dir uname testname item-path event-time run-duration)
            (let ((rec (list test-id run-dir uname testname item-path run-id)))
              (if (toplevel? uname item-path)
                  ;; what to do with toplevel? call rollup?
                  (begin
                    (set! toplevels (cons rec toplevels))
                    (debug:print-info 0 *default-log-port* "Found old toplevel test in REMOTEHOSTSTART state, test-id=" test-id))
                  (begin
                    (debug:print-info 0 *default-log-port* "Found old test in REMOTEHOSTSTART state, test-id=" test-id
                                      " exceeded remotehoststart-deadtime "remotehoststart-deadtime" now="(current-seconds)" event-time="event-time
                                      " run-duration="run-duration)
                    (set! incompleted (cons rec incompleted))))))
          stmth2
          run-id remotehoststart-deadtime) ;; default time 230 seconds
         ;; in LAUNCHED for more than one day. Could be long due to job queues TODO/BUG: Need override for this in config
         (sqlite3:for-each-row
          (lambda (test-id run-dir uname testname item-path)
            (let ((rec (list test-id run-dir uname testname item-path run-id)))
              (if (toplevel? uname item-path)
                  ;; what to do with toplevel? call rollup?
                  (set! toplevels (cons rec toplevels))
                  (begin
                    (debug:print-info 0 *default-log-port* "Found old test in LAUNCHED state, test-id=" test-id
                                      " 1 day since event_time marked")
                    (set! oldlaunched (cons rec oldlaunched))))))
          stmth3
          run-id))))
    (list incompleted oldlaunched toplevels)))
;; Set state and status of the tests of run-id whose testname matches the
;; LIKE pattern testname, optionally restricted to tests currently in
;; currstate and/or currstatus (pass #f to not restrict).
;;
;; The new state is newstate, else currstate, else "NOT_STARTED".
;; The new status is newstatus, else currstatus, else "UNKNOWN".
;;
;; Changes relative to the earlier version:
;;  * the status fallback used currstate, so a call without newstatus wrote
;;    the current *state* into the status column
;;  * currstate/currstatus are bound as query parameters instead of being
;;    spliced into the SQL text, so values containing a quote no longer
;;    break (or alter) the statement
;;
(define (db:set-state-status-by-state-status dbstruct run-id testname currstate currstatus newstate newstatus)
  ;; clear caches needed
  (let* ((qry    (conc "UPDATE tests SET state=?,status=? WHERE "
                       (if currstate  "state=? AND "  "")
                       (if currstatus "status=? AND " "")
                       " run_id=? AND testname LIKE ?;"))
         ;; parameters in the order of the ? placeholders above; conc keeps the
         ;; old text conversion of currstate/currstatus
         (params (append (list (or newstate  currstate  "NOT_STARTED")
                               (or newstatus currstatus "UNKNOWN"))
                         (if currstate  (list (conc currstate))  '())
                         (if currstatus (list (conc currstatus)) '())
                         (list run-id testname))))
    (db:with-db
     dbstruct
     run-id
     #t
     (lambda (dbdat db)
       (apply sqlite3:execute db qry params)))))
;;======================================================================
;; db to db sync
;;======================================================================
;; Sync the db file src-db into the db file dest-db.
;;
;; Returns #f without doing anything when src-db does not exist or is not
;; readable; otherwise returns the result of dbmod:sync-gasket called with
;; direction 'todest.
;;
;; init-proc is run on dest-db only when it can be written: an existing file
;; must be writable, a missing file needs an existing, writable directory.
;; The directory test used to be
;;   (if dirname (file-exists? dirname) (file-write-access? dirname))
;; which never looked at write access because dirname is always true, and an
;; existing read-only dest-db fell through to that directory test.
;;
;; NOTE(review): the two handles opened here are never finalized - confirm
;; that the only caller is a short-lived process.
;;
(define (dbmod:db-to-db-sync src-db dest-db last-update init-proc keys)
  (if (and (file-exists? src-db)
           (file-read-access? src-db))
      (let* ((d-wr   (if (file-exists? dest-db)
                         (file-write-access? dest-db)
                         (let ((dirname (or (pathname-directory dest-db)
                                            ".")))
                           (and (file-exists? dirname)
                                (file-write-access? dirname)))))
             (tables (db:sync-all-tables-list keys))
             (sdb    (dbmod:safely-open-db src-db init-proc #t))
             (ddb    (dbmod:safely-open-db dest-db init-proc d-wr)))
        (dbmod:sync-gasket tables last-update sdb ddb dest-db 'todest keys))
      #f))
)