Megatest

nng-test.scm at [ac3127fbf2]
Login

File nng-trial/nng-test.scm artifact a81f0cc6e1 part of check-in ac3127fbf2


(module nng-test *
	
(import scheme
	(chicken io)
	(chicken base)
	(chicken time)
	(chicken file)
	(chicken file posix)
        (chicken string)
	(chicken process-context)
	(chicken process-context posix)
        miscmacros
        nng
        srfi-18
	srfi-69
        test
	matchable
	typed-records
	system-information
	directory-utils
	)

(define help "Usage: nng-test COMMAND
  where COMMAND is one of:
    do-test              : run the basic req/rep test
    run tcp://host:port  : start test server - start several in same dir
")

(define address-tcp-1 "tcp://localhost:5555")
(define address-tcp-2 "tcp://localhost:6666")

(define address-inproc-1 "inproc://local1")
(define address-inproc-2 "inproc://local2")

;;;
;;; Req-Rep
;;;
(define (make-listening-reply-socket address)
  (let ((socket (make-rep-socket)))
    (socket-set! socket 'nng/recvtimeo 2000)
    (nng-listen socket address)
    socket))

(define (make-dialed-request-socket address)
  (let ((socket (make-req-socket)))
    (socket-set! socket 'nng/recvtimeo 2000)
    (nng-dial socket address)
    socket))

(define (req-rep-test address)
  (let ((rep (make-listening-reply-socket address))
        (req (make-dialed-request-socket address)))
    (nng-send req "message 1")
    (nng-recv rep)
    (nng-send rep "message")
    (begin0
     (nng-recv req)
     (nng-close! rep))))

(define (do-test)
  (test-group "nng"
              (test "tcp req-rep"
                    "message"
                    (req-rep-test address-tcp-1))
              (test "inproc req-rep"
                    "message"
                    (req-rep-test address-inproc-1)))
  (test-exit))

;; this should be run in a thread
(define (run-listener-responder socket myaddr)
  (let loop ((status 'running))
    (let* ((msg (nng-recv socket))
	   (response (process-message msg)))
      (if (not (eq? response 'done))
	  (begin
	    (nng-send socket response)
	    (loop status))))))

(define *channels* (make-hash-table))

(define (call channels msg addr)
  (let* ((csocket (hash-table-ref/default channels addr #f))
	 (socket  (or csocket (make-dialed-request-socket addr))))
    (nng-send socket msg)
    (print "Sent: "msg", received: "(nng-recv socket))
    (if (not (hash-table-exists? channels addr))
	(hash-table-set! channels addr socket))))

;; start    => hello 0
;; hello 0  => hello 1
;; hello 1  => hello 2
;;  ...
;; hello 11 => 'done
;;
(define (process-message mesg)
  (let ((parts (string-split mesg)))
    (match
     parts
     ((msg c)
      (let ((count (string->number c)))
	(if (> count 10)
	    'done
	    (conc msg " " (if count count 0)))))
     ((msg)
      (conc msg " 0"))
     (else
      "hello 0"))))

(define (main)
  (match
   (command-line-arguments)
   (("do-test")(do-test))
   ((run myaddr)
    ;; start listener
    ;; put myaddr into file by host-pid in .runners
    ;; for 1 minute
    ;;     get all in .runners
    ;;     call each with a message
    ;;
    (let* ((endtimes (+ (current-seconds) 20)) ;; run for 20 seconds
	   (socket (make-listening-reply-socket myaddr))
	   (rfile  (conc ".runners/"(get-host-name)"-"(current-process-id)))
	   (th1    (make-thread (lambda ()
				  (run-listener-responder socket myaddr)
				  )
				"responder")))
      (if (not (and (file-exists? ".runners")
		    (directory? ".runners")))
	  (create-directory ".runners" #t))
      (with-output-to-file rfile
	(lambda ()
	  (print myaddr)))
      (thread-start! th1)
      (let loop ((entries '()))
	(if (> (current-seconds) endtimes)
	    (begin
	      (delete-file* rfile)
	      (sleep 1)
	      (exit))
	    (if (null? entries)
		(loop (glob ".runners/*"))
		(let* ((entry (car entries))
		       (destaddr (with-input-from-file entry read-line)))
		  (call *channels* (conc "hello-from-"destaddr)  destaddr)
		  ;; (thread-sleep! 0.025)
		  (loop (cdr entries))))))))
   ((cmd)(print "ERROR: command "cmd", not recognised.\n\n"help))
   (else
    (print help))))

) ;; end module

(import nng-test)
(main)