Index: dbmod.scm
==================================================================
--- dbmod.scm
+++ dbmod.scm
@@ -259,13 +259,14 @@
(delete-file synclock-file))
(thethread))
(debug:print 0 *default-log-port* "Skipping sync, lockfile "synclock-file" found."))
(thethread)))))))
;; (dbmod:sync-tables tables #f db cachedb)
- ;; (if db
+ ;;
+ (thread-sleep! 1) ;; let things settle before syncing in needed data
(dbmod:sync-gasket tables #f cachedb db dbfullname 'fromdest keys) ;; ) ;; load into cachedb
- (dbr:dbstruct-last-update-set! dbstruct (current-seconds)) ;; should this be offset back in time by one second?
+ (dbr:dbstruct-last-update-set! dbstruct (+ (current-seconds) -10)) ;; should this be offset back in time by one second?
dbstruct))
;; (if (eq? syncdir 'todisk) ;; sync to disk normally, sync from in dashboard
;; (dbmod:sync-tables tables last-update cachedb db)
;; (dbmod:sync-tables tables last-update db cachedb))))
@@ -468,97 +469,97 @@
(conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;"))
has-last))
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;;
-;; direction = fromdest, todest
+;; direction = fromdest, todisk
;; mode = 'full, 'incr
;;
;; Idea: youngest in dest is last_update time
;;
(define (dbmod:attach-sync tables dbh destdbfile direction #!key
(mode 'full)
(no-update '("keys")) ;; do
)
- (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
- (if (not (sqlite3:auto-committing? dbh))
- (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
- (let* ((table-names (map car tables))
- (dest-exists (file-exists? destdbfile)))
- (assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
- ;; attach the destdbfile
- ;; for each table
- ;; insert into dest. select * from src. where last_update>last_update
- ;; done
- (debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb")
- (sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))
- (for-each
- (lambda (table)
- (let* ((tbldat (alist-ref table tables equal?))
- (fields (map car tbldat))
- (no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
- (fields-str (string-intersperse fields ","))
- (no-id-fields-str (string-intersperse no-id-fields ","))
- (dir (eq? direction 'todest))
- (fromdb (if dir "" "auxdb."))
- (todb (if dir "auxdb." ""))
- (set-str (string-intersperse
- (map (lambda (field)
- (conc fromdb field"="todb field))
- fields)
- ","))
- (stmt1 (conc "INSERT OR IGNORE INTO "todb table
- " SELECT * FROM "fromdb table";"))
- (stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id"
- (if (member "last_update" fields)
- (conc " AND "fromdb table".last_update > "todb table".last_update);")
- ");")))
- (start-ms (current-milliseconds)))
- ;; (debug:print 0 *default-log-port* "stmt8="stmt8)
- ;; (if (sqlite3:auto-committing? dbh)
- ;; (begin
- (mutex-lock! *db-transaction-mutex*)
- (sqlite3:with-transaction
- dbh
- (lambda ()
- (debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using "stmt1)
- (sqlite3:execute dbh stmt1) ;; get all new rows
-
- #;(if (member "last_update" fields)
- (sqlite3:execute dbh stmt8)) ;; get all updated rows
- ;; (sqlite3:execute dbh stmt5)
- ;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up
- ;; (sqlite3:execute dbh stmt6)
- ))
- (debug:print 0 *default-log-port* "Synced table "table
- " in "(- (current-milliseconds) start-ms)"ms") ;; )
- (mutex-unlock! *db-transaction-mutex*)))
-
- ;; (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight."))))
- table-names)
- (sqlite3:execute dbh "DETACH auxdb;"))))
-
-;; FAILED ATTEMPTS
-
- ;; (if (not (has-last-update dbh table))
- ;; (sqlite3:execute dbh (conc "ALTER TABLE "table" ADD COLUMN last_update INTEGER;")))
- ;; (if (not (has-last-update dbh (conc "auxdb."table)))
- ;; (sqlite3:execute dbh (conc "ALTER TABLE auxdb."table" ADD COLUMN last_update INTEGER;")))
-
- ;; (stmt2 (conc "INSERT OR REPLACE INTO "todb table
- ;; " SELECT * FROM "fromdb table" WHERE "
- ;; fromdb table".last_update > "
- ;; todb table".last_update;"))
- ;; (stmt3 (conc "INSERT OR REPLACE INTO "todb"."table
- ;; " SELECT * FROM "fromdb table";"))
- ;; (stmt4 (conc "DELETE FROM "todb table" WHERE "fromdb
- ;; table ".last_update > "todb table".last_update;"))
- ;; (stmt5 (conc "DELETE FROM "todb table";"))
- ;; (stmt6 (conc "INSERT OR REPLACE INTO "todb table" ("fields-str") SELECT "fields-str" FROM "fromdb table";"))
- ;; (stmt7 (conc "UPDATE "todb table" SET "set-str (if (member "last_update" fields)
- ;; (conc " WHERE "fromdb table".last_update > "todb table".last_update;")
- ;; ";")))
+  ;; Sync every table named in tables between dbh's main database and
+  ;; destdbfile, which is attached as auxdb for the duration of the call.
+  ;;   direction 'todisk : rows flow main -> auxdb
+  ;;   anything else     : rows flow auxdb -> main
+  ;; Returns the number of rows selected for insert or update; 0 when the
+  ;; sync is skipped because dbh already has a transaction in flight.
+  ;; The mode and no-update keys are accepted but not consulted here.
+  (let* ((num-changes 0)
+         (update-changed (lambda (num-changed table qryname)
+                           (if (> num-changed 0)
+                               (begin
+                                 (debug:print-info 0 *default-log-port* "Changed "num-changed" rows for table "table", qry "qryname)
+                                 (set! num-changes (+ num-changes num-changed)))))))
+    (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
+    (if (not (sqlite3:auto-committing? dbh))
+        (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
+        (let* ((table-names (map car tables))
+               (dest-exists (file-exists? destdbfile)))
+          (assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
+          (debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb")
+          (sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))
+          ;; dynamic-wind: DETACH runs even if a table sync raises, so auxdb is
+          ;; never left attached to dbh for the next call to trip over.
+          (dynamic-wind
+           (lambda () #f)
+           (lambda ()
+             (for-each
+              (lambda (table)
+                (let* ((tbldat (alist-ref table tables equal?))
+                       (fields (map car tbldat))
+                       (no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
+                       (no-id-fields-str (string-intersperse no-id-fields ","))
+                       (dir (eq? direction 'todisk))
+                       (fromdb (if dir "main." "auxdb."))
+                       (todb (if dir "auxdb." "main."))
+                       ;; stmt2: copy one source row, by id, into the target table
+                       (stmt2 (conc "INSERT OR IGNORE INTO "todb table
+                                    " SELECT * FROM "fromdb table" WHERE "fromdb table".id=?;"))
+                       ;; stmt9: overwrite the non-id columns of one target row from the source
+                       (stmt9 (conc "UPDATE "todb table" SET ("no-id-fields-str") = "
+                                    "(SELECT "no-id-fields-str" FROM "fromdb table" WHERE "fromdb table".id=?)"
+                                    " WHERE "todb table".id=?"))
+                       ;; newrec: ids in the source that the target does not have
+                       (newrec (conc "SELECT id FROM "fromdb table" WHERE id NOT IN (SELECT id FROM "todb table");"))
+                       ;; changedrec: ids on both sides where the source last_update is newer, e.g.
+                       ;; SELECT main.tests.id FROM main.tests join auxdb.tests on main.tests.id=auxdb.tests.id WHERE main.tests.last_update > auxdb.tests.last_update;
+                       (changedrec (conc "SELECT "fromdb table".id FROM "fromdb table" join "todb table" on "fromdb table".id="todb table".id WHERE "fromdb table".last_update > "todb table".last_update;"))
+                       (start-ms (current-milliseconds))
+                       (new-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh newrec)))
+                  (update-changed (length new-ids) table "new records")
+                  ;; Lock/unlock through dynamic-wind: a raise inside either transaction
+                  ;; used to skip mutex-unlock! and leave the mutex held.
+                  (dynamic-wind
+                   (lambda () (mutex-lock! *db-transaction-mutex*))
+                   (lambda ()
+                     (sqlite3:with-transaction
+                      dbh
+                      (lambda ()
+                        (for-each (lambda (id)
+                                    (sqlite3:execute dbh stmt2 id))
+                                  new-ids)))
+                     ;; tables without last_update only ever receive new rows
+                     (if (member "last_update" fields)
+                         (sqlite3:with-transaction
+                          dbh
+                          (lambda ()
+                            (let* ((changed-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh changedrec)))
+                              (update-changed (length changed-ids) table "changed records")
+                              (for-each (lambda (id)
+                                          (sqlite3:execute dbh stmt9 id id))
+                                        changed-ids))))))
+                   (lambda () (mutex-unlock! *db-transaction-mutex*)))
+                  (debug:print 0 *default-log-port* "Synced table "table
+                               " in "(- (current-milliseconds) start-ms)"ms")))
+              table-names))
+           (lambda ()
+             (sqlite3:execute dbh "DETACH auxdb;")))))
+    num-changes))
;; prefix is "" or "auxdb."
;;
;; (define (dbmod:last-update-patch dbh prefix)
;; (let ((
Index: megatest.scm
==================================================================
--- megatest.scm
+++ megatest.scm
@@ -23,12 +23,10 @@
(define (toplevel-command . a) #f)
(declare (uses common))
;; (declare (uses megatest-version))
;; (declare (uses margs))
-;; (declare (uses mtargs))
-;; (declare (uses mtargs.import))
(declare (uses mtargs))
(declare (uses mtargs.import))
(declare (uses debugprint))
(declare (uses debugprint.import))
(declare (uses commonmod))
@@ -2571,14 +2569,16 @@
;;
(if (args:get-arg "-db2db")
(let* ((duh (launch:setup))
(src-db (args:get-arg "-from"))
(dest-db (args:get-arg "-to"))
- (sync-period (args:get-arg-number "-period"))
- (sync-timeout (args:get-arg-number "-timeout"))
- ;; (sync-period (if sync-period-in (string->number sync-period-in) #f))
- ;; (sync-timeout (if sync-timeout-in (string->number sync-timeout-in) #f))
+ ;; (sync-period (args:get-arg-number "-period"))
+ ;; (sync-timeout (args:get-arg-number "-timeout"))
+ (sync-period-in (args:get-arg "-period"))
+ (sync-timeout-in (args:get-arg "-timeout"))
+ (sync-period (if sync-period-in (string->number sync-period-in) #f))
+ (sync-timeout (if sync-timeout-in (string->number sync-timeout-in) #f))
(lockfile (conc dest-db".sync-lock"))
(keys (db:get-keys #f))
(thesync (lambda (last-update)
(debug:print-info 0 *default-log-port* "Attempting to sync data from "src-db" to "dest-db"...")
(if (not (file-exists? dest-db))
Index: transport-mode.scm.template
==================================================================
--- transport-mode.scm.template
+++ transport-mode.scm.template
@@ -13,10 +13,10 @@
;; (dbfile:sync-method 'none)
;; (dbfile:cache-method 'none)
;; (rmt:transport-mode 'nfs)
;; uncomment this block to test with tcp
-(dbfile:sync-method 'original) ;; attach) ;; original
+(dbfile:sync-method 'attach) ;; alternatives: 'original ('none for the nfs block above)
(dbfile:cache-method 'tmp)
(rmt:transport-mode 'tcp)