Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -59,13 +59,17 @@ (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) (if ipstr ipstr hostn))) ;; hostname))) - (start-port (if (args:get-arg "-port") + (start-port (if (and (args:get-arg "-port") + (string->number (args:get-arg "-port"))) (string->number (args:get-arg "-port")) - (+ 5000 (random 1001)))) + (if (and (config-lookup *configdat* "server" "port") + (string->number (config-lookup *configdat* "server" "port"))) + (string->number (config-lookup *configdat* "server" "port")) + (+ 5000 (random 1001))))) (link-tree-path (config-lookup *configdat* "setup" "linktree"))) (set! *cache-on* #t) (root-path (if link-tree-path link-tree-path (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! @@ -132,17 +136,10 @@ (print "INFO: Trying to start server on " ipaddrstr ":" portnum) ;; This starts the spiffy server (start-server port: portnum) (print "INFO: server has been stopped"))) -(define (http-transport:mk-signature) - (message-digest-string (md5-primitive) - (with-output-to-string - (lambda () - (write (list (current-directory) - (argv))))))) - ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== ;;====================================================================== @@ -218,12 +215,18 @@ (loop)))))) (iface (car server-info)) (port (cadr server-info)) (last-access 0) (tdb (tasks:open-db)) - (spid (tasks:server-get-server-id tdb #f iface port #f))) - (print "Keep-running got server pid " spid ", using iface " iface " and port " port) + (spid (tasks:server-get-server-id tdb #f iface port #f)) + (server-timeout (let ((tmo (config-lookup *configdat* "server" "timeout"))) + (if (and (string? tmo) + (string->number tmo)) + (* 60 60 (string->number tmo)) + ;; default to three days + (* 3 24 60))))) + (debug:print-info 2 "server-timeout: " server-timeout ", server pid: " spid " on " iface ":" port) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) ;; (print "Server running, count is " count) @@ -235,16 +238,12 @@ ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access (mutex-lock! *heartbeat-mutex*) (set! last-access *last-db-access*) (mutex-unlock! *heartbeat-mutex*) - (if (> (+ last-access - ;; (* 50 60 60) ;; 48 hrs - ;; 60 ;; one minute - ;; (* 60 60) ;; one hour - (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. - ) + ;; (debug:print 11 "last-access=" last-access ", server-timeout=" server-timeout) + (if (> (+ last-access server-timeout) (current-seconds)) (begin (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) (loop 0)) (begin @@ -265,23 +264,40 @@ (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) (debug:print-info 2 "Starting the standalone server") (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (debug:print 11 "http-transport:launch hostinfo=" hostinfo) + ;; #(1 "143.182.207.24" 5736 -1 "http" 22771 "hostname") (if hostinfo - (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) + (debug:print-info 2 "NOT starting new server, one is already running on " (vector-ref hostinfo 1) ":" (vector-ref hostinfo 2)) (if *toppath* (let* ((th2 (make-thread (lambda () (http-transport:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) - (th3 (make-thread (lambda ()(http-transport:keep-running)) "Keep running")) - ) + (th3 (make-thread (lambda ()(http-transport:keep-running)) "Keep running"))) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) - (thread-join! th2) - ) + (thread-join! th2)) (debug:print 0 "ERROR: Failed to setup for megatest"))) (exit))) +(define (http-transport:server-signal-handler signum) + (handle-exceptions + exn + (debug:print " ... exiting ...") + (let ((th1 (make-thread (lambda () + (thread-sleep! 1)) + ;; (if (not *received-response*) + ;; (receive-message* *runremote*))) ;; flush out last call if applicable + "eat response")) + (th2 (make-thread (lambda () + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep! 3) ;; give the flush three seconds to do it's stuff + (debug:print 0 " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) DELETED rpc-transport.scm Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ /dev/null @@ -1,381 +0,0 @@ - -;; Copyright 2006-2012, Matthew Welland. -;; -;; This program is made available under the GNU GPL version 2.0 or -;; greater. See the accompanying file COPYING for details. -;; -;; This program is distributed WITHOUT ANY WARRANTY; without even the -;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR -;; PURPOSE. - -(require-extension (srfi 18) extras tcp s11n) - -(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) -(import (prefix sqlite3 sqlite3:)) - -(use spiffy uri-common intarweb http-client spiffy-request-vars) - -(tcp-buffer-size 2048) - -(declare (unit server)) - -(declare (uses common)) -(declare (uses db)) -(declare (uses tests)) -(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. - -(include "common_records.scm") -(include "db_records.scm") - -(define (rpc-server:make-server-url hostport) - (if (not hostport) - #f - (conc "http://" (car hostport) ":" (cadr hostport)))) - -(define *server-loop-heart-beat* (current-seconds)) -(define *heartbeat-mutex* (make-mutex)) - -;;====================================================================== -;; S E R V E R -;;====================================================================== - -;; Call this to start the actual server -;; - -(define *db:process-queue-mutex* (make-mutex)) - -(define (rpc-server:run hostn) - (debug:print 2 "Attempting to start the server ...") - (if (not *toppath*) - (if (not (setup-for-run)) - (begin - (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") - (exit)))) - (let* (;; (iface (if (string=? "-" hostn) - ;; #f ;; (get-host-name) - ;; hostn)) - (db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily - (hostname (get-host-name)) - (ipaddrstr (let ((ipstr (if (string=? "-" hostn) - (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") - #f))) - (if ipstr ipstr hostn))) ;; hostname))) - (start-port (if (args:get-arg "-port") - (string->number (args:get-arg "-port")) - (+ 5000 (random 1001)))) - (link-tree-path (config-lookup *configdat* "setup" "linktree"))) - (set! *cache-on* #t) - (root-path (if link-tree-path - link-tree-path - (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! - - ;; Setup the web server and a /ctrl interface - ;; - (vhost-map `(((* any) . ,(lambda (continue) - ;; open the db on the first call - (if (not db)(set! db (open-db))) - (let* (($ (request-vars source: 'both)) - (dat ($ 'dat)) - (res #f)) - (cond - ((equal? (uri-path (request-uri (current-request))) - '(/ "hey")) - (send-response body: "hey there!\n" - headers: '((content-type text/plain)))) - ;; This is the /ctrl path where data is handed to the server and - ;; responses - ((equal? (uri-path (request-uri (current-request))) - '(/ "ctrl")) - (let* ((packet (db:string->obj dat)) - (qtype (cdb:packet-get-qtype packet))) - (debug:print-info 12 "server=> received packet=" packet) - (if (not (member qtype '(sync ping))) - (begin - (mutex-lock! *heartbeat-mutex*) - (set! *last-db-access* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*))) - ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex - ;; (set! res (open-run-close db:process-queue-item open-db packet)) - (set! res (db:process-queue-item db packet)) - ;; (mutex-unlock! *db:process-queue-mutex*) - (debug:print-info 11 "Return value from db:process-queue-item is " res) - (send-response body: (conc "ctrl data\n" - res - "") - headers: '((content-type text/plain))))) - (else (continue)))))))) - (server:try-start-server ipaddrstr start-port) - ;; lite3:finalize! db))) - )) - - - -;; (define (rpc-server:main-loop) -;; (print "INFO: Exectuing main server loop") -;; (access-log "megatest-http.log") -;; (server-bind-address #f) -;; (define-page (main-page-path) -;; (lambda () -;; (let ((dat ($ "dat"))) -;; ;; (with-request-variables (dat) -;; (debug:print-info 12 "Got dat=" dat) -;; (let* ((packet (db:string->obj dat)) -;; (qtype (cdb:packet-get-qtype packet))) -;; (debug:print-info 12 "server=> received packet=" packet) -;; (if (not (member qtype '(sync ping))) -;; (begin -;; (mutex-lock! *heartbeat-mutex*) -;; (set! *last-db-access* (current-seconds)) -;; (mutex-unlock! *heartbeat-mutex*))) -;; (let ((res (open-run-close db:process-queue-item open-db packet))) -;; (debug:print-info 11 "Return value from db:process-queue-item is " res) -;; res)))))) - -;;; (use spiffy uri-common intarweb) -;;; -;;; (root-path "/var/www") -;;; -;;; (vhost-map `(((* any) . ,(lambda (continue) -;;; (if (equal? (uri-path (request-uri (current-request))) -;;; '(/ "hey")) -;;; (send-response body: "hey there!\n" -;;; headers: '((content-type text/plain))) -;;; (continue)))))) -;;; -;;; (start-server port: 12345) - -;; This is recursively run by server:run until sucessful -;; -(define (rpc-server:try-start-server ipaddrstr portnum) - (handle-exceptions - exn - (begin - (print-error-message exn) - (if (< portnum 9000) - (begin - (print "WARNING: failed to start on portnum: " portnum ", trying next port") - (thread-sleep! 0.1) - (open-run-close tasks:remove-server-records tasks:open-db) - (server:try-start-server ipaddrstr (+ portnum 1))) - (print "ERROR: Tried and tried but could not start the server"))) - (set! *runremote* (list ipaddrstr portnum)) - (open-run-close tasks:remove-server-records tasks:open-db) - (open-run-close tasks:server-register - tasks:open-db - (current-process-id) - ipaddrstr portnum 0 'live) - (print "INFO: Trying to start server on " ipaddrstr ":" portnum) - ;; This starts the spiffy server - (start-server port: portnum) - (print "INFO: server has been stopped"))) - -(define (rpc-server:mk-signature) - (message-digest-string (md5-primitive) - (with-output-to-string - (lambda () - (write (list (current-directory) - (argv))))))) - -;;====================================================================== -;; S E R V E R U T I L I T I E S -;;====================================================================== - -;; When using zmq this would send the message back (two step process) -;; with spiffy or rpc this simply returns the return data to be returned -;; -(define (rpc-server:reply return-addr query-sig success/fail result) - (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) - ;; (send-message pubsock target send-more: #t) - ;; (send-message pubsock - (db:obj->string (vector success/fail query-sig result))) - -;;====================================================================== -;; C L I E N T S -;;====================================================================== - -;; -;; -;; 1 Hello, world! Goodbye Dolly -;; Send msg to serverdat and receive result -(define (rpc-server:client-send-receive serverdat msg) - (let* ((url (server:make-server-url serverdat)) - (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) - (numretries 0)) - (handle-exceptions - exn - (if (< numretries 200) - (server:client-send-receive serverdat msg)) - (begin - (debug:print-info 11 "fullurl=" fullurl "\n") - ;; set up the http-client here - (max-retry-attempts 100) - (retry-request? (lambda (request) - (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) - (set! numretries (+ numretries 1)) - #t)) - ;; send the data and get the response - ;; extract the needed info from the http data and - ;; process and return it. - (let* ((res (with-input-from-request fullurl - ;; #f - ;; msg - (list (cons 'dat msg)) - read-string))) - (debug:print-info 11 "got res=" res) - (let ((match (string-search (regexp "(.*)<.body>") res))) - (debug:print-info 11 "match=" match) - (let ((final (cadr match))) - (debug:print-info 11 "final=" final) - final))))))) - -(define (client:login serverdat serverdat) - (max-retry-attempts 100) - (cdb:login serverdat *toppath* (client:get-signature))) - -;; Not currently used! But, I think it *should* be used!!! -(define (client:logout serverdat) - (let ((ok (and (socket? serverdat) - (cdb:logout serverdat *toppath* (client:get-signature))))) - ;; (close-socket serverdat) - ok)) - -(define (rpc-server:client-connect iface port) - (let* ((login-res #f) - (serverdat (list iface port))) - (set! login-res (client:login serverdat serverdat)) - (if (and (not (null? login-res)) - (car login-res)) - (begin - (debug:print-info 2 "Logged in and connected to " iface ":" port) - (set! *runremote* serverdat) - serverdat) - (begin - (debug:print-info 2 "Failed to login or connect to " iface ":" port) - (set! *runremote* #f) - #f)))) - -;; Do all the connection work, start a server if not already running -(define (client:setup #!key (numtries 50)) - (if (not *toppath*) - (if (not (setup-for-run)) - (begin - (debug:print 0 "ERROR: failed to find megatest.config, exiting") - (exit)))) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if hostinfo - (let ((host (list-ref hostinfo 0)) - (iface (list-ref hostinfo 1)) - (port (list-ref hostinfo 2)) - (pid (list-ref hostinfo 3))) - (debug:print-info 2 "Setting up to connect to " hostinfo) - (server:client-connect iface port)) ;; ) - (if (> numtries 0) - (let ((exe (car (argv))) - (pid #f)) - (debug:print-info 0 "No server available, attempting to start one...") - (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) - (string-intersperse *verbosity* ",") - (conc *verbosity*))))) - ;; (set! pid (process-fork (lambda () - ;; (current-input-port (open-input-file "/dev/null")) - ;; (current-output-port (open-output-file "/dev/null")) - ;; (current-error-port (open-output-file "/dev/null")) - ;; (server:launch)))) - (let loop ((count 0)) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if (not hostinfo) - (begin - (debug:print-info 0 "Waiting for server pid=" pid " to start") - (sleep 2) ;; give server time to start - (if (< count 5) - (loop (+ count 1))))))) - ;; we are starting a server, do not try again! That can lead to - ;; recursively starting many processes!!! - (client:setup numtries: 0)) - (debug:print-info 1 "Too many attempts, giving up"))))) - -;; run server:keep-running in a parallel thread to monitor that the db is being -;; used and to shutdown after sometime if it is not. -;; -(define (rpc-server:keep-running) - ;; if none running or if > 20 seconds since - ;; server last used then start shutdown - ;; This thread waits for the server to come alive - (let* ((server-info (let loop () - (let ((sdat #f)) - (mutex-lock! *heartbeat-mutex*) - (set! sdat *runremote*) - (mutex-unlock! *heartbeat-mutex*) - (if sdat sdat - (begin - (sleep 4) - (loop)))))) - (iface (car server-info)) - (port (cadr server-info)) - (last-access 0) - (tdb (tasks:open-db)) - (spid (tasks:server-get-server-id tdb #f iface port #f))) - (print "Keep-running got server pid " spid ", using iface " iface " and port " port) - (let loop ((count 0)) - (thread-sleep! 4) ;; no need to do this very often - ;; NB// sync currently does NOT return queue-length - (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) - ;; (print "Server running, count is " count) - (if (< count 1) ;; 3x3 = 9 secs aprox - (loop (+ count 1))) - - ;; NOTE: Get rid of this mechanism! It really is not needed... - (tasks:server-update-heartbeat tdb spid) - - ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access - (mutex-lock! *heartbeat-mutex*) - (set! last-access *last-db-access*) - (mutex-unlock! *heartbeat-mutex*) - (if (> (+ last-access - ;; (* 50 60 60) ;; 48 hrs - ;; 60 ;; one minute - ;; (* 60 60) ;; one hour - (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. - ) - (current-seconds)) - (begin - (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) - (loop 0)) - (begin - (debug:print-info 0 "Starting to shutdown the server.") - ;; need to delete only *my* server entry (future use) - (set! *time-to-exit* #t) - (tasks:server-deregister-self tdb (get-host-name)) - (thread-sleep! 1) - (debug:print-info 0 "Max cached queries was " *max-cache-size*) - (debug:print-info 0 "Server shutdown complete. Exiting") - (exit))))))) - -;; all routes though here end in exit ... -(define (rpc-server:launch) - (if (not *toppath*) - (if (not (setup-for-run)) - (begin - (debug:print 0 "ERROR: cannot find megatest.config, exiting") - (exit)))) - (debug:print-info 2 "Starting the standalone server") - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (debug:print 11 "server:launch hostinfo=" hostinfo) - (if hostinfo - (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) - (if *toppath* - (let* ((th2 (make-thread (lambda () - (server:run - (if (args:get-arg "-server") - (args:get-arg "-server") - "-"))) "Server run")) - (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) - ) - (thread-start! th2) - (thread-start! th3) - (set! *didsomething* #t) - (thread-join! th2) - ) - (debug:print 0 "ERROR: Failed to setup for megatest"))) - (exit))) - Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -40,74 +40,26 @@ ;; Call this to start the actual server ;; (define *db:process-queue-mutex* (make-mutex)) -(define (server:run hostn) - (debug:print 2 "Attempting to start the server ...") +;; all routes though here end in exit ... +(define (server:launch transport) (if (not *toppath*) (if (not (setup-for-run)) (begin - (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") + (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) - (let* (;; (iface (if (string=? "-" hostn) - ;; #f ;; (get-host-name) - ;; hostn)) - (db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily - (hostname (get-host-name)) - (ipaddrstr (let ((ipstr (if (string=? "-" hostn) - (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") - #f))) - (if ipstr ipstr hostn))) ;; hostname))) - (start-port (if (args:get-arg "-port") - (string->number (args:get-arg "-port")) - (+ 5000 (random 1001)))) - (link-tree-path (config-lookup *configdat* "setup" "linktree"))) - (set! *cache-on* #t) - (root-path (if link-tree-path - link-tree-path - (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! - - ;; Setup the web server and a /ctrl interface - ;; - (vhost-map `(((* any) . ,(lambda (continue) - ;; open the db on the first call - (if (not db)(set! db (open-db))) - (let* (($ (request-vars source: 'both)) - (dat ($ 'dat)) - (res #f)) - (cond - ((equal? (uri-path (request-uri (current-request))) - '(/ "hey")) - (send-response body: "hey there!\n" - headers: '((content-type text/plain)))) - ;; This is the /ctrl path where data is handed to the server and - ;; responses - ((equal? (uri-path (request-uri (current-request))) - '(/ "ctrl")) - (let* ((packet (db:string->obj dat)) - (qtype (cdb:packet-get-qtype packet))) - (debug:print-info 12 "server=> received packet=" packet) - (if (not (member qtype '(sync ping))) - (begin - (mutex-lock! *heartbeat-mutex*) - (set! *last-db-access* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*))) - ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex - ;; (set! res (open-run-close db:process-queue-item open-db packet)) - (set! res (db:process-queue-item db packet)) - ;; (mutex-unlock! *db:process-queue-mutex*) - (debug:print-info 11 "Return value from db:process-queue-item is " res) - (send-response body: (conc "ctrl data\n" - res - "") - headers: '((content-type text/plain))))) - (else (continue)))))))) - (server:try-start-server ipaddrstr start-port) - ;; lite3:finalize! db))) - )) - + (debug:print-info 2 "Starting server using " transport " transport") + (set! *transport-type* transport) + (case transport + ((fs) (exit)) ;; there is no "fs" transport + ((http) (http-transport:launch)) + ((zmq) (zmq-transport:launch)) + (else + (debug:print "WARNING: unrecognised transport " transport) + (exit)))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () @@ -134,24 +86,5 @@ (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) (else (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) result))) - - -;; all routes though here end in exit ... -(define (server:launch transport) - (if (not *toppath*) - (if (not (setup-for-run)) - (begin - (debug:print 0 "ERROR: cannot find megatest.config, exiting") - (exit)))) - (debug:print-info 2 "Starting server using " transport " transport") - (set! *transport-type* transport) - (case transport - ((fs) (exit)) ;; there is no "fs" transport - ((http) (http-transport:launch)) - ((zmq) (zmq-transport:launch)) - (else - (debug:print "WARNING: unrecognised transport " transport) - (exit)))) - Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -192,15 +192,17 @@ (let ((res '()) (best #f)) (sqlite3:for-each-row (lambda (id interface port pubport transport pid hostname) (set! res (cons (vector id interface port pubport transport pid hostname) res)) - (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) + ;;(debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) + ) mdb - ;; strftime('%s','now')-heartbeat < 10 AND + "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers - WHERE mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version) + WHERE strftime('%s','now')-heartbeat < 10 + AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version) ;; for now we are keeping only one server registered in the db, return #f or first server found (if (null? res) #f (car res)))) ;; BUG: This logic is probably needed unless methodology changes completely... ;; Index: tests/fullrun/megatest.config ================================================================== --- tests/fullrun/megatest.config +++ tests/fullrun/megatest.config @@ -42,10 +42,20 @@ EMPTY_VAR # XTERM [system xterm] # RUNDEAD [system exit 56] +[server] + +# If the server can't be started on this port it will try the next port until +# it succeeds +port 8090 + +# This server will keep running this number of hours after last access. +# Three minutes is 0.05 hours +timeout 0.05 + ## disks are: ## name host:/path/to/area ## -or- ## name /path/to/area [disks]