Overview
Comment: | server, list-runs and repl now working |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | interleaved-queries |
Files: | files | file ages | folders |
SHA1: | 0cb9ad87a9541cfdda35e49317637818 |
User & Date: | matt on 2012-11-19 01:55:53 |
Other Links: | branch diff | manifest | tags |
Context
2012-11-19
| ||
13:04 | Tweaked for testing, all calls immediate check-in: 6ac20061e7 user: mrwellan tags: interleaved-queries | |
01:55 | server, list-runs and repl now working check-in: 0cb9ad87a9 user: matt tags: interleaved-queries | |
2012-11-18
| ||
23:30 | Initial coding for interleaved queries done and compiles check-in: b85732a36a user: matt tags: interleaved-queries | |
Changes
Modified db.scm from [c306724289] to [8d3f7767b5].
︙ | ︙ | |||
;; db.scm, new-file lines 1107-1155. The hunk opens on the closing line of the
;; preceding definition, which is kept verbatim as context.
    res))

;; params = 'target cached remparams
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
;; cdb:client-call: send one query packet to the server and return its reply.
;;
;;   zmq-sockets - vector; slot 0 is the socket the request is sent on, slot 1
;;                 the socket the reply is read from
;;   qtype       - query type placed in the packet
;;   immediate   - flag placed in the packet
;;   numretries  - how many more times to resend after a 5 second wait with no
;;                 reply
;;   params      - remaining arguments, sent as the packet's parameter list
;;
;; Returns the decoded reply. When no reply arrives and numretries is
;; exhausted the process exits with status 5.
;;
;; The reader thread is joined with a timeout and a unique marker rather than
;; being raced against a separate timeout thread. That way
;;   * a reply that decodes to #f is still recognised as a reply,
;;   * the value produced by a retry is returned to the caller instead of
;;     being dropped inside a helper thread, and
;;   * a timed-out reader is terminated so it does not keep consuming frames
;;     from the sub socket while the retry is reading from it.
;;
(define (cdb:client-call zmq-sockets qtype immediate numretries . params)
  (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params)
  (let* ((push-socket (vector-ref zmq-sockets 0))
         (sub-socket  (vector-ref zmq-sockets 1))
         (client-sig  (server:get-client-signature))
         (query-sig   (message-digest-string (md5-primitive) (conc qtype immediate params)))
         (zdat        (db:obj->string
                       (vector client-sig qtype immediate query-sig params (current-seconds))))
         ;; freshly allocated, so it can never be eq? to a decoded reply
         (no-reply    (list 'no-reply))
         (send-receive
          (lambda ()
            (debug:print-info 11 "sending message")
            (send-message push-socket zdat)
            (debug:print-info 11 "message sent")
            ;; first frame: the sender info, this should match
            ;; (server:get-client-signature); we will need to process "all"
            ;; messages here some day
            (receive-message* sub-socket)
            ;; second frame: the actual message, returned as the thread result
            (db:string->obj (receive-message* sub-socket))))
         (th1 (make-thread send-receive "send receive")))
    (debug:print-info 11 "Starting threads")
    (thread-start! th1)
    ;; wait at most 5 seconds; thread-join! hands back no-reply on timeout
    (let ((res (thread-join! th1 5 no-reply)))
      (if (eq? res no-reply)
          (begin
            (thread-terminate! th1)
            (if (> numretries 0)
                (begin
                  (debug:print 0 "WARNING: no reply to query " params ", trying again")
                  (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params))
                (begin
                  (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
                  (exit 5))))
          (begin
            (debug:print-info 11 "cdb:client-call returning res=" res)
            res)))))

;; Send a 'set-verbosity query carrying val; queued (immediate flag #f).
(define (cdb:set-verbosity zmq-socket val)
  (cdb:client-call zmq-socket 'set-verbosity #f *default-numtries* val))

;; Send an immediate 'login query carrying keyval, megatest-version and
;; signature, and return the server's reply.
(define (cdb:login zmq-sockets keyval signature)
  (cdb:client-call zmq-sockets 'login #t *default-numtries* keyval megatest-version signature))
︙ | ︙ | |||
1226 1227 1228 1229 1230 1231 1232 | '(test-set-rundir "UPDATE tests SET rundir=? WHERE run_id=? AND testname=? AND item_path=?;") '(delete-tests-in-state "DELETE FROM tests WHERE state=? AND run_id=?;") '(tests:test-set-toplog "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';") )) ;; do not run these as part of the transaction (define db:special-queries '(rollup-tests-pass-fail | | > | | > | | | | > | < | | | < | | | > > > > > > > > > > > > > > > | | 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 | '(test-set-rundir "UPDATE tests SET rundir=? WHERE run_id=? AND testname=? AND item_path=?;") '(delete-tests-in-state "DELETE FROM tests WHERE state=? AND run_id=?;") '(tests:test-set-toplog "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';") )) ;; do not run these as part of the transaction (define db:special-queries '(rollup-tests-pass-fail db:roll-up-pass-fail-counts login)) ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; (define (db:process-queue pubsock indata) (open-run-close (lambda (db . 
junkparams) (let ((queries (make-hash-table)) (data (sort indata (lambda (a b) (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b)))))) (if (> (length data) 0) (debug:print-info 4 "Writing cached data " data)) ;; prepare the needed statements, do each only once (for-each (lambda (request-item) (let ((stmt-key (cdb:packet-get-qtype request-item))) (if (and (not (hash-table-ref/default queries stmt-key #f)) (not (member stmt-key db:special-queries))) (let ((stmt (alist-ref stmt-key db:queries))) (if stmt (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt))) (if (procedure? stmt-key) (hash-table-set! queries stmt-key #f) (debug:print 0 "ERROR: Missing query spec for " stmt-key "!"))))))) data) ;; outer loop to handle special queries that cannot be handled in the ;; transaction. (let outerloop ((special-qry #f) (stmts data)) (if special-qry ;; handle a query that cannot be part of the grouped queries (let* ((stmt-key (cdb:packet-get-qtype special-qry)) (return-address (cdb:packet-get-client-sig special-qry)) (qry (hash-table-ref/default queries stmt-key #f)) (params (cdb:packet-get-params special-qry))) (cond ((string? qry) (apply sqlite3:execute db qry params) (server:reply pubsock return-address #t)) ((procedure? stmt-key) ;; we are being handed a procedure so call it (debug:print-info 11 "Running (apply " stmt-key " " db " " params ")") (server:reply pubsock return-address (apply stmt-key db params))) (else (case stmt-key ((login) (if (< (length params) 3) ;; should get toppath, version and signature '(#f "login failed due to missing params") ;; missing params (let ((calling-path (car params)) (calling-vers (cadr params)) (client-key (caddr params))) (if (and (equal? calling-path *toppath*) (equal? megatest-version calling-vers)) (begin (hash-table-set! *logged-in-clients* client-key (current-seconds)) (server:reply pubsock return-address '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... 
(list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))) (else (debug:print 0 "ERROR: Unrecognised queued call " qry " " params))))) (if (not (null? stmts)) (outerloop #f stmts))) ;; handle normal queries (let ((rem (sqlite3:with-transaction db (lambda () |
︙ | ︙ | |||
1300 1301 1302 1303 1304 1305 1306 | (member stmt-key db:special-queries)) (begin (debug:print-info 11 "Handling special statement " stmt-key) (cons hed tal)) (begin (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) | | | 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 | (member stmt-key db:special-queries)) (begin (debug:print-info 11 "Handling special statement " stmt-key) (cons hed tal)) (begin (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) (server:reply pubsock return-address #t) (if (not (null? tal)) (innerloop (car tal)(cdr tal)) '())) )))))))) (if (not (null? rem)) (outerloop (car rem)(cdr rem)))))) (for-each (lambda (stmt-key) |
︙ | ︙ |
Modified server.scm from [67b3f4e15b] to [a18aca67cf].
︙ | ︙ | |||
134 135 136 137 138 139 140 | (loop)))))))) ;; The heavy lifting ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; (let loop ((queue-lst '())) | | | | > | | | | 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | (loop)))))))) ;; The heavy lifting ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; (let loop ((queue-lst '())) (print "GOT HERE EH?") (let* ((rawmsg (receive-message* pull-socket)) (packet (db:string->obj rawmsg))) (debug:print-info 12 "server=> received packet=" packet) (if (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin (db:process-queue pub-socket (cons packet queue-lst)) (loop '())) (loop (cons packet queue-lst))))))) (define (server:reply pubsock target result) (debug:print-info 11 "server:reply target=" target ", result=" result) (send-message pubsock target send-more: #t) (send-message pubsock (db:obj->string result))) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (let* ((server-info (let loop () (let ((sdat #f)) (mutex-lock! *heartbeat-mutex*) (set! sdat *server-info*) (mutex-unlock! *heartbeat-mutex*) (if sdat sdat (begin (sleep 4) (loop)))))) (iface (cadr server-info)) (pullport (caddr server-info)) (pubport (cadddr server-info)) ;; id interface pullport pubport) ;; (zmq-sockets (server:client-connect iface pullport pubport)) ) (let loop ((count 0)) (thread-sleep! 
4) ;; no need to do this very often ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1)))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) ;; NOTE: Get rid of this mechanism! It really is not needed... (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) |
︙ | ︙ | |||
;; server.scm, new-file lines 222-238. The hunk opens inside the preceding
;; definition; its closing lines are kept verbatim as context.
	(exit)) ;; To exit or not? That is the question.
    (let ((zmq-url (conc "tcp://" iface ":" p)))
      (debug:print 0 "Trying to start server on " zmq-url)
      (bind-socket s zmq-url)
      (list iface s port)))))

;; server:setup-ports: open the pull and pub sockets and register the server.
;;
;;   ipaddrstr - interface address handed to server:find-free-port-and-open
;;   startport - first port to try for the pull socket; the pub socket search
;;               starts one above the port the pull socket obtained
;;
;; Clears *runremote*, stores the registration result in *server-info* while
;; holding *heartbeat-mutex*, and returns (list s1 s2) where s1 and s2 are the
;; values returned by server:find-free-port-and-open for 'pull and 'pub.
;;
(define (server:setup-ports ipaddrstr startport)
  (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pull))
         (p1 (caddr s1))
         (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
         (p2 (caddr s2)))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
    ;; dynamic-wind guarantees the mutex is released even when the
    ;; registration call signals an error, so other users of
    ;; *heartbeat-mutex* are not left waiting on a lock nobody will release.
    (dynamic-wind
      (lambda () (mutex-lock! *heartbeat-mutex*))
      (lambda ()
        (set! *server-info*
              (open-run-close tasks:server-register tasks:open-db
                              (current-process-id) ipaddrstr p1 p2 0 'live)))
      (lambda () (mutex-unlock! *heartbeat-mutex*)))
    (list s1 s2)))
︙ | ︙ |