Megatest

Diff
Login

Differences From Artifact [0582204ff9]:

To Artifact [6f19056a06]:


11
12
13
14
15
16
17

18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36







37

38
39
40
41
42
43
44
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

45
46
47
48
49
50
51
52







+



















+
+
+
+
+
+
+
-
+








(use json format)

(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
(declare (uses nmsg-transport))

;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;

;; ;; For debugging add the following to ~/.megatestrc
;;
;; (require-library trace)
;; (import trace)
;; (trace
;; rmt:send-receive
;; api:execute-requests
;; )


;;======================================================================
;;  S U P P O R T   F U N C T I O N S
;;======================================================================

(define (rmt:call-transport run-id connection-info cmd jparams)
  (case (server:get-transport)
    ((rpc)  ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams))
    ((http) (http-transport:client-api-send-receive run-id connection-info cmd jparams))
    ((fs)   ( fs-transport:client-api-send-receive run-id connection-info cmd jparams))
    ((zmq)  (zmq-transport:client-api-send-receive run-id connection-info cmd jparams))
    (else   ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams))))
;; #t means - please start a server!

;;
(define (rmt:write-frequency-over-limit? cmd run-id)
  (and (not (member cmd api:read-only-queries))
       (let* ((tmprec (hash-table-ref/default *write-frequency* run-id #f))
	      (record (if tmprec tmprec 
			  (let ((v (vector (current-seconds) 0)))
			    (hash-table-set! *write-frequency* run-id v)
61
62
63
64
65
66
67
68

69
70
71

72
73
74

75
76
77
78
79
80
81
82









83
84

85
86

87

88
89


90
91
92








93
94
95
96
97

98
99
100
101



102
103
104

105
106



107





108
109
110
111
112
113


114

115

116
117


118


119
120







121
122
123
124
125
126
127
69
70
71
72
73
74
75

76



77
78
79

80
81
82
83





84
85
86
87
88
89
90
91
92
93
94
95
96

97

98
99
100
101
102



103
104
105
106
107
108
109
110
111
112



113




114
115
116



117
118

119
120
121
122
123
124
125
126
127
128
129
130
131


132
133
134
135

136
137
138
139
140
141
142
143


144
145
146
147
148
149
150
151
152
153
154
155
156
157







-
+
-
-
-
+


-
+



-
-
-
-
-
+
+
+
+
+
+
+
+
+


+

-
+
-
+


+
+
-
-
-
+
+
+
+
+
+
+
+


-
-
-
+
-
-
-
-
+
+
+
-
-
-
+

-
+
+
+

+
+
+
+
+




-
-
+
+

+
-
+


+
+

+
+
-
-
+
+
+
+
+
+
+







	cinfo
	;; NB// can cache the answer for server running for 10 seconds ...
	;;  ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
	(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
	    (client:setup run-id)
	    #f))))

;; cmd is a symbol
(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id
;; vars is a json string encoding the parameters for the call
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 0))
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
  ;; clean out old connections
  (mutex-lock! *db-multi-sync-mutex*)
  (let ((expire-time (- (current-seconds) 60)))
  (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
    (for-each 
     (lambda (run-id)
       (let ((connection (hash-table-ref/default *runremote* run-id #f)))
	 (if (and connection 
		  (< (http-transport:server-dat-get-last-access connection) expire-time))
	     (begin
	       (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
	       (hash-table-delete! *runremote* run-id)))))
         (if (and connection 
        	  (< (http-transport:server-dat-get-last-access connection) expire-time))
             (begin
               (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
               ;; SHOULD CLOSE THE CONNECTION HERE
	       (case *transport-type*
		 ((nmsg)(nn-close (http-transport:server-dat-get-socket 
				   (hash-table-ref *runremote* run-id)))))
               (hash-table-delete! *runremote* run-id)))))
     (hash-table-keys *runremote*)))
  (mutex-unlock! *db-multi-sync-mutex*)
  ;; (mutex-lock! *send-receive-mutex*)
  (let* ((run-id          (if rid rid 0))
	 (connection-info (rmt:get-connection-info run-id))
	 (connection-info (rmt:get-connection-info run-id)))
	 (jparams         (db:obj->string params)))
    ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
    (if connection-info
	;; use the server if have connection info
	(let* ((dat     (case *transport-type*
			  ((http)(condition-case
	(let* ((dat     (http-transport:client-api-send-receive run-id connection-info cmd jparams))
	       (res     (if (and dat (vector? dat)) (vector-ref dat 1) #f))
	       (success (if (and dat (vector? dat)) (vector-ref dat 0) #f)))
				  (http-transport:client-api-send-receive run-id connection-info cmd params)
				  ((commfail)(vector #f "communications fail"))))
			  ((nmsg)(condition-case
				  (nmsg-transport:client-api-send-receive run-id connection-info cmd params)
				  ((timeout)(vector #f "timeout talking to server"))))
			  (else  (exit))))
	       (success (if (and dat (vector? dat)) (vector-ref dat 0) #f))
	       (res     (if (and dat (vector? dat)) (vector-ref dat 1) #f)))
	  (http-transport:server-dat-update-last-access connection-info)
	  (if success
	      (db:string->obj res)
	      ;; (if (< attemptnum 100)
	      ;;     (begin
	      (begin
	      ;;       (hash-table-delete! *runremote* run-id)
	      ;;       (thread-sleep! 0.5)
	      ;;       (rmt:send-receive cmd rid params attempnum: (+ attemptnum 1)))
	      ;;     (begin
		;; (mutex-unlock! *send-receive-mutex*)
		(case *transport-type* 
		  ((http) res) ;; (db:string->obj res))
	      ;;       (print-call-chain (current-error-port))
	      ;;       (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over")
	      ;;       (exit 1)))))
		  ((nmsg) res))) ;; (vector-ref res 1)))
	      (begin ;; let ((new-connection-info (client:setup run-id)))
		(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
		(debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.")
		;; (case *transport-type*
		;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
		(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
		(if (eq? (modulo attemptnum 5) 0)
		    (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
		;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
		(tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
		;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))

		;; no longer killing the server in http-transport:client-api-send-receive
		;; may kill it here but what are the criteria?
		;; start with three calls then kill server
		(if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
		(thread-sleep! 2)
		;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
		;; (thread-sleep! 2)
		(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
	;; no connection info? try to start a server
	(if (and (< attemptnum 10)
	(if (and (< attemptnum 15)
		 (tasks:need-server run-id))
	    (begin
	      (hash-table-delete! *runremote* run-id)
	      ;; (mutex-unlock! *send-receive-mutex*)
	      (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
	      (client:setup run-id)
	      (thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
	      (rmt:send-receive cmd rid params (+ attemptnum 1)))
	    (rmt:open-qry-close-locally cmd run-id params)))))
	      (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
	    (begin
	      ;; (debug:print 0 "ERROR: Communication failed!")
	      ;; (mutex-unlock! *send-receive-mutex*)
	      ;; (exit)
	      (rmt:open-qry-close-locally cmd run-id params)
	      )))))

(define (rmt:update-db-stats run-id rawcmd params duration)
  (mutex-lock! *db-stats-mutex*)
  (handle-exceptions
   exn
   (begin
     (debug:print 0 "WARNING: stats collection failed in update-db-stats")
182
183
184
185
186
187
188
189


190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210









211
212
213
214
215
216
217
212
213
214
215
216
217
218

219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234







235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250







-
+
+














-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+







			     (let* ((dbdir (conc    (configf:lookup *configdat* "setup" "linktree") "/.db"))
				    (db (make-dbr:dbstruct path:  dbdir local: #t)))
			       (set! *dbstruct-db* db)
			       db)))
	 (db-file-path   (db:dbfile-path 0)))
    ;; (read-only      (not (file-read-access? db-file-path)))
    (let* ((start         (current-milliseconds))
	   (res           (api:execute-requests dbstruct-local (symbol->string cmd) params))
	   (resdat        (api:execute-requests dbstruct-local (vector (symbol->string cmd) params)))
	   (res           (vector-ref resdat 1))
	   (duration      (- (current-milliseconds) start)))
      (rmt:update-db-stats run-id cmd params duration)
      ;; mark this run as dirty if this was a write
      (if (not (member cmd api:read-only-queries))
	  (let ((start-time (current-seconds)))
	    (mutex-lock! *db-multi-sync-mutex*)
	    ;; (if (not (hash-table-ref/default *db-local-sync* run-id #f))
	    ;; just set it every time. Is a write more expensive than a read and does it matter?
	    (hash-table-set! *db-local-sync* (or run-id 0) start-time) ;; the oldest "write"
	    (mutex-unlock! *db-multi-sync-mutex*)))
      res)))

(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
  (let* ((run-id   (if run-id run-id 0))
	 (jparams  (db:obj->string params)) ;; (rmt:dat->json-str params))
	 (dat      (http-transport:client-api-send-receive run-id connection-info cmd jparams)))
    (if (and dat (vector-ref dat 0))
	(db:string->obj (vector-ref dat 1))
	(begin
	  (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat)
	  dat))))
	 ;; (jparams  (db:obj->string params)) ;; (rmt:dat->json-str params))
	 (res  	   (http-transport:client-api-send-receive run-id connection-info cmd params)))
    (if (and res (vector-ref res 0))
	res
	#f)))
;; 	(db:string->obj (vector-ref dat 1))
;; 	(begin
;; 	  (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat)
;; 	  dat))))

;; Wrap json library for strings (why the ports crap in the first place?)
(define (rmt:dat->json-str dat)
  (with-output-to-string 
    (lambda ()
      (json-write dat))))

240
241
242
243
244
245
246

247
248

249
250



251
252
253
254
255
256
257
273
274
275
276
277
278
279
280
281
282
283


284
285
286
287
288
289
290
291
292
293







+


+
-
-
+
+
+







;;  M I S C
;;======================================================================

(define (rmt:login run-id)
  (rmt:send-receive 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))

;; This login does no retries under the hood - it acts a bit like a ping.
;; Deprecated for nmsg-transport.
;;
(define (rmt:login-no-auto-client-setup connection-info run-id)
  (case *transport-type*
  (rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
  
    ((http)(rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
    ((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*)))))

;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible 
;;
(define (rmt:general-call stmtname run-id . params)
  (rmt:send-receive 'general-call run-id (append (list stmtname run-id) params)))

(define (rmt:sync-inmem->db run-id)