Overview
Comment: | Default of direct fs access done |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | multi-transport |
Files: | files | file ages | folders |
SHA1: |
5deab59f4e218b12dd677c4c5bb1fe01 |
User & Date: | matt on 2013-01-27 16:45:03 |
Other Links: | branch diff | manifest | tags |
Context
2013-01-27
| ||
19:55 | Patched in zmq changes. Not functional yet check-in: 22a2d4ad0b user: matt tags: multi-transport | |
16:45 | Default of direct fs access done check-in: 5deab59f4e user: matt tags: multi-transport | |
13:14 | Creating branch for multi transport options, http, rpc, zmq, and network fs check-in: 66763d5399 user: matt tags: multi-transport | |
Changes
Modified Makefile from [a4b44c1e54] to [aa9943da6e].
1 2 3 4 5 6 7 | PREFIX=$(PWD) CSCOPTS= INSTALL=install SRCFILES = common.scm items.scm launch.scm \ ods.scm runconfig.scm server.scm configf.scm \ db.scm keys.scm margs.scm megatest-version.scm \ | | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | PREFIX=$(PWD) CSCOPTS= INSTALL=install SRCFILES = common.scm items.scm launch.scm \ ods.scm runconfig.scm server.scm configf.scm \ db.scm keys.scm margs.scm megatest-version.scm \ process.scm runs.scm tasks.scm tests.scm genexample.scm \ fs-transport.scm GUISRCF = dashboard.scm dashboard-tests.scm dashboard-guimonitor.scm dashboard-main.scm OFILES = $(SRCFILES:%.scm=%.o) GOFILES = $(GUISRCF:%.scm=%.o) ADTLSCR=mt_laststep mt_runstep mt_ezstep |
︙ | ︙ |
Modified common.scm from [1ba863b641] to [a6d027f297].
︙ | ︙ | |||
38 39 40 41 42 43 44 45 46 47 48 49 50 51 | (define *waiting-queue* (make-hash-table)) (define *test-meta-updated* (make-hash-table)) (define *globalexitstatus* 0) ;; attempt to work around possible thread issues (define *passnum* 0) ;; when running track calls to run-tests or similar ;; SERVER (define *my-client-signature* #f) (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* #f) ;; if set up for server communication this will hold <host port> (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) | > > | 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | (define *waiting-queue* (make-hash-table)) (define *test-meta-updated* (make-hash-table)) (define *globalexitstatus* 0) ;; attempt to work around possible thread issues (define *passnum* 0) ;; when running track calls to run-tests or similar ;; SERVER (define *my-client-signature* #f) (define *transport-type* 'fs) (define *megatest-db* #f) (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* #f) ;; if set up for server communication this will hold <host port> (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) |
︙ | ︙ |
Modified db.scm from [4a4c4c2fc7] to [cedb890a0b].
︙ | ︙ | |||
20 21 22 23 24 25 26 27 28 29 30 31 32 33 | (import (prefix sqlite3 sqlite3:)) (import (prefix base64 base64:)) (declare (unit db)) (declare (uses common)) (declare (uses keys)) (declare (uses ods)) (include "common_records.scm") (include "db_records.scm") (include "key_records.scm") (include "run_records.scm") ;; timestamp type (val1 val2 ...) | > | 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | (import (prefix sqlite3 sqlite3:)) (import (prefix base64 base64:)) (declare (unit db)) (declare (uses common)) (declare (uses keys)) (declare (uses ods)) (declare (uses fs-transport)) (include "common_records.scm") (include "db_records.scm") (include "key_records.scm") (include "run_records.scm") ;; timestamp type (val1 val2 ...) |
︙ | ︙ | |||
1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 | (let ((res (proc))) (set! *client-non-blocking-mode* #f) res)) ;; params = 'target cached remparams ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; (define (cdb:client-call serverdat qtype immediate numretries . params) (debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params) | > > > > > > | < | < | > > | | | < | | < | | | | < < | < < < < < < < < < < < < < < < < < | < < < < < < < < | 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 | (let ((res (proc))) (set! *client-non-blocking-mode* #f) res)) ;; params = 'target cached remparams ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; ;; cdb:client-call is the unified interface to all the transports. It dispatches the ;; query to a server routine (e.g. server:client-send-recieve) that ;; transports the data to the server where it is passed to db:process-queue-item ;; which either returns the data to the calling server routine or ;; directly calls the returning procedure (e.g. zmq). ;; (define (cdb:client-call serverdat qtype immediate numretries . params) (debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params) (case *transport-type* ((fs) (let ((packet (vector "na" qtype immediate "na" params 0))) (fs:process-queue-item packet))) ((http) (let* ((client-sig (server:get-client-signature)) (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds))))) ;; (with-output-to-string (lambda ()(serialize params)))) (debug:print-info 11 "zdat=" zdat) (let* ((res #f) (rawdat (server:client-send-receive serverdat zdat)) (tmp #f)) (debug:print-info 11 "Sent " zdat ", received " rawdat) (set! tmp (db:string->obj rawdat)) (vector-ref tmp 2) ))))) (define (cdb:set-verbosity serverdat val) (cdb:client-call serverdat 'set-verbosity #f *default-numtries* val)) (define (cdb:login serverdat keyval signature) (cdb:client-call serverdat 'login #t *default-numtries* keyval megatest-version signature)) |
︙ | ︙ |
Modified fs-transport.scm from [12f5deda72] to [d187681c70].
︙ | ︙ | |||
13 14 15 16 17 18 19 | (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use spiffy uri-common intarweb http-client spiffy-request-vars) (tcp-buffer-size 2048) | | < < < < < < < | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | < < < < < < < < < < < < < < < < < < < < < < < < | < < < < < < < < < < < < < | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | < < | < < < < < < < < < < < < < < < < < < < < < < < | < < < < < < < < < < < < < < < < < < < < < < | < < < < | < < < < < < < < < < < < < < < < | < < < < < < < | | 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use spiffy uri-common intarweb http-client spiffy-request-vars) (tcp-buffer-size 2048) (declare (unit fs-transport)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (include "common_records.scm") (include "db_records.scm") ;;====================================================================== ;; F S T R A N S P O R T S E R V E R ;;====================================================================== ;; There is no "server" per se but a convience routine to make it non ;; necessary to be reopening the db over and over again. ;; (define (fs:process-queue-item packet) (if (not *megatest-db*) ;; we will require that (setup-for-run) has already been called (set! *megatest-db* (open-db))) (debug:print-info 11 "fs:process-queue-item called with packet=" packet) (db:process-queue-item *megatest-db* packet)) |
Modified server.scm from [12f5deda72] to [7bea8dbf29].
︙ | ︙ | |||
183 184 185 186 187 188 189 | ;; When using zmq this would send the message back (two step process) ;; with spiffy or rpc this simply returns the return data to be returned ;; (define (server:reply return-addr query-sig success/fail result) (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock | > > | | 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 | ;; When using zmq this would send the message back (two step process) ;; with spiffy or rpc this simply returns the return data to be returned ;; (define (server:reply return-addr query-sig success/fail result) (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (case *transport-type* ((fs) result) ((http)(db:obj->string (vector success/fail query-sig result))))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* |
︙ | ︙ | |||
256 257 258 259 260 261 262 | (set! *runremote* serverdat) serverdat) (begin (debug:print-info 2 "Failed to login or connect to " iface ":" port) (set! *runremote* #f) #f)))) | | > > > > > | | | | | | | | > > > > | | | | | | | | | | | | | | | | | | | | | | | | | 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 | (set! *runremote* serverdat) serverdat) (begin (debug:print-info 2 "Failed to login or connect to " iface ":" port) (set! *runremote* #f) #f)))) ;; Do all the connection work, look up the transport type and set up the ;; connection if required. ;; (define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (case *transport-type* ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db)))) ((http) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (list-ref hostinfo 0)) (iface (list-ref hostinfo 1)) (port (list-ref hostinfo 2)) (pid (list-ref hostinfo 3))) (debug:print-info 2 "Setting up to connect to " hostinfo) (server:client-connect iface port)) ;; ) (begin (debug:print 0 "ERROR: No server found, exiting") (exit))))))) ;; (if (> numtries 0) ;; (let ((exe (car (argv))) ;; (pid #f)) ;; (debug:print-info 0 "No server available, attempting to start one...") ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) ;; (string-intersperse *verbosity* ",") ;; (conc *verbosity*))))) ;; ;; (set! pid (process-fork (lambda () ;; ;; (current-input-port (open-input-file "/dev/null")) ;; ;; (current-output-port (open-output-file "/dev/null")) ;; ;; (current-error-port (open-output-file "/dev/null")) ;; ;; (server:launch)))) ;; (let loop ((count 0)) ;; (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) ;; (if (not hostinfo) ;; (begin ;; (debug:print-info 0 "Waiting for server pid=" pid " to start") ;; (sleep 2) ;; give server time to start ;; (if (< count 5) ;; (loop (+ count 1))))))) ;; ;; we are starting a server, do not try again! That can lead to ;; ;; recursively starting many processes!!! ;; (server:client-setup numtries: 0)) ;; (debug:print-info 1 "Too many attempts, giving up"))))) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) ;; if none running or if > 20 seconds since ;; server last used then start shutdown |
︙ | ︙ |