;;======================================================================
;; Copyright 2019, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit rmtmod))
(declare (uses commonmod))
(module rmtmod
*
(import scheme chicken data-structures extras)
(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable)
(import commonmod)
(use (prefix ulex ulex:))
(include "common_records.scm")
;; Hack to make these functions visible to the refactored code, goal is to eliminate these over time.
;; Every binding below is a placeholder procedure that accepts any number of
;; arguments and returns #f. set-functions (later in this module) replaces
;; placeholders from this group with set!.
;; (define (rmt:send-receive . params) #f)
(define http-transport:close-connections (lambda params #f))
;; from remote defstruct in common.scm
(define api:execute-requests (lambda params #f))
;; NOTE(review): this placeholder is a procedure, yet the rest of the module
;; hands api:read-only-queries to `member` as if it were a list - confirm.
(define api:read-only-queries (lambda params #f))
(define http-transport:client-api-send-receive (lambda params #f))
(define client:setup (lambda params #f))
(define server:kind-run (lambda params #f))
(define server:start-and-wait (lambda params #f))
(define server:check-if-running (lambda params #f))
(define server:ping (lambda params #f))
(define common:force-server? (lambda params #f))
;; Install the real implementations behind the placeholder procedures defined
;; above. Expects exactly ten positional arguments, in this order:
;;
;;   a  http-transport:client-api-send-receive
;;   b  http-transport:close-connections
;;   c  api:read-only-queries
;;   d  api:execute-requests
;;   e  client:setup
;;   f  server:kind-run
;;   g  server:start-and-wait
;;   h  server:check-if-running
;;   i  server:ping
;;   j  common:force-server?
;;
;; Any other argument count fails the `match` (no clause matches).
;;
;; NOTE(review): `c` was previously ignored and api:read-only-queries was the
;; only placeholder never installed, so `c` is taken to be the read-only query
;; list - confirm against the caller of set-functions.
(define (set-functions . alldata)
  (match alldata
    ((a b c d e f g h i j)
     (set! http-transport:client-api-send-receive a)
     (set! http-transport:close-connections       b)
     (set! api:read-only-queries                  c)
     ;; was misspelled "apt:execute-requests", which left the real
     ;; api:execute-requests placeholder untouched
     (set! api:execute-requests                   d)
     (set! client:setup                           e)
     (set! server:kind-run                        f)
     (set! server:start-and-wait                  g)
     (set! server:check-if-running                h)
     (set! server:ping                            i)
     (set! common:force-server?                   j))))
;; Run a single api request directly against the local database instead of
;; going through a server.
;;
;;   log-port          port handed to debug:print / debug:print-error
;;   multi-sync-mutex  accepted and forwarded on retry; not otherwise used here
;;                     (the code that locked it is disabled, see below)
;;   cmd               symbol naming the request
;;   run-id            forwarded on retry; not otherwise used here
;;   params            request parameters, passed through to api:execute-requests
;;   ro-queries:       list of commands considered read-only (default '())
;;   remretries:       number of retries still allowed (default 5)
;;
;; Returns the second slot of the result vector on success. When the request
;; reports failure it is retried after a random delay of up to 5 seconds and
;; the value of that retry is returned; once remretries is exhausted #f is
;; returned. (Previously the outcome of the retry branch was computed and then
;; discarded, so the first attempt's result was always returned.)
;;
;; A write (cmd not in ro-queries) against a database file that is not
;; writable is not executed: an error is logged and '() is returned.
(define (rmt:open-qry-close-locally log-port multi-sync-mutex cmd run-id params #!key (ro-queries '())(remretries 5))
  (let* ((qry-is-write   (not (member cmd ro-queries)))
         (db-file-path   (exec-fn 'db:dbfile-path))
         (dbstruct-local (exec-fn 'db:setup #t))
         (read-only      (not (file-write-access? db-file-path)))
         (resdat
          (if (not (and read-only qry-is-write))
              (let ((v (api:execute-requests dbstruct-local (vector (symbol->string cmd) params))))
                ;; there has been a long history of receiving strange errors
                ;; from returned values when things go wrong; this is an
                ;; attempt to detect that situation and recover gracefully
                (handle-exceptions
                 exn
                 (begin
                   (debug:print 0 log-port "ERROR: bad data from server " v " message: " ((condition-property-accessor 'exn 'message) exn))
                   (vector #t '())) ;; should always get a vector but if something goes wrong return a dummy
                 (if (and (vector? v)
                          (> (vector-length v) 1))
                     ;; copying the slots while inside the handler should force
                     ;; detection of a corrupted record
                     (vector (vector-ref v 0) (vector-ref v 1))
                     (vector #t '()))))
              ;; write on a read-only db: skip the request, use a dummy result
              (vector #t '())))
         (success (vector-ref resdat 0))
         (res     (vector-ref resdat 1)))
    (if (and read-only qry-is-write)
        (debug:print 0 log-port "ERROR: attempt to write to read-only database ignored. cmd=" cmd))
    (cond
     ;; Success. Disabled legacy behaviour at this point: update db stats
     ;; (rmt:update-db-stats) and, for writes, record *db-last-access* under
     ;; multi-sync-mutex so a watchdog could sync the run.
     (success res)
     ((> remretries 0)
      (debug:print-error 0 log-port "local query failed. Trying again.")
      (thread-sleep! (/ (random 5000) 1000)) ;; some random delay
      (rmt:open-qry-close-locally log-port multi-sync-mutex cmd run-id params
                                  ro-queries: ro-queries
                                  remretries: (- remretries 1)))
     (else
      (debug:print-error 0 log-port "too many retries in rmt:open-qry-close-locally, giving up")
      #f))))
;; Decide whether the area at toppath must be treated as read-only.
;;
;; If runremote is a record whose ro-mode-checked flag is already set, the
;; cached ro-mode value is returned. Otherwise the answer is computed as
;; "toppath/megatest.db is not writable", stored back into runremote (when one
;; was supplied) together with the checked flag, and returned.
;; runremote may be #f, in which case nothing is cached.
(define (rmtmod:calc-ro-mode runremote toppath)
  (cond
   ((and runremote (remote-ro-mode-checked runremote))
    (remote-ro-mode runremote))
   (else
    ;; TODO: use dbstruct or runremote to figure this out in future
    (let ((not-writable (not (file-write-access? (conc toppath "/megatest.db")))))
      (when runremote
        (remote-ro-mode-set! runremote not-writable)
        (remote-ro-mode-checked-set! runremote #t))
      not-writable))))
;; Handle a write request made against a read-only area: log the situation
;; and return #f without executing anything.
;; rmt-mutex is accepted but not touched; the unlock that used to happen here
;; is disabled.
(define (extras-readonly-mode rmt-mutex log-port cmd params)
  (begin
    ;; (mutex-unlock! rmt-mutex)
    (debug:print-info 12 log-port "rmt:send-receive, case 3")
    (debug:print 0
                 log-port
                 "WARNING: write transaction requested on a readonly area. cmd="
                 cmd
                 " params="
                 params)
    #f))
;; Recovery path used when the transport itself failed: log a warning, drop
;; the cached connection data and server url on runremote, then re-issue the
;; request through rmt:send-receive-orig with attemptnum incremented.
;;
;;   multi-sync-mutex:  optional (default #f); forwarded to
;;                      rmt:send-receive-orig, which requires it as its fifth
;;                      positional argument. The previous call omitted that
;;                      argument, shifting cmd/rid/params one position to the
;;                      left and leaving the attemptnum: keyword in the
;;                      params slot.
;;
;; Returns whatever the retried rmt:send-receive-orig call returns.
(define (extras-transport-failed log-port rmt-mutex attemptnum runremote areapath cmd rid params #!key (multi-sync-mutex #f))
  (debug:print 0 log-port "WARNING: communication failed. Trying again, try num: " attemptnum)
  ;; (mutex-lock! rmt-mutex)
  ;; NOTE(review): conndat is cleared before close-connections is called with
  ;; the same record - confirm close-connections does not need conndat.
  (remote-conndat-set! runremote #f)
  (http-transport:close-connections area-dat: runremote)
  (remote-server-url-set! runremote #f)
  ;; (mutex-unlock! rmt-mutex)
  (debug:print-info 12 log-port "rmt:send-receive, case 9.1")
  (rmt:send-receive-orig log-port runremote rmt-mutex areapath multi-sync-mutex cmd rid params
                         attemptnum: (+ attemptnum 1)))
;; Post-process a reply that the transport delivered successfully.
;;
;; If res is a two-element vector whose second element is the symbol
;; 'overloaded, the server is asking us to back off: wait
;; (attemptnum + 10 * attemptnum) seconds, close the connection, clear the
;; server url and re-issue the request via rmt:send-receive-orig with
;; attemptnum incremented. Since the payload itself carries the error, this
;; fairly obtuse combination is used to minimise the chance of colliding with
;; real data. Any other res is returned unchanged.
;;
;;   multi-sync-mutex:  optional (default #f); forwarded to
;;                      rmt:send-receive-orig, which requires it as its fifth
;;                      positional argument. The previous call omitted that
;;                      argument, shifting cmd/rid/params one position to the
;;                      left.
(define (extras-transport-succeded log-port rmt-mutex attemptnum runremote areapath res params rid cmd #!key (multi-sync-mutex #f))
  (if (and (vector? res)
           (= (vector-length res) 2) ;; numeric comparison, not eq?
           (eq? (vector-ref res 1) 'overloaded))
      (let ((wait-delay (+ attemptnum (* attemptnum 10))))
        (debug:print 0 log-port "WARNING: server is overloaded. Delaying " wait-delay " seconds and trying call again.")
        ;; (mutex-lock! rmt-mutex)
        (http-transport:close-connections area-dat: runremote)
        ;; (set! *runremote* #f) ;; force starting over
        (remote-server-url-set! runremote #f) ;; I am hoping this will force a redo on server connection. NOT TESTED
        ;; (mutex-unlock! rmt-mutex)
        (thread-sleep! wait-delay)
        (rmt:send-receive-orig log-port runremote rmt-mutex areapath multi-sync-mutex cmd rid params
                               attemptnum: (+ attemptnum 1)))
      res)) ;; All good, return res
;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
;;
;; Central dispatcher for a single api request. Depending on the state held in
;; runremote it either runs the query locally (rmt:open-qry-close-locally),
;; prepares or refreshes server/connection state and calls itself again, or
;; hands the request to extras-case-11 for a server query.
;;
;;   log-port          port used for all debug:print / debug:print-info output
;;   runremote         remote record; read and mutated through the remote-* accessors
;;   rmt-mutex         only passed along; every lock/unlock in this procedure is commented out
;;   toppath           area path; used for the ro-mode check and the server:* calls
;;   multi-sync-mutex  passed through to rmt:open-qry-close-locally
;;   cmd               symbol naming the request
;;   rid               run id; only case 10 uses it (0 when #f), other local cases pass 0
;;   params            request parameters
;;   attemptnum:       attempt counter, default 1; more than 15 terminates the process
;;   area-dat:         accepted but not referenced in this procedure
;;
;; The clauses of the cond are order dependent: the first matching one wins.
;;
(define (rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected (NOTE(review): no modulo remains in this procedure)
#;(common:telemetry-log (conc "rmt:"(->string cmd))
payload: `((rid . ,rid)
(params . ,params)))
;; do all the prep locked under the rmt-mutex
;;(mutex-lock! rmt-mutex)
;; 1. check if server is started IFF cmd is a write OR if we are not on the homehost, store in runremote
;; 2. check the age of the connections. refresh the connection if it is older than timeout-20 seconds.
;; 3. do the query, if on homehost use local access
;;
;; NOTE(review): start-time is bound here but not referenced by any live code below.
(let* ((start-time (current-seconds)) ;; snapshot time so all use cases get same value
(readonly-mode (rmtmod:calc-ro-mode runremote toppath)))
;; (assert (not (pair? (remote-hh-dat runremote))))
;;(print "BB> readonly-mode is "readonly-mode" dbfile is "dbfile)
(cond
;; give up if more than 15 attempts; this terminates the whole process with status 1
((> attemptnum 15)
(debug:print 0 log-port "ERROR: 15 tries to start/connect to server. Giving up.")
(exit 1))
;; readonly mode, read request- handle it - case 2
((and readonly-mode
(member cmd api:read-only-queries))
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 2")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params ro-queries: api:read-only-queries)
)
;; readonly mode, write request. Do nothing, return #f
(readonly-mode (extras-readonly-mode rmt-mutex log-port cmd params))
;; This block was for pre-emptively resetting the connection if there had been no communication for some time.
;; I don't think it adds any value. If the server is not there, just fail and start a new connection.
;; also, the expire-time calculation might not be correct. We want, time-since-last-server-access > (server:get-timeout)
;;
;; reset the connection if it has been unused too long
;; NOTE(review): this is the only clause that guards against runremote being #f;
;; the clauses below call remote-* accessors on it unconditionally.
((and runremote
(remote-conndat runremote)
(> (current-seconds) ;; if it has been more than server-timeout seconds since last contact, close this connection and start a new one
(+ (http-transport:server-dat-get-last-access (remote-conndat runremote))
(remote-server-timeout runremote))))
(debug:print-info 0 log-port "Connection to " (remote-server-url runremote) " expired due to no accesses, forcing new connection.")
(http-transport:close-connections area-dat: runremote)
(remote-conndat-set! runremote #f) ;; invalidate the connection, thus forcing a new connection.
;; (mutex-unlock! rmt-mutex)
(rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params attemptnum: attemptnum))
;; on homehost and this is a read
((and (not (remote-force-server runremote)) ;; honor forced use of server, i.e. server NOT required
(pair? (remote-hh-dat runremote))
(cdr (remote-hh-dat runremote)) ;; on homehost
(member cmd api:read-only-queries)) ;; this is a read
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 5")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params ro-queries: api:read-only-queries))
;; on homehost and this is a write, we already have a server, but server has died
;; NOTE(review): unlike case 5, this and the following clauses take the cdr of
;; remote-hh-dat without a pair? check - confirm hh-dat is always a pair here.
;; NOTE(review): the retry below keeps the same attemptnum and leaves
;; remote-server-url in place, so this clause looks like it will be chosen
;; again for as long as server:ping keeps returning false - confirm termination.
((and (cdr (remote-hh-dat runremote)) ;; on homehost
(not (member cmd api:read-only-queries)) ;; this is a write
(remote-server-url runremote) ;; have a server
(not (server:ping (remote-server-url runremote)))) ;; server has died. NOTE: this is not a cheap call! Need better approach.
;; (set! *runremote* (make-remote)) ;; WARNING - broken this.
(remote-force-server-set! runremote (common:force-server?))
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 6")
(rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params attemptnum: attemptnum))
;; on homehost and this is a write, we already have a server
((and (not (remote-force-server runremote)) ;; honor forced use of server, i.e. server NOT required
(cdr (remote-hh-dat runremote)) ;; on homehost
(not (member cmd api:read-only-queries)) ;; this is a write
(remote-server-url runremote)) ;; have a server
;;(mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 4.1")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params ro-queries: api:read-only-queries))
;; on homehost, no server contact made and this is a write, passively start a server
((and (not (remote-force-server runremote)) ;; honor forced use of server, i.e. server NOT required
(cdr (remote-hh-dat runremote)) ;; have homehost
(not (remote-server-url runremote)) ;; no connection yet
(not (member cmd api:read-only-queries))) ;; not a read-only query
(debug:print-info 12 log-port "rmt:send-receive, case 8")
(let ((server-url (server:check-if-running toppath))) ;; (server:read-dotserver->url toppath))) ;; (server:check-if-running toppath))) ;; Do NOT want to run server:check-if-running - very expensive to do for every write call
(if server-url
(remote-server-url-set! runremote server-url) ;; the string can be consumed by the client setup if needed
(if (common:force-server?)
(server:start-and-wait toppath)
(server:kind-run toppath))))
(remote-force-server-set! runremote (common:force-server?))
;; (mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 8.1")
;; the write itself is still performed locally, whatever happened with the server above
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd 0 params ro-queries: api:read-only-queries))
;; case 9: a server connection is needed (forced, or not on homehost) and none exists yet
((or (and (remote-force-server runremote) ;; we are forcing a server and don't yet have a connection to one
(not (remote-conndat runremote)))
(and (not (cdr (remote-hh-dat runremote))) ;; not on a homehost
(not (remote-conndat runremote)))) ;; and no connection
(debug:print-info 12 log-port "rmt:send-receive, case 9, hh-dat: " (remote-hh-dat runremote) " conndat: " (remote-conndat runremote))
;;(mutex-unlock! rmt-mutex)
(if (not (server:check-if-running toppath)) ;; who knows, maybe one has started up?
(server:start-and-wait toppath))
(remote-conndat-set! runremote (rmt:get-connection-info runremote toppath)) ;; calls client:setup which calls client:setup-http
;; NOTE(review): retried with the same attemptnum; if rmt:get-connection-info keeps
;; returning #f this clause matches again - confirm a back-off/limit is not needed.
(rmt:send-receive-orig log-port runremote rmt-mutex toppath multi-sync-mutex cmd rid params attemptnum: attemptnum)) ;; TODO: add back-off timeout as
;; all set up if get this far, dispatch the query
((and (not (remote-force-server runremote))
(cdr (remote-hh-dat runremote))) ;; we are on homehost
;;(mutex-unlock! rmt-mutex)
(debug:print-info 12 log-port "rmt:send-receive, case 10")
(rmt:open-qry-close-locally log-port multi-sync-mutex cmd (if rid rid 0) params ro-queries: api:read-only-queries))
;; not on homehost, do server query
(else (extras-case-11 log-port rmt-mutex runremote toppath cmd params attemptnum rid)))))
;; Send the request to the server over the transport recorded in runremote
;; and route the outcome.
;;
;; Only the http transport is handled. Any other transport is logged as an
;; error and the process is terminated with exit status 1 (previously a bare
;; (exit), i.e. a success status, was used on this error path).
;;
;; For http the call to http-transport:client-api-send-receive is wrapped in
;; condition-case so that talking to a server that has gone away yields a
;; failure vector instead of an exception:
;;   commfail condition  -> #(#f "communications fail")
;;   any other exn       -> #(#f "other fail" <result of print-call-chain>)
;;
;; The first slot of the reply is the transport-level success flag and the
;; second slot the payload. On success extras-transport-succeded inspects the
;; payload; otherwise extras-transport-failed resets the connection and
;; retries. The value of whichever helper runs is returned.
(define (extras-case-11 log-port rmt-mutex runremote areapath cmd params attemptnum rid)
  ;; (mutex-unlock! rmt-mutex)
  (debug:print-info 12 log-port "rmt:send-receive, case 9")
  ;; (mutex-lock! rmt-mutex)
  (let* ((conninfo (remote-conndat runremote))
         (dat      (case (remote-transport runremote)
                     ((http)
                      ;; handling here has caused a lot of problems. However it
                      ;; is needed to deal with attempted communication to
                      ;; servers that have gone away
                      (condition-case
                       (http-transport:client-api-send-receive 0 conninfo cmd params)
                       ((commfail) (vector #f "communications fail"))
                       ((exn)      (vector #f "other fail" (print-call-chain)))))
                     (else
                      (debug:print 0 log-port "ERROR: transport " (remote-transport runremote) " not supported")
                      (exit 1)))) ;; fatal: report failure to the calling process
         (success  (if (vector? dat) (vector-ref dat 0) #f))
         (res      (if (vector? dat) (vector-ref dat 1) #f)))
    (if (and (vector? conninfo) (< 5 (vector-length conninfo)))
        (http-transport:server-dat-update-last-access conninfo) ;; refresh access time
        (begin
          ;; conninfo is not a vector with more than 5 slots: discard it
          (debug:print 0 log-port "INFO: Should not get here! conninfo=" conninfo)
          (set! conninfo #f)
          (remote-conndat-set! runremote #f) ;; NOTE: *runremote* is global copy of runremote. Purpose: factor out global.
          (http-transport:close-connections area-dat: runremote)))
    (debug:print-info 13 log-port "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat " runremote = " runremote)
    ;; (mutex-unlock! rmt-mutex)
    ;; success only tells us that the transport was successful, have to examine
    ;; the data to see if there was a detected issue at the other end
    (if success
        (extras-transport-succeded log-port rmt-mutex attemptnum runremote areapath res params rid cmd)
        (extras-transport-failed log-port rmt-mutex attemptnum runremote areapath cmd rid params))))
;; if a server is either running or in the process of starting call client:setup
;; else return #f to let the calling proc know that there is no server available
;;
(define (rmt:get-connection-info runremote areapath #!key (area-dat #f)) ;; TODO: push areapath down.
  ;; Result, first hit wins:
  ;;   1. the connection data already stored in runremote (only consulted
  ;;      when runremote satisfies remote?)
  ;;   2. the value of client:setup, tried only if server:check-if-running
  ;;      reports a server for areapath
  ;;   3. #f
  ;; area-dat is accepted but not used.
  (or (and (remote? runremote)
           (remote-conndat runremote))
      (and (server:check-if-running areapath)
           (client:setup runremote areapath))))
;;======================================================================
;; ulex and steps stuff
;;======================================================================
;; Build a ulex area for toppath via ulex:make-area, with its db directory at
;; toppath/ulexdb and its packets directory at toppath/pkts. Returns whatever
;; ulex:make-area returns.
(define (rmtmod:setup-ulex toppath)
  (let ((db-dir   (conc toppath "/ulexdb"))
        (pkts-dir (conc toppath "/pkts")))
    (ulex:make-area dbdir: db-dir pktsdir: pkts-dir)))
;; Placeholder for a ulex based send/receive. All six arguments are ignored
;; and the result is always #f.
(define (rmtmod:send-receive-ulex ulex:conn cmd rid params attemptnum area-dat)
#f)
;; Load the trace extension and call trace-call-sites with #t whenever this
;; module is loaded. No procedure is actually traced: the trace form below is
;; commented out.
;; NOTE(review): looks like debugging support left enabled - confirm it is
;; wanted outside of development.
(use trace)(trace-call-sites #t)
;; (trace member rmtmod:calc-ro-mode rmt:open-qry-close-locally)
)