(import (chicken io)
(chicken file)
(chicken file posix)
(chicken string)
(chicken process-context)
(chicken process-context posix)
miscmacros
nng
srfi-18
srfi-69
test
matchable
typed-records
system-information
directory-utils
)
(define help "Usage: nng-test COMMAND
where COMMAND is one of:
dotest : run the basic req/rep test
")
(define address-tcp-1 "tcp://localhost:5555")
(define address-tcp-2 "tcp://localhost:6666")
(define address-inproc-1 "inproc://local1")
(define address-inproc-2 "inproc://local2")
;;;
;;; Req-Rep
;;;
(define (make-listening-reply-socket address)
(let ((socket (make-rep-socket)))
(socket-set! socket 'nng/recvtimeo 2000)
(nng-listen socket address)
socket))
(define (make-dialed-request-socket address)
(let ((socket (make-req-socket)))
(socket-set! socket 'nng/recvtimeo 2000)
(nng-dial socket address)
socket))
(define (req-rep-test address)
(let ((rep (make-listening-reply-socket address))
(req (make-dialed-request-socket address)))
(nng-send req "message 1")
(nng-recv rep)
(nng-send rep "message")
(begin0
(nng-recv req)
(nng-close! rep))))
(define (do-test)
(test-group "nng"
(test "tcp req-rep"
"message"
(req-rep-test address-tcp-1))
(test "inproc req-rep"
"message"
(req-rep-test address-inproc-1)))
(test-exit))
;; talking to self here...
;;
(define (send-n-messages n srvdat)
(let* ((name (srv-name srvdat)))
(let loop ((i 0))
(if (< i n)
(begin
(nng-send (srv-req srvdat) (conc name "-" i))
(print "received: "(nng-recv (srv-rep srvdat)))
(loop (+ i 1)))))))
;; this should be run in a thread
(define (run-listener-responder socket myaddr)
(let loop ((status 'running))
(let* ((msg (nng-recv socket))
(response (process-message msg)))
(if (not (eq? response 'done))
(begin
(nng-send socket response)
(loop status))))))
(define *channels* (make-hash-table))
(define (call channels msg addr)
(let* ((csocket (hash-table-ref/default channels addr #f))
(socket (or csocket (make-dialed-request-socket addr))))
(nng-send socket msg)
(print "Sent: "msg", received: "(nng-recv socket))
(if (not (hash-table-exists? channels addr))
(hash-table-set! channels addr socket))))
;; start => hello 0
;; hello 0 => hello 1
;; hello 1 => hello 2
;; ...
;; hello 11 => 'done
;;
(define (process-message mesg)
(let ((parts (string-split mesg)))
(match
parts
((msg c)
(let ((count (string->number c)))
(if (> count 10)
'done
(conc msg " " (if count count 0)))))
((msg)
(conc msg " 0"))
(else
"hello 0"))))
(define (close-srv srvdat)
(nng-close! (srv-rep srvdat)))
(match
(command-line-arguments)
(("do-test")(do-test))
((run myaddr)
;; start listener
;; put myaddr into file by host-pid in .runners
;; for 1 minute
;; get all in .runners
;; call each with a message
;;
(let* ((socket (make-listening-reply-socket myaddr))
(rfile (conc ".runners/"(get-host-name)"-"(current-process-id)))
(th1 (make-thread (lambda ()
(run-listener-responder socket myaddr)
(delete-file* rfile)
(exit))
"responder")))
(if (not (and (file-exists? ".runners")
(directory? ".runners")))
(create-directory ".runners" #t))
(with-output-to-file rfile
(lambda ()
(print myaddr)))
(thread-start! th1)
(let loop ((entries '()))
(if (null? entries)
(loop (glob ".runners/*"))
(let* ((entry (car entries))
(destaddr (with-input-from-file entry read-line)))
(call *channels* (conc "hello-from-"destaddr) destaddr)
(thread-sleep! 0.25)
(loop (cdr entries)))))))
((cmd)(print "ERROR: command "cmd", not recognised.\n\n"help))
(else
(print help)))