Megatest

ulex.scm at [caf99578ef]
Login

File ulex/ulex.scm artifact 81b8992868 part of check-in caf99578ef


;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.

;;======================================================================
;; ABOUT:
;;   See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;;   Why sql-de-lite and not say, dbi?  - performance mostly, then simplicity.
;;
;;======================================================================

(module ulex
	*
	#;(
     
     ;; NOTE: looking for the handler proc - find the run-listener :)
     
     run-listener     ;; (run-listener handler-proc [port]) => uconn

     ;; NOTE: handler-proc params;
     ;;       (handler-proc rem-host-port qrykey cmd params)

     send-receive     ;; (send-receive uconn host-port cmd data)

     ;; NOTE: cmd can be any plain text symbol except for these;
     ;;         'ping 'ack 'goodbye 'response
     
     set-work-handler ;; (set-work-handler proc)

     wait-and-close   ;; (wait-and-close uconn)

     ulex-listener?
     
     ;; needed to get the interface:port that was automatically found
     udat-port
     udat-host-port
     
     ;; for testing only
     ;; pp-uconn
     
     ;; parameters
     work-method   ;; parameter; 'threads, 'mailbox, 'limited, 'direct
     return-method ;; parameter; 'mailbox, 'polling, 'direct
     )

(import scheme
	chicken.base
	chicken.file
	chicken.time
	chicken.condition
	chicken.string
	chicken.sort
	chicken.pretty-print
	
	address-info
	mailbox
	matchable
	;; queues
	regex
	regex-case
	s11n
	srfi-1
	srfi-18
	srfi-4
	srfi-69
	system-information
	tcp6
	typed-records
	)

;; udat struct, used by both caller and callee
;; instantiated as uconn by convention
;;
(defstruct udat
  ;; the listener side
  (port #f)
  (host-port #f)
  (socket #f)
  ;; the peers
  (peers  (make-hash-table)) ;; host:port->peer
  ;; work handling
  (work-queue (make-mailbox))
  (work-proc  #f)                ;; set by user
  (cnum       0)                 ;; cookie number
  (mboxes     (make-hash-table)) ;; for the replies
  (avail-cmboxes '())            ;; list of (<cookie> . <mbox>) for re-use
  ;; threads
  (numthreads 10)
  (cmd-thread #f)
  (work-queue-thread #f)
  (num-threads-running 0)
  ) 

;; Parameters

;; work-method:
(define work-method (make-parameter 'mailbox))
;;    mailbox - all rdat goes through mailbox
;;    threads - all rdat immediately executed in new thread
;;    direct  - no queuing
;;

;; return-method, return the result to waiting send-receive:
(define return-method (make-parameter 'mailbox))
;;    mailbox - create a mailbox and use it for passing returning results to send-receive
;;    polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result
;;    direct  - no queuing, result is passed back in single tcp connection
;;

;; ;; struct for keeping track of others we are talking to
;; ;;
;; (defstruct pdat
;;   (host-port  #f)
;;   (conns      '()) ;; list of pcon structs, pop one off when calling the peer
;;   )
;; 
;; ;; struct for peer connections, keep track of expiration etc.
;; ;;
;; (defstruct pcon
;;   (inp #f)
;;   (oup #f)
;;   (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
;;   (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
;;   )

;;======================================================================
;; listener
;;======================================================================

;; is uconn a ulex connector (listener)
;;
(define (ulex-listener? uconn)
  (udat? uconn))

;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;
;;  if udata-in is #f create the record
;;  if there is already a serv-listener return the udata
;;
(define (setup-listener uconn #!optional (port 4242))
  (handle-exceptions
   exn
   (if (< port 65535)
       (setup-listener uconn (+ port 1))
       #f)
   (connect-listener uconn port)))

(define (connect-listener uconn port)
  ;; (tcp-listener-socket LISTENER)(socket-name so)
  ;; sockaddr-address, sockaddr-port, sockaddr->string
  (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
	 (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
    (udat-port-set!      uconn port)
    (udat-host-port-set! uconn (conc addr":"port))
    (udat-socket-set!    uconn tlsn)
    uconn))

;; run-listener does all the work of starting a listener in a thread
;; it then returns control
;;
(define (run-listener handler-proc #!optional (port-suggestion 4242))
  (let* ((uconn (make-udat)))
    (udat-work-proc-set! uconn handler-proc)
    (if (setup-listener uconn port-suggestion)
	(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
	       (th2 (make-thread (lambda ()
				   (case (work-method)
				     ((mailbox limited)
				      (process-work-queue uconn))))
				 "Ulex work queue processor")))
	  (tcp-buffer-size 2048)
	  ;; (max-connections 2048) 
	  (thread-start! th1)
	  (thread-start! th2)
	  (udat-cmd-thread-set! uconn th1)
	  (udat-work-queue-thread-set! uconn th2)
	  (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
	  uconn)
	(assert #f "ERROR: run-listener called without proper setup."))))

(define (wait-and-close uconn)
  (thread-join! (udat-cmd-thread uconn))
  (tcp-close (udat-socket uconn)))

;;======================================================================
;; peers and connections
;;======================================================================

(define *send-mutex* (make-mutex))

;; send structured data to recipient
;;
;;  NOTE: qrykey is what was called the "cookie" previously
;;
;;     retval tells send to expect and wait for return data (one line) and return it or time out
;;       this is for ping where we don't want to necessarily have set up our own server yet.
;;
;; NOTE: see below for beginnings of code to allow re-use of tcp connections
;;        - I believe (without substantial evidence) that re-using connections will
;;          be beneficial ...
;;
(define (send udata host-port qrykey cmd params)
  (let* ((my-host-port (udat-host-port udata))          ;; remote will return to this
	 (isme         #f #;(equal? host-port my-host-port)) ;; calling myself?
	 ;; dat is a self-contained work block that can be sent or handled locally
	 (dat          (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds)))))
    (cond
     (isme (ulex-handler udata dat)) ;; no transmission needed
     (else
      (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
	  exn
	  #f
	(begin
	  ;; (mutex-lock! *send-mutex*)
	  (let-values (((inp oup)(tcp-connect host-port)))
	    (let ((res (if (and inp oup)
			   (begin
			     (serialize dat oup)
			     (deserialize inp))
			   (begin
			     (print "ERROR: send called but no receiver has been setup. Please call setup first!")
			     #f))))
	      (close-input-port inp)
	      (close-output-port oup)
	      ;; (mutex-unlock! *send-mutex*)
	      res)))))))) ;; res will always be 'ack unless return-method is direct

(define (send-via-polling uconn host-port cmd data)
  (let* ((qrykey (make-cookie uconn))
	 (sres   (send uconn host-port qrykey cmd data)))
    (case sres
      ((ack)
       (let loop ((start-time (current-milliseconds)))
	 (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout
	     (begin
	       (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data)
	       #f)
	     (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash
	       (if result ;; result is '(status . result-data) or #f for nothing yet
		   (begin
		     (hash-table-delete! (udat-mboxes uconn) qrykey)
		     (cdr result))
		   (begin
		     (thread-sleep! 0.01)
		     (loop start-time)))))))
      (else
       (print "ULEX ERROR: Communication failed? sres="sres)
       #f))))

(define (send-via-mailbox uconn host-port cmd data)
  (let* ((cmbox     (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
	 (qrykey    (car cmbox))
	 (mbox      (cdr cmbox))
	 (mbox-time (current-milliseconds))
	 (sres      (send uconn host-port qrykey cmd data))) ;; short res
    (if (eq? sres 'ack)
	(let* ((mbox-timeout-secs    120 #;(if (eq? 'primordial (thread-name (current-thread)))
				     #f
				     120)) ;; timeout)
	       (mbox-timeout-result 'MBOX_TIMEOUT)
	       (res                  (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
	       (mbox-receive-time    (current-milliseconds)))
	  ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
	  (hash-table-delete! (udat-mboxes uconn) qrykey)
	  (if (eq? res 'MBOX_TIMEOUT)
	      (begin
		(print "WARNING: mbox timed out for query "cmd", with data "data
		       ", waiting for response from "host-port".")

		;; here it might make sense to clean up connection records and force clean start?
		;; NO. The progam using ulex needs to do the reset. Right thing here is exception
		
		#f)  ;; convert to raising exception?
	      res))
	(begin
	  (print "ERROR: Communication failed? Got "sres)
	  #f))))
  
;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive uconn host-port cmd data)
  (let* ((start-time (current-milliseconds))
	 (result     (cond
		      ((member cmd '(ping goodbye)) ;; these are immediate
		       (send uconn host-port 'ping cmd data))
		      ((eq? (work-method) 'direct)
		       ;; the result from send will be the actual result, not an 'ack
		       (send uconn host-port 'direct cmd data))
		      (else
		       (case (return-method)
			 ((polling)
			  (send-via-polling uconn host-port cmd data))
			 ((mailbox) 
			  (send-via-mailbox uconn host-port cmd data))
			 (else
			  (print "ULEX ERROR: unrecognised return-method "(return-method)".")
			  #f))))))
    ;; this is ONLY for development and debugging. It will be removed once Ulex is stable.
    (if (< 5000 (- (current-milliseconds) start-time))
	(print "ULEX WARNING: round-trip took over 5 seconds; "
	       cmd", host-port="host-port", data="data))
    result))
    

;;======================================================================
;; responder side
;;======================================================================

;; take a request, rdat, and if not immediate put it in the work queue
;;
;; Reserved cmds; ack ping goodbye response
;;
(define (ulex-handler uconn rdat)
  (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
  (match rdat ;;  (string-split controldat)
    ((rem-host-port qrykey cmd params);; timedata)
     ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
     (case cmd
       ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
       ((ping)
	;; (print "Got Ping!")
	;; (add-to-work-queue uconn rdat)
	'ack)
       ((goodbye)
	;; just clear out references to the caller. NOT COMPLETE
	(add-to-work-queue uconn rdat)
	'ack)
       ((response) ;; this is a result from remote processing, send it as mail ...
	(case (return-method)
	  ((polling)
	   (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params))
	   'ack)
	  ((mailbox)
	   (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
	     (if mbox
		 (begin
		   (mailbox-send! mbox params) ;; params here is our result
		   'ack)
		 (begin
		   (print "ERROR: received result but no associated mbox for cookie "qrykey)
		   'no-mbox-found))))
	  (else (print "ULEX ERROR: unrecognised return-method "(return-method))
		'bad-return-method)))
       (else ;; generic request - hand it to the work queue
	(add-to-work-queue uconn rdat)
	'ack)))
    (else
     (print "ULEX ERROR: bad rdat "rdat)
     'bad-rdat)))

;; given an already set up uconn start the cmd-loop
;;
(define (ulex-cmd-loop uconn)
  (let* ((serv-listener (udat-socket uconn))
	 (listener      (lambda ()
			  (let loop ((state 'start))
			    (let-values (((inp oup)(tcp-accept serv-listener)))
			      (let* ((rdat  (deserialize inp)) ;; '(my-host-port qrykey cmd params)
				     (resp  (ulex-handler uconn rdat)))
				(if resp (serialize resp oup))
				(close-input-port inp)
				(close-output-port oup))
			      (loop state))))))
    ;; start N of them
    (let loop ((thnum   0)
	       (threads '()))
      (if (< thnum 100)
	  (let* ((th (make-thread listener (conc "listener" thnum))))
	    (thread-start! th)
	    (loop (+ thnum 1)
		  (cons th threads)))
	  (map thread-join! threads)))))

;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
;; so that the proc can be dereferenced remotely
;;
(define (set-work-handler uconn proc)
  (udat-work-proc-set! uconn proc))

;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================

;; rdat is (rem-host-port qrykey cmd params)
					     
(define (add-to-work-queue uconn rdat)
  #;(queue-add! (udat-work-queue uconn) rdat)
  (case (work-method)
    ((threads)
     (thread-start! (make-thread (lambda ()
				   (do-work uconn rdat))
				 "worker thread")))
    ((mailbox)
     (mailbox-send! (udat-work-queue uconn) rdat))
    ((direct)
     (do-work uconn rdat))
    (else
     (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.")
     (mailbox-send! (udat-work-queue uconn) rdat))))
     
;; move the logic to return the result somewhere else?
;;
(define (do-work uconn rdat)
  (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
    ;; put this following into a do-work procedure
    (match rdat
      ((rem-host-port qrykey cmd params)
       (let* ((start-time (current-milliseconds))
	      (result (proc rem-host-port qrykey cmd params))
	      (end-time (current-milliseconds))
	      (run-time (- end-time start-time)))
	 (case (work-method)
	   ((direct) result)
	   (else
	    (print "ULEX: work "cmd", "params" done in "run-time" ms")
	    ;; send 'response as cmd and result as params
	    (send uconn rem-host-port qrykey 'response result) ;; could check for ack
	    (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
      (MBOX_TIMEOUT #f)
      (else
       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))

;; NEW APPROACH:
;;   
(define (process-work-queue uconn) 
  (let ((wqueue (udat-work-queue uconn))
	(proc   (udat-work-proc  uconn))
	(numthr (udat-numthreads uconn)))
    (let loop ((thnum    1)
	       (threads '()))
      (let ((thlst (cons (make-thread (lambda ()
					(let work-loop ()
					  (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
					    (do-work uconn rdat))
					  (work-loop)))
				      (conc "work thread " thnum))
			 threads)))
	(if (< thnum numthr)
	    (loop (+ thnum 1)
		  thlst)
	    (begin
	      (print "ULEX: Starting "(length thlst)" worker threads.")
	      (map thread-start! thlst)
	      (print "ULEX: Threads started. Joining all.")
	      (map thread-join! thlst)))))))

;; below was to enable re-use of connections. This seems non-trivial so for
;; now lets open on each call
;;
;; ;; given host-port get or create peer struct
;; ;;
;; (define (udat-get-peer uconn host-port)
;;   (or (hash-table-ref/default (udat-peers uconn) host-port #f)
;;       ;; no peer, so create pdat and init it
;;       
;;       ;; NEED stack of connections, pop and use; inp, oup,
;;       ;; creation_time (remove and create new if over 24hrs old
;;       ;; 
;;       (let ((pdat (make-pdat host-port: host-port)))
;; 	(hash-table-set! (udat-peers uconn) host-port pdat)
;; 	pdat)))
;; 
;; ;; is pcon alive
;; 
;; ;; given host-port and pdat get a pcon
;; ;;
;; (define (pdat-get-pcon pdat host-port)
;;   (let loop ((conns (pdat-conns pdat)))
;;     (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
;; 	(init-pcon (make-pcon))
;; 	(let* ((conn (pop conns)))
;; 	  
;; ;; given host-port get a pcon struct
;; ;;
;; (define (udat-get-pcon 
      
;;======================================================================
;; misc utils
;;======================================================================

(define (make-cookie uconn)
  (let ((newcnum (+ (udat-cnum uconn) 1)))
    (udat-cnum-set! uconn newcnum)
    (conc (udat-host-port uconn) ":"
	  newcnum)))

;; cookie/mboxes

;; we store each mbox with a cookie (<cookie> . <mbox>)
;;
(define (get-cmbox uconn)
  (if (null? (udat-avail-cmboxes uconn))
      (let ((cookie (make-cookie uconn))
	    (mbox   (make-mailbox)))
	(hash-table-set! (udat-mboxes uconn) cookie mbox)
	`(,cookie . ,mbox))
      (let ((cmbox (car (udat-avail-cmboxes uconn))))
	(udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
	cmbox)))

(define (put-cmbox uconn cmbox)
  (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))

(define (pp-uconn uconn)
  (pp (udat->alist uconn)))

  
;;======================================================================
;; network utilities
;;======================================================================

;; NOTE: Look at address-info egg as alternative to some of this

(define (rate-ip ipaddr)
  (regex-case ipaddr
    ( "^127\\..*" _ 0 )
    ( "^(10\\.0|192\\.168)\\..*" _ 1 )
    ( else 2 ) ))

;; Change this to bias for addresses with a reasonable broadcast value?
;;
(define (ip-pref-less? a b)
  (> (rate-ip a) (rate-ip b)))

(define (get-my-best-address)
  (let ((all-my-addresses (get-all-ips)))
    (cond
     ((null? all-my-addresses)
      (get-host-name))                                          ;; no interfaces?
     ((eq? (length all-my-addresses) 1)
      (car all-my-addresses))                      ;; only one to choose from, just go with it
     (else
      (car (sort all-my-addresses ip-pref-less?))))))

(define (get-all-ips-sorted)
  (sort (get-all-ips) ip-pref-less?))

(define (get-all-ips)
  (map address-info-host
       (filter (lambda (x)
		 (equal? (address-info-type x) "tcp"))
	       (address-infos (get-host-name)))))

)