Overview
Comment: | Added logic to not attempt to process cached-writes if they were processed within the last 400 ms. Added stats on cached writes |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | newdashboard |
Files: | files | file ages | folders |
SHA1: | 04ba871116e284db3059e19b628ee853 |
User & Date: | mrwellan on 2013-03-19 13:29:33 |
Other Links: | branch diff | manifest | tags |
Context
2013-03-19
| ||
14:23 | Switched tests data to minimal amount needed for runs display check-in: 1b7e157405 user: mrwellan tags: newdashboard | |
13:29 | Added logic to not attempt to process cached-writes if they were processed within the last 400 ms. Added stats on cached writes check-in: 04ba871116 user: mrwellan tags: newdashboard | |
11:26 | Partial re-implementation (again) of transaction wrapped writes, now working except for transaction within transaction issue check-in: 69c6ef28ac user: mrwellan tags: newdashboard | |
Changes
Modified db.scm from [69d9359c67] to [0291a77e98].
︙ | ︙ | |||
1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 | (define (db:process-cached-writes db) (let ((queries (make-hash-table)) (data #f)) (mutex-lock! *incoming-mutex*) ;; data is a list of query packets <vector qry-sig query params (set! data (reverse *incoming-writes*)) ;; (sort ... (lambda (a b)(< (vector-ref a 1)(vector-ref b 1))))) (set! *incoming-writes* '()) (mutex-unlock! *incoming-mutex*) (if (> (length data) 0) ;; Process if we have data (begin (debug:print-info 7 "Writing cached data " data) | > | 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 | (define (db:process-cached-writes db) (let ((queries (make-hash-table)) (data #f)) (mutex-lock! *incoming-mutex*) ;; data is a list of query packets <vector qry-sig query params (set! data (reverse *incoming-writes*)) ;; (sort ... (lambda (a b)(< (vector-ref a 1)(vector-ref b 1))))) (set! *server:last-write-flush* (current-milliseconds)) (set! *incoming-writes* '()) (mutex-unlock! *incoming-mutex*) (if (> (length data) 0) ;; Process if we have data (begin (debug:print-info 7 "Writing cached data " data) |
︙ | ︙ | |||
1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 | (let ((cache-size (length data))) (if (> cache-size *max-cache-size*) (set! *max-cache-size* cache-size))) #t) #f))) (define *db:process-queue-mutex* (make-mutex)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; (define (db:queue-write-and-wait db qry-sig query params) (let ((queue-len 0) (res #f) (got-it #f) (qry-pkt (vector qry-sig query params)) (timeout (+ 10 (current-seconds)))) ;; set the time out to 10 secs in future ;; Put the item in the queue *incoming-writes* (mutex-lock! *incoming-mutex*) (set! *incoming-writes* (cons qry-pkt *incoming-writes*)) (set! queue-len (length *incoming-writes*)) (mutex-unlock! *incoming-mutex*) | > > > > | 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 | (let ((cache-size (length data))) (if (> cache-size *max-cache-size*) (set! *max-cache-size* cache-size))) #t) #f))) (define *db:process-queue-mutex* (make-mutex)) (define *number-of-writes* 0) (define *writes-total-delay* 0) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; (define (db:queue-write-and-wait db qry-sig query params) (let ((queue-len 0) (res #f) (got-it #f) (qry-pkt (vector qry-sig query params)) (start-time (current-milliseconds)) (timeout (+ 10 (current-seconds)))) ;; set the time out to 10 secs in future ;; Put the item in the queue *incoming-writes* (mutex-lock! *incoming-mutex*) (set! *incoming-writes* (cons qry-pkt *incoming-writes*)) (set! queue-len (length *incoming-writes*)) (mutex-unlock! *incoming-mutex*) |
︙ | ︙ | |||
1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 | (begin (hash-table-delete! *completed-writes* qry-sig) (set! got-it #t))) (mutex-unlock! *completed-mutex*) (if (and (not got-it) (< (current-seconds) timeout)) (loop))) got-it)) (define (db:process-queue-item db item) (let* ((stmt-key (cdb:packet-get-qtype item)) (qry-sig (cdb:packet-get-query-sig item)) (return-address (cdb:packet-get-client-sig item)) (params (cdb:packet-get-params item)) | > > | 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 | (begin (hash-table-delete! *completed-writes* qry-sig) (set! got-it #t))) (mutex-unlock! *completed-mutex*) (if (and (not got-it) (< (current-seconds) timeout)) (loop))) (set! *number-of-writes* (+ *number-of-writes* 1)) (set! *writes-total-delay* (+ *writes-total-delay* 1)) got-it)) (define (db:process-queue-item db item) (let* ((stmt-key (cdb:packet-get-qtype item)) (qry-sig (cdb:packet-get-query-sig item)) (return-address (cdb:packet-get-client-sig item)) (params (cdb:packet-get-params item)) |
︙ | ︙ |
Modified http-transport.scm from [f36a48c1d7] to [9e3e9e2469].
︙ | ︙ | |||
248 249 250 251 252 253 254 | (loop 0)) (begin (debug:print-info 0 "Starting to shutdown the server.") ;; need to delete only *my* server entry (future use) (set! *time-to-exit* #t) (tasks:server-deregister-self tdb (get-host-name)) (thread-sleep! 1) | | > > | 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 | (loop 0)) (begin (debug:print-info 0 "Starting to shutdown the server.") ;; need to delete only *my* server entry (future use) (set! *time-to-exit* #t) (tasks:server-deregister-self tdb (get-host-name)) (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Number of cached writes " *number-of-writes*) (debug:print-info 0 "Average cached write time " (/ *writes-total-delay* *number-of-writes*) " ms") (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) ;; all routes though here end in exit ... (define (http-transport:launch) (if (not *toppath*) (if (not (setup-for-run)) |
︙ | ︙ |
Modified server.scm from [34877c239b] to [65c872df95].
︙ | ︙ | |||
57 58 59 60 61 62 63 64 65 66 67 68 69 70 | (else (debug:print "WARNING: unrecognised transport " transport) (exit)))) ;;====================================================================== ;; Q U E U E M A N A G E M E N T ;;====================================================================== ;; Flush the queue every third of a second. Can we assume that setup-for-run ;; has already been done? (define (server:write-queue-handler) (if (setup-for-run) (let ((db (open-db))) (let loop () | > > > > > > > > > | | | | | 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | (else (debug:print "WARNING: unrecognised transport " transport) (exit)))) ;;====================================================================== ;; Q U E U E M A N A G E M E N T ;;====================================================================== ;; We don't want to flush the queue if it was just flushed (define *server:last-write-flush* (current-milliseconds)) ;; Flush the queue every third of a second. Can we assume that setup-for-run ;; has already been done? (define (server:write-queue-handler) (if (setup-for-run) (let ((db (open-db))) (let loop () (let ((last-write-flush-time #f)) (mutex-lock! *incoming-mutex*) (set! last-write-flush-time *server:last-write-flush*) (mutex-unlock! *incoming-mutex*) (if (> (- (current-milliseconds) last-write-flush-time) 400) (begin (mutex-lock! *db:process-queue-mutex*) (db:process-cached-writes db) (mutex-unlock! *db:process-queue-mutex*) (thread-sleep! 0.5)))) (loop))) (begin (debug:print 0 "ERROR: failed to setup for Megatest in server:write-queue-handler") (exit 1)))) ;;====================================================================== ;; S E R V E R U T I L I T I E S |
︙ | ︙ |