Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -84,11 +84,11 @@ ((equal? (uri-path (request-uri (current-request))) '(/ "api")) (send-response body: (api:process-request *dbstruct-db* $) ;; the $ is the request vars proc headers: '((content-type text/plain))) (mutex-lock! *heartbeat-mutex*) - (set! *db-last-access* (current-seconds)) + (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*)) ((equal? (uri-path (request-uri (current-request))) '(/ "")) (send-response body: (http-transport:main-page))) ((equal? (uri-path (request-uri (current-request))) @@ -426,13 +426,13 @@ (begin (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info") (set! iface (car sdat)) (set! port (cadr sdat)))) - ;; Transfer *db-last-access* to last-access to use in checking that we are still alive + ;; Transfer *last-db-access* to last-access to use in checking that we are still alive (mutex-lock! *heartbeat-mutex*) - (set! last-access *db-last-access*) + (set! last-access *last-db-access*) (mutex-unlock! *heartbeat-mutex*) ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout) ;; ;; no_traffic, no running tests, if server 0, no running servers @@ -540,17 +540,17 @@ (begin (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") (exit 0)) (begin ;; ok, no server detected, clean out any lingering records (tasks:server-force-clean-running-records-for-run-id (db:delay-if-busy tdbdat) run-id "notresponding"))) - (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) + (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http)) (remtries 4)) (if (not server-id) (if (> remtries 0) (begin (thread-sleep! 
2) - (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) + (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http) (- remtries 1))) (begin ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch") @@ -638,11 +638,11 @@ "Average non-cached time " (if (eq? *number-non-write-queries* 0) "n/a (no queries)" (/ *total-non-write-delay* *number-non-write-queries*)) " ms" - "Last access" (seconds->time-string *db-last-access*) "" + "Last access" (seconds->time-string *last-db-access*) "" ""))) (mutex-unlock! *heartbeat-mutex*) res)) (define (http-transport:runs linkpath) Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -148,28 +148,28 @@ (begin (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<")) failure-value)))))))) -(define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f)) - (on-exit (lambda () #t)) ;; turn off on-exit stuff +(define (rpc-transport:server-shutdown server-id rpc:listener ) ;;#!key (from-on-exit #f)) + ;;(on-exit (lambda () #t)) ;; turn off on-exit stuff ;;(tcp-close rpc:listener) ;; gotta exit nicely ;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped") ;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast! ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released") (set! 
*time-to-exit* #t) - (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) + ;;(if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete") ;;(BB> "Before (exit) (from-on-exit="from-on-exit")") - (unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. + ;;(unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. ;;(BB> "After") ;; strace reveals endless: ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0 @@ -180,13 +180,16 @@ ;; getrusage(RUSAGE_SELF, {ru_utime={414, 297892}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 301892}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 393898}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 397898}, ru_stime={0, 60003}, ...}) = 0 ;; make a post to chicken-users w/ http://paste.call-cc.org/paste?id=60a4b66a29ccf7d11359ea866db642c970735978 - (if from-on-exit - ;; avoid above condition! End current process externally since 1 in 20 (exit)'s result in hung, 100% cpu zombies. (see above) - (system (conc "kill -9 "(current-process-id)))) + + + ;; (if from-on-exit + ;; ;; avoid above condition! End current process externally since 1 in 20 (exit)'s result in hung, 100% cpu zombies. (see above) + + (system (conc "kill -9 "(current-process-id))) ) ;; all routes though here end in exit ... 
;; @@ -205,15 +208,53 @@ ;; double check we dont alrady have a running server for this run-id (when (server:check-if-running run-id) (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") (exit 0)) + + ;; clean up dead servers (duped in megatest.scm in -list-servers processing; may want to consolidate into proc) + (for-each + (lambda (server) + (let* ((id (vector-ref server 0)) + (pid (vector-ref server 1)) + (hostname (vector-ref server 2)) + (interface (vector-ref server 3)) + (pullport (vector-ref server 4)) + (pubport (vector-ref server 5)) + (start-time (vector-ref server 6)) + (priority (vector-ref server 7)) + (state (vector-ref server 8)) + (mt-ver (vector-ref server 9)) + (last-update (vector-ref server 10)) + (transport (vector-ref server 11)) + (killed #f) + (status (< last-update 20))) + + (if (equal? state "dead") + (if (> last-update (* 25 60 60)) ;; keep records around for slightly over a day. + (tasks:server-deregister (db:delay-if-busy (tasks:open-db)) hostname pullport: pullport pid: pid action: 'delete)) + (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds + (tasks:server-deregister (db:delay-if-busy (tasks:open-db)) hostname pullport: pullport pid: pid))) + ;;(format #t fmtstr id mt-ver pid hostname (conc interface ":" pullport) pubport last-update + ;; (if status "alive" "dead") transport) + ;; (if (or (equal? id sid) + ;; (equal? sid 0)) ;; kill all/any + ;; (begin + ;; (debug:print-info 0 *default-log-port* "Attempting to kill "kill-switch" server with pid " pid) + ;; (tasks:kill-server hostname pid kill-switch: kill-switch))) + + ) + + ) + (tasks:get-all-servers (db:delay-if-busy (tasks:open-db)))) + ;; let's get a server-id for this server ;; if at first we do not suceed, try 3 more times. 
(let ((server-id (retry-thunk (lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc)) chatty: #f + final-failure-returns-actual: #t retries: 4))) (when (not server-id) ;; dang we couldn't get a server-id. ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch") @@ -390,17 +431,10 @@ #f)) (portnum (let ((res (rpc:default-server-port))) res)) (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum))) - ;; BB> TODO: remove portlogger! - ;; if rpc found it needed a different port than portlogger provided, keep portlogger in the loop. - ;; (when (not (equal? start-port portnum)) - ;; (BB> "portlogger proffered "start-port" but rpc grabbed "portnum) - ;; (portlogger:open-run-close portlogger:set-port start-port "released") - ;; (portlogger:open-run-close portlogger:take-port portnum)) - (tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum) ;;============================================================ ;; activate thread th1 to attach opened tcp port to rpc server ;;============================================================= @@ -410,18 +444,29 @@ (debug:print 0 *default-log-port* "Server started on " host:port) ;; (thread-sleep! 5) (if (retry-thunk (lambda () - (rpc-transport:self-test run-id ipaddrstr portnum))) + (rpc-transport:self-test run-id ipaddrstr portnum)) + final-failure-returns-actual: #t + ) (debug:print 0 *default-log-port* "INFO: rpc self test passed!") (begin (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port) + (BB> 1) + (tasks:server-set-state! 
(db:delay-if-busy (tasks:open-db)) server-id "dead") + (BB> 2) + (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port + (BB> 3) + (rpc-transport:server-shutdown server-id rpc:listener) (exit))) - - (on-exit (lambda () - (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*) + + ;;(on-exit (lambda () + ;; (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch (if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers (begin ;; i am not the server, another server snuck in and beat this one to the punch (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port @@ -555,21 +600,25 @@ (begin (print "LOGIN_FAILED") (exit 1)))))) (define (rpc-transport:self-test run-id host port) + (if (not host) + (abort "host not set.")) + (if (not port) + (abort "port not set.")) (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it. (let* ((testing-res ((rpc:procedure 'testing host port))) (login-res ((rpc:procedure 'server:login host port) *toppath*)) (res (and login-res (equal? testing-res "Just testing")))) (if login-res (begin - ;;(BB> "Self test PASS. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) + (BB> "Self test PASS. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) #t) (begin - ;;(BB> "Self test fail. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) + (BB> "Self test fail. 
login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) #f)) res)) (define (rpc-transport:client-setup run-id server-dat #!key (remtries 10)) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -46,20 +46,24 @@ ;; all routes though here end in exit ... ;; ;; start_server ;; -(define (server:launch run-id transport-type) - (BB> "server:launch fired for run-id="run-id" transport-type="transport-type) - (case transport-type - ((http)(http-transport:launch run-id)) - ;;((nmsg)(nmsg-transport:launch run-id)) - ((rpc) (rpc-transport:launch run-id)) - (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type)))) -;; (else (debug:print-error 0 *default-log-port* "No known transport set, transport=" transport ", using rpc") -;; (rpc-transport:launch run-id))))) - +(define (server:launch run-id transport-type-raw) + (let ((transport-type + (cond + ((string? transport-type-raw) (string->symbol transport-type-raw)) + (else transport-type-raw)))) + + (BB> "server:launch fired for run-id="run-id" transport-type="transport-type) + + (case transport-type + ((http)(http-transport:launch run-id)) + ;;((nmsg)(nmsg-transport:launch run-id)) + ((rpc) (rpc-transport:launch run-id)) + (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type))))) + ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== ;; Get the transport Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -170,21 +170,21 @@ (define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3)) (define (tasks:hostinfo-get-transport vec) (vector-ref vec 4)) (define (tasks:hostinfo-get-pid vec) (vector-ref vec 5)) (define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6)) -(define (tasks:server-lock-slot mdb run-id) 
+(define (tasks:server-lock-slot mdb run-id transport-type) (tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot") (if (< (tasks:num-in-available-state mdb run-id) 4) (begin - (tasks:server-set-available mdb run-id) + (tasks:server-set-available mdb run-id transport-type) (thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed. (tasks:server-am-i-the-server? mdb run-id)) #f)) ;; register that this server may come online (first to register goes though with the process) -(define (tasks:server-set-available mdb run-id) +(define (tasks:server-set-available mdb run-id transport-type) (sqlite3:execute mdb "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id) VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);" (current-process-id) ;; pid @@ -194,11 +194,11 @@ (random 1000) ;; priority (used a tiebreaker on get-available) "available" ;; state (common:version-signature) ;; mt_version -1 ;; interface ;; (conc (server:get-transport)) ;; transport - (conc *transport-type*) ;; transport + (symbol->string transport-type) ;; transport run-id )) (define (tasks:num-in-available-state mdb run-id) (let ((res 0))