11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
(use json format)
(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;
;; ;; For debugging add the following to ~/.megatestrc
;;
|
>
|
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
(use json format)
(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
(declare (uses nmsg-transport))
;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;
;; ;; For debugging add the following to ~/.megatestrc
;;
|
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
cinfo
;; NB// can cache the answer for server running for 10 seconds ...
;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
(client:setup run-id)
#f))))
;; cmd is a symbol
;; vars is a json string encoding the parameters for the call
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 0))
;; clean out old connections
(mutex-lock! *db-multi-sync-mutex*)
(let ((expire-time (- (current-seconds) 60)))
(for-each
(lambda (run-id)
(let ((connection (hash-table-ref/default *runremote* run-id #f)))
(if (and connection
(< (http-transport:server-dat-get-last-access connection) expire-time))
(begin
(debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
(hash-table-delete! *runremote* run-id)))))
(hash-table-keys *runremote*)))
(mutex-unlock! *db-multi-sync-mutex*)
(let* ((run-id (if rid rid 0))
(connection-info (rmt:get-connection-info run-id))
(jparams (db:obj->string params)))
(if connection-info
;; use the server if have connection info
(let* ((dat (http-transport:client-api-send-receive run-id connection-info cmd jparams))
(res (if (and dat (vector? dat)) (vector-ref dat 1) #f))
(success (if (and dat (vector? dat)) (vector-ref dat 0) #f)))
(http-transport:server-dat-update-last-access connection-info)
(if success
(db:string->obj res)
;; (if (< attemptnum 100)
;; (begin
;; (hash-table-delete! *runremote* run-id)
;; (thread-sleep! 0.5)
;; (rmt:send-receive cmd rid params attempnum: (+ attemptnum 1)))
;; (begin
;; (print-call-chain (current-error-port))
;; (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over")
;; (exit 1)))))
(begin ;; let ((new-connection-info (client:setup run-id)))
(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
;; no longer killing the server in http-transport:client-api-send-receive
;; may kill it here but what are the criteria?
;; start with three calls then kill server
(if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
(thread-sleep! 2)
(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
(if (and (< attemptnum 10)
(tasks:need-server run-id))
(begin
(tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
(rmt:send-receive cmd rid params (+ attemptnum 1)))
(rmt:open-qry-close-locally cmd run-id params)))))
(define (rmt:update-db-stats run-id rawcmd params duration)
(mutex-lock! *db-stats-mutex*)
(handle-exceptions
exn
(begin
(debug:print 0 "WARNING: stats collection failed in update-db-stats")
|
<
<
<
|
|
|
|
|
|
|
|
|
>
|
|
|
<
>
>
|
>
>
>
>
|
|
>
|
|
<
<
<
<
<
<
<
<
>
>
>
>
>
>
|
|
>
|
>
>
>
|
>
>
>
|
>
|
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
cinfo
;; NB// can cache the answer for server running for 10 seconds ...
;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
(client:setup run-id)
#f))))
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
;; clean out old connections
(mutex-lock! *db-multi-sync-mutex*)
;; (let ((expire-time (- (current-seconds) 60)))
;; (for-each
;; (lambda (run-id)
;; (let ((connection (hash-table-ref/default *runremote* run-id #f)))
;; (if (and connection
;; (< (http-transport:server-dat-get-last-access connection) expire-time))
;; (begin
;; (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
;; ;; SHOULD CLOSE THE CONNECTION HERE
;; (hash-table-delete! *runremote* run-id)))))
;; (hash-table-keys *runremote*)))
(mutex-unlock! *db-multi-sync-mutex*)
(let* ((run-id (if rid rid 0))
(connection-info (rmt:get-connection-info run-id)))
;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
(if connection-info
;; use the server if have connection info
(let* ((dat (case *transport-type*
((http)(http-transport:client-api-send-receive run-id connection-info cmd params))
((nmsg)(condition-case
(nmsg-transport:client-api-send-receive run-id connection-info cmd params)
((timeout)(vector #f "timeout talking to server"))))
(else (exit))))
(success (if (and dat (vector? dat)) (vector-ref dat 0) #f))
(res (if (and dat (vector? dat)) (vector-ref dat 1) #f)))
(http-transport:server-dat-update-last-access connection-info)
(if success
(case *transport-type*
((http) res) ;; (db:string->obj res))
((nmsg) res)) ;; (vector-ref res 1)))
(begin ;; let ((new-connection-info (client:setup run-id)))
(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
;; (case *transport-type*
;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
(if (eq? (modulo attemptnum 5) 0)
(tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
(tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
;; no longer killing the server in http-transport:client-api-send-receive
;; may kill it here but what are the criteria?
;; start with three calls then kill server
;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
;; (thread-sleep! 2)
(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
;; no connection info? try to start a server
(if (and (< attemptnum 15)
(tasks:need-server run-id))
(begin
(hash-table-delete! *runremote* run-id)
(tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
(client:setup run-id)
(thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
(rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
(begin
(debug:print 0 "ERROR: Communication failed!")
(exit)
;; (rmt:open-qry-close-locally cmd run-id params))))
)))))
(define (rmt:update-db-stats run-id rawcmd params duration)
(mutex-lock! *db-stats-mutex*)
(handle-exceptions
exn
(begin
(debug:print 0 "WARNING: stats collection failed in update-db-stats")
|
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
(let* ((dbdir (conc (configf:lookup *configdat* "setup" "linktree") "/.db"))
(db (make-dbr:dbstruct path: dbdir local: #t)))
(set! *dbstruct-db* db)
db)))
(db-file-path (db:dbfile-path 0)))
;; (read-only (not (file-read-access? db-file-path)))
(let* ((start (current-milliseconds))
(res (api:execute-requests dbstruct-local (symbol->string cmd) params))
(duration (- (current-milliseconds) start)))
(rmt:update-db-stats run-id cmd params duration)
;; mark this run as dirty if this was a write
(if (not (member cmd api:read-only-queries))
(let ((start-time (current-seconds)))
(mutex-lock! *db-multi-sync-mutex*)
;; (if (not (hash-table-ref/default *db-local-sync* run-id #f))
|
|
>
|
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
(let* ((dbdir (conc (configf:lookup *configdat* "setup" "linktree") "/.db"))
(db (make-dbr:dbstruct path: dbdir local: #t)))
(set! *dbstruct-db* db)
db)))
(db-file-path (db:dbfile-path 0)))
;; (read-only (not (file-read-access? db-file-path)))
(let* ((start (current-milliseconds))
(resdat (api:execute-requests dbstruct-local (symbol->string cmd) params))
(res (vector-ref resdat 1))
(duration (- (current-milliseconds) start)))
(rmt:update-db-stats run-id cmd params duration)
;; mark this run as dirty if this was a write
(if (not (member cmd api:read-only-queries))
(let ((start-time (current-seconds)))
(mutex-lock! *db-multi-sync-mutex*)
;; (if (not (hash-table-ref/default *db-local-sync* run-id #f))
|
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
|
;; M I S C
;;======================================================================
(define (rmt:login run-id)
(rmt:send-receive 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
;; This login does no retries under the hood - it acts a bit like a ping.
;;
(define (rmt:login-no-auto-client-setup connection-info run-id)
(rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible
;;
(define (rmt:general-call stmtname run-id . params)
(rmt:send-receive 'general-call run-id (append (list stmtname run-id) params)))
(define (rmt:sync-inmem->db run-id)
|
>
>
|
>
|
|
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
|
;; M I S C
;;======================================================================
(define (rmt:login run-id)
(rmt:send-receive 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
;; This login does no retries under the hood - it acts a bit like a ping.
;; Deprecated for nmsg-transport.
;;
(define (rmt:login-no-auto-client-setup connection-info run-id)
(case *transport-type*
((http)(rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*)))))
;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible
;;
(define (rmt:general-call stmtname run-id . params)
(rmt:send-receive 'general-call run-id (append (list stmtname run-id) params)))
(define (rmt:sync-inmem->db run-id)
|