Megatest

Check-in [a91d15ac06]
Login
Overview
Comment:rmt:send-receive -> tt:handler -> tcp -> api:tcp-dispatch-request -> api:dispatch-request and back implemented and compiles.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.80-tcp-inmem
Files: files | file ages | folders
SHA1: a91d15ac06ff79f4a1b9d1e90bfb32d163b20460
User & Date: matt on 2023-02-19 10:37:41
Other Links: branch diff | manifest | tags
Context
2023-02-19
18:41
Basic communication working, ping, get-keys. check-in: e01a10845a user: matt tags: v1.80-tcp-inmem
10:37
rmt:send-receive -> tt:handler -> tcp -> api:tcp-dispatch-request -> api:dispatch-request and back implemented and compiles. check-in: a91d15ac06 user: matt tags: v1.80-tcp-inmem
2023-02-18
20:32
server registation and timeout working check-in: 743e63bc9e user: matt tags: v1.80-tcp-inmem
Changes

Modified api.scm from [f3cc459c57] to [c88d2a22c9].

221
222
223
224
225
226
227

























228
229
230
231
232
233
234
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







               (ok-res . #t)))
	       (vector #f res))
             (begin
               #;(common:telemetry-log (conc "api-out:"(->string cmd))
               payload: `((params . ,params)
               (ok-res . #f)))
               (vector #t res))))))))

;; indat is (cmd run-id params meta)
(define (api:tcp-dispatch-request dbstruct indat) ;; cmd run-id params)
  (set! *api-process-request-count* (+ *api-process-request-count* 1))
  (match (deserialize indat)
    ((cmd run-id params meta)
     (let* ((status  (cond
		      ((> *api-process-request-count* 50) 'busy)
		      ((> *api-process-request-count* 25) 'loaded)
		      (else 'ok)))
	    (errmsg  (case status
		       ((busy)   (conc "Server overloaded, "*api-process-request-count*" threads in flight"))
		       ((loaded) (conc "Server loaded, "*api-process-request-count*" threads in flight"))
		       (else     #f)))
	    (result  (case status
		       ((busy) #f)
		       (else (api:dispatch-request dbstruct cmd run-id params))))
	    (payload (list status errmsg result '()))
	    (pdat    (serialize payload)))
       (set! *api-process-request-count* (- *api-process-request-count* 1))
       pdat))
    (else
     (let* ((msg (conc "(deserialize indat)="(deserialize indat)", indat="indat)))
       (assert #f "FATAL: failed to deserialize indat "msg)))))
       

(define (api:dispatch-request dbstruct cmd run-id params)
  (case cmd
    ;;===============================================
    ;; READ/WRITE QUERIES
    ;;===============================================

Modified megatest.scm from [a71b7bf85e] to [ae2b7cbe8a].

939
940
941
942
943
944
945
946

947
948
949
950
951
952
953
939
940
941
942
943
944
945

946
947
948
949
950
951
952
953







-
+







	   (dbfname    (args:get-arg "-db"))
	   (tl         (launch:setup)))
      (case (rmt:transport-mode)
	((http)(http-transport:launch))
	((tcp)
	 (debug:print 0 *default-log-port* "INFO: Running using tcp method.")
	 (if run-id
	     (tt:start-server tl run-id dbfname api:dispatch-request)
	     (tt:start-server tl run-id dbfname api:tcp-dispatch-request)
	     (begin
	       (debug:print 0 *default-log-port* "ERROR: transport mode is tcp - -run-id is required.")
	       (exit 1))))
	(else (debug:print 0 *default-log-port* "ERROR: rmt:transport-mode value not recognised "(rmt:transport-mode))))
      (set! *didsomething* #t)))

;; The adjutant is a bit different, it does NOT run (launch:setup) as it is not necessarily tied to

Modified rmt.scm from [52cc52478a] to [ba515dcb54].

82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

105
106
107

108
109
110
111
112
113
114
82
83
84
85
86
87
88




89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

104
105
106
107
108
109
110
111







-
-
-
-












+


-
+








(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id

;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected

  #;(common:telemetry-log (conc "rmt:"(->string cmd))
                        payload: `((rid . ,rid)
                                   (params . ,params)))
  
  (if (> attemptnum 2)
      (debug:print 0 *default-log-port* "INFO: attemptnum in rmt:send-receive is " attemptnum))
  
  (cond
   ((> attemptnum 2) (thread-sleep! 0.05))
   ((> attemptnum 10) (thread-sleep! 0.5))
   ((> attemptnum 20) (thread-sleep! 1)))

  ;; I'm turning this off, it may make sense to move it
  ;; into http-transport-handler
  (if (and (> attemptnum 5) (= 0 (modulo attemptnum 15)))  
      (begin
	(debug:print 0 *default-log-port* "ERROR: can't connect to server, trying to start a server.")
	(server:run *toppath*)
	(thread-sleep! 3)))

  
  ;; 1. check if server is started IFF cmd is a write OR if we are not on the homehost, store in runremote
  ;; 2. check the age of the connections. refresh the connection if it is older than timeout-20 seconds.
  ;; 3. do the query, if on homehost use local access
  ;;
  (let* ((start-time    (current-seconds)) ;; snapshot time so all use cases get same value
         (areapath      *toppath*);; TODO - resolve from dbstruct to be compatible with multiple areas
	 (runremote     (or area-dat

Modified tcp-transportmod.scm from [d835b7f23a] to [5c83e85f41].

134
135
136
137
138
139
140



141
142
143
144
145
146











147
148
149
150
151
152
153
134
135
136
137
138
139
140
141
142
143






144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161







+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+







;;
(define (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)
  ;; NOTE: areapath is passed in and in tt struct. We'll use passed in value for now.
  (let* ((conn (tt:client-connect-to-server ttdat dbfname run-id))) ;; (hash-table-ref/default (tt-conns ttdat) dbfname #f)))
    (if conn
	;; have connection, call the server
	(let* ((res (tt:send-receive ttdat conn cmd run-id params)))
	  ;; res is (status errmsg result meta)
	  (match res
	    ((status errmsg result meta)
	  (cond
	   ((member res '(busy starting))
	    (thread-sleep! 1)
	    (tt:handler  ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe))
	   (else
	    res)))
	     (case status
	       ((busy)
		(debug:print 0 *default-log-port* "WARNING: server is overloaded, will try again in few seconds.")
		(thread-sleep! 2)
		(tt:handler  ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe))
	       ((loaded)
		(debug:print 0 *default-log-port* "WARNING: server is loaded, will try again in a second.")
		(thread-sleep! 1)
		(tt:handler  ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe))
	       (else
		result)))))
	(begin
	  (thread-sleep! 1) ;; give it a rest and try again
	  (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)))))

	;; no conn yet, find and or start and find a server
;; 	(let* ((server (tt:find-server ttdat dbfname)))
;; 	  (if server
254
255
256
257
258
259
260


261
262
263
264
265
266
267
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277







+
+







  ;; now start watching the last-access, if it hasn't been touched
  ;; in over ten seconds we exit
  (let loop ()
    (if (< (- (current-seconds) (tt-last-access ttdat)) 10)
	(begin
	  (thread-sleep! 2)
	  (loop))))
  (if (tt-cleanup-proc ttdat)
      ((tt-cleanup-proc ttdat)))
  (debug:print 0 *default-log-port* "INFO: Server timed out, exiting."))

;; ;; given an already set up uconn start the cmd-loop
;; ;;
;; (define (tt:cmd-loop ttdat)
;;   (let* ((serv-listener (-socket uconn))
;; 	 (listener      (lambda ()