31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
-
+
-
-
+
+
-
-
+
+
+
-
+
|
(include "common_records.scm")
;; Hack to make these functions visible to the refactored code, goal is to eliminate these over time.
;; (define (rmt:send-receive . params) #f)
(define (http-transport:close-connections . params) #f)
;; from remote defstruct in common.scm
(define (api:execute-requests . params) #f)
;; (define (api:execute-requests . params) #f)
(define (api:read-only-queries . params) #f)
(define (http-transport:client-api-send-receive . params) #f)
(define (client:setup . params) #f)
(define (server:kind-run . params) #f)
(define (server:start-and-wait . params) #f)
(define (server:check-if-running . params) #f)
(define (server:ping . params) #f)
(define (common:force-server? . params) #f)
;; 'send-receive rmt:send-receive ...
(define (set-functions . alldata)
(match
alldata
((a b c d e f g h i j) ;; e f g h i j k l)
(set! http-transport:client-api-send-receive a)
(set! http-transport:close-connections b)
(set! apt:execute-requests d)
;; (set! api:execute-requests c)
;; d
(set! client:setup e)
(set! server:kind-run f)
(set! server:start-and-wait g)
(set! server:check-if-running h)
(set! server:ping i)
(set! common:force-server? j)
)))
(define (rmt:open-qry-close-locally log-port multi-sync-mutex cmd run-id params #!key (ro-queries '())(remretries 5))
(let* ((qry-is-write (not (member cmd ro-queries)))
(define (rmt:open-qry-close-locally log-port multi-sync-mutex cmd run-id params alldat #!key (remretries 5))
(let* ((ro-queries (alldat-read-only-queries alldat))
(qry-is-write (not (member cmd ro-queries)))
(db-file-path (exec-fn 'db:dbfile-path)) ;; 0))
(dbstruct-local (exec-fn 'db:setup #t)) ;; make-dbr:dbstruct path: dbdir local: #t)))
(read-only (not (file-write-access? db-file-path)))
(start (current-milliseconds))
(resdat (if (not (and read-only qry-is-write))
(let ((v (api:execute-requests dbstruct-local (vector (symbol->string cmd) params))))
(let ((v (exec-fn 'api:execute-requests dbstruct-local (vector (symbol->string cmd) params))))
(handle-exceptions ;; there has been a long history of receiving strange errors from values returned by the client when things go wrong..
exn ;; This is an attempt to detect that situation and recover gracefully
(begin
(debug:print 0 log-port "ERROR: bad data from server " v " message: " ((condition-property-accessor 'exn 'message) exn))
(vector #t '())) ;; should always get a vector but if something goes wrong return a dummy
(if (and (vector? v)
(> (vector-length v) 1))
|
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
-
+
|
(if (and read-only qry-is-write)
(debug:print 0 log-port "ERROR: attempt to write to read-only database ignored. cmd=" cmd))
(if (not success)
(if (> remretries 0)
(begin
(debug:print-error 0 log-port "local query failed. Trying again.")
(thread-sleep! (/ (random 5000) 1000)) ;; some random delay
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd run-id params ro-queries: ro-queries remretries: (- remretries 1)))
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd run-id params alldat remretries: (- remretries 1)))
(begin
(debug:print-error 0 log-port "too many retries in rmt:open-qry-close-locally, giving up")
#f))
(begin
;; (rmt:update-db-stats run-id cmd params duration)
;; mark this run as dirty if this was a write, the watchdog is responsible for syncing it
#;(if qry-is-write
|
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
-
+
-
+
-
+
|
(define (extras-readonly-mode rmt-mutex log-port cmd params)
;;(mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 3")
(debug:print 0 log-port "WARNING: write transaction requested on a readonly area. cmd="cmd" params="params)
#f)
(define (extras-transport-failed log-port rmt-mutex attemptnum runremote areapath cmd rid params)
(define (extras-transport-failed log-port rmt-mutex attemptnum runremote areapath cmd rid params alldat)
(debug:print 0 log-port "WARNING: communication failed. Trying again, try num: " attemptnum)
;;(mutex-lock! rmt-mutex)
(remote-conndat-set! runremote #f)
(http-transport:close-connections area-dat: runremote)
(remote-server-url-set! runremote #f)
;;(mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 9.1")
(rmt:send-receive-orig log-port runremote rmt-mutex areapath cmd rid params attemptnum: (+ attemptnum 1)))
(rmt:send-receive-orig log-port runremote rmt-mutex areapath cmd rid params alldat attemptnum: (+ attemptnum 1)))
(define (extras-transport-succeded log-port rmt-mutex attemptnum runremote areapath res params rid cmd)
(define (extras-transport-succeded log-port rmt-mutex attemptnum runremote areapath res params rid cmd alldat)
(if (and (vector? res)
(eq? (vector-length res) 2)
(eq? (vector-ref res 1) 'overloaded)) ;; since we are
;; looking at the
;; data to carry the
;; error we'll use a
;; fairly obtuse
|
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
-
+
-
+
|
(debug:print 0 log-port "WARNING: server is overloaded. Delaying " wait-delay " seconds and trying call again.")
;;(mutex-lock! rmt-mutex)
(http-transport:close-connections area-dat: runremote)
;; (set! *runremote* #f) ;; force starting over
(remote-server-url-set! runremote #f) ;; I am hoping this will force a redo on server connection. NOT TESTED
;;(mutex-unlock! rmt-mutex)
(thread-sleep! wait-delay)
(rmt:send-receive-orig log-port runremote rmt-mutex areapath cmd rid params attemptnum: (+ attemptnum 1)))
(rmt:send-receive-orig log-port runremote rmt-mutex areapath cmd rid params alldat attemptnum: (+ attemptnum 1)))
res)) ;; All good, return res
;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
;;
;; add multi-sync-mutex
;;
(define (rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected
(define (rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params alldat #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected
#;(common:telemetry-log (conc "rmt:"(->string cmd))
payload: `((rid . ,rid)
(params . ,params)))
;; do all the prep locked under the rmt-mutex
|
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
|
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
|
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
|
(exit 1))
;; readonly mode, read request- handle it - case 2
((and readonly-mode
(member cmd api:read-only-queries))
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 2")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params ro-queries: api:read-only-queries)
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params alldat)
)
;; readonly mode, write request. Do nothing, return #f
(readonly-mode (extras-readonly-mode rmt-mutex log-port cmd params))
;; This block was for pre-emptively resetting the connection if there had been no communication for some time.
;; I don't think it adds any value. If the server is not there, just fail and start a new connection.
;; also, the expire-time calculation might not be correct. We want, time-since-last-server-access > (server:get-timeout)
;;
;; reset the connection if it has been unused too long
((and runremote
(remote-conndat runremote)
(> (current-seconds) ;; if it has been more than server-timeout seconds since last contact, close this connection and start a new on
(+ (http-transport:server-dat-get-last-access (remote-conndat runremote))
(remote-server-timeout runremote))))
(debug:print-info 0 log-port "Connection to " (remote-server-url runremote) " expired due to no accesses, forcing new connection.")
(http-transport:close-connections area-dat: runremote)
(remote-conndat-set! runremote #f) ;; invalidate the connection, thus forcing a new connection.
;; (mutex-unlock! rmt-mutex)
(rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params attemptnum: attemptnum))
(rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params alldat attemptnum: attemptnum))
;; on homehost and this is a read
((and (not (remote-force-server runremote)) ;; honor forced use of server, i.e. server NOT required
(pair? (remote-hh-dat runremote))
(cdr (remote-hh-dat runremote)) ;; on homehost
(member cmd api:read-only-queries)) ;; this is a read
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 5")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params ro-queries: api:read-only-queries))
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params alldat))
;; on homehost and this is a write, we already have a server, but server has died
((and (cdr (remote-hh-dat runremote)) ;; on homehost
(not (member cmd api:read-only-queries)) ;; this is a write
(remote-server-url runremote) ;; have a server
(not (server:ping (remote-server-url runremote)))) ;; server has died. NOTE: this is not a cheap call! Need better approach.
;; (set! *runremote* (make-remote)) ;; WARNING - broken this.
(remote-force-server-set! runremote (common:force-server?))
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 6")
(rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params attemptnum: attemptnum))
(rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params alldat attemptnum: attemptnum))
;; on homehost and this is a write, we already have a server
((and (not (remote-force-server runremote)) ;; honor forced use of server, i.e. server NOT required
(cdr (remote-hh-dat runremote)) ;; on homehost
(not (member cmd api:read-only-queries)) ;; this is a write
(remote-server-url runremote)) ;; have a server
;;(mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 4.1")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params ro-queries: api:read-only-queries))
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params alldat))
;; on homehost, no server contact made and this is a write, passively start a server
((and (not (remote-force-server runremote)) ;; honor forced use of server, i.e. server NOT required
(cdr (remote-hh-dat runremote)) ;; have homehost
(not (remote-server-url runremote)) ;; no connection yet
(not (member cmd api:read-only-queries))) ;; not a read-only query
(debug:print-info 12 log-port "rmt:send-receive, case 8")
(let ((server-url (server:check-if-running toppath))) ;; (server:read-dotserver->url toppath))) ;; (server:check-if-running toppath))) ;; Do NOT want to run server:check-if-running - very expensive to do for every write call
(if server-url
(remote-server-url-set! runremote server-url) ;; the string can be consumed by the client setup if needed
(if (common:force-server?)
(server:start-and-wait toppath)
(server:kind-run toppath))))
(remote-force-server-set! runremote (common:force-server?))
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 8.1")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params ro-queries: api:read-only-queries))
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params alldat))
((or (and (remote-force-server runremote) ;; we are forcing a server and don't yet have a connection to one
(not (remote-conndat runremote)))
(and (not (cdr (remote-hh-dat runremote))) ;; not on a homehost
(not (remote-conndat runremote)))) ;; and no connection
(debug:print-info 12 log-port "rmt:send-receive, case 9, hh-dat: " (remote-hh-dat runremote) " conndat: " (remote-conndat runremote))
;;(mutex-unlock! rmt-mutex)
(if (not (server:check-if-running toppath)) ;; who knows, maybe one has started up?
(server:start-and-wait toppath))
(remote-conndat-set! runremote (rmt:get-connection-info runremote toppath)) ;; calls client:setup which calls client:setup-http
(rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params attemptnum: attemptnum)) ;; TODO: add back-off timeout as
(rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params alldat attemptnum: attemptnum)) ;; TODO: add back-off timeout as
;; all set up if get this far, dispatch the query
((and (not (remote-force-server runremote))
(cdr (remote-hh-dat runremote))) ;; we are on homehost
;;(mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 10")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd (if rid rid 0) params ro-queries: api:read-only-queries))
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd (if rid rid 0) params alldat))
;; not on homehost, do server query
(else (extras-case-11 log-port rmt-mutex runremote toppath cmd params attemptnum rid)))))
(else (extras-case-11 log-port rmt-mutex runremote toppath cmd params attemptnum rid alldat)))))
(define (extras-case-11 log-port rmt-mutex runremote areapath cmd params attemptnum rid)
(define (extras-case-11 log-port rmt-mutex runremote areapath cmd params attemptnum rid alldat)
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 9")
;; (mutex-lock! rmt-mutex)
(let* ((conninfo (remote-conndat runremote))
(dat (case (remote-transport runremote)
((http) (condition-case ;; handling here has
;; caused a lot of
|
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
|
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
|
-
-
+
+
|
(remote-conndat-set! runremote #f) ;; NOTE: *runremote* is global copy of runremote. Purpose: factor out global.
(http-transport:close-connections area-dat: runremote)))
(debug:print-info 13 log-port "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat " runremote = " runremote)
;; (mutex-unlock! rmt-mutex)
(if success ;; success only tells us that the transport was
;; successful, have to examine the data to see if
;; there was a detected issue at the other end
(extras-transport-succeded log-port rmt-mutex attemptnum runremote areapath res params rid cmd)
(extras-transport-failed log-port rmt-mutex attemptnum runremote areapath cmd rid params)
(extras-transport-succeded log-port rmt-mutex attemptnum runremote areapath res params rid cmd alldat)
(extras-transport-failed log-port rmt-mutex attemptnum runremote areapath cmd rid params alldat)
)))
;; if a server is either running or in the process of starting call client:setup
;; else return #f to let the calling proc know that there is no server available
;;
(define (rmt:get-connection-info runremote areapath #!key (area-dat #f)) ;; TODO: push areapath down.
(let* (;; (runremote (or area-dat runremote))
|