Index: ulex/ulex.scm
==================================================================
--- ulex/ulex.scm
+++ ulex/ulex.scm
@@ -164,10 +164,14 @@
   ;; server and handler thread
   (serv-listener #f) ;; this processes server info
   (handler-thread #f)
   (handlers (make-hash-table))
   (outgoing-conns (make-hash-table)) ;; host:port -> conn
+  (mboxes (make-hash-table)) ;; key => mbox
+  (work-queue (make-queue)) ;; most stuff goes here
+  (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping)
+  (busy #f) ;; is either of the queues busy
   ;; app info
   (appname #f)
   (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
   ;; cookies
   (cnum 0) ;; cookie num
@@ -182,10 +186,17 @@
   (inp #f) ;; input port from the peer
   (oup #f) ;; output port to the peer
   (owns '()) ;; list of databases this peer is currently handling
   )
 
+(defstruct work
+  (peer-dat #f)   ;; peer record, as passed to add-to-work-queue
+  (handlerkey #f) ;; handler key, as passed to add-to-work-queue
+  (qrykey #f)     ;; query key, as passed to add-to-work-queue
+  (data #f)       ;; request data, as passed to add-to-work-queue
+  (start (current-milliseconds))) ;; ms timestamp; presumably taken when the record is made - TODO confirm
+
 ;;======================================================================
 ;; Captain pkt functions
 ;;======================================================================
 
 ;; given a pkts dir read
@@ -339,12 +350,65 @@
       (write-line data oup)
       ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE!
       ;;       (there is a listener for handling that)
       ))
 
-(define (add-to-work-queue udata . blah)
+;; Send a request to the peer at host-port and wait for the reply.
+;;
+;; A fresh mailbox is registered in (udat-mboxes udata) under qrykey before
+;; the request goes out and is removed again before returning.
+;; NOTE(review): the code that posts the reply into that mailbox is not part
+;; of this change - confirm it looks the mailbox up by qrykey.
+;;
+;;   handler, data         - passed through to send unchanged
+;;   hostname, pid, params - passed through to send as keyword arguments
+;;   timeout               - seconds to wait for the reply (default 20)
+;;
+;; Returns whatever mailbox-receive! yields: the received value, or the
+;; symbol MBOX_TIMEOUT when nothing arrived within timeout seconds.
+;;
+(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20))
+  (let ((mbox   (make-mailbox))
+        (mboxes (udat-mboxes udata)))
+    ;; dynamic-wind unregisters the mailbox even when send raises; without
+    ;; it a failed send would leave a stale entry in mboxes
+    (dynamic-wind
+        (lambda () (hash-table-set! mboxes qrykey mbox))
+        (lambda ()
+          (send udata host-port handler qrykey data hostname: hostname pid: pid params: params)
+          (mailbox-receive! mbox timeout 'MBOX_TIMEOUT))
+        (lambda () (hash-table-delete! mboxes qrykey)))))
+
+;; Wrap an incoming request in a work record, then either process it right
+;; away or, when udata is flagged busy, park it on the work queue.
+;;
+;; NOTE(review): nothing in this change ever sets the busy flag, so unless it
+;; is set elsewhere the queue-add! branch is never taken - confirm intent.
+;;
+(define (add-to-work-queue udata peer-dat handlerkey qrykey data)
+  (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data)))
+    (if (udat-busy udata)
+        (queue-add! (udat-work-queue udata) wdat)
+        ;; passing in wdat tells process-work to first process the passed in wdat
+        (process-work udata wdat))))
+
+;; Placeholder: ignores its arguments and returns #f.
+;;
+(define (do-work udata wdat)
   #f)
+
+;; Run wdat first (when one is passed in), then keep taking entries off the
+;; work queue and running them until the queue is empty.
+;;
+(define (process-work udata #!optional wdat)
+  (if wdat (do-work udata wdat)) ;; process wdat
+  (let ((wqueue (udat-work-queue udata)))
+    (let loop ()
+      (if (not (queue-empty? wqueue))
+          (begin
+            (do-work udata (queue-remove! wqueue))
+            (loop))))))
 
 ;; send back ack - this is tcp we are talking about, do we really need an ack?
 ;;
 ;; NOTE: No need to send back host:port of self - that is locked in by qrykey
 ;;
@@ -361,24 +425,24 @@
     ;; data
     (let loop ((state 'start))
       (let* ((controldat (read-line inp))
              (data       (read-line inp)))
         (match (string-split controldat)
-          ((handlerkey host:port pid qrykey cookie params ...)
+          ((handlerkey host:port pid qrykey params ...)
            (case (string->symbol handlerkey)
              ((ack)(print "Got ack!"))
              ((ping)
               (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
                      (val  (if proc (proc) "gotping")))
-                (send udata host:port "version" qrykey cookie val)))
+                (send udata host:port "version" qrykey val)))
              ((rucaptain)
               (send udata host:port "iamcaptain" qrykey
                     (if (udat-my-cpkt-key udata)
                         "yes" "no")))
              (else
-              (send-ack udata host:port qrykey)
-              (add-to-work-queue udata (get-peer-dat udata host:port) handlerkey data)))
+              ;; (send-ack udata host:port qrykey)
+              (add-to-work-queue udata (get-peer-dat udata host:port) handlerkey qrykey data)))
            (else (print "BAD DATA? handler=" handlerkey " data=" data)))))
       (loop state)))))
 
 ;; add a proc to the handler list
 (define (register-handler udata key proc)