︙ | | |
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
+
+
|
;; dbstruct vector containing all the relevant dbs like main.db, megatest.db, run.db etc
(use (srfi 18) extras tcp stack) ;; RADT => use of require-extension?
(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64 format dot-locking z3 typed-records)
(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:)) ;; RADT => prefix??
(include "/nfs/site/disks/icf_fdk_cw_gwa002/srehman/fossil/dbi/dbi.scm")
(import (prefix dbi dbi:))
(declare (unit db))
(declare (uses common))
(declare (uses keys))
(declare (uses ods))
(declare (uses client))
(declare (uses mt))
|
︙ | | |
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
+
|
;; I propose this record evolves into the area record
;;
(defstruct dbr:dbstruct
;; (tmpdb #f)
(dbstack #f) ;; stack for tmp db handles, do not initialize with a stack
(mtdb #f)
(refndb #f)
(slave-dbs '())
(homehost #f) ;; not used yet
(on-homehost #f) ;; not used yet
) ;; goal is to converge on one struct for an area but for now it is too confusing
;; record for keeping state,status and count for doing roll-ups in
;; iterated tests
|
︙ | | |
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
|
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
|
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
(let* ((dbpath (db:dbfile-path)) ;; 0))
(dbexists (file-exists? dbpath))
(dbfexists (file-exists? (conc dbpath "/megatest.db")))
(tmpdb (db:open-megatest-db path: dbpath)) ;; lock-create-open dbpath db:initialize-main-db))
(mtdb (db:open-megatest-db))
(refndb (db:open-megatest-db path: dbpath name: "megatest_ref.db"))
(write-access (file-write-access? dbpath)))
(if (args:get-arg "-server")
(if (configf:get-section *configdat* "ext-sync")
(let* ((dblist (configf:get-section *configdat* "ext-sync"))
(res '())
(cfgdb #f))
(for-each (lambda (dbitem)
(let* ((stringsplit (string-split (cadr dbitem)))
(dbtype (string->symbol (car stringsplit)))
(dbpath (cadr stringsplit)))
(case dbtype
((sqlite3)
(set! cfgdb (dbi:open dbtype (cons (cons 'dbname dbpath) '()) ))
(db:initialize-main-db (dbi:db-conn cfgdb))
(db:initialize-run-id-db (dbi:db-conn cfgdb))
(set! res (cons (cons cfgdb dbpath) res)))
((pg)
(let* ((dbinfo '()))
(for-each
(lambda (x)
(if (not (eqv? (string->symbol x) dbtype))
(let* ((pair (string-split x ":")))
(if (not (eqv? pair '()))
(set! dbinfo (cons (cons (string->symbol (car pair)) (cadr pair)) dbinfo))))))
stringsplit)
(set! cfgdb (dbi:open dbtype dbinfo))
(set! res (cons (cons cfgdb (alist-ref 'host dbinfo)) res))
)))))
dblist)
(dbr:dbstruct-slave-dbs-set! dbstruct res)
)))
(if (and dbexists (not write-access))
(set! *db-write-access* #f))
(dbr:dbstruct-mtdb-set! dbstruct mtdb)
(dbr:dbstruct-dbstack-set! dbstruct (make-stack))
(stack-push! (dbr:dbstruct-dbstack dbstruct) tmpdb) ;; olddb is already a (cons db path)
(dbr:dbstruct-refndb-set! dbstruct refndb)
;; (mutex-unlock! *rundb-mutex*)
(if (and (not dbfexists)
write-access) ;; *db-write-access*) ;; did not have a prior db and do have write access
(begin
(debug:print 0 *default-log-port* "filling db " (db:dbdat-get-path tmpdb) " with data from " (db:dbdat-get-path mtdb))
(db:sync-tables (db:sync-all-tables-list dbstruct) #f mtdb refndb tmpdb))
(debug:print 0 *default-log-port* " db, " (db:dbdat-get-path tmpdb) " already exists, not propogating data from " (db:dbdat-get-path mtdb)))
|
︙ | | |
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
|
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
|
-
+
-
+
-
+
-
+
-
+
-
+
-
+
+
+
+
|
(if (member fname '("megatest.db" "monitor.db"))
"megatest -cleanup-db"
"megatest -import-megatest.db;megatest -cleanup-db")
"\"\n")
(exit) ;; we can not safely continue when a db was corrupted - even if fixed.
)
;; test read/write access to the database
(let ((db (sqlite3:open-database dbpath)))
(let ((db (dbi:open 'sqlite3 (cons (cons ('dbname dbpath) '())))))
(cond
((equal? fname "megatest.db")
(sqlite3:execute db "DELETE FROM tests WHERE state='DELETED';"))
(sqlite3:executeute db "DELETE FROM tests WHERE state='DELETED';"))
((equal? fname "main.db")
(sqlite3:execute db "DELETE FROM runs WHERE state='deleted';"))
(sqlite3:executeute db "DELETE FROM runs WHERE state='deleted';"))
((string-match "\\d.db" fname)
(sqlite3:execute db "UPDATE tests SET state='DELETED' WHERE state='DELETED';"))
(sqlite3:executeute db "UPDATE tests SET state='DELETED' WHERE state='DELETED';"))
((equal? fname "monitor.db")
(sqlite3:execute "DELETE FROM servers WHERE state LIKE 'defunct%';"))
(sqlite3:executeute "DELETE FROM servers WHERE state LIKE 'defunct%';"))
(else
(sqlite3:execute db "vacuum;")))
(sqlite3:executeute db "vacuum;")))
(finalize! db)
(dbi:close db)
#t))))))
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;; db's are dbdat's
;;
;; if last-update specified ("field-name" . time-in-seconds)
;; then sync only records where field-name >= time-in-seconds
;; IFF field-name exists
;;
(define (db:sync-tables tbls last-update fromdb todb . slave-dbs)
(set! todb (cons (dbi:convert (db:dbdat-get-db todb)) (db:dbdat-get-path todb)))
(set! fromdb (cons (dbi:convert (db:dbdat-get-db fromdb)) (db:dbdat-get-path fromdb)))
(handle-exceptions
exn
(begin
(debug:print 0 *default-log-port* "EXCEPTION: database probably overloaded or unreadable in db:sync-tables.")
(print-call-chain (current-error-port))
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(print "exn=" (condition->list exn))
|
︙ | | |
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
|
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
|
-
+
-
+
|
(cons todb slave-dbs))
0)
;; this is the work to be done
(cond
((not fromdb) (debug:print 3 *default-log-port* "WARNING: db:sync-tables called with fromdb missing") -1)
((not todb) (debug:print 3 *default-log-port* "WARNING: db:sync-tables called with todb missing") -2)
((not (sqlite3:database? (db:dbdat-get-db fromdb)))
((not (dbi:database? (db:dbdat-get-db fromdb)))
(debug:print-error 0 *default-log-port* "db:sync-tables called with fromdb not a database " fromdb) -3)
((not (sqlite3:database? (db:dbdat-get-db todb)))
((not (dbi:database? (db:dbdat-get-db todb)))
(debug:print-error 0 *default-log-port* "db:sync-tables called with todb not a database " todb) -4)
(else
(let ((stmts (make-hash-table)) ;; table-field => stmt
(all-stmts '()) ;; ( ( stmt1 value1 ) ( stml2 value2 ))
(numrecs (make-hash-table))
(start-time (current-milliseconds))
(tot-count 0))
|
︙ | | |
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
|
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
|
+
-
-
-
+
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
+
-
+
-
-
-
+
+
+
+
+
+
-
+
|
(conc " " (car last-update) ">=" (cdr last-update))
"")
";"))
(full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) "
" VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );"))
(fromdat '())
(fromdats '())
(tabletypes '())
(totrecords 0)
(batch-len (string->number (or (configf:lookup *configdat* "sync" "batchsize") "10")))
(todat (make-hash-table))
(count 0))
;; set up the field->num table
(for-each
(lambda (field)
(hash-table-set! field->num field count)
(set! count (+ count 1)))
fields)
;; read the source table
(sqlite3:for-each-row
(lambda (a . b)
(set! fromdat (cons (apply vector a b) fromdat))
(dbi:for-each-row
(lambda (a)
(set! fromdat (cons a fromdat))
(if (> (length fromdat) batch-len)
(begin
(set! fromdats (cons fromdat fromdats))
(set! fromdat '())
(set! totrecords (+ totrecords 1)))))
(db:dbdat-get-db fromdb)
full-sel)
;; tack on remaining records in fromdat
(if (not (null? fromdat))
(set! fromdats (cons fromdat fromdats)))
(if (common:low-noise-print 120 "sync-records")
(debug:print-info 4 *default-log-port* "found " totrecords " records to sync"))
;; read the target table
(sqlite3:for-each-row
(lambda (a . b)
(hash-table-set! todat a (apply vector a b)))
(dbi:for-each-row
(lambda (a)
(hash-table-set! todat (vector-ref a 0) a))
(db:dbdat-get-db todb)
full-sel)
;; first pass implementation, just insert all changed rows
(for-each
(lambda (targdb)
(set! targdb (dbi:convert (db:dbdat-get-db targdb)))
(if (eqv? (dbi:db-dbtype targdb) 'pg)
(let* ((prep ""))
(for-each
(lambda (row)
(set! tabletypes (cons (cons (string->symbol (vector-ref row 1)) (vector-ref row 2)) tabletypes)))
(dbi:pull-metadata (db:dbdat-get-db fromdb) tablename))
(set! prep (string-intersperse (map (lambda (x) (alist-ref (string->symbol (car x)) tabletypes)) fields) ","))
(set! prep (conc "PREPARE fullins (" prep ") AS REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) VALUES ( "))
(let loop ((i 1))
(set! prep (conc prep "$" i ","))
(if (< i (- num-fields 1))
(loop (+ i 1))
(set! prep (conc prep "$" (+ i 1) " );"))))
(set! full-ins prep)))
(let* ((db (db:dbdat-get-db targdb))
(stmth (sqlite3:prepare db full-ins)))
(db:delay-if-busy targdb) ;; NO WAITING
(let* ((db (dbi:convert (db:dbdat-get-db targdb)))
(stmth (dbi:prepare db full-ins)))
;; (db:delay-if-busy targdb) ;; NO WAITING
(for-each
(lambda (fromdat-lst)
(sqlite3:with-transaction
(dbi:with-transaction
db
(lambda ()
(for-each ;;
(lambda (fromrow)
(let* ((a (vector-ref fromrow 0))
(curr (hash-table-ref/default todat a #f))
(same #t))
(let loop ((i 0))
(if (or (not curr)
(not (equal? (vector-ref fromrow i)(vector-ref curr i))))
(set! same #f))
(if (and same
(< i (- num-fields 1)))
(loop (+ i 1))))
(if (not same)
(begin
(apply sqlite3:execute stmth (vector->list fromrow))
(hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))))))
(begin
(apply dbi:prepare-exec stmth (vector->list fromrow))
(hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))))))
;;(begin
;; (dbi:prepare-exec stmth (vector->list fromrow))
;;(hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))))))
fromdat-lst))
))
fromdats)
(sqlite3:finalize! stmth)))
(dbi:close stmth)))
(append (list todb) slave-dbs))))
tbls)
(let* ((runtime (- (current-milliseconds) start-time))
(should-print (or (debug:debug-mode 12)
(common:low-noise-print 120 "db sync" (> runtime 500))))) ;; low and high sync times treated as separate.
(if should-print (debug:print 3 *default-log-port* "INFO: db sync, total run time " runtime " ms"))
(for-each
|
︙ | | |
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
|
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
|
-
+
+
|
;;
;; run-ids: '(1 2 3 ...) or #f (for all)
;;
(define (db:multi-db-sync dbstruct . options)
(if (not (launch:setup))
(debug:print 0 *default-log-port* "ERROR: not able to setup up for megatest.")
(let* ((mtdb (dbr:dbstruct-mtdb dbstruct))
(tmpdb (db:get-db dbstruct))
(tmpdb (db:get-db dbstruct))
(refndb (dbr:dbstruct-refndb dbstruct))
(slave-dbs (dbr:dbstruct-slave-dbs dbstruct))
(allow-cleanup #t) ;; (if run-ids #f #t))
(tdbdat (tasks:open-db))
(servers (tasks:get-all-servers (db:delay-if-busy tdbdat)))
(data-synced 0)) ;; count of changed records (I hope)
;; kill servers
(if (member 'killservers options)
|
︙ | | |
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
|
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
|
-
+
|
;; run-ids)))
;; now ensure all newdb data are synced to megatest.db
;; do not use the run-ids list passed in to the function
;;
(if (member 'new2old options)
(set! data-synced
(+ (db:sync-tables (db:sync-all-tables-list dbstruct) #f tmpdb refndb mtdb)
(+ (apply db:sync-tables (db:sync-all-tables-list dbstruct) #f tmpdb refndb mtdb slave-dbs)
data-synced)))
(if (member 'fixschema options)
(begin
(db:patch-schema-maindb (db:dbdat-get-db mtdb))
(db:patch-schema-maindb (db:dbdat-get-db tmpdb))
|
︙ | | |
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
|
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
|
-
+
|
comment TEXT DEFAULT '',
event_time TIMESTAMP DEFAULT (strftime('%s','now')),
fail_count INTEGER DEFAULT 0,
pass_count INTEGER DEFAULT 0,
archived INTEGER DEFAULT 0, -- 0=no, > 1=archive block id where test data can be found
last_update INTEGER DEFAULT (strftime('%s','now')),
CONSTRAINT testsconstraint UNIQUE (run_id, testname, item_path));")
(sqlite3:execute db "CREATE INDEX IF NOT EXISTS tests_index ON tests (run_id, testname, item_path, uname);")
(sqlite3:execute db "CREATE INDEX IF NOT EXISTS tests_index ON tests (run_id, testname, item_path);")
(sqlite3:execute db "CREATE TRIGGER IF NOT EXISTS update_tests_trigger AFTER UPDATE ON tests
FOR EACH ROW
BEGIN
UPDATE tests SET last_update=(strftime('%s','now'))
WHERE id=old.id;
END;")
(sqlite3:execute db "CREATE TABLE IF NOT EXISTS test_steps
|
︙ | | |