62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
+
-
+
-
+
+
+
+
-
-
-
+
+
+
+
+
+
|
cinfo
;; NB// can cache the answer for server running for 10 seconds ...
;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
(client:setup run-id)
#f))))
(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
;; clean out old connections
(mutex-lock! *db-multi-sync-mutex*)
;; (mutex-lock! *db-multi-sync-mutex*)
;; (let ((expire-time (- (current-seconds) 60)))
;; (for-each
;; (lambda (run-id)
;; (let ((connection (hash-table-ref/default *runremote* run-id #f)))
;; (if (and connection
;; (< (http-transport:server-dat-get-last-access connection) expire-time))
;; (begin
;; (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
;; ;; SHOULD CLOSE THE CONNECTION HERE
;; (hash-table-delete! *runremote* run-id)))))
;; (hash-table-keys *runremote*)))
(mutex-unlock! *db-multi-sync-mutex*)
;; (mutex-unlock! *db-multi-sync-mutex*)
;; (mutex-lock! *send-receive-mutex*)
(let* ((run-id (if rid rid 0))
(connection-info (rmt:get-connection-info run-id)))
;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
(if connection-info
;; use the server if have connection info
(let* ((dat (case *transport-type*
((http)(http-transport:client-api-send-receive run-id connection-info cmd params))
((nmsg)(condition-case
(nmsg-transport:client-api-send-receive run-id connection-info cmd params)
((timeout)(vector #f "timeout talking to server"))))
(else (exit))))
(success (if (and dat (vector? dat)) (vector-ref dat 0) #f))
(res (if (and dat (vector? dat)) (vector-ref dat 1) #f)))
(http-transport:server-dat-update-last-access connection-info)
(if success
(begin
;; (mutex-unlock! *send-receive-mutex*)
(case *transport-type*
((http) res) ;; (db:string->obj res))
((nmsg) res)) ;; (vector-ref res 1)))
(case *transport-type*
((http) res) ;; (db:string->obj res))
((nmsg) res))) ;; (vector-ref res 1)))
(begin ;; let ((new-connection-info (client:setup run-id)))
(debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.")
;; (case *transport-type*
;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
(if (eq? (modulo attemptnum 5) 0)
(tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
(tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
;; no longer killing the server in http-transport:client-api-send-receive
;; may kill it here but what are the criteria?
;; start with three calls then kill server
;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
;; (thread-sleep! 2)
(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
;; no connection info? try to start a server
(if (and (< attemptnum 15)
(tasks:need-server run-id))
(begin
(hash-table-delete! *runremote* run-id)
;; (mutex-unlock! *send-receive-mutex*)
(tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
(client:setup run-id)
(thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
(rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
(begin
(debug:print 0 "ERROR: Communication failed!")
;; (mutex-unlock! *send-receive-mutex*)
(exit)
;; (rmt:open-qry-close-locally cmd run-id params))))
)))))
(define (rmt:update-db-stats run-id rawcmd params duration)
(mutex-lock! *db-stats-mutex*)
(handle-exceptions
|
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
|
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
|
+
-
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
(debug:print "ERROR: rmt:get-tests-for-run called with bad run-id=" run-id)
(print-call-chain (current-error-port))
'())))
;; IDEA: Threadify these - they spend a lot of time waiting ...
;;
(define (rmt:get-tests-for-runs-mindata run-ids testpatt states status not-in)
(let ((multi-run-mutex (make-mutex))
(let ((run-id-list (if run-ids
(run-id-list (if run-ids
run-ids
(rmt:get-all-run-ids))))
(apply append (map (lambda (run-id)
(rmt:send-receive 'get-tests-for-run-mindata run-id (list run-id testpatt states status not-in)))
run-id-list))))
(rmt:get-all-run-ids)))
(result '()))
(if (null? run-id-list)
'()
(for-each
(lambda (th)
(thread-join! th)) ;; I assume that joining completed threads just moves on
(let loop ((hed (car run-id-list))
(tal (cdr run-id-list))
(threads '()))
(let* ((newthread (make-thread
(lambda ()
(let ((res (rmt:send-receive 'get-tests-for-run-mindata hed (list hed testpatt states status not-in))))
(if (list? res)
(begin
(mutex-lock! multi-run-mutex)
(set! result (append result res))
(mutex-unlock! multi-run-mutex))
(debug:print 0 "ERROR: get-tests-for-run-mindata failed for run-id " hed ", testpatt " testpatt ", states " states ", status " status ", not-in " not-in))))
(conc "multi-run-thread for run-id " hed)))
(newthreads (cons newthread threads)))
(thread-start! newthread)
(thread-sleep! 0.5) ;; give that thread some time to start
(if (null? tal)
newthreads
(loop (car tal)(cdr tal) newthreads))))))
result))
(define (rmt:delete-test-records run-id test-id)
(rmt:send-receive 'delete-test-records run-id (list run-id test-id)))
;; This is not needed as test steps are deleted on test delete call
;;
;; (define (rmt:delete-test-step-records run-id test-id)
|