︙ | | |
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
-
+
+
+
+
+
-
+
|
ok))
(define (client:connect iface port)
(case (server:get-transport)
((rpc) (rpc:client-connect iface port))
((http) (http:client-connect iface port))
((zmq) (zmq:client-connect iface port))
(else (rpc:client-connect iface port))))
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (5)")
(exit))))
(define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0))
(case (server:get-transport)
((rpc) (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)) ;;(client:setup-rpc run-id))
((http)(client:setup-http run-id remaining-tries: remaining-tries failed-connects: failed-connects))
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (6)")
(else (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)))) ;; (client:setup-rpc run-id))))
(exit)))) ;; (client:setup-rpc run-id))))
;; (define (client:login-no-auto-setup server-info run-id)
;; (case (server:get-transport)
;; ((rpc) (rpc:login-no-auto-client-setup server-info run-id))
;; ((http) (rmt:login-no-auto-client-setup server-info run-id))
;; (else (rpc:login-no-auto-client-setup server-info run-id))))
;;
|
︙ | | |
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
-
-
+
-
-
-
-
+
-
-
-
-
-
-
|
(exit 1))
(let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id)))
(debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
(if server-dat
(let* ((iface (tasks:hostinfo-get-interface server-dat))
(hostname (tasks:hostinfo-get-hostname server-dat))
(port (tasks:hostinfo-get-port server-dat))
(start-res (case *transport-type*
((http)(http-transport:client-connect iface port))
(start-res (http-transport:client-connect iface port))
;;((nmsg)(nmsg-transport:client-connect hostname port))
))
(ping-res (case *transport-type*
((http)(rmt:login-no-auto-client-setup start-res))
(ping-res (rmt:login-no-auto-client-setup start-res)))
;; ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id)))
;; (if logininfo
;; (car (vector-ref logininfo 1))
;; #f)))
)))
(if (and start-res
ping-res)
(begin
(remote-conndat-set! *runremote* start-res) ;; (hash-table-set! *runremote* run-id start-res)
(debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res))
start-res)
(begin ;; login failed but have a server record, clean out the record and try again
|
︙ | | |
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
|
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
|
-
+
|
(let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id)))
(debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available)
(if (< num-available 2)
(server:try-running run-id))
(thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms.
(client:setup run-id remaining-tries: (- remaining-tries 1)))))))))
;; keep this as a function to ease future
;; keep this as a function to ease future ;; this is unused, not porting for rpc -BB
(define (client:start run-id server-info)
(http-transport:client-connect (tasks:hostinfo-get-interface server-info)
(tasks:hostinfo-get-port server-info)))
;; ;; client:signal-handler
;; (define (client:signal-handler signum)
;; (signal-mask! signum)
|
︙ | | |
︙ | | |
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
+
|
;;
(use format typed-records) ;; RADT => purpose of json format??
(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
(declare (uses rpc-transport))
;;(declare (uses nmsg-transport))
(include "common_records.scm")
;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;
|
︙ | | |
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
+
+
+
+
-
+
|
(print "case 9")
(let* ((conninfo (remote-conndat *runremote*))
(dat (case (remote-transport *runremote*)
((http) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away
(http-transport:client-api-send-receive 0 conninfo cmd params)
((commfail)(vector #f "communications fail"))
((exn)(vector #f "other fail" (print-call-chain)))))
((rpc) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away
(rpc-transport:client-api-send-receive 0 conninfo cmd params)
((commfail)(vector #f "communications fail"))
((exn)(vector #f "other fail" (print-call-chain)))))
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported")
(exit))))
(success (if (vector? dat) (vector-ref dat 0) #f))
(res (if (vector? dat) (vector-ref dat 1) #f)))
(if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time
(print "case 9. conninfo=" conninfo " dat=" dat)
(if success
(case (remote-transport *runremote*)
((http) res)
((http rpc) res)
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown")
(exit 1)))
(begin
(debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum)
(remote-conndat-set! *runremote* #f)
(remote-server-url-set! *runremote* #f)
|
︙ | | |
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
|
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
|
+
-
+
+
+
+
+
+
+
|
res))
(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
(let* ((run-id (if run-id run-id 0))
(res (handle-exceptions
exn
#f
(case (remote-transport *runremote*)
(http-transport:client-api-send-receive run-id connection-info cmd params))))
((http) (http-transport:client-api-send-receive run-id connection-info cmd params))
((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params))
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (2)")
(exit))
)))
(if (and res (vector-ref res 0))
(vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!!
#f)))
;; ;; Wrap json library for strings (why the ports crap in the first place?)
;; (define (rmt:dat->json-str dat)
;; (with-output-to-string
|
︙ | | |
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
|
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
|
-
+
+
+
+
+
+
|
(rmt:send-receive 'login run-id (list *toppath* megatest-version *my-client-signature*)))
;; This login does no retries under the hood - it acts a bit like a ping.
;; Deprecated for nmsg-transport.
;;
(define (rmt:login-no-auto-client-setup connection-info)
(case *transport-type* ;; run-id of 0 is just a placeholder
((http)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*)))
((http rpc)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*)))
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (3)")
(exit))
;;((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*)))
))
;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible
;;
(define (rmt:general-call stmtname run-id . params)
|
︙ | | |
︙ | | |
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
-
+
+
+
-
+
+
+
|
(if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
failure-value))))))))
(define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f))
(on-exit (lambda () #t)) ;; turn off on-exit stuff
;;(tcp-close rpc:listener) ;; gotta exit nicely
;;(tasks:bb-server-set-state! server-id "stopped")
;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped")
;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast!
;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released")
(set! *time-to-exit* #t)
(if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t))
(tasks:bb-server-delete-record server-id " rpc-transport:keep-running complete")
(tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete")
;;(BB> "Before (exit) (from-on-exit="from-on-exit")")
(unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu.
;;(BB> "After")
;; strace reveals endless:
;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0
|
︙ | | |
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
|
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
|
-
+
-
+
|
(when (server:check-if-running run-id)
(debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
(exit 0))
;; let's get a server-id for this server
;; if at first we do not suceed, try 3 more times.
(let ((server-id (retry-thunk
(lambda () (tasks:bb-server-lock-slot run-id 'rpc))
(lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc))
chatty: #f
retries: 4)))
(when (not server-id) ;; dang we couldn't get a server-id.
;; since we didn't get the server lock we are going to clean up and bail out
(debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
(tasks:bb-server-delete-records-for-this-pid " rpc-transport:launch")
(tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch")
(exit 1))
;; we got a server-id (and a corresponding entry in servers table in globally shared mdb)
;; all systems go. Proceed to setup rpc server.
(rpc-transport:run
(if (args:get-arg "-server")
(args:get-arg "-server")
|
︙ | | |
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
|
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
|
-
+
|
;; BB> TODO: remove portlogger!
;; if rpc found it needed a different port than portlogger provided, keep portlogger in the loop.
;; (when (not (equal? start-port portnum))
;; (BB> "portlogger proffered "start-port" but rpc grabbed "portnum)
;; (portlogger:open-run-close portlogger:set-port start-port "released")
;; (portlogger:open-run-close portlogger:take-port portnum))
(tasks:bb-server-set-interface-port server-id ipaddrstr portnum)
(tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum)
;;============================================================
;; activate thread th1 to attach opened tcp port to rpc server
;;=============================================================
(thread-start! th1)
(set! db *inmemdb*)
|
︙ | | |
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
|
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
|
-
+
-
+
-
+
|
(debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port)
(exit)))
(on-exit (lambda ()
(rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t)))
;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch
(if (not (equal? server-id (tasks:bb-server-am-i-the-server? run-id)));; try to ensure no double registering of servers
(if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers
(begin ;; i am not the server, another server snuck in and beat this one to the punch
(tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
(tasks:bb-server-set-state! server-id "collision"))
(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision"))
(begin ;; i am the server
;; setup the in-memory db
(set! *inmemdb* (db:setup run-id))
(db:get-db *inmemdb* run-id)
;; let's make it official
(set! *rpc:listener* rpc:listener)
(tasks:bb-server-set-state! server-id "running") ;; update our mdb servers entry
(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry
;; this let loop will hold open this thread until we want the server to shut down.
;; if no requests received within the last 20 seconds :
;; database hasnt changed in ??
;;
|
︙ | | |
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
|
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
|
-
-
+
+
|
(begin
(if (common:low-noise-print 120 "server continuing")
(debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
;;
;; Consider implementing some smarts here to re-insert the record or kill self is
;; the db indicates so
;;
(if (tasks:bb-server-am-i-the-server? run-id)
(tasks:bb-server-set-state! server-id "running"))
(if (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)
(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running"))
;;
(loop 0 bad-sync-count))
(begin
;;(BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout)
(rpc-transport:server-shutdown server-id rpc:listener)))))
;; end new loop
))))
|
︙ | | |
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
|
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
|
-
+
|
(begin
(debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat)
(rmt:set-cinfo run-id runremote-server-dat) ;; (hash-table-set! *runremote* run-id runremote-server-dat) ;; side-effect - *runremote* cache init fpr rmt:*
runremote-server-dat)
(begin ;; login failed but have a server record, clean out the record and try again
(debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat)
(tasks:kill-server-run-id run-id)
(tasks:bb-server-force-clean-run-record run-id iface port
(tasks:server-force-clean-run-record (db:delay-if-busy (tasks:open-db)) run-id iface port
" rpc-transport:client-setup (server-dat = #t)")
(if (> remtries 2)
(thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little
(thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time
(server:try-running run-id)
(thread-sleep! 5) ;; give server a little time to start up
(client:setup run-id remaining-tries: (sub1 remtries))))))
|
︙ | | |
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
|
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
|
-
+
|
;; no longer care if multiple servers are started by accident. older servers will drop off in time.
;;
(define (server:check-if-running areapath)
(let* ((dotserver (server:read-dotserver areapath))) ;; tdbdat (tasks:open-db)))
(if dotserver
(let* ((res (case *transport-type*
((http)(server:ping-server dotserver))
((http rpc)(server:ping-server dotserver))
;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server)
)))
(if res
dotserver
#f))
#f)))
|
︙ | | |
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
|
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
+
+
-
+
+
+
+
+
|
;; (print "host-port=" host-port)
(if (not host-port)
(begin
(debug:print 0 *default-log-port* "ERROR: bad host:port")
(if do-exit (exit 1)))
(let* ((iface (car host-port))
(port (cadr host-port))
(server-dat
(case (remote-transport *runremote*)
(server-dat (http-transport:client-connect iface port))
((http) (http-transport:client-connect iface port))
((rpc) (rpc-transport:client-connect iface port))
(else
(debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (4)")
(exit))))
(login-res (rmt:login-no-auto-client-setup server-dat)))
(if (and (list? login-res)
(car login-res))
(begin
(print "LOGIN_OK")
(if do-exit (exit 0)))
(begin
|
︙ | | |