Megatest

ulex.scm at [366b1b75fd]
Login

File ulex-none/ulex.scm artifact 4551982255 part of check-in 366b1b75fd


;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018-2021 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.

;;======================================================================
;; ABOUT:
;;   See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;;   Why sql-de-lite and not say, dbi?  - performance mostly, then simplicity.
;;
;;======================================================================

;; Module `ulex`.
;;
;; The export specifier `*` exports every top-level binding defined in
;; the module body.  The explicit export list that follows is disabled
;; with a `#;` datum comment; it is kept only as a reference for the
;; intended public interface (names plus call signatures).
;;
;; NOTE: in this revision every definition in the module body is
;; commented out, so the (import ...) form is the only live code.
(module ulex
	*
	#;(
     
     ;; NOTE: looking for the handler proc - find the run-listener :)
     
     run-listener     ;; (run-listener handler-proc [port]) => uconn

     ;; NOTE: handler-proc params;
     ;;       (handler-proc rem-host-port qrykey cmd params)

     send-receive     ;; (send-receive uconn host-port cmd data)

     ;; NOTE: cmd can be any plain text symbol except for these;
     ;;         'ping 'ack 'goodbye 'response
     
     set-work-handler ;; (set-work-handler uconn proc)

     wait-and-close   ;; (wait-and-close uconn)

     ulex-listener?
     
     ;; needed to get the interface:port that was automatically found
     udat-port
     udat-host-port
     
     ;; for testing only
     ;; pp-uconn
     
     ;; parameters
     work-method   ;; parameter; 'threads, 'mailbox, 'limited, 'direct
     return-method ;; parameter; 'mailbox, 'polling, 'direct
     )

(import scheme
	chicken.base
	chicken.file
	chicken.io
	chicken.time
	chicken.condition
	chicken.string
	chicken.sort
	chicken.pretty-print
	
	address-info
	mailbox
	matchable
	;; queues
	regex
	regex-case
	simple-exceptions
	s11n
	srfi-1
	srfi-18
	srfi-4
	srfi-69
	system-information
	tcp6
	typed-records
	)

;; ;; udat struct, used by both caller and callee
;; ;; instantiated as uconn by convention
;; ;;
;; (defstruct udat
;;   ;; the listener side
;;   (port #f)
;;   (host-port #f)
;;   (socket #f)
;;   ;; the peers
;;   (peers  (make-hash-table)) ;; host:port->peer
;;   ;; work handling
;;   (work-queue (make-mailbox))
;;   (work-proc  #f)                ;; set by user
;;   (cnum       0)                 ;; cookie number
;;   (mboxes     (make-hash-table)) ;; for the replies
;;   (avail-cmboxes '())            ;; list of (<cookie> . <mbox>) for re-use
;;   ;; threads
;;   (numthreads 10)
;;   (cmd-thread #f)
;;   (work-queue-thread #f)
;;   (num-threads-running 0)
;;   ) 
;; 
;; ;; Parameters
;; 
;; ;; work-method:
;; (define work-method (make-parameter 'mailbox))
;; ;;    mailbox - all rdat goes through mailbox
;; ;;    threads - all rdat immediately executed in new thread
;; ;;    direct  - no queuing
;; ;;
;; 
;; ;; return-method, return the result to waiting send-receive:
;; (define return-method (make-parameter 'mailbox))
;; ;;    mailbox - create a mailbox and use it for passing returning results to send-receive
;; ;;    polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result
;; ;;    direct  - no queuing, result is passed back in single tcp connection
;; ;;
;; 
;; ;; ;; struct for keeping track of others we are talking to
;; ;; ;;
;; ;; (defstruct pdat
;; ;;   (host-port  #f)
;; ;;   (conns      '()) ;; list of pcon structs, pop one off when calling the peer
;; ;;   )
;; ;; 
;; ;; ;; struct for peer connections, keep track of expiration etc.
;; ;; ;;
;; ;; (defstruct pcon
;; ;;   (inp #f)
;; ;;   (oup #f)
;; ;;   (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
;; ;;   (lifetime (+ (current-seconds) 600)) ;; throw away and create new after ten minutes (600 seconds)
;; ;;   )
;; 
;; ;;======================================================================
;; ;; listener
;; ;;======================================================================
;; 
;; ;; is uconn a ulex connector (listener)
;; ;;
;; (define (ulex-listener? uconn)
;;   (udat? uconn))
;; 
;; ;; create a tcp listener and return a populated udat struct with
;; ;; my port, address, hostname, pid etc.
;; ;; return #f if fail to find a port to allocate.
;; ;;
;; ;;  if udata-in is #f create the record
;; ;;  if there is already a serv-listener return the udata
;; ;;
;; (define (setup-listener uconn #!optional (port 4242))
;;   (handle-exceptions
;;    exn
;;    (if (< port 65535)
;;        (setup-listener uconn (+ port 1))
;;        #f)
;;    (connect-listener uconn port)))
;; 
;; (define (connect-listener uconn port)
;;   ;; (tcp-listener-socket LISTENER)(socket-name so)
;;   ;; sockaddr-address, sockaddr-port, sockaddr->string
;;   (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
;; 	 (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
;;     (udat-port-set!      uconn port)
;;     (udat-host-port-set! uconn (conc addr":"port))
;;     (udat-socket-set!    uconn tlsn)
;;     uconn))
;; 
;; ;; run-listener does all the work of starting a listener in a thread
;; ;; it then returns control
;; ;;
;; (define (run-listener handler-proc #!optional (port-suggestion 4242))
;;   (let* ((uconn (make-udat)))
;;     (udat-work-proc-set! uconn handler-proc)
;;     (if (setup-listener uconn port-suggestion)
;; 	(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
;; 	       (th2 (make-thread (lambda ()
;; 				   (case (work-method)
;; 				     ((mailbox limited)
;; 				      (process-work-queue uconn))))
;; 				 "Ulex work queue processor")))
;; 	  ;; (tcp-buffer-size 2048)
;; 	  (thread-start! th1)
;; 	  (thread-start! th2)
;; 	  (udat-cmd-thread-set! uconn th1)
;; 	  (udat-work-queue-thread-set! uconn th2)
;; 	  (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
;; 	  uconn)
;; 	(assert #f "ERROR: run-listener called without proper setup."))))
;; 
;; (define (wait-and-close uconn)
;;   (thread-join! (udat-cmd-thread uconn))
;;   (tcp-close (udat-socket uconn)))
;; 
;; ;;======================================================================
;; ;; peers and connections
;; ;;======================================================================
;; 
;; (define *send-mutex* (make-mutex))
;; 
;; ;; send structured data to recipient
;; ;;
;; ;;  NOTE: qrykey is what was called the "cookie" previously
;; ;;
;; ;;     retval tells send to expect and wait for return data (one line) and return it or time out
;; ;;       this is for ping where we don't want to necessarily have set up our own server yet.
;; ;;
;; ;; NOTE: see below for beginnings of code to allow re-use of tcp connections
;; ;;        - I believe (without substantial evidence) that re-using connections will
;; ;;          be beneficial ...
;; ;;
;; (define (send udata host-port qrykey cmd params)
;;   (let* ((my-host-port (udat-host-port udata))          ;; remote will return to this
;; 	 (isme         #f #;(equal? host-port my-host-port)) ;; calling myself?
;; 	 ;; dat is a self-contained work block that can be sent or handled locally
;; 	 (dat          (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds)))))
;;     (cond
;;      (isme (ulex-handler udata dat)) ;; no transmission needed
;;      (else
;;       (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
;; 	  exn
;; 	  (message exn)
;; 	(begin
;; 	  ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; 	  (let-values (((inp oup)(tcp-connect host-port)))
;; 	    (let ((res (if (and inp oup)
;; 			   (begin
;; 			     (serialize dat oup)
;; 			     (close-output-port oup)
;; 			     (deserialize inp)
;; 			     )
;; 			   (begin
;; 			     (print "ERROR: send called but no receiver has been setup. Please call setup first!")
;; 			     #f))))
;; 	      (close-input-port inp)
;; 	      ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; 	      res)))))))) ;; res will always be 'ack unless return-method is direct
;; 
;; (define (send-via-polling uconn host-port cmd data)
;;   (let* ((qrykey (make-cookie uconn))
;; 	 (sres   (send uconn host-port qrykey cmd data)))
;;     (case sres
;;       ((ack)
;;        (let loop ((start-time (current-milliseconds)))
;; 	 (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout
;; 	     (begin
;; 	       (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data)
;; 	       #f)
;; 	     (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash
;; 	       (if result ;; result is '(status . result-data) or #f for nothing yet
;; 		   (begin
;; 		     (hash-table-delete! (udat-mboxes uconn) qrykey)
;; 		     (cdr result))
;; 		   (begin
;; 		     (thread-sleep! 0.01)
;; 		     (loop start-time)))))))
;;       (else
;;        (print "ULEX ERROR: Communication failed? sres="sres)
;;        #f))))
;; 
;; (define (send-via-mailbox uconn host-port cmd data)
;;   (let* ((cmbox     (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
;; 	 (qrykey    (car cmbox))
;; 	 (mbox      (cdr cmbox))
;; 	 (mbox-time (current-milliseconds))
;; 	 (sres      (send uconn host-port qrykey cmd data))) ;; short res
;;     (if (eq? sres 'ack)
;; 	(let* ((mbox-timeout-secs    120 #;(if (eq? 'primordial (thread-name (current-thread)))
;; 				     #f
;; 				     120)) ;; timeout)
;; 	       (mbox-timeout-result 'MBOX_TIMEOUT)
;; 	       (res                  (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
;; 	       (mbox-receive-time    (current-milliseconds)))
;; 	  ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
;; 	  (hash-table-delete! (udat-mboxes uconn) qrykey)
;; 	  (if (eq? res 'MBOX_TIMEOUT)
;; 	      (begin
;; 		(print "WARNING: mbox timed out for query "cmd", with data "data
;; 		       ", waiting for response from "host-port".")
;; 
;; 		;; here it might make sense to clean up connection records and force clean start?
;; 		;; NO. The program using ulex needs to do the reset. Right thing here is exception
;; 		
;; 		#f)  ;; convert to raising exception?
;; 	      res))
;; 	(begin
;; 	  (print "ERROR: Communication failed? Got "sres)
;; 	  #f))))
;;   
;; ;; send a request to the given host-port and register a mailbox in udata
;; ;; wait for the mailbox data and return it
;; ;;
;; (define (send-receive uconn host-port cmd data)
;;   (let* ((start-time (current-milliseconds))
;; 	 (result     (cond
;; 		      ((member cmd '(ping goodbye)) ;; these are immediate
;; 		       (send uconn host-port 'ping cmd data))
;; 		      ((eq? (work-method) 'direct)
;; 		       ;; the result from send will be the actual result, not an 'ack
;; 		       (send uconn host-port 'direct cmd data))
;; 		      (else
;; 		       (case (return-method)
;; 			 ((polling)
;; 			  (send-via-polling uconn host-port cmd data))
;; 			 ((mailbox) 
;; 			  (send-via-mailbox uconn host-port cmd data))
;; 			 (else
;; 			  (print "ULEX ERROR: unrecognised return-method "(return-method)".")
;; 			  #f)))))
;; 	 (duration    (- (current-milliseconds) start-time)))
;;     ;; this is ONLY for development and debugging. It will be removed once Ulex is stable.
;;     (if (< 5000 duration)
;; 	(print "ULEX WARNING: round-trip took "(inexact->exact (round (/ duration 1000)))
;; 	       " seconds; "cmd", host-port="host-port", data="data))
;;     result))
;;     
;; 
;; ;;======================================================================
;; ;; responder side
;; ;;======================================================================
;; 
;; ;; take a request, rdat, and if not immediate put it in the work queue
;; ;;
;; ;; Reserved cmds; ack ping goodbye response
;; ;;
;; (define (ulex-handler uconn rdat)
;;   (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
;;   (match rdat ;;  (string-split controldat)
;;     ((rem-host-port qrykey cmd params);; timedata)
;;      ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
;;      (case cmd
;;        ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
;;        ((ping)
;; 	;; (print "Got Ping!")
;; 	;; (add-to-work-queue uconn rdat)
;; 	'ack)
;;        ((goodbye)
;; 	;; just clear out references to the caller. NOT COMPLETE
;; 	(add-to-work-queue uconn rdat)
;; 	'ack)
;;        ((response) ;; this is a result from remote processing, send it as mail ...
;; 	(case (return-method)
;; 	  ((polling)
;; 	   (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params))
;; 	   'ack)
;; 	  ((mailbox)
;; 	   (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
;; 	     (if mbox
;; 		 (begin
;; 		   (mailbox-send! mbox params) ;; params here is our result
;; 		   'ack)
;; 		 (begin
;; 		   (print "ERROR: received result but no associated mbox for cookie "qrykey)
;; 		   'no-mbox-found))))
;; 	  (else (print "ULEX ERROR: unrecognised return-method "(return-method))
;; 		'bad-return-method)))
;;        (else ;; generic request - hand it to the work queue
;; 	(add-to-work-queue uconn rdat)
;; 	'ack)))
;;     (else
;;      (print "ULEX ERROR: bad rdat "rdat)
;;      'bad-rdat)))
;; 
;; ;; given an already set up uconn start the cmd-loop
;; ;;
;; (define (ulex-cmd-loop uconn)
;;   (let* ((serv-listener (udat-socket uconn))
;; 	 (listener      (lambda ()
;; 			  (let loop ((state 'start))
;; 			    (let-values (((inp oup)(tcp-accept serv-listener)))
;; 			      ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; 			      (let* ((rdat  (deserialize inp)) ;; '(my-host-port qrykey cmd params)
;; 				     (resp  (ulex-handler uconn rdat)))
;; 				(serialize resp oup)
;; 				(close-input-port inp)
;; 				(close-output-port oup)
;; 				;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
;; 				)
;; 			      (loop state))))))
;;     ;; start N of them
;;     (let loop ((thnum   0)
;; 	       (threads '()))
;;       (if (< thnum 100)
;; 	  (let* ((th (make-thread listener (conc "listener" thnum))))
;; 	    (thread-start! th)
;; 	    (loop (+ thnum 1)
;; 		  (cons th threads)))
;; 	  (map thread-join! threads)))))
;; 
;; ;; add a proc to the cmd list, these are done symmetrically (i.e. in all instances)
;; ;; so that the proc can be dereferenced remotely
;; ;;
;; (define (set-work-handler uconn proc)
;;   (udat-work-proc-set! uconn proc))
;; 
;; ;;======================================================================
;; ;; work queues - this is all happening on the listener side
;; ;;======================================================================
;; 
;; ;; rdat is (rem-host-port qrykey cmd params)
;; 					     
;; (define (add-to-work-queue uconn rdat)
;;   #;(queue-add! (udat-work-queue uconn) rdat)
;;   (case (work-method)
;;     ((threads)
;;      (thread-start! (make-thread (lambda ()
;; 				   (do-work uconn rdat))
;; 				 "worker thread")))
;;     ((mailbox)
;;      (mailbox-send! (udat-work-queue uconn) rdat))
;;     ((direct)
;;      (do-work uconn rdat))
;;     (else
;;      (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.")
;;      (mailbox-send! (udat-work-queue uconn) rdat))))
;;      
;; ;; move the logic to return the result somewhere else?
;; ;;
;; (define (do-work uconn rdat)
;;   (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivably it could change
;;     ;; put this following into a do-work procedure
;;     (match rdat
;;       ((rem-host-port qrykey cmd params)
;;        (let* ((start-time (current-milliseconds))
;; 	      (result (proc rem-host-port qrykey cmd params))
;; 	      (end-time (current-milliseconds))
;; 	      (run-time (- end-time start-time)))
;; 	 (case (work-method)
;; 	   ((direct) result)
;; 	   (else
;; 	    (print "ULEX: work "cmd", "params" done in "run-time" ms")
;; 	    ;; send 'response as cmd and result as params
;; 	    (send uconn rem-host-port qrykey 'response result) ;; could check for ack
;; 	    (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
;;       (MBOX_TIMEOUT 'do-work-timeout)
;;       (else
;;        (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
;; 
;; ;; NEW APPROACH:
;; ;;   
;; (define (process-work-queue uconn) 
;;   (let ((wqueue (udat-work-queue uconn))
;; 	(proc   (udat-work-proc  uconn))
;; 	(numthr (udat-numthreads uconn)))
;;     (let loop ((thnum    1)
;; 	       (threads '()))
;;       (let ((thlst (cons (make-thread (lambda ()
;; 					(let work-loop ()
;; 					  (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
;; 					    (do-work uconn rdat))
;; 					  (work-loop)))
;; 				      (conc "work thread " thnum))
;; 			 threads)))
;; 	(if (< thnum numthr)
;; 	    (loop (+ thnum 1)
;; 		  thlst)
;; 	    (begin
;; 	      (print "ULEX: Starting "(length thlst)" worker threads.")
;; 	      (map thread-start! thlst)
;; 	      (print "ULEX: Threads started. Joining all.")
;; 	      (map thread-join! thlst)))))))
;; 
;; ;; below was to enable re-use of connections. This seems non-trivial so for
;; ;; now let's open on each call
;; ;;
;; ;; ;; given host-port get or create peer struct
;; ;; ;;
;; ;; (define (udat-get-peer uconn host-port)
;; ;;   (or (hash-table-ref/default (udat-peers uconn) host-port #f)
;; ;;       ;; no peer, so create pdat and init it
;; ;;       
;; ;;       ;; NEED stack of connections, pop and use; inp, oup,
;; ;;       ;; creation_time (remove and create new if over 24hrs old
;; ;;       ;; 
;; ;;       (let ((pdat (make-pdat host-port: host-port)))
;; ;; 	(hash-table-set! (udat-peers uconn) host-port pdat)
;; ;; 	pdat)))
;; ;; 
;; ;; ;; is pcon alive
;; ;; 
;; ;; ;; given host-port and pdat get a pcon
;; ;; ;;
;; ;; (define (pdat-get-pcon pdat host-port)
;; ;;   (let loop ((conns (pdat-conns pdat)))
;; ;;     (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
;; ;; 	(init-pcon (make-pcon))
;; ;; 	(let* ((conn (pop conns)))
;; ;; 	  
;; ;; ;; given host-port get a pcon struct
;; ;; ;;
;; ;; (define (udat-get-pcon 
;;       
;; ;;======================================================================
;; ;; misc utils
;; ;;======================================================================
;; 
;; (define (make-cookie uconn)
;;   (let ((newcnum (+ (udat-cnum uconn) 1)))
;;     (udat-cnum-set! uconn newcnum)
;;     (conc (udat-host-port uconn) ":"
;; 	  newcnum)))
;; 
;; ;; cookie/mboxes
;; 
;; ;; we store each mbox with a cookie (<cookie> . <mbox>)
;; ;;
;; (define (get-cmbox uconn)
;;   (if (null? (udat-avail-cmboxes uconn))
;;       (let ((cookie (make-cookie uconn))
;; 	    (mbox   (make-mailbox)))
;; 	(hash-table-set! (udat-mboxes uconn) cookie mbox)
;; 	`(,cookie . ,mbox))
;;       (let ((cmbox (car (udat-avail-cmboxes uconn))))
;; 	(udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
;; 	cmbox)))
;; 
;; (define (put-cmbox uconn cmbox)
;;   (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
;; 
;; (define (pp-uconn uconn)
;;   (pp (udat->alist uconn)))
;; 
;;   
;; ;;======================================================================
;; ;; network utilities
;; ;;======================================================================
;; 
;; ;; NOTE: Look at address-info egg as alternative to some of this
;; 
;; (define (rate-ip ipaddr)
;;   (regex-case ipaddr
;;     ( "^127\\..*" _ 0 )
;;     ( "^(10\\.0|192\\.168)\\..*" _ 1 )
;;     ( else 2 ) ))
;; 
;; ;; Change this to bias for addresses with a reasonable broadcast value?
;; ;;
;; (define (ip-pref-less? a b)
;;   (> (rate-ip a) (rate-ip b)))
;; 
;; (define (get-my-best-address)
;;   (let ((all-my-addresses (get-all-ips)))
;;     (cond
;;      ((null? all-my-addresses)
;;       (get-host-name))                                          ;; no interfaces?
;;      ((eq? (length all-my-addresses) 1)
;;       (car all-my-addresses))                      ;; only one to choose from, just go with it
;;      (else
;;       (car (sort all-my-addresses ip-pref-less?))))))
;; 
;; (define (get-all-ips-sorted)
;;   (sort (get-all-ips) ip-pref-less?))
;; 
;; (define (get-all-ips)
;;   (map address-info-host
;;        (filter (lambda (x)
;; 		 (equal? (address-info-type x) "tcp"))
;; 	       (address-infos (get-host-name)))))
;; 
)