Overview
Context
Changes
Modified api.scm
from [cd180ba59c]
to [923a47fb5f].
︙ | | |
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
+
+
-
+
|
testmeta-get-record))
;; These are called by the server on recipt of /api calls
;; - keep it simple, only return the actual result of the call, i.e. no meta info here
;;
(define (api:execute-requests dbstruct cmd params)
(let ((res
(case (if (symbol? cmd)
cmd
(case (string->symbol cmd)
(string->symbol cmd))
;; SERVERS
((start-server) (apply server:kind-run params))
((kill-server) (set! *server-run* #f))
;; KEYS
((get-key-val-pairs) (apply db:get-key-val-pairs dbstruct params))
((get-keys) (db:get-keys dbstruct))
|
︙ | | |
Modified client.scm
from [6d1c8717b3]
to [aa964f7d14].
︙ | | |
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
-
+
+
-
-
+
+
+
+
+
+
+
-
-
-
+
-
+
+
+
-
-
+
+
+
+
+
+
-
+
|
(debug:print-info 2 "client:setup remaining-tries=" remaining-tries)
(let* ((tdbdat (tasks:open-db)))
(if (<= remaining-tries 0)
(begin
(debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id)
(exit 1))
(let ((host-info (hash-table-ref/default *runremote* run-id #f)))
(if host-info
(if host-info ;; this is a bit circular. the host-info *is* the start-res FIXME
(let* ((iface (http-transport:server-dat-get-iface host-info))
(port (http-transport:server-dat-get-port host-info))
(start-res (case *transport-type*
(start-res (http-transport:client-connect iface port))
(ping-res (rmt:login-no-auto-client-setup start-res run-id)))
((http)(http-transport:client-connect iface port))
((nmsg) host-info) ;; (http-transport:server-dat-get-socket host-info))
(else #f)))
(ping-res (case *transport-type*
((http)(rmt:login-no-auto-client-setup start-res run-id))
((nmsg)(nmsg-transport:ping iface port timeout: 2 socket: ))
(else #f))))
(if ping-res ;; sucessful login?
(begin
(debug:print-info 2 "client:setup, ping is good using host-info=" host-info ", remaining-tries=" remaining-tries)
;; Why add the close-connections here?
;; (http-transport:close-connections run-id)
(hash-table-set! *runremote* run-id start-res)
start-res) ;; return the server info
;; have host info but no ping. shutdown the current connection and try again
(begin ;; login failed
(debug:print-info 1 "client:setup, ping is bad for start-res=" start-res " and *runremote*=" host-info)
(case *transport-type*
(http-transport:close-connections run-id)
((http)(http-transport:close-connections run-id)))
(hash-table-delete! *runremote* run-id)
(if (< remaining-tries 8)
(thread-sleep! 5)
(thread-sleep! 1))
(client:setup run-id remaining-tries: (- remaining-tries 1)))))
;; YUK: rename server-dat here
(let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id)))
(debug:print-info 4 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
(if server-dat
(let* ((iface (tasks:hostinfo-get-interface server-dat))
(hostname (tasks:hostinfo-get-hostname server-dat))
(port (tasks:hostinfo-get-port server-dat))
(start-res (case *transport-type*
(start-res (http-transport:client-connect iface port))
(ping-res (rmt:login-no-auto-client-setup start-res run-id)))
((http)(http-transport:client-connect iface port))
((nmsg)(nmsg-transport:client-connect hostname port))))
(ping-res (case *transport-type*
((http)(rmt:login-no-auto-client-setup start-res run-id))
((nmsg)(http-transport:server-dat-get-socket start-res))))) ;; socket is the result of a ping
(if (and start-res
ping-res)
(begin
(hash-table-set! *runremote* run-id start-res)
(debug:print-info 2 "connected to " (http-transport:server-dat-make-url start-res))
start-res)
(begin ;; login failed but have a server record, clean out the record and try again
(debug:print-info 0 "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat)
(case *transport-type*
(http-transport:close-connections run-id)
((http)(http-transport:close-connections run-id)))
(hash-table-delete! *runremote* run-id)
(tasks:server-force-clean-run-record (db:delay-if-busy tdbdat)
run-id
(tasks:hostinfo-get-interface server-dat)
(tasks:hostinfo-get-port server-dat)
" client:setup (server-dat = #t)")
(thread-sleep! 2)
|
︙ | | |
Modified common.scm
from [96d40f50d8]
to [0598bb95e7].
︙ | | |
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
-
+
|
(define *inmemdb* #f)
(define *task-db* #f) ;; (vector db path-to-db)
(define *db-access-allowed* #t) ;; flag to allow access
(define *db-access-mutex* (make-mutex))
;; SERVER
(define *my-client-signature* #f)
(define *transport-type* 'nm)
(define *transport-type* 'nmsg)
(define *runremote* (make-hash-table)) ;; if set up for server communication this will hold <host port>
(define *max-cache-size* 0)
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)
(define *server-id* #f)
(define *server-info* #f)
(define *time-to-exit* #f)
|
︙ | | |
Modified http-transport.scm
from [d5c0bd2a5f]
to [37e9804757].
︙ | | |
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
|
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
|
-
+
+
|
(if (vector? server-dat)
(let ((api-dat (http-transport:server-dat-get-api-uri server-dat)))
(close-connection! api-dat)
#t)
#f)))
(define (make-http-transport:server-dat)(make-vector 5))
(define (make-http-transport:server-dat)(make-vector 6))
(define (http-transport:server-dat-get-iface vec) (vector-ref vec 0))
(define (http-transport:server-dat-get-port vec) (vector-ref vec 1))
(define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2))
(define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3))
(define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4))
(define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5))
(define (http-transport:server-dat-get-socket vec) (vector-ref vec 6))
(define (http-transport:server-dat-make-url vec)
(if (and (http-transport:server-dat-get-iface vec)
(http-transport:server-dat-get-port vec))
(conc "http://"
(http-transport:server-dat-get-iface vec)
":"
|
︙ | | |
Modified nmsg-transport.scm
from [5a0952db48]
to [987e090f8d].
︙ | | |
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
-
+
+
|
(let* ((start-port (portlogger:open-run-close portlogger:find-port))
(server-thread (make-thread (lambda ()
(nmsg-transport:try-start-server dbstruct run-id start-port server-id))
"server thread"))
(tdbdat (tasks:open-db)))
(thread-start! server-thread)
(if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id))
(begin
(let ((interface (if (equal? hostn "-")(get-host-name) hostn)))
(tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port)
(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
(set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
(thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
(set! *inmemdb* dbstruct)
(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
(thread-start! (make-thread
(lambda ()(nmsg-transport:keep-running server-id))
|
︙ | | |
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
-
-
+
+
+
|
(begin
(debug:print-info 0 "Another server took the slot, exiting")
(exit 0))))
(begin
;; since we didn't get the server lock we are going to clean up and bail out
(debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
(tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
)))
(nmsg-transport:run dbstruct hostn run-id server-id)
))
;; locked in a server id, try to start up
(nmsg-transport:run dbstruct hostn run-id server-id))
(set! *didsomething* #t)
(exit))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
|
︙ | | |
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
-
+
-
+
|
;;======================================================================
;; ping the server at host:port
;; return the open socket if successful (return-socket == #t)
;; expect the key expected-key returned in payload
;; send our-key or #f as payload
;;
(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f))
(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f))
;; send a random number along with pid and check that we get it back
(let* ((req (nn-socket 'req))
(let* ((req (or socket (nn-socket 'req)))
(host (if (or (not hostn)
(equal? hostn "-")) ;; use localhost
(get-host-name)
hostn))
(success #f)
(keepwaiting #t)
(dat (db:obj->string (vector "ping" our-key) transport: 'nmsg))
|
︙ | | |
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
|
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
|
-
+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|
(if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate
(loop (+ count 1))))
(if keepwaiting
(begin
(print "timeout waiting for ping")
(thread-terminate! ping))))
"timeout")))
(nn-connect req (conc "tcp://" host ":" port))
(if (not socket)(nn-connect req (conc "tcp://" host ":" port)))
(handle-exceptions
exn
(begin
;; (print-call-chain)
;; (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
;; (print "exn=" (condition->list exn))
(debug:print-info 1 "ping failed to connect to " host ":" port))
(thread-start! timeout)
(thread-start! ping)
(thread-join! ping)
(if success (thread-terminate! timeout)))
(if return-socket
(if success req #f)
(begin
(nn-close req)
(nn-close req) ;; should it be closed if we were handed a socket?
success))))
(define (nmsg-transport:client-connect iface portnum)
(let* ((reqsoc (nmsg-transport:ping iface portnum))
(login-res #f))
(nn-connect reqsoc (conc "tcp://" iface ":" portnum))
(debug:print-info 11 "nmsg-transport:client-connect started. Next is login")
(set! login-res (client:login serverdat nmsg-sockets))
(if (and (not (null? login-res))
(car login-res))
(begin
(debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
(set! *nm-port* nmsg-sockets)
nmsg-sockets)
(begin
(debug:print-info 2 "Failed to login or connect to " conurl)
(set! *runremote* #f)
#f))))
;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
;;
(define (nmsg-transport:keep-running server-id)
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; This thread waits for the server to come alive
|
︙ | | |
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
|
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
|
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
(debug:print-info 0 "Starting to shutdown the server.")
(set! *time-to-exit* #t)
(tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running")
(debug:print-info 0 "Server shutdown complete. Exiting")
;; (exit)
))))))
;;======================================================================
;; C L I E N T S
;;======================================================================
(define (nmsg-transport:client-connect iface portnum)
(let* ((reqsoc (nmsg-transport:ping iface portnum return-socket: #t)))
(vector iface portnum #f #f #f (current-seconds) reqsoc)))
(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param)
(let ((packet (vector cmd param))
(reqsoc (http-transport:server-dat-get-socket connection-info)))
(nn-send reqsoc (db:obj->string packet transport: 'nmsg))
(db:string->obj (nn-recv reqsoc) transport: 'nmsg)))
;;======================================================================
;; J U N K
;;======================================================================
;; DO NOT USE
;;
(define (nmsg-transport:client-signal-handler signum)
(handle-exceptions
exn
(debug:print " ... exiting ...")
(let ((th1 (make-thread (lambda ()
(if (not *received-response*)
(receive-message* *runremote*))) ;; flush out last call if applicable
|
︙ | | |
Modified rmt.scm
from [db091f3ce4]
to [1fd758a7a4].
︙ | | |
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
+
-
+
+
-
+
+
|
;; SHOULD CLOSE THE CONNECTION HERE
(hash-table-delete! *runremote* run-id)))))
(hash-table-keys *runremote*)))
(mutex-unlock! *db-multi-sync-mutex*)
(let* ((run-id (if rid rid 0))
(connection-info (rmt:get-connection-info run-id))
(jparams (db:obj->string params)))
;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
(if connection-info
;; use the server if have connection info
(let* ((dat (case *transport-type*
((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams))
((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info cmd jparams))
((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info cmd params))
(else (exit))))
(res (if (vector? dat) (vector-ref dat 1) #f))
(success (if (vector? dat) (vector-ref dat 0) #f)))
(http-transport:server-dat-update-last-access connection-info)
(if success
(case *transport-type*
(db:string->obj res)
((http)(db:string->obj res))
((nmsg) res))
(begin ;; let ((new-connection-info (client:setup run-id)))
(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
;; no longer killing the server in http-transport:client-api-send-receive
;; may kill it here but what are the criteria?
;; start with three calls then kill server
|
︙ | | |
Modified server.scm
from [a2c69bc84d]
to [57814a5cf8].
︙ | | |
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
-
+
|
;; all routes though here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id)
(case *transport-type*
((http)(http-transport:launch run-id))
((nm) (nmsg-transport:launch run-id))
((nmsg)(nmsg-transport:launch run-id))
(else (debug:print 0 "ERROR: unknown server type " *transport-type*))))
;;======================================================================
;; Q U E U E M A N A G E M E N T
;;======================================================================
;; We don't want to flush the queue if it was just flushed
|
︙ | | |
Modified tasks.scm
from [7c5174d4f3]
to [84082aa618].
︙ | | |
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
|
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
|
-
+
|
(get-host-name) ;; hostname
-1 ;; port
-1 ;; pubport
(random 1000) ;; priority (used a tiebreaker on get-available)
"available" ;; state
(common:version-signature) ;; mt_version
-1 ;; interface
"http" ;; transport
(conc *transport-type*) ;; transport
run-id
))
(define (tasks:num-in-available-state mdb run-id)
(let ((res 0))
(sqlite3:for-each-row
(lambda (num-in-queue)
|
︙ | | |