50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
+
+
+
+
+
+
+
+
+
+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
+
-
+
+
+
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
|
(vector-set! record 1 count)
(if (and (> count 10)
(> queries-per-second 10))
(begin
(debug:print-info 1 "db write rate too high, starting a server, count=" count " start=" start " run-id=" run-id " queries-per-second=" queries-per-second)
#t)
#f))))
(define (rmt:get-connection-info run-id)
(let ((cinfo (hash-table-ref/default *runremote* run-id #f)))
(if cinfo
cinfo
;; NB// can cache the answer for server running for 10 seconds ...
;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
(client:setup run-id)
#f))))
;; cmd is a symbol
;; vars is a json string encoding the parameters for the call
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 0))
;; clean out old connections
(mutex-lock! *db-multi-sync-mutex*)
(let ((expire-time (- (current-seconds) 60)))
(for-each
(lambda (run-id)
(let ((connection (hash-table-ref/default *runremote* run-id #f)))
(if (and connection
(< (http-transport:server-dat-get-last-access connection) expire-time))
(begin
(debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
(hash-table-delete! *runremote* run-id)))))
(hash-table-keys *runremote*)))
(mutex-unlock! *db-multi-sync-mutex*)
(let* ((run-id (if rid rid 0))
(connection-info (let ((cinfo (hash-table-ref/default *runremote* run-id #f)))
(connection-info (rmt:get-connection-info run-id))
(if cinfo
cinfo
;; NB// can cache the answer for server running for 10 seconds ...
;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
(if (tasks:server-running-or-starting? (db:delay-if-busy
(tasks:open-db))
run-id)
(let ((res (client:setup run-id)))
(if res
(hash-table-ref/default *runremote* run-id #f) ;; client:setup filled this in (hopefully)
#f))
#f))))
(jparams (db:obj->string params)))
(if connection-info
;; use the server if have connection info
(let ((res (http-transport:client-api-send-receive run-id connection-info cmd jparams)))
(let* ((dat (http-transport:client-api-send-receive run-id connection-info cmd jparams))
(res (if dat (vector-ref dat 1) #f))
(success (if dat (vector-ref dat 0) #f)))
(http-transport:server-dat-update-last-access connection-info)
(if res
(or(db:string->obj res)
(begin
(thread-sleep! 0.5)
(rmt:send-receive cmd rid params attempnum: (+ attemptnum 1))))
(if success
(db:string->obj res)
;; (if (< attemptnum 100)
;; (begin
;; (hash-table-delete! *runremote* run-id)
;; (thread-sleep! 0.5)
;; (rmt:send-receive cmd rid params attempnum: (+ attemptnum 1)))
;; (begin
;; (print-call-chain)
;; (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over")
;; (exit 1)))))
(begin ;; let ((new-connection-info (client:setup run-id)))
(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
;; no longer killing the server in http-transport:client-api-send-receive
;; may kill it here but what are the criteria?
;; start with three calls then kill server
|
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
-
+
|
(if (> curr-max-val max-avg-qry)
(if (common:low-noise-print 10 "start server due to max average query too long")
(begin
(debug:print-info 0 "Max average query, " (inexact->exact (round curr-max-val)) "ms (" (car curr-max) ") exceeds " max-avg-qry "ms, try starting server ...")
(server:kind-run run-id))
(debug:print-info 3 "Max average query, " (inexact->exact (round curr-max-val)) "ms (" (car curr-max) ") below " max-avg-qry "ms, not starting server...")))
(rmt:open-qry-close-locally cmd run-id params)))))))
(define (rmt:update-db-stats run-id rawcmd params duration)
(mutex-lock! *db-stats-mutex*)
(handle-exceptions
exn
(begin
(debug:print 0 "WARNING: stats collection failed in update-db-stats")
(debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
|
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
|
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
|
-
-
-
-
-
+
+
+
+
+
+
+
|
;; just set it every time. Is a write more expensive than a read and does it matter?
(hash-table-set! *db-local-sync* run-id start-time) ;; the oldest "write"
(mutex-unlock! *db-multi-sync-mutex*)))
res)))
(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
(let* ((run-id (if run-id run-id 0))
(jparams (db:obj->string params)) ;; (rmt:dat->json-str params))
(res (http-transport:client-api-send-receive run-id connection-info cmd jparams)))
(if res
(db:string->obj res)
res)))
(jparams (db:obj->string params)) ;; (rmt:dat->json-str params))
(dat (http-transport:client-api-send-receive run-id connection-info cmd jparams)))
(if (and dat (vector-ref dat 0))
(db:string->obj (vector-ref dat 1))
(begin
(debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " res)
res))))
;; Wrap json library for strings (why the ports crap in the first place?)
(define (rmt:dat->json-str dat)
(with-output-to-string
(lambda ()
(json-write dat))))
|