Overview
Comment: | Got the rpc server itself starting up |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | multi-transport |
Files: | files | file ages | folders |
SHA1: |
52321931b3f444c488d0c8bad4916871 |
User & Date: | matt on 2014-03-03 21:41:49 |
Other Links: | branch diff | manifest | tags |
Context
2014-03-03
| ||
22:46 | Added skeleton of client:setup for rpc check-in: e1d58e8335 user: matt tags: multi-transport | |
21:41 | Got the rpc server itself starting up check-in: 52321931b3 user: matt tags: multi-transport | |
12:39 | Fixed compilation check-in: 951ec894b7 user: mrwellan tags: multi-transport | |
Changes
Modified common.scm from [03bd87c740] to [4eb1de6666].
︙ | ︙ | |||
48 49 50 51 52 53 54 | (define *passnum* 0) ;; when running track calls to run-tests or similar ;; DATABASE (define *open-dbs* (vector #f (make-hash-table))) ;; megatestdb run-id-dbs ;; SERVER (define *my-client-signature* #f) | | | 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | (define *passnum* 0) ;; when running track calls to run-tests or similar ;; DATABASE (define *open-dbs* (vector #f (make-hash-table))) ;; megatestdb run-id-dbs ;; SERVER (define *my-client-signature* #f) (define *transport-type* #f) (define *megatest-db* #f) (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold <host port> (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) |
︙ | ︙ |
Modified http-transport.scm from [3720684b06] to [b1f2a5135d].
︙ | ︙ | |||
89 90 91 92 93 94 95 | ((equal? (uri-path (request-uri (current-request))) '(/ "api")) (send-response body: (api:process-request db $) ;; the $ is the request vars proc headers: '((content-type text/plain))) (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*)) | < < < < < < < < < < < < < < < < < < < < < | 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | ((equal? (uri-path (request-uri (current-request))) '(/ "api")) (send-response body: (api:process-request db $) ;; the $ is the request vars proc headers: '((content-type text/plain))) (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*)) ((equal? (uri-path (request-uri (current-request))) '(/ "")) (send-response body: (http-transport:main-page))) ((equal? (uri-path (request-uri (current-request))) '(/ "runs")) (send-response body: (http-transport:main-page))) ((equal? (uri-path (request-uri (current-request))) |
︙ | ︙ | |||
327 328 329 330 331 332 333 334 335 336 337 338 339 340 | (* 60 60 (string->number tmo)) ;; (* 3 24 60 60) ;; default to three days ;; (* 60 1) ;; default to one minute (* 60 60 25) ;; default to 25 hours )))) (let loop ((count 0) (server-state 'available)) ;; Use this opportunity to sync the inmemdb to db (let ((start-time (current-milliseconds)) (sync-time #f) (rem-time #f)) (if *inmemdb* (db:sync-touched *inmemdb* force-sync: #t)) (set! sync-time (- (current-milliseconds) start-time)) | > > | 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 | (* 60 60 (string->number tmo)) ;; (* 3 24 60 60) ;; default to three days ;; (* 60 1) ;; default to one minute (* 60 60 25) ;; default to 25 hours )))) (let loop ((count 0) (server-state 'available)) ;; Use this opportunity to sync the inmemdb to db (let ((start-time (current-milliseconds)) (sync-time #f) (rem-time #f)) (if *inmemdb* (db:sync-touched *inmemdb* force-sync: #t)) (set! sync-time (- (current-milliseconds) start-time)) |
︙ | ︙ |
Modified megatest.scm from [34c738736b] to [798f749abb].
︙ | ︙ | |||
23 24 25 26 27 28 29 30 31 32 33 34 35 36 | (declare (uses launch)) (declare (uses server)) (declare (uses client)) (declare (uses tests)) (declare (uses genexample)) (declare (uses daemon)) (declare (uses db)) (declare (uses tdb)) (declare (uses mt)) (declare (uses api)) (declare (uses tasks)) ;; only used for debugging. (define *db* #f) ;; this is only for the repl, do not use in general!!!! | > | 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | (declare (uses launch)) (declare (uses server)) (declare (uses client)) (declare (uses tests)) (declare (uses genexample)) (declare (uses daemon)) (declare (uses db)) (declare (uses tdb)) (declare (uses mt)) (declare (uses api)) (declare (uses tasks)) ;; only used for debugging. (define *db* #f) ;; this is only for the repl, do not use in general!!!! |
︙ | ︙ |
Modified rpc-transport.scm from [f4c768be5b] to [c41c92f350].
︙ | ︙ | |||
64 65 66 67 68 69 70 | (rpc-transport:run (if (args:get-arg "-server") (args:get-arg "-server") "-") run-id server-id)) "Server run")) (th3 (make-thread (lambda () | | | | | | < | > | > > > > > < < | 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 | (rpc-transport:run (if (args:get-arg "-server") (args:get-arg "-server") "-") run-id server-id)) "Server run")) (th3 (make-thread (lambda () (rpc-transport:keep-running run-id server-id)) "Keep running"))) ;; Database connection (set! *inmemdb* (db:setup run-id)) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) (thread-join! th3) (exit))))) (define (rpc-transport:run hostn run-id server-id) (debug:print 2 "Attempting to start the rpc server ...") (let* ((db #f) (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") (server:get-best-guess-address hostname) #f))) (if ipstr ipstr hostn))) ;; hostname))) (start-port (open-run-close tasks:server-get-next-port tasks:open-db)) (link-tree-path (configf:lookup *configdat* "setup" "linktree")) (rpc:listener (rpc-transport:find-free-port-and-open (rpc:default-server-port))) (th1 (make-thread (cute (rpc:make-server rpc:listener) "rpc:server") 'rpc:server)) (hostname (if (string=? "-" hostn) (get-host-name) hostn)) (ipaddrstr (if (string=? "-" hostn) (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f)) (portnum (rpc:default-server-port)) (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)) (tdb (tasks:open-db))) (set! db *inmemdb*) (open-run-close tasks:server-set-interface-port tasks:open-db server-id ipaddrstr portnum) (debug:print 0 "Server started on " host:port) ;; can use this to run most anything at the remote (rpc:publish-procedure! 'remote:run (lambda (procstr . params) (rpc-transport:autoremote procstr params))) |
︙ | ︙ | |||
159 160 161 162 163 164 165 166 167 168 169 | ;; ;; (rpc:publish-procedure! ;; 'cdb:flush-queue ;; (lambda () ;; (debug:print-info 12 "Remote call of cdb:flush-queue") ;; (cdb:flush-queue))) ;; ;;====================================================================== ;; ;; end of publish-procedure section ;;====================================================================== ;; | > < | | < < < < < < < < < < < < < < < | | > | < | | | | | | | < | | < | 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 | ;; ;; (rpc:publish-procedure! ;; 'cdb:flush-queue ;; (lambda () ;; (debug:print-info 12 "Remote call of cdb:flush-queue") ;; (cdb:flush-queue))) ;; ;;====================================================================== ;; ;; end of publish-procedure section ;;====================================================================== ;; (on-exit (lambda () (open-run-close tasks:server-set-state! tasks:open-db server-id "stopped"))) (thread-start! th1) (set! *rpc:listener* rpc:listener) (tasks:server-set-state! tdb server-id "running") ; (sqlite3:finalize! tdb) th1 )) ;; rpc:server))) (define (rpc-transport:keep-running run-id server-id) ;; if none running or if > 20 seconds since ;; server last used then start shutdown (let loop ((count 0)) (thread-sleep! 5) ;; no need to do this very often (let ((numrunning -1)) ;; (db:get-count-tests-running db))) (if (or (> numrunning 0) (> (+ *last-db-access* 60)(current-seconds))) (begin (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) (loop (+ 1 count))) (begin (debug:print-info 0 "Starting to shutdown the server side") (open-run-close tasks:server-force-clean-run-record tasks:open-db run-id ipaddrstr portnum " rpc-transport:try-start-server stop") (thread-sleep! 10) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") ))))) (define (rpc-transport:find-free-port-and-open port) (handle-exceptions exn (begin (print "Failed to bind to port " (rpc:default-server-port) ", trying next port") |
︙ | ︙ |
Modified server.scm from [86d77ff5d9] to [a8caddcfa8].
︙ | ︙ | |||
57 58 59 60 61 62 63 | ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== ;; Get the transport (define (server:get-transport) | | > | | | | > > | < | | > < | 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== ;; Get the transport (define (server:get-transport) (if *transport-type* *transport-type* (let ((ttype (string->symbol (or (args:get-arg "-transport") (configf:lookup *configdat* "server" "transport") "rpc")))) (set! *transport-type* ttype) ttype))) ;; Generate a unique signature for this server (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) ;; When using zmq this would send the message back (two step process) ;; with spiffy or rpc this simply returns the return data to be returned ;; (define (server:reply return-addr query-sig success/fail result) (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (case (server:get-transport) ((rpc) (db:obj->string (vector success/fail query-sig result))) ((http) (db:obj->string (vector success/fail query-sig result))) ((zmq) (let ((pub-socket (vector-ref *runremote* 1))) (send-message pub-socket return-addr send-more: #t) (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) ((fs) result) (else (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) result))) ;; Given a run id start a server process ### NOTE ### > file 2>&1 ;; if the run-id is zero and the target-host is set ;; try running on that host ;; (define (server:run run-id) (let* ((target-host (configf:lookup *configdat* "server" "homehost" )) |
︙ | ︙ |
Modified tasks.scm from [677b9b3c1c] to [d0fac30298].
︙ | ︙ | |||
103 104 105 106 107 108 109 | ;; register that this server may come online (first to register goes though with the process) (define (tasks:server-set-available mdb run-id) (sqlite3:execute mdb "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id) VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);" | | | | | | | | | | | 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | ;; register that this server may come online (first to register goes though with the process) (define (tasks:server-set-available mdb run-id) (sqlite3:execute mdb "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id) VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);" (current-process-id) ;; pid (get-host-name) ;; hostname -1 ;; port -1 ;; pubport (random 1000) ;; priority (used a tiebreaker on get-available) "available" ;; state (common:version-signature) ;; mt_version -1 ;; interface (conc (server:get-transport)) ;; transport run-id )) (define (tasks:num-in-available-state mdb run-id) (let ((res 0)) (sqlite3:for-each-row (lambda (num-in-queue) |
︙ | ︙ |