Overview
Comment: | Experiments in bypassing parts of the ulex flow. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | v2.0001-ulex-experiment |
Files: | files | file ages | folders |
SHA1: |
51f97e7aaf8e9b91c73872531aa69cfb |
User & Date: | matt on 2022-01-05 17:36:47 |
Other Links: | branch diff | manifest | tags |
Context
2022-01-05
| ||
17:36 | Experiments in bypassing parts of the ulex flow. Leaf check-in: 51f97e7aaf user: matt tags: v2.0001-ulex-experiment | |
11:48 | Put megatest main call into thread so that mailboxes work check-in: a4d8d9166c user: matt tags: v2.0001 | |
Changes
Modified rmtmod.scm from [f907b596d1] to [c470f3cdbf].
︙ | ︙ | |||
269 270 271 272 273 274 275 276 277 278 279 280 281 282 | (hash-table-set! conns fullpath new-the-srv))) #t))))) ;; NB// sinfo is a servdat struct ;; (define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5)) (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db") (let* ((mdbname ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable (fullname (db:dbname->path apath dbname)) (conns (servdat-conns sinfo)) (mconn (rmt:get-conn sinfo apath ".db/main.db")) (dconn (rmt:get-conn sinfo apath dbname))) #;(if (and mconn (not (debug:print-logger))) | > > > > > > > | 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 | (hash-table-set! conns fullpath new-the-srv))) #t))))) ;; NB// sinfo is a servdat struct ;; (define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5)) (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db") (let loop () (if (not (and *db-serv-info* (servdat-uconn *db-serv-info*))) (begin (debug:print-info 0 *default-log-port* "Waiting for my listener to be available...") (thread-sleep! 1) (loop)))) (let* ((mdbname ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable (fullname (db:dbname->path apath dbname)) (conns (servdat-conns sinfo)) (mconn (rmt:get-conn sinfo apath ".db/main.db")) (dconn (rmt:get-conn sinfo apath dbname))) #;(if (and mconn (not (debug:print-logger))) |
︙ | ︙ |
Modified ulex/ulex.scm from [c5a87871a1] to [3cbdcb425a].
︙ | ︙ | |||
58 59 60 61 62 63 64 65 66 67 68 69 70 71 | chicken.base chicken.file chicken.time chicken.condition chicken.string chicken.sort chicken.pretty-print address-info mailbox matchable ;; queues regex regex-case | > | 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | chicken.base chicken.file chicken.time chicken.condition chicken.string chicken.sort chicken.pretty-print chicken.process-context.posix address-info mailbox matchable ;; queues regex regex-case |
︙ | ︙ | |||
97 98 99 100 101 102 103 | (avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use ;; threads (numthreads 50) (cmd-thread #f) (work-queue-thread #f) ) | | | | | < | | < < | | < < < < | 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | (avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use ;; threads (numthreads 50) (cmd-thread #f) (work-queue-thread #f) ) ;; short: immediately run the task and return the result ;; mailb: use mailbox to queue the tasks on the server side ;; send tasks to mailbox, send results to caller ;; thread: use immediately created threads to do the task, send ;; is called directly ;; (define ulex-mode (make-parameter 'short)) ;;====================================================================== ;; listener ;;====================================================================== ;; is uconn a ulex connector (listener) ;; |
︙ | ︙ | |||
208 209 210 211 212 213 214 215 216 217 218 219 220 221 | (begin (print "ERROR: send called but no receiver has been setup. Please call setup first!") #f)))) (close-input-port inp) (close-output-port oup) res)))))) ;; res will always be 'ack ;; send a request to the given host-port and register a mailbox in udata ;; wait for the mailbox data and return it ;; (define (send-receive uconn host-port cmd data) (cond ((member cmd '(ping goodbye)) ;; these are immediate (send uconn host-port 'ping cmd data)) | > > > > > > > | > | | | | | | | > | | | | | | | | | | > > > | | | | | | | | | | | | | | | | | | | | < | | > > > | | | | | 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 | (begin (print "ERROR: send called but no receiver has been setup. Please call setup first!") #f)))) (close-input-port inp) (close-output-port oup) res)))))) ;; res will always be 'ack (define (delta-print start-ms threshold . params) (let* ((delta (- (current-milliseconds) start-ms))) (if (>= delta threshold) (apply print "ULEX: long wait "delta"ms. " params)))) ;; send a request to the given host-port and register a mailbox in udata ;; wait for the mailbox data and return it ;; (define (send-receive uconn host-port cmd data) (cond ((member cmd '(ping goodbye)) ;; these are immediate (send uconn host-port 'ping cmd data)) ((member (ulex-mode) '(short)) (send uconn host-port 'nokey cmd data)) (else ;; 'full (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? (qrykey (car cmbox)) (mbox (cdr cmbox)) (mbox-time (current-milliseconds)) (sres (send uconn host-port qrykey cmd data))) ;; short res (cond ((eq? sres 'ack) (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread))) #f 120)) ;; timeout) (mbox-timeout-result 'MBOX_TIMEOUT) (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) (mbox-receive-time (current-milliseconds))) (delta-print mbox-time 6000 ", pid="(current-process-id)". send-receive mailbox received "res) ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? (hash-table-delete! (udat-mboxes uconn) qrykey) (if (eq? res 'MBOX_TIMEOUT) (begin (print "WARNING: mbox timed out for query "cmd", with data "data) #f) ;; convert to raising exception? res))) (else (print "ERROR: Communication failed? Got "sres) #f)))))) ;; #f means failed to communicate ;;====================================================================== ;; responder side ;;====================================================================== ;; take a request, rdat, and if not immediate put it in the work queue ;; ;; Reserved cmds; ack ping goodbye response ;; (define (ulex-handler uconn rdat) (assert (list? rdat) "FATAL: ulex-handler give rdat as not list") (match rdat ;; (string-split controldat) ((rem-host-port qrykey cmd params) ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params) (case (ulex-mode) ((short)(do-work uconn rdat)) ((mailb) (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) (case cmd ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack) ((ping) ;; (print "Got Ping!") ;; (add-to-work-queue uconn rdat) 'ack) ((goodbye) ;; just clear out references to the caller (add-to-work-queue uconn rdat) 'ack) ((response) ;; this is a result from remote processing, send it as mail ... (if mbox (begin (mailbox-send! mbox params) ;; params here is our result 'ack) (begin (print "ERROR: received result but no associated mbox for cookie "qrykey) #f))) (else (add-to-work-queue uconn rdat) 'ack)))) ((thread)(thread-start! (make-thread (lambda () (do-work uconn rdat))))) (else (print "BAD DATA? controldat=" rdat) 'ack) ;; send ack anyway? )))) ;; given an already set up uconn start the cmd-loop ;; (define (ulex-cmd-loop uconn) (let* ((serv-listener (udat-socket uconn))) (let loop ((state 'start)) (let-values (((inp oup)(tcp-accept serv-listener))) |
︙ | ︙ | |||
307 308 309 310 311 312 313 | ;; work queues - this is all happening on the listener side ;;====================================================================== ;; rdat is (rem-host-port qrykey cmd params) (define (add-to-work-queue uconn rdat) #;(queue-add! (udat-work-queue uconn) rdat) | > > > | > > > > > > | > > > | | > > | > | 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 | ;; work queues - this is all happening on the listener side ;;====================================================================== ;; rdat is (rem-host-port qrykey cmd params) (define (add-to-work-queue uconn rdat) #;(queue-add! (udat-work-queue uconn) rdat) (case (ulex-mode) ((short) (assert #f "FATAL: Should never get here.")) ((full) (mailbox-send! (udat-work-queue uconn) rdat)) ((thread) (thread-start! (make-thread (lambda ()(do-work uconn rdat)) "do-work"))))) (define (do-work uconn rdat) (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change ;; put this following into a do-work procedure (match rdat ((rem-host-port qrykey cmd params) (let* ((start-time (current-milliseconds)) (result (proc rem-host-port qrykey cmd params)) (end-time (current-milliseconds)) (run-time (- end-time start-time))) (print "ULEX: work "cmd", "params" done in "run-time" ms") ;; send 'response as cmd and result as params (case (ulex-mode) ((mailb thread) (send uconn rem-host-port qrykey 'response result)) ;; could check for ack ((short) result) (else (print "ULEX: error - ulex-mode unrecognised "(ulex-mode)))))) ;; (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time)))) (MBOX_TIMEOUT #f) (else (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params") #f)))) (define (process-work-queue uconn) (let ((wqueue (udat-work-queue uconn)) (proc (udat-work-proc uconn)) (numthr (udat-numthreads uconn))) (let loop ((thnum 1) (threads '())) (let ((thlst (cons (make-thread (lambda () (let work-loop () (let ((start-time (current-milliseconds)) (rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT))) (delta-print start-time 60000 ", pid="(current-process-id)" process-work-queue mailbox received "rdat) (do-work uconn rdat)) (work-loop))) (conc "work thread " thnum)) threads))) (if (< thnum numthr) (loop (+ thnum 1) thlst) |
︙ | ︙ |