Megatest

ulex.scm at [7889ffe9e5]
Login

File ulex-dual/ulex.scm artifact ba1e2ab076 part of check-in 7889ffe9e5


;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.

;;======================================================================
;; ABOUT:
;;   See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;;   Why sql-de-lite and not say, dbi?  - performance mostly, then simplicity.
;;
;;======================================================================

(module ulex
	*
	#;(
     
     ;; NOTE: looking for the handler proc - find the run-listener :)
     
     run-listener     ;; (run-listener handler-proc [port]) => uconn

     ;; NOTE: handler-proc params;
     ;;       (handler-proc rem-host-port qrykey cmd params)

     send-receive     ;; (send-receive uconn host-port cmd data)

     ;; NOTE: cmd can be any plain text symbol except for these;
     ;;         'ping 'ack 'goodbye 'response
     
     set-work-handler ;; (set-work-handler proc)

     wait-and-close   ;; (wait-and-close uconn)

     ulex-listener?
     
     ;; needed to get the interface:port that was automatically found
     udat-port
     udat-host-port
     
     ;; for testing only
     ;; pp-uconn
     
     ;; parameters
     work-method   ;; parameter; 'threads, 'mailbox, 'limited, 'direct
     return-method ;; parameter; 'mailbox, 'polling, 'direct
     )

(import scheme
	chicken.base
	chicken.file
	chicken.io
	chicken.time
	chicken.condition
	chicken.port
	chicken.string
	chicken.sort
	chicken.pretty-print
	chicken.tcp
	
	address-info
	mailbox
	matchable
	;; queues
	regex
	regex-case
	simple-exceptions
	s11n
	srfi-1
	srfi-18
	srfi-4
	srfi-69
	system-information
	;; tcp6
	tcp-server
	typed-records

	md5
	message-digest
	(prefix base64 base64:)
	z3
	)

;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
;;
;; Each entry below is (field-name default-value). Within this file only
;; port, host-port, socket, work-proc and cmd-thread are read or written;
;; the remaining fields are declared but not referenced in the code shown.
;;
(defstruct udat
  ;; the listener side
  (port #f)      ;; numeric port, stored by connect-listener
  (host-port #f) ;; my host:port, a string built by connect-listener
  (socket #f)    ;; result of tcp-listen, stored by connect-listener
  ;; the peers
  (peers  (make-hash-table)) ;; host:port->peer
  ;; work handling
  (work-queue (make-mailbox))
  (work-proc  #f)                ;; set by user (run-listener stores handler-proc here)
  (cnum       0)                 ;; cookie number
  (mboxes     (make-hash-table)) ;; for the replies
  (avail-cmboxes '())            ;; list of (<cookie> . <mbox>) for re-use
  ;; threads
  (numthreads 10)
  (cmd-thread #f)                ;; joined by wait-and-close; never assigned in this file
  (work-queue-thread #f)
  (num-threads-running 0)
  ) 

;;======================================================================
;; serialization
;;   NOTE: I've had problems with read/write and s11n serialize, deserialize
;;         thus the inefficient method here
;;======================================================================

;; Parameter selecting the wire encoding used by obj->string and string->obj.
;; Values handled there: 'complex (the default), 'write and 's11n; any other
;; value makes both procedures pass their argument through unchanged.
(define serializing-method (make-parameter 'complex))


;; NOTE: Can remove the regex and base64 encoding for zmq
;;
;; Encode obj for transmission according to (serializing-method):
;;   complex - serialize, z3-compress, base64-encode, then replace every "="
;;             with "_"
;;   write   - the text produced by (write obj)
;;   s11n    - the text produced by (serialize obj)
;;   other   - obj is returned untouched (rpc)
;;
(define (obj->string obj)
  (let ((method  (serializing-method))
	;; run a one-argument printer on obj and collect its output as a string
	(capture (lambda (emit)
		   (with-output-to-string
		     (lambda () (emit obj))))))
    (cond
     ((eq? method 'complex)
      ;; BB: serialize - this is what causes problems between different
      ;; builds of megatest communicating. serialize is sensitive to the
      ;; binary image of mtest.
      (let* ((raw    (capture serialize))
	     (packed (z3:encode-buffer raw))
	     (b64    (base64:base64-encode packed)))
	(string-substitute (regexp "=") "_" b64 #t)))
     ((eq? method 'write) (capture write))
     ((eq? method 's11n)  (capture serialize))
     (else obj)))) ;; rpc

;; Decode msg (as produced by obj->string) according to (serializing-method).
;;
;;   complex - replace every "_" with "=", base64-decode, z3-decompress and
;;             deserialize. If any step raises, a diagnostic is written to
;;             the error port and msg itself is returned unchanged.
;;   write   - (read) from the text of msg
;;   s11n    - (deserialize) from the text of msg
;;   other   - msg is returned untouched (rpc)
;;
;; The transport keyword is accepted but not used by this procedure.
;;
;; NOTE(review): this deserializes data that arrived over the network;
;; confirm peers are trusted before exposing the listener more widely.
;;
(define (string->obj msg #!key (transport 'http))
  (case (serializing-method)
    ((complex)
     (handle-exceptions
      exn
      (begin
	;; The diagnostic must not go to the current output port: inside the
	;; listener that port is the TCP connection, so the text would be
	;; sent to the remote peer ahead of the real reply.
	(with-output-to-port (current-error-port)
	  (lambda ()
	    (print "ULEX ERROR: cannot translate received data \""msg"\"")))
	(print-call-chain (current-error-port))
	msg)
      (with-input-from-string
	  (z3:decode-buffer
	   (base64:base64-decode
	    (string-substitute
	     (regexp "_") "=" msg #t)))
	(lambda () (deserialize)))))
    ((write) (with-input-from-string msg (lambda () (read))))
    ((s11n)  (with-input-from-string msg (lambda () (deserialize))))
    (else msg))) ;; rpc


;;======================================================================
;; listener
;;======================================================================

;; Predicate: true when uconn is a udat record (a ulex connector/listener).
;;
(define ulex-listener?
  (lambda (uconn)
    (udat? uconn)))

;; Open a tcp listener for uconn and return uconn with its port,
;; host-port and socket fields filled in (see connect-listener).
;;
;; Starts at `port` (default 4242). Whenever connect-listener raises, the
;; next higher port is tried; once the failing port is 65535 or above the
;; search stops and #f is returned.
;;
(define (setup-listener uconn #!optional (port 4242))
  (let try-port ((candidate port))
    (handle-exceptions
     exn
     ;; any exception counts as "this port did not work"
     (and (< candidate 65535)
	  (try-port (+ candidate 1)))
     (connect-listener uconn candidate))))

;; Listen on `port` and record the result in uconn:
;;   port      <- port
;;   host-port <- "<addr>:<port>", addr coming from get-my-best-address
;;   socket    <- the listener returned by tcp-listen
;; Returns uconn. Any exception from tcp-listen or from the address lookup
;; propagates to the caller.
;;
(define (connect-listener uconn port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
	 (addr (handle-exceptions
		exn
		(begin
		  ;; Do not leak the freshly opened listener when the address
		  ;; lookup fails; the caller may retry on another port.
		  (tcp-close tlsn)
		  (abort exn))
		(get-my-best-address))))
    (udat-port-set!      uconn port)
    (udat-host-port-set! uconn (conc addr":"port))
    (udat-socket-set!    uconn tlsn)
    uconn))

;; run-listener creates a udat, stores handler-proc as its work-proc, opens
;; a listener (first free port at or above port-suggestion) and runs a
;; tcp-server on it.
;;
;; For every connection the server thunk:
;;   1. reads one datum from the connection and decodes it with string->obj
;;   2. runs do-work on it with the current input/output ports switched back
;;      to the ports that were current when run-listener was called
;;   3. writes the obj->string encoding of the result to the connection
;;
;; NOTE(review): the procedure returned by make-tcp-server is invoked
;; directly in the calling thread and nothing here assigns
;; udat-cmd-thread, so this call presumably blocks in the accept loop
;; rather than returning a uconn - confirm against the callers.
;;
;; Raises an assertion failure if no port could be set up.
;;
(define (run-listener handler-proc #!optional (port-suggestion 4242))
  (let ((uconn (make-udat)))
    (udat-work-proc-set! uconn handler-proc)
    (if (setup-listener uconn port-suggestion)
	(let ((orig-in  (current-input-port))
	      (orig-out (current-output-port)))
	  ((make-tcp-server
	    (udat-socket uconn)
	    (lambda ()
	      ;; Inside this thunk the current ports are the TCP connection.
	      (let* ((rdat (string->obj (read)))
		     ;; Run the work against the original ports. The with-*
		     ;; forms put the TCP ports back on any exit, including
		     ;; an exception raised by the handler.
		     (resp (with-input-from-port orig-in
			     (lambda ()
			       (with-output-to-port orig-out
				 (lambda ()
				   (do-work uconn rdat)))))))
		(write (obj->string resp)))))))
	(assert #f "ERROR: run-listener called without proper setup."))))

;; Wait for uconn's command thread (if there is one) to finish, then close
;; uconn's listener socket (if there is one).
;;
;; Both fields default to #f in the udat record, so each step is skipped
;; when its field was never populated instead of raising a type error.
;;
(define (wait-and-close uconn)
  (let ((cmd-thread (udat-cmd-thread uconn))
	(listener   (udat-socket uconn)))
    (when (thread? cmd-thread)
      (thread-join! cmd-thread))
    (when listener
      (tcp-close listener))))

;;========================================================================
;; peers and connections
;;========================================================================

;; Mutex intended to serialize sends. The only lock/unlock calls that use
;; it (in send-receive) are commented out, so it is currently unused.
(define *send-mutex* (make-mutex))

;; send structured data to recipient and return the recipient's answer
;;
;;  udata     - my own udat (or #f); its host-port is included in the
;;              message as the sender address
;;  host-port - "host:port" string naming the recipient
;;  cmd       - command, passed through to the recipient's handler
;;  params    - data for the command
;;
;; The work block is a list of four tagged pairs:
;;   ((host-port . <my host:port>) (qrykey . qrykey) (cmd . <cmd>) (params . <params>))
;; NOTE: the qrykey entry carries the literal symbol qrykey, not a
;;       generated key.
;;
;; If host-port equals my own host-port the block is handed straight to
;; do-work. Otherwise it is encoded with obj->string, written to a new tcp
;; connection, and one datum is read back and decoded with string->obj.
;;
;; Any exception raised while connecting or talking to the peer is printed
;; and its message (as returned by `message`) becomes the return value.
;;
(define (send-receive udata host-port cmd params)
  (let* ((host-port-lst (string-split host-port ":"))
	 (host          (car host-port-lst))
	 (port          (string->number (cadr host-port-lst)))
	 (my-host-port  (and udata (udat-host-port udata))) ;; remote will return to this
	 (isme          (equal? host-port my-host-port))    ;; calling myself?
	 ;; dat is a self-contained work block that can be sent or handled locally
	 (dat           (list `(host-port . ,my-host-port)
			      `(qrykey . qrykey)
			      `(cmd . ,cmd)
			      `(params . ,params))))
    (if isme
	(do-work udata dat) ;; no transmission needed
	(handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
	 exn
	 (begin
	   (print "ULEX send-receive: "cmd", "params", exn="exn)
	   (message exn))
	 (let-values (((inp oup) (tcp-connect host port)))
	   (if (and inp oup)
	       (dynamic-wind
		 (lambda () #f)
		 (lambda ()
		   (write (obj->string dat) oup)
		   ;; finish the request before waiting for the reply
		   (close-output-port oup)
		   (string->obj (read inp)))
		 ;; Always release both ports, also when the write or the read
		 ;; raises (e.g. a timeout); closing a port twice is harmless.
		 (lambda ()
		   (close-output-port oup)
		   (close-input-port inp)))
	       (begin
		 (print "ERROR: send called but no receiver has been setup. Please call setup first!")
		 #f)))))))

;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================

;; move the logic to return the result somewhere else?
;;
;; Execute one work block on behalf of uconn and return the result.
;;
;; rdat must be a four element list, in either of two shapes:
;;   tagged: ((host-port . H) (qrykey . Q) (cmd . C) (params . P))
;;           - this is what send-receive in this file builds
;;   plain:  (H Q C P)
;; The tagged shape is unwrapped first so that the ping check and the
;; handler both see the bare values H, Q, C and P.
;;
;; cmd 'ping is answered with 'ping-ack without calling the handler. For
;; every other cmd the handler stored in uconn's work-proc is called as
;;   (proc rem-host-port qrykey cmd params)
;; with its standard output redirected to the error port.
;;
;; If rdat is not a four element list an error line is printed and the
;; (unspecified) result of print is returned.
;;
(define (do-work uconn rdat)
  ;; is item a pair tagged with key?
  (define (tagged? key item)
    (and (pair? item) (eq? (car item) key)))
  (match rdat
    ((a b c d)
     (let* ((wrapped?      (and (tagged? 'host-port a)
				(tagged? 'qrykey    b)
				(tagged? 'cmd       c)
				(tagged? 'params    d)))
	    ;; only unwrap when all four tags are present, so plain data
	    ;; that merely happens to be a pair is left alone
	    (unwrap        (lambda (item) (if wrapped? (cdr item) item)))
	    (rem-host-port (unwrap a))
	    (qrykey        (unwrap b))
	    (cmd           (unwrap c))
	    (params        (unwrap d)))
       (case cmd
	 ((ping) 'ping-ack) ;; bypass calling the proc
	 (else
	  (let ((proc (udat-work-proc uconn)))
	    (with-output-to-port (current-error-port)
	      (lambda ()
		(proc rem-host-port qrykey cmd params))))))))
    (else
     (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params"))))

;;======================================================================
;; misc utils
;;======================================================================

;; Pretty-print the fields of uconn (as an alist) to the current output port.
(define (pp-uconn uconn)
  (let ((fields (udat->alist uconn)))
    (pretty-print fields)))

;;======================================================================
;; network utilities
;;======================================================================

;; NOTE: Look at address-info egg as alternative to some of this

;; Give ipaddr (a string) a preference score; the clauses are listed in
;; this order:
;;   0 - matches "^127\..*"                  (addresses starting 127.)
;;   1 - matches "^(10\.0|192\.168)\..*"     (addresses starting 10.0. or 192.168.)
;;   2 - anything else
;; NOTE(review): assumes regex-case tries the clauses top to bottom and
;; that `_` means "no match variables bound" - confirm with the regex-case
;; egg documentation.
(define (rate-ip ipaddr)
  (regex-case ipaddr
    ( "^127\\..*" _ 0 )
    ( "^(10\\.0|192\\.168)\\..*" _ 1 )
    ( else 2 ) ))

;; Change this to bias for addresses with a reasonable broadcast value?
;;
;; Ordering predicate for sort: a goes before b when a has the strictly
;; higher rate-ip score, so the highest rated address ends up first.
;;
(define (ip-pref-less? a b)
  (let ((score-a (rate-ip a))
	(score-b (rate-ip b)))
    (< score-b score-a)))

;; Pick the address to advertise for this host:
;;   - no addresses from get-all-ips -> the host name
;;   - exactly one address           -> that address
;;   - several                       -> the first after sorting with ip-pref-less?
;;
(define (get-my-best-address)
  (let ((candidates (get-all-ips)))
    (cond
     ((null? candidates)       (get-host-name))  ;; no interfaces?
     ((null? (cdr candidates)) (car candidates)) ;; only one to choose from
     (else
      (let ((ranked (sort candidates ip-pref-less?)))
	(car ranked))))))

;; All addresses from get-all-ips, ordered by ip-pref-less?.
(define (get-all-ips-sorted)
  (let ((addresses (get-all-ips)))
    (sort addresses ip-pref-less?)))

;; Look up the address-info records for this machine's host name, keep the
;; ones whose type is equal? to "tcp", and return their host fields.
;;
;; NOTE(review): the type is compared against the string "tcp"; if
;; address-info-type yields a symbol this filter never matches and the
;; result is always empty - confirm against the address-info egg.
;;
(define (get-all-ips)
  (let* ((infos     (address-infos (get-host-name)))
	 (tcp-infos (filter (lambda (info)
			      (equal? (address-info-type info) "tcp"))
			    infos)))
    (map address-info-host tcp-infos)))

)