;;======================================================================
;; Copyright 2017, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit dbmod))
(declare (uses dbfile))
(declare (uses commonmod))
(declare (uses debugprint))
(module dbmod
*
(import scheme)
(cond-expand
(chicken-4
(import chicken
data-structures
extras
posix
)
(define current-process-milliseconds current-milliseconds)
)
(chicken-5
(import chicken.base
chicken.file
chicken.sort
chicken.string
chicken.time
)))
(import (prefix sqlite3 sqlite3:))
(import typed-records)
(import srfi-1)
(import srfi-18)
(import srfi-69)
(import commonmod)
(import dbfile)
(import debugprint)
;; NOTE: This returns only the name "1.db", "main.db", not the path
;;
(define (dbmod:run-id->dbfname run-id)
(conc (dbfile:run-id->dbnum run-id)".db"))
(define (dbmod:get-dbdir dbstruct)
(let* ((areapath (dbr:dbstruct-areapath dbstruct))
(dbdir (conc areapath"/.mtdb")))
(if (and (file-write-access? areapath)
(not (file-exists? dbdir)))
(create-directory dbdir))
dbdir))
(define (dbmod:run-id->full-dbfname dbstruct run-id)
(conc (dbmod:get-dbdir dbstruct
run-id
)"/"(dbmod:run-id->dbfname run-id)))
;;======================================================================
;; Read-only inmem cached direct from disk method
;;======================================================================
(define *dbmod:nfs-db-handles* (make-hash-table)) ;; dbfname -> dbstruct
;; called in rmt.scm nfs-transport-handler
(define (dbmod:nfs-get-dbstruct run-id keys init-proc areapath)
(assert areapath "FATAL: dbmod:nfs-get-dbstruct called without areapath set.")
(let* ((dbfname (dbmod:run-id->dbfname run-id))
(dbstruct (hash-table-ref/default *dbmod:nfs-db-handles* dbfname #f)))
(if dbstruct
dbstruct
(let* ((newdbstruct (dbmod:open-dbmoddb areapath run-id dbfname init-proc keys syncdir: 'fromdisk)))
(hash-table-set! *dbmod:nfs-db-handles* dbfname newdbstruct)
newdbstruct))))
;;======================================================================
;; The inmem one-db file per server method goes in here
;;======================================================================
(define (dbmod:with-db dbstruct run-id r/w proc params)
(let* ((dbdat (dbmod:open-db dbstruct run-id (dbfile:db-init-proc)))
(dbh (dbr:dbdat-dbh dbdat)) ;; this will be the inmem handle
(dbfile (dbr:dbdat-dbfile dbdat)))
;; if nfs mode do a sync if delta > 2
(let* ((last-update (dbr:dbstruct-last-update dbstruct))
(sync-proc (dbr:dbstruct-sync-proc dbstruct))
(curr-secs (current-seconds)))
(if (> (- curr-secs last-update) 3)
(begin
(sync-proc last-update)
(dbr:dbstruct-last-update-set! dbstruct curr-secs))))
(apply proc dbdat dbh params)))
(define (dbmod:open-inmem-db initproc)
(let* ((db (sqlite3:open-database ":memory:"))
(handler (sqlite3:make-busy-timeout 3600)))
(sqlite3:set-busy-handler! db handler)
(initproc db)
db))
(define (dbmod:open-db dbstruct run-id dbinit)
(or (dbr:dbstruct-dbdat dbstruct)
(let* ((dbdat (make-dbr:dbdat
dbfile: (dbr:dbstruct-dbfile dbstruct)
dbh: (dbr:dbstruct-inmem dbstruct)
)))
(dbr:dbstruct-dbdat-set! dbstruct dbdat)
dbdat)))
(define (dbmod:need-on-disk-db-handle)
(case (dbfile:cache-method)
((none tmp) #t)
((inmem)
(case (dbfile:sync-method)
((original) #t)
((attach) #t) ;; we need it to force creation of the on-disk file - FIXME
(else
(debug:print 0 *default-log-port* "Unknown dbfile:sync-method setting: "
(dbfile:sync-method)))))
(else
(debug:print 0 *default-log-port* "Unknown dbfile:cache-method setting: "
(dbfile:cache-method))
#f)))
;; Open the inmem db and the on-disk db
;; populate the inmem db with data
;;
;; Updates fields in dbstruct
;; Returns dbstruct
;;
;; * This routine creates the db if not found
;; * Probably can get rid of the dbstruct-in
;;
(define (dbmod:open-dbmoddb areapath run-id dbfname-in init-proc keys
#!key (dbstruct-in #f)
(syncdir 'todisk))
(let* ((dbstruct (or dbstruct-in (make-dbr:dbstruct areapath: areapath)))
(dbfname (or dbfname-in (dbmod:run-id->dbfname run-id)))
(dbpath (dbmod:get-dbdir dbstruct)) ;; directory where all the .db files are kept
(dbfullname (conc dbpath"/"dbfname)) ;; (dbmod:run-id->full-dbfname dbstruct run-id))
(dbexists (file-exists? dbfullname))
(inmem (dbmod:open-inmem-db init-proc))
(write-access (file-write-access? dbpath))
(open-the-db (lambda ()
(dbfile:with-simple-file-lock
(conc dbfullname".lock")
(lambda ()
(let* ((db (sqlite3:open-database dbfullname))
(handler (sqlite3:make-busy-timeout 136000)))
(sqlite3:set-busy-handler! db handler)
(if write-access
(init-proc db))
db)))))
(db ;; (if (dbmod:need-on-disk-db-handle)
(open-the-db))
;; #f))
(tables (db:sync-all-tables-list keys)))
(dbr:dbstruct-inmem-set! dbstruct inmem)
(dbr:dbstruct-ondiskdb-set! dbstruct db)
(dbr:dbstruct-dbfile-set! dbstruct dbfullname)
(dbr:dbstruct-dbfname-set! dbstruct dbfname)
(dbr:dbstruct-sync-proc-set! dbstruct
(lambda (last-update)
;; (if db
(dbmod:sync-gasket tables last-update inmem db
dbfullname syncdir))) ;; )
;; (dbmod:sync-tables tables #f db inmem)
;; (if db
(dbmod:sync-gasket tables #f inmem db dbfullname 'fromdest) ;; ) ;; load into inmem
(dbr:dbstruct-last-update-set! dbstruct (current-seconds)) ;; should this be offset back in time by one second?
dbstruct))
;; (if (eq? syncdir 'todisk) ;; sync to disk normally, sync from in dashboard
;; (dbmod:sync-tables tables last-update inmem db)
;; (dbmod:sync-tables tables last-update db inmem))))
;; direction: 'fromdest 'todest
;;
(define (dbmod:sync-gasket tables last-update inmem dbh dbfname direction)
(case (dbfile:sync-method)
((none) #f)
((attach)
(dbmod:attach-sync tables inmem dbfname direction))
((newsync)
(dbmod:new-sync tables inmem dbh dbfname direction))
(else
(case direction
((todisk)
(dbmod:sync-tables tables last-update inmem dbh)
)
(else
(dbmod:sync-tables tables last-update dbh inmem))))))
(define (dbmod:close-db dbstruct)
;; do final sync to disk file
;; (do-sync ...)
(sqlite3:finalize! (dbr:dbstruct-ondiskdb dbstruct)))
;;======================================================================
;; Sync db
;;======================================================================
(define (dbmod:calc-use-last-update has-last-update fields last-update)
(cond
((and has-last-update
(member "last_update" fields))
#t) ;; if given a number, just use it for all fields
((number? last-update) #f) ;; if not matched first entry then ignore last-update for this table
((and (pair? last-update)
(member (car last-update) ;; last-update field name
(map car fields)))
#t)
((and last-update (not (pair? last-update)) (not (number? last-update)))
(debug:print 0 *default-log-port* "ERROR: parameter last-update for db:sync-tables must be a pair or a number, received: " last-update);; found in fields
#f)
(else
#f)))
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;; dbs are sqlite3 db handles
;;
;; if last-update specified ("field-name" . time-in-seconds)
;; then sync only records where field-name >= time-in-seconds
;; IFF field-name exists
;;
;; Use (db:sync-all-tables-list keys) to get the tbls input
;;
(define (dbmod:sync-tables tbls last-update fromdb todb)
(let ((stmts (make-hash-table)) ;; table-field => stmt
(all-stmts '()) ;; ( ( stmt1 value1 ) ( stml2 value2 ))
(numrecs (make-hash-table))
(start-time (current-process-milliseconds))
(tot-count 0))
(for-each ;; table
(lambda (tabledat)
(let* ((tablename (car tabledat))
(fields (cdr tabledat))
(has-last-update (member "last_update" fields))
(use-last-update (dbmod:calc-use-last-update has-last-update fields last-update))
(last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for
(if (number? last-update)
last-update
(cdr last-update))
#f))
(last-update-field (if use-last-update
(if (number? last-update)
"last_update"
(car last-update))
#f))
(num-fields (length fields))
(field->num (make-hash-table))
(num->field (apply vector (map car fields))) ;; BBHERE
(full-sel (conc "SELECT " (string-intersperse (map car fields) ",")
" FROM " tablename (if use-last-update ;; apply last-update criteria
(conc " WHERE " last-update-field " >= " last-update-value)
"")
";"))
(full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) "
" VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );"))
(fromdat '())
(fromdats '())
(totrecords 0)
(batch-len 100) ;; (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100")))
(todat (make-hash-table))
(count 0)
(field-names (map car fields)))
;; set up the field->num table
(for-each
(lambda (field)
(hash-table-set! field->num field count)
(set! count (+ count 1)))
fields)
;; read the source table
;; store a list of all rows in the table in fromdat, up to batch-len.
;; Then add fromdat to the fromdats list, clear fromdat and repeat.
(sqlite3:for-each-row
(lambda (a . b)
(set! fromdat (cons (apply vector a b) fromdat))
(if (> (length fromdat) batch-len)
(begin
(set! fromdats (cons fromdat fromdats))
(set! fromdat '())
(set! totrecords (+ totrecords 1)))))
fromdb
full-sel)
;; Count less than batch-len as a record
(if (> (length fromdat) 0)
(set! totrecords (+ totrecords 1)))
;; tack on remaining records in fromdat
(if (not (null? fromdat))
(set! fromdats (cons fromdat fromdats)))
(sqlite3:for-each-row
(lambda (a . b)
(hash-table-set! todat a (apply vector a b)))
todb
full-sel)
;; first pass implementation, just insert all changed rows
(let* ((db todb)
(drp-trigger (if (member "last_update" field-names)
(db:drop-trigger db tablename)
#f))
(has-last-update (member "last_update" field-names))
(is-trigger-dropped (if has-last-update
(db:is-trigger-dropped db tablename)
#f))
(stmth (sqlite3:prepare db full-ins))
(changed-rows 0))
(for-each
(lambda (fromdat-lst)
(mutex-lock! *db-transaction-mutex*)
(sqlite3:with-transaction
db
(lambda ()
(for-each ;;
(lambda (fromrow)
(let* ((a (vector-ref fromrow 0))
(curr (hash-table-ref/default todat a #f))
(same #t))
(let loop ((i 0))
(if (or (not curr)
(not (equal? (vector-ref fromrow i)(vector-ref curr i))))
(set! same #f))
(if (and same
(< i (- num-fields 1)))
(loop (+ i 1))))
(if (not same)
(begin
(apply sqlite3:execute stmth (vector->list fromrow))
(hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))
(set! changed-rows (+ changed-rows 1))))))
fromdat-lst)))
(mutex-unlock! *db-transaction-mutex*))
fromdats)
(sqlite3:finalize! stmth)
(if (member "last_update" field-names)
(db:create-trigger db tablename)))
))
tbls)
(let* ((runtime (- (current-process-milliseconds) start-time))
(should-print (or ;; (debug:debug-mode 12)
(common:low-noise-print 120 "db sync")
(> runtime 500)))) ;; low and high sync times treated as separate.
(for-each
(lambda (dat)
(let ((tblname (car dat))
(count (cdr dat)))
(set! tot-count (+ tot-count count))))
(sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b))))))
tot-count))
(define (has-last-update dbh tablename)
(let* ((has-last #f))
(sqlite3:for-each-row
(lambda (name)
(if (equal? name "last_update")
(set! has-last #t)))
dbh
(conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;"))
has-last))
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;;
;; direction = fromdest, todest
;; mode = 'full, 'incr
;;
;; Idea: youngest in dest is last_update time
;;
(define (dbmod:attach-sync tables dbh destdbfile direction #!key
(mode 'full)
(no-update '("keys")) ;; do
)
(debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
(if (not (sqlite3:auto-committing? dbh))
(debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
(let* ((table-names (map car tables))
(dest-exists (file-exists? destdbfile)))
(assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
;; attach the destdbfile
;; for each table
;; insert into dest.<table> select * from src.<table> where last_update>last_update
;; done
(debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb")
(sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))
(for-each
(lambda (table)
(let* ((tbldat (alist-ref table tables equal?))
(fields (map car tbldat))
(no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
(fields-str (string-intersperse fields ","))
(no-id-fields-str (string-intersperse no-id-fields ","))
(dir (eq? direction 'todest))
(fromdb (if dir "" "auxdb."))
(todb (if dir "auxdb." ""))
(set-str (string-intersperse
(map (lambda (field)
(conc fromdb field"="todb field))
fields)
","))
(stmt1 (conc "INSERT OR IGNORE INTO "todb table
" SELECT * FROM "fromdb table";"))
(stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id"
(if (member "last_update" fields)
(conc " AND "fromdb table".last_update > "todb table".last_update);")
");")))
(start-ms (current-process-milliseconds)))
;; (debug:print 0 *default-log-port* "stmt8="stmt8)
;; (if (sqlite3:auto-committing? dbh)
;; (begin
(mutex-lock! *db-transaction-mutex*)
(sqlite3:with-transaction
dbh
(lambda ()
(debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using "stmt1)
(sqlite3:execute dbh stmt1) ;; get all new rows
#;(if (member "last_update" fields)
(sqlite3:execute dbh stmt8)) ;; get all updated rows
;; (sqlite3:execute dbh stmt5)
;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up
;; (sqlite3:execute dbh stmt6)
))
(debug:print 0 *default-log-port* "Synced table "table
" in "(- (current-process-milliseconds) start-ms)"ms") ;; )
(mutex-unlock! *db-transaction-mutex*)))
;; (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight."))))
table-names)
(sqlite3:execute dbh "DETACH auxdb;"))))
;; FAILED ATTEMPTS
;; (if (not (has-last-update dbh table))
;; (sqlite3:execute dbh (conc "ALTER TABLE "table" ADD COLUMN last_update INTEGER;")))
;; (if (not (has-last-update dbh (conc "auxdb."table)))
;; (sqlite3:execute dbh (conc "ALTER TABLE auxdb."table" ADD COLUMN last_update INTEGER;")))
;; (stmt2 (conc "INSERT OR REPLACE INTO "todb table
;; " SELECT * FROM "fromdb table" WHERE "
;; fromdb table".last_update > "
;; todb table".last_update;"))
;; (stmt3 (conc "INSERT OR REPLACE INTO "todb"."table
;; " SELECT * FROM "fromdb table";"))
;; (stmt4 (conc "DELETE FROM "todb table" WHERE "fromdb
;; table ".last_update > "todb table".last_update;"))
;; (stmt5 (conc "DELETE FROM "todb table";"))
;; (stmt6 (conc "INSERT OR REPLACE INTO "todb table" ("fields-str") SELECT "fields-str" FROM "fromdb table";"))
;; (stmt7 (conc "UPDATE "todb table" SET "set-str (if (member "last_update" fields)
;; (conc " WHERE "fromdb table".last_update > "todb table".last_update;")
;; ";")))
;; prefix is "" or "auxdb."
;;
;; (define (dbmod:last-update-patch dbh prefix)
;; (let ((
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;;
;; direction = fromdest, todest
;; mode = 'full, 'incr
;;
;; Idea: youngest in dest is last_update time
;;
(define (dbmod:new-sync tables dbh1 dbh2 destdbfile direction #!key
(mode 'full))
(debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
(if (not (sqlite3:auto-committing? dbh1))
(debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
(let* ((table-names (map car tables))
(dest-exists (file-exists? destdbfile)))
(assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
(for-each
(lambda (table)
(let* ((tbldat (alist-ref table tables equal?))
(fields (map car tbldat))
(no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
(questionmarks (string-intersperse (make-list (length no-id-fields) "?") ","))
(fields-str (string-intersperse fields ","))
(no-id-fields-str (string-intersperse no-id-fields ","))
(dir (eq? direction 'todest))
(fromdb (if dir dbh1 dbh2))
(todb (if dir dbh2 dbh1))
(set-str (string-intersperse
(map (lambda (field)
(conc fromdb field"="todb field))
fields)
","))
;; (stmt1 (conc "INSERT OR IGNORE INTO "todb table
;; " SELECT * FROM "fromdb table";"))
;; (stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table " WHERE "todb table".id="fromdb table".id"
;; (if (member "last_update" fields)
;; (conc " AND "fromdb table".last_update > "todb table".last_update);")
;; ");")))
(stmt1 (conc "SELECT MAX(last_update) FROM "table";")) ;; use the highest last_update as your time reference
(stmt2 (conc "SELECT no-id-fields-str FROM "table" WHERE last_update>?;"))
(stmt3 (conc "UPDATE "table" SET ("no-id-fields-str") = ("questionmarks") WHERE id=?;"))
(start-ms (current-process-milliseconds)))
(debug:print 0 *default-log-port* "stmt3="stmt3)
(if (sqlite3:auto-committing? dbh1)
(begin
(sqlite3:with-transaction
dbh1
(lambda ()
(sqlite3:execute dbh1 stmt1) ;; get all new rows
#;(if (member "last_update" fields)
(sqlite3:execute dbh1 stmt8)) ;; get all updated rows
;; (sqlite3:execute dbh stmt5)
;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up
;; (sqlite3:execute dbh stmt6)
))
(debug:print 0 *default-log-port* "Synced table "table
" in "(- (current-process-milliseconds) start-ms)"ms"))
(debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight."))))
table-names)
(sqlite3:execute dbh1 "DETACH auxdb;"))))
;;======================================================================
;; Moved from dbfile
;;======================================================================
)