153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
tasks-set-state-given-param-key
))
(define *db-write-mutexes* (make-hash-table))
(define *server-signature* #f)
(define *api-threads* '())
(define (api:register-thread th-in)
(set! *api-threads* (cons (cons th-in (current-seconds)) *api-threads*)))
(define (api:unregister-thread th-in)
(set! *api-threads* (filter (lambda (thdat)
(not (eq? th-in (car thdat))))
*api-threads*)))
(define (api:remove-dead-or-terminated)
(set! *api-threads* (filter (lambda (thdat)
(not (member (thread-state (car thdat)) '(terminated dead))))
*api-threads*)))
(define (api:get-count-threads-alive)
(length *api-threads*))
(define *api:last-stats-print* 0)
(define *api-print-db-stats-mutex* (make-mutex))
(define (api:print-db-stats)
(debug:print-info 0 *default-log-port* "Started periodic db stats printer")
(let loop ()
(mutex-lock! *api-print-db-stats-mutex*)
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
tasks-set-state-given-param-key
))
(define *db-write-mutexes* (make-hash-table))
(define *server-signature* #f)
(define *api-threads* '())
(define (api:register-thread th-in command)
(set! *api-threads* (cons (list th-in (current-seconds) command) *api-threads*)))
(define (api:get-thread-command th-in)
(let ((thread-data (assoc th-in *api-threads*)))
(if thread-data
(third thread-data) ; Assuming the command is the third element in the list
#f))) ; Return #f if the thread is not found
(define (api:unregister-thread th-in)
(set! *api-threads* (filter (lambda (thdat)
(not (eq? th-in (car thdat))))
*api-threads*)))
(define (api:remove-dead-or-terminated)
(set! *api-threads* (filter (lambda (thdat)
(not (member (thread-state (car thdat)) '(terminated dead))))
*api-threads*)))
(define (api:get-count-threads-alive)
(length *api-threads*))
(define (api:get-threads)
(map (lambda (thdat)
(let ((thread (first thdat))
(timestamp (second thdat))
(command (third thdat)))
(format "\nThread: ~a, age: ~a, Command: ~a" thread (- (current-seconds) timestamp) command)))
*api-threads*))
(define *api:last-stats-print* 0)
(define *api-print-db-stats-mutex* (make-mutex))
(define (api:print-db-stats)
(debug:print-info 0 *default-log-port* "Started periodic db stats printer")
(let loop ()
(mutex-lock! *api-print-db-stats-mutex*)
|
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
;; reads/writes to current in/out port
;;
(define (api:tcp-dispatch-request-make-handler dbstruct) ;; cmd run-id params)
(assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.")
(if (not *server-signature*)
(set! *server-signature* (tt:mk-signature *toppath*)))
(lambda (indat)
(api:register-thread (current-thread))
(let* ((result
(let* ((numthreads (api:get-count-threads-alive))
(delay-wait (if (> numthreads 10)
(- numthreads 10)
0))
(normal-proc (lambda (cmd run-id params)
(case cmd
|
|
|
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
|
;; reads/writes to current in/out port
;;
(define (api:tcp-dispatch-request-make-handler dbstruct) ;; cmd run-id params)
(assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.")
(if (not *server-signature*)
(set! *server-signature* (tt:mk-signature *toppath*)))
(lambda (indat)
(api:register-thread (current-thread) (car indat))
(let* ((result
(let* ((numthreads (api:get-count-threads-alive))
(delay-wait (if (> numthreads 10)
(- numthreads 10)
0))
(normal-proc (lambda (cmd run-id params)
(case cmd
|
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
((ping) #t) ;; we are fine
(else
(assert ok "FATAL: database file and run-id not aligned.")))))
(ttdat *server-info*)
(server-state (tt-state ttdat))
(maxthreads 20) ;; make this a parameter?
(status (cond
((and (> numthreads maxthreads)
(> (random 100) 70)) ;; allow a 30% probability to go through so we can figure out what is going wrong in main.db server.
'busy)
;; ((> numthreads 5) 'loaded) ;; this gets transmitted to the client which calls tt:backoff-incr to slow stuff down.
(else 'ok)))
(errmsg (case status
((busy) (conc "Server overloaded, "numthreads" threads in flight"))
((loaded) (conc "Server loaded, "numthreads" threads in flight"))
(else #f)))
(result (case status
((busy)
(if (eq? cmd 'ping)
(normal-proc cmd run-id params)
;; numthreads must be greater than 5 for busy
(* 0.1 (- numthreads maxthreads)) ;; was 15
)) ;; (- numthreads 29)) ;; call back in as many seconds
((loaded)
;; (if (eq? (rmt:transport-mode) 'tcp)
;; (thread-sleep! 0.5))
(normal-proc cmd run-id params))
(else
(normal-proc cmd run-id params))))
(meta (case cmd
((ping) `((sstate . ,server-state)))
(else `((wait . ,delay-wait)))))
(payload (list status errmsg result meta)))
;; (cmd run-id params meta)
(db:add-stats cmd run-id params (- (current-milliseconds) start-t))
payload))
(else
(assert #f "FATAL: failed to deserialize indat "indat))))))
;; (set! *api-process-request-count* (- *api-process-request-count* 1))
;; (serialize payload)
(api:unregister-thread (current-thread))
result)))
(define *api-halt-writes* #f)
(define (api:dispatch-request dbstruct cmd run-id params)
(if (not *no-sync-db*)
(db:open-no-sync-db))
(let* ((start-time (current-milliseconds)))
|
|
|
>
>
>
>
>
>
|
|
>
|
|
<
<
|
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
|
((ping) #t) ;; we are fine
(else
(assert ok "FATAL: database file and run-id not aligned.")))))
(ttdat *server-info*)
(server-state (tt-state ttdat))
(maxthreads 20) ;; make this a parameter?
(status (cond
((> numthreads maxthreads)
(let* ((testsuite (common:get-testsuite-name))
(mtexe (common:find-local-megatest))
(proc (lambda ()
;; we are overloaded, try to start another server
(debug:print 0 *default-log-port* "Too many threads running, starting another server")
(tt:server-process-run *toppath* testsuite mtexe run-id))))
(set! *server-start-requests* (cons proc *server-start-requests*)))
;; 'busy
'loaded ;; not ideal since the client will not backoff
)
(else 'ok)))
(errmsg (case status
((busy) (conc "Server overloaded, "numthreads" threads in flight, current cmd: " cmd "\n current threads: " (api:get-threads)))
((loaded) (conc "Server loaded, "numthreads" threads in flight"))
(else #f)))
(result (case status
((busy)
(if (eq? cmd 'ping)
(normal-proc cmd run-id params)
;; numthreads must be greater than 5 for busy
(* 0.1 (- numthreads maxthreads)) ;; was 15
)) ;; (- numthreads 29)) ;; call back in as many seconds
((loaded)
;; (if (eq? (rmt:transport-mode) 'tcp)
;; (thread-sleep! 0.5))
(normal-proc cmd run-id params))
(else
(normal-proc cmd run-id params))))
(meta (case cmd
((ping) `((sstate . ,server-state)(sload . ,numthreads)))
(else `((wait . ,delay-wait)))))
(payload (list status errmsg result meta)))
;; (cmd run-id params meta)
(db:add-stats cmd run-id params (- (current-milliseconds) start-t))
payload))
(else
(assert #f "FATAL: failed to deserialize indat "indat))))))
;; (set! *api-process-request-count* (- *api-process-request-count* 1))
;; (serialize payload)
(api:unregister-thread (current-thread))
result)))
(define *api-halt-writes* #f)
(define (api:dispatch-request dbstruct cmd run-id params)
(if (not *no-sync-db*)
(db:open-no-sync-db))
(let* ((start-time (current-milliseconds)))
|