Index: ulex-simple/ulex.scm ================================================================== --- ulex-simple/ulex.scm +++ ulex-simple/ulex.scm @@ -61,11 +61,11 @@ chicken.time chicken.condition chicken.string chicken.sort chicken.pretty-print - chicken.tcp + ;; chicken.tcp address-info mailbox matchable ;; queues @@ -75,13 +75,13 @@ srfi-1 srfi-18 srfi-4 srfi-69 system-information - ;; tcp6 + tcp6 typed-records - tcp-server + ;; tcp-server ) ;; udat struct, used by both caller and callee ;; instantiated as uconn by convention @@ -101,10 +101,11 @@ (avail-cmboxes '()) ;; list of ( . ) for re-use ;; threads (numthreads 50) (cmd-thread #f) (work-queue-thread #f) + (num-threads-running 0) ) ;; ;; struct for keeping track of others we are talking to ;; ;; ;; (defstruct pdat @@ -203,41 +204,28 @@ (parts (string-split host-port ":")) (host (car parts)) (port (string->number (cadr parts)))) (if isme (ulex-handler udata dat) ;; no transmission needed - ;; (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? - ;; exn - ;; #f - (let-values (((inp oup)(tcp-connect host port))) - (let ((res (if (and inp oup) - (begin - (serialize dat oup) - (deserialize inp)) ;; yes, we always want an ack - (begin - (print "ERROR: send called but no receiver has been setup. Please call setup first!") - #f)))) - (close-input-port inp) - (close-output-port oup) - (mutex-unlock! *send-mutex*) - res))))) ;; res will always be 'ack + (let-values (((inp oup)(tcp-connect host port))) + (let ((res (if (and inp oup) + (begin + (serialize dat oup) + (deserialize inp)) ;; yes, we always want an ack + (begin + (print "ERROR: send called but no receiver has been setup. Please call setup first!") + #f)))) + (close-input-port inp) + (close-output-port oup) + (mutex-unlock! 
*send-mutex*)
+              res)))))
 
 ;; send a request to the given host-port and register a mailbox in udata
 ;; wait for the mailbox data and return it
 ;;
 (define (send-receive uconn host-port cmd data)
-  (send uconn host-port 'qrykey cmd data)
-  #;(cond
-   ((member cmd '(ping goodbye)) ;; these are immediate
-    (send uconn host-port 'ping cmd data))
-   (else
-    (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
-           (qrykey (car cmbox))
-           (mbox (cdr cmbox))
-           (mbox-time (current-milliseconds))
-           (sres (send uconn host-port qrykey cmd data))) ;; short res
-      sres))))
+  (send uconn host-port 'qrykey cmd data))
 
 ;;======================================================================
 ;; responder side
 ;;======================================================================
 
@@ -245,44 +233,77 @@
 ;;
 ;; Reserved cmds; ack ping goodbye response
 ;;
+;; rdat must be a list (asserted). A four element list
+;; (rem-host-port qrykey cmd params) is handed to do-work and the result of
+;; do-work is returned; any other list is printed and answered with 'bad-data.
+;;
 (define (ulex-handler uconn rdat)
   (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
-  (match rdat ;; (string-split controldat)
+  (match rdat
     ((rem-host-port qrykey cmd params)
-     ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
-     (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
-       (case cmd
-         ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
-         ((ping)
-          ;; (print "Got Ping!")
-          ;; (add-to-work-queue uconn rdat)
-          'ack)
-         (else
-          (do-work uconn rdat)))))
+     (do-work uconn rdat))
     (else
      (print "BAD DATA? controldat=" rdat)
-     'ack) ;; send ack anyway?
+     'bad-data)
     ))
+
+;; given an already set up uconn start the cmd-loop
+;;
+#;(define (ulex-cmd-loop uconn)
+  (let* ((serv-listener (udat-socket uconn))
+         (server (make-tcp-server
+                  serv-listener
+                  (lambda ()
+                    (let* ((rdat (deserialize)) ;; '(my-host-port qrykey cmd params)
+                           (resp (ulex-handler uconn rdat)))
+                      (if resp
+                          (serialize resp)
+                          (write resp)))))))
+    (server)))
 
 ;; given an already set up uconn start the cmd-loop
 ;;
+;; Starts 100 listener threads. Each one loops forever: accept a connection
+;; on (udat-socket uconn), deserialize one request, run it through
+;; ulex-handler and serialize the response back on the same connection.
+;; The threads are then joined, so this call blocks the calling thread.
+;;
+;; An exception raised while servicing a single connection is reported and
+;; that connection is dropped; the listener thread carries on accepting.
+;;
 (define (ulex-cmd-loop uconn)
   (let* ((serv-listener (udat-socket uconn))
-         (server (make-tcp-server
-                  serv-listener
-                  (lambda ()
-                    (let* ((rdat (read)#;(deserialize)) ;; '(my-host-port qrykey cmd params)
-                           (resp #;(ulex-handler uconn rdat)
-                                 (do-work uconn rdat)))
-                      (if resp
-                          #;(serialize resp)
-                          (write resp)
-                          (begin
-                            (print "ULEX ERROR: communication error in ulex-cmd-loop.")
-                            resp)))))))
-    (server)))
+         ;; NOTE(review): udat also carries a numthreads field (default 50) - confirm which value should drive this
+         (num-listeners 100)
+         (serve-one (lambda (inp oup)
+                      (handle-exceptions
+                          exn
+                          (print "ULEX ERROR: request failed in ulex-cmd-loop, dropping connection: " exn)
+                        (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
+                               (resp (ulex-handler uconn rdat)))
+                          (serialize resp oup)))
+                      ;; always release the connection, even after a failure above;
+                      ;; closing can itself fail if the peer has gone away, so guard it too
+                      (handle-exceptions exn #f (close-input-port inp))
+                      (handle-exceptions exn #f (close-output-port oup))))
+         (listener (lambda ()
+                     (let loop ()
+                       ;; tcp-accept is deliberately outside the handler: if the listening
+                       ;; socket itself is broken, retrying here would only spin
+                       (let-values (((inp oup) (tcp-accept serv-listener)))
+                         (serve-one inp oup))
+                       (loop)))))
+    ;; start N of them
+    (let loop ((thnum 0)
+               (threads '()))
+      (if (< thnum num-listeners)
+          (let ((th (make-thread listener (conc "listener" thnum))))
+            (thread-start! th)
+            (loop (+ thnum 1)
+                  (cons th threads)))
+          (for-each thread-join! threads)))))
 
 ;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
 ;; so that the proc can be dereferenced remotely
 ;;
 (define (set-work-handler uconn proc)
@@ -309,41 +330,11 @@
              (run-time (- end-time start-time)))
         result))
       (else
        (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")
        #f))))
-
-;; below was to enable re-use of connections. 
This seems non-trivial so for -;; now lets open on each call -;; -;; ;; given host-port get or create peer struct -;; ;; -;; (define (udat-get-peer uconn host-port) -;; (or (hash-table-ref/default (udat-peers uconn) host-port #f) -;; ;; no peer, so create pdat and init it -;; -;; ;; NEED stack of connections, pop and use; inp, oup, -;; ;; creation_time (remove and create new if over 24hrs old -;; ;; -;; (let ((pdat (make-pdat host-port: host-port))) -;; (hash-table-set! (udat-peers uconn) host-port pdat) -;; pdat))) -;; -;; ;; is pcon alive -;; -;; ;; given host-port and pdat get a pcon -;; ;; -;; (define (pdat-get-pcon pdat host-port) -;; (let loop ((conns (pdat-conns pdat))) -;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later -;; (init-pcon (make-pcon)) -;; (let* ((conn (pop conns))) -;; -;; ;; given host-port get a pcon struct -;; ;; -;; (define (udat-get-pcon - + ;;====================================================================== ;; misc utils ;;====================================================================== (define (make-cookie uconn) Index: ulex.scm ================================================================== --- ulex.scm +++ ulex.scm @@ -18,7 +18,7 @@ ;;====================================================================== (declare (unit ulex)) -(include "ulex/ulex.scm") -;; (include "ulex-simple/ulex.scm") +;; (include "ulex/ulex.scm") +(include "ulex-simple/ulex.scm")