Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -17,10 +17,12 @@ (use (srfi 18) extras tcp stack) ;; RADT => use of require-extension? (use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64 format dot-locking z3 typed-records) (import (prefix sqlite3 sqlite3:)) (import (prefix base64 base64:)) ;; RADT => prefix?? +(include "/nfs/site/disks/icf_fdk_cw_gwa002/srehman/fossil/dbi/dbi.scm") +(import (prefix dbi dbi:)) (declare (unit db)) (declare (uses common)) (declare (uses keys)) (declare (uses ods)) @@ -46,10 +48,11 @@ (defstruct dbr:dbstruct ;; (tmpdb #f) (dbstack #f) ;; stack for tmp db handles, do not initialize with a stack (mtdb #f) (refndb #f) + (slave-dbs '()) (homehost #f) ;; not used yet (on-homehost #f) ;; not used yet ) ;; goal is to converge on one struct for an area but for now it is too confusing @@ -280,16 +283,49 @@ (dbfexists (file-exists? (conc dbpath "/megatest.db"))) (tmpdb (db:open-megatest-db path: dbpath)) ;; lock-create-open dbpath db:initialize-main-db)) (mtdb (db:open-megatest-db)) (refndb (db:open-megatest-db path: dbpath name: "megatest_ref.db")) (write-access (file-write-access? dbpath))) + + (if (args:get-arg "-server") + (if (configf:get-section *configdat* "ext-sync") + (let* ((dblist (configf:get-section *configdat* "ext-sync")) + (res '()) + (cfgdb #f)) + (for-each (lambda (dbitem) + (let* ((stringsplit (string-split (cadr dbitem))) + (dbtype (string->symbol (car stringsplit))) + (dbpath (cadr stringsplit))) + (case dbtype + ((sqlite3) + (set! cfgdb (dbi:open dbtype (cons (cons 'dbname dbpath) '()) )) + (db:initialize-main-db (dbi:db-conn cfgdb)) + (db:initialize-run-id-db (dbi:db-conn cfgdb)) + (set! res (cons (cons cfgdb dbpath) res))) + ((pg) + (let* ((dbinfo '())) + (for-each + (lambda (x) + (if (not (eqv? (string->symbol x) dbtype)) + (let* ((pair (string-split x ":"))) + (if (not (eqv? pair '())) + (set! dbinfo (cons (cons (string->symbol (car pair)) (cadr pair)) dbinfo)))))) + stringsplit) + (set! cfgdb (dbi:open dbtype dbinfo)) + (set! res (cons (cons cfgdb (alist-ref 'host dbinfo)) res)) + ))))) + dblist) + (dbr:dbstruct-slave-dbs-set! dbstruct res) + ))) + (if (and dbexists (not write-access)) (set! *db-write-access* #f)) (dbr:dbstruct-mtdb-set! dbstruct mtdb) (dbr:dbstruct-dbstack-set! dbstruct (make-stack)) (stack-push! (dbr:dbstruct-dbstack dbstruct) tmpdb) ;; olddb is already a (cons db path) (dbr:dbstruct-refndb-set! dbstruct refndb) + ;; (mutex-unlock! *rundb-mutex*) (if (and (not dbfexists) write-access) ;; *db-write-access*) ;; did not have a prior db and do have write access (begin (debug:print 0 *default-log-port* "filling db " (db:dbdat-get-path tmpdb) " with data from " (db:dbdat-get-path mtdb)) @@ -378,79 +414,91 @@ (list ;; (list "strs" ;; '("id" #f) ;; '("str" #f)) (list "tests" - '("id" #f) - '("run_id" #f) - '("testname" #f) - '("host" #f) - '("cpuload" #f) - '("diskfree" #f) - '("uname" #f) - '("rundir" #f) - '("shortdir" #f) - '("item_path" #f) - '("state" #f) - '("status" #f) - '("attemptnum" #f) - '("final_logf" #f) - '("logdat" #f) - '("run_duration" #f) - '("comment" #f) - '("event_time" #f) - '("fail_count" #f) - '("pass_count" #f) - '("archived" #f)) + '("id" "INTEGER" 'key) + '("run_id" "INTEGER") + '("testname" "TEXT") + '("host" "TEXT") + '("cpuload" "REAL") + '("diskfree" "INTEGER") + '("uname" "TEXT") + '("rundir" "TEXT") + '("shortdir" "TEXT") + '("item_path" "TEXT") + '("state" "TEXT") + '("status" "TEXT") + '("attemptnum" "INTEGER") + '("final_logf" "TEXT") + '("logdat" "TEXT") + '("run_duration" "INTEGER") + '("comment" "TEXT") + '("event_time" "INTEGER") + '("fail_count" "INTEGER") + '("pass_count" "INTEGER") + '("archived" "INTEGER")) (list "test_steps" - '("id" #f) - '("test_id" #f) - '("stepname" #f) - '("state" #f) - '("status" #f) - '("event_time" #f) - '("comment" #f) - '("logfile" #f)) + '("id" "INTEGER" 'key) + '("test_id" "INTEGER") + '("stepname" "TEXT") + '("state" "TEXT") + '("status" "TEXT") + '("event_time" "INTEGER") + '("comment" "TEXT") + '("logfile" "TEXT")) (list "test_data" - '("id" #f) - '("test_id" #f) - '("category" #f) - '("variable" #f) - '("value" #f) - '("expected" #f) - '("tol" #f) - '("units" #f) - '("comment" #f) - '("status" #f) - '("type" #f)))) + '("id" "INTEGER" 'key) + '("test_id" "INTEGER") + '("category" "TEXT") + '("variable" "TEXT") + '("value" "REAL") + '("expected" "REAL") + '("tol" "REAL") + '("units" "TEXT") + '("comment" "TEXT") + '("status" "TEXT") + '("type" "TEXT")))) ;; needs db to get keys, this is for syncing all tables ;; (define (db:sync-main-list dbstruct) (let ((keys (db:get-keys dbstruct))) (list (list "keys" - '("id" #f) - '("fieldname" #f) - '("fieldtype" #f)) - (list "metadat" '("var" #f) '("val" #f)) - (append (list "runs" - '("id" #f)) - (map (lambda (k)(list k #f)) - (append keys - (list "runname" "state" "status" "owner" "event_time" "comment" "fail_count" "pass_count")))) + '("id" "INTEGER" 'key) + '("fieldname" "TEXT") + '("fieldtype" "TEXT")) + (list "metadat" + '("id" "INTEGER" 'key) + '("var" "TEXT") + '("val" "TEXT")) + (list "runs" + '("id" "INTEGER" 'key) + '("release" "TEXT") + '("iteration" "TEXT") + '("testsuite_mode" "TEXT") + '("runname" "TEXT") + '("state" "TEXT") + '("status" "TEXT") + '("owner" "TEXT") + '("event_time" "INTEGER") + '("comment" "TEXT") + '("fail_count" "INTEGER") + '("pass_count" "INTEGER")) + (list "test_meta" - '("id" #f) - '("testname" #f) - '("owner" #f) - '("description" #f) - '("reviewed" #f) - '("iterated" #f) - '("avg_runtime" #f) - '("avg_disk" #f) - '("tags" #f) - '("jobgroup" #f))))) + '("id" "INTEGER" 'key) + '("testname" "TEXT") + '("owner" "TEXT") + '("description" "TEXT") + '("reviewed" "INTEGER") + '("iterated" "TEXT") + '("avg_runtime" "REAL") + '("avg_disk" "REAL") + '("tags" "TEXT") + '("jobgroup" "TEXT"))))) (define (db:sync-all-tables-list dbstruct) (append (db:sync-main-list dbstruct) db:sync-tests-only)) @@ -510,24 +558,24 @@ "megatest -import-megatest.db;megatest -cleanup-db") "\"\n") (exit) ;; we can not safely continue when a db was corrupted - even if fixed. ) ;; test read/write access to the database - (let ((db (sqlite3:open-database dbpath))) + (let ((db (dbi:open 'sqlite3 (cons (cons ('dbname dbpath) '()))))) (cond ((equal? fname "megatest.db") - (sqlite3:execute db "DELETE FROM tests WHERE state='DELETED';")) + (sqlite3:executeute db "DELETE FROM tests WHERE state='DELETED';")) ((equal? fname "main.db") - (sqlite3:execute db "DELETE FROM runs WHERE state='deleted';")) + (sqlite3:executeute db "DELETE FROM runs WHERE state='deleted';")) ((string-match "\\d.db" fname) - (sqlite3:execute db "UPDATE tests SET state='DELETED' WHERE state='DELETED';")) + (sqlite3:executeute db "UPDATE tests SET state='DELETED' WHERE state='DELETED';")) ((equal? fname "monitor.db") - (sqlite3:execute "DELETE FROM servers WHERE state LIKE 'defunct%';")) + (sqlite3:executeute "DELETE FROM servers WHERE state LIKE 'defunct%';")) (else - (sqlite3:execute db "vacuum;"))) + (sqlite3:executeute db "vacuum;"))) - (finalize! db) + (dbi:close db) #t)))))) ;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) ) ;; db's are dbdat's ;; @@ -534,10 +582,13 @@ ;; if last-update specified ("field-name" . time-in-seconds) ;; then sync only records where field-name >= time-in-seconds ;; IFF field-name exists ;; (define (db:sync-tables tbls last-update fromdb todb . slave-dbs) + (set! todb (cons (dbi:convert (db:dbdat-get-db todb)) (db:dbdat-get-path todb))) + (set! fromdb (cons (dbi:convert (db:dbdat-get-db fromdb)) (db:dbdat-get-path fromdb))) + (handle-exceptions exn (begin (debug:print 0 *default-log-port* "EXCEPTION: database probably overloaded or unreadable in db:sync-tables.") (print-call-chain (current-error-port)) @@ -557,13 +608,13 @@ 0) ;; this is the work to be done (cond ((not fromdb) (debug:print 3 *default-log-port* "WARNING: db:sync-tables called with fromdb missing") -1) ((not todb) (debug:print 3 *default-log-port* "WARNING: db:sync-tables called with todb missing") -2) - ((not (sqlite3:database? (db:dbdat-get-db fromdb))) + ((not (dbi:database? (db:dbdat-get-db fromdb))) (debug:print-error 0 *default-log-port* "db:sync-tables called with fromdb not a database " fromdb) -3) - ((not (sqlite3:database? (db:dbdat-get-db todb))) + ((not (dbi:database? (db:dbdat-get-db todb))) (debug:print-error 0 *default-log-port* "db:sync-tables called with todb not a database " todb) -4) (else (let ((stmts (make-hash-table)) ;; table-field => stmt (all-stmts '()) ;; ( ( stmt1 value1 ) ( stml2 value2 )) (numrecs (make-hash-table)) @@ -591,10 +642,11 @@ ";")) (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) " " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );")) (fromdat '()) (fromdats '()) + (tabletypes '()) (totrecords 0) (batch-len (string->number (or (configf:lookup *configdat* "sync" "batchsize") "10"))) (todat (make-hash-table)) (count 0)) @@ -604,13 +656,13 @@ (hash-table-set! field->num field count) (set! count (+ count 1))) fields) ;; read the source table - (sqlite3:for-each-row - (lambda (a . b) - (set! fromdat (cons (apply vector a b) fromdat)) + (dbi:for-each-row + (lambda (a) + (set! fromdat (cons a fromdat)) (if (> (length fromdat) batch-len) (begin (set! fromdats (cons fromdat fromdats)) (set! fromdat '()) (set! totrecords (+ totrecords 1))))) @@ -623,47 +675,76 @@ (if (common:low-noise-print 120 "sync-records") (debug:print-info 4 *default-log-port* "found " totrecords " records to sync")) ;; read the target table - (sqlite3:for-each-row - (lambda (a . b) - (hash-table-set! todat a (apply vector a b))) + (dbi:for-each-row + (lambda (a) + (hash-table-set! todat (vector-ref a 0) a)) (db:dbdat-get-db todb) full-sel) ;; first pass implementation, just insert all changed rows (for-each (lambda (targdb) - (let* ((db (db:dbdat-get-db targdb)) - (stmth (sqlite3:prepare db full-ins))) - (db:delay-if-busy targdb) ;; NO WAITING - (for-each + (set! targdb (dbi:convert (db:dbdat-get-db targdb))) + (if (eqv? (dbi:db-dbtype targdb) 'pg) + (let* ((prep "") + (set-stmt "") + (key (car (map car fields))) + (list-fields (map car fields))) + (set! prep (string-intersperse (map cadr fields) ",")) + (set! prep (conc "PREPARE fullupdate (" prep ") AS UPDATE " tablename " SET ")) ;;maybe add lookup in the future depending on where types are needed + (let loop ((i 1)) + (set! set-stmt (conc set-stmt (list-ref list-fields i) " = $" (+ i 1) ", ")) + (if (< i (- (length list-fields) 2)) + (loop (+ i 1)) + (set! set-stmt (conc set-stmt (list-ref list-fields (+ i 1)) " = $" (+ i 2) " WHERE " key " = $1;")))) + (set! full-ins (conc prep set-stmt)))) + (let* ((db (dbi:convert (db:dbdat-get-db targdb))) + (stmth (dbi:prepare db full-ins))) + ;; (db:delay-if-busy targdb) ;; NO WAITING + + (for-each (lambda (fromdat-lst) - (sqlite3:with-transaction + (dbi:with-transaction db (lambda () (for-each ;; (lambda (fromrow) (let* ((a (vector-ref fromrow 0)) (curr (hash-table-ref/default todat a #f)) - (same #t)) + (same #t) + (res #f) + (len (length (vector->list fromrow)))) (let loop ((i 0)) (if (or (not curr) (not (equal? (vector-ref fromrow i)(vector-ref curr i)))) (set! same #f)) (if (and same (< i (- num-fields 1))) (loop (+ i 1)))) - (if (not same) - (begin - (apply sqlite3:execute stmth (vector->list fromrow)) - (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))))))) + (if (not same) + (begin + (set! res (apply dbi:prepare-exec stmth (vector->list fromrow))) + (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))))) + (if (and (not same) (eqv? (dbi:get-res res 'affected-rows) 0)) + (let* ((prep "")) + (set! prep (string-intersperse (map cadr fields) ",")) + (set! prep (conc "INSERT INTO " tablename " ( " (string-intersperse (map car fields) ",") + " ) VALUES ( " (string-intersperse (make-list len "?") ",") " );")) ;;maybe add lookup in the future depending on where types are needed + (begin + (hash-table-set! numrecs tablename (- 1 (hash-table-ref/default numrecs tablename 0))) + (apply dbi:exec db prep (vector->list fromrow)) + (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))))))) + ;;(begin + ;; (dbi:prepare-exec stmth (vector->list fromrow)) + ;;(hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))))))) fromdat-lst)) )) fromdats) - (sqlite3:finalize! stmth))) + (dbi:close stmth))) (append (list todb) slave-dbs)))) tbls) (let* ((runtime (- (current-milliseconds) start-time)) (should-print (or (debug:debug-mode 12) (common:low-noise-print 120 "db sync" (> runtime 500))))) ;; low and high sync times treated as separate. @@ -822,12 +903,13 @@ ;; (define (db:multi-db-sync dbstruct . options) (if (not (launch:setup)) (debug:print 0 *default-log-port* "ERROR: not able to setup up for megatest.") (let* ((mtdb (dbr:dbstruct-mtdb dbstruct)) - (tmpdb (db:get-db dbstruct)) + (tmpdb (db:get-db dbstruct)) (refndb (dbr:dbstruct-refndb dbstruct)) + (slave-dbs (dbr:dbstruct-slave-dbs dbstruct)) (allow-cleanup #t) ;; (if run-ids #f #t)) (tdbdat (tasks:open-db)) (servers (tasks:get-all-servers (db:delay-if-busy tdbdat))) (data-synced 0)) ;; count of changed records (I hope) @@ -874,11 +956,11 @@ ;; now ensure all newdb data are synced to megatest.db ;; do not use the run-ids list passed in to the function ;; (if (member 'new2old options) (set! data-synced - (+ (db:sync-tables (db:sync-all-tables-list dbstruct) #f tmpdb refndb mtdb) + (+ (apply db:sync-tables (db:sync-all-tables-list dbstruct) #f tmpdb refndb mtdb slave-dbs) data-synced))) (if (member 'fixschema options) (begin @@ -1148,11 +1230,11 @@ fail_count INTEGER DEFAULT 0, pass_count INTEGER DEFAULT 0, archived INTEGER DEFAULT 0, -- 0=no, > 1=archive block id where test data can be found last_update INTEGER DEFAULT (strftime('%s','now')), CONSTRAINT testsconstraint UNIQUE (run_id, testname, item_path));") - (sqlite3:execute db "CREATE INDEX IF NOT EXISTS tests_index ON tests (run_id, testname, item_path, uname);") + (sqlite3:execute db "CREATE INDEX IF NOT EXISTS tests_index ON tests (run_id, testname, item_path);") (sqlite3:execute db "CREATE TRIGGER IF NOT EXISTS update_tests_trigger AFTER UPDATE ON tests FOR EACH ROW BEGIN UPDATE tests SET last_update=(strftime('%s','now')) WHERE id=old.id;