Megatest

nng-test.scm at [cf42e4ea4d]
Login

File nng-trial/nng-test.scm artifact 1f5de0e9fe part of check-in cf42e4ea4d


(import (chicken io)
	(chicken file)
	(chicken file posix)
        (chicken string)
	(chicken process-context)
	(chicken process-context posix)
        miscmacros
        nng
        srfi-18
	srfi-69
        test
	matchable
	typed-records
	system-information
	directory-utils
	)

(define help "Usage: nng-test COMMAND
  where COMMAND is one of:
    dotest    : run the basic req/rep test
")

(define address-tcp-1 "tcp://localhost:5555")
(define address-tcp-2 "tcp://localhost:6666")

(define address-inproc-1 "inproc://local1")
(define address-inproc-2 "inproc://local2")

;;;
;;; Req-Rep
;;;
(define (make-listening-reply-socket address)
  (let ((socket (make-rep-socket)))
    (socket-set! socket 'nng/recvtimeo 2000)
    (nng-listen socket address)
    socket))

(define (make-dialed-request-socket address)
  (let ((socket (make-req-socket)))
    (socket-set! socket 'nng/recvtimeo 2000)
    (nng-dial socket address)
    socket))

(define (req-rep-test address)
  (let ((rep (make-listening-reply-socket address))
        (req (make-dialed-request-socket address)))
    (nng-send req "message 1")
    (nng-recv rep)
    (nng-send rep "message")
    (begin0
     (nng-recv req)
     (nng-close! rep))))

(define (do-test)
  (test-group "nng"
              (test "tcp req-rep"
                    "message"
                    (req-rep-test address-tcp-1))
              (test "inproc req-rep"
                    "message"
                    (req-rep-test address-inproc-1)))
  (test-exit))

;; talking to self here...
;;
(define (send-n-messages n srvdat)
  (let* ((name (srv-name srvdat)))
    (let loop ((i 0))
      (if (< i n)
	  (begin
	    (nng-send (srv-req srvdat) (conc name "-" i))
	    (print "received: "(nng-recv (srv-rep srvdat)))
	    (loop (+ i 1)))))))

;; this should be run in a thread
(define (run-listener-responder socket myaddr)
  (let loop ((status 'running))
    (let* ((msg (nng-recv socket))
	   (response (process-message msg)))
      (if (not (eq? response 'done))
	  (begin
	    (nng-send socket response)
	    (loop status))))))

(define *channels* (make-hash-table))

(define (call channels msg addr)
  (let* ((csocket (hash-table-ref/default channels addr #f))
	 (socket  (or csocket (make-dialed-request-socket addr))))
    (nng-send socket msg)
    (print "Sent: "msg", received: "(nng-recv socket))
    (if (not (hash-table-exists? channels addr))
	(hash-table-set! channels addr socket))))

;; start    => hello 0
;; hello 0  => hello 1
;; hello 1  => hello 2
;;  ...
;; hello 11 => 'done
;;
(define (process-message mesg)
  (let ((parts (string-split mesg)))
    (match
     parts
     ((msg c)
      (let ((count (string->number c)))
	(if (> count 10)
	    'done
	    (conc msg " " (if count count 0)))))
     ((msg)
      (conc msg " 0"))
     (else
      "hello 0"))))

(define (close-srv srvdat)
  (nng-close! (srv-rep srvdat)))
    
(match
 (command-line-arguments)
 (("do-test")(do-test))
 ((run myaddr)
  ;; start listener
  ;; put myaddr into file by host-pid in .runners
  ;; for 1 minute
  ;;     get all in .runners
  ;;     call each with a message
  ;;
  (let* ((socket (make-listening-reply-socket myaddr))
	 (rfile  (conc ".runners/"(get-host-name)"-"(current-process-id)))
	 (th1    (make-thread (lambda ()
				(run-listener-responder socket myaddr)
				(delete-file* rfile)
				(exit))
		 "responder")))
    (if (not (and (file-exists? ".runners")
		  (directory? ".runners")))
	(create-directory ".runners" #t))
    (with-output-to-file rfile
      (lambda ()
	(print myaddr)))
    (thread-start! th1)
    (let loop ((entries '()))
      (if (null? entries)
	  (loop (glob ".runners/*"))
	  (let* ((entry (car entries))
		 (destaddr (with-input-from-file entry read-line)))
	    (call *channels* (conc "hello-from-"destaddr)  destaddr)
	    (thread-sleep! 0.25)
	    (loop (cdr entries)))))))
 ((cmd)(print "ERROR: command "cmd", not recognised.\n\n"help))
 (else
  (print help)))