Index: http-transport.scm
==================================================================
--- http-transport.scm
+++ http-transport.scm
@@ -84,11 +84,11 @@
((equal? (uri-path (request-uri (current-request)))
'(/ "api"))
(send-response body: (api:process-request *dbstruct-db* $) ;; the $ is the request vars proc
headers: '((content-type text/plain)))
(mutex-lock! *heartbeat-mutex*)
- (set! *db-last-access* (current-seconds))
+ (set! *last-db-access* (current-seconds))
(mutex-unlock! *heartbeat-mutex*))
((equal? (uri-path (request-uri (current-request)))
'(/ ""))
(send-response body: (http-transport:main-page)))
((equal? (uri-path (request-uri (current-request)))
@@ -426,13 +426,13 @@
(begin
(debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info")
(set! iface (car sdat))
(set! port (cadr sdat))))
- ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
+ ;; Transfer *last-db-access* to last-access to use in checking that we are still alive
(mutex-lock! *heartbeat-mutex*)
- (set! last-access *db-last-access*)
+ (set! last-access *last-db-access*)
(mutex-unlock! *heartbeat-mutex*)
;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout)
;;
;; no_traffic, no running tests, if server 0, no running servers
@@ -540,17 +540,17 @@
(begin
(debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
(exit 0))
(begin ;; ok, no server detected, clean out any lingering records
(tasks:server-force-clean-running-records-for-run-id (db:delay-if-busy tdbdat) run-id "notresponding")))
- (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
+ (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http))
(remtries 4))
(if (not server-id)
(if (> remtries 0)
(begin
(thread-sleep! 2)
- (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)
+ (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http)
(- remtries 1)))
(begin
;; since we didn't get the server lock we are going to clean up and bail out
(debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
(tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
@@ -638,11 +638,11 @@
"
Average non-cached time | " (if (eq? *number-non-write-queries* 0)
"n/a (no queries)"
(/ *total-non-write-delay*
*number-non-write-queries*))
" ms |
"
- "Last access | " (seconds->time-string *db-last-access*) " |
"
+ "Last access | " (seconds->time-string *last-db-access*) " |
"
"")))
(mutex-unlock! *heartbeat-mutex*)
res))
(define (http-transport:runs linkpath)
Index: rpc-transport.scm
==================================================================
--- rpc-transport.scm
+++ rpc-transport.scm
@@ -148,28 +148,28 @@
(begin
(if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
failure-value))))))))
-(define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f))
- (on-exit (lambda () #t)) ;; turn off on-exit stuff
+(define (rpc-transport:server-shutdown server-id rpc:listener ) ;;#!key (from-on-exit #f))
+ ;;(on-exit (lambda () #t)) ;; turn off on-exit stuff
;;(tcp-close rpc:listener) ;; gotta exit nicely
;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped")
;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast!
;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released")
(set! *time-to-exit* #t)
- (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t))
+ ;;(if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t))
(tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete")
;;(BB> "Before (exit) (from-on-exit="from-on-exit")")
- (unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu.
+ ;;(unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu.
;;(BB> "After")
;; strace reveals endless:
;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0
@@ -180,13 +180,16 @@
;; getrusage(RUSAGE_SELF, {ru_utime={414, 297892}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 301892}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 393898}, ru_stime={0, 60003}, ...}) = 0
;; getrusage(RUSAGE_SELF, {ru_utime={414, 397898}, ru_stime={0, 60003}, ...}) = 0
;; make a post to chicken-users w/ http://paste.call-cc.org/paste?id=60a4b66a29ccf7d11359ea866db642c970735978
- (if from-on-exit
- ;; avoid above condition! End current process externally since 1 in 20 (exit)'s result in hung, 100% cpu zombies. (see above)
- (system (conc "kill -9 "(current-process-id))))
+
+
+ ;; (if from-on-exit
+ ;; ;; avoid above condition! End current process externally since 1 in 20 (exit)'s result in hung, 100% cpu zombies. (see above)
+
+ (system (conc "kill -9 "(current-process-id)))
)
;; all routes though here end in exit ...
;;
@@ -205,15 +208,53 @@
;; double check we dont alrady have a running server for this run-id
(when (server:check-if-running run-id)
(debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
(exit 0))
+
+ ;; clean up dead servers (duped in megatest.scm in -list-servers processing; may want to consolidate into proc)
+ (for-each
+ (lambda (server)
+ (let* ((id (vector-ref server 0))
+ (pid (vector-ref server 1))
+ (hostname (vector-ref server 2))
+ (interface (vector-ref server 3))
+ (pullport (vector-ref server 4))
+ (pubport (vector-ref server 5))
+ (start-time (vector-ref server 6))
+ (priority (vector-ref server 7))
+ (state (vector-ref server 8))
+ (mt-ver (vector-ref server 9))
+ (last-update (vector-ref server 10))
+ (transport (vector-ref server 11))
+ (killed #f)
+ (status (< last-update 20)))
+
+ (if (equal? state "dead")
+ (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day.
+ (tasks:server-deregister (db:delay-if-busy (tasks:open-db)) hostname pullport: pullport pid: pid action: 'delete))
+ (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds
+ (tasks:server-deregister (db:delay-if-busy (tasks:open-db)) hostname pullport: pullport pid: pid)))
+ ;;(format #t fmtstr id mt-ver pid hostname (conc interface ":" pullport) pubport last-update
+ ;; (if status "alive" "dead") transport)
+ ;; (if (or (equal? id sid)
+ ;; (equal? sid 0)) ;; kill all/any
+ ;; (begin
+ ;; (debug:print-info 0 *default-log-port* "Attempting to kill "kill-switch" server with pid " pid)
+ ;; (tasks:kill-server hostname pid kill-switch: kill-switch)))
+
+ )
+
+ )
+ (tasks:get-all-servers (db:delay-if-busy (tasks:open-db))))
+
;; let's get a server-id for this server
;; if at first we do not suceed, try 3 more times.
(let ((server-id (retry-thunk
(lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc))
chatty: #f
+ final-failure-returns-actual: #t
retries: 4)))
(when (not server-id) ;; dang we couldn't get a server-id.
;; since we didn't get the server lock we are going to clean up and bail out
(debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
(tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch")
@@ -390,17 +431,10 @@
#f))
(portnum (let ((res (rpc:default-server-port))) res))
(host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)))
- ;; BB> TODO: remove portlogger!
- ;; if rpc found it needed a different port than portlogger provided, keep portlogger in the loop.
- ;; (when (not (equal? start-port portnum))
- ;; (BB> "portlogger proffered "start-port" but rpc grabbed "portnum)
- ;; (portlogger:open-run-close portlogger:set-port start-port "released")
- ;; (portlogger:open-run-close portlogger:take-port portnum))
-
(tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum)
;;============================================================
;; activate thread th1 to attach opened tcp port to rpc server
;;=============================================================
@@ -410,18 +444,29 @@
(debug:print 0 *default-log-port* "Server started on " host:port)
;; (thread-sleep! 5)
(if (retry-thunk (lambda ()
- (rpc-transport:self-test run-id ipaddrstr portnum)))
+ (rpc-transport:self-test run-id ipaddrstr portnum))
+ final-failure-returns-actual: #t
+ )
(debug:print 0 *default-log-port* "INFO: rpc self test passed!")
(begin
(debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port)
+ (BB> 1)
+ (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "dead")
+ (BB> 2)
+ (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
+ (BB> 3)
+ (rpc-transport:server-shutdown server-id rpc:listener)
(exit)))
-
- (on-exit (lambda ()
- (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t)))
+ (mutex-lock! *heartbeat-mutex*)
+ (set! *last-db-access* (current-seconds))
+ (mutex-unlock! *heartbeat-mutex*)
+
+ ;;(on-exit (lambda ()
+ ;; (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t)))
;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch
(if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers
(begin ;; i am not the server, another server snuck in and beat this one to the punch
(tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
@@ -555,21 +600,25 @@
(begin
(print "LOGIN_FAILED")
(exit 1))))))
(define (rpc-transport:self-test run-id host port)
+ (if (not host)
+ (abort "host not set."))
+ (if (not port)
+ (abort "port not set."))
(tcp-buffer-size 0) ;; gotta do this because http-transport undoes it.
(let* ((testing-res ((rpc:procedure 'testing host port)))
(login-res ((rpc:procedure 'server:login host port) *toppath*))
(res (and login-res (equal? testing-res "Just testing"))))
(if login-res
(begin
- ;;(BB> "Self test PASS. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*)
+ (BB> "Self test PASS. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*)
#t)
(begin
- ;;(BB> "Self test fail. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*)
+ (BB> "Self test fail. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*)
#f))
res))
(define (rpc-transport:client-setup run-id server-dat #!key (remtries 10))
Index: server.scm
==================================================================
--- server.scm
+++ server.scm
@@ -46,20 +46,24 @@
;; all routes though here end in exit ...
;;
;; start_server
;;
-(define (server:launch run-id transport-type)
- (BB> "server:launch fired for run-id="run-id" transport-type="transport-type)
- (case transport-type
- ((http)(http-transport:launch run-id))
- ;;((nmsg)(nmsg-transport:launch run-id))
- ((rpc) (rpc-transport:launch run-id))
- (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type))))
-;; (else (debug:print-error 0 *default-log-port* "No known transport set, transport=" transport ", using rpc")
-;; (rpc-transport:launch run-id)))))
-
+(define (server:launch run-id transport-type-raw)
+ (let ((transport-type
+ (cond
+ ((string? transport-type-raw) (string->symbol transport-type-raw))
+ (else transport-type-raw))))
+
+ (BB> "server:launch fired for run-id="run-id" transport-type="transport-type)
+
+ (case transport-type
+ ((http)(http-transport:launch run-id))
+ ;;((nmsg)(nmsg-transport:launch run-id))
+ ((rpc) (rpc-transport:launch run-id))
+ (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type)))))
+
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; Get the transport
Index: tasks.scm
==================================================================
--- tasks.scm
+++ tasks.scm
@@ -170,21 +170,21 @@
(define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3))
(define (tasks:hostinfo-get-transport vec) (vector-ref vec 4))
(define (tasks:hostinfo-get-pid vec) (vector-ref vec 5))
(define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6))
-(define (tasks:server-lock-slot mdb run-id)
+(define (tasks:server-lock-slot mdb run-id transport-type)
(tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot")
(if (< (tasks:num-in-available-state mdb run-id) 4)
(begin
- (tasks:server-set-available mdb run-id)
+ (tasks:server-set-available mdb run-id transport-type)
(thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed.
(tasks:server-am-i-the-server? mdb run-id))
#f))
;; register that this server may come online (first to register goes though with the process)
-(define (tasks:server-set-available mdb run-id)
+(define (tasks:server-set-available mdb run-id transport-type)
(sqlite3:execute
mdb
"INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id)
VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);"
(current-process-id) ;; pid
@@ -194,11 +194,11 @@
(random 1000) ;; priority (used a tiebreaker on get-available)
"available" ;; state
(common:version-signature) ;; mt_version
-1 ;; interface
;; (conc (server:get-transport)) ;; transport
- (conc *transport-type*) ;; transport
+ (symbol->string transport-type) ;; transport
run-id
))
(define (tasks:num-in-available-state mdb run-id)
(let ((res 0))