22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
-
-
-
-
-
-
-
-
-
-
+
+
+
-
-
+
+
|
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;
;; generate entries for ~/.megatestrc with the following
;;
;; grep define ../rmt.scm | grep rmt: |perl -pi -e 's/\(define\s+\((\S+)\W.*$/\1/'|sort -u
(defstruct remote
(hh-dat (common:get-homehost)) ;; homehost record ( addr . hhflag )
(server-url (if *toppath* (server:read-dotserver *toppath*))) ;; (server:check-if-running *toppath*) #f))
(last-server-check 0) ;; last time we checked to see if the server was alive
(conndat #f)
(transport *transport-type*)
(server-timeout (or (server:get-timeout) 100))) ;; default to 100 seconds
;;======================================================================
;; S U P P O R T F U N C T I O N S
;;======================================================================
;; if a server is either running or in the process of starting call client:setup
;; else return #f to let the calling proc know that there is no server available
;;
(define (rmt:get-connection-info run-id)
(let ((cinfo (remote-conndat *runremote*)))
(define (rmt:get-connection-info areapath) ;; TODO: push areapath down.
(let ((cinfo (remote-conndat *runremote*))
(run-id 0))
(if cinfo
cinfo
(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
(client:setup run-id)
(if (server:check-if-running areapath)
(client:setup areapath)
#f))))
(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id
;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
|
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
|
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
+
-
-
-
-
-
-
+
-
-
-
+
+
+
-
-
+
-
-
-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
+
-
-
-
+
+
+
+
+
+
-
+
-
+
+
+
+
+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
|
;; give up if more than 15 attempts
((> attemptnum 15)
(debug:print 0 *default-log-port* "ERROR: 15 tries to start/connect to server. Giving up.")
(exit 1))
;; reset the connection if it has been unused too long
((and *runremote*
(remote-conndat *runremote*)
(let ((expire-time (- start-time (remote-server-timeout *runremote*))))
(let ((expire-time (+ (- start-time (remote-server-timeout *runremote*))(random 30)))) ;; add 30 seconds of noise so that not all running tests expire at the same time causing a storm of server starts
(< (http-transport:server-dat-get-last-access (remote-conndat *runremote*)) expire-time)))
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 8")
(remote-conndat-set! *runremote* #f)
(mutex-unlock! *rmt-mutex*)
(rmt:send-receive cmd rid params attemptnum: attemptnum))
;; ensure we have a record for our connection for given area
((not *runremote*)
(set! *runremote* (make-remote))
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 1")
(rmt:send-receive cmd rid params attemptnum: attemptnum))
;; ensure we have a homehost record
((not (pair? (remote-hh-dat *runremote*))) ;; have a homehost record?
((not (pair? (remote-hh-dat *runremote*))) ;; not on homehost
(thread-sleep! 0.1) ;; since we shouldn't get here, delay a little
(remote-hh-dat-set! *runremote* (common:get-homehost))
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 2")
(rmt:send-receive cmd rid params attemptnum: attemptnum))
;; on homehost and this is a read
((and (cdr (remote-hh-dat *runremote*)) ;; on homehost
(member cmd api:read-only-queries)) ;; this is a read
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 3")
(rmt:open-qry-close-locally cmd 0 params))
;; on homehost and this is a write, we already have a server, but server has died
((and (cdr (remote-hh-dat *runremote*)) ;; on homehost
(not (member cmd api:read-only-queries)) ;; this is a write
(remote-server-url *runremote*) ;; have a server
(not (server:check-if-running *toppath*))) ;; server has died.
(set! *runremote* (make-remote))
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 4.1")
(rmt:send-receive cmd rid params attemptnum: attemptnum))
;; on homehost and this is a write, we already have a server
((and (cdr (remote-hh-dat *runremote*)) ;; on homehost
(not (member cmd api:read-only-queries)) ;; this is a write
(remote-server-url *runremote*)) ;; have a server
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 4")
(rmt:open-qry-close-locally cmd 0 params))
;; on homehost and this is a write, we have a server (we know because case 4 checked)
((and (cdr (remote-hh-dat *runremote*)) ;; on homehost
;; on homehost, no server contact made and this is a write, passively start a server
((and (cdr (remote-hh-dat *runremote*)) ; new
(not (member cmd api:read-only-queries)))
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 4.1")
(rmt:open-qry-close-locally cmd 0 params))
;; no server contact made and this is a write, passively start a server
((and (not (remote-server-url *runremote*))
(not (remote-server-url *runremote*))
(not (member cmd api:read-only-queries)))
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 5")
(let ((serverconn (server:read-dotserver *toppath*))) ;; (server:check-if-running *toppath*))) ;; Do NOT want to run server:check-if-running - very expensive to do for every write call
(if serverconn
(remote-server-url-set! *runremote* serverconn) ;; the string can be consumed by the client setup if needed
(let ((server-url (server:check-if-running *toppath*))) ;; (server:read-dotserver->url *toppath*))) ;; (server:check-if-running *toppath*))) ;; Do NOT want to run server:check-if-running - very expensive to do for every write call
(if server-url
(remote-server-url-set! *runremote* server-url) ;; the string can be consumed by the client setup if needed
(if (not (server:start-attempted? *toppath*))
(server:kind-run *toppath*))))
(server:kind-run *toppath*)))
(if (cdr (remote-hh-dat *runremote*)) ;; we are on the homehost, just do the call
(begin
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 5.1")
(rmt:open-qry-close-locally cmd 0 params))
(begin ;; not on homehost, start server and wait
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 5.1")
(rmt:open-qry-close-locally cmd 0 params))
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 5.2")
(tasks:start-and-wait-for-server (tasks:open-db) 0 15)
(rmt:send-receive cmd rid params attemptnum: attemptnum))))
;; if not on homehost ensure we have a connection to a live server
;; NOTE: we *have* a homehost record by now
((and (not (cdr (remote-hh-dat *runremote*))) ;; are we on a homehost?
((and (not (cdr (remote-hh-dat *runremote*))) ;; not on a homehost
(not (remote-conndat *runremote*))) ;; and no connection
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 6 hh-dat: " (remote-hh-dat *runremote*) " conndat: " (remote-conndat *runremote*))
(mutex-unlock! *rmt-mutex*)
(tasks:start-and-wait-for-server (tasks:open-db) 0 15)
(remote-conndat-set! *runremote* (rmt:get-connection-info 0)) ;; calls client:setup which calls client:setup-http
(rmt:send-receive cmd rid params attemptnum: attemptnum))
(server:start-and-wait *toppath*)
(remote-conndat-set! *runremote* (rmt:get-connection-info *toppath*)) ;; calls client:setup which calls client:setup-http
(rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; TODO: add back-off timeout as
;; all set up if get this far, dispatch the query
((cdr (remote-hh-dat *runremote*)) ;; we are on homehost
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 7")
(rmt:open-qry-close-locally cmd (if rid rid 0) params))
;; not on homehost, do server query
(else
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 9")
(mutex-lock! *rmt-mutex*)
(let* ((conninfo (remote-conndat *runremote*))
(dat (case (remote-transport *runremote*)
((http) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away
(http-transport:client-api-send-receive 0 conninfo cmd params)
((commfail)(vector #f "communications fail"))
((exn)(vector #f "other fail" (print-call-chain)))))
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported")
(exit))))
(success (if (vector? dat) (vector-ref dat 0) #f))
(res (if (vector? dat) (vector-ref dat 1) #f)))
(if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time
;; (mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat " *runremote* = "*runremote*)
(if success
(case (remote-transport *runremote*)
((http) res)
((http)
(mutex-unlock! *rmt-mutex*)
res)
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown")
(mutex-unlock! *rmt-mutex*)
(exit 1)))
(begin
(debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum)
(remote-conndat-set! *runremote* #f)
(remote-server-url-set! *runremote* #f)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 9.1")
(mutex-unlock! *rmt-mutex*)
(tasks:start-and-wait-for-server (tasks:open-db) 0 15)
(server:start-and-wait *toppath*)
(rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))))))))
(define (rmt:update-db-stats run-id rawcmd params duration)
(mutex-lock! *db-stats-mutex*)
(handle-exceptions
exn
(begin
(debug:print 0 *default-log-port* "WARNING: stats collection failed in update-db-stats")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(print "exn=" (condition->list exn))
#f) ;; if this fails we don't care, it is just stats
(let* ((cmd (conc "run-id=" run-id " " (if (eq? rawcmd 'general-call) (car params) rawcmd)))
(stat-vec (hash-table-ref/default *db-stats* cmd #f)))
(if (not (vector? stat-vec))
(let ((newvec (vector 0 0)))
(hash-table-set! *db-stats* cmd newvec)
(set! stat-vec newvec)))
(vector-set! stat-vec 0 (+ (vector-ref stat-vec 0) 1))
(vector-set! stat-vec 1 (+ (vector-ref stat-vec 1) duration))))
(mutex-unlock! *db-stats-mutex*))
;; (define (rmt:update-db-stats run-id rawcmd params duration)
;; (mutex-lock! *db-stats-mutex*)
;; (handle-exceptions
;; exn
;; (begin
;; (debug:print 0 *default-log-port* "WARNING: stats collection failed in update-db-stats")
;; (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
;; (print "exn=" (condition->list exn))
;; #f) ;; if this fails we don't care, it is just stats
;; (let* ((cmd (conc "run-id=" run-id " " (if (eq? rawcmd 'general-call) (car params) rawcmd)))
;; (stat-vec (hash-table-ref/default *db-stats* cmd #f)))
;; (if (not (vector? stat-vec))
;; (let ((newvec (vector 0 0)))
;; (hash-table-set! *db-stats* cmd newvec)
;; (set! stat-vec newvec)))
;; (vector-set! stat-vec 0 (+ (vector-ref stat-vec 0) 1))
;; (vector-set! stat-vec 1 (+ (vector-ref stat-vec 1) duration))))
;; (mutex-unlock! *db-stats-mutex*))
(define (rmt:print-db-stats)
(let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f"
(debug:print 18 *default-log-port* "DB Stats\n========")
(debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
(for-each (lambda (cmd)
(let ((cmd-dat (hash-table-ref *db-stats* cmd)))
|
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
|
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
|
-
+
|
#f))
(begin
;; (rmt:update-db-stats run-id cmd params duration)
;; mark this run as dirty if this was a write, the watchdog is responsible for syncing it
(if qry-is-write
(let ((start-time (current-seconds)))
(mutex-lock! *db-multi-sync-mutex*)
(set! *db-last-write* start-time) ;; the oldest "write"
(set! *db-last-access* start-time) ;; THIS IS PROBABLY USELESS? (we are on a client)
(mutex-unlock! *db-multi-sync-mutex*)))))
res))
(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
(let* ((run-id (if run-id run-id 0))
(res (handle-exceptions
exn
|
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
|
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
|
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible
;;
(define (rmt:general-call stmtname run-id . params)
(rmt:send-receive 'general-call run-id (append (list stmtname run-id) params)))
;; given a hostname, return a pair of cpu load and update time representing latest intelligence from tests running on that host
(define (rmt:get-latest-host-load hostname)
(rmt:send-receive 'get-latest-host-load 0 (list hostname)))
;; (define (rmt:sync-inmem->db run-id)
;; (rmt:send-receive 'sync-inmem->db run-id '()))
(define (rmt:sdb-qry qry val run-id)
;; add caching if qry is 'getid or 'getstr
(rmt:send-receive 'sdb-qry run-id (list qry val)))
;; NOT COMPLETED
(define (rmt:runtests user run-id testpatt params)
(rmt:send-receive 'runtests run-id testpatt))
;;======================================================================
;; T E S T M E T A
;;======================================================================
(define (rmt:get-tests-tags)
(rmt:send-receive 'get-tests-tags #f '()))
;;======================================================================
;; K E Y S
;;======================================================================
;; These require run-id because the values come from the run!
;;
(define (rmt:get-key-val-pairs run-id)
(rmt:send-receive 'get-key-val-pairs run-id (list run-id)))
(define (rmt:get-keys)
(if *db-keys* *db-keys*
(let ((res (rmt:send-receive 'get-keys #f '())))
(set! *db-keys* res)
res)))
(define (rmt:get-keys-write) ;; dummy query to force server start
(let ((res (rmt:send-receive 'get-keys-write #f '())))
(set! *db-keys* res)
res))
;; we don't reuse run-id's (except possibly *after* a db cleanup) so it is safe
;; to cache the resuls in a hash
;;
(define (rmt:get-key-vals run-id)
(or (hash-table-ref/default *keyvals* run-id #f)
(let ((res (rmt:send-receive 'get-key-vals #f (list run-id))))
|