55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
#t)
#f))))
;; cmd is a symbol
;; vars is a json string encoding the parameters for the call
;;
(define (rmt:send-receive cmd rid params)
(let* ((run-id (if rid rid 0))
(connection-info (let ((cinfo (hash-table-ref/default *runremote* run-id #f)))
(if cinfo
cinfo
;; NB// can cache the answer for server running for 10 seconds ...
;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
(if (tasks:server-running-or-starting? (tasks:get-db) run-id)
(let ((res (client:setup run-id)))
(if res
(hash-table-ref/default *runremote* run-id #f) ;; client:setup filled this in (hopefully)
#f))
#f))))
(jparams (db:obj->string params)))
(if connection-info
(let ((res (http-transport:client-api-send-receive run-id connection-info cmd jparams)))
(if res
(db:string->obj res)
(let ((new-connection-info (client:setup run-id)))
(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
(rmt:send-receive cmd run-id params))))
(let ((max-avg-qry (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") "800"))))
(debug:print-info 4 "no server and read-only query, bypassing normal channel")
;; (if (rmt:write-frequency-over-limit? cmd run-id)(server:kind-run run-id))
(let ((curr-max (rmt:get-max-query-average)))
(if (> (cdr curr-max) max-avg-qry)
(begin
(debug:print-info 3 "Max average query, " (inexact->exact (round (cdr curr-max))) "ms (" (car curr-max) ") exceeds " max-avg-qry ", try starting server ...")
(server:kind-run run-id))))
(rmt:open-qry-close-locally cmd run-id params)))))
(define (rmt:update-db-stats rawcmd params duration)
(mutex-lock! *db-stats-mutex*)
(let* ((cmd (if (eq? rawcmd 'general-call) (car params) rawcmd))
(stat-vec (hash-table-ref/default *db-stats* cmd #f)))
(if (not stat-vec)
(let ((newvec (vector 0 0)))
(hash-table-set! *db-stats* cmd newvec)
(set! stat-vec newvec)))
(vector-set! stat-vec 0 (+ (vector-ref stat-vec 0) 1))
(vector-set! stat-vec 1 (+ (vector-ref stat-vec 1) duration)))
(mutex-unlock! *db-stats-mutex*))
(define (rmt:print-db-stats)
(let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f"
(debug:print 18 "DB Stats\n========")
(debug:print 18 (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
|
|
|
|
|
|
|
|
|
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
#t)
#f))))
;; cmd is a symbol
;; vars is a json string encoding the parameters for the call
;;
(define (rmt:send-receive cmd rid params)
;; clean out old connections
(mutex-lock! *db-multi-sync-mutex*)
(let ((expire-time (- (current-seconds) 60)))
(for-each
(lambda (run-id)
(let ((connection (hash-table-ref/default *runremote* run-id #f)))
(if ;; (and connection
(< (http-transport:server-dat-get-last-access connection) expire-time) ; )
(begin
(debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
(hash-table-delete! *runremote* run-id)))))
(hash-table-keys *runremote*)))
(mutex-unlock! *db-multi-sync-mutex*)
(let* ((run-id (if rid rid 0))
(connection-info (let ((cinfo (hash-table-ref/default *runremote* run-id #f)))
(if cinfo
cinfo
;; NB// can cache the answer for server running for 10 seconds ...
;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
(if (tasks:server-running-or-starting? (tasks:get-db) run-id)
(let ((res (client:setup run-id)))
(if res
(hash-table-ref/default *runremote* run-id #f) ;; client:setup filled this in (hopefully)
#f))
#f))))
(jparams (db:obj->string params)))
(if connection-info
(let ((res (http-transport:client-api-send-receive run-id connection-info cmd jparams)))
(http-transport:server-dat-update-last-access connection-info)
(if res
(db:string->obj res)
(let ((new-connection-info (client:setup run-id)))
(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
(rmt:send-receive cmd run-id params))))
(let ((max-avg-qry (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") "-1"))))
(debug:print-info 4 "no server and read-only query, bypassing normal channel")
;; (if (rmt:write-frequency-over-limit? cmd run-id)(server:kind-run run-id))
(let ((curr-max (rmt:get-max-query-average)))
(if (> (cdr curr-max) max-avg-qry)
(begin
(debug:print-info 3 "Max average query, " (inexact->exact (round (cdr curr-max))) "ms (" (car curr-max) ") exceeds " max-avg-qry ", try starting server ...")
(server:kind-run run-id))))
(rmt:open-qry-close-locally cmd run-id params)))))
(define (rmt:update-db-stats rawcmd params duration)
(mutex-lock! *db-stats-mutex*)
(handle-exceptions
exn
(begin
(debug:print 0 "WARNING: stats collection failed in update-db-stats")
(debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
(print "exn=" (condition->list exn))
#f) ;; if this fails we don't care, it is just stats
(let* ((cmd (if (eq? rawcmd 'general-call) (car params) rawcmd))
(stat-vec (hash-table-ref/default *db-stats* cmd #f)))
(if (not stat-vec)
(let ((newvec (vector 0 0)))
(hash-table-set! *db-stats* cmd newvec)
(set! stat-vec newvec)))
(vector-set! stat-vec 0 (+ (vector-ref stat-vec 0) 1))
(vector-set! stat-vec 1 (+ (vector-ref stat-vec 1) duration))))
(mutex-unlock! *db-stats-mutex*))
(define (rmt:print-db-stats)
(let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f"
(debug:print 18 "DB Stats\n========")
(debug:print 18 (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
|
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
(cons newmax-cmd currmax)
(cons 'none 0))
(loop (car tal)(cdr tal) newmax-cmd currmax)))))))
(mutex-unlock! *db-stats-mutex*)
res))
(define (rmt:open-qry-close-locally cmd run-id params)
(let* ((dbdir (conc (configf:lookup *configdat* "setup" "linktree") "/.db"))
(dbstruct-local (if *dbstruct-db*
*dbstruct-db*
(let ((db (make-dbr:dbstruct path: dbdir local: #t)))
(set! *dbstruct-db* db)
db)))
(db-file-path (db:dbfile-path 0)))
;; (read-only (not (file-read-access? db-file-path)))
(let* ((start (current-milliseconds))
(res (api:execute-requests dbstruct-local (symbol->string cmd) params))
(duration (- (current-milliseconds) start)))
(rmt:update-db-stats cmd params duration)
;; mark this run as dirty if this was a write
(if (not (member cmd api:read-only-queries))
(let ((start-time (current-seconds)))
(mutex-lock! *db-multi-sync-mutex*)
(let ((last-sync (hash-table-ref/default *db-local-sync* run-id 0)))
(if (> (- start-time last-sync) 5) ;; every five seconds
(begin
(db:multi-db-sync (list run-id) 'new2old)
(debug:print-info 0 "Sync of newdb to olddb for run-id " run-id " completed in " (- (current-seconds) start-time) " seconds")
(hash-table-set! *db-local-sync* run-id start-time))))
(mutex-unlock! *db-multi-sync-mutex*)))
res)))
(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
(let* ((run-id (if run-id run-id 0))
(jparams (db:obj->string params)) ;; (rmt:dat->json-str params))
(res (http-transport:client-api-send-receive run-id connection-info cmd jparams)))
|
<
|
>
|
|
<
<
<
<
|
|
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
(cons newmax-cmd currmax)
(cons 'none 0))
(loop (car tal)(cdr tal) newmax-cmd currmax)))))))
(mutex-unlock! *db-stats-mutex*)
res))
(define (rmt:open-qry-close-locally cmd run-id params)
(let* ((dbstruct-local (if *dbstruct-db*
*dbstruct-db*
(let* ((dbdir (conc (configf:lookup *configdat* "setup" "linktree") "/.db"))
(db (make-dbr:dbstruct path: dbdir local: #t)))
(set! *dbstruct-db* db)
db)))
(db-file-path (db:dbfile-path 0)))
;; (read-only (not (file-read-access? db-file-path)))
(let* ((start (current-milliseconds))
(res (api:execute-requests dbstruct-local (symbol->string cmd) params))
(duration (- (current-milliseconds) start)))
(rmt:update-db-stats cmd params duration)
;; mark this run as dirty if this was a write
(if (not (member cmd api:read-only-queries))
(let ((start-time (current-seconds)))
(mutex-lock! *db-multi-sync-mutex*)
(if (not (hash-table-ref/default *db-local-sync* run-id #f))
(hash-table-set! *db-local-sync* run-id start-time)) ;; the oldest "write"
(mutex-unlock! *db-multi-sync-mutex*)))
res)))
(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
(let* ((run-id (if run-id run-id 0))
(jparams (db:obj->string params)) ;; (rmt:dat->json-str params))
(res (http-transport:client-api-send-receive run-id connection-info cmd jparams)))
|
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
|
(define (rmt:get-test-id run-id testname item-path)
(rmt:send-receive 'get-test-id run-id (list run-id testname item-path)))
(define (rmt:get-test-info-by-id run-id test-id)
(if (and (number? run-id)(number? test-id))
(rmt:send-receive 'get-test-info-by-id run-id (list run-id test-id))
(begin
(debug:print 0 "ERROR: Bad data handed to rmt:get-test-info-by-id run-id=" run-id ", test-id=" test-id)
(print-call-chain)
#f)))
(define (rmt:test-get-rundir-from-test-id run-id test-id)
(rmt:send-receive 'test-get-rundir-from-test-id run-id (list run-id test-id)))
(define (rmt:open-test-db-by-test-id run-id test-id #!key (work-area #f))
|
|
|
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
|
(define (rmt:get-test-id run-id testname item-path)
(rmt:send-receive 'get-test-id run-id (list run-id testname item-path)))
(define (rmt:get-test-info-by-id run-id test-id)
(if (and (number? run-id)(number? test-id))
(rmt:send-receive 'get-test-info-by-id run-id (list run-id test-id))
(begin
(debug:print 0 "WARNING: Bad data handed to rmt:get-test-info-by-id run-id=" run-id ", test-id=" test-id)
(print-call-chain)
#f)))
(define (rmt:test-get-rundir-from-test-id run-id test-id)
(rmt:send-receive 'test-get-rundir-from-test-id run-id (list run-id test-id)))
(define (rmt:open-test-db-by-test-id run-id test-id #!key (work-area #f))
|
314
315
316
317
318
319
320
321
322
323
324
325
326
327
|
(define (rmt:get-testinfo-state-status run-id test-id)
(rmt:send-receive 'get-testinfo-state-status run-id (list run-id test-id)))
(define (rmt:test-set-log! run-id test-id logf)
(if (string? logf)(rmt:general-call 'test-set-log run-id logf test-id)))
(define (rmt:get-run-ids-matching-target keynames target res runname testpatt statepatt statuspatt)
(rmt:send-receive 'get-run-ids-matching-target #f (list keynames target res runname testpatt statepatt statuspatt)))
;; NOTE: This will open and access ALL run databases.
;;
(define (rmt:test-get-paths-matching-keynames-target-new keynames target res testpatt statepatt statuspatt runname)
(let ((run-ids (rmt:get-run-ids-matching-target keynames target res runname testpatt statepatt statuspatt)))
|
>
>
>
>
>
>
|
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
|
(define (rmt:get-testinfo-state-status run-id test-id)
(rmt:send-receive 'get-testinfo-state-status run-id (list run-id test-id)))
(define (rmt:test-set-log! run-id test-id logf)
(if (string? logf)(rmt:general-call 'test-set-log run-id logf test-id)))
(define (rmt:test-set-top-process-pid run-id test-id pid)
(rmt:send-receive 'test-set-top-process-pid run-id (list run-id test-id pid)))
(define (rmt:test-get-top-process-pid run-id test-id)
(rmt:send-receive 'test-get-top-process-pid run-id (list run-id test-id)))
(define (rmt:get-run-ids-matching-target keynames target res runname testpatt statepatt statuspatt)
(rmt:send-receive 'get-run-ids-matching-target #f (list keynames target res runname testpatt statepatt statuspatt)))
;; NOTE: This will open and access ALL run databases.
;;
(define (rmt:test-get-paths-matching-keynames-target-new keynames target res testpatt statepatt statuspatt runname)
(let ((run-ids (rmt:get-run-ids-matching-target keynames target res runname testpatt statepatt statuspatt)))
|