This is equivalent to a diff from
19861e6399
to 97a3c4ad11
Modified api.scm
from [0676a2f9d1]
to [e4088e0a3d].
︙ | | |
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
-
+
|
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;
;;======================================================================
(use srfi-69 posix)
(use srfi-69 posix srfi-18)
(declare (unit api))
(declare (uses rmt))
(declare (uses db))
(declare (uses dbmod))
(declare (uses dbfile))
(declare (uses tasks))
|
︙ | | |
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
+
+
+
+
+
+
+
+
+
+
+
-
+
-
-
-
-
-
-
-
-
-
-
|
;; TASKS
tasks-add
tasks-set-state-given-param-key
))
(define *db-write-mutexes* (make-hash-table))
(define *api-watchdog* #f)
(define (api:watchdog dbstruct) ;; trim not-used sqlite3 db handles
(let* ((th1 (make-thread (lambda ()
(let loop ()
(thread-sleep! 60) ;; 2x the age we close at
(db:close-old dbstruct)
(loop)))
"api:watchdog thread")))
(thread-start! th1)
(set! *api-watchdog* th1)))
;; These are called by the server on recipt of /api calls
;; - keep it simple, only return the actual result of the call, i.e. no meta info here
;;
;; - returns #( flag result )
;;
(define (api:execute-requests dbstruct dat)
(db:open-no-sync-db) ;; sets *no-sync-db*
;; (handle-exceptions
(if (not *api-watchdog*)(api:watchdog dbstruct))
;; exn
;; (let ((call-chain (get-call-chain)))
;; (debug:print 0 *default-log-port* "WARNING: api:execute-requests received an exception from peer, dat=" dat ", exn=" exn)
;; (print-call-chain (current-error-port))
;; (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
;; (vector #f (vector exn call-chain dat))) ;; return some stuff for debug if an exception happens
(if (> *api-process-request-count* 200)
(begin
(if (common:low-noise-print 30 "too many threads")
(debug:print 0 *default-log-port* "WARNING: "*api-process-request-count*" threads, potential overload, adding 0.5 sec delay."))
(thread-sleep! 0.5) ;; take a nap
))
(cond
((not (vector? dat)) ;; it is an error to not receive a vector
(vector #f (vector #f "remote must be called with a vector")))
#;((> *api-process-request-count* 200) ;; 20)
(debug:print 0 *default-log-port* "WARNING: api:execute-requests received an overloaded message.")
(set! *server-overloaded* #t)
(vector #f (vector #f 'overloaded))) ;; the inner vector is what gets returned. nope, don't know why. please refactor!
(else
(let* ((cmd-in (vector-ref dat 0))
(cmd (if (symbol? cmd-in)
cmd-in
(string->symbol cmd-in)))
(params (vector-ref dat 1))
(run-id (if (null? params)
|
︙ | | |
Modified db.scm
from [66cca5d3c4]
to [9fa55aaa76].
︙ | | |
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
|
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
|
-
-
+
-
+
-
-
|
;; (> (- (current-seconds) time1) 3)) ;; if file is changed and three seconds have passed.
#t)
((and changed *time-to-exit*) ;; last sync
#t)
(else
#f))))
(if (or dejunk do-cp)
(let* (
(start-time (current-milliseconds))
(let* ((start-time (current-milliseconds))
(subdb (or (dbfile:get-subdb dbstruct run-id) (dbfile:init-subdb dbstruct run-id dbfile:db-init-proc)))
(mtdb (dbr:subdb-mtdbdat subdb))
(tmpdb (dbfile:open-db dbstruct run-id dbfile:db-init-proc))
(tmpdb (dbfile:open-db dbstruct run-id dbfile:db-init-proc)))
)
(debug:print-info 2 *default-log-port* "delta syncing file: " srcfile ", time diff: " (- time1 time2) " seconds")
(if old2new
(begin
(if dejunk (db:clean-up run-id mtdb))
(db:sync-tables (db:sync-all-tables-list dbstruct (db:get-keys dbstruct)) #f mtdb tmpdb)
)
|
︙ | | |
4347
4348
4349
4350
4351
4352
4353
4354
4355
4356
4357
4358
4359
4360
4361
4362
4363
4364
4365
4366
4367
4368
4369
4370
4371
4372
4373
4374
4375
4376
4377
4378
4379
4380
4381
4382
4383
4384
4385
4386
4387
4388
|
4344
4345
4346
4347
4348
4349
4350
4351
4352
4353
4354
4355
4356
4357
4358
4359
4360
4361
4362
4363
4364
4365
4366
4367
4368
4369
4370
4371
4372
4373
4374
4375
4376
4377
4378
4379
4380
4381
4382
4383
4384
4385
|
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
;; moving watch dogs here due to dependencies
;;======================================================================
;;======================================================================
;; currently the primary job of the watchdog is to run the sync back to megatest.db from the db in /tmp
;; if we are on the homehost and we are a server (by definition we are on the homehost if we are a server)
;;
(define (common:readonly-watchdog dbstruct)
(thread-sleep! 0.05) ;; delay for startup
(debug:print-info 13 *default-log-port* "common:readonly-watchdog entered.")
;; sync megatest.db to /tmp/.../megatst.db
(let* ((sync-cool-off-duration 3)
(golden-mtdb (dbr:dbstruct-mtdb dbstruct))
(golden-mtpath (db:dbdat-get-path golden-mtdb))
(tmp-mtdb (dbr:dbstruct-tmpdb dbstruct))
(tmp-mtpath (db:dbdat-get-path tmp-mtdb)))
(debug:print-info 0 *default-log-port* "Read-only periodic sync thread started.")
(let loop ((last-sync-time 0))
(debug:print-info 13 *default-log-port* "loop top tmp-mtpath="tmp-mtpath" golden-mtpath="golden-mtpath)
(let* ((duration-since-last-sync (- (current-seconds) last-sync-time)))
(debug:print-info 13 *default-log-port* "duration-since-last-sync="duration-since-last-sync)
(if (and (not *time-to-exit*)
(< duration-since-last-sync sync-cool-off-duration))
(thread-sleep! (- sync-cool-off-duration duration-since-last-sync)))
(if (not *time-to-exit*)
(let ((golden-mtdb-mtime (file-modification-time golden-mtpath))
(tmp-mtdb-mtime (file-modification-time tmp-mtpath)))
(if (> golden-mtdb-mtime tmp-mtdb-mtime)
(if (< golden-mtdb-mtime (- (current-seconds) 3)) ;; file has NOT been touched in past three seconds, this way multiple servers won't fight to sync back
(let ((res (db:multi-db-sync dbstruct 'old2new)))
(debug:print-info 13 *default-log-port* "rosync called, " res " records transferred."))))
(loop (current-seconds)))
#t)))
(debug:print-info 0 *default-log-port* "Exiting readonly-watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id)" mtpath="golden-mtpath)))
;; (define (common:readonly-watchdog dbstruct)
;; (thread-sleep! 0.05) ;; delay for startup
;; (debug:print-info 13 *default-log-port* "common:readonly-watchdog entered.")
;; ;; sync megatest.db to /tmp/.../megatst.db
;; (let* ((sync-cool-off-duration 3)
;; (golden-mtdb (dbr:dbstruct-mtdb dbstruct))
;; (golden-mtpath (db:dbdat-get-path golden-mtdb))
;; (tmp-mtdb (dbr:dbstruct-tmpdb dbstruct))
;; (tmp-mtpath (db:dbdat-get-path tmp-mtdb)))
;; (debug:print-info 0 *default-log-port* "Read-only periodic sync thread started.")
;; (let loop ((last-sync-time 0))
;; (debug:print-info 13 *default-log-port* "loop top tmp-mtpath="tmp-mtpath" golden-mtpath="golden-mtpath)
;; (let* ((duration-since-last-sync (- (current-seconds) last-sync-time)))
;; (debug:print-info 13 *default-log-port* "duration-since-last-sync="duration-since-last-sync)
;; (if (and (not *time-to-exit*)
;; (< duration-since-last-sync sync-cool-off-duration))
;; (thread-sleep! (- sync-cool-off-duration duration-since-last-sync)))
;; (if (not *time-to-exit*)
;; (let ((golden-mtdb-mtime (file-modification-time golden-mtpath))
;; (tmp-mtdb-mtime (file-modification-time tmp-mtpath)))
;; (if (> golden-mtdb-mtime tmp-mtdb-mtime)
;; (if (< golden-mtdb-mtime (- (current-seconds) 3)) ;; file has NOT been touched in past three seconds, this way multiple servers won't fight to sync back
;; (let ((res (db:multi-db-sync dbstruct 'old2new)))
;; (debug:print-info 13 *default-log-port* "rosync called, " res " records transferred."))))
;; (loop (current-seconds)))
;; #t)))
;; (debug:print-info 0 *default-log-port* "Exiting readonly-watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id)" mtpath="golden-mtpath)))
;;
;; Get a lock from the no-sync-db for the from-db, then copy the from-db to the to-db, otherwise return #f
(define (db:lock-and-sync no-sync-db from-db to-db)
(assert (not *db-sync-in-progress*) "FATAL: db:lock-and-sync called while a sync is in progress.")
(let* ((lockdat (db:no-sync-get-lock no-sync-db from-db))
(gotlock (car lockdat))
|
︙ | | |
Modified dbfile.scm
from [25f8271ef2]
to [8d67468750].
︙ | | |
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
+
-
+
+
+
|
(dbname #f) ;; .megatest/1.db
(mtdbfile #f) ;; mtrah/.megatest/1.db
(mtdbdat #f) ;; only need one of these for syncing
;; (dbdats (make-hash-table)) ;; id => dbdat
(tmpdbfile #f) ;; /tmp/.../.megatest/1.db
;; (refndbfile #f) ;; /tmp/.../.megatest/1.db_ref
(dbstack (make-stack)) ;; stack for tmp dbr:dbdat,
(stack-mutex (make-mutex)) ;; gate pop, push, peek and replace with this mutex (allows safe clean up of old handles)
(homehost #f) ;; not used yet
(on-homehost #f) ;; not used yet
(read-only #f)
(last-sync 0)
(last-write (current-seconds))
) ;; goal is to converge on one struct for an area but for now it is too confusing
;; need to keep dbhandles and cached statements together
(defstruct dbr:dbdat
(dbfile #f)
(dbh #f)
(stmt-cache (make-hash-table))
(read-only #f)
(birth-sec (current-seconds)))
(birth-sec (current-seconds))
(last-used (current-seconds))
(in-use #f))
(define *dbstruct-dbs* #f)
(define *db-open-mutex* (make-mutex))
(define *db-access-mutex* (make-mutex)) ;; used in common.scm
(define *no-sync-db* #f)
(define *db-sync-in-progress* #f)
(define *db-with-db-mutex* (make-mutex))
|
︙ | | |
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
-
+
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
-
-
-
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
|
(dbfile:print-err "db:safely-close-sqlite3-db: " db " is not an sqlite3 db")
#f
)
))))
;; close all opened run-id dbs
(define (db:close-all dbstruct)
(if (dbr:dbstruct? dbstruct)
(assert (dbr:dbstruct? dbstruct) "FATAL: db:close-all called with non-dbstruct "dbstruct)
;; (handle-exceptions
;; exn
;; (begin
;; (debug:print 0 *default-log-port* "WARNING: Finalizing failed, " ((condition-property-accessor 'exn 'message) exn) ", note - exn=" exn)
;; (print-call-chain *default-log-port*))
;; (db:sync-touched dbstruct 0 force-sync: #t) ;; NO. Do not do this here. Instead we rely on a server to be started when there are writes, even if the server itself is not going to be used as a server.
(let* ((subdbs (hash-table-values (dbr:dbstruct-subdbs dbstruct))))
(for-each
(lambda (subdb)
(let* ((tdbs (stack->list (dbr:subdb-dbstack subdb)))
(mtdbdat (dbr:dbdat-dbh (dbr:subdb-mtdbdat subdb)))
(let* ((subdbs (hash-table-values (dbr:dbstruct-subdbs dbstruct))))
(for-each
(lambda (subdb)
(mutex-lock! (dbr:subdb-stack-mutex subdb))
(let* ((tdbs (stack->list (dbr:subdb-dbstack subdb)))
(mtdbdat (dbr:dbdat-dbh (dbr:subdb-mtdbdat subdb))))
#;(rdb (dbr:dbdat-dbh (dbr:subdb-refndb subdb))))
(map (lambda (dbdat)
(let* ((stmt-cache (dbr:dbdat-stmt-cache dbdat))
(dbh (dbr:dbdat-dbh dbdat)))
(db:safely-close-sqlite3-db dbh stmt-cache)))
tdbs)
(db:safely-close-sqlite3-db mtdbdat (dbr:dbdat-stmt-cache (dbr:subdb-mtdbdat subdb)))
(map (lambda (dbdat)
(let* ((stmt-cache (dbr:dbdat-stmt-cache dbdat))
(dbh (dbr:dbdat-dbh dbdat)))
(db:safely-close-sqlite3-db dbh stmt-cache)))
tdbs)
(db:safely-close-sqlite3-db mtdbdat (dbr:dbdat-stmt-cache (dbr:subdb-mtdbdat subdb))))
;; (if (sqlite3:database? mdb) (sqlite3:finalize! mdb))
#;(db:safely-close-sqlite3-db rdb #f))) ;; stmt-cache))))) ;; (if (sqlite3:database? rdb) (sqlite3:finalize! rdb))))))
subdbs)
(mutex-unlock! (dbr:subdb-stack-mutex subdb)))
subdbs)))
#t
)
#f
)
;; close opened run-id dbs that haven't been used in age seconds
(define (db:close-old dbstruct #!key (age 30)) ;; close dbs older than this age
(assert (dbr:dbstruct? dbstruct) "FATAL: db:close-all called with non-dbstruct "dbstruct)
(let* ((subdbs (hash-table-values (dbr:dbstruct-subdbs dbstruct))))
(for-each
(lambda (subdb)
(mutex-lock! (dbr:subdb-stack-mutex subdb))
(let* ((tdbs (stack->list (dbr:subdb-dbstack subdb)))
(mtdbdat (dbr:dbdat-dbh (dbr:subdb-mtdbdat subdb))))
(dbr:subdb-dbstack-set! subdb (make-stack)) ;; replace the stack with a new one
(map (lambda (dbdat)
(assert (dbr:dbdat-in-use dbdat) "FATAL: dbdat in stack was in use "(dbr:dbdat-dbfile dbdat))
(if (< (- (current-seconds)
(dbr:dbdat-last-used dbdat))
age)
(stack-push! (dbr:subdb-dbstack subdb) dbdat) ;; keep it
(let* ((stmt-cache (dbr:dbdat-stmt-cache dbdat)) ;; close and discard
(dbh (dbr:dbdat-dbh dbdat)))
(dbfile:print-err "INFO: closing unused dbdat for "(dbr:dbdat-dbfile dbdat))
(db:safely-close-sqlite3-db dbh stmt-cache))))
tdbs)
(let* ((size (stack-count (dbr:subdb-dbstack subdb)))
(delta (- (length tdbs) size)))
(if (> delta 0)
(dbfile:print-err "INFO: removed "delta" and "size" dbs left."))))
(mutex-unlock! (dbr:subdb-stack-mutex subdb)))
subdbs)))
)
;; ;; set up a single db (e.g. main.db, 1.db ... etc.)
;; ;;
;; (define (db:setup-db dbstruct areapath run-id)
;; (let* ((dbname (db:run-id->dbname run-id))
;; (dbstruct (hash-table-ref/default dbstructs dbname #f)))
;; (if dbstruct
|
︙ | | |
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
|
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
+
-
-
+
+
-
-
+
+
+
+
+
+
+
|
;; if run-id is a string treat it as a filename
;; if db already open - return inmem
;; if db not open, open inmem, rundb and sync then return inmem
;; inuse gets set automatically for rundb's
;;
(define (dbfile:get-dbdat dbstruct run-id)
(let* ((subdb (dbfile:get-subdb dbstruct run-id)))
(mutex-lock! (dbr:subdb-stack-mutex subdb))
(if (stack-empty? (dbr:subdb-dbstack subdb))
#f
(let* ((res (if (stack-empty? (dbr:subdb-dbstack subdb))
#f
(begin
(stack-pop! (dbr:subdb-dbstack subdb))))))
(let ((dbdat (stack-pop! (dbr:subdb-dbstack subdb))))
(dbr:dbdat-last-used-set! dbdat (current-seconds))
(dbr:dbdat-in-use-set! dbdat #t)
dbdat))))
(mutex-unlock! (dbr:subdb-stack-mutex subdb))
res)))
;; return a previously opened db handle to the stack of available handles
(define (dbfile:add-dbdat dbstruct run-id dbdat)
(let* ((subdb (dbfile:get-subdb dbstruct run-id)))
(dbr:dbdat-in-use-set! dbdat #f)
(stack-push! (dbr:subdb-dbstack subdb) dbdat)
dbdat))
;; set up a subdb
;;
(define (dbfile:init-subdb dbstruct run-id init-proc)
(let* ((dbname (dbfile:run-id->dbname run-id))
|
︙ | | |