Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -239,23 +239,32 @@ (api:dispatch-request dbstruct cmd run-id params)))))) (set! *api-process-request-count* newcount) (set! *db-last-access* (current-seconds)) (match indat ((cmd run-id params meta) - (let* ((ttdat *server-info*) + (let* ((db-ok (let* ((dbfname (dbmod:run-id->dbfname run-id)) + (ok (equal? dbfname (dbr:dbstruct-dbfname dbstruct)))) + (case cmd + ((ping) #t) ;; we are fine + (else + (if (not ok)(debug:print 0 *default-log-port* "ERROR: "cmd", run-id "run-id", not correct for dbfname "(dbr:dbstruct-dbfname dbstruct))) + (assert ok "FATAL: database file and run-id not aligned."))))) + (ttdat *server-info*) (server-state (tt-state ttdat)) (status (cond ;; ((> newcount 600) 'busy) - ;; ((> newcount 300) 'loaded) + ((> newcount 300) 'loaded) (else 'ok))) (errmsg (case status ((busy) (conc "Server overloaded, "newcount" threads in flight")) ((loaded) (conc "Server loaded, "newcount" threads in flight")) (else #f))) (result (case status ((busy) (- newcount 29)) ;; call back in as many seconds ((loaded) + (if (eq? (rmt:transport-mode) 'tcp) + (thread-sleep! 0.5)) (normal-proc cmd run-id params)) (else (normal-proc cmd run-id params)))) (meta (case cmd ((ping) `((sstate . ,server-state))) @@ -266,12 +275,11 @@ (else (assert #f "FATAL: failed to deserialize indat "indat)))))) (define (api:dispatch-request dbstruct cmd run-id params) - (if (and (not *no-sync-db*) - (member cmd '(db:no-sync-set db:no-sync-get/default db:no-sync-del! db:no-sync-get-lock ))) + (if (not *no-sync-db*) (db:open-no-sync-db)) (case cmd ;;=============================================== ;; READ/WRITE QUERIES ;;=============================================== Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -163,11 +163,11 @@ ;; (define *db-multi-sync-mutex* (make-mutex)) ;; protect access to *db-sync-in-progress*, *db-last-sync* ;; task db (define *task-db* #f) ;; (vector db path-to-db) (define *db-access-allowed* #t) ;; flag to allow access ;; (define *db-access-mutex* (make-mutex)) ;; moved to dbfile -(define *db-transaction-mutex* (make-mutex)) +;; (define *db-transaction-mutex* (make-mutex)) (define *db-cache-path* #f) ;; (define *db-with-db-mutex* (make-mutex)) (define *db-api-call-time* (make-hash-table)) ;; hash of command => (list of times) ;; SERVER Index: dbfile.scm ================================================================== --- dbfile.scm +++ dbfile.scm @@ -73,10 +73,11 @@ ;; ;; for the inmem approach (see dbmod.scm) ;; this is one db per server (inmem #f) ;; handle for the in memory copy (dbfile #f) ;; path to the db file on disk + (dbfname #f) ;; short name of db file on disk (used to validate accessing correct db) (ondiskdb #f) ;; handle for the on-disk file (dbdat #f) ;; create a dbdat for the downstream calls such as db:with-db (last-update 0) (sync-proc #f) ) @@ -116,10 +117,12 @@ (define *api-process-request-count* 0) (define *db-write-access* #t) (define *db-last-sync* 0) ;; last time the sync to megatest.db happened (define *db-multi-sync-mutex* (make-mutex)) ;; protect access to *db-sync-in-progress*, *db-last-sync* (define *db-last-access* (current-seconds)) + +(define *db-transaction-mutex* (make-mutex)) (define (db:generic-error-printout exn . message) (print-call-chain (current-error-port)) (apply dbfile:print-err message) (dbfile:print-err @@ -1330,20 +1333,23 @@ (if gotlock (let ((res (proc))) (dbfile:simple-file-release-lock fname) res) (assert #t "FATAL: simple file lock never got a lock.")))) - + +(define *get-cache-stmth-mutex* (make-mutex)) + (define (db:get-cache-stmth dbdat db stmt) + (mutex-lock! *get-cache-stmth-mutex*) (let* (;; (dbdat (dbfile:get-dbdat dbstruct run-id)) (stmt-cache (dbr:dbdat-stmt-cache dbdat)) ;; (stmth (db:hoh-get stmt-cache db stmt)) - (stmth (hash-table-ref/default stmt-cache stmt #f))) - (or stmth - (let* ((newstmth (sqlite3:prepare db stmt))) - ;; (db:hoh-set! stmt-cache db stmt newstmth) - (hash-table-set! stmt-cache stmt newstmth) - newstmth)))) - - + (stmth (hash-table-ref/default stmt-cache stmt #f)) + (result (or stmth + (let* ((newstmth (sqlite3:prepare db stmt))) + ;; (db:hoh-set! stmt-cache db stmt newstmth) + (hash-table-set! stmt-cache stmt newstmth) + newstmth)))) + (mutex-unlock! *get-cache-stmth-mutex*) + result)) ) Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -1,5 +1,6 @@ + ;;====================================================================== ;; Copyright 2017, Matthew Welland. ;; ;; This file is part of Megatest. ;; @@ -80,11 +81,11 @@ ;; The inmem one-db file per server method goes in here ;;====================================================================== (define (dbmod:with-db dbstruct run-id r/w proc params) (let* ((dbdat (dbmod:open-db dbstruct run-id (dbfile:db-init-proc))) - (dbh (dbr:dbdat-dbh dbdat)) + (dbh (dbr:dbdat-dbh dbdat)) ;; this will be the inmem handle (dbfile (dbr:dbdat-dbfile dbdat))) (apply proc dbdat dbh params))) (define (dbmod:open-inmem-db initproc) (let* ((db (sqlite3:open-database ":memory:")) @@ -144,44 +145,47 @@ (handler (sqlite3:make-busy-timeout 136000))) (sqlite3:set-busy-handler! db handler) (if write-access (init-proc db)) db))))) - (db (if (dbmod:need-on-disk-db-handle) - (open-the-db) - #f)) + (db ;; (if (dbmod:need-on-disk-db-handle) + (open-the-db)) +;; #f)) (tables (db:sync-all-tables-list keys))) - (dbr:dbstruct-inmem-set! dbstruct inmem) - (dbr:dbstruct-ondiskdb-set! dbstruct db) - (dbr:dbstruct-dbfile-set! dbstruct dbfullname) + (dbr:dbstruct-inmem-set! dbstruct inmem) + (dbr:dbstruct-ondiskdb-set! dbstruct db) + (dbr:dbstruct-dbfile-set! dbstruct dbfullname) + (dbr:dbstruct-dbfname-set! dbstruct dbfname) (dbr:dbstruct-sync-proc-set! dbstruct (lambda (last-update) - (if db - (sync-gasket tables last-update inmem db - dbfullname syncdir)))) + ;; (if db + (dbmod:sync-gasket tables last-update inmem db + dbfullname syncdir))) ;; ) ;; (dbmod:sync-tables tables #f db inmem) - (if db (sync-gasket tables #f inmem db dbfullname 'fromdest)) + ;; (if db + (dbmod:sync-gasket tables #f inmem db dbfullname 'fromdest) ;; ) ;; load into inmem (dbr:dbstruct-last-update-set! dbstruct (current-seconds)) ;; should this be offset back in time by one second? dbstruct)) ;; (if (eq? syncdir 'todisk) ;; sync to disk normally, sync from in dashboard ;; (dbmod:sync-tables tables last-update inmem db) ;; (dbmod:sync-tables tables last-update db inmem)))) ;; direction: 'fromdest 'todest ;; -(define (sync-gasket tables last-update inmem dbh dbfname direction) +(define (dbmod:sync-gasket tables last-update inmem dbh dbfname direction) (case (dbfile:sync-method) ((none) #f) ((attach) (dbmod:attach-sync tables inmem dbfname direction)) ((newsync) (dbmod:new-sync tables inmem dbh dbfname direction)) (else (case direction - ((todest) - (dbmod:sync-tables tables last-update inmem dbh)) + ((todisk) + (dbmod:sync-tables tables last-update inmem dbh) + ) (else (dbmod:sync-tables tables last-update dbh inmem)))))) (define (dbmod:close-db dbstruct) @@ -292,10 +296,11 @@ (hash-table-set! todat a (apply vector a b))) todb full-sel) ;; first pass implementation, just insert all changed rows + (let* ((db todb) (drp-trigger (if (member "last_update" field-names) (db:drop-trigger db tablename) #f)) (has-last-update (member "last_update" field-names)) @@ -304,10 +309,11 @@ #f)) (stmth (sqlite3:prepare db full-ins)) (changed-rows 0)) (for-each (lambda (fromdat-lst) + (mutex-lock! *db-transaction-mutex*) (sqlite3:with-transaction db (lambda () (for-each ;; (lambda (fromrow) @@ -323,17 +329,19 @@ (loop (+ i 1)))) (if (not same) (begin (apply sqlite3:execute stmth (vector->list fromrow)) (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))) - (set! changed-rows (+ changed-rows 1)))))) - fromdat-lst)))) + (set! changed-rows (+ changed-rows 1)))))) + fromdat-lst))) + (mutex-unlock! *db-transaction-mutex*)) fromdats) (sqlite3:finalize! stmth) (if (member "last_update" field-names) - (db:create-trigger db tablename))))) + (db:create-trigger db tablename))) + )) tbls) (let* ((runtime (- (current-milliseconds) start-time)) (should-print (or ;; (debug:debug-mode 12) (common:low-noise-print 120 "db sync") (> runtime 500)))) ;; low and high sync times treated as separate. @@ -399,26 +407,30 @@ (if (member "last_update" fields) (conc " AND "fromdb table".last_update > "todb table".last_update);") ");"))) (start-ms (current-milliseconds))) ;; (debug:print 0 *default-log-port* "stmt8="stmt8) - (if (sqlite3:auto-committing? dbh) - (begin - (sqlite3:with-transaction - dbh - (lambda () - (sqlite3:execute dbh stmt1) ;; get all new rows - - #;(if (member "last_update" fields) - (sqlite3:execute dbh stmt8)) ;; get all updated rows - ;; (sqlite3:execute dbh stmt5) - ;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up - ;; (sqlite3:execute dbh stmt6) - )) - (debug:print 0 *default-log-port* "Synced table "table - " in "(- (current-milliseconds) start-ms)"ms")) - (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight.")))) + ;; (if (sqlite3:auto-committing? dbh) + ;; (begin + (mutex-lock! *db-transaction-mutex*) + (sqlite3:with-transaction + dbh + (lambda () + (debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using "stmt1) + (sqlite3:execute dbh stmt1) ;; get all new rows + + #;(if (member "last_update" fields) + (sqlite3:execute dbh stmt8)) ;; get all updated rows + ;; (sqlite3:execute dbh stmt5) + ;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up + ;; (sqlite3:execute dbh stmt6) + )) + (debug:print 0 *default-log-port* "Synced table "table + " in "(- (current-milliseconds) start-ms)"ms") ;; ) + (mutex-unlock! *db-transaction-mutex*))) + + ;; (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight.")))) table-names) (sqlite3:execute dbh "DETACH auxdb;")))) ;; FAILED ATTEMPTS Index: tcp-transportmod.scm ================================================================== --- tcp-transportmod.scm +++ tcp-transportmod.scm @@ -380,13 +380,10 @@ (if (not (tt-port ttdat)) ;; no connection yet (begin (thread-sleep! 0.25) (loop (+ count 1)))))) - ;; load or reload the data into inmem db before - ;; ((dbr:dbstruct-sync-proc dbstruct) (dbr:dbstruct-last-update dbstruct)) - ;; (dbr:dbstruct-last-update-set! dbstruct (- (current-seconds) 1)) (tt:create-server-registration-file ttdat dbfname) ;; now start watching the last-access, if it hasn't been touched ;; in over ten seconds we exit (thread-sleep! 0.05) ;; any real need for delay here? (let loop ()