Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -165,12 +165,12 @@ ;; ;; ulex parameters ;; (work-method 'direct) ;; (return-method 'direct) ;; ulex parameters - (work-method 'mailbox) - (return-method 'mailbox) +;; (work-method 'mailbox) +;; (return-method 'mailbox) ;; fake out readline usage of toplevel-command (define (toplevel-command . a) #f) (define *didsomething* #f) (define *db* #f) ;; this is only for the repl, do not use in general!!!! Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -113,15 +113,15 @@ ;; info about me as a listener and my connections to db servers ;; stored (for now) in *db-serv-info* ;; (defstruct servdat - (host #f) + (host (get-host-name)) (port #f) (uuid #f) (dbfile #f) - (uconn #f) ;; this is the listener *FOR THIS PROCESS* + (uconn (make-udat host: (get-host-name))) ;; this is the ulex record *FOR THIS PROCESS* (mode #f) (status 'starting) (trynum 0) ;; count the number of ports we've tried (conns (make-hash-table)) ;; apath/dbname => conndat ) @@ -198,29 +198,12 @@ ;; (define (rmt:open-main-connection remdat apath) (let* ((fullpath (db:dbname->path apath ".db/main.db")) (conns (servdat-conns remdat)) (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this - (start-rmt:run (lambda () - (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server"))) - (thread-start! th1) - (thread-sleep! 1) - (let loop ((count 0)) - (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection") - (if (or (not *db-serv-info*) - (not (servdat-uconn *db-serv-info*))) - (begin - (thread-sleep! 1) - (loop (+ count 1))) - (begin - (servdat-mode-set! *db-serv-info* 'non-db) - (servdat-uconn *db-serv-info*))))))) - (myconn (servdat-uconn *db-serv-info*))) + (myconn (servdat-uconn remdat))) (cond - ((not myconn) - (start-rmt:run) - (rmt:open-main-connection remdat apath)) ((and conn ;; conn is NOT a socket, just saying ... (< (current-seconds) (conndat-expires conn))) #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died ((and conn (>= (current-seconds)(conndat-expires conn))) @@ -248,11 +231,10 @@ (let* ((srv-addr (server-address the-srv)) ;; need serv (ipaddr (alist-ref 'ipaddr the-srv)) (port (alist-ref 'port the-srv)) (srvkey (alist-ref 'servkey the-srv)) (fullpath (db:dbname->path apath dbname)) - (new-the-srv (make-conndat apath: apath dbname: dbname fullname: fullpath hostport: srv-addr @@ -359,22 +341,16 @@ ;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed ;; sometime in the future ;; (define (rmt:send-receive-real sinfo apath dbname cmd params) - (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.") (let* ((cdat (rmt:get-conn sinfo apath dbname))) (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened") (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex ;; then send-receive using the ulex layer to host-port stored in cdat - (res (send-receive uconn (conndat-hostport cdat) cmd params)) - #;(th1 (make-thread (lambda () - (set! res (send-receive uconn (conndat-hostport cdat) cmd params))) - "send-receive thread"))) - ;; (thread-start! th1) - ;; (thread-join! th1) ;; gratuitious thread stuff is so that mailbox is not used in primordial thead - ;; since we accessed the server we can bump the expires time up + (res (send-receive uconn (conndat-hostport cdat) cmd params))) + ;; since we accessed the server we can bump the expires time up (conndat-expires-set! cdat (+ (current-seconds) (server:expiration-timeout) -2)) ;; two second margin for network time misalignments etc. res))) @@ -431,18 +407,127 @@ ;;====================================================================== ;; S E R V E R ;;====================================================================== + +(define (http-get-function fnkey) + (hash-table-ref/default *http-functions* fnkey (lambda () "nothing here yet"))) + +(define *rmt:run-mutex* (make-mutex)) +(define *rmt:run-flag* #f) + +;; Main entry point to start a server. was start-server +(define (rmt:run hostn) + (assert (args:get-arg "-server") "FATAL: rmt:run called on non-server process") + (mutex-lock! *rmt:run-mutex*) + (if *rmt:run-flag* + (begin + (debug:print-warn 0 *default-log-port* "rmt:run already running.") + (mutex-unlock! *rmt:run-mutex*)) + (begin + (set! *rmt:run-flag* #t) + (mutex-unlock! *rmt:run-mutex*) + ;; ;; Configurations for server + ;; (tcp-buffer-size 2048) + ;; (max-connections 2048) + (debug:print 0 *default-log-port* "PID: "(current-process-id)". Attempting to start server ...") + (if (and *db-serv-info* + (servdat-port *db-serv-info*)) + (let* ((uconn (servdat-uconn *db-serv-info*))) + (wait-and-close uconn)) + (let* ((port (portlogger:open-run-close portlogger:find-port)) + (handler-proc (lambda (rem-host-port qrykey cmd params) ;; + (set! *db-last-access* (current-seconds)) + (assert (list? params) "FATAL: handler called with non-list params") + (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params) + (debug:print 0 *default-log-port* "handler call: "cmd", params="params) + (api:execute-requests *dbstruct-db* cmd params)))) + ;; (api:process-request *dbstuct-db* + (if (not *db-serv-info*) + (set! *db-serv-info* (make-servdat host: hostn port: port))) + (let* ((uconn (run-listener handler-proc port)) + (rport (udat-port uconn))) ;; the real port + (servdat-host-set! *db-serv-info* hostn) + (servdat-port-set! *db-serv-info* rport) + (servdat-uconn-set! *db-serv-info* uconn) + (wait-and-close uconn) + (db:print-current-query-stats) + ))) + (let* ((host (servdat-host *db-serv-info*)) + (port (servdat-port *db-serv-info*)) + (mode (or (servdat-mode *db-serv-info*) + "non-db"))) + ;; server exit stuff here + ;; (rmt:server-shutdown host port) - always do in on-exit + ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit + (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting") + )))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;; host and port are used to ensure we are remove proper records +(define (rmt:server-shutdown host port) + (let ((dbfile (servdat-dbfile *db-serv-info*))) + (debug:print-info 0 *default-log-port* "dbfile is "dbfile) + (if dbfile + (let* ((am-server (args:get-arg "-server")) + (dbfile (args:get-arg "-db")) + (apath *toppath*) + #;(sinfo *remotedat*)) ;; foundation for future fix + (if *dbstruct-db* + (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile)) + (db (dbr:dbdat-db dbdat)) + (inmem (dbr:dbdat-db dbdat)) ;; WRONG + ) + ;; do a final sync here + (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds)) + (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t) + ;; let's finalize here + (debug:print-info 0 *default-log-port* "Finalizing db and inmem") + (if (sqlite3:database? db) + (sqlite3:finalize! db) + (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing...")) + (if (sqlite3:database? inmem) + (sqlite3:finalize! inmem) + (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing...")) + (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete")) + (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do.")) + (if (not am-server) + (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!") + (if (string-match ".*/main.db$" dbfile) + (let ((pkt-file (conc (get-pkts-dir *toppath*) + "/" (servdat-uuid *db-serv-info*) + ".pkt"))) + (debug:print-info 0 *default-log-port* "removing pkt "pkt-file) + (delete-file* pkt-file) + (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port) + (db:with-lock-db + (servdat-dbfile *db-serv-info*) + (lambda (dbh dbfile) + (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove + (let* ((sdat *db-serv-info*) ;; we have a run-id server + (host (servdat-host sdat)) + (port (servdat-port sdat)) + (uuid (servdat-uuid sdat)) + (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile))) + (debug:print-info 0 *default-log-port* "deregistered-server, res="res) + (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid) + ))))))) + (define (rmt:kill-server run-id) (rmt:send-receive 'kill-server #f (list run-id))) (define (rmt:start-server run-id) (rmt:send-receive 'start-server #f (list run-id))) (define (rmt:server-info apath dbname) (rmt:send-receive 'get-server-info #f (list apath dbname))) + + ;;====================================================================== ;; M I S C ;;====================================================================== @@ -1503,59 +1588,10 @@ (define (common:api-changed?) (not (equal? (substring (->string megatest-version) 0 4) (substring (conc (common:get-last-run-version)) 0 4)))) -;; host and port are used to ensure we are remove proper records -(define (rmt:server-shutdown host port) - (let ((dbfile (servdat-dbfile *db-serv-info*))) - (debug:print-info 0 *default-log-port* "dbfile is "dbfile) - (if dbfile - (let* ((am-server (args:get-arg "-server")) - (dbfile (args:get-arg "-db")) - (apath *toppath*) - #;(sinfo *remotedat*)) ;; foundation for future fix - (if *dbstruct-db* - (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile)) - (db (dbr:dbdat-db dbdat)) - (inmem (dbr:dbdat-db dbdat)) ;; WRONG - ) - ;; do a final sync here - (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds)) - (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t) - ;; let's finalize here - (debug:print-info 0 *default-log-port* "Finalizing db and inmem") - (if (sqlite3:database? db) - (sqlite3:finalize! db) - (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing...")) - (if (sqlite3:database? inmem) - (sqlite3:finalize! inmem) - (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing...")) - (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete")) - (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do.")) - (if (not am-server) - (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!") - (if (string-match ".*/main.db$" dbfile) - (let ((pkt-file (conc (get-pkts-dir *toppath*) - "/" (servdat-uuid *db-serv-info*) - ".pkt"))) - (debug:print-info 0 *default-log-port* "removing pkt "pkt-file) - (delete-file* pkt-file) - (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port) - (db:with-lock-db - (servdat-dbfile *db-serv-info*) - (lambda (dbh dbfile) - (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove - (let* ((sdat *db-serv-info*) ;; we have a run-id server - (host (servdat-host sdat)) - (port (servdat-port sdat)) - (uuid (servdat-uuid sdat)) - (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile))) - (debug:print-info 0 *default-log-port* "deregistered-server, res="res) - (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid) - ))))))) - (define (std-exit-procedure) ;;(common:telemetry-log-close) (on-exit (lambda () 0)) ;;(debug:print-info 13 *default-log-port* "std-exit-procedure called; *time-to-exit*="*time-to-exit*) (let ((no-hurry (if (bdat-time-to-exit *bdat*) ;; hurry up @@ -1639,67 +1675,10 @@ (conc "http://" (car hostport) ":" (cadr hostport)))) ;;====================================================================== ;; S E R V E R ;; ====================================================================== - -(define (http-get-function fnkey) - (hash-table-ref/default *http-functions* fnkey (lambda () "nothing here yet"))) - -(define *rmt:run-mutex* (make-mutex)) -(define *rmt:run-flag* #f) - -;; Main entry point to start a server. was start-server -(define (rmt:run hostn) - (mutex-lock! *rmt:run-mutex*) - (if *rmt:run-flag* - (begin - (debug:print-warn 0 *default-log-port* "rmt:run already running.") - (mutex-unlock! *rmt:run-mutex*)) - (begin - (set! *rmt:run-flag* #t) - (mutex-unlock! *rmt:run-mutex*) - ;; ;; Configurations for server - ;; (tcp-buffer-size 2048) - ;; (max-connections 2048) - (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...") - (if (and *db-serv-info* - (servdat-uconn *db-serv-info*)) - (let* ((uconn (servdat-uconn *db-serv-info*))) - (wait-and-close uconn)) - (let* ((port (portlogger:open-run-close portlogger:find-port)) - (handler-proc (lambda (rem-host-port qrykey cmd params) ;; - (set! *db-last-access* (current-seconds)) - (assert (list? params) "FATAL: handler called with non-list params") - (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params) - (debug:print 0 *default-log-port* "handler call: "cmd", params="params) - (api:execute-requests *dbstruct-db* cmd params)))) - ;; (api:process-request *dbstuct-db* - (if (not *db-serv-info*) - (set! *db-serv-info* (make-servdat host: hostn port: port))) - (let* ((uconn (run-listener handler-proc port)) - (rport (udat-port uconn))) ;; the real port - (servdat-host-set! *db-serv-info* hostn) - (servdat-port-set! *db-serv-info* rport) - (servdat-uconn-set! *db-serv-info* uconn) - (wait-and-close uconn) - (db:print-current-query-stats) - ))) - (let* ((host (servdat-host *db-serv-info*)) - (port (servdat-port *db-serv-info*)) - (mode (or (servdat-mode *db-serv-info*) - "non-db"))) - ;; server exit stuff here - ;; (rmt:server-shutdown host port) - always do in on-exit - ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit - (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting") - )))) - -;;====================================================================== -;; S E R V E R U T I L I T I E S -;;====================================================================== - ;;====================================================================== ;; C L I E N T S ;;====================================================================== (define (rmt:get-time-to-cleanup) @@ -1816,19 +1795,11 @@ (define (server-address srv-pkt) (conc (alist-ref 'host srv-pkt) ":" (alist-ref 'port srv-pkt))) (define (server-ready? uconn host-port key) ;; server-address is host:port - (let* ((params `((cmd . ping)(key . ,key))) - (data `((cmd . ping) - (key . ,key) - (params . ,params))) ;; I don't get it. - (res (send-receive uconn host-port 'ping data))) - (if (eq? res 'ack) ;; yep, likely it is who we want on the other end - res - #f))) -;; (begin (debug:print-info 0 *default-log-port* "server-ready? => "res) #f)))) + (send-receive uconn host-port 'ping '())) ; from the pkts return servers associated with dbpath ;; NOTE: Only one can be alive - have to check on each ;; in the list of pkts returned ;; @@ -1848,11 +1819,12 @@ (let* ((host (alist-ref 'host pkt)) (port (alist-ref 'port pkt)) (host-port (conc host":"port)) (key (alist-ref 'servkey pkt)) (pktz (alist-ref 'Z pkt)) - (res (server-ready? uconn host-port key))) + (res (or (equal? host-port (udat-host-port uconn)) ;; might be it is me who is the server + (server-ready? uconn host-port key)))) (if res res (let* ((pktsdir (get-pkts-dir *toppath*)) (pktpath (conc pktsdir"/"pktz".pkt"))) (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath) @@ -2198,37 +2170,38 @@ (sexpr->string 'quit)))))))))) (define (rmt:get-reasonable-hostname) (let* ((inhost (or (args:get-arg "-server") "-"))) (if (equal? inhost "-") - (get-host-name) + (get-host-name) ;; (get-my-best-address) inhost))) ;; Call this to start the actual server ;; ;; all routes though here end in exit ... ;; ;; This is the point at which servers are started ;; (define (rmt:server-launch dbname) + (assert (args:get-arg "-server") "FATAL: rmt:server-launch called in non-server process.") (debug:print-info 0 *default-log-port* "Entered rmt:server-launch") (let* ((th2 (make-thread (lambda () (debug:print-info 0 *default-log-port* "Server run thread started") (rmt:run (rmt:get-reasonable-hostname))) "Server run")) (th3 (make-thread (lambda () (debug:print-info 0 *default-log-port* "Server monitor thread started") (if (args:get-arg "-server") (rmt:keep-running dbname))) - "Keep running"))) + "Keep running"))) (thread-start! th2) (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. (thread-start! th3) (set! *didsomething* #t) (thread-join! th2) - (thread-join! th3)) - #f) + (thread-join! th3) + #f)) ;; Generate a unique signature for this process, used at both client and ;; server side (define (rmt:mk-signature) (message-digest-string (md5-primitive) Index: tests/tests.scm ================================================================== --- tests/tests.scm +++ tests/tests.scm @@ -27,12 +27,12 @@ ulex ) (define test-work-dir (current-directory)) -(work-method 'mailbox) ;; threads, direct, mailbox -(return-method 'mailbox) ;; polling, mailbox, direct +;; (work-method 'mailbox) ;; threads, direct, mailbox +;; (return-method 'mailbox) ;; polling, mailbox, direct ;; given list of lists ;; ( ( msg expected param1 param2 ...) ;; ( ... ) ) ;; apply test to all Index: ulex-simple/ulex.scm ================================================================== --- ulex-simple/ulex.scm +++ ulex-simple/ulex.scm @@ -24,11 +24,12 @@ ;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity. ;; ;;====================================================================== (module ulex - ( + * + #;( ;; NOTE: looking for the handler proc - find the run-listener :) run-listener ;; (run-listener handler-proc [port]) => uconn @@ -60,10 +61,11 @@ chicken.time chicken.condition chicken.string chicken.sort chicken.pretty-print + ;; chicken.tcp address-info mailbox matchable ;; queues @@ -75,10 +77,12 @@ srfi-4 srfi-69 system-information tcp6 typed-records + ;; tcp-server + ) ;; udat struct, used by both caller and callee ;; instantiated as uconn by convention ;; @@ -97,27 +101,12 @@ (avail-cmboxes '()) ;; list of ( . ) for re-use ;; threads (numthreads 50) (cmd-thread #f) (work-queue-thread #f) - ) - -;; ;; struct for keeping track of others we are talking to -;; ;; -;; (defstruct pdat -;; (host-port #f) -;; (conns '()) ;; list of pcon structs, pop one off when calling the peer -;; ) -;; -;; ;; struct for peer connections, keep track of expiration etc. -;; ;; -;; (defstruct pcon -;; (inp #f) -;; (oup #f) -;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59) -;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes -;; ) + (num-threads-running 0) + ) ;;====================================================================== ;; listener ;;====================================================================== @@ -135,19 +124,22 @@ ;; (define (setup-listener uconn #!optional (port 4242)) (handle-exceptions exn (if (< port 65535) - (setup-listener uconn (+ port 1)) + (begin + (thread-sleep! 0.1) ;; I'm not sure this helps but give the OS some time to do it's thing + (print "ULEX INFO: skipping port already in use "port) + (setup-listener uconn (+ port 1))) #f) (connect-listener uconn port))) (define (connect-listener uconn port) ;; (tcp-listener-socket LISTENER)(socket-name so) ;; sockaddr-address, sockaddr-port, sockaddr->string (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) - (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) + (addr (get-host-name))) ;; (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) (udat-port-set! uconn port) (udat-host-port-set! uconn (conc addr":"port)) (udat-socket-set! uconn tlsn) uconn)) @@ -155,26 +147,27 @@ ;; it then returns control ;; (define (run-listener handler-proc #!optional (port-suggestion 4242)) (let* ((uconn (make-udat))) (udat-work-proc-set! uconn handler-proc) + (tcp-buffer-size 2048) (if (setup-listener uconn port-suggestion) - (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")) - #;(th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor"))) - (tcp-buffer-size 2048) - ;; (max-connections 2048) + (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))) (thread-start! th1) - #;(thread-start! th2) (udat-cmd-thread-set! uconn th1) - #;(udat-work-queue-thread-set! uconn th2) - (print "cmd loop and process workers started") + (print "cmd loop started") uconn) (assert #f "ERROR: run-listener called without proper setup.")))) (define (wait-and-close uconn) + (let loop () + (if (not (udat-cmd-thread uconn)) + (begin + (thread-sleep! 1) + (loop)))) (thread-join! (udat-cmd-thread uconn)) - (tcp-close (udat-socket uconn))) + #;(tcp-close (udat-socket uconn))) ;;====================================================================== ;; peers and connections ;;====================================================================== @@ -189,48 +182,33 @@ ;; ;; NOTE: see below for beginnings of code to allow re-use of tcp connections ;; - I believe (without substantial evidence) that re-using connections will ;; be beneficial ... ;; -(define (send udata host-port qrykey cmd params) +(define (send-receive udata host-port cmd params) (mutex-lock! *send-mutex*) (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this - (isme #f #;(equal? host-port my-host-port)) ;; calling myself? + (isme (equal? host-port my-host-port)) ;; calling myself? ;; dat is a self-contained work block that can be sent or handled locally - (dat (list my-host-port qrykey cmd params))) + (dat (list my-host-port cmd params)) + (parts (string-split host-port ":")) + (host (car parts)) + (port (string->number (cadr parts)))) (if isme (ulex-handler udata dat) ;; no transmission needed - (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? - exn - #f - (let-values (((inp oup)(tcp-connect host-port))) - (let ((res (if (and inp oup) - (begin - (serialize dat oup) - (deserialize inp)) ;; yes, we always want an ack - (begin - (print "ERROR: send called but no receiver has been setup. Please call setup first!") - #f)))) - (close-input-port inp) - (close-output-port oup) - (mutex-unlock! *send-mutex*) - res)))))) ;; res will always be 'ack - -;; send a request to the given host-port and register a mailbox in udata -;; wait for the mailbox data and return it -;; -(define (send-receive uconn host-port cmd data) - (cond - ((member cmd '(ping goodbye)) ;; these are immediate - (send uconn host-port 'ping cmd data)) - (else - (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? - (qrykey (car cmbox)) - (mbox (cdr cmbox)) - (mbox-time (current-milliseconds)) - (sres (send uconn host-port qrykey cmd data))) ;; short res - sres)))) + (let-values (((inp oup)(tcp-connect host-port))) + (let ((res (if (and inp oup) + (begin + (serialize dat oup) + (deserialize inp)) ;; yes, we always want an ack + (begin + (print "ERROR: send called but no receiver has been setup. Please call setup first!") + #f)))) + (close-input-port inp) + (close-output-port oup) + (mutex-unlock! *send-mutex*) + res))))) ;;====================================================================== ;; responder side ;;====================================================================== @@ -238,57 +216,54 @@ ;; ;; Reserved cmds; ack ping goodbye response ;; (define (ulex-handler uconn rdat) (assert (list? rdat) "FATAL: ulex-handler give rdat as not list") - (match rdat ;; (string-split controldat) - ((rem-host-port qrykey cmd params) - ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params) - (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) - (case cmd - ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack) - ((ping) - ;; (print "Got Ping!") - ;; (add-to-work-queue uconn rdat) - 'ack) - (else - (do-work uconn rdat))))) + (match rdat + ((rem-host-port cmd params) + (do-work uconn rdat)) (else (print "BAD DATA? controldat=" rdat) - 'ack) ;; send ack anyway? + 'bad-data) )) + +;; given an already set up uconn start the cmd-loop +;; +#;(define (ulex-cmd-loop uconn) + (let* ((serv-listener (udat-socket uconn)) + (server (make-tcp-server + serv-listener + (lambda () + (let* ((rdat (deserialize)) ;; '(my-host-port qrykey cmd params) + (resp (ulex-handler uconn rdat))) + (if resp + (serialize resp) + (write resp))))))) + (server))) ;; given an already set up uconn start the cmd-loop ;; (define (ulex-cmd-loop uconn) - (let* ((serv-listener (udat-socket uconn))) - (let loop ((state 'start)) - (let-values (((inp oup)(tcp-accept serv-listener))) - (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) - (resp (ulex-handler uconn rdat))) - (if resp (serialize resp oup)) - (close-input-port inp) - (close-output-port oup)) - (loop state))))) -;;(define (ulex-cmd-loop uconn) -;; (let* ((serv-listener (udat-socket uconn)) -;; ;; (old-listener (lambda () -;; ;; (let loop ((state 'start)) -;; ;; (let-values (((inp oup)(tcp-accept serv-listener))) -;; ;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) -;; ;; (resp (ulex-handler uconn rdat))) -;; ;; (if resp (serialize resp oup)) -;; ;; (close-input-port inp) -;; ;; (close-output-port oup)) -;; ;; (loop state))))) -;; (server (make-tcp-server -;; serv-listener -;; (lambda () -;; (let* ((rdat (deserialize )) ;; '(my-host-port qrykey cmd params) -;; (resp (ulex-handler uconn rdat))) -;; (if resp (serialize resp) resp)))))) -;; (server))) + (let* ((serv-listener (udat-socket uconn)) + (listener (lambda () + (let loop ((state 'start)) + (let-values (((inp oup)(tcp-accept serv-listener))) + (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) + (resp (ulex-handler uconn rdat))) + (serialize resp oup) + (close-input-port inp) + (close-output-port oup)) + (loop state)))))) + ;; start N of them + (let loop ((thnum 0) + (threads '())) + (if (< thnum 100) + (let* ((th (make-thread listener (conc "listener" thnum)))) + (thread-start! th) + (loop (+ thnum 1) + (cons th threads))) + (map thread-join! threads))))) ;; add a proc to the cmd list, these are done symetrically (i.e. in all instances) ;; so that the proc can be dereferenced remotely ;; (define (set-work-handler uconn proc) @@ -306,72 +281,22 @@ (define (do-work uconn rdat) (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change ;; put this following into a do-work procedure (match rdat - ((rem-host-port qrykey cmd params) + ((rem-host-port cmd params) (let* ((start-time (current-milliseconds)) - (result (proc rem-host-port qrykey cmd params)) + (result (proc rem-host-port cmd params)) (end-time (current-milliseconds)) (run-time (- end-time start-time))) + (if (> run-time 1000) + (print "ULEX INFO: Note that "cmd" with params "params" took "run-time"ms to complete.")) result)) (else (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params") #f)))) - -(define (process-work-queue uconn) - (let ((wqueue (udat-work-queue uconn)) - (proc (udat-work-proc uconn)) - (numthr (udat-numthreads uconn))) - (let loop ((thnum 1) - (threads '())) - (let ((thlst (cons (make-thread (lambda () - (let work-loop () - (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT))) - (do-work uconn rdat)) - (work-loop))) - (conc "work thread " thnum)) - threads))) - (if (< thnum numthr) - (loop (+ thnum 1) - thlst) - (begin - (print "ULEX: Starting "(length thlst)" worker threads.") - (map thread-start! thlst) - (print "ULEX: Threads started. Joining all.") - (map thread-join! thlst))))))) - -;; below was to enable re-use of connections. This seems non-trivial so for -;; now lets open on each call -;; -;; ;; given host-port get or create peer struct -;; ;; -;; (define (udat-get-peer uconn host-port) -;; (or (hash-table-ref/default (udat-peers uconn) host-port #f) -;; ;; no peer, so create pdat and init it -;; -;; ;; NEED stack of connections, pop and use; inp, oup, -;; ;; creation_time (remove and create new if over 24hrs old -;; ;; -;; (let ((pdat (make-pdat host-port: host-port))) -;; (hash-table-set! (udat-peers uconn) host-port pdat) -;; pdat))) -;; -;; ;; is pcon alive -;; -;; ;; given host-port and pdat get a pcon -;; ;; -;; (define (pdat-get-pcon pdat host-port) -;; (let loop ((conns (pdat-conns pdat))) -;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later -;; (init-pcon (make-pcon)) -;; (let* ((conn (pop conns))) -;; -;; ;; given host-port get a pcon struct -;; ;; -;; (define (udat-get-pcon - + ;;====================================================================== ;; misc utils ;;====================================================================== (define (make-cookie uconn) @@ -432,9 +357,9 @@ (sort (get-all-ips) ip-pref-less?)) (define (get-all-ips) (map address-info-host (filter (lambda (x) - (equal? (address-info-type x) "tcp")) + (equal? (address-info-type x) 'tcp)) (address-infos (get-host-name))))) ) Index: ulex.scm ================================================================== --- ulex.scm +++ ulex.scm @@ -18,7 +18,7 @@ ;;====================================================================== (declare (unit ulex)) -(include "ulex/ulex.scm") -;; (include "ulex-simple/ulex.scm") +;; (include "ulex/ulex.scm") +(include "ulex-simple/ulex.scm")