;;======================================================================
;; Copyright 2017, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(use srfi-18)
(declare (unit dbfile))
(declare (uses debugprint))
(declare (uses commonmod))
(module dbfile
*
(import scheme
chicken
data-structures
extras
matchable
(prefix sqlite3 sqlite3:)
posix typed-records
srfi-18
srfi-1
srfi-69
stack
files
ports
commonmod
debugprint
)
;; parameters
;;
;; Module-wide tunables, each held in a parameter object so callers can
;; read them with (name) and rebind them with parameterize.
;;
(define dbfile:testsuite-name (make-parameter #f))
(define keep-age-param (make-parameter 10)) ;; qif file age, if over move to attic
;; num-run-dbs is the modulus dbfile:run-id->dbnum applies to numeric run ids
(define num-run-dbs (make-parameter 10)) ;; number of db's in .mtdb
(define dbfile:sync-method (make-parameter 'attach)) ;; 'attach or 'original
(define dbfile:cache-method (make-parameter 'cachedb)) ;; 'direct
(define dbcache-mode (make-parameter 'tmp)) ;; 'cachedb, 'tmp (changes what open cachedb routine does)
;; moved from tcp-transportmod so that it can be used in launch.scm
(define tt-server-profile-string (make-parameter ""))
;; 'original - use old condition code
;; 'suicide-mode - create mtrah/stop-the-train with info on what went wrong
;; else use no condition code (should be production mode)
;;
(define no-condition-db-with-db (make-parameter 'suicide-mode))
;;======================================================================
;; R E C O R D S
;;======================================================================
;; a single Megatest area with its multiple dbs is
;; managed in a dbstruct
;;
;; Top-level record for one area. dbfile:setup fills in areapath and
;; tmppath; every other field starts at the default shown here.
(defstruct dbr:dbstruct
(areapath #f)
(homehost #f)
(tmppath #f)
(read-only #f)
;; subdbs: hash table of dbr:subdb records keyed by the string returned
;; from dbfile:run-id->dbname (see dbfile:get-subdb / dbfile:set-subdb)
(subdbs (make-hash-table))
;;
;; for the cachedb approach (see dbmod.scm)
;; this is one db per server
(cachedb #f) ;; handle for the in memory copy
(dbfile #f) ;; path to the db file on disk
(dbfname #f) ;; short name of db file on disk (used to validate accessing correct db)
(ondiskdb #f) ;; handle for the on-disk file
(dbtmpname #f) ;; path to db file in /tmp (non-imem method)
(dbdat #f) ;; create a dbdat for the downstream calls such as db:with-db
(last-update 0)
(sync-proc #f)
)
;; NOTE: Need one dbr:subdb per main.db, 1.db ...
;;
;; Per-database record (one for main.db, one for each N.db). Built by
;; dbfile:init-subdb, which sets dbname, mtdbfile, tmpdbfile and mtdbdat.
(defstruct dbr:subdb
(dbname #f) ;; .mtdb/1.db
(mtdbfile #f) ;; mtrah/.mtdb/1.db
(mtdbdat #f) ;; only need one of these for syncing
;; (dbdats (make-hash-table)) ;; id => dbdat
(tmpdbfile #f) ;; /tmp/.../.mtdb/1.db
;; (refndbfile #f) ;; /tmp/.../.mtdb/1.db_ref
;; dbstack is popped by dbfile:get-dbdat and pushed by dbfile:add-dbdat
(dbstack (make-stack)) ;; stack for tmp dbr:dbdat,
(homehost #f) ;; not used yet
(on-homehost #f) ;; not used yet
(read-only #f)
(last-sync 0)
(last-write (current-seconds))
) ;; goal is to converge on one struct for an area but for now it is too confusing
;; need to keep dbhandles and cached statements together
;; One open database handle plus its bookkeeping, as built by
;; dbfile:open-sqlite3-db.
(defstruct dbr:dbdat
(dbfile #f) ;; path the handle was opened on
(dbh #f) ;; the sqlite3 database handle
;; stmt-cache is looked up by db handle in db:safely-close-sqlite3-db,
;; which finalizes every statement found in the inner table
(stmt-cache (make-hash-table))
(read-only #f)
(birth-sec (current-seconds))) ;; creation time of this record
;; used in simple-get-runs (thanks Brandon!)
;; Lightweight run record with a custom printer.
(define-record simple-run target id runname state status owner event_time)
;; The printer emits only four of the seven fields: target, id, runname
;; and event_time rendered as a local-time string.
(define-record-printer (simple-run x out)
(fprintf out "#,(simple-run ~S ~S ~S ~S)"
(simple-run-target x) (simple-run-id x) (simple-run-runname x) (time->string (seconds->local-time (simple-run-event_time x) ))))
;; Module-level mutable state and the mutexes that guard it.
;;
;; *dbstruct-dbs* caches the single dbstruct created by dbfile:setup
(define *dbstruct-dbs* #f)
(define *db-open-mutex* (make-mutex))
(define *db-access-mutex* (make-mutex)) ;; used in common.scm
;; *no-sync-db* caches the handle opened by dbfile:open-no-sync-db
(define *no-sync-db* #f)
;; *db-sync-in-progress* is raised by db:lock-and-delta-sync while a sync runs
(define *db-sync-in-progress* #f)
(define *db-with-db-mutex* (make-mutex))
(define *max-api-process-requests* 0)
(define *api-process-request-count* 0)
(define *db-write-access* #t)
(define *db-last-sync* 0) ;; last time the sync to megatest.db happened
(define *db-multi-sync-mutex* (make-mutex)) ;; protect access to *db-sync-in-progress*, *db-last-sync*
;; *db-last-access* is also updated by db:sync-touched after each sync
(define *db-last-access* (current-seconds))
(define *db-transaction-mutex* (make-mutex))
;; Report an exception on stderr: dump the call chain, print the
;; caller-supplied message parts, then the message, arguments and
;; location properties of the exn condition.
;;
(define (db:generic-error-printout exn . message)
  (let ((exn-prop (lambda (prop)
                    ((condition-property-accessor 'exn prop) exn))))
    (print-call-chain (current-error-port))
    (apply dbfile:print-err message)
    (dbfile:print-err
     ", error: " (exn-prop 'message)
     ", arguments: " (exn-prop 'arguments)
     ", location: " (exn-prop 'location))))
;; Map a run-id to a lookup key: the run-id itself, or the symbol main
;; when run-id is #f.
(define (dbfile:run-id->key run-id)
  (if run-id
      run-id
      'main))
;; Finalize every cached statement for db, then the db handle itself.
;;
;; Returns #t on success, #f when db is not an sqlite3 database or when
;; try-num attempts are used up. On an exception: print it, sleep three
;; seconds, interrupt the db and try again with one fewer attempt.
;;
(define (db:safely-close-sqlite3-db db stmt-cache #!key (try-num 3))
  (cond
   ((<= try-num 0) #f)
   (else
    (handle-exceptions
     exn
     (begin
       (print "Attempt to safely close sqlite3 db failed. Trying again. exn=" exn)
       (thread-sleep! 3)
       (sqlite3:interrupt! db)
       (db:safely-close-sqlite3-db db stmt-cache try-num: (- try-num 1)))
     (cond
      ((sqlite3:database? db)
       ;; stmt-cache maps db handle => table of prepared statements
       (let ((cached (and stmt-cache (hash-table-ref/default stmt-cache db #f))))
         (if cached
             (for-each sqlite3:finalize! (hash-table-values cached)))
         (sqlite3:finalize! db)
         #t))
      (else
       (dbfile:print-err "db:safely-close-sqlite3-db: " db " is not an sqlite3 db")
       #f))))))
;; close all opened run-id dbs
;; Close every handle held by dbstruct: for each subdb, all handles on
;; its stack of tmp dbdats and then its on-disk (mtdbdat) handle.
;; No sync is attempted here. Returns #t, or #f when dbstruct is not a
;; dbr:dbstruct.
;;
(define (db:close-all dbstruct)
  (cond
   ((dbr:dbstruct? dbstruct)
    (for-each
     (lambda (subdb)
       (let* ((tmp-dbdats (stack->list (dbr:subdb-dbstack subdb)))
              (mt-dbdat (dbr:subdb-mtdbdat subdb))
              (mt-dbh (dbr:dbdat-dbh mt-dbdat)))
         (for-each
          (lambda (dbdat)
            (let* ((cache (dbr:dbdat-stmt-cache dbdat))
                   (handle (dbr:dbdat-dbh dbdat)))
              (db:safely-close-sqlite3-db handle cache)))
          tmp-dbdats)
         (db:safely-close-sqlite3-db mt-dbh (dbr:dbdat-stmt-cache mt-dbdat))))
     (hash-table-values (dbr:dbstruct-subdbs dbstruct)))
    #t)
   (else #f)))
;; Build /tmp/<user>/<areapath with "/" turned into ".">tmpadj,
;; create the directory (with parents) if it is missing, and return
;; the name.
(define (dbfile:make-tmpdir-name areapath tmpadj)
  (let ((tmpdir (conc "/tmp/"
                      (current-user-name)
                      "/"
                      (string-translate areapath "/" ".")
                      tmpadj)))
    (if (not (directory-exists? tmpdir))
        (create-directory tmpdir #t))
    tmpdir))
;; Path of the db file for run-id underneath apath.
(define (dbfile:run-id->path apath run-id)
  (let ((relname (dbfile:run-id->dbname run-id)))
    (conc apath "/" relname)))
;; Join apath and dbname with a single "/".
(define (db:dbname->path apath dbname)
  (string-intersperse (map ->string (list apath dbname)) "/"))
;; Numeric run-ids map to run-id modulo (num-run-dbs); #f maps to the
;; string "main"; anything else trips the assert.
(define (dbfile:run-id->dbnum run-id)
  (if (number? run-id)
      (modulo run-id (num-run-dbs))
      (if run-id
          (assert #f "FATAL: run-id is required to be a number or #f")
          "main"))) ;; 0 or main? No, not 0.
;; just the filename
;; Bare file name for run-id: <dbnum>.db
(define (dbfile:run-id->dbfname run-id)
  (let ((dbnum (dbfile:run-id->dbnum run-id)))
    (conc dbnum ".db")))
;; the path in MTRAH with the filename
;; File name for run-id prefixed with the .mtdb/ directory.
(define (dbfile:run-id->dbname run-id)
  (let ((fname (dbfile:run-id->dbfname run-id)))
    (conc ".mtdb/" fname)))
;; Make the dbstruct, set up auxiliary db's and call for main db at least once
;;
;; called in http-transport and replicated in rmt.scm for *local* access.
;;
;; Return the process-wide dbstruct, creating and caching it in
;; *dbstruct-dbs* on first use. A repeat call prints a warning and
;; returns the cached struct. do-sync is accepted but not used here.
(define (dbfile:setup do-sync areapath tmppath)
  (if *dbstruct-dbs*
      (begin
        (dbfile:print-err "WARNING: dbfile:setup called when *dbstruct-dbs* is already initialized")
        *dbstruct-dbs*) ;; TODO: when multiple areas are supported, this optimization will be a hazard
      (begin
        (set! *dbstruct-dbs* (make-dbr:dbstruct areapath: areapath tmppath: tmppath))
        *dbstruct-dbs*)))
;; Look up the subdb registered for run-id; #f when there is none.
(define (dbfile:get-subdb dbstruct run-id)
  (let* ((key (dbfile:run-id->dbname run-id))
         (subdbs (dbr:dbstruct-subdbs dbstruct)))
    (hash-table-ref/default subdbs key #f)))
;; Register subdb for run-id in the dbstruct's subdbs table.
(define (dbfile:set-subdb dbstruct run-id subdb)
  (let ((subdbs (dbr:dbstruct-subdbs dbstruct))
        (key (dbfile:run-id->dbname run-id)))
    (hash-table-set! subdbs key subdb)))
;; (define *dbfile:num-handles-in-use* 0)
;; Get/open a database.
;;
;; NOTE: most usage should call dbfile:open-db to get a dbdat
;;
;; if run-id => get run specific db
;; if #f => get main db
;; if run-id is a string treat it as a filename - DON'T use this - we'll get rid of it.
;; if db already open - return cachedb
;; if db not open, open cachedb, rundb and sync then return cachedb
;; inuse gets set automatically for rundb's
;;
;; Pop a previously opened dbdat for run-id off the subdb's handle stack.
;;
;; Returns the dbdat, or #f when the stack is empty or when no subdb has
;; been initialised for run-id yet (previously the latter raised an
;; error from the record accessor instead of reporting "no handle").
;;
(define (dbfile:get-dbdat dbstruct run-id)
  (let ((subdb (dbfile:get-subdb dbstruct run-id)))
    (and subdb
         (let ((handles (dbr:subdb-dbstack subdb)))
           (and (not (stack-empty? handles))
                (stack-pop! handles))))))
;; return a previously opened db handle to the stack of available handles
;; Push dbdat back on the handle stack of run-id's subdb and return it.
;; A warning is printed when the stack already holds more than 15 handles.
(define (dbfile:add-dbdat dbstruct run-id dbdat)
  (let* ((subdb (dbfile:get-subdb dbstruct run-id))
         (handles (dbr:subdb-dbstack subdb))
         (depth (stack-count handles)))
    (when (> depth 15)
      (dbfile:print-err "WARNING: stack for "run-id".db is "depth"."))
    (stack-push! handles dbdat)
    dbdat))
;; set up a subdb
;;
;; Create and register the subdb for run-id. The on-disk db under
;; areapath is opened (sync-mode 0, no journal mode) and stored as
;; mtdbdat; the tmp path is only recorded, not opened.
;; Returns the new subdb.
(define (dbfile:init-subdb dbstruct run-id init-proc)
  (let* ((relname (dbfile:run-id->dbname run-id))
         (disk-path (dbfile:run-id->path (dbr:dbstruct-areapath dbstruct) run-id))
         (tmp-path (dbfile:run-id->path (dbr:dbstruct-tmppath dbstruct) run-id))
         (disk-dbdat (dbfile:open-sqlite3-db disk-path init-proc sync-mode: 0 journal-mode: #f))
         (subdb (make-dbr:subdb dbname: relname
                                mtdbfile: disk-path
                                tmpdbfile: tmp-path
                                mtdbdat: disk-dbdat)))
    (dbfile:set-subdb dbstruct run-id subdb)
    subdb))
;; returns dbdat with dbh and dbfilepath
;;
;; NOTE: the handle is on /tmp db file!
;;
;; 1. if needed setup the subdb for the given run-id
;; 2. if there is no existing db handle in the stack
;; create a new handle and return it (do NOT add
;; it to the stack).
;;
;; Return a dbdat for run-id's tmp db: initialise the subdb first if
;; needed, reuse a handle from the stack when one is available, else
;; open a fresh WAL-mode handle. A fresh handle is NOT pushed on the
;; stack here (one handle per caller).
(define (dbfile:open-db dbstruct run-id init-proc)
  (cond
   ((not (dbfile:get-subdb dbstruct run-id)) ;; not yet defined
    (dbfile:init-subdb dbstruct run-id init-proc)
    (dbfile:open-db dbstruct run-id init-proc))
   (else
    (or (dbfile:get-dbdat dbstruct run-id)
        (dbfile:open-sqlite3-db
         (dbfile:run-id->path (dbr:dbstruct-tmppath dbstruct) run-id)
         init-proc
         sync-mode: 0
         journal-mode: "WAL")))))
;; COMBINE dbfile:open-sqlite-db and dbfile:lock-create-open
;;
;; this stuff is for initial debugging, please remove it when
;; this code stabilizes
;; dbfile path => number of times it has been opened (debug aid)
(define *dbopens* (make-hash-table))

;; Bump and return the open count for dbfile; from the second open on,
;; an INFO line is printed, rate-limited by common:low-noise-print.
(define (dbfile:inc-db-open dbfile)
  (let ((opens (add1 (hash-table-ref/default *dbopens* dbfile 0))))
    (when (and (> opens 1) ;; this should NOT be happening
               (common:low-noise-print 15 "db-opens"))
      (dbfile:print-err "INFO: db "dbfile" has been opened "opens" times!"))
    (hash-table-set! *dbopens* dbfile opens)
    opens))
;; Open the classic megatest.db file (defaults to open in toppath)
;;
;; NOTE: returns a dbdat not a dbstruct!
;;
;; Open (creating if necessary) the sqlite3 db at dbpath and wrap the
;; handle in a dbr:dbdat.
;;
;; dbpath       - path of the db file
;; init-proc    - passed through to dbfile:cautious-open-database
;; sync-mode    - value for PRAGMA synchronous (default 0)
;; journal-mode - value for PRAGMA journal_mode, or #f to leave it alone
;;
;; The read-only flag is derived from file-write-access? AFTER the open.
;; Sampling it before the open (as was done previously) reports "no
;; write access" for a file that does not exist yet, so every freshly
;; created db was flagged read-only.
;;
(define (dbfile:open-sqlite3-db dbpath init-proc #!key (sync-mode 0)(journal-mode #f))
  (let* ((db (dbfile:cautious-open-database dbpath init-proc sync-mode journal-mode))
         (write-access (file-write-access? dbpath)))
    (dbfile:inc-db-open dbpath)
    (make-dbr:dbdat dbfile: dbpath dbh: db read-only: (not write-access))))
;; Print params on the error port, then exit with status 1.
(define (dbfile:print-and-exit . params)
  (let ((emit (lambda () (apply print params))))
    (with-output-to-port (current-error-port) emit)
    (exit 1)))
;; Print params (as print would) on the current error port.
(define (dbfile:print-err . params)
  (let ((emit (lambda () (apply print params))))
    (with-output-to-port (current-error-port) emit)))
;; Open the sqlite3 db at fname, retrying on errors.
;;
;; fname        - path of the db file
;; init-proc    - if given, called with the new handle, but only when the
;;                file did not exist before the open
;; sync-mode    - if true, used as the value for PRAGMA synchronous
;; journal-mode - if true, used as the value for PRAGMA journal_mode
;; tries-left   - remaining attempts (default 500)
;;
;; Returns the db handle on success.
;; NOTE(review): two paths return something that is not a handle: the
;; "file doesn't exist" branch returns whatever print returns, and an
;; exhausted retry returns the value of a one-armed if. Callers appear
;; to assume a handle - confirm.
;;
(define (dbfile:cautious-open-database fname init-proc sync-mode journal-mode #!optional (tries-left 500))
(let* ((busy-file (conc fname "-journal"))
;; NOTE(review): with the default of 500 tries this is negative until
;; tries-left drops below 51; the formula looks sized for a default of
;; 50 - confirm the intended back-off.
(delay-time (* (- 51 tries-left) 1.1))
(write-access (file-write-access? fname))
(dir-access (file-write-access? (pathname-directory fname)))
;; retry: sleep delay-time, then recurse with one fewer try while any remain
(retry (lambda ()
(thread-sleep! delay-time)
(if (> tries-left 0)
(dbfile:cautious-open-database fname init-proc
sync-mode journal-mode
(- tries-left 1))))))
(assert (>= tries-left 0) (conc "FATAL: too many attempts in dbfile:cautious-open-database of "fname", giving up."))
;; a journal file next to a writable db: wait a second and try again
(if (and (file-write-access? fname)
(file-exists? busy-file))
(begin
(if (common:low-noise-print 120 busy-file)
(dbfile:print-err "INFO: dbfile:cautious-open-database: journal file "
busy-file" exists, trying again in few seconds."))
(thread-sleep! 1)
;; when exactly two tries remain, attempt the copy-based salvage first
(if (eq? tries-left 2)
(begin
(dbfile:print-err "INFO: forcing journal rollup "busy-file)
(dbfile:brute-force-salvage-db fname)))
(dbfile:cautious-open-database fname init-proc sync-mode journal-mode (- tries-left 1)))
(let* ((result (condition-case
;; writable directory: open (and possibly create/init) under the
;; <fname>.lock simple file lock
(if dir-access
(dbfile:with-simple-file-lock
(conc fname ".lock")
(lambda ()
(let* ((db-exists (file-exists? fname))
(db (sqlite3:open-database fname))) ;; creates an empty db if it did not already exist.
(sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 30000))
(if sync-mode
(sqlite3:execute db (conc "PRAGMA synchronous = "sync-mode";")))
(if journal-mode
(sqlite3:execute db (conc "PRAGMA journal_mode = "journal-mode";")))
(if (and init-proc (not db-exists))
(init-proc db))
db)))
;; directory not writable: open an existing file without lock or pragmas
(begin
(if (file-exists? fname )
(let ((db (sqlite3:open-database fname)))
;; pragmas synchronous not needed because this db is used read-only
;; (sqlite3:execute db (conc "PRAGMA synchronous = "mode";")
(sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 30000)) ;; read-only but still need timeout
db )
(print "file doesn't exist: " fname))))
;; every handler below reports the problem and then retries
(exn (io-error)
(dbfile:print-err exn "ERROR: i/o error with " fname ". Check permissions, disk space etc. and try again.")
(retry))
(exn (corrupt)
(dbfile:print-err exn "ERROR: database " fname " is corrupt. Repair it to proceed.")
(retry))
(exn (busy)
(dbfile:print-err exn "ERROR: database " fname
" is locked. Try copying to another location, remove original and copy back.")
(retry))
(exn (permission)(dbfile:print-err exn "ERROR: database " fname " has some permissions problem.")
(retry))
(exn ()
(dbfile:print-err exn "ERROR: Unknown error with database " fname " message: "
((condition-property-accessor 'exn 'message) exn))
(retry)))))
result))))
;; Last-ditch recovery: through the shell, copy fname to
;; fname-<pid>.bak, move the original to fname.delme, then copy the
;; backup back into place. Returns the result of system.
(define (dbfile:brute-force-salvage-db fname)
  (let* ((backupfname (conc fname "-" (current-process-id) ".bak"))
         (cmd (conc "cp " fname " " backupfname
                    ";mv " fname " " fname ".delme;"
                    "cp " backupfname " " fname)))
    (dbfile:print-err "WARNING: attempting recovery of file "fname" by running commands:\n"
                      " "cmd)
    (system cmd)))
;; opens and returns handle and nothing else
;;
;; NOTE: this is already protected by mutex *no-sync-db-mutex*
;;
;; Open <dbpath>/no-sync.db and return the raw sqlite3 handle.
;; dbpath is created (with parents) when it does not exist.
;;
;; NOTE(review): init-proc is handed to dbfile:cautious-open-database,
;; which only calls it when the db file did not exist before the open;
;; the in-code remark about initialising "every time" does not match
;; that - confirm which is intended.
;;
(define (dbfile:raw-open-no-sync-db dbpath)
(if (not (file-exists? dbpath))
(create-directory dbpath #t))
(debug:print-info 0 *default-log-port* "Opening "dbpath"/no-sync.db")
(let* ((dbname (conc dbpath "/no-sync.db"))
(db-exists (file-exists? dbname)) ;; not referenced below
;; init-proc creates both tables inside a single transaction
(init-proc (lambda (db)
(sqlite3:with-transaction
db
(lambda ()
;; I have been having trouble with init of no-sync.db so
;; doing the init in a transaction every time (no gating
;; on file existance.
(for-each
(lambda (stmt)
(sqlite3:execute db stmt))
(list
"CREATE TABLE IF NOT EXISTS no_sync_metadat
(var TEXT,
val TEXT,
CONSTRAINT no_sync_metadat_constraint UNIQUE (var));"
"CREATE TABLE IF NOT EXISTS no_sync_locks
(key TEXT,
val TEXT,
CONSTRAINT no_sync_metadat_constraint UNIQUE (key));"))))))
;; on-tmp: true when the first component of dbpath is "tmp"
(on-tmp (equal? (car (string-split dbpath "/")) "tmp"))
;; WAL journal mode is requested only for dbs under /tmp
(db (if on-tmp
(dbfile:cautious-open-database dbname init-proc 0 "WAL")
(dbfile:cautious-open-database dbname init-proc 0 #f)
;; (sqlite3:open-database dbname)
)))
;; for /tmp dbs re-apply synchronous=0 and install a 136000 ms busy timeout
(if on-tmp ;; done in cautious-open-database
(begin
(sqlite3:execute db "PRAGMA synchronous = 0;")
(sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000))))
db))
;; Call (proc db) with a handle on the no-sync db while holding
;; *no-sync-db-mutex*, and return proc's result.
;;
;; The cached *no-sync-db* handle is used when there is one; otherwise
;; a handle is opened for the duration of the call and finalized
;; afterwards.
;;
;; The unlock and the finalize run from the "after" thunk of
;; dynamic-wind, so they also happen when proc (or the open itself)
;; raises. Previously an exception left the mutex locked for good and
;; leaked the temporary handle.
;;
(define (dbfile:with-no-sync-db dbpath proc)
  (let ((db #f)
        (opened-here #f))
    (dynamic-wind
     (lambda ()
       (mutex-lock! *no-sync-db-mutex*))
     (lambda ()
       (let ((already-open *no-sync-db*))
         (set! db (or already-open (dbfile:raw-open-no-sync-db dbpath)))
         (set! opened-here (not already-open))
         (proc db)))
     (lambda ()
       ;; only close what this call opened, and only if it is a real handle
       (if (and opened-here (sqlite3:database? db))
           (sqlite3:finalize! db))
       (set! db #f)
       (set! opened-here #f)
       (mutex-unlock! *no-sync-db-mutex*)))))
;; serializes opening of, and access to, the no-sync db handle
(define *no-sync-db-mutex* (make-mutex))

;; Return the process-wide no-sync db handle, opening it under dbpath
;; and caching it in *no-sync-db* on first use.
;;
;; The mutex is released from the "after" thunk of dynamic-wind so that
;; a failed open no longer leaves *no-sync-db-mutex* locked.
;;
(define (dbfile:open-no-sync-db dbpath)
  (dynamic-wind
   (lambda ()
     (mutex-lock! *no-sync-db-mutex*))
   (lambda ()
     (if (not *no-sync-db*)
         (set! *no-sync-db* (dbfile:raw-open-no-sync-db dbpath)))
     *no-sync-db*)
   (lambda ()
     (mutex-unlock! *no-sync-db-mutex*))))
;; Insert or replace the row (var, val) in no_sync_metadat.
(define (db:no-sync-set db var val)
  (let ((sql "INSERT OR REPLACE INTO no_sync_metadat (var,val) VALUES (?,?);"))
    (sqlite3:execute db sql var val)))
;; Delete the no_sync_metadat row whose var matches.
(define (db:no-sync-del! db var)
  (let ((sql "DELETE FROM no_sync_metadat WHERE var=?;"))
    (sqlite3:execute db sql var)))
;; Fetch val for var from no_sync_metadat, or default when there is no
;; such row. A string value that parses as a number is returned as that
;; number; anything else is returned unchanged.
(define (db:no-sync-get/default db var default)
  (assert (sqlite3:database? db) "FATAL: db:no-sync-get/default called with a bad db handle:" db)
  (let ((found default))
    (sqlite3:for-each-row
     (lambda (val)
       (set! found val))
     db
     "SELECT val FROM no_sync_metadat WHERE var=?;"
     var)
    (or (and (string? found)
             (string->number found))
        found)))
;; timestring+identifier+payload
;; locks are unique on identifier, payload is informational
;; Split instr on "+" and return (timestamp . identifier): one token
;; yields an identifier of #f, two tokens yield both parts, and any
;; other token count trips the assert. The timestamp goes through
;; string->number.
(define (db:extract-time-identifier instr)
  (let* ((parts (string-split instr "+"))
         (nparts (length parts)))
    (cond
     ((= nparts 2)
      (cons (string->number (car parts)) (cadr parts)))
     ((= nparts 1)
      (cons (string->number (car parts)) #f))
     (else
      (assert #f "FATAL: db:extract-time-identifier handed bad data "instr)))))
;; transaction protected lock acquisition
;; either:
;; lock already held: returns (same-identifier? . lock-creation-time)
;; lock newly taken: returns (#t . lock-creation-time)
;; use (db:no-sync-del! db keyname) to release the lock
;;
;; Try to take the lock keyname on behalf of identifier, inside a
;; transaction on db.
;;
;; Returns a pair in every case:
;;   (#t . timestamp)  the lock was free and has now been written
;;   (same? . timestamp) a lock exists; same? says whether its stored
;;                     identifier equals identifier
;;   (#f . #f)         an error was caught and reported
;;
;; Previously only the catch-all handler returned a pair; the io-error,
;; corrupt, busy and permission handlers returned whatever the print
;; returned, which broke callers that take (car result). The value
;; written and the timestamp returned now also come from one single
;; current-seconds reading.
;;
(define (db:no-sync-get-lock-with-id db keyname identifier)
  (let ((failed (lambda (msg)
                  (dbfile:print-err msg)
                  (cons #f #f))))
    (sqlite3:with-transaction
     db
     (lambda ()
       (condition-case
        (let* ((curr-val (db:no-sync-get/default db keyname #f)))
          (if curr-val
              (match (db:extract-time-identifier curr-val) ;; result->timestamp, identifier
                ((timestamp . ident)
                 (cons (equal? ident identifier) timestamp))
                (else (cons #f 'malformed-lock))) ;; lock malformed
              (let* ((curr-sec (current-seconds))
                     (lock-value (if identifier
                                     (conc curr-sec "+" identifier)
                                     curr-sec)))
                (sqlite3:execute db "INSERT OR REPLACE INTO no_sync_metadat (var,val) VALUES(?,?);" keyname lock-value)
                (cons #t curr-sec))))
        (exn (io-error)
             (failed "ERROR: i/o error with no-sync db. Check permissions, disk space etc. and try again."))
        (exn (corrupt)
             (failed "ERROR: database no-sync db is corrupt. Repair it to proceed."))
        (exn (busy)
             (failed "ERROR: database no-sync db is locked. Try copying to another location, remove original and copy back."))
        (exn (permission)
             (failed "ERROR: database no-sync db has some permissions problem."))
        (exn () ;; (status done) ;; I don't know how to detect status done but no data!
             (dbfile:print-err "ERROR: Unknown error with database no-sync db message: exn="(condition->list exn)", \n"
                               ((condition-property-accessor 'exn 'message) exn))
             (cons #f #f)))))))
;; Check whether the lock keyname is currently held by identifier.
;;
;; Returns (held-by-identifier? . stored-identifier), or
;; (#f . no-lock) when there is no lock row at all.
;;
;; Two cases used to raise from string-split and are now handled:
;;  - no row: db:no-sync-get/default returns #f
;;  - a lock written without identifier: the stored value is purely
;;    numeric, so db:no-sync-get/default hands back a number; conc turns
;;    it back into the string form db:extract-time-identifier expects.
;;
(define (db:no-sync-check-lock db keyname identifier)
  (let ((curr-val (db:no-sync-get/default db keyname #f)))
    (if curr-val
        (match (db:extract-time-identifier (conc curr-val)) ;; result->timestamp, identifier
          ((timestamp . ident)
           (cons (equal? ident identifier) ident))
          (else (cons #f 'no-lock)))
        (cons #f 'no-lock))))
;; get the lock, wait 0.25 seconds and verify still have it.
;; this should not be necessary given the use of transaction in
;; db:no-sync-get-lock-with-id but it does seem to be needed
;;
;; Take the lock, and if that reports success pause 0.25 s and return
;; the result of re-checking it; otherwise return the failed attempt.
(define (db:no-sync-lock-and-check db keyname identifier)
  (let ((attempt (db:no-sync-get-lock-with-id db keyname identifier)))
    (cond
     ((car attempt)
      (thread-sleep! 0.25)
      (db:no-sync-check-lock db keyname identifier))
     (else attempt))))
;; transaction protected lock aquisition
;; either:
;; fails returns (#f . lock-creation-time)
;; succeeds (returns (#t . lock-creation-time)
;; use (db:no-sync-del! db keyname) to release the lock
;;
;; Try to take the lock keyname inside a transaction on db.
;;
;; Returns a pair in every case:
;;   (#t . lock-time)  the lock was free and has now been written
;;   (#f . curr-val)   a lock already exists; curr-val is its stored value
;;   (#f . now)        an error was caught and reported
;;
;; Previously the io-error, corrupt, busy and permission handlers
;; returned whatever the print returned rather than the documented
;; (#f . time) pair, so db:no-sync-get-lock-timeout and its callers
;; could receive a non-pair.
;;
(define (db:no-sync-get-lock db keyname)
  (let ((failed (lambda (msg)
                  (dbfile:print-err msg)
                  `(#f . ,(current-seconds)))))
    (sqlite3:with-transaction
     db
     (lambda ()
       (condition-case
        (let* ((curr-val (db:no-sync-get/default db keyname #f)))
          (if curr-val
              `(#f . ,curr-val) ;; (sqlite3:first-result db "SELECT val FROM no_sync_metadat WHERE var=?;" keyname))
              (let ((lock-time (current-seconds)))
                (sqlite3:execute db "INSERT OR REPLACE INTO no_sync_metadat (var,val) VALUES(?,?);" keyname lock-time)
                `(#t . ,lock-time))))
        (exn (io-error)
             (failed "ERROR: i/o error with no-sync db. Check permissions, disk space etc. and try again."))
        (exn (corrupt)
             (failed "ERROR: database no-sync db is corrupt. Repair it to proceed."))
        (exn (busy)
             (failed "ERROR: database no-sync db is locked. Try copying to another location, remove original and copy back."))
        (exn (permission)
             (failed "ERROR: database no-sync db has some permissions problem."))
        (exn () ;; (status done) ;; I don't know how to detect status done but no data!
             (dbfile:print-err "ERROR: Unknown error with database no-sync db message: exn="(condition->list exn)", \n"
                               ((condition-property-accessor 'exn 'message) exn))
             `(#f . ,(current-seconds))))))))
;; Like db:no-sync-get-lock, but when the lock is held and is older
;; than timeout seconds it is overwritten and (#t . now) is returned.
;; In all other cases the result of db:no-sync-get-lock is passed on.
(define (db:no-sync-get-lock-timeout db keyname timeout)
  (let ((lockdat (db:no-sync-get-lock db keyname)))
    (if (and (pair? lockdat)
             (eq? (car lockdat) #f))
        (let* ((held-since (cdr lockdat))
               (held-secs (if (string? held-since)
                              (string->number held-since)
                              held-since)))
          (if (> (- (current-seconds) held-secs) timeout)
              (let ((now (current-seconds)))
                ;; (debug:print-info 2 *default-log-port* "db:no-sync-get-lock keyname=" keyname ", lock-time=" lock-time ", exn=" exn)
                (sqlite3:execute db "INSERT OR REPLACE INTO no_sync_metadat (var,val) VALUES(?,?);" keyname now)
                (cons #t now))
              lockdat))
        lockdat)))
;; NOTE: This will steal the lock after timeout of waiting.
;;
;; Run (proc) while holding the no-sync lock keyname, release the lock
;; and return proc's result. Returns #f when the lock could not be
;; obtained.
;;
;; timeout (seconds) is forwarded to db:no-sync-get-lock-timeout, which
;; takes over a lock older than that. The previous version left the
;; timeout out of that call, so every invocation failed with an
;; argument-count error.
;;
(define (db:with-no-sync-lock db keyname timeout proc)
  (let ((lockdat (db:no-sync-get-lock-timeout db keyname timeout)))
    (if (and (pair? lockdat) (car lockdat))
        (let ((res (proc)))
          (db:no-sync-del! db keyname)
          res)
        #f)))
;;======================================================================
;; sync back functions pulled from db.scm
;;======================================================================
;; Get a lock from the no-sync-db for the from-db, then delta sync the from-db to the to-db, otherwise return #f
;;
;; Take the simple file lock <from-db-file>.lock and, if obtained, run
;; db:sync-touched for runid.
;;
;; Returns #t when the sync ran, #f when the lock could not be taken.
;; no-sync-db is accepted for interface compatibility and not used.
;;
;; Fixes relative to the previous version:
;;  - dbinit is passed as the dbinit: keyword. db:sync-touched declares
;;    it as a #!key parameter, so the positional value was dropped and
;;    the sync ran with dbinit = #f.
;;  - *db-sync-in-progress* is cleared and the lock file removed from a
;;    dynamic-wind "after" thunk, so an exception escaping the sync no
;;    longer leaves the flag set (tripping the assert on the next call)
;;    or the lock file behind.
;;
(define (db:lock-and-delta-sync no-sync-db dbstruct from-db-file runid keys dbinit)
  (assert (not *db-sync-in-progress*) "FATAL: db:lock-and-sync called while a sync is in progress.")
  (let ((lock-file (conc from-db-file ".lock")))
    (if (common:simple-file-lock lock-file)
        (begin
          (dbfile:print-err "INFO: db:lock-and-delta-sync copying db "runid" at "(current-seconds))
          (dynamic-wind
           (lambda ()
             (set! *db-sync-in-progress* #t))
           (lambda ()
             (db:sync-touched dbstruct runid keys dbinit: dbinit))
           (lambda ()
             (set! *db-sync-in-progress* #f)
             (delete-file* lock-file)))
          #t)
        (begin
          (if (common:low-noise-print 120 (conc "no lock "from-db-file))
              (dbfile:print-err "INFO: could not get lock for " from-db-file ", sync likely in progress."))
          #f))))
;; ;; Get a lock from the no-sync-db for the from-db, then delta sync the from-db to the to-db, otherwise return #f
;; ;;
;; (define (db:lock-and-delta-sync-orig no-sync-db dbstruct from-db-file runid keys dbinit)
;; (assert (not *db-sync-in-progress*) "FATAL: db:lock-and-sync called while a sync is in progress.")
;; ;; (dbfile:print-err *default-log-port* "db:lock-and-delta-sync")
;; (let* ((lockdat (db:no-sync-get-lock-timeout no-sync-db from-db-file 60))
;; (gotlock (car lockdat))
;; (locktime (cdr lockdat)))
;; ;; (debug:print-info 3 *default-log-port* "db:lock-and-delta-sync: got lock?")
;;
;; (if gotlock
;; (begin
;; (dbfile:print-err "INFO: db:lock-and-delta-sync copying db "runid" at "(current-seconds))
;; (set! *db-sync-in-progress* #t)
;; (db:sync-touched dbstruct runid keys dbinit)
;; (set! *db-sync-in-progress* #f)
;; (db:no-sync-del! no-sync-db from-db-file)
;; #t)
;; (begin
;; (dbfile:print-err "ERROR: could not get lock for " from-db-file " from no-sync-db")
;; #f
;; ))))
;; sync run from tmp disk to nfs disk if touched
;;
;; call with dbinit=db:initialize-main-db
;;
;; Sync the tmp db for run-id to its on-disk db.
;;
;; Rows are selected on last_update >= *db-last-sync*, or >= 0 when
;; force-sync is true. Afterwards *db-last-sync* and *db-last-access*
;; are set to the time this call started and the tmp handle is pushed
;; back on the subdb's stack. Always returns #t.
;;
(define (db:sync-touched dbstruct run-id keys #!key dbinit (force-sync #f))
  (dbfile:print-err "db:sync-touched Syncing: " (conc (or run-id "main") ".db"))
  (let* (;; the subdb is needed to access the mtdbdat
         (subdb (or (dbfile:get-subdb dbstruct run-id)
                    (dbfile:init-subdb dbstruct run-id dbinit)))
         (mtdb (dbr:subdb-mtdbdat subdb))
         (tmpdb (db:open-db dbstruct run-id dbinit))
         (started (current-seconds))
         ;; read the cutoff under the mutex, run the sync outside of it
         (cutoff (begin
                   (mutex-lock! *db-multi-sync-mutex*)
                   (let ((since (if force-sync 0 *db-last-sync*)))
                     (mutex-unlock! *db-multi-sync-mutex*)
                     since))))
    (db:sync-tables (db:sync-all-tables-list keys)
                    (cons "last_update" cutoff)
                    tmpdb
                    mtdb)
    (mutex-lock! *db-multi-sync-mutex*)
    (set! *db-last-sync* started)
    (set! *db-last-access* started)
    (mutex-unlock! *db-multi-sync-mutex*)
    (dbfile:add-dbdat dbstruct run-id tmpdb)
    #t))
;; just tests, test_steps and test_data tables
;; Table specs for the per-run tables: tests, test_steps and test_data.
;; Each spec is (tablename (column #f) ...).
(define db:sync-tests-only
  (let ((table-spec (lambda (tablename . columns)
                      (cons tablename
                            (map (lambda (col) (list col #f)) columns)))))
    (list
     (table-spec "tests"
                 "id" "run_id" "testname" "host" "cpuload" "diskfree"
                 "uname" "rundir" "shortdir" "item_path" "state" "status"
                 "attemptnum" "final_logf" "logdat" "run_duration" "comment"
                 "event_time" "fail_count" "pass_count" "archived" "last_update")
     (table-spec "test_steps"
                 "id" "test_id" "stepname" "state" "status"
                 "event_time" "comment" "logfile" "last_update")
     (table-spec "test_data"
                 "id" "test_id" "category" "variable" "value" "expected"
                 "tol" "units" "comment" "status" "type" "last_update"))))
;; needs db to get keys, this is for syncing all tables
;;
;; Table specs for the main-db tables. keys is the list of key field
;; names; they become columns of the runs table, between id and the
;; fixed run columns. Each spec is (tablename (column #f) ...).
(define (db:sync-main-list keys)
  (let ((table-spec (lambda (tablename columns)
                      (cons tablename
                            (map (lambda (col) (list col #f)) columns)))))
    (list
     (table-spec "keys" '("id" "fieldname" "fieldtype"))
     (table-spec "metadat" '("var" "val"))
     (table-spec "runs"
                 (append '("id")
                         keys
                         '("runname" "state" "status" "owner" "event_time" "comment"
                           "fail_count" "pass_count" "contour" "last_update")))
     (table-spec "archive_disks"
                 '("id" "archive_area_name" "disk_path" "last_df" "last_df_time" "creation_time"))
     (table-spec "archive_blocks"
                 '("id" "archive_disk_id" "disk_path" "last_du" "last_du_time" "creation_time"))
     (table-spec "test_meta"
                 '("id" "testname" "owner" "description" "reviewed" "iterated"
                   "avg_runtime" "avg_disk" "tags" "jobgroup"))
     (table-spec "tasks_queue"
                 '("id" "action" "owner" "state" "target" "name" "testpatt"
                   "keylock" "params" "creation_time" "execution_time")))))
;; Main-db table specs for keys followed by the per-run table specs.
(define (db:sync-all-tables-list keys)
  (let ((main-tables (db:sync-main-list keys)))
    (append main-tables db:sync-tests-only)))
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;; db's are dbdat's
;;
;; if last-update specified ("field-name" . time-in-seconds)
;; then sync only records where field-name >= time-in-seconds
;; IFF field-name exists
;;
;; Copy changed rows of the tables in tbls from fromdb into todb and
;; into every db in slave-dbs.
;;
;; tbls        - list of (tablename (field #f) ...) specs
;; last-update - ("field-name" . seconds): only rows with
;;               field-name >= seconds are read, for tables that have
;;               that field; see the NOTE(review) below for numbers
;; fromdb/todb/slave-dbs - dbr:dbdat records
;;
;; Returns the number of rows written, or a negative code when the
;; arguments are rejected:
;;   -1 no fromdb, -2 no todb, -3 fromdb handle is not a database,
;;   -4 todb handle is not a database, -5 todb file is not writable,
;;   -6 at least one slave db file is not writable.
;; On an exception the details are printed and the process exits.
;;
;; Fixes relative to the previous version:
;;  - the slave-db writability check tested todb for every slave, so a
;;    read-only slave was never detected; it now tests each slave
;;  - the read-only messages said "not a read-only database" for the
;;    read-only case
;;  - the exception handler's property accessors now carry a default,
;;    so a non-sqlite3 exception no longer raises inside the handler
;;  - locals that were built but never read (statement tables, the
;;    field<->index maps) are gone
;;
(define (db:sync-tables tbls last-update fromdb todb . slave-dbs)
  (handle-exceptions
   exn
   (begin
     (dbfile:print-err "EXCEPTION: database probably overloaded or unreadable in db:sync-tables.")
     (print-call-chain (current-error-port))
     (dbfile:print-err " message: " ((condition-property-accessor 'exn 'message #f) exn))
     (dbfile:print-err "exn=" (condition->list exn))
     (dbfile:print-err " status: " ((condition-property-accessor 'sqlite3 'status #f) exn))
     (dbfile:print-err " src db: " (dbr:dbdat-dbfile fromdb))
     (for-each (lambda (dbdat)
                 (let ((dbpath (dbr:dbdat-dbfile dbdat)))
                   (dbfile:print-err " dbpath: " dbpath)
                   (if #t ;; (not (db:repair-db dbdat))
                       (begin
                         (dbfile:print-err "Failed to rebuild (repair is turned off) " dbpath ", exiting now.")
                         (exit)))))
               (cons todb slave-dbs))
     0)
   ;; this is the work to be done
   (cond
    ((not fromdb) (dbfile:print-err "WARNING: db:sync-tables called with fromdb missing")
     -1)
    ((not todb) (dbfile:print-err "WARNING: db:sync-tables called with todb missing")
     -2)
    ((not (sqlite3:database? (dbr:dbdat-dbh fromdb)))
     (dbfile:print-err "db:sync-tables called with fromdb not a database " fromdb)
     -3)
    ((not (sqlite3:database? (dbr:dbdat-dbh todb)))
     (dbfile:print-err "db:sync-tables called with todb not a database " todb)
     -4)
    ((not (file-write-access? (dbr:dbdat-dbfile todb)))
     (dbfile:print-err "db:sync-tables called with a read-only todb database " todb)
     -5)
    ((not (null? (let ((readonly-slave-dbs
                        (filter
                         (lambda (dbdat)
                           ;; test the slave itself, not todb
                           (not (file-write-access? (dbr:dbdat-dbfile dbdat))))
                         slave-dbs)))
                   (for-each
                    (lambda (bad-dbdat)
                      (dbfile:print-err "db:sync-tables called with a read-only slave database " bad-dbdat))
                    readonly-slave-dbs)
                   readonly-slave-dbs)))
     -6)
    (else
     (let ((numrecs (make-hash-table)) ;; tablename => rows written
           (start-time (current-milliseconds))
           (tot-count 0))
       (for-each ;; table
        (lambda (tabledat)
          (let* ((tablename (car tabledat))
                 (fields (cdr tabledat))
                 ;; NOTE(review): fields holds (name #f) lists, so this
                 ;; member test against a bare string cannot match and
                 ;; the first cond clause below never fires; a numeric
                 ;; last-update is therefore ignored. Left as is -
                 ;; confirm the intended behaviour before changing it.
                 (has-last-update (member "last_update" fields))
                 (use-last-update (cond
                                   ((and has-last-update
                                         (member "last_update" fields))
                                    #t) ;; if given a number, just use it for all fields
                                   ((number? last-update) #f) ;; if not matched first entry then ignore last-update for this table
                                   ((and (pair? last-update)
                                         (member (car last-update) ;; last-update field name
                                                 (map car fields)))
                                    #t)
                                   ((and last-update (not (pair? last-update)) (not (number? last-update)))
                                    (dbfile:print-err "ERROR: parameter last-update for db:sync-tables must be a pair or a number, received: " last-update);; found in fields
                                    #f)
                                   (else
                                    #f)))
                 (last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for
                                        (if (number? last-update)
                                            last-update
                                            (cdr last-update))
                                        #f))
                 (last-update-field (if use-last-update
                                        (if (number? last-update)
                                            "last_update"
                                            (car last-update))
                                        #f))
                 (num-fields (length fields))
                 (full-sel (conc "SELECT " (string-intersperse (map car fields) ",")
                                 " FROM " tablename (if use-last-update ;; apply last-update criteria
                                                        (conc " WHERE " last-update-field " >= " last-update-value)
                                                        "")
                                 ";"))
                 (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) "
                                 " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );"))
                 (fromdat '())   ;; batch currently being filled
                 (fromdats '())  ;; list of completed batches
                 (totrecords 0)
                 (batch-len 100) ;; (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100")))
                 (todat (make-hash-table)) ;; first column value => row vector currently in todb
                 (field-names (map car fields))
                 (delay-handicap 0) ;; (string->number (or (configf:lookup *configdat* "sync" "delay-handicap") "0")))
                 )
            ;; read the source table
            ;; store a list of all rows in the table in fromdat, up to batch-len.
            ;; Then add fromdat to the fromdats list, clear fromdat and repeat.
            (sqlite3:with-transaction
             (dbr:dbdat-dbh fromdb)
             (lambda ()
               (sqlite3:for-each-row
                (lambda (a . b)
                  (set! fromdat (cons (apply vector a b) fromdat))
                  (if (> (length fromdat) batch-len)
                      (begin
                        (set! fromdats (cons fromdat fromdats))
                        (set! fromdat '())
                        (set! totrecords (+ totrecords 1)))))
                (dbr:dbdat-dbh fromdb)
                full-sel)))
            ;; Count less than batch-len as a record
            (if (> (length fromdat) 0)
                (set! totrecords (+ totrecords 1)))
            ;; tack on remaining records in fromdat
            (if (not (null? fromdat))
                (set! fromdats (cons fromdat fromdats)))
            ;; snapshot the matching rows already present in todb
            (sqlite3:for-each-row
             (lambda (a . b)
               (hash-table-set! todat a (apply vector a b)))
             (dbr:dbdat-dbh todb)
             full-sel)
            (when (and delay-handicap (> delay-handicap 0))
              (dbfile:print-err "imposing synthetic sync delay of "delay-handicap" seconds since sync/delay-handicap is configured")
              (thread-sleep! delay-handicap)
              (dbfile:print-err "synthetic sync delay of "delay-handicap" seconds completed"))
            ;; first pass implementation, just insert all changed rows
            (for-each
             (lambda (targdb)
               (let* ((db (dbr:dbdat-dbh targdb))
                      ;; the last_update trigger is dropped for the copy
                      ;; and re-created below
                      (drp-trigger (if (member "last_update" field-names)
                                       (db:drop-trigger db tablename)
                                       #f))
                      (has-last-update (member "last_update" field-names))
                      (is-trigger-dropped (if has-last-update
                                              (db:is-trigger-dropped db tablename)
                                              #f))
                      (stmth (sqlite3:prepare db full-ins))
                      (changed-rows 0))
                 (for-each
                  (lambda (fromdat-lst) ;; one transaction per batch
                    (sqlite3:with-transaction
                     db
                     (lambda ()
                       (for-each
                        (lambda (fromrow)
                          (let* ((a (vector-ref fromrow 0))
                                 (curr (hash-table-ref/default todat a #f))
                                 (same #t))
                            ;; a row is "same" only when it exists in todat
                            ;; and every field is equal
                            (let loop ((i 0))
                              (if (or (not curr)
                                      (not (equal? (vector-ref fromrow i) (vector-ref curr i))))
                                  (set! same #f))
                              (if (and same
                                       (< i (- num-fields 1)))
                                  (loop (+ i 1))))
                            (if (not same)
                                (begin
                                  (apply sqlite3:execute stmth (vector->list fromrow))
                                  (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))
                                  (set! changed-rows (+ changed-rows 1))))))
                        fromdat-lst))))
                  fromdats)
                 (sqlite3:finalize! stmth)
                 (if (member "last_update" field-names)
                     (db:create-trigger db tablename))))
             (append (list todb) slave-dbs))))
        tbls)
       (let* ((runtime (- (current-milliseconds) start-time))
              (should-print (or ;; (debug:debug-mode 12)
                             (common:low-noise-print 120 "db sync")
                             (> runtime 500)))) ;; low and high sync times treated as separate.
         (for-each
          (lambda (dat)
            (let ((tblname (car dat))
                  (count (cdr dat)))
              (set! tot-count (+ tot-count count))))
          (sort (hash-table->alist numrecs) (lambda (a b) (> (cdr a) (cdr b))))))
       tot-count)))))
;;======================================================================
;; trigger setup/takedown
;;======================================================================
;; Table of last_update triggers. Each entry is a two element list:
;;   (trigger-name create-trigger-sql)
;; There is one entry per table: runs, run_stats, tests, test_steps and test_data.
;; Every trigger fires AFTER UPDATE on its table and sets last_update of the
;; updated row (matched on id=old.id) to the current epoch seconds.
;;
;; The name is the lookup key (car) and the SQL is the payload (cadr).
;; Note that the test_steps trigger is named update_teststeps_trigger, i.e. it
;; does not follow the update_<table>_trigger pattern of the other entries.
;;
(define db:trigger-list
(list (list "update_runs_trigger" "CREATE TRIGGER IF NOT EXISTS update_runs_trigger AFTER UPDATE ON runs
FOR EACH ROW
BEGIN
UPDATE runs SET last_update=(strftime('%s','now'))
WHERE id=old.id;
END;" )
(list "update_run_stats_trigger" "CREATE TRIGGER IF NOT EXISTS update_run_stats_trigger AFTER UPDATE ON run_stats
FOR EACH ROW
BEGIN
UPDATE run_stats SET last_update=(strftime('%s','now'))
WHERE id=old.id;
END;" )
(list "update_tests_trigger" "CREATE TRIGGER IF NOT EXISTS update_tests_trigger AFTER UPDATE ON tests
FOR EACH ROW
BEGIN
UPDATE tests SET last_update=(strftime('%s','now'))
WHERE id=old.id;
END;" )
(list "update_teststeps_trigger" "CREATE TRIGGER IF NOT EXISTS update_teststeps_trigger AFTER UPDATE ON test_steps
FOR EACH ROW
BEGIN
UPDATE test_steps SET last_update=(strftime('%s','now'))
WHERE id=old.id;
END;" )
(list "update_test_data_trigger" "CREATE TRIGGER IF NOT EXISTS update_test_data_trigger AFTER UPDATE ON test_data
FOR EACH ROW
BEGIN
UPDATE test_data SET last_update=(strftime('%s','now'))
WHERE id=old.id;
END;" )))
;; Scan sqlite_master for the last_update trigger belonging to tbl-name.
;;
;;   db       - sqlite3 database handle
;;   tbl-name - table name as a string, e.g. "tests" or "test_steps"
;;
;; Returns #t if a trigger with the expected name is listed, #f otherwise.
;;
;; NOTE(review): despite the name, the result is #t when the trigger is
;; PRESENT (i.e. not dropped). Confirm what callers expect before relying
;; on it.
;;
(define (db:is-trigger-dropped db tbl-name)
  (let ((wanted (if (equal? tbl-name "test_steps")
                    "update_teststeps_trigger" ;; the one irregular trigger name
                    (conc "update_" tbl-name "_trigger")))
        (found  #f))
    (sqlite3:for-each-row
     (lambda (trig)
       (when (equal? trig wanted)
         (set! found #t)))
     db
     "SELECT name FROM sqlite_master WHERE type = 'trigger' ;")
    found))
;; Drop every trigger named in db:trigger-list from db. Uses
;; "drop trigger if exists", so triggers that are already absent are not
;; an error.
;;
(define (db:drop-triggers db)
  (for-each
   (lambda (trigger-name)
     (sqlite3:execute db (conc "drop trigger if exists " trigger-name)))
   (map car db:trigger-list)))
;; Drop the last_update trigger of a single table.
;;
;;   db       - sqlite3 database handle
;;   tbl-name - table name as a string
;;
;; Nothing is executed when the derived trigger name is not one of the
;; names in db:trigger-list.
;;
(define (db:drop-trigger db tbl-name)
  (let ((wanted (cond
                 ((equal? tbl-name "test_steps") "update_teststeps_trigger")
                 (else (conc "update_" tbl-name "_trigger")))))
    (for-each
     (lambda (entry)
       (when (equal? (car entry) wanted)
         (sqlite3:execute db (conc "drop trigger if exists " wanted))))
     db:trigger-list)))
;; (Re)create the last_update trigger of a single table by running the
;; CREATE TRIGGER statement stored for it in db:trigger-list.
;;
;;   db       - sqlite3 database handle
;;   tbl-name - table name as a string
;;
;; Nothing is executed when the derived trigger name is not in the list.
;;
(define (db:create-trigger db tbl-name)
  (let ((wanted (cond
                 ((equal? tbl-name "test_steps") "update_teststeps_trigger")
                 (else (conc "update_" tbl-name "_trigger")))))
    (for-each
     (lambda (entry)
       (sqlite3:execute db (cadr entry)))
     (filter (lambda (entry) (equal? (car entry) wanted))
             db:trigger-list))))
;;======================================================================
;; db access stuff
;;======================================================================
;; call with dbinit=db:initialize-main-db
;;
;; Open (via dbfile:open-db) the db for run-id and return its dbdat.
;;
;; Side effect: the global *db-write-access* is set to #t when the opened
;; dbdat is not read-only, #f otherwise.
;;
;; Locking with *db-open-mutex* around the open is currently disabled.
;;
(define (db:open-db dbstruct run-id dbinit)
  (let ((opened (dbfile:open-db dbstruct run-id dbinit)))
    (set! *db-write-access* (not (dbr:dbdat-read-only opened)))
    opened))
;; Parameter, default #f. The only reference visible in this part of the file
;; is in commented-out code, where its value is passed as the dbinit argument
;; of db:open-db.
(define dbfile:db-init-proc (make-parameter #f))
;; in xmaxima this gives a curve close to what I want:
;; plot2d ((exp(x/1.2)-1)/300, [x, 0, 10])$
;; plot2d ((exp(x/1.5)-1)/40, [x, 0, 10])$
;; plot2d ((exp(x/5)-1)/40, [x, 0, 20])$
;; Delay curve: (e^(x/5) - 1) / 40.
;; Zero at x = 0 and growing exponentially with x. dbfile:wait-for-qif uses
;; the result as a sleep time in seconds.
;;
(define (dbfile:droop x)
  (let* ((scaled (/ x 5))
         (growth (- (exp scaled) 1)))
    (/ growth 40)))
;; (* numqrys (/ 1 (qif-slope))))
;; create a dropping near the db file in a qif dir
;; use count of such files to gate queries (queries in flight)
;;
;; Throttle the caller according to the number of "query in flight" (qif)
;; crumb files already present, then drop a crumb file of our own.
;;
;;   fname  - path of the db file; crumbs live in <dir of fname>/qif-<dbnum>
;;   run-id - mapped to dbnum with dbfile:run-id->dbnum
;;   params - folded into the crumb name and written into the crumb file
;;
;; Returns the path of the crumb file that was written.
;;
;; Throttling, repeated for at most 5 rounds:
;;   more than 25 crumbs : crumbs older than (keep-age-param) seconds are
;;                         moved to the attic subdirectory, then sleep 4s
;;   1 to 25 crumbs      : sleep (dbfile:droop <number of crumbs>) seconds
;;   no crumbs           : no delay
;;
(define (dbfile:wait-for-qif fname run-id params)
  (let* ((thedir   (pathname-directory fname))
         (dbnum    (dbfile:run-id->dbnum run-id))
         (destdir  (conc thedir "/qif-" dbnum))
         (atticdir (conc destdir "/attic"))
         (uniqn    (get-area-path-signature (conc dbnum params)))
         (crumbn   (conc destdir "/" (current-seconds) "-" uniqn "." (current-process-id))))
    ;; Check for the attic itself, not just destdir, so that moving old
    ;; crumbs still works when destdir exists without an attic.
    (if (not (file-exists? atticdir))
        (create-directory atticdir #t))
    (let loop ((count 0))
      (let* (;; The attic directory lives inside destdir and is matched by the
             ;; glob; it is not a query in flight, so leave it out of the count.
             (currlks  (remove (lambda (f)
                                 (equal? (pathname-strip-directory f) "attic"))
                               (glob (conc destdir "/*"))))
             (numqrys  (length currlks))
             (delayval (cond ;; do a droopish curve
                        ((> numqrys 25)
                         (for-each
                          (lambda (f)
                            (if (> (- (current-seconds)
                                      (handle-exceptions
                                          exn
                                          (current-seconds) ;; file is likely gone, just fake out
                                        (file-modification-time f)))
                                   (keep-age-param))
                                (let* ((basedir (pathname-directory f))
                                       (filen   (pathname-file f))
                                       (destf   (conc basedir "/attic/" filen)))
                                  (dbfile:print-err "Moving qif file " f " older than "
                                                    (keep-age-param) " seconds to " destf)
                                  ;; best effort: another process may have moved it already
                                  (handle-exceptions
                                      exn
                                      #t
                                    (file-move f destf #t)))))
                          currlks)
                         4)
                        ((> numqrys 0) (dbfile:droop numqrys))
                        (else #f))))
        (if (and delayval
                 (< count 5))
            (begin
              (thread-sleep! delayval)
              (loop (+ count 1))))))
    (with-output-to-file crumbn
      (lambda ()
        (print fname " run-id=" run-id " params=" params)))
    crumbn))
;; ;; (db:with-db dbstruct run-id sqlite3:exec "select blah fgrom blaz;")
;; ;; r/w is a flag to indicate if the db is modified by this query #t = yes, #f = no
;; ;;
;; ;; Used only with http - to be removed
;; ;;
;; (define (dbfile:with-db dbstruct run-id r/w proc params)
;; (assert dbstruct "FATAL: db:with-db called with dbstruct "#f)
;; (assert (dbr:dbstruct? dbstruct) "FATAL: dbstruct is "dbstruct)
;; ;; Testing 2023, March 14th. I went from full time use of the mutex to no use at all and
;; ;; didn't see much change in the frequency of the messages:
;; ;; Warning (#<thread: thread14974>): in thread: (bind!) bad parameter or other API misuse
;; ;; allowing request count to go up to 1000 and other crashes showed up:
;; ;; Warning (#<thread: thread1889>): in thread: (deserialize) unexpected end of input: #<input port "(tcp)">
;; ;;
;; ;; leave it fully on for now, test later if there is a performance issue
;; ;;
;; (let* ((use-mutex #t) ;;(> *api-process-request-count* 50)) ;; risk of db corruption
;; (have-struct (dbr:dbstruct? dbstruct))
;; (dbdat (if have-struct ;; this stuff just allows us to call with a db handle directly
;; (db:open-db dbstruct run-id (dbfile:db-init-proc)) ;; (dbfile:get-subdb dbstruct run-id)
;; #f))
;; (db (if have-struct ;; this stuff just allows us to call with a db handle directly
;; (dbr:dbdat-dbh dbdat)
;; dbstruct))
;; (fname (if dbdat
;; (dbr:dbdat-dbfile dbdat)
;; "nofilenameavailable"))
;; (jfile (conc fname"-journal"))
;; (qryproc (lambda ()
;; (if use-mutex (mutex-lock! *db-with-db-mutex*))
;; (let ((res (apply proc dbdat db params))) ;; the actual call is here.
;; (if use-mutex (mutex-unlock! *db-with-db-mutex*))
;; ;; (if (vector? dbstruct)(db:done-with dbstruct run-id r/w))
;; (if dbdat
;; (dbfile:add-dbdat dbstruct run-id dbdat))
;; ;; (delete-file* crumbfile)
;; res)))
;; (stop-train (conc (dbr:dbstruct-areapath dbstruct)"/stop-the-train")))
;;
;; (assert (sqlite3:database? db) "FATAL: db:with-db, db is not a database, db="db
;; ", fname="fname)
;; (if (file-exists? jfile)
;; (begin
;; (dbfile:print-err "INFO: "jfile" exists, delaying to reduce database load")
;; (thread-sleep! 0.2)))
;; (if (and use-mutex
;; (common:low-noise-print 120 "over-50-parallel-api-requests"))
;; (dbfile:print-err *api-process-request-count*
;; " parallel api requests being processed in process "
;; (current-process-id))) ;; ", throttling access"))
;; (case (no-condition-db-with-db)
;; ((production)(qryproc))
;; ((suicide-mode)
;; (handle-exceptions
;; exn
;; (with-output-to-file stop-train
;; (lambda ()
;; (db:generic-error-printout exn "Stop train mode, run-id: "run-id
;; " params: "params" proc: "proc)))
;; (qryproc)))
;; (else
;; (condition-case
;; (qryproc)
;; (exn (io-error)
;; (db:generic-error-printout exn "ERROR: i/o error with "fname
;; ". Check permissions, disk space etc. and try again."))
;; (exn (corrupt)
;; (db:generic-error-printout exn "ERROR: database "fname
;; " is corrupt. Repair it to proceed."))
;; (exn (busy)
;; (db:generic-error-printout exn "ERROR: database "fname
;; " is locked. Try copying to another location,"
;; " remove original and copy back."))
;; (exn (permission)(db:generic-error-printout exn "ERROR: database "fname
;; " has some permissions problem."))
;; (exn ()
;; (db:generic-error-printout exn "ERROR: Unknown error with database "fname
;; " message: "
;; ((condition-property-accessor 'exn 'message) exn))))))))
;;======================================================================
;; another attempt at a transactionized queue
;;======================================================================
;; ;; ;; (define *transaction-queues* (make-hash-table))
;; ;; ;;
;; ;; ;; (define (db:get-queue run-id)
;; ;; ;; (let* ((res (hash-table-ref/default *transaction-queues* run-id #f)))
;; ;; ;; (if res
;; ;; ;; res
;; ;; ;; (let* ((newq (make-queue)))
;; ;; ;; (hash-table-set! *transaction-queues* run-id newq)
;; ;; ;; newq))))
;; ;; ;;
;; ;; ;; (define (db:add-to-transaction-queue dbstruct proc params)
;; ;; ;; (let* ((mbox (make-mailbox))
;; ;; ;; (q (db:get-queue run-id)))
;; ;; ;; (queue-add! *transaction-queue* (list dbstruct proc mbox))
;; ;; ;; (mailbox-receive mbox)))
;; ;; ;;
;; ;; ;; (define (db:process-transaction-queue *dbstruct-dbs*)
;; ;; ;; (for-each
;; ;; ;; (lambda (run-id)
;; ;; ;; (let* ((q (hash-table-ref *transaction-queue* run-id)))
;; ;; ;; ;; with-transaction
;; ;; ;; ;; dbstruct
;; ;; ;; ;; pop items from queue and execute them, return results via mailbox
;; ;; ;; q
;; ;; ;; ;; pop
;; ;; ;; ))
;; ;; ;; (hash-table-keys *transaction-queues*)))
;;======================================================================
;; file utils
;;======================================================================
;;======================================================================
;; lazy-safe get file mod time. on any error (file not existing etc.) return 0
;;
;; Modification time of fpath, or 0 when the file does not exist.
;; Any exception raised while checking is reported with dbfile:print-err
;; and also yields 0.
;;
(define (dbfile:lazy-modification-time fpath)
  (handle-exceptions
      exn
      (begin
        (dbfile:print-err "Failed to get modification time for " fpath ", treating it as zero. exn=" exn)
        0)
    (cond
     ((file-exists? fpath) (file-modification-time fpath))
     (else 0))))
;;======================================================================
;; find timestamp of newest file associated with a sqlite db file
;; Newest modification time among all files matching the glob <fpath>*.
;;
;; If the glob raises, the failure is reported and a single placeholder path
;; is used instead; if the glob matches nothing, "/no/such/file" is used.
;; Each path is timed with dbfile:lazy-modification-time, which yields 0 for
;; paths that do not exist.
;;
(define (dbfile:lazy-sqlite-db-modification-time fpath)
  (let* ((matches    (handle-exceptions
                         exn
                         (begin
                           (dbfile:print-err "Failed to glob " fpath "*, exn=" exn)
                           (list (conc "/no/such/file, message: "
                                       ((condition-property-accessor 'exn 'message) exn))))
                       (glob (conc fpath "*"))))
         (candidates (if (null? matches)
                         '("/no/such/file")
                         matches)))
    (apply max (map dbfile:lazy-modification-time candidates))))
;; dot-locking egg seems not to work, using this for now
;; if lock is older than expire-time then remove it and try again
;; to get the lock
;;
;; Try once to take the lock file fname.
;;
;;   fname       - path of the lock file
;;   expire-time - age in seconds after which an existing lock file is
;;                 considered stale and removed (default 300)
;;
;; Returns #t when the lock file was written and, after a short settle time,
;; still exists and its first line is our own key string. Returns #f when
;; someone else holds the lock, when a stale lock could not be removed, or
;; when the file vanished / changed / could not be read after writing.
;;
;; NOTE(review): the existence check and the creation of the file are two
;; separate steps; the read-back after the settle time is the only check that
;; our key is still the one in the file.
;;
(define (dbfile:simple-file-lock fname #!key (expire-time 300))
  (let ((settle-time 0.25) ;; seconds to wait before verifying our key is still in the file
        (fmod-time   (handle-exceptions
                         exn
                         (current-seconds) ;; cannot stat => treat as brand new
                       (file-modification-time fname))))
    (if (file-exists? fname)
        ;; lock file is present: only a stale one may be taken over
        (if (> (- (current-seconds) fmod-time) expire-time)
            (begin
              (dbfile:print-err "simple-file-lock: removing expired file: " fname)
              (handle-exceptions exn #f (delete-file* fname))
              ;; Retry only if the file is really gone. Retrying while the
              ;; stale file is still there (e.g. no permission to delete it)
              ;; would recurse forever.
              (if (file-exists? fname)
                  (begin
                    (dbfile:print-err "simple-file-lock: unable to remove expired file: " fname)
                    #f)
                  (dbfile:simple-file-lock fname expire-time: expire-time)))
            #f)
        ;; no lock file: write host name, process id and argv to it
        (let ((key-string (conc (get-host-name) "-" (current-process-id) ": " (argv)))
              (oup        (open-output-file fname)))
          (with-output-to-port
              oup
            (lambda ()
              (print key-string)))
          (close-output-port oup)
          ;; wait a moment, then make sure the file still exists and still
          ;; carries our key; if not, somebody else won
          (thread-sleep! settle-time)
          (if (file-exists? fname)
              (handle-exceptions
                  exn
                  #f
                (with-input-from-file fname
                  (lambda ()
                    (equal? key-string (read-line)))))
              (begin
                (dbfile:print-err "dbfile:simple-file-lock created " fname
                                  " but it was gone " settle-time " seconds later")
                #f))))))
;; Keep trying dbfile:simple-file-lock on fname, sleeping 3 seconds between
;; attempts, until the lock is obtained or expire-time seconds have passed
;; since the first attempt.
;;
;; Returns #t when the lock was obtained, #f when the deadline passed.
;;
(define (dbfile:simple-file-lock-and-wait fname #!key (expire-time 300))
  (let ((deadline (+ expire-time (current-seconds))))
    (let retry ((locked (dbfile:simple-file-lock fname expire-time: expire-time)))
      (cond
       (locked #t)
       ((> deadline (current-seconds))
        (thread-sleep! 3)
        (retry (dbfile:simple-file-lock fname expire-time: expire-time)))
       (else #f)))))
;; Release the lock by deleting the lock file fname.
;; Best effort: any exception raised by the delete is swallowed and the
;; result is then #f.
;;
(define (dbfile:simple-file-release-lock fname)
  (handle-exceptions ignored-exn #f
    (delete-file* fname)))
;; Run the thunk proc while holding the lock file fname.
;;
;;   fname       - path of the lock file
;;   proc        - procedure of no arguments
;;   expire-time - seconds to wait for the lock, also the stale-lock age
;;                 handed down to the lock routines (default 300)
;;   run-anyway  - when true, proc is run even if the lock was not obtained
;;
;; Returns the result of proc, or #f when the lock could not be obtained and
;; run-anyway is false.
;;
;; The lock file is removed when proc finishes, including when proc exits by
;; raising an exception. With run-anyway the lock file is removed afterwards
;; as well, even though this caller did not create it.
;;
(define (dbfile:with-simple-file-lock fname proc #!key (expire-time 300)(run-anyway #f))
  ;; let* (not let): start-time, the lock attempt and end-time must be
  ;; evaluated in exactly this order for the reported wait time to be right.
  (let* ((start-time (current-seconds))
         (gotlock    (dbfile:simple-file-lock-and-wait fname expire-time: expire-time))
         (end-time   (current-seconds))
         (run-and-release
          (lambda ()
            ;; the after-thunk also runs when proc is left via an exception,
            ;; so the lock file is not left behind
            (dynamic-wind
                (lambda () #f)
                proc
                (lambda () (dbfile:simple-file-release-lock fname))))))
    (if gotlock
        (run-and-release)
        (begin
          (dbfile:print-err "dbfile:with-simple-file-lock: " fname " is locked by "
                            (handle-exceptions
                                exn
                                "unreadable"
                              (with-input-from-file fname
                                (lambda ()
                                  (read-line)))))
          (dbfile:print-err "wait time = " (- end-time start-time))
          (dbfile:print-err "ERROR: simple file lock could not get a lock for " fname " in " expire-time " seconds")
          (if run-anyway
              (run-and-release)
              #f)))))
;; Serializes access to the per-dbdat statement caches in db:get-cache-stmth.
(define *get-cache-stmth-mutex* (make-mutex))

;; Return a prepared statement handle for the SQL text stmt, preparing it
;; only the first time it is asked for.
;;
;;   dbdat - record whose stmt-cache (a hash table keyed by SQL text) is used
;;   db    - sqlite3 database handle the statement is prepared against
;;   stmt  - SQL text
;;
;; The lookup and the prepare happen with *get-cache-stmth-mutex* held. The
;; mutex is released in the after-thunk of dynamic-wind so that an exception
;; from sqlite3:prepare (e.g. bad SQL) does not leave it locked and block
;; every later caller.
;;
(define (db:get-cache-stmth dbdat db stmt)
  (dynamic-wind
      (lambda () (mutex-lock! *get-cache-stmth-mutex*))
      (lambda ()
        (let ((stmt-cache (dbr:dbdat-stmt-cache dbdat)))
          (or (hash-table-ref/default stmt-cache stmt #f)
              (let ((newstmth (sqlite3:prepare db stmt)))
                (hash-table-set! stmt-cache stmt newstmth)
                newstmth))))
      (lambda () (mutex-unlock! *get-cache-stmth-mutex*))))
;;======================================================================
;; cached writes - run list of procs inside transaction
;; NOTE: this only works because we have once database per process
;;======================================================================
;; Global state for the cached-writes machinery. The code that uses these is
;; not in this part of the file.
;; NOTE(review): presumably the mutex guards the flag and the queues table - TODO confirm.
(define *cached-writes-mutex* (make-mutex))
;; starts out as #f
(define *cached-writes-flag* #f)
(define *cached-writes-queues* (make-hash-table)) ;; dbstruct->list of writes
)