Megatest

dbmod.scm at [ab0494b4b6]
Login

File dbmod.scm artifact 20ac266100 part of check-in ab0494b4b6



;;======================================================================
;; Copyright 2017, Matthew Welland.
;; 
;; This file is part of Megatest.
;; 
;;     Megatest is free software: you can redistribute it and/or modify
;;     it under the terms of the GNU General Public License as published by
;;     the Free Software Foundation, either version 3 of the License, or
;;     (at your option) any later version.
;; 
;;     Megatest is distributed in the hope that it will be useful,
;;     but WITHOUT ANY WARRANTY; without even the implied warranty of
;;     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;;     GNU General Public License for more details.
;; 
;;     You should have received a copy of the GNU General Public License
;;     along with Megatest.  If not, see <http://www.gnu.org/licenses/>.

;;======================================================================

(declare (unit dbmod))
(declare (uses dbfile))
(declare (uses commonmod))
(declare (uses debugprint))

(module dbmod
	*
	
(import scheme
	chicken
	data-structures
	extras
	files

	(prefix sqlite3 sqlite3:)
	matchable
	posix
	typed-records
	srfi-1
	srfi-18
	srfi-69

	commonmod
	dbfile
	debugprint
	)

;; Map a run-id to the bare database file name ("1.db", "main.db", ...).
;; NOTE: only the file name is returned, never a directory path.
;;
(define (dbmod:run-id->dbfname run-id)
  (let ((dbnum (dbfile:run-id->dbnum run-id)))
    (conc dbnum ".db")))

;; Return the directory holding the .db files: <areapath>/.mtdb
;;
;; The directory is created on the fly when it is missing and the area
;; path is writable. The path is returned whether or not it exists.
;;
(define (dbmod:get-dbdir dbstruct)
  (let* ((area (dbr:dbstruct-areapath dbstruct))
	 (mtdb (conc area "/.mtdb")))
    (when (and (file-write-access? area)
	       (not (file-exists? mtdb)))
      (create-directory mtdb))
    mtdb))

;; Return the full path of the db file for run-id, i.e. the db directory
;; of dbstruct joined with the file name derived from run-id.
;;
;; dbmod:get-dbdir accepts the dbstruct only; run-id must not be passed
;; to it (doing so raises an arity error on every call).
;;
(define (dbmod:run-id->full-dbfname dbstruct run-id)
  (conc (dbmod:get-dbdir dbstruct) "/" (dbmod:run-id->dbfname run-id)))

;;======================================================================
;; Read-only inmem cached direct from disk method
;;======================================================================

;; Cache of the dbstructs already opened by dbmod:nfs-get-dbstruct.
(define *dbmod:nfs-db-handles* (make-hash-table)) ;; dbfname -> dbstruct

;; called in rmt.scm nfs-transport-handler
;;
;; Return the dbstruct for run-id, opening it on first use and caching
;; it in *dbmod:nfs-db-handles* under its db file name. New dbstructs
;; are opened with syncdir: 'fromdisk. areapath must be set.
;;
(define (dbmod:nfs-get-dbstruct run-id keys init-proc areapath)
  (assert areapath "FATAL: dbmod:nfs-get-dbstruct called without areapath set.")
  (let ((dbfname (dbmod:run-id->dbfname run-id)))
    (or (hash-table-ref/default *dbmod:nfs-db-handles* dbfname #f)
	(let ((fresh (dbmod:open-dbmoddb areapath run-id dbfname init-proc keys syncdir: 'fromdisk)))
	  (hash-table-set! *dbmod:nfs-db-handles* dbfname fresh)
	  fresh))))

;;======================================================================
;; The inmem one-db file per server method goes in here
;;======================================================================

;; Run proc against the db handle held by dbstruct and return its result.
;;
;;   dbstruct : dbr:dbstruct carrying the handles, last-update and sync-proc
;;   run-id   : handed through to dbmod:open-db
;;   w/r      : #t = db modified by query, #f = db NOT modified by query
;;              (NOTE: the r/w is now w/r)
;;   proc     : called as (proc dbdat dbh . params)
;;   params   : list of the extra arguments for proc
;;
;; The global db mutex is taken when the request count is above 50, or
;; above 5 for queries that modify the db. It is released through
;; dynamic-wind so that an exception raised by proc cannot leave the
;; mutex locked for every later caller.
;;
(define (dbmod:with-db dbstruct run-id w/r proc params)
  (let* ((use-mutex (or (and w/r ;; use the mutex on queries that modify the db and for sync to disk
			     (> *api-process-request-count* 5)) ;; when writes are happening throttle more
			(> *api-process-request-count* 50)))
	 (dbdat     (dbmod:open-db dbstruct run-id (dbfile:db-init-proc)))
	 (dbh       (dbr:dbdat-dbh dbdat))) ;; this will be the inmem handle
    ;; kick off a sync when more than 3 seconds have passed since the last one
    (let* ((last-update (dbr:dbstruct-last-update dbstruct))
	   (sync-proc   (dbr:dbstruct-sync-proc dbstruct))
	   (curr-secs   (current-seconds)))
      (if (> (- curr-secs last-update) 3)
	  (begin
	    (sync-proc last-update)

	    ;; MOVE THIS CALL TO INSIDE THE sync-proc CALL
	    (dbr:dbstruct-last-update-set! dbstruct curr-secs)
	    )))
    (assert (sqlite3:database? dbh) "FATAL: bad db handle in dbmod:with-db")
    (if use-mutex
	(dynamic-wind
	    (lambda () (mutex-lock! *db-with-db-mutex*))
	    (lambda () (apply proc dbdat dbh params))
	    (lambda () (mutex-unlock! *db-with-db-mutex*)))
	(apply proc dbdat dbh params))))

;; Variadic convenience wrapper around dbmod:with-db: the arguments
;; given after proc are collected into the params list.
;;
(define (db:with-db dbstruct run-id w/r proc . params)
  (dbmod:with-db dbstruct run-id w/r proc params))

;; Open the working ("inmem") database and return its handle.
;;
;; With dbfullname the db is file backed and opened through
;; dbmod:safely-open-db, otherwise a pure ":memory:" db is used. In both
;; cases the busy timeout is set to 3600 and init-proc is applied to
;; the handle before it is returned.
;;
(define (dbmod:open-inmem-db init-proc #!optional (dbfullname #f))
  (let ((db (if dbfullname
		(dbmod:safely-open-db dbfullname init-proc #t)
		(sqlite3:open-database ":memory:"))))
    (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 3600))
    (init-proc db)
    db))

;; Return the dbdat record of dbstruct, building and caching it on first
;; use from the dbfile and inmem slots of dbstruct.
;; run-id and dbinit are accepted but not used here.
;;
(define (dbmod:open-db dbstruct run-id dbinit)
  (let ((cached (dbr:dbstruct-dbdat dbstruct)))
    (if cached
	cached
	(let ((fresh (make-dbr:dbdat
		      dbfile: (dbr:dbstruct-dbfile dbstruct)
		      dbh:    (dbr:dbstruct-inmem  dbstruct))))
	  (dbr:dbstruct-dbdat-set! dbstruct fresh)
	  fresh))))

;; Decide from (dbfile:cache-method) and (dbfile:sync-method) whether a
;; handle on the on-disk db is needed.
;;
;;   cache-method none, tmp                      => #t
;;   cache-method inmem, sync original | attach  => #t
;;   unknown cache-method                        => message, #f
;;
;; NOTE(review): for cache-method inmem with any other sync-method the
;; result is whatever debug:print returns, not an explicit #f - confirm
;; this is intended.
;;
(define (dbmod:need-on-disk-db-handle)
  (let ((cache-method (dbfile:cache-method)))
    (cond
     ((memq cache-method '(none tmp)) #t)
     ((eq? cache-method 'inmem)
      ;; attach: we need the handle to force creation of the on-disk file - FIXME
      (if (memq (dbfile:sync-method) '(original attach))
	  #t
	  (debug:print 0 *default-log-port* "Unknown dbfile:sync-method setting: "
		       (dbfile:sync-method))))
     (else
      (debug:print 0 *default-log-port* "Unknown dbfile:cache-method setting: "
		   (dbfile:cache-method))
      #f))))

;; Open the sqlite3 db file dbfullname while holding the simple file
;; lock "<dbfullname>.lock" and return the handle.
;;
;; The busy timeout is set to 136000 and init-proc is applied to the
;; handle only when write-access is true.
;;
(define (dbmod:safely-open-db dbfullname init-proc write-access)
  (let ((lockfile (conc dbfullname".lock")))
    (dbfile:with-simple-file-lock
     lockfile
     (lambda ()
       (let ((db (sqlite3:open-database dbfullname)))
	 (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000))
	 (when write-access
	   (init-proc db))
	 db)))))

;; Apply proc to params, once per second, until it returns a true value
;; or the retries are used up (proc is called at most tries + 1 times).
;;
;; Returns the true value from proc, or #f after printing a message
;; when proc never succeeded.
;;
(define (db:keep-trying-until-true proc params tries)
  (let loop ((remaining tries))
    (let ((outcome (apply proc params)))
      (cond
       (outcome outcome)
       ((> remaining 0)
	(thread-sleep! 1)
	(loop (- remaining 1)))
       (else
	;; (debug:print-info 0 *default-log-port* "proc never returned true, params="params)
	(print"db:keep-trying-until-true proc never returned true, proc = " proc " params =" params " tries = " remaining)
	#f)))))
  

;; #t while the sync thread created by the sync-proc installed in
;; dbmod:open-dbmoddb is running; tested there to skip overlapping syncs.
(define *sync-in-progress* #f)

;; Open the on-disk db and the working ("inmem") db and populate the
;; working db with the data from disk.
;;
;;   areapath    : area directory, the db lives in <areapath>/.mtdb
;;   run-id      : used to derive the db file name when dbfname-in is #f
;;   dbfname-in  : db file name (no path) or #f
;;   init-proc   : applied to freshly opened db handles
;;   keys        : passed to db:sync-all-tables-list and dbmod:db-to-db-sync
;;   dbstruct-in : existing dbstruct to fill in (a new one is made if #f)
;;   syncdir     : currently only referenced by disabled code
;;
;; Updates fields in dbstruct
;; Returns dbstruct
;;
;; * This routine creates the db if not found
;; * Probably can get rid of the dbstruct-in
;; * The working db is a file under /tmp/<user>/, named after areapath,
;;   the process id and the db file name
;; * If the on-disk db cannot be opened the process exits with a
;;   non-zero status
;;
(define (dbmod:open-dbmoddb areapath run-id dbfname-in init-proc keys
			    #!key (dbstruct-in #f)
			    (syncdir 'todisk))
  (let* ((dbstruct     (or dbstruct-in (make-dbr:dbstruct areapath: areapath)))
	 (dbfname      (or dbfname-in (dbmod:run-id->dbfname run-id)))
	 (dbpath       (dbmod:get-dbdir dbstruct))             ;; directory where all the .db files are kept
	 (dbfullname   (conc dbpath"/"dbfname)) ;; (dbmod:run-id->full-dbfname dbstruct run-id))
	 (tmpdir       (conc "/tmp/"(current-user-name)))
	 (tmpdb        (let* ((fname (conc tmpdir"/" (string-translate areapath "/" ".")"-"(current-process-id)"-"dbfname)))
			 (if (not (file-exists? tmpdir))(create-directory tmpdir))
			 ;; check if tmpdb already exists, either delete it or
			 ;; add something to the name
			 fname))
	 #;(inmem        (dbmod:open-inmem-db init-proc
					    (if (eq? (dbcache-mode) 'inmem)
						#f
						tmpdb)
					    ))
	 (write-access (file-write-access? dbpath))
	 (db           (dbmod:safely-open-db dbfullname init-proc write-access))
	 (tables       (db:sync-all-tables-list keys))) ;; only referenced by the disabled sync-gasket call below
    (if (not (sqlite3:database? db)) ;; db is our master database in the .mtdb dir
	(begin
	  ;; report the resolved path (dbfname-in may be #f) and signal
	  ;; the failure to the caller of the process with a non-zero status
	  (debug:print 0 *default-log-port* "ERROR: Failed to properly open "dbfullname", exiting immediately.")
	  (exit 1)))
    ;; we sync to tmpdb here so that we use file-copy to get intial database
    (dbmod:db-to-db-sync dbfullname tmpdb 0 init-proc keys)
    (let* ((inmem (dbmod:open-inmem-db init-proc tmpdb)))
      (dbr:dbstruct-inmem-set!     dbstruct inmem))
    (dbr:dbstruct-ondiskdb-set!  dbstruct db)
    (dbr:dbstruct-dbfile-set!    dbstruct dbfullname)
;    (dbr:dbstruct-dbfname-set!   dbstruct dbfname)
    ;; the sync-proc runs "megatest -db2db" from tmpdb to the on-disk db
    ;; in its own thread while holding *db-with-db-mutex*
    (dbr:dbstruct-sync-proc-set! dbstruct
				 (lambda (last-update)
				   (if *sync-in-progress*
				       (debug:print 3 *default-log-port* "WARNING: overlapping calls to sync to disk")
				       (thread-start!
					(make-thread
					 (lambda ()
					   (mutex-lock! *db-with-db-mutex*) ;; this mutex is used when overloaded or during a query that modifies the db
					   (set! *sync-in-progress* #t)
					   #;(dbmod:sync-gasket tables last-update inmem db
					   dbfullname syncdir)
					   (system (conc "megatest -db2db -from "tmpdb" -to "dbfullname))
					   (mutex-unlock! *db-with-db-mutex*)
					   (thread-sleep! 0.5) ;; ensure at least 1/2 second down time between sync calls
					   (set! *sync-in-progress* #f)))))))
    (dbr:dbstruct-last-update-set! dbstruct (current-seconds)) ;; should this be offset back in time by one second?
    dbstruct))

;;    (if (eq? syncdir 'todisk) ;; sync to disk normally, sync from in dashboard
;;        (dbmod:sync-tables tables last-update inmem db)
;;        (dbmod:sync-tables tables last-update db inmem))))

;; Dispatch a sync between the inmem db and the db handle dbh according
;; to (dbfile:sync-method):
;;
;;   none     : do nothing, return #f
;;   attach   : dbmod:attach-sync, direction is handed through unchanged
;;   newsync  : dbmod:new-sync. DON'T USE THIS ONE. IT IS BORKED
;;   others   : dbmod:sync-tables, copying inmem -> dbh when direction is
;;              'todisk and dbh -> inmem for any other direction
;;
(define (dbmod:sync-gasket tables last-update inmem dbh dbfname direction keys)
  (assert (sqlite3:database? inmem) "FATAL: sync-gasket: inmem is not a db")
  (assert (sqlite3:database? dbh) "FATAL: sync-gasket: dbh is not a db")
  (debug:print-info 0 *default-log-port* "Db sync using "(dbfile:sync-method)" method")
  (let ((method (dbfile:sync-method)))
    (cond
     ((eqv? method 'none)    #f)
     ((eqv? method 'attach)  (dbmod:attach-sync tables inmem dbfname direction))
     ((eqv? method 'newsync) (dbmod:new-sync tables inmem dbh dbfname direction))
     ((eqv? direction 'todisk)
      (dbmod:sync-tables tables last-update keys inmem dbh))
     (else
      (dbmod:sync-tables tables last-update keys dbh inmem)))))

;; Finalize the on-disk db handle stored in dbstruct.
;; No sync is performed here; the final sync below is only a placeholder.
;;
(define (dbmod:close-db dbstruct)
  ;; do final sync to disk file
  ;; (do-sync ...)
  (sqlite3:finalize! (dbr:dbstruct-ondiskdb dbstruct)))

;;======================================================================
;; Sync db
;;======================================================================

;; Decide whether the last-update criteria applies to a table.
;;
;;   has-last-update : true when the table has a last_update column
;;   fields          : the field list of the table
;;   last-update     : a number, a pair (field-name . seconds) or #f
;;
;; Returns #t when has-last-update is true and "last_update" is a member
;; of fields, or when last-update is a pair whose car is found among the
;; cars of fields. Returns #f otherwise; a last-update that is neither
;; #f, a pair nor a number is reported as an error first.
;;
(define (dbmod:calc-use-last-update has-last-update fields last-update)
  (if (and has-last-update
	   (member "last_update" fields))
      #t ;; if given a number, just use it for all fields
      (cond
       ((number? last-update) #f) ;; not matched above, ignore last-update for this table
       ((pair? last-update)
	(if (member (car last-update)    ;; last-update field name
		    (map car fields))
	    #t
	    #f))
       (last-update
	(debug:print 0 *default-log-port* "ERROR: parameter last-update for db:sync-tables must be a pair or a number, received: " last-update)
	#f)
       (else #f))))

;; Sync every table listed in tbls from fromdb to todb, one table at a
;; time through dbmod:sync-table.
;;
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;; dbs are sqlite3 db handles
;;
;; Use (db:sync-all-tables-list keys) to get the tbls input
;;
;;   last-update : accepted for compatibility, not used by this
;;                 implementation (every row is compared)
;;   keys        : key field names, appended to the "runs" entry of the
;;                 per table key-field list handed to dbmod:sync-table
;;
;; Returns the sum of the counts returned by dbmod:sync-table. Entries
;; of tbls that are not pairs are reported and counted as 0.
;;
(define (dbmod:sync-tables tbls last-update keys fromdb todb)
  (debug:print-info 0 *default-log-port* "dbmod:sync-tables called, from: "fromdb", to: "todb)
  (assert (sqlite3:database? fromdb) "FATAL: dbmod:sync-tables called with fromdb not a database" fromdb)
  (assert (sqlite3:database? todb) "FATAL: dbmod:sync-tables called with todb not a database" todb)
  (let ((specials    `(("keys" "fieldname")
		       ("metadat" "var")
		       ,(cons "runs" (cons "runname" keys))
		       ("tests" "run_id" "testname" "item_path")
		       ("test_meta" "testname")
		       ("test_steps" "test_id" "stepname" "state")
		       ("test_data" "test_id" "category" "variable")))
	(start-time  (current-milliseconds))
	(tot-count   0))
    (for-each ;; table
     (lambda (tabledat)
       (let* ((count (match tabledat
		       ((tablename . fields)
			(debug:print-info 0 *default-log-port* "Syncing table "tablename)
			(dbmod:sync-table tablename fields fromdb todb specials))
		       (else
			(debug:print-warn 0 *default-log-port* "Bad tabledat entry: "tabledat)
			0))))
	 (set! tot-count (+ tot-count count))))
     tbls)
    (debug:print-info 0 *default-log-port* "dbmod:sync-tables completed in "(- (current-milliseconds) start-time)"ms")
    tot-count))

;; Sync one table from from-db to to-db, matching rows on their id.
;;
;;   tablename : name of the table
;;   fields    : list whose cars are the field names of the table
;;   from-db   : sqlite3 handle of the source db
;;   to-db     : sqlite3 handle of the destination db
;;   specials  : alist tablename -> key field names (only used by the
;;               clean-up query, whose calls are currently commented out)
;;
;; For every id found in from-db:
;;   - id present in to-db: each field except id and last_update is read
;;     from both dbs and updated in to-db when the values differ
;;   - id missing in to-db: the row (without last_update) is inserted
;; All writes happen inside one transaction on to-db.
;;
;; Returns number of inserted rows + number of updated fields.
;;
(define (dbmod:sync-table tablename fields from-db to-db specials)
  (let* ((key-fields       (alist-ref tablename specials equal?))
	 (field-names      (map car fields))
	 (has-last-update  (member "last_update" field-names)) ;; not used below
	 (fields-sans-lu   (filter (lambda (x)
				     (not (member x '("id" "last_update"))))
				   field-names))
	 ;; fetch the given fields of row id as a list, #f when no row matched
	 (get-vals        (lambda (db id fields)
			    (debug:print-info 0 *default-log-port* "get-vals: fields="fields", id="id)
			    (let* ((qry (conc "SELECT "(string-intersperse fields ",")" FROM "tablename" WHERE id=?;"))
				   (res #f))
			      (sqlite3:for-each-row
			       (lambda tuple
				 (set! res tuple))
			       db qry id)
			      res)))
	 ;; delete from to-db the row(s) carrying the same key field values
	 ;; as row from-id of from-db (only referenced from commented out calls)
	 (clean-up-qry    (lambda (from-id)
			    (debug:print-info 0 *default-log-port* "key-fields="key-fields", from-id="from-id)
			    (let* ((vals (get-vals from-db from-id key-fields))
				   (qry  (conc "DELETE FROM "tablename" WHERE "(string-intersperse key-fields "=? AND ")"=?;")))
			      (debug:print-info 0 *default-log-port* "qry: "qry", vals="vals)
			      (apply sqlite3:execute to-db qry vals))))
	 ;; list of all ids in the table
	 (get-ids          (lambda (db)
			     (sqlite3:fold-row (lambda (res id)
						 (cons id res))
					       '()
					       db
					       (conc "SELECT id FROM "tablename";"))))
	 ;; value of one field of row id, #f when no row matched
	 (get-val          (lambda (db fieldname id)
			     (let* ((res #f)
				    (sql (conc "SELECT "fieldname" FROM "tablename" WHERE id=?;")))
			       (sqlite3:for-each-row
				(lambda (val)
				  (set! res val))
				db
				sql
				id)
			       ;; (debug:print-info 0 *default-log-port* "get-val "db" "fieldname" "id", sql="sql", res="res)
			       res)))
	 ;; all fields except id and last_update of row id, as a list
	 (get-row         (lambda (db id)
			    (let* ((res #f))
			      (sqlite3:for-each-row
			       (lambda tuple
				 (set! res tuple))
			       db
			       (conc "SELECT " (string-intersperse fields-sans-lu ",")
				     " FROM "tablename" WHERE id=?;")
			       id)
			      res)))
	 ;; insert row (as returned by get-row) under the given id; the id is
	 ;; written into the statement text, the other values are bound
	 (ins-row         (lambda (db id row)
			    (let* ((qry (conc "INSERT INTO "tablename" (id,"
					      (string-intersperse fields-sans-lu ",")
					      ") VALUES ("id","
					      (string-intersperse
					       (make-list (length fields-sans-lu) "?")
					       ",")
					      ");"))
				   (proc (lambda ()
					   (apply sqlite3:execute db qry row))))
			      ;; (debug:print-info 0 *default-log-port* "qry="qry)
			      ;; on exception the insert is simply tried one more time;
			      ;; the clean-up query is disabled, a second failure propagates
			      (handle-exceptions ;; on exception do the cleanup qry then try one more time
				  exn
				(begin
				  ;; (clean-up-qry id)
				  (proc))
				(proc)))))
			      
	 (num-inserts     0) ;; rows inserted into to-db
	 (num-updates     0) ;; individual field updates done in to-db
	 )
    ;; (debug:print-info 0 *default-log-port* "field-names: "field-names", fields-sans-lu: "fields-sans-lu)
    ;; (sqlite3:with-transaction
    ;;  from-db
    ;;  (lambda ()
       (let* ((from-ids (get-ids from-db)))
	 ;; (debug:print-info 0 *default-log-port* "Table "tablename", has "(length from-ids)" records.")
 	 (sqlite3:with-transaction
 	  to-db
	  (lambda ()
	    ;; ids already in the destination, read once before any insert
	    (let* ((to-ids (get-ids to-db)))
	      ;; (debug:print 0 *default-log-port* "to-ids="to-ids)
	      (for-each ;; from-id
	       (lambda (from-id)
		 (if (member from-id to-ids)
		     (for-each ;; case where record exists, do one by one the fields if different
		      (lambda (fieldname)
			(let* ((from-val (get-val from-db fieldname from-id))
			       (dest-val (get-val to-db   fieldname from-id)))
			  #;(debug:print 0 *default-log-port*
				       "fieldname="fieldname
				       ", from-id="from-id
				       ", from-val="from-val
				       ", dest-val="dest-val
				       )
			  (if (not (equal? from-val dest-val))
			      (let* ((qry-proc (lambda ()
						 (sqlite3:execute to-db (conc "UPDATE "tablename" SET "fieldname"=? WHERE id=?;")
								  from-val from-id))))
				;; as for the insert: one plain retry, clean-up disabled
				(handle-exceptions ;; try to remove the offending record and re-try once the update
				    exn
				  (begin
				    ;; (clean-up-qry from-id)
				    (qry-proc))
				  (qry-proc))
				(set! num-updates (+ num-updates 1))))))
		      fields-sans-lu)
		     (let ((row (get-row from-db from-id))) ;; need to insert the row
		       (debug:print 0 *default-log-port* "from-id="from-id", to-ids="to-ids", row="row)
		       (set! num-inserts (+ num-inserts 1))
		       (ins-row to-db from-id row))))
	       from-ids)))))
    (+ num-inserts num-updates)))

;;     (for-each ;; table
;;      (lambda (tabledat)
;;        (let* ((tablename        (car tabledat))
;; 	      (fields           (cdr tabledat))
;; 	      (has-last-update  (member "last_update" fields))
;; 	      (use-last-update  (dbmod:calc-use-last-update has-last-update fields last-update))
;; 	      (last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for
;; 				     (if (number? last-update)
;; 					 last-update
;; 					 (cdr last-update))
;; 				     #f))
;; 	      (last-update-field (if use-last-update
;; 				     (if (number? last-update)
;; 					 "last_update"
;; 					 (car last-update))
;; 				     #f))
;; 	      (num-fields (length fields))
;; 	      (field->num (make-hash-table))
;; 	      (num->field (apply vector (map car fields))) ;; BBHERE
;; 	      (full-sel   (conc "SELECT " (string-intersperse (map car fields) ",") 
;; 				" FROM " tablename (if use-last-update ;; apply last-update criteria
;; 						       (conc " WHERE " last-update-field " >= " last-update-value)
;; 						       "")
;; 				";"))
;; 	      (full-ins   (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) "
;; 				" VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );"))
;; 	      (fromdat    '())
;; 	      (fromdats   '())
;; 	      (totrecords 0)
;; 	      (batch-len  10000000) ;; (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100")))
;; 	      (todat      (make-hash-table))
;; 	      (count      0)
;;               (field-names (map car fields)))
;; 	 
;; 	 (debug:print-info 0 *default-log-port* "Syncing table "tablename)
;; 	 
;; 	 ;; set up the field->num table
;; 	 (for-each
;; 	  (lambda (field)
;; 	    (hash-table-set! field->num field count)
;; 	    (set! count (+ count 1)))
;; 	  fields)
;; 	 
;; 	 ;; read the source table
;;          ;; store a list of all rows in the table in fromdat, up to batch-len.
;;          ;; Then add fromdat to the fromdats list, clear fromdat and repeat.
;; 	 (sqlite3:for-each-row
;; 	  (lambda (a . b)
;; 	       (set! fromdat (cons (apply vector a b) fromdat))
;; 	       (if (> (length fromdat) batch-len)
;; 		   (begin
;; 		     (set! fromdats (cons fromdat fromdats))
;; 		     (set! fromdat  '())
;; 		     (set! totrecords (+ totrecords 1)))))
;; 	  fromdb
;; 	  full-sel)
;; 
;; 	 (debug:print-info 0 *default-log-port* "Have "totrecords" records to update.")
;;          ;; Count less than batch-len as a record
;;          (if (> (length fromdat) 0)
;;              (set! totrecords (+ totrecords 1)))
;; 	 
;; 	 ;; tack on remaining records in fromdat
;; 	 (if (not (null? fromdat))
;; 	     (set! fromdats (cons fromdat fromdats)))
;; 	 
;; 	 (sqlite3:for-each-row
;; 	  (lambda (a . b)
;; 	    (hash-table-set! todat a (apply vector a b)))
;; 	  todb
;; 	  full-sel)
;; 	 
;; 	 ;; first pass implementation, just insert all changed rows
;; 	 
;; 	 (let* ((db                 todb)
;; 		(has-last-update    (member "last_update" field-names))
;;                 (drp-trigger        (if has-last-update
;; 					(db:drop-trigger db tablename) 
;; 					#f))
;;                 (is-trigger-dropped (if has-last-update
;;                                         (db:is-trigger-dropped db tablename)
;; 					#f))
;; 		(stmth              (sqlite3:prepare db full-ins))
;;                 (changed-rows       0))
;; 	   (for-each
;; 	    (lambda (fromdat-lst)
;; 	      (mutex-lock! *db-transaction-mutex*)
;; 	      (sqlite3:with-transaction
;; 	       db
;; 	       (lambda ()
;; 		 (for-each ;; 
;; 		  (lambda (fromrow)
;; 		    (let* ((a    (vector-ref fromrow 0))
;; 			   (curr (hash-table-ref/default todat a #f))
;; 			   (same #t))
;; 		      (let loop ((i 0))
;; 			(if (or (not curr)
;; 				(not (equal? (vector-ref fromrow i)(vector-ref curr i))))
;; 			    (set! same #f))
;; 			(if (and same
;; 				 (< i (- num-fields 1)))
;; 			    (loop (+ i 1))))
;; 		      (if (not same)
;; 			  (begin
;; 			    (apply sqlite3:execute stmth (vector->list fromrow))
;; 			    (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))
;; 			    (set! changed-rows (+ changed-rows 1))))))
;; 		  fromdat-lst)))
;; 	      (mutex-unlock! *db-transaction-mutex*))
;; 	    fromdats)
;; 	   
;; 	   (sqlite3:finalize! stmth)
;;            (if (member "last_update" field-names)
;;                (db:create-trigger db tablename)))
;; 	 ))
;;      tbls)
;;     (let* ((runtime      (- (current-milliseconds) start-time))
;; 	   (should-print (or ;; (debug:debug-mode 12)
;; 			  (common:low-noise-print 120 "db sync")
;; 			  (> runtime 500)))) ;; low and high sync times treated as separate.
;;       (for-each 
;;        (lambda (dat)
;; 	 (let ((tblname (car dat))
;; 	       (count   (cdr dat)))
;; 	   (set! tot-count (+ tot-count count)))) 
;;        (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b))))))

;; Return #t when table tablename, as seen through db handle dbh, has a
;; column named "last_update" according to pragma_table_info, else #f.
;;
(define (has-last-update dbh tablename)
  (let ((qry (conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;")))
    (sqlite3:fold-row
     (lambda (found name)
       (or found (equal? name "last_update")))
     #f
     dbh
     qry)))

;; Sync tables between the db behind dbh and the db file destdbfile by
;; attaching destdbfile to dbh as "auxdb" and running SQL across the two.
;;
;; tables is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;;
;; direction = 'todest copies from dbh into auxdb; any other value copies
;;             from auxdb into dbh
;; mode and no-update are accepted but not used in this procedure
;;
;; For each table: INSERT OR IGNORE of all source rows, then, for tables
;; with a last_update field, an UPDATE of the non-id fields from source
;; rows with a newer last_update. Each table is handled in its own
;; transaction while *db-transaction-mutex* is held.
;;
;; Returns -1 after a sync. When dbh has a transaction in flight nothing
;; is synced and only a message is printed. destdbfile must exist.
;;
;; Idea: youngest in dest is last_update time
;;
;; NOTE(review): stmt8 has no WHERE clause on the outer UPDATE, so it
;;   touches every destination row; verify against the SQLite UPDATE
;;   documentation what is assigned when the subquery finds no row.
;; NOTE(review): an exception inside the transaction leaves
;;   *db-transaction-mutex* locked and auxdb attached - confirm callers
;;   can cope with that.
;;
(define (dbmod:attach-sync tables dbh destdbfile direction #!key
			   (mode 'full)
			   (no-update '("keys")) ;; do
			   )
  (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
  (if (not (sqlite3:auto-committing? dbh))
      (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
      (let* ((table-names  (map car tables))
	     (dest-exists  (file-exists? destdbfile))
	     (start-time   (current-milliseconds)))
	(assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
	;; attach the destdbfile
	;; for each table
	;;    insert into dest.<table> select * from src.<table> where last_update>last_update
	;; done
	(debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb")
	(sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))
	(for-each
	 (lambda (table)
	   (let* ((tbldat (alist-ref table tables equal?))
		  (fields (map car tbldat))
		  (no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
		  (fields-str (string-intersperse fields ",")) ;; not used below
		  (no-id-fields-str (string-intersperse no-id-fields ","))
		  (dir    (eq? direction 'todest))
		  ;; schema prefixes: "" is the db of dbh, "auxdb." the attached file
		  (fromdb (if dir "" "auxdb."))
		  (todb   (if dir "auxdb." ""))
		  (set-str (string-intersperse ;; not used below
			    (map (lambda (field)
				   (conc fromdb field"="todb field))
				 fields)
			    ","))
		  ;; stmt1: copy rows whose insert does not conflict
		  (stmt1 (conc "INSERT OR IGNORE INTO "todb table
			       " SELECT * FROM "fromdb table";"))
		  ;; stmt8: overwrite the non-id fields from the source row with
		  ;; the same id (and a newer last_update when the table has one)
		  (stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id"
			       (if (member "last_update" fields)
				   (conc " AND "fromdb table".last_update > "todb table".last_update);")
				   ");")))
		  (start-ms (current-milliseconds)))
	     ;; (debug:print 0 *default-log-port* "stmt8="stmt8)
	     ;; (if (sqlite3:auto-committing? dbh)
	     ;; (begin
	     (mutex-lock! *db-transaction-mutex*)
	     (sqlite3:with-transaction
	      dbh
	      (lambda ()
		(debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using INSERT OR UPDATE")
		(sqlite3:execute dbh stmt1)    ;; get all new rows
		
		(if (member "last_update" fields)
		    (sqlite3:execute dbh stmt8))    ;; get all updated rows
		;; (sqlite3:execute dbh stmt5)
		;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up
		;; (sqlite3:execute dbh stmt6)
		))
	     (debug:print 0 *default-log-port* "Synced table "table
			  " in "(- (current-milliseconds) start-ms)"ms") ;; )
	     (mutex-unlock! *db-transaction-mutex*)))
	     
	 ;; (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight."))))
	 table-names)
	(sqlite3:execute dbh "DETACH auxdb;")
	(debug:print-info 0 *default-log-port* "Total sync time: "(- (current-milliseconds) start-time)"ms")
	-1)))

;; FAILED ATTEMPTS

	     ;; (if (not (has-last-update dbh table))
	     ;;     (sqlite3:execute dbh (conc "ALTER TABLE "table" ADD COLUMN last_update INTEGER;")))
	     ;; (if (not (has-last-update dbh (conc "auxdb."table)))
	     ;;     (sqlite3:execute dbh (conc "ALTER TABLE auxdb."table" ADD COLUMN last_update INTEGER;")))

                ;; (stmt2 (conc "INSERT OR REPLACE INTO "todb table
		;; 	       " SELECT * FROM "fromdb table" WHERE "
		;; 	       fromdb table".last_update > "
		;; 	       todb table".last_update;"))
		;; (stmt3 (conc "INSERT OR REPLACE INTO "todb"."table
		;; 	       " SELECT * FROM "fromdb table";"))
		;; (stmt4 (conc "DELETE FROM "todb table" WHERE "fromdb
		;; 	       table ".last_update > "todb table".last_update;"))
		;; (stmt5 (conc "DELETE FROM "todb table";"))
		;; (stmt6 (conc "INSERT OR REPLACE INTO "todb table" ("fields-str") SELECT "fields-str" FROM "fromdb table";"))
		;; (stmt7 (conc "UPDATE "todb table" SET "set-str (if (member "last_update" fields)
		;; 						     (conc " WHERE "fromdb table".last_update > "todb table".last_update;")
		;; 						     ";")))

;; prefix is "" or "auxdb."
;;
;; (define (dbmod:last-update-patch dbh prefix)
;;   (let ((
  
;; Unfinished sync between two open db handles. dbmod:sync-gasket marks
;; this method as "DON'T USE THIS ONE. IT IS BORKED".
;;
;; tables is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;;
;; direction = 'todest selects dbh1 as source and dbh2 as destination,
;;             any other value the reverse
;; mode is accepted but not used
;;
;; Idea: youngest in dest is last_update time
;;
;; As written no rows are copied: for each table only stmt1
;; (SELECT MAX(last_update)) is executed on dbh1 and its result is
;; discarded; stmt2 and stmt3 are built but never executed.
;;
;; NOTE(review): stmt2 contains the literal text no-id-fields-str instead
;;   of the value of that variable.
;; NOTE(review): set-str concatenates the db handles themselves into a
;;   string; it is not used afterwards.
;; NOTE(review): "DETACH auxdb;" is executed although nothing is attached
;;   in this procedure - confirm whether a caller attaches auxdb first.
;;
(define (dbmod:new-sync tables dbh1 dbh2 destdbfile direction #!key
			   (mode 'full))
  (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
  (if (not (sqlite3:auto-committing? dbh1))
      (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
      (let* ((table-names  (map car tables))
	     (dest-exists  (file-exists? destdbfile)))
	(assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
	(for-each
	 (lambda (table)
	   (let* ((tbldat (alist-ref table tables equal?))
		  (fields (map car tbldat))
		  (no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
		  (questionmarks    (string-intersperse (make-list (length no-id-fields) "?") ","))
		  (fields-str       (string-intersperse fields ","))
		  (no-id-fields-str (string-intersperse no-id-fields ","))
		  (dir    (eq? direction 'todest))
		  (fromdb (if dir dbh1 dbh2))
		  (todb   (if dir dbh2 dbh1))
		  (set-str (string-intersperse
			    (map (lambda (field)
				   (conc fromdb field"="todb field))
				 fields)
			    ","))
		  ;; (stmt1 (conc "INSERT OR IGNORE INTO "todb table
		  ;; 	       " SELECT * FROM "fromdb table";"))
		  ;; (stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table " WHERE "todb table".id="fromdb table".id"
		  ;; 	       (if (member "last_update" fields)
		  ;; 		   (conc " AND "fromdb table".last_update > "todb table".last_update);")
		  ;; 		   ");")))
		  (stmt1    (conc "SELECT MAX(last_update) FROM "table";")) ;; use the highest last_update as your time reference
		  (stmt2    (conc "SELECT no-id-fields-str FROM "table" WHERE last_update>?;"))
		  (stmt3    (conc "UPDATE "table" SET ("no-id-fields-str") = ("questionmarks") WHERE id=?;"))
		  (start-ms (current-milliseconds)))
	     (debug:print 0 *default-log-port* "stmt3="stmt3)
	     ;; checked again per table: skip the table when a transaction
	     ;; is in flight on dbh1
	     (if (sqlite3:auto-committing? dbh1)
		 (begin
		   (sqlite3:with-transaction
		    dbh1
		    (lambda ()
		      (sqlite3:execute dbh1 stmt1)    ;; get all new rows

		      #;(if (member "last_update" fields)
			  (sqlite3:execute dbh1 stmt8))    ;; get all updated rows
		      ;; (sqlite3:execute dbh stmt5)
		      ;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up
		      ;; (sqlite3:execute dbh stmt6)
		      ))
		   (debug:print 0 *default-log-port* "Synced table "table
				" in "(- (current-milliseconds) start-ms)"ms"))
		 (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight."))))
	 table-names)
	(sqlite3:execute dbh1 "DETACH auxdb;"))))




;;======================================================================
;; Moved from dbfile
;;======================================================================

;; Wait up to approximately n seconds for the sqlite journal file
;; "<path>-journal" to go away.
;;
;;   path        : path of the db file (must be a string, otherwise an
;;                 error is printed and nothing else is done)
;;   n           : number of one second waits before giving up
;;   remove      : when true the journal is removed with "rm -rf" once
;;                 the wait has timed out
;;   waiting-msg : optional message, printed every 30th count-down step
;;                 while still waiting
;;
;; Returns #t when the journal is gone (or on any exception, which is
;; logged and otherwise ignored) and #f when the wait timed out.
;;
(define (tasks:wait-on-journal path n #!key (remove #f)(waiting-msg #f))
  (if (not (string? path))
      (debug:print-error 0 *default-log-port* "Called tasks:wait-on-journal with path=" path " (not a string)")
      (let ((fullpath (conc path "-journal")))
	(handle-exceptions
	 exn
	 (begin
	   (print-call-chain (current-error-port))
	   (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
	   (debug:print 5 *default-log-port* " exn=" (condition->list exn))
	   (debug:print 0 *default-log-port* "tasks:wait-on-journal failed. Continuing on, you can ignore this call-chain")
	   #t) ;; if stuff goes wrong just allow it to move on
	 (let loop ((journal-exists (file-exists? fullpath))
		    (count          n)) ;; seconds left to wait
	   (if journal-exists
	       (begin
		 ;; throttle on the running counter; testing the constant n
		 ;; would print on every iteration or never
		 (if (and waiting-msg
			  (zero? (modulo count 30)))
		     (debug:print 0 *default-log-port* waiting-msg))
		 (if (> count 0)
		     (begin
		       (thread-sleep! 1)
		       (loop (file-exists? fullpath)
			     (- count 1)))
		     (begin
		       (debug:print 0 *default-log-port* "ERROR: removing the journal file " fullpath ", this is not good. Look for disk full, write access and other issues.")
		       (if remove (system (conc "rm -rf " fullpath)))
		       #f)))
	       #t))))))


;;======================================================================
;; M E T A   G E T   A N D   S E T   V A R S
;;======================================================================

;; Look up var in the metadat table.
;;
;; Returns the stored value converted to a number when it is a string
;; that string->number accepts, the value as stored otherwise, and #f
;; when no row matches.
;;
(define (db:get-var dbstruct var)
  (db:with-db
   dbstruct #f #f  ;; for the moment vars are only stored in main.db
   (lambda (dbdat db)
     (let ((raw #f))
       (sqlite3:for-each-row
	(lambda (val)
	  (set! raw val))
	db
	"SELECT val FROM metadat WHERE var=?;" var)
       ;; convert to number if can
       (or (and (string? raw)
		(string->number raw))
	   raw)))))

;; Add 1 to the value stored for var in the metadat table.
;;
(define (db:inc-var dbstruct var)
  (let ((bump (lambda (dbdat db)
		(sqlite3:execute db "UPDATE metadat SET val=val+1 WHERE var=?;" var))))
    (db:with-db dbstruct #f #t bump)))

;; Subtract 1 from the value stored for var in the metadat table.
;;
(define (db:dec-var dbstruct var)
  (let ((drop (lambda (dbdat db)
		(sqlite3:execute db "UPDATE metadat SET val=val-1 WHERE var=?;" var))))
    (db:with-db dbstruct #f #t drop)))

;; This was part of db:get-var. It was used to estimate the load on
;; the database files.
;;
;; scale by 10, average with current value.
;;     (set! *global-delta* (/ (+ *global-delta* (* (- (current-milliseconds) start-ms)
;; 						 (if throttle throttle 0.01)))
;; 			    2))
;;     (if (> (abs (- *last-global-delta-printed* *global-delta*)) 0.08) ;; don't print all the time, only if it changes a bit
;; 	(begin
;; 	  (debug:print-info 4 *default-log-port* "launch throttle factor=" *global-delta*)
;; 	  (set! *last-global-delta-printed* *global-delta*)))

;; Store val under var in the metadat table, replacing any existing row
;; for that var (INSERT OR REPLACE).
;;
(define (db:set-var dbstruct var val)
  (let ((upsert-sql "INSERT OR REPLACE INTO metadat (var,val) VALUES (?,?);"))
    (db:with-db
     dbstruct #f #t
     (lambda (dbdat db)
       ;; fetch the cached statement handle for this SQL, then run it
       (let ((stmth (db:get-cache-stmth dbdat db upsert-sql)))
         (sqlite3:execute stmth var val))))))

;; Add val to the value currently stored for var in the metadat table.
;; Only existing rows are updated; nothing is inserted.
;;
(define (db:add-var dbstruct var val)
  (let ((add-sql "UPDATE metadat SET val=val+? WHERE var=?;"))
    (db:with-db
     dbstruct #f #t
     (lambda (dbdat db)
       ;; note the bind order: the increment first, then the var name
       (let ((stmth (db:get-cache-stmth dbdat db add-sql)))
         (sqlite3:execute stmth val var))))))

;; Remove the row for var from the metadat table.
;;
(define (db:del-var dbstruct var)
  (let ((delete-sql "DELETE FROM metadat WHERE var=?;"))
    (db:with-db
     dbstruct #f #t
     (lambda (dbdat db)
       (let ((stmth (db:get-cache-stmth dbdat db delete-sql)))
         (sqlite3:execute stmth var))))))

;; Find tests in run-id that look stuck:
;;   - state RUNNING and (now - event_time) > run_duration + running-deadtime
;;   - state REMOTEHOSTSTART and (now - event_time) > run_duration + remotehoststart-deadtime
;;   - state LAUNCHED and (now - event_time) > 86400 seconds (one day)
;;
;; A row with uname "n/a" and an empty item_path is treated as a toplevel test.
;;
;; Returns (list incompleted oldlaunched toplevels); every entry is
;; (list test-id run-dir uname testname item-path run-id):
;;   incompleted - non-toplevel tests stuck in RUNNING or REMOTEHOSTSTART
;;   oldlaunched - non-toplevel tests stuck in LAUNCHED
;;   toplevels   - toplevel tests matched by any of the three queries
;;
(define (db:get-toplevels-and-incompletes dbstruct run-id running-deadtime remotehoststart-deadtime)
  (let* ((toplevels   '())
	 (oldlaunched '())
	 (incompleted '())
	 ;; this is a toplevel test when there is no host and no item path
	 (toplevel?   (lambda (uname item-path)
			(and (equal? uname "n/a")
			     (equal? item-path ""))))
	 ;; Build the row handler shared by the RUNNING and REMOTEHOSTSTART
	 ;; queries. state-name and deadtime-name only affect the log text, so
	 ;; each query reports the state and the deadtime it actually used.
	 (make-stale-handler
	  (lambda (state-name deadtime-name deadtime)
	    (lambda (test-id run-dir uname testname item-path event-time run-duration)
	      (let ((rec (list test-id run-dir uname testname item-path run-id)))
		(if (toplevel? uname item-path)
		    ;; what to do with toplevel? call rollup?
		    (begin
		      (set! toplevels (cons rec toplevels))
		      (debug:print-info 0 *default-log-port*
					(conc "Found old toplevel test in " state-name " state, test-id=")
					test-id))
		    (begin
		      (set! incompleted (cons rec incompleted))
		      (debug:print-info 0 *default-log-port*
					(conc "Found old test in " state-name " state, test-id=")
					test-id
					(conc " exceeded " deadtime-name " ")
					deadtime" now="(current-seconds)
					" event-time="event-time" run-duration="run-duration))))))))
    (db:with-db 
     dbstruct run-id #t ;; not a write but problematic
     (lambda (dbdat db)
       (let* ((stmth1 (db:get-cache-stmth
		       dbdat db
		       "SELECT id,rundir,uname,testname,item_path,event_time,run_duration FROM tests 
                           WHERE run_id=? AND (strftime('%s','now') - event_time) > (run_duration + ?)
                                          AND state IN ('RUNNING');"))
	      (stmth2 (db:get-cache-stmth
		       dbdat db
		       "SELECT id,rundir,uname,testname,item_path,event_time,run_duration FROM tests 
                           WHERE run_id=? AND (strftime('%s','now') - event_time) > (run_duration + ?)
                                          AND state IN ('REMOTEHOSTSTART');"))
	      (stmth3 (db:get-cache-stmth
		       dbdat db
		       "SELECT id,rundir,uname,testname,item_path FROM tests
                           WHERE run_id=? AND (strftime('%s','now') - event_time) > 86400
                                          AND state IN ('LAUNCHED');")))
	 ;; in RUNNING or REMOTEHOSTSTART for longer than the given deadtime
	 ;;
	 ;; HOWEVER: this code in run:test seems to work fine
	 ;;              (> (- (current-seconds)(+ (db:test-get-event_time testdat)
	 ;;                     (db:test-get-run_duration testdat)))
	 ;;                    600) 
	 (sqlite3:for-each-row
	  (make-stale-handler "RUNNING" "running-deadtime" running-deadtime)
	  stmth1
	  run-id running-deadtime) ;; default time 720 seconds
	 
	 (sqlite3:for-each-row
	  (make-stale-handler "REMOTEHOSTSTART" "remotehoststart-deadtime" remotehoststart-deadtime)
	  stmth2
	  run-id remotehoststart-deadtime) ;; default time 230 seconds
	 
	 ;; in LAUNCHED for more than one day. Could be long due to job queues TODO/BUG: Need override for this in config
	 (sqlite3:for-each-row
	  (lambda (test-id run-dir uname testname item-path)
	    (let ((rec (list test-id run-dir uname testname item-path run-id)))
	      (if (toplevel? uname item-path)
		  ;; what to do with toplevel? call rollup?
		  (set! toplevels (cons rec toplevels))
		  (begin
		    (debug:print-info 0 *default-log-port* "Found old test in LAUNCHED state, test-id=" test-id
				      " 1 day since event_time marked")
		    (set! oldlaunched (cons rec oldlaunched))))))
	  stmth3
	  run-id))))
    (list incompleted oldlaunched toplevels)))

;; Update state and status of the tests in run-id whose testname matches
;; testname (SQL LIKE pattern), optionally restricted to tests currently in
;; currstate and/or currstatus (pass #f to leave either unrestricted).
;;
;; The new state is newstate, else currstate, else "NOT_STARTED".
;; The new status is newstatus, else currstatus, else "UNKNOWN".
;;
(define (db:set-state-status-by-state-status dbstruct run-id testname currstate currstatus newstate newstatus)
  ;; clear caches needed
  (let* ((qry    (conc "UPDATE tests SET state=?,status=? WHERE "
		       (if currstate  "state=? AND "  "")
		       (if currstatus "status=? AND " "")
		       " run_id=? AND testname LIKE ?;"))
	 ;; Bind values in the order of the placeholders in qry. The filter
	 ;; values are bound instead of spliced into the SQL text so a value
	 ;; containing a quote cannot break the statement. (conc x) keeps the
	 ;; string conversion the spliced form applied to them.
	 (params (append
		  (list (or newstate  currstate  "NOT_STARTED")
			;; fall back to the current *status*, not the state
			(or newstatus currstatus "UNKNOWN"))
		  (if currstate  (list (conc currstate))  '())
		  (if currstatus (list (conc currstatus)) '())
		  (list run-id testname))))
    (db:with-db
     dbstruct
     run-id
     #t
     (lambda (dbdat db)
       (apply sqlite3:execute db qry params)))))
;;======================================================================
;; db to db sync
;;======================================================================

;; Sync the database file src-db into the database file dest-db.
;;
;;  - src-db missing or unreadable: nothing is done, returns #f
;;  - dest-db missing and its directory writable: src-db is copied to
;;    dest-db as a plain file, returns #t
;;  - otherwise: both files are opened with dbmod:safely-open-db, the tables
;;    from db:sync-all-tables-list are handed to dbmod:sync-gasket (direction
;;    'todest), both handles are finalized and the gasket result is returned
;;
(define (dbmod:db-to-db-sync src-db dest-db last-update init-proc keys)
  (cond
   ;; can't proceed without a readable source
   ((not (and (file-exists? src-db)
	      (file-read-access? src-db)))
    #f)
   (else
    (let* ((dest-exists   (file-exists? dest-db))
	   (dest-writable (and dest-exists
			       (file-write-access? dest-db))) ;; exists and writable
	   (parent-dir    (or (pathname-directory dest-db) "."))
	   (parent-ok     (and (file-exists? parent-dir)
			       (file-write-access? parent-dir))))
      (if (and (not dest-exists) parent-ok)
	  ;; no destination yet: a straight file copy takes the place of a sync
	  (begin
	    (file-copy src-db dest-db)
	    (debug:print-info 0 *default-log-port* "db-to-db-sync done with file-copy")
	    #t)
	  ;; writable flag for the destination open: the file itself is
	  ;; writable, or failing that its directory is
	  (let* ((dest-open-wr (or dest-writable parent-ok))
		 (tables       (db:sync-all-tables-list keys))
		 (src-handle   (dbmod:safely-open-db src-db init-proc #t))
		 (dest-handle  (dbmod:safely-open-db dest-db init-proc dest-open-wr))
		 (sync-result  (dbmod:sync-gasket tables last-update src-handle dest-handle dest-db 'todest keys)))
	    (sqlite3:finalize! src-handle)
	    (sqlite3:finalize! dest-handle)
	    sync-result))))))

)