Megatest

Artifact [88c5ce4192]
Login

Artifact 88c5ce41928402e8d465d28669a3614a75b8b1ab:


;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.

;;======================================================================
;; ABOUT:
;;   See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;;   Why sql-de-lite and not say, dbi?  - performance mostly, then simplicity.
;;
;;======================================================================

(module ulex
 *

(import scheme
	chicken.base
	chicken.file
	chicken.time
	chicken.condition
	chicken.string
	chicken.sort
	chicken.pretty-print
	
	address-info
	mailbox
	matchable
	queues
	regex
	regex-case
	srfi-1
	srfi-18
	srfi-4
	srfi-69
	system-information
	tcp6
	typed-records
	)

;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
;;
(defstruct udat
  ;; the listener side
  (port #f)
  (host-port #f)
  (socket #f)
  ;; the peers
  (peers  (make-hash-table)) ;; host:port->peer
  ;; work handling
  (work-queue (make-queue))
  (work-proc  #f)            ;; set by user
  (cnum       0)             ;; cookie number
  (mboxes     (make-hash-table))
  (avail-cmboxes '())  ;; list of (<cookie> . <mbox>) for re-use
  ) 

;; struct for keeping track of others we are talking to
;;
(defstruct pdat
  (host-port  #f)
  (conns      '()) ;; list of pcon structs, pop one off when calling the peer
  )

;; struct for peer connections, keep track of expiration etc.
;;
(defstruct pcon
  (inp #f)
  (oup #f)
  (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
  (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
  )

;;======================================================================
;; listener
;;======================================================================
;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;;  if udata-in is #f create the record
;;  if there is already a serv-listener return the udata
;;
(define (setup-listener uconn #!optional (port 4242))
  (handle-exceptions
   exn
   (if (< port 65535)
       (setup-listener uconn (+ port 1))
       #f)
   (connect-listener uconn port)))

(define (connect-listener uconn port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
	 (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
    (udat-port-set!      uconn port)
    (udat-host-port-set! uconn (conc addr":"port))
    (udat-socket-set!    uconn tlsn)
    uconn))

;;======================================================================
;; peers and connections
;;======================================================================

;; send structured data to recipient
;;
;;  NOTE: qrykey is what was called the "cookie" previously
;;
;;     retval tells send to expect and wait for return data (one line) and return it or time out
;;       this is for ping where we don't want to necessarily have set up our own server yet.
;;
;; NOTE: see below for beginnings of code to allow re-use of tcp connections
;;        - I believe (without substantial evidence) that re-using connections will
;;          be beneficial ...
;;
(define (send udata host-port qrykey cmd params)
  (let* ((my-host-port (udat-host-port udata))          ;; remote will return to this
	 (isme         #f #;(equal? host-port my-host-port)) ;; calling myself?
	 ;; dat is a self-contained work block that can be sent or handled locally
	 (dat          (list my-host-port qrykey cmd params))
	 )
    (if isme
	(ulex-handler udata dat) ;; no transmission needed
	(handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
	    exn
	    #f
	  (let-values (((inp oup)(tcp-connect host-port)))
	    (let ((res (if (and inp oup)
			   (begin
			     (write dat oup)
			     (read inp)) ;; yes, we always want an ack
			   (begin
			     (print "ERROR: send called but no receiver has been setup. Please call setup first!")
			     #f))))
	      (close-input-port inp)
	      (close-output-port oup)
	      res)))))) ;; res will always be 'ack

;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive uconn host-port cmd data)
  (let* ((cmbox     (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
	 (qrykey    (car cmbox))
	 (mbox      (cdr cmbox))
	 (mbox-time (current-milliseconds)))
    (if (eq? (send uconn host-port qrykey cmd data) 'ack)
	(let* ((mbox-timeout-secs    120) ;; timeout)
	       (mbox-timeout-result 'MBOX_TIMEOUT)
	       (res                  (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
	       (mbox-receive-time    (current-milliseconds)))
	  (if (eq? res 'MBOX_TIMEOUT)
	      #f  ;; convert to raising exception?
	      res))
	(begin
	  (print "ERROR: Communication failed?")
	  #f)))) ;; #f means failed to communicate

;;======================================================================
;; responder side
;;======================================================================

;; take a request, rdata, and if not immediate put it in the work queue
;;
;; Reserved cmds; ack ping goodbye response
;;
(define (ulex-handler uconn rdata)
  (match rdata ;;  (string-split controldat)
    ((rem-host-port qrykey cmd params)
     ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
     (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
       (case cmd
	 ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
	 ((ping)
	  ;; (print "Got Ping!")
	  (add-to-work-queue uconn rdata)
	 'ack)
	 ((goodbye)
	  ;; just clear out references to the caller
	  (add-to-work-queue uconn rdata)
	  'ack)
	 ((response) ;; this is a result from remote processing, send it as mail ...
	  (if mbox
	      (begin
		(mailbox-send! mbox params) ;; params here is our result
		'ack)
	      (begin
		(print "ERROR: received result but no associated mbox for cookie "qrykey)
		#f)))
	 (else
	  ;; (print "Got generic request: "cmd)
	  (add-to-work-queue uconn rdata)
	  'ack))))
    (else
     (print "BAD DATA? controldat=" rdata)
     'ack) ;; send ack anyway?
    ))

;; given an already set up uconn start the cmd-loop
;;
(define (ulex-cmd-loop uconn)
  (let* ((serv-listener (udat-socket uconn)))
    (let loop ((state 'start))
      (let-values (((inp oup)(tcp-accept serv-listener)))
	(let* ((rdat  (read inp))
	       (resp  (ulex-handler uconn rdat)))
	  (if resp (write resp oup))
	  (close-input-port inp)
	  (close-output-port oup))
	(loop state)))))

;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
;; so that the proc can be dereferenced remotely
;;
(define (set-work-handler uconn proc)
  (udat-work-proc-set! uconn proc))

;; run-listener does all the work of starting a listener in a thread
;; it then returns control
;;
(define (run-listener handler-proc)
  (let* ((uconn (make-udat)))
    (udat-work-proc-set! uconn handler-proc)
    (if (setup-listener uconn)
	(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
	       (th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor")))
	  (thread-start! th1)
	  (thread-start! th2)
	  (print "cmd loop and process workers started")
	  uconn)
	(begin
	  (print "ERROR: run-listener called without proper setup.")
	  (exit)))))

;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================

;; rdata is (rem-host-port qrykey cmd params)
					     
(define (add-to-work-queue uconn rdata)
  (queue-add! (udat-work-queue uconn) rdata))

(define (do-work uconn rdata)
  (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
    ;; put this following into a do-work procedure
    (match rdata
      ((rem-host-port qrykey cmd params)
       (let* ((result (proc rem-host-port qrykey cmd params)))
	 ;; send 'response as cmd and result as params
	 (send uconn rem-host-port qrykey 'response result))) ;; could check for ack
      (else
       (print "ERROR: rdata "rdata", did not match rem-host-port qrykey cmd params")))))
     
     
(define (process-work-queue uconn) 
  (let ((wqueue (udat-work-queue uconn))
	(proc   (udat-work-proc  uconn)))
    (let loop ()
      (if (queue-empty? wqueue)
	  (thread-sleep! 0.1)
	  (let ((rdata (queue-remove! wqueue)))
	    (do-work uconn rdata)))
      (loop))))

;; below was to enable re-use of connections. This seems non-trivial so for
;; now lets open on each call
;;
;; ;; given host-port get or create peer struct
;; ;;
;; (define (udat-get-peer uconn host-port)
;;   (or (hash-table-ref/default (udat-peers uconn) host-port #f)
;;       ;; no peer, so create pdat and init it
;;       
;;       ;; NEED stack of connections, pop and use; inp, oup,
;;       ;; creation_time (remove and create new if over 24hrs old
;;       ;; 
;;       (let ((pdat (make-pdat host-port: host-port)))
;; 	(hash-table-set! (udat-peers uconn) host-port pdat)
;; 	pdat)))
;; 
;; ;; is pcon alive
;; 
;; ;; given host-port and pdat get a pcon
;; ;;
;; (define (pdat-get-pcon pdat host-port)
;;   (let loop ((conns (pdat-conns pdat)))
;;     (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
;; 	(init-pcon (make-pcon))
;; 	(let* ((conn (pop conns)))
;; 	  
;; ;; given host-port get a pcon struct
;; ;;
;; (define (udat-get-pcon 
      
;;======================================================================
;; misc utils
;;======================================================================

(define (make-cookie uconn)
  (let ((newcnum (+ (udat-cnum uconn) 1)))
    (udat-cnum-set! uconn newcnum)
    (conc (udat-host-port uconn) ":"
	  newcnum)))

;; cookie/mboxes

;; we store each mbox with a cookie (<cookie> . <mbox>)
;;
(define (get-cmbox uconn)
  (if (null? (udat-avail-cmboxes uconn))
      (let ((cookie (make-cookie uconn))
	    (mbox   (make-mailbox)))
	(hash-table-set! (udat-mboxes uconn) cookie mbox)
	`(,cookie . ,mbox))
      (let ((cmbox (car (udat-avail-cmboxes uconn))))
	(udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
	cmbox)))

(define (put-cmbox uconn cmbox)
  (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))

(define (pp-uconn uconn)
  (pp (udat->alist uconn)))

  
;;======================================================================
;; network utilities
;;======================================================================

;; NOTE: Look at address-info egg as alternative to some of this

(define (rate-ip ipaddr)
  (regex-case ipaddr
    ( "^127\\..*" _ 0 )
    ( "^(10\\.0|192\\.168)\\..*" _ 1 )
    ( else 2 ) ))

;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
  (> (rate-ip a) (rate-ip b)))

(define (get-my-best-address)
  (let ((all-my-addresses (get-all-ips)))
    (cond
     ((null? all-my-addresses)
      (get-host-name))                                          ;; no interfaces?
     ((eq? (length all-my-addresses) 1)
      (car all-my-addresses))                      ;; only one to choose from, just go with it
     (else
      (car (sort all-my-addresses ip-pref-less?))))))

(define (get-all-ips-sorted)
  (sort (get-all-ips) ip-pref-less?))

(define (get-all-ips)
  (map address-info-host
       (filter (lambda (x)
		 (equal? (address-info-type x) "tcp"))
	       (address-infos (get-host-name)))))

;; (map ip->string (vector->list 
;; 		   (hostinfo-addresses
;; 		    (host-information (current-hostname))))))


)

(import ulex trace big-chicken srfi-18 test matchable system-information)
(trace-call-sites #t)
(trace
 ;; ulex-handler
 ;; send
 ;; add-to-work-queue
 )
 
(define (handler-proc rem-host-port qrykey cmd params)
  (print "handler-proc "rem-host-port" "qrykey" "cmd" "params)
  (case cmd
    ((ping) 'pong)
    ((calc)  (eval (with-input-from-string params read)))
    ((print)
     (print "params="params)
     params)
    ((reflect) `(,rem-host-port ,qrykey ,cmd ,params))
    (else  `(data ,data))))

(define uconn (run-listener handler-proc))

(pp-uconn uconn)

;; super basic loop back test
(define res #f)
(define targhost (conc (get-host-name)":"4242))
(define th1 (make-thread (lambda ()
			   (test #f 10 (send-receive uconn targhost 'calc "(+ 5 5)"))
			   (set! res (send-receive uconn targhost 'ping '()))
			   (test #f 'pong (send-receive uconn targhost 'ping '()))
			   )))

(thread-start! th1)
(thread-join! th1)
		
(print "All done")
(print "Received "res)