Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -7,11 +7,11 @@ db.scm keys.scm margs.scm megatest-version.scm \ process.scm runs.scm tasks.scm tests.scm genexample.scm \ http-transport.scm filedb.scm \ client.scm gutils.scm synchash.scm daemon.scm mt.scm dcommon.scm \ tree.scm ezsteps.scm lock-queue.scm sdb.scm \ - rmt.scm api.scm tdb.scm + rmt.scm api.scm tdb.scm rpc-transport.scm GUISRCF = dashboard-tests.scm dashboard-guimonitor.scm OFILES = $(SRCFILES:%.scm=%.o) GOFILES = $(GUISRCF:%.scm=%.o) @@ -62,11 +62,11 @@ tests.o runs.o dashboard.o dashboard-tests.o dashboard-main.o : run_records.scm db.o ezsteps.o keys.o launch.o megatest.o monitor.o runs-for-ref.o runs.o tests.o : key_records.scm tests.o tasks.o dashboard-tasks.o : task_records.scm runs.o : test_records.scm megatest.o : megatest-fossil-hash.scm -client.scm common.scm configf.scm dashboard-guimonitor.scm dashboard-tests.scm dashboard.scm db.scm dcommon.scm ezsteps.scm fs-transport.scm http-transport.scm index-tree.scm items.scm keys.scm launch.scm megatest.scm monitor.scm mt.scm newdashboard.scm runconfig.scm runs.scm server.scm tdb.scm tests.scm tree.scm zmq-transport.scm : common_records.scm +client.scm common.scm configf.scm dashboard-guimonitor.scm dashboard-tests.scm dashboard.scm db.scm dcommon.scm ezsteps.scm fs-transport.scm http-transport.scm index-tree.scm items.scm keys.scm launch.scm megatest.scm monitor.scm mt.scm newdashboard.scm runconfig.scm runs.scm server.scm tdb.scm tests.scm tree.scm zmq-transport.scm : common_records.scm rpc-transport.scm # Temporary while transitioning to new routine # runs.o : run-tests-queue-classic.scm run-tests-queue-new.scm megatest-fossil-hash.scm : $(SRCFILES) megatest.scm *_records.scm Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -50,11 +50,11 @@ ;; DATABASE (define *open-dbs* (vector #f (make-hash-table))) ;; megatestdb run-id-dbs ;; SERVER (define *my-client-signature* #f) -(define *transport-type* 'http) +(define *transport-type* #f) (define *megatest-db* #f) (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) ADDED fs-transport.scm Index: fs-transport.scm ================================================================== --- /dev/null +++ fs-transport.scm @@ -0,0 +1,44 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use spiffy uri-common intarweb http-client spiffy-request-vars) + +(tcp-buffer-size 2048) + +(declare (unit fs-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. + +(include "common_records.scm") +(include "db_records.scm") + + +;;====================================================================== +;; F S T R A N S P O R T S E R V E R +;;====================================================================== + +;; There is no "server" per se but a convience routine to make it non +;; necessary to be reopening the db over and over again. +;; + +(define (fs:process-queue-item packet) + (if (not *megatest-db*) ;; we will require that (setup-for-run) has already been called + (set! *megatest-db* (open-db))) + (debug:print-info 11 "fs:process-queue-item called with packet=" packet) + (db:process-queue-item *megatest-db* packet)) + Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -91,31 +91,10 @@ (send-response body: (api:process-request db $) ;; the $ is the request vars proc headers: '((content-type text/plain))) (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*)) - ;; This is the /ctrl path where data is handed to the server and - ;; responses - ((equal? (uri-path (request-uri (current-request))) - '(/ "ctrl")) - (let* ((packet (db:string->obj dat)) - (qtype (cdb:packet-get-qtype packet))) - (debug:print-info 12 "server=> received packet=" packet) - (if (not (member qtype '(sync ping))) - (begin - (mutex-lock! *heartbeat-mutex*) - (set! *last-db-access* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*))) - ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex - ;; (set! res (open-run-close db:process-queue-item open-db packet)) - (set! res (db:process-queue-item db packet)) - ;; (mutex-unlock! *db:process-queue-mutex*) - (debug:print-info 11 "Return value from db:process-queue-item is " res) - (send-response body: (conc "ctrl data\n" - res - "") - headers: '((content-type text/plain))))) ((equal? (uri-path (request-uri (current-request))) '(/ "")) (send-response body: (http-transport:main-page))) ((equal? (uri-path (request-uri (current-request))) '(/ "runs")) @@ -329,10 +308,12 @@ ;; (* 60 1) ;; default to one minute (* 60 60 25) ;; default to 25 hours )))) (let loop ((count 0) (server-state 'available)) + + ;; Use this opportunity to sync the inmemdb to db (let ((start-time (current-milliseconds)) (sync-time #f) (rem-time #f)) @@ -458,10 +439,22 @@ (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) (thread-join! th2) (exit))))) + +(define (http:ping run-id host-port) + (let* ((server-dat (http-transport:client-connect (car host-port)(cadr host-port))) + (login-res (rmt:login-no-auto-client-setup server-dat run-id))) + (if (and (list? login-res) + (car login-res)) + (begin + (print "LOGIN_OK") + (exit 0)) + (begin + (print "LOGIN_FAILED") + (exit 1))))) (define (http-transport:server-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -25,12 +25,11 @@ (declare (uses client)) (declare (uses tests)) (declare (uses genexample)) (declare (uses daemon)) (declare (uses db)) -;; (declare (uses sdb)) -;; (declare (uses filedb)) + (declare (uses tdb)) (declare (uses mt)) (declare (uses api)) (declare (uses tasks)) ;; only used for debugging. @@ -125,10 +124,11 @@ -env2file fname : write the environment to fname.csh and fname.sh -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname + -transport http|zmq : use http or zmq for transport (default is http) -daemonize : fork into background and disconnect from stdin/out -list-servers : list the servers -stop-server id : stop server specified by id (see output of -list-servers), use 0 to kill all -repl : start a repl (useful for extending megatest) @@ -194,10 +194,12 @@ ":units" ;; misc "-start-dir" "-server" "-stop-server" + "-transport" + "-kill-server" "-port" "-extract-ods" "-pathmod" "-env2file" "-setvars" @@ -349,11 +351,13 @@ (let* ((run-id (string->number (args:get-arg "-run-id"))) (host-port (let ((slst (string-split (args:get-arg "-ping") ":"))) (if (eq? (length slst) 2) (list (car slst)(string->number (cadr slst))) #f))) - (toppath (setup-for-run))) + (toppath (setup-for-run)) + (transport (server:get-transport))) + (set! *did-something* #t) (if (not run-id) (begin (debug:print 0 "ERROR: must specify run-id when doing ping, -run-id n") (print "ERROR: No run-id") (exit 1)) @@ -360,20 +364,14 @@ (if (not host-port) (begin (debug:print 0 "ERROR: argument to -ping is host:port, got " (args:get-arg "-ping")) (print "ERROR: bad host:port") (exit 1)) - (let* ((server-dat (http-transport:client-connect (car host-port)(cadr host-port))) - (login-res (rmt:login-no-auto-client-setup server-dat run-id))) - (if (and (list? login-res) - (car login-res)) - (begin - (print "LOGIN_OK") - (exit 0)) - (begin - (print "LOGIN_FAILED") - (exit 1)))))))) + (case transport + ((http)(http:ping run-id host-port)) + ((rpc) (rpc:ping run-id host-port)) + (else (debug:print 0 "ERROR: No transport set")(exit))))))) ;;====================================================================== ;; Start the server - can be done in conjunction with -runall or -runtests (one day...) ;; we start the server if not running else start the client thread ;;====================================================================== @@ -798,10 +796,11 @@ ;; if we are in a test use the MT_CMDINFO data (if (getenv "MT_CMDINFO") (let* ((startingdir (current-directory)) (cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) ;; (runremote (assoc/default 'runremote cmdinfo)) + (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -846,10 +845,11 @@ ;; if we are in a test use the MT_CMDINFO data (if (getenv "MT_CMDINFO") (let* ((startingdir (current-directory)) (cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) ;; (runremote (assoc/default 'runremote cmdinfo)) + (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -925,10 +925,11 @@ (begin (debug:print 0 "ERROR: MT_CMDINFO env var not set, -step must be called *inside* a megatest invoked environment!") (exit 5)) (let* ((cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) ;; (runremote (assoc/default 'runremote cmdinfo)) + (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -972,10 +973,11 @@ (debug:print 0 "ERROR: MT_CMDINFO env var not set, commands -test-status, -runstep and -setlog must be called *inside* a megatest environment!") (exit 5)) (let* ((startingdir (current-directory)) (cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) ;; (runremote (assoc/default 'runremote cmdinfo)) + (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -1252,10 +1254,12 @@ (if *runremote* (close-all-connections!)) (if (not *didsomething*) (debug:print 0 help)) + +;; (if *runremote* (rpc:close-all-connections!)) (if (not (eq? *globalexitstatus* 0)) (if (or (args:get-arg "-runtests")(args:get-arg "-runall")) (begin (debug:print 0 "NOTE: Subprocesses with non-zero exit code detected: " *globalexitstatus*) ADDED rpc-transport.scm Index: rpc-transport.scm ================================================================== --- /dev/null +++ rpc-transport.scm @@ -0,0 +1,249 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n rpc) +(import (prefix rpc rpc:)) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(declare (unit rpc-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. + +(include "common_records.scm") +(include "db_records.scm") + +;; procstr is the name of the procedure to be called as a string +(define (rpc-transport:autoremote procstr params) + (handle-exceptions + exn + (begin + (debug:print 1 "Remote failed for " proc " " params) + (apply (eval (string->symbol procstr)) params)) + ;; (if *runremote* + ;; (apply (eval (string->symbol (conc "remote:" procstr))) params) + (apply (eval (string->symbol procstr)) params))) + +;; all routes though here end in exit ... +;; +;; start_server? +;; +(define (rpc-transport:launch run-id) + (set! *run-id* run-id) + (if (args:get-arg "-daemonize") + (daemon:ize)) + (if (server:check-if-running run-id) + (begin + (debug:print 0 "INFO: Server for run-id " run-id " already running") + (exit 0))) + (let loop ((server-id (open-run-close tasks:server-lock-slot tasks:open-db run-id)) + (remtries 4)) + (if (not server-id) + (if (> remtries 0) + (begin + (thread-sleep! 2) + (loop (open-run-close tasks:server-lock-slot tasks:open-db run-id) + (- remtries 1))) + (begin + ;; since we didn't get the server lock we are going to clean up and bail out + (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") + (open-run-close tasks:server-delete-records-for-this-pid tasks:open-db " rpc-transport:launch") + )) + (let* ((th2 (make-thread (lambda () + (rpc-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-") + run-id + server-id)) "Server run")) + (th3 (make-thread (lambda () + (rpc-transport:keep-running run-id server-id)) + "Keep running"))) + ;; Database connection + (set! *inmemdb* (db:setup run-id)) + (thread-start! th2) + (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th3) + (exit))))) + +(define (rpc-transport:run hostn run-id server-id) + (debug:print 2 "Attempting to start the rpc server ...") + (let* ((db #f) + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + (server:get-best-guess-address hostname) + #f))) + (if ipstr ipstr hostn))) ;; hostname))) + (start-port (open-run-close tasks:server-get-next-port tasks:open-db)) + (link-tree-path (configf:lookup *configdat* "setup" "linktree")) + (rpc:listener (rpc-transport:find-free-port-and-open (rpc:default-server-port))) + (th1 (make-thread + (cute (rpc:make-server rpc:listener) "rpc:server") + 'rpc:server)) + (hostname (if (string=? "-" hostn) + (get-host-name) + hostn)) + (ipaddrstr (if (string=? "-" hostn) + (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f)) + (portnum (rpc:default-server-port)) + (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)) + (tdb (tasks:open-db))) + (set! db *inmemdb*) + (open-run-close tasks:server-set-interface-port + tasks:open-db + server-id + ipaddrstr portnum) + (debug:print 0 "Server started on " host:port) + + ;; can use this to run most anything at the remote + (rpc:publish-procedure! + 'remote:run + (lambda (procstr . params) + (rpc-transport:autoremote procstr params))) + + ;; (rpc:publish-procedure! + ;; 'server:login + ;; (lambda (toppath) + ;; (set! *last-db-access* (current-seconds)) + ;; (if (equal? *toppath* toppath) + ;; (begin + ;; (debug:print-info 2 "login successful") + ;; #t) + ;; #f))) + ;; + ;; ;;====================================================================== + ;; ;; db specials here + ;; ;;====================================================================== + ;; ;; remote call to open-run-close + ;; (rpc:publish-procedure! + ;; 'rdb:open-run-close + ;; (lambda (procname . remargs) + ;; (debug:print-info 12 "Remote call of rdb:open-run-close " procname " " remargs) + ;; (set! *last-db-access* (current-seconds)) + ;; (apply open-run-close (eval procname) remargs))) + ;; + ;; (rpc:publish-procedure! + ;; 'cdb:test-set-status-state + ;; (lambda (test-id status state msg) + ;; (debug:print-info 12 "Remote call of cdb:test-set-status-state test-id=" test-id ", status=" status ", state=" state ", msg=" msg) + ;; (cdb:test-set-status-state test-id status state msg))) + ;; + ;; (rpc:publish-procedure! + ;; 'cdb:test-rollup-test_data-pass-fail + ;; (lambda (test-id) + ;; (debug:print-info 12 "Remote call of cdb:test-rollup-test_data-pass-fail " test-id) + ;; (cdb:test-rollup-test_data-pass-fail test-id))) + ;; + ;; (rpc:publish-procedure! + ;; 'cdb:pass-fail-counts + ;; (lambda (test-id fail-count pass-count) + ;; (debug:print-info 12 "Remote call of cdb:pass-fail-counts " test-id " passes: " pass-count " fails: " fail-count) + ;; (cdb:pass-fail-counts test-id fail-count pass-count))) + ;; + ;; (rpc:publish-procedure! + ;; 'cdb:tests-register-test + ;; (lambda (db run-id test-name item-path) + ;; (debug:print-info 12 "Remote call of cdb:tests-register-test " run-id " testname: " test-name " item-path: " item-path) + ;; (cdb:tests-register-test db run-id test-name item-path))) + ;; + ;; (rpc:publish-procedure! + ;; 'cdb:flush-queue + ;; (lambda () + ;; (debug:print-info 12 "Remote call of cdb:flush-queue") + ;; (cdb:flush-queue))) + ;; + + ;;====================================================================== + ;; ;; end of publish-procedure section + ;;====================================================================== + ;; + (on-exit (lambda () + (open-run-close tasks:server-set-state! tasks:open-db server-id "stopped"))) + + (thread-start! th1) + + (set! *rpc:listener* rpc:listener) + (tasks:server-set-state! tdb server-id "running") + ; (sqlite3:finalize! tdb) + th1 + )) ;; rpc:server))) + +(define (rpc-transport:keep-running run-id server-id) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + (let loop ((count 0)) + (thread-sleep! 5) ;; no need to do this very often + (let ((numrunning -1)) ;; (db:get-count-tests-running db))) + (if (or (> numrunning 0) + (> (+ *last-db-access* 60)(current-seconds))) + (begin + (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) + (loop (+ 1 count))) + (begin + (debug:print-info 0 "Starting to shutdown the server side") + (open-run-close tasks:server-force-clean-run-record tasks:open-db run-id ipaddrstr portnum " rpc-transport:try-start-server stop") + (thread-sleep! 10) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + ))))) + +(define (rpc-transport:find-free-port-and-open port) + (handle-exceptions + exn + (begin + (print "Failed to bind to port " (rpc:default-server-port) ", trying next port") + (rpc-transport:find-free-port-and-open (+ port 1))) + (rpc:default-server-port port) + (tcp-read-timeout 240000) + (tcp-listen (rpc:default-server-port) 10000))) + +(define (rpc:ping run-id host-port) + #f) + +(define (rpc-transport:client-setup) + (if *runremote* + (begin + (debug:print 0 "ERROR: Attempt to connect to server but already connected") + #f) + (let* ((hostinfo (open-run-close db:get-var #f "SERVER")) + (hostdat (if hostinfo (string-split hostinfo ":") #f)) + (host (if hostinfo (car hostdat) #f)) + (port (if (and hostinfo (> (length hostdat) 1))(cadr hostdat) #f))) + (if (and port + (string->number port)) + (let ((portn (string->number port))) + (debug:print-info 2 "Setting up to connect to host " host ":" port) + (handle-exceptions + exn + (begin + (debug:print 0 "ERROR: Failed to open a connection to the server at host: " host " port: " port) + (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) + ;; (open-run-close + ;; (lambda (db . param) + ;; (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER'")) + ;; #f) + (set! *runremote* #f)) + (if (and (not (args:get-arg "-server")) ;; no point in the server using the server using the server + ((rpc:procedure 'server:login host portn) *toppath*)) + (begin + (debug:print-info 2 "Logged in and connected to " host ":" port) + (set! *runremote* (vector host portn))) + (begin + (debug:print-info 2 "Failed to login or connect to " host ":" port) + (set! *runremote* #f))))) + (debug:print-info 2 "no server available"))))) + Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -20,11 +20,11 @@ (declare (uses common)) (declare (uses db)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses synchash)) (declare (uses http-transport)) -;; (declare (uses zmq-transport)) +(declare (uses rpc-transport)) (declare (uses daemon)) (include "common_records.scm") (include "db_records.scm") @@ -46,37 +46,58 @@ ;; all routes though here end in exit ... ;; ;; start_server ;; (define (server:launch run-id) - (http-transport:launch run-id)) - -;;====================================================================== -;; Q U E U E M A N A G E M E N T -;;====================================================================== - -;; We don't want to flush the queue if it was just flushed -(define *server:last-write-flush* (current-milliseconds)) + (let ((transport (server:get-transport))) + (case transport + ((http) (http-transport:launch run-id)) + ((rpc) (rpc-transport:launch run-id)) + (else (debug:print 0 "ERROR: No known transport set, transport=" transport ", using rpc") + (rpc-transport:launch run-id))))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== +;; Get the transport +(define (server:get-transport) + (if *transport-type* + *transport-type* + (let ((ttype (string->symbol + (or (args:get-arg "-transport") + (configf:lookup *configdat* "server" "transport") + "rpc")))) + (set! *transport-type* ttype) + ttype))) + ;; Generate a unique signature for this server (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) - ;; When using zmq this would send the message back (two step process) ;; with spiffy or rpc this simply returns the return data to be returned ;; (define (server:reply return-addr query-sig success/fail result) - (db:obj->string (vector success/fail query-sig result))) + (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) + ;; (send-message pubsock target send-more: #t) + ;; (send-message pubsock + (case (server:get-transport) + ((rpc) (db:obj->string (vector success/fail query-sig result))) + ((http) (db:obj->string (vector success/fail query-sig result))) + ((zmq) + (let ((pub-socket (vector-ref *runremote* 1))) + (send-message pub-socket return-addr send-more: #t) + (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) + ((fs) result) + (else + (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) + result))) ;; Given a run id start a server process ### NOTE ### > file 2>&1 ;; if the run-id is zero and the target-host is set ;; try running on that host ;; @@ -90,10 +111,16 @@ (begin (set-environment-variable "TARGETHOST" target-host) (system (conc "nbfake " cmdln))) (system cmdln)) (pop-directory))) + +(define (server:get-client-signature) + (if *my-client-signature* *my-client-signature* + (let ((sig (server:mk-signature))) + (set! *my-client-signature* sig) + *my-client-signature*))) ;; kind start up of servers, wait 40 seconds before allowing another server for a given ;; run-id to be launched (define (server:kind-run run-id) (let ((last-run-time (hash-table-ref/default *server-kind-run* run-id #f))) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -105,19 +105,19 @@ (define (tasks:server-set-available mdb run-id) (sqlite3:execute mdb "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id) VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);" - (current-process-id) ;; pid - (get-host-name) ;; hostname - -1 ;; port - -1 ;; pubport - (random 1000) ;; priority (used a tiebreaker on get-available) - "available" ;; state - (common:version-signature) ;; mt_version - -1 ;; interface - "http" ;; transport + (current-process-id) ;; pid + (get-host-name) ;; hostname + -1 ;; port + -1 ;; pubport + (random 1000) ;; priority (used a tiebreaker on get-available) + "available" ;; state + (common:version-signature) ;; mt_version + -1 ;; interface + (conc (server:get-transport)) ;; transport run-id )) (define (tasks:num-in-available-state mdb run-id) (let ((res 0)) Index: tests/fullrun/megatest.config ================================================================== --- tests/fullrun/megatest.config +++ tests/fullrun/megatest.config @@ -111,11 +111,11 @@ [server] # If the server can't be started on this port it will try the next port until # it succeeds -port 8080 +port 9080 # This server will keep running this number of hours after last access. # Three minutes is 0.05 hours # timeout 0.025 timeout 0.25 ADDED zmq-transport.scm Index: zmq-transport.scm ================================================================== --- /dev/null +++ zmq-transport.scm @@ -0,0 +1,493 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use zmq) + +(declare (unit zmq-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses server)) + +(include "common_records.scm") +(include "db_records.scm") + +;; Transition to pub --> sub with pull <-- push +;; +;; 1. client sends request to server via push to the pull port +;; 2. server puts request in queue or processes immediately as appropriate +;; 3. server puts responses from completed requests into pub port +;; +;; TODO +;; +;; Done Tested +;; [x] [ ] 1. Add columns pullport pubport to servers table +;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 +;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports +;; [x] [ ] 4. Add client compose of request +;; [x] [ ] - name of client: testname/itempath-test_id-hostname +;; [x] [ ] - name of request: callname, params +;; [x] [ ] - request key: f(clientname, callname, params) +;; [x] [ ] 5. Add processing of subscription hits +;; [x] [ ] - done when get key +;; [x] [ ] - return results +;; [x] [ ] 6. Add timeout processing +;; [x] [ ] - after 60 seconds +;; [ ] [ ] i. check server alive, connect to new if necessary +;; [ ] [ ] ii. resend request +;; [ ] [ ] 7. Turn self ping back on + +(define (zmq-transport:make-server-url hostport) + (if (not hostport) + #f + (conc "tcp://" (car hostport) ":" (cadr hostport)))) + +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +(define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) +(define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) +(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0)) +(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0)) + +(define (zmq-transport:run hostn) + (debug:print 2 "Attempting to start the server ...") + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") + (exit)))) + (let* ((db (open-db)) ;; here we *do not* want to be opening and closing the db + (zmq-sdat1 #f) + (zmq-sdat2 #f) + (pull-socket #f) + (pub-socket #f) + (p1 #f) + (p2 #f) + (zmq-sockets-dat #f) + (iface (if (string=? "-" hostn) + "*" ;; (get-host-name) + hostn)) + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostname))) + (last-run 0)) + (set! zmq-sockets-dat (zmq-transport:setup-ports ipaddrstr (if (args:get-arg "-port") + (string->number (args:get-arg "-port")) + (+ 5000 (random 1001))))) + + (set! zmq-sdat1 (car zmq-sockets-dat)) + (set! pull-socket (cadr zmq-sdat1)) ;; (iface s port) + (set! p1 (caddr zmq-sdat1)) + + (set! zmq-sdat2 (cadr zmq-sockets-dat)) + (set! pub-socket (cadr zmq-sdat2)) + (set! p2 (caddr zmq-sdat2)) + + (set! *cache-on* #t) + + (set! *runremote* (vector pull-socket pub-socket)) ;; overloading the use of *runremote* BUG!? + + ;; what to do when we quit + ;; +;; (on-exit (lambda () +;; (if (and *toppath* *server-info*) +;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) +;; (let loop () +;; (let ((queue-len 0)) +;; (thread-sleep! (random 5)) +;; (mutex-lock! *incoming-mutex*) +;; (set! queue-len (length *incoming-data*)) +;; (mutex-unlock! *incoming-mutex*) +;; (if (> queue-len 0) +;; (begin +;; (debug:print-info 0 "Queue not flushed, waiting ...") +;; (loop)))))))) + + ;; The heavy lifting + ;; + ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime + ;; + (debug:print-info 11 "Server setup complete, start listening for messages") + (let loop ((queue-lst '())) + (let* ((rawmsg (receive-message* pull-socket)) + (packet (db:string->obj rawmsg)) + (qtype (cdb:packet-get-qtype packet))) + (debug:print-info 12 "server=> received packet=" packet) + (if (not (member qtype '(sync ping))) + (begin + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*))) + (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue + (begin + (db:process-queue-item db packet) + ;; (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) + + (loop '())) + (loop (cons packet queue-lst))))))) + +;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (zmq-transport:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (debug:print 12 "WARNING: server not started yet, waiting few seconds before trying again") + (sleep 4) + (loop)))))) + (iface (cadr server-info)) + (pullport (caddr server-info)) + (pubport (cadddr server-info)) ;; id interface pullport pubport) + ;; (zmq-sockets (zmq-transport:client-connect iface pullport pubport)) + (last-access 0)) + (debug:print-info 11 "heartbeat started for zmq server on " iface " " pullport " " pubport) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + ;; GET REAL QUEUE LENGTH FROM THE VARIABLE + (let ((queue-len 0)) ;; FOR NOW DO NOT DO THIS (cdb:client-call zmq-sockets 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +(define (zmq-transport:find-free-port-and-open iface s port stype #!key (trynum 50)) + (let ((s (if s s (make-socket stype))) + (p (if (number? port) port 5555)) + (old-handler (current-exception-handler))) + (handle-exceptions + exn + (begin + (debug:print 0 "Failed to bind to port " p ", trying next port") + (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) + ;; (old-handler) + ;; (print-call-chain) + (if (> trynum 0) + (zmq-transport:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1)) + (debug:print-info 0 "Tried ports up to " p + " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")) + (exit)) ;; To exit or not? That is the question. + (let ((zmq-url (conc "tcp://" iface ":" p))) + (debug:print 2 "Trying to start server on " zmq-url) + (bind-socket s zmq-url) + (list iface s port))))) + +(define (zmq-transport:setup-ports ipaddrstr startport) + (let* ((s1 (zmq-transport:find-free-port-and-open ipaddrstr #f startport 'pull)) + (p1 (caddr s1)) + (s2 (zmq-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub)) + (p2 (caddr s2))) + (set! *runremote* #f) + (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) + (mutex-lock! *heartbeat-mutex*) + (set! *server-info* (open-run-close tasks:server-register + tasks:open-db + (current-process-id) + ipaddrstr p1 + 0 + 'live + 'zmq + pubport: p2)) + (debug:print-info 11 "*server-info* set to " *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (list s1 s2))) + +(define (zmq-transport:mk-signature) + (message-digest-string (md5-primitive) + (with-output-to-string + (lambda () + (write (list (current-directory) + (argv))))))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +;; +(define (zmq-transport:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) + (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions) + (let ((connect-ok #f) + (zmq-socket (if context + (make-socket type context) + (make-socket type))) + (conurl (zmq-transport:make-server-url (list iface port)))) + (if (socket? zmq-socket) + (begin + ;; first apply subscriptions + (for-each (lambda (subscription) + (debug:print 2 "Subscribing to " subscription) + (socket-option-set! zmq-socket 'subscribe subscription)) + subscriptions) + (connect-socket zmq-socket conurl) + zmq-socket) + (begin + (debug:print 0 "ERROR: Failed to open socket to " conurl) + #f)))) + +(define (zmq-transport:client-connect iface pullport pubport) + (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push)) + (sub-socket (zmq-transport:client-socket-connect iface pubport + type: 'sub + subscriptions: (list (server:get-client-signature) "all"))) + (zmq-sockets (vector push-socket sub-socket)) + (login-res #f)) + (debug:print-info 11 "zmq-transport:client-connect started. Next is login") + (set! login-res (server:client-login zmq-sockets)) + (if (and (not (null? login-res)) + (car login-res)) + (begin + (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") + (set! *runremote* zmq-sockets) + zmq-sockets) + (begin + (debug:print-info 2 "Failed to login or connect to " conurl) + (set! *runremote* #f) + #f)))) + +;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (zmq-transport:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *runremote*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdb (tasks:open-db)) + (spid (tasks:server-get-server-id tdb #f iface port #f))) + (print "Keep-running got server pid " spid ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (tasks:server-update-heartbeat tdb spid) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (tasks:server-deregister-self tdb (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +;; all routes though here end in exit ... +(define (zmq-transport:launch) + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, exiting") + (exit)))) + (debug:print-info 2 "Starting zmq server") + (if *toppath* + (let* (;; (th1 (make-thread (lambda () + ;; (let ((server-info #f)) + ;; ;; wait for the server to be online and available + ;; (let loop () + ;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat") + ;; (thread-sleep! 2) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! server-info *server-info* ) + ;; (mutex-unlock! *heartbeat-mutex*) + ;; (if (not server-info)(loop))) + ;; (debug:print 2 "Server alive, starting self-ping") + ;; (zmq-transport:self-ping server-info) + ;; )) + ;; "Self ping")) + (th2 (make-thread (lambda () + (zmq-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-"))) "Server run")) + ;; (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running")) + ) + (set! *client-non-blocking-mode* #t) + ;; (thread-start! th1) + (thread-start! th2) + ;; (thread-start! th3) + (set! *didsomething* #t) + ;; (thread-join! th3) + (thread-join! th2) + ) + (debug:print 0 "ERROR: Failed to setup for megatest"))) + +(define (zmq-transport:client-signal-handler signum) + (handle-exceptions + exn + (debug:print " ... exiting ...") + (let ((th1 (make-thread (lambda () + (if (not *received-response*) + (receive-message* *runremote*))) ;; flush out last call if applicable + "eat response")) + (th2 (make-thread (lambda () + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep! 3) ;; give the flush three seconds to do it's stuff + (debug:print 0 " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) + +(define (zmq-transport:client-launch) + (set-signal-handler! signal/int zmq-transport:client-signal-handler) + (if (zmq-transport:client-setup) + (debug:print-info 2 "connected as client") + (begin + (debug:print 0 "ERROR: Failed to connect as client") + (exit)))) + +;;====================================================================== +;; Defunct functions +;;====================================================================== + +;; ping a server and return number of clients or #f (if no response) +;; NOT IN USE! +(define (zmq-transport:ping host port #!key (secs 10)(return-socket #f)) + (cdb:use-non-blocking-mode + (lambda () + (let* ((res #f) + (th1 (make-thread + (lambda () + (let* ((zmq-context (make-context 1)) + (zmq-socket (zmq-transport:client-connect host port context: zmq-context))) + (if zmq-socket + (if (zmq-transport:client-login zmq-socket) + (let ((numclients (cdb:num-clients zmq-socket))) + (if (not return-socket) + (begin + (zmq-transport:client-logout zmq-socket) + (close-socket zmq-socket))) + (set! res (list #t numclients (if return-socket zmq-socket #f)))) + (begin + ;; (close-socket zmq-socket) + (set! res (list #f "CAN'T LOGIN" #f)))) + (set! res (list #f "CAN'T CONNECT" #f))))) + "Ping: th1")) + (th2 (make-thread + (lambda () + (let loop ((count 1)) + (debug:print-info 1 "Ping " count " server on " host " at port " port) + (thread-sleep! 2) + (if (< count (/ secs 2)) + (loop (+ count 1)))) + ;; (thread-terminate! th1) + (set! res (list #f "TIMED OUT" #f))) + "Ping: th2"))) + (thread-start! th2) + (thread-start! th1) + (handle-exceptions + exn + (set! res (list #f "TIMED OUT" #f)) + (thread-join! th1 secs)) + res)))) + +;; (define (zmq-transport:self-ping server-info) +;; ;; server-info: server-id interface pullport pubport +;; (let ((iface (list-ref server-info 1)) +;; (pullport (list-ref server-info 2)) +;; (pubport (list-ref server-info 3))) +;; (zmq-transport:client-connect iface pullport pubport) +;; (let loop () +;; (thread-sleep! 2) +;; (cdb:client-call *runremote* 'ping #t) +;; (debug:print 4 "zmq-transport:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *server-loop-heart-beat* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*) +;; (loop)))) + +(define (zmq-transport:reply pubsock target query-sig success/fail result) + (debug:print-info 11 "zmq-transport:reply target=" target ", result=" result) + (send-message pubsock target send-more: #t) + (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) +