58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
-
+
-
-
+
-
+
-
+
|
(debug:print-info 1 *default-log-port* "db write rate too high, starting a server, count=" count " start=" start " run-id=" run-id " queries-per-second=" queries-per-second)
#t)
#f))))
;; if a server is either running or in the process of starting call client:setup
;; else return #f to let the calling proc know that there is no server available
;;
(define *rrr-mutex* (make-mutex))
(define (rmt:get-connection-info run-id)
(let ((cinfo (hash-table-ref/default *runremote* run-id #f)))
(define (rmt:get-cinfo rid)
(mutex-lock! *rrr-mutex*)
(let* ((run-id (if rid rid 0))
(cinfo (hash-table-ref/default *runremote* run-id #f)))
(mutex-unlock! *rrr-mutex*)
cinfo))
(define (rmt:set-cinfo rid server-dat)
(mutex-lock! *rrr-mutex*)
(let* ((run-id (if rid rid 0))
(res (hash-table-set! *runremote* run-id server-dat)))
(mutex-unlock! *rrr-mutex*)
res))
(define (rmt:del-cinfo rid)
(mutex-lock! *rrr-mutex*)
(let* ((run-id (if rid rid 0))
(res (hash-table-delete! *runremote* run-id)))
(mutex-unlock! *rrr-mutex*)
res))
(define (rmt:get-connection-info-start-server-if-none run-id)
(let ((cinfo (rmt:get-cinfo run-id)))
(if cinfo
cinfo
cinfo
;; NB// can cache the answer for server running for 10 seconds ...
;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
(client:setup run-id)
#f))))
(define (rmt:run-id->transport-type rid)
(define (rmt:run-id->transport-type run-id)
(let* ((run-id (if rid rid 0))
(connection-info (hash-table-ref/default *runremote* run-id #f)))
(let* ((connection-info (rmt:get-cinfo run-id)))
;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
(if connection-info ;; if we already have a connection for this run-id, use that precendent
;; use the server if have connection info
(let* ((transport-type (vector-ref connection-info 6))) ;; BB: assumes all transport-type'-servertdat vector's item 6 ids transport type
transport-type)
;; otherwise pick the global default as preference. (set in common.scm)
*transport-type*)))
(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id
;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
;; side-effect: clean out old connections
(let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
(for-each
(lambda (run-id)
(let ((connection (hash-table-ref/default *runremote* run-id #f)))
(let ((connection (rmt:get-cinfo run-id)))
(if (and (vector? connection)
(< (http-transport:server-dat-get-last-access connection) expire-time)) ;; BB> BBTODO: make this generic, not http transport specific.
(begin
(debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses")
(hash-table-delete! *runremote* run-id)))))
(hash-table-keys *runremote*)))
(let* ((run-id (if rid rid 0))
(connection-info (rmt:get-connection-info run-id)))
(connection-info (rmt:get-connection-info-start-server-if-none run-id)))
;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
(if connection-info
;; use the server if have connection info
(let* ((transport-type (rmt:run-id->transport-type run-id))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Here, we make request to remote server
|