Index: tests/simplerun/debug.scm ================================================================== --- tests/simplerun/debug.scm +++ tests/simplerun/debug.scm @@ -14,23 +14,27 @@ * (import big-chicken rmtmod apimod dbmod srfi-18) (define (make-run-id) - #;(let* ((s (conc (current-process-id))) + (let* ((s (conc (current-process-id))) (l (string-length s))) - (string->number (conc (string-ref s (- l 1)))) - ) -1) + (string->number (substring s (- l 3) l)) + )) (define (run) (let* ((th1 (make-thread (lambda () - (let loop ((r 1) ;; (* 20 (make-run-id))) + (let loop ((r (make-run-id)) (i 1)) - (print "register-test "r" test"i) - (rmt:register-test r "test1" (conc "item_" i)) + (let ((start-time (current-milliseconds))) + (rmt:register-test r "test1" (conc "item_" i)) + (let ((qry-time (- (current-milliseconds) start-time))) + (if (> qry-time 500) + (print "WARNING: rmt:register-test took more than 500ms, "qry-time"ms")))) + (if (eq? (modulo i 100) 0) + (print "For run-id="r", num tests registered="i)) (if (< i 100000) (loop r (+ i 1)) (if (< r 100) (begin (print "get-tests-for-run "r) Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -50,10 +50,11 @@ udat-port udat-host-port ;; for testing only ;; pp-uconn + work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct ) (import scheme chicken.base chicken.file @@ -97,12 +98,23 @@ (avail-cmboxes '()) ;; list of ( . ) for re-use ;; threads (numthreads 10) (cmd-thread #f) (work-queue-thread #f) + (num-threads-running 0) ) +;; Parameters + +;; work-method: +;; mailbox - all rdat goes through mailbox +;; threads - all rdat immediately executed in new thread +;; limited - run rdats in immediately executed threads until NthreadsMax +;; reached, then put in mailbox +;; direct - no queuing +;; +(define work-method (make-parameter 'threads)) ;; ;; struct for keeping track of others we are talking to ;; ;; ;; (defstruct pdat ;; (host-port #f) ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer @@ -157,18 +169,22 @@ (define (run-listener handler-proc #!optional (port-suggestion 4242)) (let* ((uconn (make-udat))) (udat-work-proc-set! uconn handler-proc) (if (setup-listener uconn port-suggestion) (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")) - (th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor"))) + (th2 (make-thread (lambda () + (case (work-method) + ((mailbox limited) + (process-work-queue uconn)))) + "Ulex work queue processor"))) (tcp-buffer-size 2048) ;; (max-connections 2048) (thread-start! th1) (thread-start! th2) (udat-cmd-thread-set! uconn th1) (udat-work-queue-thread-set! uconn th2) - (print "cmd loop and process workers started") + (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".") uconn) (assert #f "ERROR: run-listener called without proper setup.")))) (define (wait-and-close uconn) (thread-join! (udat-cmd-thread uconn)) @@ -239,15 +255,19 @@ (mbox-receive-time (current-milliseconds))) ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? (hash-table-delete! (udat-mboxes uconn) qrykey) (if (eq? res 'MBOX_TIMEOUT) (begin - (print "WARNING: mbox timed out for query "cmd", with data "data) + (print "WARNING: mbox timed out for query "cmd", with data "data", waiting for response from "host-port".") + + ;; here it might make sense to clean up connection records and force clean start? + ;; NO. The progam using ulex needs to do the reset. Right thing here is exception + #f) ;; convert to raising exception? res)) (begin - ;; (print "ERROR: Communication failed? Got "sres) + (print "ERROR: Communication failed? Got "sres) #f)))))) ;;====================================================================== ;; responder side ;;====================================================================== @@ -290,19 +310,29 @@ )) ;; given an already set up uconn start the cmd-loop ;; (define (ulex-cmd-loop uconn) - (let* ((serv-listener (udat-socket uconn))) - (let loop ((state 'start)) - (let-values (((inp oup)(tcp-accept serv-listener))) - (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) - (resp (ulex-handler uconn rdat))) - (if resp (serialize resp oup)) - (close-input-port inp) - (close-output-port oup)) - (loop state))))) + (let* ((serv-listener (udat-socket uconn)) + (listener (lambda () + (let loop ((state 'start)) + (let-values (((inp oup)(tcp-accept serv-listener))) + (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) + (resp (ulex-handler uconn rdat))) + (if resp (serialize resp oup)) + (close-input-port inp) + (close-output-port oup)) + (loop state)))))) + ;; start N of them + (let loop ((thnum 0) + (threads '())) + (if (< thnum 100) + (let* ((th (make-thread listener (conc "listener" thnum)))) + (thread-start! th) + (loop (+ thnum 1) + (cons th threads))) + (map thread-join! threads))))) ;; add a proc to the cmd list, these are done symetrically (i.e. in all instances) ;; so that the proc can be dereferenced remotely ;; (define (set-work-handler uconn proc) @@ -314,11 +344,23 @@ ;; rdat is (rem-host-port qrykey cmd params) (define (add-to-work-queue uconn rdat) #;(queue-add! (udat-work-queue uconn) rdat) - (mailbox-send! (udat-work-queue uconn) rdat)) + (case (work-method) + ((threads) + (thread-start! (make-thread (lambda () + (do-work uconn rdat)) + "worker thread"))) + ((mailbox) + (mailbox-send! (udat-work-queue uconn) rdat)) + ((direct) + (do-work uconn rdat)) + (else + (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.") + (mailbox-send! (udat-work-queue uconn) rdat)))) + (define (do-work uconn rdat) (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change ;; put this following into a do-work procedure (match rdat @@ -332,11 +374,13 @@ (send uconn rem-host-port qrykey 'response result) ;; could check for ack (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time)))) (MBOX_TIMEOUT #f) (else (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params"))))) - + +;; NEW APPROACH: +;; (define (process-work-queue uconn) (let ((wqueue (udat-work-queue uconn)) (proc (udat-work-proc uconn)) (numthr (udat-numthreads uconn))) (let loop ((thnum 1)