Overview
Comment: | Cherry picked 1e29e5e90e to fix db syncing. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.80-revolution |
Files: | files | file ages | folders |
SHA1: |
4d3f148ed566f0e480cb1c7bee802a65 |
User & Date: | mmgraham on 2023-12-21 11:18:51 |
Other Links: | branch diff | manifest | tags |
Context
2023-12-22
| ||
13:52 | Made -cleanup-db remove test_steps and tests from deleted runs. check-in: 97e7119428 user: mmgraham tags: v1.80-revolution | |
2023-12-21
| ||
11:18 | Cherry picked 1e29e5e90e to fix db syncing. check-in: 4d3f148ed5 user: mmgraham tags: v1.80-revolution | |
2023-12-20
| ||
12:47 | Changed version to 1.8025 check-in: df9018e732 user: mmgraham tags: v1.80-revolution, v1.8025 | |
Changes
Modified dbmod.scm from [44746b8c36] to [cb31c71f30].
︙ | ︙ | |||
239 240 241 242 243 244 245 | (dbr:dbstruct-ondiskdb-set! dbstruct db) (dbr:dbstruct-dbfile-set! dbstruct dbfullname) (dbr:dbstruct-dbtmpname-set! dbstruct tmpdb) (dbr:dbstruct-dbfname-set! dbstruct dbfname) (dbr:dbstruct-sync-proc-set! dbstruct (lambda (last-update) (if *sync-in-progress* | | | 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 | (dbr:dbstruct-ondiskdb-set! dbstruct db) (dbr:dbstruct-dbfile-set! dbstruct dbfullname) (dbr:dbstruct-dbtmpname-set! dbstruct tmpdb) (dbr:dbstruct-dbfname-set! dbstruct dbfname) (dbr:dbstruct-sync-proc-set! dbstruct (lambda (last-update) (if *sync-in-progress* (debug:print 0 *default-log-port* "WARNING: overlapping calls to sync to disk") (begin ;; turn off writes - send busy or block? ;; call db2db internally ;; turn writes back on ;; (set! *api-halt-writes* #t) ;; do we need a mutex? ;; (dbmod:db-to-db-sync src-db dest-db last-update (dbfile:db-init-proc) keys) |
︙ | ︙ | |||
460 461 462 463 464 465 466 467 468 469 470 471 472 | (lambda (name) (if (equal? name "last_update") (set! has-last #t))) dbh (conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;")) has-last)) ;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) ) ;; ;; direction = fromdest, todisk ;; mode = 'full, 'incr ;; ;; Idea: youngest in dest is last_update time | > > > > > > > > > > > > > > | > | | | > | | > > > > > > > > > > > > > > > | < < < < | | | > > > > > > > > | > > > > | | > > > > > | | 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 | (lambda (name) (if (equal? name "last_update") (set! has-last #t))) dbh (conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;")) has-last)) (define (replace-question-marks-with-number str num) (define (replace-helper str index result) (if (>= index (string-length str)) result (let ((char (string-ref str index))) (if (char=? char #\?) (replace-helper str (+ index 1) (string-append result (number->string num))) (replace-helper str (+ index 1) (string-append result (string char))))))) (replace-helper str 0 "")) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) ) ;; ;; direction = fromdest, todisk ;; mode = 'full, 'incr ;; ;; Idea: youngest in dest is last_update time ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (define (dbmod:attach-sync tables dbh destdbfile direction #!key (mode 'full) (no-update '("keys")) ;; do ) (debug:print-info 2 *default-log-port* "dbmod:attach-sync") (let* ((num-changes 0) (update-changed (lambda (num-changed table qryname) (if (> num-changed 0) (begin (debug:print-info 0 *default-log-port* "Changed "num-changed" rows for table "table", qry "qryname) (set! num-changes (+ num-changes num-changed))))))) (debug:print 2 *default-log-port* "Doing sync "direction" "destdbfile) (if (not (sqlite3:auto-committing? dbh)) (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.") (let* ((table-names (map car tables)) (dest-exists (file-exists? destdbfile))) (assert dest-exists "FATAL: sync called with non-existant file, "destdbfile) ;; attach the destdbfile ;; for each table ;; insert into dest.<table> select * from src.<table> where last_update>last_update ;; done (debug:print 2 *default-log-port* "Attaching "destdbfile" as auxdb") (handle-exceptions exn (begin (debug:print 0 "ATTACH failed, exiting. exn="(condition->list exn)) (exit 1)) (sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))) (for-each (lambda (table) (let* ((dummy (debug:print 2 *default-log-port* "Doing table " table)) (tbldat (alist-ref table tables equal?)) (fields (map car tbldat)) (no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields)) (fields-str (string-intersperse fields ",")) (no-id-fields-str (string-intersperse no-id-fields ",")) (dir (eq? direction 'todisk)) (fromdb (if dir "main." "auxdb.")) (todb (if dir "auxdb." "main.")) (set-str (string-intersperse (map (lambda (field) (conc fromdb field"="todb field)) fields) ",")) (stmt1 (conc "INSERT OR IGNORE INTO "todb table " SELECT * FROM "fromdb table";")) (stmt2 (conc "INSERT OR IGNORE INTO "todb table " SELECT * FROM "fromdb table" WHERE "fromdb table".id=?;")) (stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id" (conc " AND "fromdb table".last_update > "todb table".last_update);") ");")) (update-string (conc "UPDATE "todb table" SET ")) (split-update (let () (for-each (lambda (column) (set! update-string (conc update-string column" = (SELECT "column" FROM "fromdb table" WHERE "fromdb table".id=?), ")) ) no-id-fields ) ;; drop the last ", " (conc (substring update-string 0 (-(string-length update-string) 2)) " WHERE "todb table".id=? ") ) ) (stmt9 (conc "UPDATE "todb table" SET ("no-id-fields-str") = " "(SELECT "no-id-fields-str" FROM "fromdb table" WHERE "fromdb table".id=?)" " WHERE "todb table".id=?")) (newrec (conc "SELECT id FROM "fromdb table" WHERE id NOT IN (SELECT id FROM "todb table");")) (changedrec (conc "SELECT "fromdb table".id FROM "fromdb table" join "todb table" on "fromdb table".id="todb table".id WHERE "fromdb table".last_update > "todb table".last_update;")) (start-ms (current-milliseconds)) (new-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh newrec))) (update-changed (length new-ids) table "new records") (mutex-lock! *db-transaction-mutex*) (handle-exceptions exn (debug:print 0 *default-log-port* "Transaction update of id fields in "table" failed.") (sqlite3:with-transaction dbh (lambda () (for-each (lambda (id) (sqlite3:execute dbh stmt2 id)) new-ids)))) (if (member "last_update" fields) (handle-exceptions exn (debug:print 0 *default-log-port* "Transaction update of non id fields in "table" failed.") (sqlite3:with-transaction dbh (lambda () (let* ((changed-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh changedrec)) (sql-query "") ) (update-changed (length changed-ids) table "changed records") (for-each (lambda (id) (let* ((update-with-ids (replace-question-marks-with-number split-update id)) ) (debug:print 2 *default-log-port* "about to do sqlite3:execute " dbh " " update-with-ids ) (handle-exceptions exn (debug:print 0 *default-log-port* "update from " fromdb table " to " todb table " failed: " ((condition-property-accessor 'exn 'message) exn)) (sqlite3:execute dbh update-with-ids) ) (debug:print 2 *default-log-port* "after sqlite3:execute") ) ) changed-ids ) ) ) ) ) ) (mutex-unlock! *db-transaction-mutex*) (debug:print 2 *default-log-port* "Synced table "table " in "(- (current-milliseconds) start-ms)"ms") )) table-names) (sqlite3:execute dbh "DETACH auxdb;"))) num-changes)) |
︙ | ︙ |
Modified megatest.scm from [11c996f69d] to [865b3a3469].
︙ | ︙ | |||
2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 | (debug:print 0 *default-log-port* "Skipping sync, there is a sync in progress.")) (set! *didsomething* #t))) (if (args:get-arg "-sync-to") (let ((toppath (launch:setup))) (tasks:sync-to-postgres *configdat* (args:get-arg "-sync-to")) (set! *didsomething* #t))) ;; use with -from and -to ;; (if (args:get-arg "-db2db") (let* ((duh (launch:setup)) (src-db (args:get-arg "-from")) (dest-db (args:get-arg "-to")) ;; (sync-period (args:get-arg-number "-period")) ;; (sync-timeout (args:get-arg-number "-timeout")) (sync-period-in (args:get-arg "-period")) (sync-timeout-in (args:get-arg "-timeout")) (sync-period (if sync-period-in (string->number sync-period-in) #f)) (sync-timeout (if sync-timeout-in (string->number sync-timeout-in) #f)) (lockfile (conc dest-db".sync-lock")) (keys (db:get-keys #f)) (thesync (lambda (last-update) (debug:print-info 0 *default-log-port* "Attempting to sync data from "src-db" to "dest-db"...") (if (not (file-exists? dest-db)) (begin (debug:print 0 *default-log-port* "Using copy to create "dest-db" from "src-db) (file-copy src-db dest-db) 1) (let ((res (dbmod:db-to-db-sync src-db dest-db last-update (dbfile:db-init-proc) keys))) (if res | > > | | > > > > > > | | 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 | (debug:print 0 *default-log-port* "Skipping sync, there is a sync in progress.")) (set! *didsomething* #t))) (if (args:get-arg "-sync-to") (let ((toppath (launch:setup))) (tasks:sync-to-postgres *configdat* (args:get-arg "-sync-to")) (set! *didsomething* #t))) ;; use with -from and -to ;; (if (args:get-arg "-db2db") (let* ((duh (launch:setup)) (src-db (args:get-arg "-from")) (dest-db (args:get-arg "-to")) ;; (sync-period (args:get-arg-number "-period")) ;; (sync-timeout (args:get-arg-number "-timeout")) (sync-period-in (args:get-arg "-period")) (sync-timeout-in (args:get-arg "-timeout")) (sync-period (if sync-period-in (string->number sync-period-in) #f)) (sync-timeout (if sync-timeout-in (string->number sync-timeout-in) #f)) (lockfile (conc dest-db".sync-lock")) (keys (db:get-keys #f)) (thesync (lambda (last-update) (debug:print-info 0 *default-log-port* "Attempting to sync data from "src-db" to "dest-db"...") (debug:print-info 0 *default-log-port* "PID = " (current-process-id)) (if (not (file-exists? dest-db)) (begin (debug:print 0 *default-log-port* "Using copy to create "dest-db" from "src-db) (file-copy src-db dest-db) 1) (let ((res (dbmod:db-to-db-sync src-db dest-db last-update (dbfile:db-init-proc) keys))) (if res (debug:print-info 2 *default-log-port* "Synced " res " records from "src-db" to "dest-db) (debug:print-info 0 *default-log-port* "No sync due to permissions or other issue.")) res)))) (start-time (current-seconds)) (synclock-mod-time (if (file-exists? lockfile) (handle-exceptions exn #f (file-modification-time synclock-file)) #f)) (age (if synclock-mod-time (- (current-seconds) synclock-mod-time) 1000)) ) (if (and src-db dest-db) (if (file-exists? src-db) (if (and (file-exists? lockfile) (< age 20)) (debug:print 0 *default-log-port* "Lock "lockfile" exists, skipping sync...") (begin (if (file-exists? lockfile) (begin (debug:print 0 *default-log-port* "Deleting old lock file " lockfile) (delete-file lockfile) ) ) (dbfile:with-simple-file-lock lockfile (lambda () (let loop ((last-changed (current-seconds)) (last-update 0)) (let* ((changes (handle-exceptions exn (begin (debug:print 0 *default-log-port* "Exception in sync: "(condition->list exn)) (delete-file lockfile) (exit)) (thesync last-update))) (now-time (current-seconds))) (if (and sync-period sync-timeout) ;; (if (and (< (- now-time start-time) 600) ;; hard limit on how long we run for (> sync-timeout (- now-time last-changed))) (begin (if sync-period (thread-sleep! sync-period)) (loop (if (> changes 0) now-time last-changed) now-time)))))))) (debug:print 0 *default-log-port* "Releasing lock file " lockfile) ) ) (debug:print 0 *default-log-port* "No sync due to unreadble or non-existant source file"src-db)) (debug:print 0 *default-log-port* "Usage for -db2db; -to and -from must be specified")) (set! *didsomething* #t))) (if (args:get-arg "-list-test-time") (let* ((toppath (launch:setup))) |
︙ | ︙ |