Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -114,11 +114,11 @@ ;; These are called by the server on recipt of /api calls ;; - keep it simple, only return the actual result of the call, i.e. no meta info here ;; ;; - returns #( flag result ) ;; -(define (api:execute-requests dbstruct dat) +(define (api:execute-requests dbstruct dat queues) (handle-exceptions exn (let ((call-chain (get-call-chain))) (debug:print 0 *default-log-port* "WARNING: api:execute-requests received an exception from peer, dat=" dat) (print-call-chain (current-error-port)) @@ -147,11 +147,13 @@ (case cmd ;;=============================================== ;; READ/WRITE QUERIES ;;=============================================== - ((get-keys-write) (db:get-keys dbstruct)) ;; force a dummy "write" query to force server; for debug in -repl + ((get-keys-write) (api:queued-request queues 'write params + (lambda () + (db:get-keys dbstruct)))) ;; force a dummy "write" query to force server; for debug in -repl ;; SERVERS ((start-server) (apply server:kind-run params)) ((kill-server) (set! *server-run* #f)) @@ -322,11 +324,11 @@ (define (api:process-request dbstruct $) ;; the $ is the request vars proc (set! *api-process-request-count* (+ *api-process-request-count* 1)) (let* ((cmd ($ 'cmd)) (paramsj ($ 'params)) (params (db:string->obj paramsj transport: 'http)) ;; incoming data from the POST (or is it a GET?) - (resdat (api:execute-requests dbstruct (vector cmd params))) ;; process the request, resdat = #( flag result ) + (resdat (api:execute-requests dbstruct (vector cmd params) *queues*)) ;; process the request, resdat = #( flag result ), we resort to a global here for the queues. (success (vector-ref resdat 0)) (res (vector-ref resdat 1))) ;; (vector flag payload), get the payload, ignore the flag (why?) 
(if (not success) (debug:print 0 *default-log-port* "ERROR: success flag is #f for " cmd " with params " params)) (if (> *api-process-request-count* *max-api-process-requests*)
@@ -340,5 +342,116 @@
 ;;  	     (boolean? res))
 ;;  	 res
 ;;  	 (list "ERROR, not string, list, number or boolean" 1 cmd params res)))))
     (db:obj->string res transport: 'http)))
 
+;; NOTE(review): nothing in this section uses this global mutex; the queue
+;; procedures lock the per-record mutex (api:queues-mutex) instead.
+(define api:queue-mutex (make-mutex))
+
+;; Record holding the state used to coalesce read and write requests.
+;;
+;;   enable     - when #f, api:queued-request runs every request immediately
+;;   dbstruct   - handed to db:with-db by api:process-queues
+;;   mutex      - guards readq and writeq
+;;   readq      - pending read entries #( proc params result ), newest first
+;;   writeq     - pending write entries, same layout, newest first
+;;   last-read  - (current-milliseconds) when the read queue was last run
+;;   last-write - (current-milliseconds) when the write queue was last run
+;;   read-cvar  - broadcast after a batch of reads has been run
+;;   write-cvar - broadcast after a batch of writes has been run
+;;
+(defstruct api:queues
+  (enable #f)
+  (dbstruct #f) ;; must be initialized!
+  (mutex (make-mutex))
+  (readq '())
+  (writeq '())
+  (last-read (current-milliseconds))
+  (last-write (current-milliseconds))
+  (read-cvar (make-condition-variable "reads"))
+  (write-cvar (make-condition-variable "writes"))
+  )
+
+;; api queued request handler
+;;
+;; queues:   api:queues record, or #f to bypass queueing
+;; qry-type: read write transaction
+;; params:   stored in the queue entry next to proc
+;; proc:     thunk that runs the query
+;;
+;; read and write requests are pushed onto the matching queue; the caller then
+;; blocks on that queue's condition variable and, once api:process-queues has
+;; run the batch, returns the value stored for proc. Any other qry-type, and
+;; every request while queueing is disabled, runs proc immediately.
+;;
+(define (api:queued-request queues qry-type params proc)
+  (let ((enqueue-and-wait
+         (lambda (get-queue set-queue! cvar)
+           (let ((dat   (vector proc params #f)) ;; #f is placeholder for the result
+                 (mutex (api:queues-mutex queues)))
+             (mutex-lock! mutex)
+             (set-queue! queues (cons dat (get-queue queues)))
+             (mutex-unlock! mutex cvar) ;; unlock mutex and proceed when condition var is triggered
+             (vector-ref dat 2))))) ;; return the value from the query to the caller
+    (if (and queues (api:queues-enable queues))
+        (case qry-type
+          ((read)
+           (enqueue-and-wait api:queues-readq api:queues-readq-set! (api:queues-read-cvar queues)))
+          ((write)
+           (enqueue-and-wait api:queues-writeq api:queues-writeq-set! (api:queues-write-cvar queues)))
+          (else ;; transaction: run right away; the queue mutex is never taken on this path
+           (proc)))
+        (proc))))
+
+;; process queues
+;;
+;; Runs at most one batch per call. A queue is ready when it is non-empty and
+;; was last run more than 500 ms ago; when both are ready the one run least
+;; recently goes first, and a ready queue is never held back by an idle one.
+;; The batch runs inside a single transaction, then the queue is emptied, its
+;; timestamp is set to now and the threads waiting on it are woken.
+;;
+;; NOTE(review): if a queued proc raises and db:with-db does not trap it, the
+;; mutex stays locked and the waiters are never woken - confirm error policy.
+;;
+(define (api:process-queues queues)
+  (mutex-lock! (api:queues-mutex queues))
+  (let* ((now         (current-milliseconds))
+         (due         (- now 500)) ;; we will process the queue if it has not been processed in 500 ms
+         (reads       (api:queues-readq queues))
+         (writes      (api:queues-writeq queues))
+         (last-read   (api:queues-last-read queues))
+         (last-write  (api:queues-last-write queues))
+         (read-ready  (and (not (null? reads))  (> due last-read)))
+         (write-ready (and (not (null? writes)) (> due last-write)))
+         (run-batch ;; run every queued proc in one transaction, storing each result in its entry
+          (lambda (batch)
+            (db:with-db
+             (api:queues-dbstruct queues)
+             #f
+             #f
+             (lambda (db)
+               (sqlite3:with-transaction
+                db
+                (lambda ()
+                  (for-each
+                   (lambda (procdat)
+                     (vector-set! procdat 2 ((vector-ref procdat 0)))) ;; set vector 3rd pos to the result of calculating proc
+                   batch))))))))
+    (cond
+     ((and read-ready ;; nudge the system to toggle between processing the reads and processing the writes
+           (or (not write-ready) (<= last-read last-write)))
+      (run-batch reads)
+      ;; now reset the queue values
+      (api:queues-readq-set! queues '())
+      (api:queues-last-read-set! queues now)
+      (condition-variable-broadcast! (api:queues-read-cvar queues)))
+     (write-ready
+      (run-batch writes)
+      ;; now reset the queue values
+      (api:queues-writeq-set! queues '())
+      (api:queues-last-write-set! queues now)
+      (condition-variable-broadcast! (api:queues-write-cvar queues))))
+    (mutex-unlock! (api:queues-mutex queues))))
+
+
Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -126,18 +126,19 @@ (define *time-to-exit* #f) (define *server-run* #t) (define *run-id* #f) (define *server-kind-run* (make-hash-table)) (define *home-host* #f) +(define *queues* (make-api:queues enable: #t)) ;; set up the queues for coalescing queries ;; (define *total-non-write-delay* 0) (define *heartbeat-mutex* (make-mutex)) (define *api-process-request-count* 0) (define *max-api-process-requests* 0) (define *server-overloaded* #f) ;; client -(define *rmt-mutex* (make-mutex)) ;; remote access calls mutex +(define *rmt-mutex* (make-mutex)) ;; remote access calls mutex ;; RPC transport (define *rpc:listener* #f) ;; KEY info Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -85,11 +85,11 @@ ;; ensure we have a record for our connection for given area (if (not runremote) ;; can remove this one. should never get here. (begin (set! *runremote* (make-remote)) (set! runremote *runremote*))) ;; new runremote will come from this on next iteration - + ;; DOT SET_HOMEHOST; // leaving off - doesn't really add to the clarity ;; DOT MUTEXLOCK -> SET_HOMEHOST [label="no homehost?"]; ;; DOT SET_HOMEHOST -> MUTEXLOCK; ;; ensure we have a homehost record (if (not (pair? (remote-hh-dat runremote))) ;; not on homehost @@ -335,11 +335,11 @@ (db-file-path (db:dbfile-path)) ;; 0)) (dbstruct-local (db:setup #t)) ;; make-dbr:dbstruct path: dbdir local: #t))) (read-only (not (file-write-access? 
db-file-path)))
          (start          (current-milliseconds))
          (resdat         (if (not (and read-only qry-is-write))
-                             (let ((v (api:execute-requests dbstruct-local (vector (symbol->string cmd) params))))
+                             (let ((v (api:execute-requests dbstruct-local (vector (symbol->string cmd) params) *queues*))) ;; the queues are the third argument of api:execute-requests, not a member of the request vector
                                (handle-exceptions ;; there has been a long history of receiving strange errors from values returned by the client when things go wrong..
                                    exn            ;; This is an attempt to detect that situation and recover gracefully
                                  (begin
                                    (debug:print0 *default-log-port* "ERROR: bad data from server " v " message: " ((condition-property-accessor 'exn 'message) exn))
                                    (vector #t '())) ;; should always get a vector but if something goes wrong return a dummy