Overview
Comment: | Switch to using threads instead of mailbox for worker calls. Seems to not block as much. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v2.0001 |
Files: | files | file ages | folders |
SHA1: |
ba5884c651f453dd4cc71de9c36bb2ad |
User & Date: | matt on 2022-01-08 20:46:47 |
Other Links: | branch diff | manifest | tags |
Context
2022-01-10
| ||
06:42 | Added support to switch between various methods of handling call loops check-in: 10af298b33 user: matt tags: v2.0001 | |
2022-01-08
| ||
20:46 | Switch to using threads instead of mailbox for worker calls. Seems to not block as much. check-in: ba5884c651 user: matt tags: v2.0001 | |
2022-01-07
| ||
17:11 | wip. not much improvement... check-in: 49f0afc304 user: matt tags: v2.0001 | |
Changes
Modified tests/simplerun/debug.scm from [f6de86f926] to [d176a07199].
︙ | ︙ | |||
12 13 14 15 16 17 18 | (module junk * (import big-chicken rmtmod apimod dbmod srfi-18) (define (make-run-id) | | | | < | | | > > > > > | 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | (module junk * (import big-chicken rmtmod apimod dbmod srfi-18) (define (make-run-id) (let* ((s (conc (current-process-id))) (l (string-length s))) (string->number (substring s (- l 3) l)) )) (define (run) (let* ((th1 (make-thread (lambda () (let loop ((r (make-run-id)) (i 1)) (let ((start-time (current-milliseconds))) (rmt:register-test r "test1" (conc "item_" i)) (let ((qry-time (- (current-milliseconds) start-time))) (if (> qry-time 500) (print "WARNING: rmt:register-test took more than 500ms, "qry-time"ms")))) (if (eq? (modulo i 100) 0) (print "For run-id="r", num tests registered="i)) (if (< i 100000) (loop r (+ i 1)) (if (< r 100) (begin (print "get-tests-for-run "r) (rmt:get-tests-for-run r "%" '() '() 0 #f #f #f #f #f 0 #f) (loop (+ r 1) 0))))) |
︙ | ︙ |
Modified ulex/ulex.scm from [3ca1071e09] to [123c4c1081].
︙ | ︙ | |||
48 49 50 51 52 53 54 55 56 57 58 59 60 61 | ;; needed to get the interface:port that was automatically found udat-port udat-host-port ;; for testing only ;; pp-uconn ) (import scheme chicken.base chicken.file chicken.time chicken.condition | > | 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | ;; needed to get the interface:port that was automatically found udat-port udat-host-port ;; for testing only ;; pp-uconn work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct ) (import scheme chicken.base chicken.file chicken.time chicken.condition |
︙ | ︙ | |||
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 | (cnum 0) ;; cookie number (mboxes (make-hash-table)) ;; for the replies (avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use ;; threads (numthreads 10) (cmd-thread #f) (work-queue-thread #f) ) ;; ;; struct for keeping track of others we are talking to ;; ;; ;; (defstruct pdat ;; (host-port #f) ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer ;; ) ;; | > > > > > > > > > > > | 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | (cnum 0) ;; cookie number (mboxes (make-hash-table)) ;; for the replies (avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use ;; threads (numthreads 10) (cmd-thread #f) (work-queue-thread #f) (num-threads-running 0) ) ;; Parameters ;; work-method: ;; mailbox - all rdat goes through mailbox ;; threads - all rdat immediately executed in new thread ;; limited - run rdats in immediately executed threads until NthreadsMax ;; reached, then put in mailbox ;; direct - no queuing ;; (define work-method (make-parameter 'threads)) ;; ;; struct for keeping track of others we are talking to ;; ;; ;; (defstruct pdat ;; (host-port #f) ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer ;; ) ;; |
︙ | ︙ | |||
155 156 157 158 159 160 161 | ;; it then returns control ;; (define (run-listener handler-proc #!optional (port-suggestion 4242)) (let* ((uconn (make-udat))) (udat-work-proc-set! uconn handler-proc) (if (setup-listener uconn port-suggestion) (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")) | | > > > > | | 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | ;; it then returns control ;; (define (run-listener handler-proc #!optional (port-suggestion 4242)) (let* ((uconn (make-udat))) (udat-work-proc-set! uconn handler-proc) (if (setup-listener uconn port-suggestion) (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")) (th2 (make-thread (lambda () (case (work-method) ((mailbox limited) (process-work-queue uconn)))) "Ulex work queue processor"))) (tcp-buffer-size 2048) ;; (max-connections 2048) (thread-start! th1) (thread-start! th2) (udat-cmd-thread-set! uconn th1) (udat-work-queue-thread-set! uconn th2) (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".") uconn) (assert #f "ERROR: run-listener called without proper setup.")))) (define (wait-and-close uconn) (thread-join! (udat-cmd-thread uconn)) (tcp-close (udat-socket uconn))) |
︙ | ︙ | |||
237 238 239 240 241 242 243 | (mbox-timeout-result 'MBOX_TIMEOUT) (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) (mbox-receive-time (current-milliseconds))) ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? (hash-table-delete! (udat-mboxes uconn) qrykey) (if (eq? res 'MBOX_TIMEOUT) (begin | | > > > > | | 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 | (mbox-timeout-result 'MBOX_TIMEOUT) (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) (mbox-receive-time (current-milliseconds))) ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? (hash-table-delete! (udat-mboxes uconn) qrykey) (if (eq? res 'MBOX_TIMEOUT) (begin (print "WARNING: mbox timed out for query "cmd", with data "data", waiting for response from "host-port".") ;; here it might make sense to clean up connection records and force clean start? ;; NO. The program using ulex needs to do the reset. Right thing here is exception #f) ;; convert to raising exception? res)) (begin (print "ERROR: Communication failed? Got "sres) #f)))))) ;;====================================================================== ;; responder side ;;====================================================================== ;; take a request, rdat, and if not immediate put it in the work queue |
︙ | ︙ | |||
288 289 290 291 292 293 294 | (print "BAD DATA? controldat=" rdat) 'ack) ;; send ack anyway? )) ;; given an already set up uconn start the cmd-loop ;; (define (ulex-cmd-loop uconn) | | > | | | | | | | | > > > > > > > > > > > > > > > | > > > > > > | > > | 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 | (print "BAD DATA? controldat=" rdat) 'ack) ;; send ack anyway? )) ;; given an already set up uconn start the cmd-loop ;; (define (ulex-cmd-loop uconn) (let* ((serv-listener (udat-socket uconn)) (listener (lambda () (let loop ((state 'start)) (let-values (((inp oup)(tcp-accept serv-listener))) (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) (resp (ulex-handler uconn rdat))) (if resp (serialize resp oup)) (close-input-port inp) (close-output-port oup)) (loop state)))))) ;; start N of them (let loop ((thnum 0) (threads '())) (if (< thnum 100) (let* ((th (make-thread listener (conc "listener" thnum)))) (thread-start! th) (loop (+ thnum 1) (cons th threads))) (map thread-join! threads))))) ;; add a proc to the cmd list, these are done symmetrically (i.e. in all instances) ;; so that the proc can be dereferenced remotely ;; (define (set-work-handler uconn proc) (udat-work-proc-set! uconn proc)) ;;====================================================================== ;; work queues - this is all happening on the listener side ;;====================================================================== ;; rdat is (rem-host-port qrykey cmd params) (define (add-to-work-queue uconn rdat) #;(queue-add! (udat-work-queue uconn) rdat) (case (work-method) ((threads) (thread-start! (make-thread (lambda () (do-work uconn rdat)) "worker thread"))) ((mailbox) (mailbox-send! 
(udat-work-queue uconn) rdat)) ((direct) (do-work uconn rdat)) (else (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.") (mailbox-send! (udat-work-queue uconn) rdat)))) (define (do-work uconn rdat) (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivably it could change ;; put this following into a do-work procedure (match rdat ((rem-host-port qrykey cmd params) (let* ((start-time (current-milliseconds)) (result (proc rem-host-port qrykey cmd params)) (end-time (current-milliseconds)) (run-time (- end-time start-time))) (print "ULEX: work "cmd", "params" done in "run-time" ms") ;; send 'response as cmd and result as params (send uconn rem-host-port qrykey 'response result) ;; could check for ack (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time)))) (MBOX_TIMEOUT #f) (else (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params"))))) ;; NEW APPROACH: ;; (define (process-work-queue uconn) (let ((wqueue (udat-work-queue uconn)) (proc (udat-work-proc uconn)) (numthr (udat-numthreads uconn))) (let loop ((thnum 1) (threads '())) (let ((thlst (cons (make-thread (lambda () |
︙ | ︙ |