Overview
Comment: | rmt:send-receive -> tt:handler -> tcp -> api:tcp-dispatch-request -> api:dispatch-request and back implemented and compiles. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.80-tcp-inmem |
Files: | files | file ages | folders |
SHA1: |
a91d15ac06ff79f4a1b9d1e90bfb32d1 |
User & Date: | matt on 2023-02-19 10:37:41 |
Other Links: | branch diff | manifest | tags |
Context
2023-02-19
| ||
18:41 | Basic communication working, ping, get-keys. check-in: e01a10845a user: matt tags: v1.80-tcp-inmem | |
10:37 | rmt:send-receive -> tt:handler -> tcp -> api:tcp-dispatch-request -> api:dispatch-request and back implemented and compiles. check-in: a91d15ac06 user: matt tags: v1.80-tcp-inmem | |
2023-02-18
| ||
20:32 | server registration and timeout working check-in: 743e63bc9e user: matt tags: v1.80-tcp-inmem | |
Changes
Modified api.scm from [f3cc459c57] to [c88d2a22c9].
︙ | ︙ | |||
221 222 223 224 225 226 227 228 229 230 231 232 233 234 | (ok-res . #t))) (vector #f res)) (begin #;(common:telemetry-log (conc "api-out:"(->string cmd)) payload: `((params . ,params) (ok-res . #f))) (vector #t res)))))))) (define (api:dispatch-request dbstruct cmd run-id params) (case cmd ;;=============================================== ;; READ/WRITE QUERIES ;;=============================================== | > > > > > > > > > > > > > > > > > > > > > > > > > | 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 | (ok-res . #t))) (vector #f res)) (begin #;(common:telemetry-log (conc "api-out:"(->string cmd)) payload: `((params . ,params) (ok-res . #f))) (vector #t res)))))))) ;; indat is (cmd run-id params meta) (define (api:tcp-dispatch-request dbstruct indat) ;; cmd run-id params) (set! *api-process-request-count* (+ *api-process-request-count* 1)) (match (deserialize indat) ((cmd run-id params meta) (let* ((status (cond ((> *api-process-request-count* 50) 'busy) ((> *api-process-request-count* 25) 'loaded) (else 'ok))) (errmsg (case status ((busy) (conc "Server overloaded, "*api-process-request-count*" threads in flight")) ((loaded) (conc "Server loaded, "*api-process-request-count*" threads in flight")) (else #f))) (result (case status ((busy) #f) (else (api:dispatch-request dbstruct cmd run-id params)))) (payload (list status errmsg result '())) (pdat (serialize payload))) (set! *api-process-request-count* (- *api-process-request-count* 1)) pdat)) (else (let* ((msg (conc "(deserialize indat)="(deserialize indat)", indat="indat))) (assert #f "FATAL: failed to deserialize indat "msg))))) (define (api:dispatch-request dbstruct cmd run-id params) (case cmd ;;=============================================== ;; READ/WRITE QUERIES ;;=============================================== |
︙ | ︙ |
Modified megatest.scm from [a71b7bf85e] to [ae2b7cbe8a].
︙ | ︙ | |||
939 940 941 942 943 944 945 | (dbfname (args:get-arg "-db")) (tl (launch:setup))) (case (rmt:transport-mode) ((http)(http-transport:launch)) ((tcp) (debug:print 0 *default-log-port* "INFO: Running using tcp method.") (if run-id | | | 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 | (dbfname (args:get-arg "-db")) (tl (launch:setup))) (case (rmt:transport-mode) ((http)(http-transport:launch)) ((tcp) (debug:print 0 *default-log-port* "INFO: Running using tcp method.") (if run-id (tt:start-server tl run-id dbfname api:tcp-dispatch-request) (begin (debug:print 0 *default-log-port* "ERROR: transport mode is tcp - -run-id is required.") (exit 1)))) (else (debug:print 0 *default-log-port* "ERROR: rmt:transport-mode value not recognised "(rmt:transport-mode)))) (set! *didsomething* #t))) ;; The adjutant is a bit different, it does NOT run (launch:setup) as it is not necessarily tied to |
︙ | ︙ |
Modified rmt.scm from [52cc52478a] to [ba515dcb54].
︙ | ︙ | |||
82 83 84 85 86 87 88 | (define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id ;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname)) ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected | < < < < > | | 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | (define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id ;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname)) ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected (if (> attemptnum 2) (debug:print 0 *default-log-port* "INFO: attemptnum in rmt:send-receive is " attemptnum)) (cond ((> attemptnum 2) (thread-sleep! 0.05)) ((> attemptnum 10) (thread-sleep! 0.5)) ((> attemptnum 20) (thread-sleep! 1))) ;; I'm turning this off, it may make sense to move it ;; into http-transport-handler (if (and (> attemptnum 5) (= 0 (modulo attemptnum 15))) (begin (debug:print 0 *default-log-port* "ERROR: can't connect to server, trying to start a server.") (server:run *toppath*) (thread-sleep! 3))) ;; 1. check if server is started IFF cmd is a write OR if we are not on the homehost, store in runremote ;; 2. check the age of the connections. refresh the connection if it is older than timeout-20 seconds. ;; 3. do the query, if on homehost use local access ;; (let* ((start-time (current-seconds)) ;; snapshot time so all use cases get same value (areapath *toppath*);; TODO - resolve from dbstruct to be compatible with multiple areas (runremote (or area-dat |
︙ | ︙ |
Modified tcp-transportmod.scm from [d835b7f23a] to [5c83e85f41].
︙ | ︙ | |||
134 135 136 137 138 139 140 | ;; (define (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe) ;; NOTE: areapath is passed in and in tt struct. We'll use passed in value for now. (let* ((conn (tt:client-connect-to-server ttdat dbfname run-id))) ;; (hash-table-ref/default (tt-conns ttdat) dbfname #f))) (if conn ;; have connection, call the server (let* ((res (tt:send-receive ttdat conn cmd run-id params))) | > > > | | > > > > > | | | | | 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 | ;; (define (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe) ;; NOTE: areapath is passed in and in tt struct. We'll use passed in value for now. (let* ((conn (tt:client-connect-to-server ttdat dbfname run-id))) ;; (hash-table-ref/default (tt-conns ttdat) dbfname #f))) (if conn ;; have connection, call the server (let* ((res (tt:send-receive ttdat conn cmd run-id params))) ;; res is (status errmsg result meta) (match res ((status errmsg result meta) (case status ((busy) (debug:print 0 *default-log-port* "WARNING: server is overloaded, will try again in few seconds.") (thread-sleep! 2) (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)) ((loaded) (debug:print 0 *default-log-port* "WARNING: server is loaded, will try again in a second.") (thread-sleep! 1) (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)) (else result))))) (begin (thread-sleep! 1) ;; give it a rest and try again (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe))))) ;; no conn yet, find and or start and find a server ;; (let* ((server (tt:find-server ttdat dbfname))) ;; (if server |
︙ | ︙ | |||
254 255 256 257 258 259 260 261 262 263 264 265 266 267 | ;; now start watching the last-access, if it hasn't been touched ;; in over ten seconds we exit (let loop () (if (< (- (current-seconds) (tt-last-access ttdat)) 10) (begin (thread-sleep! 2) (loop)))) (debug:print 0 *default-log-port* "INFO: Server timed out, exiting.")) ;; ;; given an already set up uconn start the cmd-loop ;; ;; ;; (define (tt:cmd-loop ttdat) ;; (let* ((serv-listener (-socket uconn)) ;; (listener (lambda () | > > | 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 | ;; now start watching the last-access, if it hasn't been touched ;; in over ten seconds we exit (let loop () (if (< (- (current-seconds) (tt-last-access ttdat)) 10) (begin (thread-sleep! 2) (loop)))) (if (tt-cleanup-proc ttdat) ((tt-cleanup-proc ttdat))) (debug:print 0 *default-log-port* "INFO: Server timed out, exiting.")) ;; ;; given an already set up uconn start the cmd-loop ;; ;; ;; (define (tt:cmd-loop ttdat) ;; (let* ((serv-listener (-socket uconn)) ;; (listener (lambda () |
︙ | ︙ |