Megatest

Check-in [a8fa1eb8a2]
Login
Overview
Comment:Fixed attach sync
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.80
Files: files | file ages | folders
SHA1: a8fa1eb8a2a9ec31e7e1b7d7d616392c86de8eb6
User & Date: matt on 2023-05-23 19:19:00
Other Links: branch diff | manifest | tags
Context
2023-05-23
20:48
Simplify the locking scenario for sync check-in: cdf8c77efe user: matt tags: v1.80
19:19
Fixed attach sync check-in: a8fa1eb8a2 user: matt tags: v1.80
2023-05-22
21:38
Couple untested fixes check-in: 4a2131ba1d user: matt tags: v1.80
Changes

Modified dbmod.scm from [3017a3343d] to [72e5b21e7c].

257
258
259
260
261
262
263
264


265
266

267
268
269
270
271
272
273
257
258
259
260
261
262
263

264
265
266

267
268
269
270
271
272
273
274







-
+
+

-
+







							   exn
							 #f
							 (delete-file synclock-file))
						       (thethread))
						     (debug:print 0 *default-log-port* "Skipping sync, lockfile "synclock-file" found."))
						 (thethread)))))))
    ;; (dbmod:sync-tables tables #f db cachedb)
    ;; (if db
    ;; 
    (thread-sleep! 1) ;; let things settle before syncing in needed data
    (dbmod:sync-gasket tables #f cachedb db dbfullname 'fromdest keys) ;; ) ;; load into cachedb
    (dbr:dbstruct-last-update-set! dbstruct (current-seconds)) ;; should this be offset back in time by one second?
    (dbr:dbstruct-last-update-set! dbstruct (+ (current-seconds) -10)) ;; should this be offset back in time by one second?
    dbstruct))

;;    (if (eq? syncdir 'todisk) ;; sync to disk normally, sync from in dashboard
;;        (dbmod:sync-tables tables last-update cachedb db)
;;        (dbmod:sync-tables tables last-update db cachedb))))
;;
;; direction: 'fromdest 'todest
466
467
468
469
470
471
472
473

474
475
476
477
478
479
480
481






482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
































512
513
514
515
516













517
518
519
520
521
522
523
524
525

526
527
528
529
530
531
532
533
534
535
536
537
538
539

















540
541


542
543
544
545
546



547
548
549
550

551
552

553
554
555

556
557
558
559

560
561
562
563
564
565
566
467
468
469
470
471
472
473

474
475
476
477
478
479
480
481
482
483
484
485
486
487
488






























489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520





521
522
523
524
525
526
527
528
529
530
531
532
533









534














535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551


552
553





554
555
556




557


558



559




560
561
562
563
564
565
566
567







-
+








+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
-
-
-
-
-
+
+
+
-
-
-
-
+
-
-
+
-
-
-
+
-
-
-
-
+







	   (set! has-last #t)))
     dbh
     (conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;"))
    has-last))

;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;;
;; direction = fromdest, todest
;; direction = fromdest, todisk
;; mode = 'full, 'incr
;;
;; Idea: youngest in dest is last_update time
;;
(define (dbmod:attach-sync tables dbh destdbfile direction #!key
			   (mode 'full)
			   (no-update '("keys")) ;; do
			   )
  (let* ((num-changes 0)
	 (update-changed (lambda (num-changed table qryname)
			   (if (> num-changed 0)
			       (begin
				 (debug:print-info 0 *default-log-port* "Changed "num-changed" rows for table "table", qry "qryname)
				 (set! num-changes (+ num-changes num-changed)))))))
  (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
  (if (not (sqlite3:auto-committing? dbh))
      (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
      (let* ((table-names  (map car tables))
	     (dest-exists  (file-exists? destdbfile)))
	(assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
	;; attach the destdbfile
	;; for each table
	;;    insert into dest.<table> select * from src.<table> where last_update>last_update
	;; done
	(debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb")
	(sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))
	(for-each
	 (lambda (table)
	   (let* ((tbldat (alist-ref table tables equal?))
		  (fields (map car tbldat))
		  (no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
		  (fields-str (string-intersperse fields ","))
		  (no-id-fields-str (string-intersperse no-id-fields ","))
		  (dir    (eq? direction 'todest))
		  (fromdb (if dir "" "auxdb."))
		  (todb   (if dir "auxdb." ""))
		  (set-str (string-intersperse
			    (map (lambda (field)
				   (conc fromdb field"="todb field))
				 fields)
			    ","))
		  (stmt1 (conc "INSERT OR IGNORE INTO "todb table
			       " SELECT * FROM "fromdb table";"))
		  (stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id"
    (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile)
    (if (not (sqlite3:auto-committing? dbh))
	(debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.")
	(let* ((table-names  (map car tables))
	       (dest-exists  (file-exists? destdbfile)))
	  (assert dest-exists "FATAL: sync called with non-existant file, "destdbfile)
	  ;; attach the destdbfile
	  ;; for each table
	  ;;    insert into dest.<table> select * from src.<table> where last_update>last_update
	  ;; done
	  (debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb")
	  (sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))
	  (for-each
	   (lambda (table)
	     (let* ((tbldat (alist-ref table tables equal?))
		    (fields (map car tbldat))
		    (no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields))
		    (fields-str (string-intersperse fields ","))
		    (no-id-fields-str (string-intersperse no-id-fields ","))
		    (dir    (eq? direction 'todisk))
		    (fromdb (if dir "main." "auxdb."))
		    (todb   (if dir "auxdb." "main."))
		    (set-str (string-intersperse
			      (map (lambda (field)
				     (conc fromdb field"="todb field))
				   fields)
			      ","))
		    (stmt1      (conc "INSERT OR IGNORE INTO "todb table
				      " SELECT * FROM "fromdb table";"))
		    (stmt2      (conc "INSERT OR IGNORE INTO "todb table
				      " SELECT * FROM "fromdb table" WHERE "fromdb table".id=?;"))
		    (stmt8      (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id"
			       (if (member "last_update" fields)
				   (conc " AND "fromdb table".last_update > "todb table".last_update);")
				   ");")))
		  (start-ms (current-milliseconds)))
	     ;; (debug:print 0 *default-log-port* "stmt8="stmt8)
				      (conc " AND "fromdb table".last_update > "todb table".last_update);")
				      ");"))
		    (stmt9      (conc "UPDATE "todb table" SET ("no-id-fields-str") = "
				      "(SELECT "no-id-fields-str" FROM "fromdb table" WHERE "fromdb table".id=?)"
				      " WHERE "todb table".id=?"))
		    (newrec     (conc "SELECT id FROM "fromdb table" WHERE id NOT IN (SELECT id FROM "todb table");"))
		    #;(changedrec (conc "SELECT id FROM "fromdb table" WHERE "fromdb table".last_update > "todb table".last_update AND "
		    fromdb table".id="todb table".id;")) ;; main = fromdb
		    (changedrec (conc "SELECT "fromdb table".id FROM "fromdb table" join "todb table" on "fromdb table".id="todb table".id WHERE "fromdb table".last_update > "todb table".last_update;"))
                                    ;; SELECT main.tests.id FROM main.tests join auxdb.tests on main.tests.id=auxdb.tests.id WHERE main.tests.last_update > auxdb.tests.last_update;"
		    (start-ms   (current-milliseconds))
		    (new-ids    (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh newrec)))
	       ;; (debug:print 0 *default-log-port* "Got "(length aux-ids)" in aux-ids and "(length main-ids)" in main-ids")
	     ;; (if (sqlite3:auto-committing? dbh)
	     ;; (begin
	     (mutex-lock! *db-transaction-mutex*)
	     (sqlite3:with-transaction
	      dbh
	      (lambda ()
		(debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using "stmt1)
		(sqlite3:execute dbh stmt1)    ;; get all new rows
		
	       (update-changed (length new-ids) table "new records")
		#;(if (member "last_update" fields)
		(sqlite3:execute dbh stmt8))    ;; get all updated rows
		;; (sqlite3:execute dbh stmt5)
		;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up
		;; (sqlite3:execute dbh stmt6)
		))
	     (debug:print 0 *default-log-port* "Synced table "table
			  " in "(- (current-milliseconds) start-ms)"ms") ;; )
	     (mutex-unlock! *db-transaction-mutex*)))
	     
	 ;; (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight."))))
	 table-names)
	(sqlite3:execute dbh "DETACH auxdb;"))))

	       (mutex-lock! *db-transaction-mutex*)
	       (sqlite3:with-transaction
		dbh
		(lambda ()
		  (for-each (lambda (id)
			      (sqlite3:execute dbh stmt2 id))
			    new-ids)))
	       (if (member "last_update" fields)
		   (sqlite3:with-transaction
		    dbh
		    (lambda ()
		      (let* ((changed-ids  (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh changedrec)))
			(update-changed (length changed-ids) table "changed records")
			(for-each (lambda (id)
				    (sqlite3:execute dbh stmt9 id id))
				  changed-ids)))))
	       
;; FAILED ATTEMPTS

	       (mutex-unlock! *db-transaction-mutex*)
	       
	     ;; (if (not (has-last-update dbh table))
	     ;;     (sqlite3:execute dbh (conc "ALTER TABLE "table" ADD COLUMN last_update INTEGER;")))
	     ;; (if (not (has-last-update dbh (conc "auxdb."table)))
	     ;;     (sqlite3:execute dbh (conc "ALTER TABLE auxdb."table" ADD COLUMN last_update INTEGER;")))

	       (debug:print 0 *default-log-port* "Synced table "table
	        	    " in "(- (current-milliseconds) start-ms)"ms")
	       
                ;; (stmt2 (conc "INSERT OR REPLACE INTO "todb table
		;; 	       " SELECT * FROM "fromdb table" WHERE "
		;; 	       fromdb table".last_update > "
		;; 	       todb table".last_update;"))
	       ))
		;; (stmt3 (conc "INSERT OR REPLACE INTO "todb"."table
		;; 	       " SELECT * FROM "fromdb table";"))
	   table-names)
		;; (stmt4 (conc "DELETE FROM "todb table" WHERE "fromdb
		;; 	       table ".last_update > "todb table".last_update;"))
		;; (stmt5 (conc "DELETE FROM "todb table";"))
	  (sqlite3:execute dbh "DETACH auxdb;")))
		;; (stmt6 (conc "INSERT OR REPLACE INTO "todb table" ("fields-str") SELECT "fields-str" FROM "fromdb table";"))
		;; (stmt7 (conc "UPDATE "todb table" SET "set-str (if (member "last_update" fields)
		;; 						     (conc " WHERE "fromdb table".last_update > "todb table".last_update;")
		;; 						     ";")))
    num-changes))

;; prefix is "" or "auxdb."
;;
;; (define (dbmod:last-update-patch dbh prefix)
;;   (let ((
  
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )

Modified megatest.scm from [ed67bbef31] to [d75f7cc018].

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
21
22
23
24
25
26
27


28
29
30
31
32
33
34







-
-








;; fake out readline usage of toplevel-command
(define (toplevel-command . a) #f)

(declare (uses common))
;; (declare (uses megatest-version))
;; (declare (uses margs))
;; (declare (uses mtargs))
;; (declare (uses mtargs.import))
(declare (uses mtargs))
(declare (uses mtargs.import))
(declare (uses debugprint))
(declare (uses debugprint.import))
(declare (uses commonmod))
(declare (uses commonmod.import))

2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579






2580
2581
2582
2583
2584
2585
2586
2567
2568
2569
2570
2571
2572
2573




2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586







-
-
-
-
+
+
+
+
+
+








;; use with -from and -to
;;
(if (args:get-arg "-db2db")
    (let* ((duh         (launch:setup))
	   (src-db      (args:get-arg "-from"))
	   (dest-db     (args:get-arg "-to"))
	   (sync-period (args:get-arg-number "-period"))
	   (sync-timeout (args:get-arg-number "-timeout"))
	   ;; (sync-period (if sync-period-in (string->number sync-period-in) #f))
	   ;; (sync-timeout (if sync-timeout-in (string->number sync-timeout-in) #f))
	   ;; (sync-period (args:get-arg-number "-period"))
	   ;; (sync-timeout (args:get-arg-number "-timeout"))
	   (sync-period-in (args:get-arg "-period"))
	   (sync-timeout-in (args:get-arg "-timeout"))
	   (sync-period (if sync-period-in (string->number sync-period-in) #f))
	   (sync-timeout (if sync-timeout-in (string->number sync-timeout-in) #f))
	   (lockfile    (conc dest-db".sync-lock"))
	   (keys        (db:get-keys #f))
	   (thesync     (lambda (last-update)
			  (debug:print-info 0 *default-log-port* "Attempting to sync data from "src-db" to "dest-db"...")
			  (if (not (file-exists? dest-db))
			      (begin
				(dbfile:with-simple-file-lock

Modified transport-mode.scm.template from [1f45cb7fe6] to [9dbf69644d].

11
12
13
14
15
16
17
18

19
20
21
22
11
12
13
14
15
16
17

18
19
20
21
22







-
+





;; uncomment this block to test without tcp
;; (dbfile:sync-method 'none)
;; (dbfile:cache-method 'none)
;; (rmt:transport-mode 'nfs)

;; uncomment this block to test with tcp
(dbfile:sync-method 'original) ;; attach) ;; original
(dbfile:sync-method 'attach) ;; attach) ;; original
(dbfile:cache-method 'tmp)
(rmt:transport-mode 'tcp)