11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
(use json format)
(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;
;; ;; For debugging add the following to ~/.megatestrc
;;
;; (require-library trace)
;; (import trace)
;; (trace
;; rmt:send-receive
;; api:execute-requests
;; )
;;======================================================================
;; S U P P O R T F U N C T I O N S
;;======================================================================
;; #t means - please start a server!
;;
(define (rmt:write-frequency-over-limit? cmd run-id)
(and (not (member cmd api:read-only-queries))
(let* ((tmprec (hash-table-ref/default *write-frequency* run-id #f))
(record (if tmprec tmprec
(let ((v (vector (current-seconds) 0)))
(hash-table-set! *write-frequency* run-id v)
|
>
>
>
>
>
>
>
>
|
|
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
(use json format)
(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
(declare (uses nmsg-transport))
;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;
;; ;; For debugging add the following to ~/.megatestrc
;;
;; (require-library trace)
;; (import trace)
;; (trace
;; rmt:send-receive
;; api:execute-requests
;; )
;;======================================================================
;; S U P P O R T F U N C T I O N S
;;======================================================================
(define (rmt:call-transport run-id connection-info cmd jparams)
(case (server:get-transport)
((rpc) ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams))
((http) (http-transport:client-api-send-receive run-id connection-info cmd jparams))
((fs) ( fs-transport:client-api-send-receive run-id connection-info cmd jparams))
((zmq) (zmq-transport:client-api-send-receive run-id connection-info cmd jparams))
(else ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams))))
;;
(define (rmt:write-frequency-over-limit? cmd run-id)
(and (not (member cmd api:read-only-queries))
(let* ((tmprec (hash-table-ref/default *write-frequency* run-id #f))
(record (if tmprec tmprec
(let ((v (vector (current-seconds) 0)))
(hash-table-set! *write-frequency* run-id v)
|
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
cinfo
;; NB// can cache the answer for server running for 10 seconds ...
;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
(client:setup run-id)
#f))))
;; cmd is a symbol
;; vars is a json string encoding the parameters for the call
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 0))
;; clean out old connections
(mutex-lock! *db-multi-sync-mutex*)
(let ((expire-time (- (current-seconds) 60)))
(for-each
(lambda (run-id)
(let ((connection (hash-table-ref/default *runremote* run-id #f)))
(if (and connection
(< (http-transport:server-dat-get-last-access connection) expire-time))
(begin
(debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
(hash-table-delete! *runremote* run-id)))))
(hash-table-keys *runremote*)))
(mutex-unlock! *db-multi-sync-mutex*)
(let* ((run-id (if rid rid 0))
(connection-info (rmt:get-connection-info run-id))
(jparams (db:obj->string params)))
(if connection-info
;; use the server if have connection info
(let* ((dat (http-transport:client-api-send-receive run-id connection-info cmd jparams))
(res (if (and dat (vector? dat)) (vector-ref dat 1) #f))
(success (if (and dat (vector? dat)) (vector-ref dat 0) #f)))
(http-transport:server-dat-update-last-access connection-info)
(if success
(db:string->obj res)
;; (if (< attemptnum 100)
;; (begin
;; (hash-table-delete! *runremote* run-id)
;; (thread-sleep! 0.5)
;; (rmt:send-receive cmd rid params attempnum: (+ attemptnum 1)))
;; (begin
;; (print-call-chain (current-error-port))
;; (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over")
;; (exit 1)))))
(begin ;; let ((new-connection-info (client:setup run-id)))
(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
;; no longer killing the server in http-transport:client-api-send-receive
;; may kill it here but what are the criteria?
;; start with three calls then kill server
(if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
(thread-sleep! 2)
(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
(if (and (< attemptnum 10)
(tasks:need-server run-id))
(begin
(tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
(rmt:send-receive cmd rid params (+ attemptnum 1)))
(rmt:open-qry-close-locally cmd run-id params)))))
(define (rmt:update-db-stats run-id rawcmd params duration)
(mutex-lock! *db-stats-mutex*)
(handle-exceptions
exn
(begin
(debug:print 0 "WARNING: stats collection failed in update-db-stats")
|
|
<
<
|
|
|
|
|
|
>
>
>
>
|
>
|
<
>
>
>
|
>
>
>
>
>
|
|
<
<
|
<
<
|
>
|
<
<
|
|
>
>
>
>
>
>
>
|
|
>
|
>
>
>
>
|
>
>
>
>
|
>
|
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
cinfo
;; NB// can cache the answer for server running for 10 seconds ...
;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
(client:setup run-id)
#f))))
(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
;; clean out old connections
(mutex-lock! *db-multi-sync-mutex*)
(let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
(for-each
(lambda (run-id)
(let ((connection (hash-table-ref/default *runremote* run-id #f)))
(if (and connection
(< (http-transport:server-dat-get-last-access connection) expire-time))
(begin
(debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
;; SHOULD CLOSE THE CONNECTION HERE
(case *transport-type*
((nmsg)(nn-close (http-transport:server-dat-get-socket
(hash-table-ref *runremote* run-id)))))
(hash-table-delete! *runremote* run-id)))))
(hash-table-keys *runremote*)))
(mutex-unlock! *db-multi-sync-mutex*)
;; (mutex-lock! *send-receive-mutex*)
(let* ((run-id (if rid rid 0))
(connection-info (rmt:get-connection-info run-id)))
;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
(if connection-info
;; use the server if have connection info
(let* ((dat (case *transport-type*
((http)(condition-case
(http-transport:client-api-send-receive run-id connection-info cmd params)
((commfail)(vector #f "communications fail"))))
((nmsg)(condition-case
(nmsg-transport:client-api-send-receive run-id connection-info cmd params)
((timeout)(vector #f "timeout talking to server"))))
(else (exit))))
(success (if (and dat (vector? dat)) (vector-ref dat 0) #f))
(res (if (and dat (vector? dat)) (vector-ref dat 1) #f)))
(http-transport:server-dat-update-last-access connection-info)
(if success
(begin
;; (mutex-unlock! *send-receive-mutex*)
(case *transport-type*
((http) res) ;; (db:string->obj res))
((nmsg) res))) ;; (vector-ref res 1)))
(begin ;; let ((new-connection-info (client:setup run-id)))
(debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.")
;; (case *transport-type*
;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
(if (eq? (modulo attemptnum 5) 0)
(tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
(tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
;; no longer killing the server in http-transport:client-api-send-receive
;; may kill it here but what are the criteria?
;; start with three calls then kill server
;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
;; (thread-sleep! 2)
(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
;; no connection info? try to start a server
(if (and (< attemptnum 15)
(tasks:need-server run-id))
(begin
(hash-table-delete! *runremote* run-id)
;; (mutex-unlock! *send-receive-mutex*)
(tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
(client:setup run-id)
(thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
(rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
(begin
;; (debug:print 0 "ERROR: Communication failed!")
;; (mutex-unlock! *send-receive-mutex*)
;; (exit)
(rmt:open-qry-close-locally cmd run-id params)
)))))
(define (rmt:update-db-stats run-id rawcmd params duration)
(mutex-lock! *db-stats-mutex*)
(handle-exceptions
exn
(begin
(debug:print 0 "WARNING: stats collection failed in update-db-stats")
|
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
|
(let* ((dbdir (conc (configf:lookup *configdat* "setup" "linktree") "/.db"))
(db (make-dbr:dbstruct path: dbdir local: #t)))
(set! *dbstruct-db* db)
db)))
(db-file-path (db:dbfile-path 0)))
;; (read-only (not (file-read-access? db-file-path)))
(let* ((start (current-milliseconds))
(res (api:execute-requests dbstruct-local (symbol->string cmd) params))
(duration (- (current-milliseconds) start)))
(rmt:update-db-stats run-id cmd params duration)
;; mark this run as dirty if this was a write
(if (not (member cmd api:read-only-queries))
(let ((start-time (current-seconds)))
(mutex-lock! *db-multi-sync-mutex*)
;; (if (not (hash-table-ref/default *db-local-sync* run-id #f))
;; just set it every time. Is a write more expensive than a read and does it matter?
(hash-table-set! *db-local-sync* (or run-id 0) start-time) ;; the oldest "write"
(mutex-unlock! *db-multi-sync-mutex*)))
res)))
(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
(let* ((run-id (if run-id run-id 0))
(jparams (db:obj->string params)) ;; (rmt:dat->json-str params))
(dat (http-transport:client-api-send-receive run-id connection-info cmd jparams)))
(if (and dat (vector-ref dat 0))
(db:string->obj (vector-ref dat 1))
(begin
(debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat)
dat))))
;; Wrap json library for strings (why the ports crap in the first place?)
(define (rmt:dat->json-str dat)
(with-output-to-string
(lambda ()
(json-write dat))))
|
|
>
|
|
|
>
>
|
|
|
|
|
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
|
(let* ((dbdir (conc (configf:lookup *configdat* "setup" "linktree") "/.db"))
(db (make-dbr:dbstruct path: dbdir local: #t)))
(set! *dbstruct-db* db)
db)))
(db-file-path (db:dbfile-path 0)))
;; (read-only (not (file-read-access? db-file-path)))
(let* ((start (current-milliseconds))
(resdat (api:execute-requests dbstruct-local (vector (symbol->string cmd) params)))
(res (vector-ref resdat 1))
(duration (- (current-milliseconds) start)))
(rmt:update-db-stats run-id cmd params duration)
;; mark this run as dirty if this was a write
(if (not (member cmd api:read-only-queries))
(let ((start-time (current-seconds)))
(mutex-lock! *db-multi-sync-mutex*)
;; (if (not (hash-table-ref/default *db-local-sync* run-id #f))
;; just set it every time. Is a write more expensive than a read and does it matter?
(hash-table-set! *db-local-sync* (or run-id 0) start-time) ;; the oldest "write"
(mutex-unlock! *db-multi-sync-mutex*)))
res)))
(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
(let* ((run-id (if run-id run-id 0))
;; (jparams (db:obj->string params)) ;; (rmt:dat->json-str params))
(res (http-transport:client-api-send-receive run-id connection-info cmd params)))
(if (and res (vector-ref res 0))
res
#f)))
;; (db:string->obj (vector-ref dat 1))
;; (begin
;; (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat)
;; dat))))
;; Wrap json library for strings (why the ports crap in the first place?)
(define (rmt:dat->json-str dat)
(with-output-to-string
(lambda ()
(json-write dat))))
|
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
|
;; M I S C
;;======================================================================
(define (rmt:login run-id)
(rmt:send-receive 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
;; This login does no retries under the hood - it acts a bit like a ping.
;;
(define (rmt:login-no-auto-client-setup connection-info run-id)
(rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible
;;
(define (rmt:general-call stmtname run-id . params)
(rmt:send-receive 'general-call run-id (append (list stmtname run-id) params)))
(define (rmt:sync-inmem->db run-id)
|
>
>
|
>
|
|
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
|
;; M I S C
;;======================================================================
(define (rmt:login run-id)
(rmt:send-receive 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
;; This login does no retries under the hood - it acts a bit like a ping.
;; Deprecated for nmsg-transport.
;;
(define (rmt:login-no-auto-client-setup connection-info run-id)
(case *transport-type*
((http)(rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*)))))
;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible
;;
(define (rmt:general-call stmtname run-id . params)
(rmt:send-receive 'general-call run-id (append (list stmtname run-id) params)))
(define (rmt:sync-inmem->db run-id)
|