257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
exn
#f
(delete-file synclock-file))
(thethread))
(debug:print 0 *default-log-port* "Skipping sync, lockfile "synclock-file" found."))
(thethread)))))))
;; (dbmod:sync-tables tables #f db cachedb)
;; (if db
(dbmod:sync-gasket tables #f cachedb db dbfullname 'fromdest keys) ;; ) ;; load into cachedb
(dbr:dbstruct-last-update-set! dbstruct (current-seconds)) ;; should this be offset back in time by one second?
dbstruct))
;; (if (eq? syncdir 'todisk) ;; sync to disk normally, sync from in dashboard
;; (dbmod:sync-tables tables last-update cachedb db)
;; (dbmod:sync-tables tables last-update db cachedb))))
;;
;; direction: 'fromdest 'todest
|
|
>
|
|
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
|
exn
#f
(delete-file synclock-file))
(thethread))
(debug:print 0 *default-log-port* "Skipping sync, lockfile "synclock-file" found."))
(thethread)))))))
;; (dbmod:sync-tables tables #f db cachedb)
;;
(thread-sleep! 1) ;; let things settle before syncing in needed data
(dbmod:sync-gasket tables #f cachedb db dbfullname 'fromdest keys) ;; ) ;; load into cachedb
(dbr:dbstruct-last-update-set! dbstruct (+ (current-seconds) -10)) ;; should this be offset back in time by one second?
dbstruct))
;; (if (eq? syncdir 'todisk) ;; sync to disk normally, sync from in dashboard
;; (dbmod:sync-tables tables last-update cachedb db)
;; (dbmod:sync-tables tables last-update db cachedb))))
;;
;; direction: 'fromdest 'todest
|
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
|
(set! has-last #t)))
dbh
(conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;"))
has-last))
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;;
;; direction = fromdest, todest
;; mode = 'full, 'incr
;;
;; Idea: youngest in dest is last_update time
;;
(define (dbmod:attach-sync tables dbh destdbfile direction #!key
(mode 'full)
(no-update '("keys")) ;; do
)
(debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
(if (not (sqlite3:auto-committing? dbh))
(debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
(let* ((table-names (map car tables))
(dest-exists (file-exists? destdbfile)))
(assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
;; attach the destdbfile
;; for each table
;; insert into dest.<table> select * from src.<table> where last_update>last_update
;; done
(debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb")
(sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))
(for-each
(lambda (table)
(let* ((tbldat (alist-ref table tables equal?))
(fields (map car tbldat))
(no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
(fields-str (string-intersperse fields ","))
(no-id-fields-str (string-intersperse no-id-fields ","))
(dir (eq? direction 'todest))
(fromdb (if dir "" "auxdb."))
(todb (if dir "auxdb." ""))
(set-str (string-intersperse
(map (lambda (field)
(conc fromdb field"="todb field))
fields)
","))
(stmt1 (conc "INSERT OR IGNORE INTO "todb table
" SELECT * FROM "fromdb table";"))
(stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id"
(if (member "last_update" fields)
(conc " AND "fromdb table".last_update > "todb table".last_update);")
");")))
(start-ms (current-milliseconds)))
;; (debug:print 0 *default-log-port* "stmt8="stmt8)
;; (if (sqlite3:auto-committing? dbh)
;; (begin
(mutex-lock! *db-transaction-mutex*)
(sqlite3:with-transaction
dbh
(lambda ()
(debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using "stmt1)
(sqlite3:execute dbh stmt1) ;; get all new rows
#;(if (member "last_update" fields)
(sqlite3:execute dbh stmt8)) ;; get all updated rows
;; (sqlite3:execute dbh stmt5)
;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up
;; (sqlite3:execute dbh stmt6)
))
(debug:print 0 *default-log-port* "Synced table "table
" in "(- (current-milliseconds) start-ms)"ms") ;; )
(mutex-unlock! *db-transaction-mutex*)))
;; (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight."))))
table-names)
(sqlite3:execute dbh "DETACH auxdb;"))))
;; FAILED ATTEMPTS
;; (if (not (has-last-update dbh table))
;; (sqlite3:execute dbh (conc "ALTER TABLE "table" ADD COLUMN last_update INTEGER;")))
;; (if (not (has-last-update dbh (conc "auxdb."table)))
;; (sqlite3:execute dbh (conc "ALTER TABLE auxdb."table" ADD COLUMN last_update INTEGER;")))
;; (stmt2 (conc "INSERT OR REPLACE INTO "todb table
;; " SELECT * FROM "fromdb table" WHERE "
;; fromdb table".last_update > "
;; todb table".last_update;"))
;; (stmt3 (conc "INSERT OR REPLACE INTO "todb"."table
;; " SELECT * FROM "fromdb table";"))
;; (stmt4 (conc "DELETE FROM "todb table" WHERE "fromdb
;; table ".last_update > "todb table".last_update;"))
;; (stmt5 (conc "DELETE FROM "todb table";"))
;; (stmt6 (conc "INSERT OR REPLACE INTO "todb table" ("fields-str") SELECT "fields-str" FROM "fromdb table";"))
;; (stmt7 (conc "UPDATE "todb table" SET "set-str (if (member "last_update" fields)
;; (conc " WHERE "fromdb table".last_update > "todb table".last_update;")
;; ";")))
;; prefix is "" or "auxdb."
;;
;; (define (dbmod:last-update-patch dbh prefix)
;; (let ((
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
|
|
>
>
>
>
>
>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>
>
|
<
|
|
>
>
>
>
>
>
>
>
|
>
|
<
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
|
|
>
>
>
>
>
>
|
>
|
>
>
>
|
>
|
<
>
|
<
<
<
<
>
>
|
<
<
<
|
<
|
<
<
|
<
<
<
|
|
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
|
(set! has-last #t)))
dbh
(conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;"))
has-last))
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;;
;; direction = fromdest, todisk
;; mode = 'full, 'incr
;;
;; Idea: youngest in dest is last_update time
;;
(define (dbmod:attach-sync tables dbh destdbfile direction #!key
(mode 'full)
(no-update '("keys")) ;; do
)
(let* ((num-changes 0)
(update-changed (lambda (num-changed table qryname)
(if (> num-changed 0)
(begin
(debug:print-info 0 *default-log-port* "Changed "num-changed" rows for table "table", qry "qryname)
(set! num-changes (+ num-changes num-changed)))))))
(debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
(if (not (sqlite3:auto-committing? dbh))
(debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
(let* ((table-names (map car tables))
(dest-exists (file-exists? destdbfile)))
(assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
;; attach the destdbfile
;; for each table
;; insert into dest.<table> select * from src.<table> where last_update>last_update
;; done
(debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb")
(sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))
(for-each
(lambda (table)
(let* ((tbldat (alist-ref table tables equal?))
(fields (map car tbldat))
(no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
(fields-str (string-intersperse fields ","))
(no-id-fields-str (string-intersperse no-id-fields ","))
(dir (eq? direction 'todisk))
(fromdb (if dir "main." "auxdb."))
(todb (if dir "auxdb." "main."))
(set-str (string-intersperse
(map (lambda (field)
(conc fromdb field"="todb field))
fields)
","))
(stmt1 (conc "INSERT OR IGNORE INTO "todb table
" SELECT * FROM "fromdb table";"))
(stmt2 (conc "INSERT OR IGNORE INTO "todb table
" SELECT * FROM "fromdb table" WHERE "fromdb table".id=?;"))
(stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id"
(conc " AND "fromdb table".last_update > "todb table".last_update);")
");"))
(stmt9 (conc "UPDATE "todb table" SET ("no-id-fields-str") = "
"(SELECT "no-id-fields-str" FROM "fromdb table" WHERE "fromdb table".id=?)"
" WHERE "todb table".id=?"))
(newrec (conc "SELECT id FROM "fromdb table" WHERE id NOT IN (SELECT id FROM "todb table");"))
#;(changedrec (conc "SELECT id FROM "fromdb table" WHERE "fromdb table".last_update > "todb table".last_update AND "
fromdb table".id="todb table".id;")) ;; main = fromdb
(changedrec (conc "SELECT "fromdb table".id FROM "fromdb table" join "todb table" on "fromdb table".id="todb table".id WHERE "fromdb table".last_update > "todb table".last_update;"))
;; SELECT main.tests.id FROM main.tests join auxdb.tests on main.tests.id=auxdb.tests.id WHERE main.tests.last_update > auxdb.tests.last_update;"
(start-ms (current-milliseconds))
(new-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh newrec)))
;; (debug:print 0 *default-log-port* "Got "(length aux-ids)" in aux-ids and "(length main-ids)" in main-ids")
(update-changed (length new-ids) table "new records")
(mutex-lock! *db-transaction-mutex*)
(sqlite3:with-transaction
dbh
(lambda ()
(for-each (lambda (id)
(sqlite3:execute dbh stmt2 id))
new-ids)))
(if (member "last_update" fields)
(sqlite3:with-transaction
dbh
(lambda ()
(let* ((changed-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh changedrec)))
(update-changed (length changed-ids) table "changed records")
(for-each (lambda (id)
(sqlite3:execute dbh stmt9 id id))
changed-ids)))))
(mutex-unlock! *db-transaction-mutex*)
(debug:print 0 *default-log-port* "Synced table "table
" in "(- (current-milliseconds) start-ms)"ms")
))
table-names)
(sqlite3:execute dbh "DETACH auxdb;")))
num-changes))
;; prefix is "" or "auxdb."
;;
;; (define (dbmod:last-update-patch dbh prefix)
;; (let ((
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
|