149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
(if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
failure-value))))))))
(define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f))
(on-exit (lambda () #t)) ;; turn off on-exit stuff
;;(tcp-close rpc:listener) ;; gotta exit nicely
;;(tasks:bb-server-set-state! server-id "stopped")
;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast!
;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released")
(set! *time-to-exit* #t)
(if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t))
(tasks:bb-server-delete-record server-id " rpc-transport:keep-running complete")
;;(BB> "Before (exit) (from-on-exit="from-on-exit")")
(unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu.
;;(BB> "After")
;; strace reveals endless:
;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0
|
|
>
>
|
>
>
|
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
(if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
failure-value))))))))
(define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f))
(on-exit (lambda () #t)) ;; turn off on-exit stuff
;;(tcp-close rpc:listener) ;; gotta exit nicely
;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped")
;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast!
;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released")
(set! *time-to-exit* #t)
(if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t))
(tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete")
;;(BB> "Before (exit) (from-on-exit="from-on-exit")")
(unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu.
;;(BB> "After")
;; strace reveals endless:
;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0
|
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
|
(when (server:check-if-running run-id)
(debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
(exit 0))
;; let's get a server-id for this server
;; if at first we do not suceed, try 3 more times.
(let ((server-id (retry-thunk
(lambda () (tasks:bb-server-lock-slot run-id 'rpc))
chatty: #f
retries: 4)))
(when (not server-id) ;; dang we couldn't get a server-id.
;; since we didn't get the server lock we are going to clean up and bail out
(debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
(tasks:bb-server-delete-records-for-this-pid " rpc-transport:launch")
(exit 1))
;; we got a server-id (and a corresponding entry in servers table in globally shared mdb)
;; all systems go. Proceed to setup rpc server.
(rpc-transport:run
(if (args:get-arg "-server")
(args:get-arg "-server")
|
|
|
|
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
|
(when (server:check-if-running run-id)
(debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
(exit 0))
;; let's get a server-id for this server
;; if at first we do not suceed, try 3 more times.
(let ((server-id (retry-thunk
(lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc))
chatty: #f
retries: 4)))
(when (not server-id) ;; dang we couldn't get a server-id.
;; since we didn't get the server lock we are going to clean up and bail out
(debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
(tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch")
(exit 1))
;; we got a server-id (and a corresponding entry in servers table in globally shared mdb)
;; all systems go. Proceed to setup rpc server.
(rpc-transport:run
(if (args:get-arg "-server")
(args:get-arg "-server")
|
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
|
;; BB> TODO: remove portlogger!
;; if rpc found it needed a different port than portlogger provided, keep portlogger in the loop.
;; (when (not (equal? start-port portnum))
;; (BB> "portlogger proffered "start-port" but rpc grabbed "portnum)
;; (portlogger:open-run-close portlogger:set-port start-port "released")
;; (portlogger:open-run-close portlogger:take-port portnum))
(tasks:bb-server-set-interface-port server-id ipaddrstr portnum)
;;============================================================
;; activate thread th1 to attach opened tcp port to rpc server
;;=============================================================
(thread-start! th1)
(set! db *inmemdb*)
|
|
|
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
|
;; BB> TODO: remove portlogger!
;; if rpc found it needed a different port than portlogger provided, keep portlogger in the loop.
;; (when (not (equal? start-port portnum))
;; (BB> "portlogger proffered "start-port" but rpc grabbed "portnum)
;; (portlogger:open-run-close portlogger:set-port start-port "released")
;; (portlogger:open-run-close portlogger:take-port portnum))
(tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum)
;;============================================================
;; activate thread th1 to attach opened tcp port to rpc server
;;=============================================================
(thread-start! th1)
(set! db *inmemdb*)
|
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
|
(debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port)
(exit)))
(on-exit (lambda ()
(rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t)))
;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch
(if (not (equal? server-id (tasks:bb-server-am-i-the-server? run-id)));; try to ensure no double registering of servers
(begin ;; i am not the server, another server snuck in and beat this one to the punch
(tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
(tasks:bb-server-set-state! server-id "collision"))
(begin ;; i am the server
;; setup the in-memory db
(set! *inmemdb* (db:setup run-id))
(db:get-db *inmemdb* run-id)
;; let's make it official
(set! *rpc:listener* rpc:listener)
(tasks:bb-server-set-state! server-id "running") ;; update our mdb servers entry
;; this let loop will hold open this thread until we want the server to shut down.
;; if no requests received within the last 20 seconds :
;; database hasnt changed in ??
;;
|
|
|
|
|
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
|
(debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port)
(exit)))
(on-exit (lambda ()
(rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t)))
;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch
(if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers
(begin ;; i am not the server, another server snuck in and beat this one to the punch
(tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision"))
(begin ;; i am the server
;; setup the in-memory db
(set! *inmemdb* (db:setup run-id))
(db:get-db *inmemdb* run-id)
;; let's make it official
(set! *rpc:listener* rpc:listener)
(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry
;; this let loop will hold open this thread until we want the server to shut down.
;; if no requests received within the last 20 seconds :
;; database hasnt changed in ??
;;
|
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
|
(begin
(if (common:low-noise-print 120 "server continuing")
(debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
;;
;; Consider implementing some smarts here to re-insert the record or kill self is
;; the db indicates so
;;
(if (tasks:bb-server-am-i-the-server? run-id)
(tasks:bb-server-set-state! server-id "running"))
;;
(loop 0 bad-sync-count))
(begin
;;(BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout)
(rpc-transport:server-shutdown server-id rpc:listener)))))
;; end new loop
))))
|
|
|
|
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
|
(begin
(if (common:low-noise-print 120 "server continuing")
(debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
;;
;; Consider implementing some smarts here to re-insert the record or kill self is
;; the db indicates so
;;
(if (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)
(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running"))
;;
(loop 0 bad-sync-count))
(begin
;;(BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout)
(rpc-transport:server-shutdown server-id rpc:listener)))))
;; end new loop
))))
|
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
|
(begin
(debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat)
(rmt:set-cinfo run-id runremote-server-dat) ;; (hash-table-set! *runremote* run-id runremote-server-dat) ;; side-effect - *runremote* cache init fpr rmt:*
runremote-server-dat)
(begin ;; login failed but have a server record, clean out the record and try again
(debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat)
(tasks:kill-server-run-id run-id)
(tasks:bb-server-force-clean-run-record run-id iface port
" rpc-transport:client-setup (server-dat = #t)")
(if (> remtries 2)
(thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little
(thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time
(server:try-running run-id)
(thread-sleep! 5) ;; give server a little time to start up
(client:setup run-id remaining-tries: (sub1 remtries))))))
|
|
|
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
|
(begin
(debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat)
(rmt:set-cinfo run-id runremote-server-dat) ;; (hash-table-set! *runremote* run-id runremote-server-dat) ;; side-effect - *runremote* cache init fpr rmt:*
runremote-server-dat)
(begin ;; login failed but have a server record, clean out the record and try again
(debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat)
(tasks:kill-server-run-id run-id)
(tasks:server-force-clean-run-record (db:delay-if-busy (tasks:open-db)) run-id iface port
" rpc-transport:client-setup (server-dat = #t)")
(if (> remtries 2)
(thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little
(thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time
(server:try-running run-id)
(thread-sleep! 5) ;; give server a little time to start up
(client:setup run-id remaining-tries: (sub1 remtries))))))
|