Megatest

ulex.scm at [337ae6b713]
Login

File ulex-simple/ulex.scm artifact 52358cb04f part of check-in 337ae6b713


;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.

;;======================================================================
;; ABOUT:
;;   See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;;   Why sql-de-lite and not say, dbi?  - performance mostly, then simplicity.
;;
;;======================================================================

(module ulex
	*
	#;(
     
     ;; NOTE: looking for the handler proc - find the run-listener :)
     
     run-listener     ;; (run-listener handler-proc [port]) => uconn

     ;; NOTE: handler-proc params;
     ;;       (handler-proc rem-host-port qrykey cmd params)

     send-receive     ;; (send-receive uconn host-port cmd data)

     ;; NOTE: cmd can be any plain text symbol except for these;
     ;;         'ping 'ack 'goodbye 'response
     
     set-work-handler ;; (set-work-handler proc)

     wait-and-close   ;; (wait-and-close uconn)

     ulex-listener?
     
     ;; needed to get the interface:port that was automatically found
     udat-port
     udat-host-port
     
     ;; for testing only
     ;; pp-uconn
     
     ;; parameters
     work-method   ;; parameter; 'threads, 'mailbox, 'limited, 'direct
     return-method ;; parameter; 'mailbox, 'polling, 'direct
     )

(import scheme
	chicken.base
	chicken.file
	chicken.io
	chicken.time
	chicken.condition
	chicken.string
	chicken.sort
	chicken.pretty-print
	chicken.tcp
	
	address-info
	mailbox
	matchable
	;; queues
	regex
	regex-case
	simple-exceptions
	s11n
	srfi-1
	srfi-18
	srfi-4
	srfi-69
	system-information
	;; tcp6
	tcp-server
	typed-records
	)

;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
;;
(defstruct udat
  ;; One record per ulex connector. Every field has a default, so
  ;; (make-udat) with no arguments yields a usable, not-yet-listening record.
  ;;
  ;; the listener side
  (port #f)                      ;; tcp port number; set by connect-listener
  (host-port #f)                 ;; "addr:port" string; set by connect-listener
  (socket #f)                    ;; value returned by tcp-listen; set by connect-listener
  ;; the peers
  (peers  (make-hash-table)) ;; host:port->peer
  ;; work handling
  (work-queue (make-mailbox))    ;; not read or written by the code visible in this file
  (work-proc  #f)                ;; set by user
  (cnum       0)                 ;; cookie number
  (mboxes     (make-hash-table)) ;; for the replies
  (avail-cmboxes '())            ;; list of (<cookie> . <mbox>) for re-use
  ;; threads
  (numthreads 10)
  (cmd-thread #f)                ;; read by wait-and-close; no visible code assigns it
  (work-queue-thread #f)
  (num-threads-running 0)
  ) 

;;======================================================================
;; listener
;;======================================================================

;; is uconn a ulex connector (listener)
;;
(define (ulex-listener? uconn)
  ;; #t when uconn is a udat record (the type every ulex connector is
  ;; an instance of), #f for any other object.
  (if (udat? uconn) #t #f))

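;; create a tcp listener and return the udat struct populated with
;; the port, the "addr:port" string and the listener socket.
;; if the requested port cannot be used, the next higher port is tried.
;; return #f if no port up to 65535 could be allocated.
;;
;;  NOTE: uconn must already exist; the record is not created here and
;;  an already-open listener is not detected or reused.
;;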
(define (setup-listener uconn #!optional (port 4242))
  ;; Walk upward from `port` until connect-listener succeeds.
  ;;   uconn - udat record to populate
  ;;   port  - first port to try (default 4242)
  ;; Returns whatever connect-listener returns on success, or #f once the
  ;; candidate port has reached 65535 and still failed.
  ;; Any exception raised by connect-listener is treated as "try next port".
  (let try-port ((candidate port))
    (handle-exceptions
     exn
     (and (< candidate 65535)
          (try-port (+ candidate 1)))
     (connect-listener uconn candidate))))

(define (connect-listener uconn port)
  ;; Open a tcp listener on `port` and record it in uconn.
  ;;   uconn - udat record to populate
  ;;   port  - tcp port to listen on
  ;; Returns uconn with port, host-port ("addr:port") and socket filled in.
  ;; Raises whatever tcp-listen or the address lookup raises.
  ;;
  ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
  (let ((tlsn (tcp-listen port 1000 #f)))
    ;; If anything fails after the listener is open, close it before
    ;; re-raising. setup-listener retries on every exception, so without
    ;; this each retry would leave one more open listening socket behind.
    (handle-exceptions
     exn
     (begin
       (tcp-close tlsn)
       (abort exn))
     (let ((addr (get-my-best-address)))
       (udat-port-set!      uconn port)
       (udat-host-port-set! uconn (conc addr ":" port))
       (udat-socket-set!    uconn tlsn)
       uconn))))

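;; run-listener creates the udat record, installs handler-proc as the
;; work handler, opens the listening socket and then runs the tcp server.
;; NOTE: no thread is created here; the server procedure returned by
;; make-tcp-server is invoked directly by run-listener.
;;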
(define (run-listener handler-proc #!optional (port-suggestion 4242))
  ;; Create a connector, open its listener and run the tcp server on it.
  ;;   handler-proc    - called as (handler-proc rem-host-port qrykey cmd params)
  ;;                     for every request (see do-work)
  ;;   port-suggestion - first port to try (default 4242)
  ;; Signals an error if no listening port could be set up.
  ;; NOTE(review): the return value is whatever the server procedure
  ;; returns; confirm against callers that expect the uconn back.
  (let ((uconn (make-udat)))
    (udat-work-proc-set! uconn handler-proc)
    ;; Use `error` rather than `(assert #f ...)`: assertions are disabled
    ;; when compiling in unsafe mode, which would let this failure pass
    ;; silently.
    (unless (setup-listener uconn port-suggestion)
      (error "ERROR: run-listener called without proper setup."))
    (let ((server (make-tcp-server
                   (udat-socket uconn)
                   ;; One request per connection: read the work block,
                   ;; run it, write the response back.
                   (lambda ()
                     (let* ((rdat (deserialize)) ;; '(my-host-port qrykey cmd params)
                            (resp (do-work uconn rdat)))
                       (serialize resp))))))
      (server))))

(define (wait-and-close uconn)
  ;; Wait for the connector's command thread (if there is one) and then
  ;; close its listener socket (if one was opened).
  ;;   uconn - udat record
  ;; cmd-thread and socket both default to #f, so each step is guarded;
  ;; calling thread-join! on #f would raise instead of closing the socket.
  (let ((cmd-thread (udat-cmd-thread uconn))
        (listener   (udat-socket uconn)))
    (when (thread? cmd-thread)
      (thread-join! cmd-thread))
    (when listener
      (tcp-close listener))))

;;======================================================================
;; peers and connections
;;======================================================================

;; Mutex intended to serialize sends. No visible code in this file
;; locks or unlocks it.
(define *send-mutex* (make-mutex))

;; send structured data to recipient
;;
;;  NOTE: qrykey is what was called the "cookie" previously
;;
;;     retval tells send to expect and wait for return data (one line) and return it or time out
;;       this is for ping where we don't want to necessarily have set up our own server yet.
;;
(define (send-receive udata host-port cmd params)
  ;; Send (my-host-port 'qrykey cmd params) to host-port and return the reply.
  ;;   udata     - udat record of the caller
  ;;   host-port - "host:port" string of the recipient
  ;;   cmd       - command symbol handed to the remote work handler
  ;;   params    - payload handed to the remote work handler
  ;; Returns the deserialized reply. When host-port is our own address the
  ;; work is done locally via do-work with no network traffic.
  ;; If anything in the network path raises, the exception's message is
  ;; returned instead of a reply.
  (let* ((my-host-port (udat-host-port udata))          ;; remote will return to this
         (isme         (equal? host-port my-host-port)) ;; calling myself?
         ;; dat is a self-contained work block that can be sent or handled locally
         (dat          (list my-host-port 'qrykey cmd params)))
    (cond
     (isme (do-work udata dat)) ;; no transmission needed
     (else
      (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
          exn
          (message exn)
        (let-values (((inp oup) (tcp-connect host-port)))
          ;; The after-thunk runs on normal return and when an exception
          ;; escapes, so the ports are released even if serialize or
          ;; deserialize fails part way through.
          (dynamic-wind
            (lambda () #f)
            (lambda ()
              (if (and inp oup)
                  (begin
                    (serialize dat oup)
                    ;; close our sending side before waiting for the reply
                    (close-output-port oup)
                    (deserialize inp))
                  (begin
                    (print "ERROR: send called but no receiver has been setup. Please call setup first!")
                    #f)))
            (lambda ()
              ;; closing an already closed port is harmless
              (when oup (close-output-port oup))
              (when inp (close-input-port inp))))))))))

;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================

;; move the logic to return the result somewhere else?
;;
(define (do-work uconn rdat)
  ;; Run the user's work handler on one work block.
  ;;   uconn - udat record holding the handler in work-proc
  ;;   rdat  - list of the form (rem-host-port qrykey cmd params)
  ;; Returns the handler's result. If rdat does not have that shape an
  ;; error line is printed and the result of `print` is returned.
  ;;
  ;; The handler is fetched on every call - conceivably it could change.
  (let ((proc (udat-work-proc uconn)))
    (match rdat
      ((rem-host-port qrykey cmd params)
       (proc rem-host-port qrykey cmd params))
      ;; `_` matches anything: malformed work block
      (_
       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))

;;======================================================================
;; misc utils
;;======================================================================

(define (pp-uconn uconn)
  ;; Debugging aid: pretty-print the fields of a udat record as an alist.
  (let ((fields (udat->alist uconn)))
    (pp fields)))

  
;;======================================================================
;; network utilities
;;======================================================================

;; NOTE: Look at address-info egg as alternative to some of this

;; Rate an address string; a higher rating is preferred by ip-pref-less?.
;;   0 - starts with "127."
;;   1 - starts with "10.0." or "192.168."
;;   2 - anything else
;; NOTE(review): only 10.0.* is rated 1, not the rest of 10.*, and
;; 172.16-31.* is not listed - confirm this is intentional.
(define (rate-ip ipaddr)
  (regex-case ipaddr
    ( "^127\\..*" _ 0 )
    ( "^(10\\.0|192\\.168)\\..*" _ 1 )
    ( else 2 ) ))

;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
  ;; Ordering predicate for `sort`: address a goes before address b when
  ;; a has the strictly higher rate-ip rating, so the most preferred
  ;; address ends up first in the sorted list.
  (let ((rating-a (rate-ip a))
        (rating-b (rate-ip b)))
    (< rating-b rating-a)))

(define (get-my-best-address)
  ;; Return the most preferred address of this host as ranked by
  ;; ip-pref-less?, or the host name when no address was found.
  ;;
  ;; Reuses get-all-ips-sorted instead of repeating the sort here. The
  ;; single-address case needs no special handling (sorting a one-element
  ;; list returns it unchanged), which also removes the numeric comparison
  ;; that was done with eq?.
  (let ((ranked (get-all-ips-sorted)))
    (if (null? ranked)
        (get-host-name) ;; no interfaces?
        (car ranked))))

(define (get-all-ips-sorted)
  ;; All addresses from get-all-ips, ordered by ip-pref-less?
  ;; (most preferred first).
  (let ((addresses (get-all-ips)))
    (sort addresses ip-pref-less?)))

(define (get-all-ips)
  ;; Look up the address-info records for this host's name, keep the ones
  ;; whose type equals "tcp" and return their host fields as a list.
  ;; NOTE(review): the type is compared against the string "tcp" - confirm
  ;; that address-info-type yields a string and not a symbol.
  (let* ((infos     (address-infos (get-host-name)))
         (tcp-infos (filter (lambda (info)
                              (equal? "tcp" (address-info-type info)))
                            infos)))
    (map address-info-host tcp-infos)))

)