Megatest

rmtmod.scm at [fc8c642fb9]
Login

File rmtmod.scm artifact 798b93ee00 part of check-in fc8c642fb9


;;======================================================================
;; Copyright 2019, Matthew Welland.
;; 
;; This file is part of Megatest.
;; 
;;     Megatest is free software: you can redistribute it and/or modify
;;     it under the terms of the GNU General Public License as published by
;;     the Free Software Foundation, either version 3 of the License, or
;;     (at your option) any later version.
;; 
;;     Megatest is distributed in the hope that it will be useful,
;;     but WITHOUT ANY WARRANTY; without even the implied warranty of
;;     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;;     GNU General Public License for more details.
;; 
;;     You should have received a copy of the GNU General Public License
;;     along with Megatest.  If not, see <http://www.gnu.org/licenses/>.

;;======================================================================

(declare (unit rmtmod))
(declare (uses commonmod))

(module rmtmod
	*
	
(import scheme chicken data-structures extras)
(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable)
(import commonmod)
(use (prefix ulex ulex:))

(include "common_records.scm")

;; Hack to make these functions visible to the refactored code, goal is to eliminate these over time.
;; (define (rmt:send-receive . params) #f)
;; (define (http-transport:close-connections . params) #f)
;; ;; from remote defstruct in common.scm
;; ;; (define (api:execute-requests . params) #f)
;; (define (http-transport:client-api-send-receive . params) #f)
;; (define (client:setup . params) #f)
;; (define (server:kind-run . params) #f)
;; (define (server:start-and-wait . params) #f)
;; (define (server:check-if-running . params) #f)
;; (define (server:ping . params) #f)
;; (define (common:force-server? . params) #f)
;; 'send-receive rmt:send-receive ...
;; DISABLED: the leading `#;` datum comment makes the reader skip this whole
;; form, so set-functions is never defined in this module.
;;
;; As written it takes all of its arguments as one list, requires that list to
;; have exactly ten elements (a .. j) and assigns eight of them with set! to
;; the names below.  Positions c and d are accepted but not used.
#;(define (set-functions . alldata)
  (match
   alldata
   ((a b c d e f g h i j) ;; e f g h i j k l)
    (set! http-transport:client-api-send-receive a)
    (set! http-transport:close-connections       b)
    ;; (set! api:execute-requests                   c)
    ;; d
    (set! client:setup                           e)
    (set! server:kind-run                        f)
    (set! server:start-and-wait                  g)
    (set! server:check-if-running                h)
    (set! server:ping                            i)
    (set! common:force-server?                   j)
    )))

;; Execute one api request directly against the local database, bypassing
;; any server.
;;
;;   log-port         - port handed to debug:print / debug:print-error
;;   multi-sync-mutex - accepted and passed along on retries; the only code
;;                      that used it here is disabled (see note near the end)
;;   cmd              - symbol naming the api request
;;   run-id           - accepted and passed along on retries, not otherwise used
;;   params           - parameter list for the request
;;   alldat           - record providing the list of read-only queries
;;   remretries:      - how many more times to retry a failed query (default 5)
;;
;; Returns the payload (element 1) of the result vector on success, the result
;; of the retry when a retry was needed, or #f once the retries are used up.
;; A write request against a database file that is not writable is not
;; executed; an error is logged and '() is returned.
;;
(define (rmt:open-qry-close-locally log-port multi-sync-mutex cmd run-id params alldat #!key (remretries 5))
  (let* ((ro-queries     (alldat-read-only-queries alldat))
         (qry-is-write   (not (member cmd ro-queries)))
         (db-file-path   (exec-fn 'db:dbfile-path))
         (dbstruct-local (exec-fn 'db:setup #t))
         (read-only      (not (file-write-access? db-file-path)))
         (refused        (and read-only qry-is-write)) ;; write wanted but db is not writable
         (resdat         (if refused
                             (vector #t '()) ;; dummy "success" so that no retry is attempted
                             (let ((v (exec-fn 'api:execute-requests dbstruct-local (vector (symbol->string cmd) params))))
                               ;; There has been a long history of strange errors from
                               ;; values returned when things go wrong.  Copying the two
                               ;; fields while inside the handler forces a corrupted
                               ;; record to be detected here, where it can be replaced by
                               ;; a dummy result.
                               (handle-exceptions
                                exn
                                (begin
                                  (debug:print 0 log-port "ERROR: bad data from server " v " message: "  ((condition-property-accessor 'exn 'message) exn))
                                  (vector #t '()))
                                (if (and (vector? v)
                                         (> (vector-length v) 1))
                                    (vector (vector-ref v 0) (vector-ref v 1))
                                    (vector #t '()))))))
         (success        (vector-ref resdat 0))
         (res            (vector-ref resdat 1)))
    (if refused
        (debug:print 0 log-port "ERROR: attempt to write to read-only database ignored. cmd=" cmd))
    (cond
     ;; Normal case.  Marking the db dirty after a write (set! of
     ;; *db-last-access* under multi-sync-mutex) used to happen here but was
     ;; already disabled; the watchdog is responsible for syncing.
     (success res)
     ;; Failed but retries remain: wait a random 0-5 seconds and hand back
     ;; whatever the retry produces.  Previously the retry's value was thrown
     ;; away and the payload of the failed attempt was returned instead.
     ((> remretries 0)
      (debug:print-error 0 log-port "local query failed. Trying again.")
      (thread-sleep! (/ (random 5000) 1000)) ;; some random delay
      (rmt:open-qry-close-locally log-port multi-sync-mutex cmd run-id params alldat remretries: (- remretries 1)))
     ;; Out of retries: report the failure to the caller as #f.
     (else
      (debug:print-error 0 log-port "too many retries in rmt:open-qry-close-locally, giving up")
      #f))))



;; Report whether the area rooted at toppath has to be treated as read-only.
;;
;; The answer is #t when "<toppath>/megatest.db" is not writable by this
;; process.  When a runremote record is supplied the answer is stored in it
;; and reused on later calls; runremote may also be #f, in which case the
;; file is checked every time.
;;
;; TODO: use dbstruct or runremote to figure this out in future
;;
(define (rmtmod:calc-ro-mode runremote toppath)
  (cond
   ;; already determined for this runremote, reuse the cached value
   ((and runremote (remote-ro-mode-checked runremote))
    (remote-ro-mode runremote))
   (else
    (let ((ro-mode (not (file-write-access? (conc toppath "/megatest.db")))))
      (when runremote
        ;; remember the result so the file system is not probed again
        (remote-ro-mode-set! runremote ro-mode)
        (remote-ro-mode-checked-set! runremote #t))
      ro-mode))))

;; Handle a write request made against a read-only area: nothing is sent or
;; executed, a warning naming the command and its parameters is logged, and
;; #f is returned.  rmt-mutex is accepted but not used.
;;
(define (extras-readonly-mode rmt-mutex log-port cmd params)
  (let ((refused #f)) ;; the value handed back for every refused write
    (debug:print-info 12 log-port "rmt:send-receive, case 3")
    (debug:print 0 log-port
                 "WARNING: write transaction requested on a readonly area.  cmd=" cmd
                 " params=" params)
    refused))

;; Recover from a failed exchange with the server: log a warning, drop the
;; cached connection data and server url from runremote, close the
;; connections and then retry the request through rmt:send-receive-orig with
;; attemptnum incremented by one.  Returns whatever that retry returns.
;;
;;   multi-sync-mutex: - optional, defaults to #f.  rmt:send-receive-orig
;;                       takes this value as its fifth positional argument.
;;                       Callers that hold the mutex can pass it here.
;;
;; The retry used to be issued without the multi-sync-mutex argument, which
;; shifted cmd, rid, params and alldat one position to the left and caused
;; the attemptnum: keyword to be consumed as a positional argument, so the
;; attempt counter was never advanced.
;;
(define (extras-transport-failed log-port rmt-mutex attemptnum runremote areapath cmd rid params alldat #!key (multi-sync-mutex #f))
  (debug:print 0 log-port "WARNING: communication failed. Trying again, try num: " attemptnum)
  ;; forget the connection so that the retry has to set up a new one
  (remote-conndat-set!    runremote #f)
  (exec-fn 'http-transport:close-connections area-dat: runremote)
  (remote-server-url-set! runremote #f)
  (debug:print-info 12 log-port "rmt:send-receive, case  9.1")
  (rmt:send-receive-orig log-port runremote rmt-mutex areapath multi-sync-mutex
                         cmd rid params alldat
                         attemptnum: (+ attemptnum 1)))
  
;; Inspect the payload of an exchange whose transport succeeded.
;;
;; The error indication travels inside the data itself, so a deliberately
;; unusual shape is used to minimise the chance of colliding with real data:
;; a two element vector whose second element is the symbol 'overloaded.  That
;; shape means the returned data is bad or the server is overloaded and the
;; client should ease off.  In that case the connections are closed, the
;; server url is cleared from runremote, this thread sleeps for
;; (11 * attemptnum) seconds and the request is retried through
;; rmt:send-receive-orig with attemptnum incremented by one.
;;
;; Any other res is returned unchanged.
;;
;;   multi-sync-mutex: - optional, defaults to #f.  rmt:send-receive-orig
;;                       takes this value as its fifth positional argument.
;;                       Callers that hold the mutex can pass it here.
;;
;; The retry used to be issued without the multi-sync-mutex argument, which
;; shifted cmd, rid, params and alldat one position to the left and caused
;; the attemptnum: keyword to be consumed as a positional argument.
;;
(define (extras-transport-succeded log-port rmt-mutex attemptnum runremote areapath res params rid cmd alldat #!key (multi-sync-mutex #f))
  (if (and (vector? res)
           (= (vector-length res) 2)
           (eq? (vector-ref res 1) 'overloaded))
      (let ((wait-delay (+ attemptnum (* attemptnum 10))))
        (debug:print 0 log-port "WARNING: server is overloaded. Delaying " wait-delay " seconds and trying call again.")
        (exec-fn 'http-transport:close-connections area-dat: runremote)
        ;; I am hoping this will force a redo on server connection. NOT TESTED
        (remote-server-url-set! runremote #f)
        (thread-sleep! wait-delay)
        (rmt:send-receive-orig log-port runremote rmt-mutex areapath multi-sync-mutex
                               cmd rid params alldat
                               attemptnum: (+ attemptnum 1)))
      res)) ;; All good, return res

;; Dispatch one api request, either by running it against the local database
;; (rmt:open-qry-close-locally) or by sending it to a server (extras-case-11),
;; starting or reconnecting to a server first when a clause below decides that
;; is needed.
;;
;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
;;
;;   log-port         - port used for all debug output
;;   runremote        - record holding connection state (conndat, server-url,
;;                      hh-dat, force-server, transport, ro-mode ...)
;;   rmt-mutex        - only forwarded; every lock/unlock on it is commented out
;;   toppath          - area path; used for the read-only check and for
;;                      starting / locating a server
;;   multi-sync-mutex - only forwarded to rmt:open-qry-close-locally and to
;;                      the recursive calls
;;   cmd              - symbol naming the api request
;;   rid              - run id; used by case 10 and forwarded otherwise, the
;;                      other local-query cases pass 0
;;   params           - parameter list for the request
;;   alldat           - forwarded unchanged
;;   attemptnum:      - attempt counter, starts at 1; more than 15 ends the process
;;   area-dat:        - accepted but not referenced in this body
;;
;; The clauses of the cond are tried in order and the first match wins, so
;; their order is significant.  Several clauses fix up runremote and then call
;; this procedure again with the same attemptnum.
;;
;; NOTE(review): read-only commands are recognised here through the free
;; variable api:read-only-queries, while rmt:open-qry-close-locally takes the
;; list from (alldat-read-only-queries alldat) - confirm that both refer to the
;; same list and that api:read-only-queries is visible inside this module.
;;
;; NOTE(review): only the "on homehost and this is a read" clause checks
;; (pair? (remote-hh-dat runremote)) before taking its cdr; the later clauses
;; take the cdr unconditionally - confirm hh-dat is always a pair by then.
;;
;; NOTE(review): rmtmod:calc-ro-mode and the connection-expiry clause tolerate
;; runremote being #f, the clauses after them do not - confirm callers always
;; pass a record.
;;
(define (rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params alldat #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected
  ;; NOTE(review): no modulo on attemptnum appears in this body; the remark
  ;; above looks like a leftover from an earlier version.

  #;(common:telemetry-log (conc "rmt:"(->string cmd))
                        payload: `((rid . ,rid)
                                   (params . ,params)))


  ;; do all the prep locked under the rmt-mutex
  ;;(mutex-lock! rmt-mutex)
  
  ;; 1. check if server is started IFF cmd is a write OR if we are not on the homehost, store in runremote
  ;; 2. check the age of the connections. refresh the connection if it is older than timeout-20 seconds.
  ;; 3. do the query, if on homehost use local access
  ;;
  (let* ((start-time    (current-seconds)) ;; snapshot time so all use cases get same value
	 ;; NOTE(review): start-time is not referenced again in this body.

	 (readonly-mode (rmtmod:calc-ro-mode runremote toppath)))

    ;; (assert (not (pair? (remote-hh-dat runremote))))
    
    ;;(print "BB> readonly-mode is "readonly-mode" dbfile is "dbfile)
    (cond
     ;; give up if more than 15 attempts; this terminates the whole process
     ((> attemptnum 15)
      (debug:print 0 log-port "ERROR: 15 tries to start/connect to server. Giving up.")
      (exit 1))

     ;; readonly mode, read request-  handle it - case 2
     ((and readonly-mode
           (member cmd api:read-only-queries)) 
      ;; (mutex-unlock! rmt-mutex)
      (debug:print-info 12 log-port "rmt:send-receive, case 2")
      (rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params alldat)
      )

     ;; readonly mode, write request.  Do nothing, return #f
     (readonly-mode (extras-readonly-mode rmt-mutex log-port cmd params))

     ;; This block was for pre-emptively resetting the connection if there had been no communication for some time.
     ;; I don't think it adds any value. If the server is not there, just fail and start a new connection.
     ;; also, the expire-time calculation might not be correct. We want, time-since-last-server-access > (server:get-timeout)
     ;;
     ;; reset the connection if it has been unused too long, then start over
     ;; with the same attemptnum
     ((and runremote
           (remote-conndat runremote)
	   (> (current-seconds) ;; if it has been more than server-timeout seconds since last contact, close this connection and start a new on
	      (+ (http-transport:server-dat-get-last-access (remote-conndat runremote))
		 (remote-server-timeout runremote))))
      (debug:print-info 0 log-port "Connection to " (remote-server-url runremote) " expired due to no accesses, forcing new connection.")
      (exec-fn 'http-transport:close-connections area-dat: runremote)
      (remote-conndat-set! runremote #f) ;; invalidate the connection, thus forcing a new connection.
      ;; (mutex-unlock! rmt-mutex)
      (rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params alldat attemptnum: attemptnum))
     

     ;; on homehost and this is a read - case 5, run the query locally
     ((and (not (remote-force-server runremote)) ;; honor forced use of server, i.e. server NOT required
	   (pair? (remote-hh-dat runremote))
	   (cdr (remote-hh-dat runremote))       ;; on homehost
           (member cmd api:read-only-queries))   ;; this is a read
      ;; (mutex-unlock! rmt-mutex)
      (debug:print-info 12 log-port "rmt:send-receive, case  5")
      (rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params alldat))

     ;; on homehost and this is a write, we already have a server, but server has died - case 6
     ;; refreshes the force-server flag and starts over with the same attemptnum.
     ;; NOTE(review): the server url is left in place here, so this clause looks
     ;; like it can match again on the re-entry while the ping keeps failing - confirm.
     ((and (cdr (remote-hh-dat runremote))           ;; on homehost
           (not (member cmd api:read-only-queries))  ;; this is a write
           (remote-server-url runremote)             ;; have a server
           (not (exec-fn 'server:ping (remote-server-url runremote))))  ;; server has died. NOTE: this is not a cheap call! Need better approach.
      ;; (set! *runremote* (make-remote)) ;; WARNING - broken this.
      (remote-force-server-set! runremote (exec-fn 'common:force-server?))
      ;; (mutex-unlock! rmt-mutex)
      (debug:print-info 12 log-port "rmt:send-receive, case  6")
      (rmt:send-receive-orig log-port runremote rmt-mutex toppath  multi-sync-mutex cmd rid params alldat attemptnum: attemptnum))

     ;; on homehost and this is a write, we already have a server - case 4.1, run the query locally
     ((and (not (remote-force-server runremote))     ;; honor forced use of server, i.e. server NOT required
	   (cdr (remote-hh-dat runremote))           ;; on homehost
           (not (member cmd api:read-only-queries))  ;; this is a write
           (remote-server-url runremote))            ;; have a server
      ;;(mutex-unlock! rmt-mutex)
      (debug:print-info 12 log-port "rmt:send-receive, case  4.1")
      (rmt:open-qry-close-locally  log-port multi-sync-mutex cmd 0 params alldat))

     ;;  on homehost, no server contact made and this is a write, passively start a server - case 8
     ;;  the query itself is still run locally (case 8.1)
     ((and (not (remote-force-server runremote))     ;; honor forced use of server, i.e. server NOT required
	   (cdr (remote-hh-dat runremote))           ;; have homehost
           (not (remote-server-url runremote))       ;; no connection yet
	   (not (member cmd api:read-only-queries))) ;; not a read-only query
      (debug:print-info 12 log-port "rmt:send-receive, case  8")
      (let ((server-url  (exec-fn 'server:check-if-running toppath))) ;; (server:read-dotserver->url toppath))) ;; (server:check-if-running toppath))) ;; Do NOT want to run server:check-if-running - very expensive to do for every write call
	(if server-url
	    (remote-server-url-set! runremote server-url) ;; the string can be consumed by the client setup if needed
	    (if (exec-fn 'common:force-server?)
		(exec-fn 'server:start-and-wait toppath)
		(exec-fn 'server:kind-run toppath))))
      (remote-force-server-set! runremote (exec-fn 'common:force-server?))
      ;; (mutex-unlock! rmt-mutex)
      (debug:print-info 12 log-port "rmt:send-receive, case  8.1")
      (rmt:open-qry-close-locally  log-port multi-sync-mutex cmd 0 params alldat))

     ;; case 9: a server connection is needed but there is none yet; make
     ;; sure a server is running, store the connection info in runremote and
     ;; start over with the same attemptnum
     ((or (and (remote-force-server runremote)              ;; we are forcing a server and don't yet have a connection to one
	       (not (remote-conndat runremote)))
	  (and (not (cdr (remote-hh-dat runremote)))        ;; not on a homehost 
	       (not (remote-conndat runremote))))           ;; and no connection
      (debug:print-info 12 log-port "rmt:send-receive, case 9, hh-dat: " (remote-hh-dat runremote) " conndat: " (remote-conndat runremote))
      ;;(mutex-unlock! rmt-mutex)
      (if (not (exec-fn 'server:check-if-running toppath)) ;; who knows, maybe one has started up?
	  (exec-fn 'server:start-and-wait toppath))
      (remote-conndat-set! runremote (rmt:get-connection-info runremote toppath)) ;; calls client:setup which calls client:setup-http
      (rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params alldat attemptnum: attemptnum)) ;; TODO: add back-off timeout as

     ;; all set up if get this far, dispatch the query
     ((and (not (remote-force-server runremote))
	   (cdr (remote-hh-dat runremote))) ;; we are on homehost
      ;;(mutex-unlock! rmt-mutex)
      (debug:print-info 12 log-port "rmt:send-receive, case 10")
      (rmt:open-qry-close-locally  log-port multi-sync-mutex cmd (if rid rid 0) params alldat))

     ;; not on homehost, do server query
     (else (extras-case-11 log-port rmt-mutex runremote toppath cmd params attemptnum rid alldat)))))

;; Send the request to the server over the transport recorded in runremote
;; and hand the outcome to extras-transport-succeded or
;; extras-transport-failed.
;;
;; Only the http transport is supported; any other transport logs an error
;; and exits the process.  Conditions raised while talking to the server are
;; caught here because the server may have gone away: a commfail condition
;; and any other exn condition are both turned into a result vector whose
;; first element is #f.  For the exn case the call chain is printed as well.
;;
;; A result that is not a vector counts as a failure.
;;
(define (extras-case-11 log-port rmt-mutex runremote areapath cmd params attemptnum rid alldat)
  (debug:print-info 12 log-port "rmt:send-receive, case  9")
  (let* ((conninfo  (remote-conndat runremote))
         (transport (remote-transport runremote))
         (dat       (cond
                     ((eqv? transport 'http)
                      (condition-case
                       (exec-fn 'http-transport:client-api-send-receive 0 conninfo cmd params)
                       ((commfail) (vector #f "communications fail"))
                       ((exn)      (vector #f "other fail" (print-call-chain)))))
                     (else
                      (debug:print 0 log-port "ERROR: transport " transport " not supported")
                      (exit))))
         (dat-ok?   (vector? dat))
         (success   (and dat-ok? (vector-ref dat 0)))
         (res       (and dat-ok? (vector-ref dat 1))))
    (cond
     ;; connection record with more than five slots: refresh its access time
     ((and (vector? conninfo)
           (> (vector-length conninfo) 5))
      (http-transport:server-dat-update-last-access conninfo))
     ;; anything else is unexpected: forget the connection and close it
     (else
      (debug:print 0 log-port "INFO: Should not get here! conninfo=" conninfo)
      (set! conninfo #f)
      (remote-conndat-set! runremote #f)
      (exec-fn 'http-transport:close-connections  area-dat: runremote)))
    (debug:print-info 13 log-port "rmt:send-receive, case  9. conninfo=" conninfo " dat=" dat " runremote = " runremote)
    ;; success only says that the transport worked; the data still has to be
    ;; examined for a problem reported by the other end
    (if success
        (extras-transport-succeded log-port rmt-mutex attemptnum runremote areapath res params rid cmd alldat)
        (extras-transport-failed log-port rmt-mutex attemptnum runremote areapath cmd rid params alldat))))

;; Return connection info for the area, or #f when no server is available.
;;
;; The connection data already stored in runremote is used when there is any
;; (runremote that is not a remote record counts as having none).  Otherwise,
;; if server:check-if-running reports a server for areapath, the result of
;; client:setup is returned.  The area-dat keyword is accepted but not used.
;;
;; TODO: push areapath down.
;;
(define (rmt:get-connection-info runremote areapath #!key (area-dat #f))
  (let ((cached (and (remote? runremote)
                     (remote-conndat runremote))))
    (or cached
        (and (exec-fn 'server:check-if-running areapath)
             (exec-fn 'client:setup runremote areapath)))))



;;======================================================================
;; ulex and steps stuff
;;======================================================================

;; Create a ulex area for the megatest area rooted at toppath and return
;; whatever ulex:make-area returns.  The ulex database directory is
;; "<toppath>/ulexdb" and the packets directory is "<toppath>/pkts".
;;
(define (rmtmod:setup-ulex toppath)
  (let ((ulex-db-dir (conc toppath "/ulexdb"))
        (pkts-dir    (conc toppath "/pkts")))
    (ulex:make-area dbdir:   ulex-db-dir
                    pktsdir: pkts-dir)))



;; Placeholder for a ulex based send-receive.  None of the arguments are
;; looked at and the result is always #f.
;;
(define (rmtmod:send-receive-ulex ulex:conn cmd rid params attemptnum area-dat)
  ;; not implemented yet
  #f)

(use trace)(trace-call-sites #t)
;; (trace member rmtmod:calc-ro-mode rmt:open-qry-close-locally)

)