Overview
Comment: | Basic communication working, ping, get-keys. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.80-tcp-inmem |
Files: | files | file ages | folders |
SHA1: | e01a10845a1da03f6285ffec0034e037 |
User & Date: | matt on 2023-02-19 18:41:25 |
Other Links: | branch diff | manifest | tags |
Context
2023-02-19
| ||
19:19 | Couple fixes. basic queries, register-test, get-test-id, login, working and starting a server on demand check-in: 9e78ced13a user: matt tags: v1.80-tcp-inmem | |
18:41 | Basic communication working, ping, get-keys. check-in: e01a10845a user: matt tags: v1.80-tcp-inmem | |
10:37 | rmt:send-receive -> tt:handler -> tcp -> api:tcp-dispatch-request -> api:dispatch-request and back implemented and compiles. check-in: a91d15ac06 user: matt tags: v1.80-tcp-inmem | |
Changes
Modified api.scm from [c88d2a22c9] to [351c29f44d].
︙ | ︙ | |||
16 17 18 19 20 21 22 | ;; GNU General Public License for more details. ;; ;; You should have received a copy of the GNU General Public License ;; along with Megatest. If not, see <http://www.gnu.org/licenses/>. ;; ;;====================================================================== | < < > > > > > > > | 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | ;; GNU General Public License for more details. ;; ;; You should have received a copy of the GNU General Public License ;; along with Megatest. If not, see <http://www.gnu.org/licenses/>. ;; ;;====================================================================== (declare (unit api)) (declare (uses rmt)) (declare (uses db)) (declare (uses dbmod)) (declare (uses dbfile)) (declare (uses tasks)) (declare (uses tcp-transportmod)) (import dbmod) (import dbfile) (import tcp-transportmod) (use srfi-69 posix matchable s11n) ;; allow these queries through without starting a server ;; (define api:read-only-queries '(get-key-val-pairs get-var get-keys |
︙ | ︙ | |||
223 224 225 226 227 228 229 | (begin #;(common:telemetry-log (conc "api-out:"(->string cmd)) payload: `((params . ,params) (ok-res . #f))) (vector #t res)))))))) ;; indat is (cmd run-id params meta) | | > > | | | | | | | | | | | | | > > > > | | < | | | < | | 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 | (begin #;(common:telemetry-log (conc "api-out:"(->string cmd)) payload: `((params . ,params) (ok-res . #f))) (vector #t res)))))))) ;; indat is (cmd run-id params meta) (define (api:tcp-dispatch-request-make-handler dbstruct) ;; cmd run-id params) (lambda () (let* ((indat (deserialize))) (set! *api-process-request-count* (+ *api-process-request-count* 1)) (match indat ((cmd run-id params meta) (let* ((status (cond ((> *api-process-request-count* 50) 'busy) ((> *api-process-request-count* 25) 'loaded) (else 'ok))) (errmsg (case status ((busy) (conc "Server overloaded, "*api-process-request-count*" threads in flight")) ((loaded) (conc "Server loaded, "*api-process-request-count*" threads in flight")) (else #f))) (result (case status ((busy) #f) (else (case cmd ((ping) (tt:mk-signature *toppath*)) (else (api:dispatch-request dbstruct cmd run-id params)))))) (payload (list status errmsg result '()))) (set! *api-process-request-count* (- *api-process-request-count* 1)) (serialize payload))) (else (assert #f "FATAL: failed to deserialize indat "indat)))))) (define (api:dispatch-request dbstruct cmd run-id params) (case cmd ;;=============================================== ;; READ/WRITE QUERIES ;;=============================================== |
︙ | ︙ |
Modified megatest.scm from [ae2b7cbe8a] to [6f2fe2c4df].
︙ | ︙ | |||
939 940 941 942 943 944 945 | (dbfname (args:get-arg "-db")) (tl (launch:setup))) (case (rmt:transport-mode) ((http)(http-transport:launch)) ((tcp) (debug:print 0 *default-log-port* "INFO: Running using tcp method.") (if run-id | | | 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 | (dbfname (args:get-arg "-db")) (tl (launch:setup))) (case (rmt:transport-mode) ((http)(http-transport:launch)) ((tcp) (debug:print 0 *default-log-port* "INFO: Running using tcp method.") (if run-id (tt:start-server tl run-id dbfname api:tcp-dispatch-request-make-handler) (begin (debug:print 0 *default-log-port* "ERROR: transport mode is tcp - -run-id is required.") (exit 1)))) (else (debug:print 0 *default-log-port* "ERROR: rmt:transport-mode value not recognised "(rmt:transport-mode)))) (set! *didsomething* #t))) ;; The adjutant is a bit different, it does NOT run (launch:setup) as it is not necessarily tied to |
︙ | ︙ |
Modified tcp-transportmod.scm from [5c83e85f41] to [050a577392].
︙ | ︙ | |||
96 97 98 99 100 101 102 | (thread #f) (host-port #f) (cmd-thread #f) (last-access (current-seconds)) ) (define (tt:make-remote areapath) | | | | > > > > > > | > > | | | > | | | | > > | > > > > > | < < < < > > > > > > > > > > > > > > > > | 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | (thread #f) (host-port #f) (cmd-thread #f) (last-access (current-seconds)) ) (define (tt:make-remote areapath) (make-tt areapath: areapath)) ;; do all the busy work of finding and setting up conn for ;; connecting to a server ;; (define (tt:client-connect-to-server ttdat dbfname run-id ) (let* ((conn (hash-table-ref/default (tt-conns ttdat) dbfname #f)) (server-start-proc (lambda () (tt:server-process-run (tt-areapath ttdat) (dbfile:testsuite-name) (common:find-local-megatest) run-id)))) (if conn conn ;; we are already connected to the server (let* ((sdat (tt:get-current-server-info ttdat dbfname run-id))) (match sdat ((host port start-time server-id pid dbfname2) (assert (equal? dbfname dbfname2) "FATAL: read server info from wrong file.") (let* ((host-port (conc host":"port)) (conn (make-tt-conn host: host port: port host-port: host-port dbfname: dbfname server-id: server-id server-start: start-time pid: pid))) (hash-table-set! (tt-conns ttdat) dbfname conn) ;; verify we can talk to this server (if (tt:ping host port server-id) conn (begin ;; rm the (last server) would go here (server-start-proc) (thread-sleep! 1) (tt:client-connect-to-server ttdat dbfname run-id))))) (else (server-start-proc) (thread-sleep! 
1) (tt:client-connect-to-server ttdat dbfname run-id))))))) (define (tt:ping host port server-id) (let* ((res (tt:send-receive-direct host port `(ping #f #f #f)))) ;; please send me your server-id ;; ;; need two threads, one a 5 second timer ;; (match res ((status errmsg result meta) (if (equal? result server-id) #t ;; then we are good (begin (debug:print 0 *default-log-port* "WARNING: server-id does not match, expected: "server-id", got: "result) #f))) (else (debug:print 0 *default-log-port* "res not in form (status errmsg resutl meta), got: "res) #f)))) ;; client side handler ;; (define (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe) ;; NOTE: areapath is passed in and in tt struct. We'll use passed in value for now. (let* ((conn (tt:client-connect-to-server ttdat dbfname run-id))) ;; (hash-table-ref/default (tt-conns ttdat) dbfname #f))) (if conn |
︙ | ︙ | |||
147 148 149 150 151 152 153 | (thread-sleep! 2) (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)) ((loaded) (debug:print 0 *default-log-port* "WARNING: server is loaded, will try again in a second.") (thread-sleep! 1) (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)) (else | | > > | 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 | (thread-sleep! 2) (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)) ((loaded) (debug:print 0 *default-log-port* "WARNING: server is loaded, will try again in a second.") (thread-sleep! 1) (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)) (else result))) (else (assert #f "FATAL: tt:handler received bad data "res)))) (begin (thread-sleep! 1) ;; give it a rest and try again (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe))))) ;; no conn yet, find and or start and find a server ;; (let* ((server (tt:find-server ttdat dbfname))) ;; (if server |
︙ | ︙ | |||
170 171 172 173 174 175 176 | ;; (tt:handler ttdat cmd run-id params attemptnum area-dat areapath ;; readonly-mode dbfname testsuite mtexe))))))) (define (tt:bid-for-servership run-id) #f) (define (tt:get-current-server-info ttdat dbfname run-id) | > | < | | > > > | | > > | | > > > > > > > | < < | < > > > | < | > | | | | | | | | | | | | | | | | | | | | | | | 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 | ;; (tt:handler ttdat cmd run-id params attemptnum area-dat areapath ;; readonly-mode dbfname testsuite mtexe))))))) (define (tt:bid-for-servership run-id) #f) (define (tt:get-current-server-info ttdat dbfname run-id) (assert (tt-areapath ttdat) "FATAL: areapath not set in ttdat.") (let* ((areapath (tt-areapath ttdat)) (sfiles (tt:find-server areapath dbfname)) (sdats (filter car (map tt:server-get-info sfiles))) ;; first element is #f if the file disappeared while being read (sorted (sort sdats (lambda (a b) (< (list-ref a 2)(list-ref b 2)))))) (if (null? sorted) #f ;; we'll want to wait until extra servers have exited (car sorted)))) (define (tt:send-receive ttdat conn cmd run-id params) (let* ((host-port (tt-conn-host-port conn)) ;; (conc (tt-conn-host conn)":"(tt-conn-port conn))) (host (tt-conn-host conn)) (port (tt-conn-port conn)) (dat (list cmd run-id params #f))) ;; no meta data yet (tt:send-receive-direct host port dat))) (define (tt:send-receive-direct host port dat) (assert (number? 
port) "FATAL: tt:send-receive-direct called with port not a number "port) (handle-exceptions exn #f ;; Add condition-case or better handling here (let-values (((inp oup)(tcp-connect host port))) (let ((res (if (and inp oup) (begin (serialize dat oup) (close-output-port oup) (deserialize inp)) ))) (close-input-port inp) res)))) ;;====================================================================== ;; server ;;====================================================================== (define (tt:sync-dbs ttdat) #f) ;; start the listener and start responding to requests ;; ;; NOTE: organise by dbfname, not run-id so we don't need ;; to pull in more modules ;; ;; This is the routine called in megatest.scm to start a server ;; (define (tt:start-server areapath run-id dbfname handler) (assert areapath "FATAL: areapath not provided for tt:start-server") ;; is there already a server for this dbfile? Then exit. (let* ((ttdat (make-tt areapath: areapath)) (servers (tt:find-server areapath dbfname))) ;; should use tt:get-current-server-info instead (if (null? servers) (let* ((dbstruct (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc)))) (tt-handler-set! ttdat (handler dbstruct)) (let* ((tcp-thread (make-thread (lambda () (tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data "tcp-server-thread")) (run-thread (make-thread (lambda () (tt:keep-running ttdat dbfname))))) (thread-start! tcp-thread) (thread-start! run-thread) (thread-join! run-thread) ;; run thread will exit on timeout or other conditions ;; ;; set a flag here to tell tcp-thread to stop running ;; ;; (thread-join! 
tcp-thread) ;; can't wait ;; ;; remove the servinfo file ;; ;; close the database, remove lock in on-disk db ;; ;; close the listener ports ;; (exit))) (begin (debug:print 0 *default-log-port* "INFO: found server(s) already running for db "dbfname", "(string-intersperse servers ",")" Exiting.") (exit))))) (define (tt:keep-running ttdat dbfname) ;; verfiy conn for ready ;; listener socket has been started by this stage |
︙ | ︙ | |||
258 259 260 261 262 263 264 | (thread-sleep! 1) (loop (+ count 1)))))) (tt:create-server-registration-file ttdat dbfname) ;; now start watching the last-access, if it hasn't been touched ;; in over ten seconds we exit (let loop () | | | 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 | (thread-sleep! 1) (loop (+ count 1)))))) (tt:create-server-registration-file ttdat dbfname) ;; now start watching the last-access, if it hasn't been touched ;; in over ten seconds we exit (let loop () (if (< (- (current-seconds) (tt-last-access ttdat)) 60) (begin (thread-sleep! 2) (loop)))) (if (tt-cleanup-proc ttdat) ((tt-cleanup-proc ttdat))) (debug:print 0 *default-log-port* "INFO: Server timed out, exiting.")) |
︙ | ︙ | |||
334 335 336 337 338 339 340 | serv-id)) ;; find valid server ;; get servers listed, last part of name must match :<dbfname> ;; if more than one, wait one second and look again ;; future: ping oldest, if alive remove other :<dbfname> files ;; | | < | | | | 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 | serv-id)) ;; find valid server ;; get servers listed, last part of name must match :<dbfname> ;; if more than one, wait one second and look again ;; future: ping oldest, if alive remove other :<dbfname> files ;; (define (tt:find-server areapath dbfname) (let* ((servdir (tt:get-servinfo-dir areapath)) (sfiles (glob (conc servdir"/*:"dbfname)))) sfiles)) ;; given a path to a server info file return: host port startseconds server-id ;; example of what it's looking for in the log file: ;; SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4 ;; (define (tt:server-get-info logf) (let ((server-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+) dbfname: (\\S+)")) ;; SERVER STARTED: host:port AT timesecs server id (dbprep-rx (regexp "^SERVER: dbprep")) (dbprep-found 0) (bad-dat (list #f #f #f #f #f #f))) (handle-exceptions exn (begin ;; WARNING: this is potentially dangerous to blanket ignore the errors (if (file-exists? logf) (debug:print-info 2 *default-log-port* "Unable to get server info from "logf", exn=" exn)) bad-dat) ;; no idea what went wrong, call it a bad server |
︙ | ︙ | |||
372 373 374 375 376 377 378 | (if (not mlst) (if (< lnum 500) ;; give up if more than 500 lines of server log read (loop (read-line)(+ lnum 1)) (begin (debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf ) bad-dat)) (match mlst | | | > | 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 | (if (not mlst) (if (< lnum 500) ;; give up if more than 500 lines of server log read (loop (read-line)(+ lnum 1)) (begin (debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf ) bad-dat)) (match mlst ((_ host port start server-id pid dbfname) (list host (string->number port) (string->number start) server-id (string->number pid) dbfname)) (else (debug:print 0 *default-log-port* "ERROR: did not recognise SERVER line info "mlst) bad-dat)))) (begin (if dbprep-found (begin (debug:print-info 2 *default-log-port* "Server is in dbprep at " (common:human-time)) |
︙ | ︙ |