1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
|
-
+
+
-
+
+
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
+
-
-
-
-
+
+
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
-
+
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
-
+
-
-
-
-
-
+
+
+
+
+
+
-
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
;;======================================================================
;; Copyright 2017, Matthew Welland.
;; Copyright 2019, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit rmtmod))
(declare (uses commonmod))
(declare (uses dbmod))
(module rmtmod
*
(import scheme chicken data-structures extras)
(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18)
(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable)
(import commonmod)
(import dbmod)
;; Hack to make these functions visible to the refactored code, goal is to eliminate these over time.
(define (rmt:send-receive . params) #f)
(define (http-transport:close-connections . params) #f)
;; from remote defstruct in common.scm
(define (remote-conndat-set! . params) #f)
(define (remote-server-url-set! . params) #f)
(define (remote-ro-mode . params) #f)
(use (prefix ulex ulex:))
(define (remote-ro-mode-set! . params) #f)
(define (remote-ro-mode-checked-set! . params) #f)
(define (remote-ro-mode-checked . params) #f)
(define (debug:print . params) #f)
(define (debug:print-info . params) #f)
(define (set-functions send-receive rsus
close-connections rcs
dbgp dbgpinfo
ro-mode ro-mode-set
(include "common_records.scm")
ro-mode-checked-set ro-mode-checked
)
(set! rmt:send-receive send-receive)
(set! remote-server-url-set! rsus)
(define (rmt:open-qry-close-locally log-port multi-sync-mutex cmd run-id params alldat #!key (remretries 5))
(let* ((ro-queries (alldat-read-only-queries alldat))
(qry-is-write (not (member cmd ro-queries)))
(db-file-path (common:get-db-tmp-area alldat)) ;; 0))
(dbstruct-local (exec-fn 'db:setup #t)) ;; make-dbr:dbstruct path: dbdir local: #t)))
(read-only (not (file-write-access? db-file-path)))
(start (current-milliseconds))
(set! http-transport:close-connections close-connections)
(set! remote-conndat-set! rcs)
(set! debug:print dbgp)
(set! debug:print-info dbgpinfo)
(set! remote-ro-mode ro-mode)
(set! remote-ro-mode-set! ro-mode-set)
(resdat (if (not (and read-only qry-is-write))
(let ((v (exec-fn 'api:execute-requests dbstruct-local (vector (symbol->string cmd) params))))
(handle-exceptions ;; there has been a long history of receiving strange errors from values returned by the client when things go wrong..
exn ;; This is an attempt to detect that situation and recover gracefully
(begin
(debug:print 0 log-port "ERROR: bad data from server " v " message: " ((condition-property-accessor 'exn 'message) exn))
(vector #t '())) ;; should always get a vector but if something goes wrong return a dummy
(if (and (vector? v)
(> (vector-length v) 1))
(let ((newvec (vector (vector-ref v 0)(vector-ref v 1))))
newvec) ;; by copying the vector while inside the error handler we should force the detection of a corrupted record
(vector #t '())))) ;; we could also check that the returned types are valid
(vector #t '())))
(success (vector-ref resdat 0))
(set! remote-ro-mode-checked-set! ro-mode-checked-set)
(set! remote-ro-mode-checked ro-mode-checked))
(res (vector-ref resdat 1))
(duration (- (current-milliseconds) start)))
(define (rmtmod:calc-ro-mode runremote *toppath*)
(if (and runremote
(remote-ro-mode-checked runremote))
(remote-ro-mode runremote)
(let* ((dbfile (conc *toppath* "/megatest.db"))
(ro-mode (not (file-write-access? dbfile)))) ;; TODO: use dbstruct or runremote to figure this out in future
(if runremote
(if (and read-only qry-is-write)
(debug:print 0 log-port "ERROR: attempt to write to read-only database ignored. cmd=" cmd))
(if (not success)
(if (> remretries 0)
(begin
(debug:print-error 0 log-port "local query failed. Trying again.")
(thread-sleep! (/ (random 5000) 1000)) ;; some random delay
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd run-id params alldat remretries: (- remretries 1)))
(begin
(debug:print-error 0 log-port "too many retries in rmt:open-qry-close-locally, giving up")
#f))
(begin
;; (rmt:update-db-stats run-id cmd params duration)
;; mark this run as dirty if this was a write, the watchdog is responsible for syncing it
#;(if qry-is-write
(let ((start-time (current-seconds)))
(mutex-lock! multi-sync-mutex)
(set! *db-last-access* start-time) ;; THIS IS PROBABLY USELESS? (we are on a client)
(mutex-unlock! multi-sync-mutex)))))
res))
(define (rmtmod:calc-ro-mode areadat toppath)
(if (and areadat
(alldat-ro-mode-checked areadat))
(alldat-ro-mode areadat)
(let* ((dbfile (conc toppath "/megatest.db"))
(ro-mode (not (file-write-access? dbfile)))) ;; TODO: use dbstruct or areadat to figure this out in future
(if areadat
(begin
(remote-ro-mode-set! runremote ro-mode)
(remote-ro-mode-checked-set! runremote #t)
(alldat-ro-mode-set! areadat ro-mode)
(alldat-ro-mode-checked-set! areadat #t)
ro-mode)
ro-mode))))
(define (extras-readonly-mode rmt-mutex log-port cmd params)
(mutex-unlock! rmt-mutex)
;;(mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 3")
(debug:print 0 log-port "WARNING: write transaction requested on a readonly area. cmd="cmd" params="params)
#f)
(define (extras-transport-failed *default-log-port* *rmt-mutex* attemptnum runremote cmd rid params)
(debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum)
(mutex-lock! *rmt-mutex*)
(remote-conndat-set! runremote #f)
(http-transport:close-connections area-dat: runremote)
(remote-server-url-set! runremote #f)
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 9.1")
(rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
(define (extras-transport-failed log-port rmt-mutex attemptnum areadat areapath cmd rid params alldat)
(debug:print 0 log-port "WARNING: communication failed. Trying again, try num: " attemptnum)
;;(mutex-lock! rmt-mutex)
(alldat-conndat-set! areadat #f)
(exec-fn 'http-transport:close-connections area-dat: areadat)
(alldat-server-url-set! areadat #f)
;;(mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 9.1")
(rmt:send-receive-orig log-port areadat rmt-mutex areapath cmd rid params alldat attemptnum: (+ attemptnum 1)))
(define (extras-transport-succeded *default-log-port* *rmt-mutex* attemptnum runremote res params rid cmd)
(define (extras-transport-succeded log-port rmt-mutex attemptnum areadat areapath res params rid cmd alldat)
(if (and (vector? res)
(eq? (vector-length res) 2)
(eq? (vector-ref res 1) 'overloaded)) ;; since we are
;; looking at the
;; data to carry the
;; error we'll use a
;; fairly obtuse
;; combo to minimise
;; the chances of
;; some sort of
;; collision. this
;; is the case where
;; the returned data
;; is bad or the
;; server is
;; overloaded and we
;; want to ease off
;; the queries
(let ((wait-delay (+ attemptnum (* attemptnum 10))))
(debug:print 0 *default-log-port* "WARNING: server is overloaded. Delaying " wait-delay " seconds and trying call again.")
(mutex-lock! *rmt-mutex*)
(http-transport:close-connections area-dat: runremote)
(set! *runremote* #f) ;; force starting over
(mutex-unlock! *rmt-mutex*)
(debug:print 0 log-port "WARNING: server is overloaded. Delaying " wait-delay " seconds and trying call again.")
;;(mutex-lock! rmt-mutex)
(exec-fn 'http-transport:close-connections area-dat: areadat)
;; (set! *areadat* #f) ;; force starting over
(alldat-server-url-set! areadat #f) ;; I am hoping this will force a redo on server connection. NOT TESTED
;;(mutex-unlock! rmt-mutex)
(thread-sleep! wait-delay)
(rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
(rmt:send-receive-orig log-port areadat rmt-mutex areapath cmd rid params alldat attemptnum: (+ attemptnum 1)))
res)) ;; All good, return res
;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
;;
;; add multi-sync-mutex
;;
(define (rmt:send-receive-orig log-port areadat rmt-mutex toppath multi-sync-mutex cmd rid params alldat #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected
)
#;(common:telemetry-log (conc "rmt:"(->string cmd))
payload: `((rid . ,rid)
(params . ,params)))
;; do all the prep locked under the rmt-mutex
;;(mutex-lock! rmt-mutex)
;; 1. check if server is started IFF cmd is a write OR if we are not on the homehost, store in areadat
;; 2. check the age of the connections. refresh the connection if it is older than timeout-20 seconds.
;; 3. do the query, if on homehost use local access
;;
(let* ((start-time (current-seconds)) ;; snapshot time so all use cases get same value
(readonly-mode (rmtmod:calc-ro-mode areadat toppath)))
;; (assert (not (pair? (alldat-hh-dat areadat))))
;;(print "BB> readonly-mode is "readonly-mode" dbfile is "dbfile)
(cond
;; give up if more than 15 attempts
((> attemptnum 15)
(debug:print 0 log-port "ERROR: 15 tries to start/connect to server. Giving up.")
(exit 1))
;; readonly mode, read request- handle it - case 2
((and readonly-mode
(member cmd api:read-only-queries))
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 2")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params alldat)
)
;; readonly mode, write request. Do nothing, return #f
(readonly-mode (extras-readonly-mode rmt-mutex log-port cmd params))
;; This block was for pre-emptively resetting the connection if there had been no communication for some time.
;; I don't think it adds any value. If the server is not there, just fail and start a new connection.
;; also, the expire-time calculation might not be correct. We want, time-since-last-server-access > (server:get-timeout)
;;
;; reset the connection if it has been unused too long
((and areadat
(alldat-conndat areadat)
(> (current-seconds) ;; if it has been more than server-timeout seconds since last contact, close this connection and start a new on
(+ (http-transport:server-dat-get-last-access (alldat-conndat areadat))
(alldat-server-timeout areadat))))
(debug:print-info 0 log-port "Connection to " (alldat-server-url areadat) " expired due to no accesses, forcing new connection.")
(exec-fn 'http-transport:close-connections area-dat: areadat)
(alldat-conndat-set! areadat #f) ;; invalidate the connection, thus forcing a new connection.
;; (mutex-unlock! rmt-mutex)
(rmt:send-receive-orig log-port areadat rmt-mutex toppath multi-sync-mutex cmd rid params alldat attemptnum: attemptnum))
;; on homehost and this is a read
((and (not (alldat-force-server areadat)) ;; honor forced use of server, i.e. server NOT required
(pair? (alldat-hh-dat areadat))
(cdr (alldat-hh-dat areadat)) ;; on homehost
(member cmd api:read-only-queries)) ;; this is a read
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 5")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params alldat))
;; on homehost and this is a write, we already have a server, but server has died
((and (cdr (alldat-hh-dat areadat)) ;; on homehost
(not (member cmd api:read-only-queries)) ;; this is a write
(alldat-server-url areadat) ;; have a server
(not (exec-fn 'server:ping (alldat-server-url areadat)))) ;; server has died. NOTE: this is not a cheap call! Need better approach.
;; (set! *areadat* (make-remote)) ;; WARNING - broken this.
(alldat-force-server-set! areadat (exec-fn 'common:force-server?))
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 6")
(rmt:send-receive-orig log-port areadat rmt-mutex toppath multi-sync-mutex cmd rid params alldat attemptnum: attemptnum))
;; on homehost and this is a write, we already have a server
((and (not (alldat-force-server areadat)) ;; honor forced use of server, i.e. server NOT required
(cdr (alldat-hh-dat areadat)) ;; on homehost
(not (member cmd api:read-only-queries)) ;; this is a write
(alldat-server-url areadat)) ;; have a server
;;(mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 4.1")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params alldat))
;; on homehost, no server contact made and this is a write, passively start a server
((and (not (alldat-force-server areadat)) ;; honor forced use of server, i.e. server NOT required
(cdr (alldat-hh-dat areadat)) ;; have homehost
(not (alldat-server-url areadat)) ;; no connection yet
(not (member cmd api:read-only-queries))) ;; not a read-only query
(debug:print-info 12 log-port "rmt:send-receive, case 8")
(let ((server-url (exec-fn 'server:check-if-running toppath))) ;; (server:read-dotserver->url toppath))) ;; (server:check-if-running toppath))) ;; Do NOT want to run server:check-if-running - very expensive to do for every write call
(if server-url
(alldat-server-url-set! areadat server-url) ;; the string can be consumed by the client setup if needed
(if (exec-fn 'common:force-server?)
(exec-fn 'server:start-and-wait toppath)
(exec-fn 'server:kind-run toppath))))
(alldat-force-server-set! areadat (exec-fn 'common:force-server?))
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 8.1")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params alldat))
((or (and (alldat-force-server areadat) ;; we are forcing a server and don't yet have a connection to one
(not (alldat-conndat areadat)))
(and (not (cdr (alldat-hh-dat areadat))) ;; not on a homehost
(not (alldat-conndat areadat)))) ;; and no connection
(debug:print-info 12 log-port "rmt:send-receive, case 9, hh-dat: " (alldat-hh-dat areadat) " conndat: " (alldat-conndat areadat))
;;(mutex-unlock! rmt-mutex)
(if (not (exec-fn 'server:check-if-running toppath)) ;; who knows, maybe one has started up?
(exec-fn 'server:start-and-wait toppath))
(alldat-conndat-set! areadat (rmt:get-connection-info areadat toppath)) ;; calls client:setup which calls client:setup-http
(rmt:send-receive-orig log-port areadat rmt-mutex toppath multi-sync-mutex cmd rid params alldat attemptnum: attemptnum)) ;; TODO: add back-off timeout as
;; all set up if get this far, dispatch the query
((and (not (alldat-force-server areadat))
(cdr (alldat-hh-dat areadat))) ;; we are on homehost
;;(mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 10")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd (if rid rid 0) params alldat))
;; not on homehost, do server query
(else (extras-case-11 log-port rmt-mutex areadat toppath cmd params attemptnum rid alldat)))))
(define (extras-case-11 log-port rmt-mutex areadat areapath cmd params attemptnum rid alldat)
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 9")
;; (mutex-lock! rmt-mutex)
(let* ((conninfo (alldat-conndat areadat))
(dat (case (alldat-transport areadat)
((http) (condition-case ;; handling here has
;; caused a lot of
;; problems. However it
;; is needed to deal with
;; attemtped
;; communication to
;; servers that have gone
;; away
(exec-fn 'http-transport:client-api-send-receive 0 conninfo cmd params)
((commfail)(vector #f "communications fail"))
((exn)(vector #f "other fail" (print-call-chain)))))
(else
(debug:print 0 log-port "ERROR: transport " (alldat-transport areadat) " not supported")
(exit))))
(success (if (vector? dat) (vector-ref dat 0) #f))
(res (if (vector? dat) (vector-ref dat 1) #f)))
(if (and (vector? conninfo) (< 5 (vector-length conninfo)))
(http-transport:server-dat-update-last-access conninfo) ;; refresh access time
(begin
(debug:print 0 log-port "INFO: Should not get here! conninfo=" conninfo)
(set! conninfo #f)
(alldat-conndat-set! areadat #f) ;; NOTE: *areadat* is global copy of areadat. Purpose: factor out global.
(exec-fn 'http-transport:close-connections area-dat: areadat)))
(debug:print-info 13 log-port "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat " areadat = " areadat)
;; (mutex-unlock! rmt-mutex)
(if success ;; success only tells us that the transport was
;; successful, have to examine the data to see if
;; there was a detected issue at the other end
(extras-transport-succeded log-port rmt-mutex attemptnum areadat areapath res params rid cmd alldat)
(extras-transport-failed log-port rmt-mutex attemptnum areadat areapath cmd rid params alldat)
)))
;; if a server is either running or in the process of starting call client:setup
;; else return #f to let the calling proc know that there is no server available
;;
(define (rmt:get-connection-info areadat areapath #!key (area-dat #f)) ;; TODO: push areapath down.
(let* (;; (areadat (or area-dat areadat))
(cinfo (if (alldat? areadat)
(alldat-conndat areadat)
#f)))
(if cinfo
cinfo
(if (exec-fn 'server:check-if-running areapath)
(exec-fn 'client:setup areadat areapath)
#f))))
;;======================================================================
;; ulex and steps stuff
;;======================================================================
(define (rmtmod:setup-ulex toppath)
(ulex:make-area
dbdir: (conc toppath "/ulexdb")
pktsdir: (conc toppath "/pkts")
))
(define (rmtmod:send-receive-ulex ulex:conn cmd rid params attemptnum area-dat)
#f)
(use trace)(trace-call-sites #t)
;; (trace member rmtmod:calc-ro-mode rmt:open-qry-close-locally)
)
|