Index: ulex/ulex.scm
==================================================================
--- ulex/ulex.scm
+++ ulex/ulex.scm
@@ -169,16 +169,18 @@
   (my-port #f)
   (my-pid (current-process-id))
   ;; server and handler thread
   (serv-listener #f) ;; this processes server info
   (handler-thread #f)
-  (handlers (make-hash-table))
-  (outgoing-conns (make-hash-table)) ;; host:port -> conn
   (mboxes (make-hash-table)) ;; key => mbox
+  ;; other servers
+  (peers (make-hash-table)) ;; host-port => peer record
+  (handlers (make-hash-table)) ;; dbfile => peer record
+  (outgoing-conns (make-hash-table)) ;; host:port -> conn
   (work-queue (make-queue)) ;; most stuff goes here
-  (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping)
-  (busy #f) ;; is either of the queues busy
+  ;; (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping)
+  (busy #f) ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
   ;; app info
   (appname #f)
   (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
   ;; cookies
   (cnum 0) ;; cookie num
@@ -188,12 +190,12 @@
 
 (defstruct peer
   (addr-port #f)
   (hostname #f)
   (pid #f)
-  (inp #f) ;; input port from the peer
-  (oup #f) ;; output port to the peer
+  (inp #f)
+  (oup #f)
   (owns '()) ;; list of databases this peer is currently handling
   )
 
 (defstruct work
   (peer-dat #f)
@@ -201,12 +203,29 @@
   (qrykey #f)
   (data #f)
   (start (current-milliseconds)))
 
 ;;======================================================================
-;; Captain pkt functions
+;; Captain functions
 ;;======================================================================
+
+;; NB// This needs to be started in a thread
+;;
+;; setup to be a captain
+;;   - start server
+;;   - create pkt
+;;   - start server port handler
+;;
+(define (setup-as-captain udata)
+  (if (start-server-find-port udata) ;; puts the server in udata
+      (if (create-captain-pkt udata)
+          (let* ((th (make-thread (lambda ()
+                                    (ulex-handler udata)) "Captain handler")))
+            (udat-handler-thread-set! udata th)
+            (thread-start! th))
+          #f)
+      #f))
 
 ;; given a pkts dir read
 ;;
 (define (get-all-captain-pkts udata)
   (let* ((pktsdir (let ((d (udat-cpkts-dir udata)))
@@ -233,18 +252,43 @@
                      (if (eq? a b)
                          (let ((az (alist-ref 'Z a))
                                (bz (alist-ref 'Z b)))
                            (string>=? az bz))
                          (> ad bd))))))))
+
+;; put the host, ip, port and pid into a pkt in
+;; the captain pkts dir
+;;  - assumes user has already fired up a server
+;;    which will be in the udata struct
+;;
+(define (create-captain-pkt udata)
+  (if (not (udat-serv-listener udata))
+      (begin
+        (print "ERROR: create-captain-pkt called with out a listener")
+        #f)
+      (let* ((pktdat `((port   . ,(udat-my-port udata))
+                       (host   . ,(udat-my-hostname udata))
+                       (ipaddr . ,(udat-my-address udata))
+                       (pid    . ,(udat-my-pid udata))))
+             (pktdir  (udat-cpkts-dir udata))
+             (pktspec (udat-cpkt-spec udata))
+             )
+        (udat-my-cpkt-key-set!
+         udata
+         (write-alist->pkt
+          pktdir
+          pktdat
+          pktspec: pktspec
+          ptype:   'captain))
+        (udat-my-cpkt-key udata))))
 
 ;; remove pkt associated with captn (the Z key .pkt)
 ;;
 (define (remove-captain-pkt udata captn)
   (let ((Z       (alist-ref 'Z captn))
         (cpktdir (udat-cpkts-dir udata)))
     (delete-file* (conc cpktdir "/" Z ".pkt"))))
-
 ;;======================================================================
 ;; server primitives
 ;;======================================================================
 
 
@@ -275,54 +319,14 @@
     (udat-my-port-set! udata port)
     (udat-my-hostname-set! udata (get-host-name))
     (udat-serv-listener-set! udata tlsn)
     udata))
 
-;; put the host, ip, port and pid into a pkt in
-;; the captain pkts dir
-;;  - assumes user has already fired up a server
-;;    which will be in the udata struct
-;;
-(define (create-captain-pkt udata)
-  (if (not (udat-serv-listener udata))
-      (begin
-        (print "ERROR: create-captain-pkt called with out a listener")
-        #f)
-      (let* ((pktdat `((port   . ,(udat-my-port udata))
-                       (host   . ,(udat-my-hostname udata))
-                       (ipaddr . ,(udat-my-address udata))
-                       (pid    . ,(udat-my-pid udata))))
-             (pktdir  (udat-cpkts-dir udata))
-             (pktspec (udat-cpkt-spec udata))
-             )
-        (udat-my-cpkt-key-set!
-         udata
-         (write-alist->pkt
-          pktdir
-          pktdat
-          pktspec: pktspec
-          ptype:   'captain))
-        (udat-my-cpkt-key udata))))
-
-;; NB// This needs to be started in a thread
-;;
-;; setup to be a captain
-;;   - start server
-;;   - create pkt
-;;   - start server port handler
-;;
-(define (setup-as-captain udata)
-  (if (start-server-find-port udata) ;; puts the server in udata
-      (if (create-captain-pkt udata)
-          (let* ((th (make-thread (lambda ()
-                                    (ulex-handler udata)) "Captain handler")))
-            (udat-handler-thread-set! udata th)
-            (thread-start! th))
-          #f)
-      #f))
-
 (define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
+  ;; I'm currently very fuzzy on whether it makes sense to be reusing the outgoing connections.
+  ;; at the other end of the line I think the receiver has closed the ports - thus each message
+  ;; requires new connection?
   (let* ((pdat (or (hash-table-ref/default (udat-outgoing-conns udata) host-port #f)
                    (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
                        exn
                      #f
                      (let ((npdat (make-peer addr-port: host-port)))
@@ -348,11 +352,11 @@
 ;; retval tells send to expect and wait for return data (one line) and return it or time out
 ;; this is for ping where we don't want to necessarily have set up our own server yet.
 ;;
 (define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(retval #f))
   (let-values (((inp oup)(get-peer-ports udata host-port hostname pid)))
-
+    ;;
     ;; CONTROL LINE: (note: removed the hostname - I don't think it adds much value
     ;;
     ;; handlerkey host:port pid qrykey params ...
     ;;
     (if (and inp oup)
@@ -417,17 +421,10 @@
           (let loop ((wd (queue-remove! wqueue)))
             (do-work udata wd)
             (if (not (queue-empty? wqueue))
                 (loop (queue-remove! wqueue)))))))
 
-;; send back ack - this is tcp we are talking about, do we really need an ack?
-;;
-;; NOTE: No need to send back host:port of self - that is locked in by qrykey
-;;
-(define (send-ack udata host-port qrykey) ;; #!optional (hostname #f)(pid #f))
-  (send udata host-port "ack" qrykey qrykey)) ;; we must send a second line - for the ack let it be the qrykey
-
 ;;
 ;;
 (define (ulex-handler udata)
   (let* ((serv-listener (udat-serv-listener udata)))
     (print "serv-listner: " serv-listener)
@@ -445,23 +442,30 @@
                (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
                (case (string->symbol handlerkey)
                  ((ack)(print "Got ack!"))
                  ((ping) ;; special case - return result immediately on the same connection
                   (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
-                         (val  (if proc (proc) "gotping")))
+                         (val  (if proc (proc) "gotping"))
+                         (peer (make-peer addr-port: host-port pid: pid)))
+                    (if (not (hash-table-exists? (udat-peers udata) host-port))
+                        (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
                     (write-line qrykey oup)
                     #;(send udata host-port "version" qrykey val)
                     )
                   (close-input-port inp)
-                  (close-output-port oup)
-                  )
-                 ((rucaptain)
-                  (send udata host-port "iamcaptain" qrykey (if (udat-my-cpkt-key udata)
-                                                                "yes"
-                                                                "no")))
+                  (close-output-port oup))
+                 ((rucaptain) ;; remote is asking if I'm the captain
+                  (write-line (if (udat-my-cpkt-key udata) "yes" "no") oup) ;; answer on the caller's connection, not stdout
+                  (close-input-port inp)
+                  (close-output-port oup))
+                 ((whoowns) ;; given a db name who do I send my queries to
+                  ;; look up the file in handlers, if have an entry ping them to be sure
+                  ;; they are still alive and then return that host:port.
+                  ;; if no handler found or if the ping fails pick from peers the oldest that
+                  ;; is managing the fewest dbs
+                  #f)
                  (else
-                  ;; (send-ack udata host-port qrykey)
                   (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data))))
               (else (print "BAD DATA? controldat=" controldat " data=" data)))))
       (loop state))))
 
 ;; add a proc to the handler list