Overview
Comment: | Backed out accelerations |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | interleaved-queries |
Files: | files | file ages | folders |
SHA1: | 44292aaf12265a381b80dba8df449bee |
User & Date: | matt on 2012-11-20 19:47:42 |
Other Links: | branch diff | manifest | tags |
Context
2012-11-20
| ||
20:25 | Added exception handling to deal with high cpu loads check-in: aaf246854c user: matt tags: interleaved-queries | |
19:47 | Backed out accelerations check-in: 44292aaf12 user: matt tags: interleaved-queries | |
18:09 | (no comment) check-in: fc67718610 user: mrwellan tags: interleaved-queries | |
Changes
Modified db.scm from [aec6974814] to [e37e26940d].
︙ | ︙ | |||
1252 1253 1254 1255 1256 1257 1258 | ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; | | < < < | | | | > > > > | | | < | > > | | | < | | < | < < < | < < < | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | > | < < < < | < < < | | | < < < < < < < < < < < < < < < < < < < < < < | 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 | ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; (define (db:process-queue db pubsock indata) (let* ((data (sort indata (lambda (a b) (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b)))))) (for-each (lambda (item) (db:process-queue-item db pubsock item)) data))) (define (db:process-queue-item db pubsock item) (let* ((stmt-key (cdb:packet-get-qtype item)) (qry-sig (cdb:packet-get-query-sig item)) (return-address (cdb:packet-get-client-sig item)) (params (cdb:packet-get-params item)) (query (let ((q (alist-ref stmt-key db:queries))) (if q (car q) #f)))) (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", qrery=" query ", params=" params) (cond (query (apply sqlite3:execute db query params) (server:reply pubsock 
return-address qry-sig #t #t)) ((member stmt-key db:special-queries) (debug:print-info 11 "Handling special statement " stmt-key) (case stmt-key ((immediate) (let ((proc (car params)) (remparams (cdr params))) ;; we are being handed a procedure so call it (debug:print-info 11 "Running (apply " proc " " remparams ")") (server:reply pubsock return-address qry-sig #t (apply proc remparams)))) ((login) (if (< (length params) 3) ;; should get toppath, version and signature '(#f "login failed due to missing params") ;; missing params (let ((calling-path (car params)) (calling-vers (cadr params)) (client-key (caddr params))) (if (and (equal? calling-path *toppath*) (equal? megatest-version calling-vers)) (begin (hash-table-set! *logged-in-clients* client-key (current-seconds)) (server:reply pubsock return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))) ((flush sync) (server:reply pubsock return-address qry-sig #t 1)) ;; (length data))) ((set-verbosity) (set! *verbosity* (car params)) (server:reply pubsock return-address qry-sig #t '(#t *verbosity*))) ((killserver) (debug:print 0 "WARNING: Server going down in 15 seconds by user request!") (open-run-close tasks:server-deregister tasks:open-db (cadr *server-info*) pullport: (caddr *server-info*)) (thread-start! (make-thread (lambda ()(thread-sleep! 15)(exit)))) (server:reply pubsock return-address qry-sig #t '(#t "exit process started"))) (else ;; not a command, i.e. 
is a query (debug:print 0 "ERROR: Unrecognised query/command " stmt-key) (server:reply pubsock return-address qry-sig #f 'failed)))) (else (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) (server:reply pubsock return-address qry-sig #t #t))))) (define (db:test-get-records-for-index-file db run-id test-name) (let ((res '())) (sqlite3:for-each-row (lambda (id itempath state status run_duration logf comment) (set! res (cons (vector id itempath state status run_duration logf comment) res))) db |
︙ | ︙ |
Modified server.scm from [cfe6037649] to [18494713b4].
︙ | ︙ | |||
114 115 116 117 118 119 120 | (set! pub-socket (cadr zmq-sdat2)) (set! p2 (caddr zmq-sdat2)) (set! *cache-on* #t) ;; what to do when we quit ;; | | | | | | | | | | | | | | | | 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | (set! pub-socket (cadr zmq-sdat2)) (set! p2 (caddr zmq-sdat2)) (set! *cache-on* #t) ;; what to do when we quit ;; ;; (on-exit (lambda () ;; (if (and *toppath* *server-info*) ;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) ;; (let loop () ;; (let ((queue-len 0)) ;; (thread-sleep! (random 5)) ;; (mutex-lock! *incoming-mutex*) ;; (set! queue-len (length *incoming-data*)) ;; (mutex-unlock! *incoming-mutex*) ;; (if (> queue-len 0) ;; (begin ;; (debug:print-info 0 "Queue not flushed, waiting ...") ;; (loop)))))))) ;; The heavy lifting ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; (let loop ((queue-lst '())) (let* ((rawmsg (receive-message* pull-socket)) (packet (db:string->obj rawmsg))) (debug:print-info 12 "server=> received packet=" packet) (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) (loop '())) (loop (cons packet queue-lst))))))) (define (server:reply pubsock target query-sig success/fail result) (debug:print-info 11 "server:reply target=" target ", result=" result) (send-message pubsock target send-more: #t) (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) |
︙ | ︙ | |||
238 239 240 241 242 243 244 245 246 247 248 249 250 251 | (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* | > > > > > | 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 | (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== ;;====================================================================== ;; C L I E N T S ;;====================================================================== (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* |
︙ | ︙ | |||
362 363 364 365 366 367 368 | ;; (if (not server-info)(loop))) ;; (debug:print 1 "Server alive, starting self-ping") ;; (server:self-ping server-info) ;; )) ;; "Self ping")) (th2 (make-thread (lambda () (server:run (args:get-arg "-server"))) "Server run")) | | | | 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 | ;; (if (not server-info)(loop))) ;; (debug:print 1 "Server alive, starting self-ping") ;; (server:self-ping server-info) ;; )) ;; "Self ping")) (th2 (make-thread (lambda () (server:run (args:get-arg "-server"))) "Server run")) ;; (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) ) (set! *client-non-blocking-mode* #t) ;; (thread-start! th1) (thread-start! th2) ;; (thread-start! th3) (set! *didsomething* #t) ;; (thread-join! th3) (thread-join! th2) ) (debug:print 0 "ERROR: Failed to setup for megatest"))) (exit))) |
︙ | ︙ |