Overview
Comment: | Added queued.scm for testing db idea with re-ordered queries |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.65 |
Files: | files | file ages | folders |
SHA1: |
6a182939795451cf6d3817c41c343ecf |
User & Date: | matt on 2017-07-05 04:13:30 |
Other Links: | branch diff | manifest | tags |
Context
2017-07-05
| ||
18:28 | Partial commit of tsend and tlisten check-in: ddc112387c user: mrwellan tags: v1.65 | |
04:13 | Added queued.scm for testing db idea with re-ordered queries check-in: 6a18293979 user: matt tags: v1.65 | |
2017-07-04
| ||
22:59 | Merged from v1.64 check-in: c269abcad7 user: matt tags: v1.65 | |
Changes
Added minimt/queued.scm version [71e1ba00f3].
(use nanomsg defstruct srfi-18)

;;======================================================================
;; Commands
;;======================================================================

;; Registry of known commands: command key (symbol) -> cmd struct.
(define *commands* (make-hash-table))

(defstruct cmd
  key
  proc
  ctype ;; command type; 'r (read), 'w (write) or 't (transaction)
  )

;; Register proc under key with the given command type.
(define (register-command key ctype proc)
  (hash-table-set! *commands* key (make-cmd key: key ctype: ctype proc: proc)))

;; Return the procedure registered under key.
;; hash-table-ref takes the table first, then the key.
(define (get-proc key)
  (cmd-proc (hash-table-ref *commands* key)))

;; Each entry is (key ctype proc); the procs themselves are defined elsewhere.
(for-each
 (lambda (dat)
   (apply register-command dat))
 `( (create-run     w ,create-run)
    (create-step    w ,create-step)
    (create-test    w ,create-test)
    (get-test-id    r ,get-test-id)
    (get-run-id     r ,get-run-id)
    ;; (open-db        w ,open-create-db)
    (step-set-ss    w ,step-set-state-status)
    (test-set-ss    w ,test-set-state-status)
    (test-get-tests r ,test-get-tests) ))

;;======================================================================
;; Server/client stuff
;;======================================================================

;; Serialize data to a string with `write`.
(define-inline (encode data)
  (with-output-to-string
    (lambda ()
      (write data))))

;; Parse a string produced by encode back into data with `read`.
(define-inline (decode data)
  (with-input-from-string
      data
    (lambda ()
      (read))))

;;======================================================================
;; Command queue
;;======================================================================

;; One queued request: the command key, its parameters and the
;; host:port to send the result back to.
(defstruct qitem
  command
  params
  host-port)

(define *cmd-queue* '())
(define *queue-mutex* (make-mutex))

;; Add a qitem to the queue (newest item is at the head of the list).
(define (queue-push cmddat)
  (mutex-lock! *queue-mutex*)
  (set! *cmd-queue* (cons cmddat *cmd-queue*))
  (mutex-unlock! *queue-mutex*))

;; Command type of a queued item, looked up via the command registry;
;; #f when the command is not registered.
(define (qitem->ctype item)
  (let ((cmd (hash-table-ref/default *commands* (qitem-command item) #f)))
    (and cmd (cmd-ctype cmd))))

;; get all the cmds of type ctype and return them, also remove them from the queue
;;
;; The queue holds qitems, so the type comes from the registered cmd, not
;; from the qitem itself. Items are returned oldest first so that they are
;; processed in the order they arrived (queue-push conses onto the head).
(define (queue-take ctype)
  (mutex-lock! *queue-mutex*)
  (let* ((wanted? (lambda (item) (eq? (qitem->ctype item) ctype)))
         (taken   (filter wanted? *cmd-queue*))
         (kept    (filter (lambda (item) (not (wanted? item))) *cmd-queue*)))
    (set! *cmd-queue* kept)
    (mutex-unlock! *queue-mutex*)
    (reverse taken)))

;; Run each queued command against dbconn and send the result, encoded as
;; ((r . result)), back to the requester's host:port. The requester is
;; expected to answer "ok".
(define (queue-process-commands dbconn commands)
  (for-each
   (lambda (qitem)
     (let ((soc (request-connect (qitem-host-port qitem))) ;; we will be sending the data back to host-port via soc
           (cmd (hash-table-ref/default *commands* (qitem-command qitem) #f)))
       (if cmd
           (let* ((res (apply (cmd-proc cmd) dbconn (qitem-params qitem)))
                  (pkg (encode `((r . ,res)))))
             (nn-send soc pkg)
             ;; strings must be compared with equal?, not eq?
             (if (not (equal? (nn-recv soc) "ok"))
                 (print "Client failed to receive properly the data from "
                        (qitem-command qitem) " request"))))))
   commands))

;; the continuously running queue processor
;;
;; (queue-processor dbconn) returns a thunk suitable for running in a thread.
(define (queue-processor dbconn)
  (lambda ()
    (let loop ()
      (queue-process-commands dbconn (queue-take 'r)) ;; reads first, probably largest numbers of them
      (queue-process-commands dbconn (queue-take 'w)) ;; writes next
      (queue-process-commands dbconn (queue-take 't)) ;; lastly process transactions
      (thread-sleep! 0.2) ;; open up the db for any other processes to access
      (loop))))

;;======================================================================
;; Client stuff
;;======================================================================

;; client struct
(defstruct client
  host-port
  socket
  last-access)

(define *clients* (make-hash-table)) ;; host:port -> req socket
(define *client-mutex* (make-mutex))

;; add a channel or return existing channel, this is a normal req
;;
(define (request-connect host-port)
  (mutex-lock! *client-mutex*)
  (let* ((curr (hash-table-ref/default *clients* host-port #f)))
    (if curr
        (begin
          (mutex-unlock! *client-mutex*)
          curr)
        (let ((req (nn-socket 'req)))
          (nn-connect req host-port) ;; "inproc://test")
          (hash-table-set! *clients* host-port req)
          (mutex-unlock! *client-mutex*)
          req))))

;; open up a channel to the server and send a package of info for the server to act on
;; host-port needs to be found and provided
;;
;; TODO: unfinished stub; for now it only opens (or reuses) the channel and
;; returns the socket.
(define (generic-db-access host-port)
  (let* ((soc (request-connect host-port))
         ;; NEED *MY* host/port also to let the server know where to send the results
         )
    soc))

;; Send msg on soc and return the reply.
(define (client-send-receive soc msg)
  (nn-send soc msg)
  (nn-recv soc))

;;======================================================================
;; Server
;;======================================================================

(defstruct srvdat
  host
  port
  soc)

;; remember, everyone starts a server, both client and the actual server alike.
;; clients start a server for the server to return results to.
;;
;; Bind a rep socket to the first usable tcp port in 10000..64000 and return
;; a srvdat holding the host name, port and socket. given-host-name overrides
;; the name reported by get-host-name. Signals an error when no port can be
;; bound.
(define (start-raw-server #!key (given-host-name #f))
  (let ((srvdat (let loop ((portnum 10000))
                  (let* ((rep   (nn-socket 'rep))
                         (bound (handle-exceptions
                                    exn
                                    #f
                                  (nn-bind rep (conc "tcp://*:" portnum)) ;; "inproc://test")
                                  #t)))
                    (if bound
                        (make-srvdat port: portnum soc: rep)
                        (begin
                          (nn-close rep) ;; do not leak the socket of a failed bind
                          (if (< portnum 64000)
                              (loop (+ portnum 1))
                              #f)))))))
    (if (not srvdat)
        (error "start-raw-server: unable to bind any port in range 10000-64000"))
    (srvdat-host-set! srvdat (or given-host-name (get-host-name)))
    srvdat))

;; The actual *server* side server
;;
;; Starts the queue processor thread, then receives requests until the
;; message "quit" arrives. Valid requests are queued and answered "queued".
(define (start-server dbconn #!key (given-host-name #f))
  (let* ((srvdat (start-raw-server given-host-name: given-host-name))
         (soc    (srvdat-soc srvdat)))

    ;; start the queue processor; the thread name is an argument of
    ;; make-thread, not of thread-start!
    (thread-start! (make-thread (queue-processor dbconn) "Queue processor"))

    ;; msg is an alist
    ;;  'r host:port  <== where to return the data
    ;;  'p params     <== data to apply the command to
    ;;  'e j|s|l      <== encoding of the params. default is s (sexp), if not specified is assumed to be default
    ;;  'c command    <== look up the function to call using this key
    ;;
    (let loop ((msg-in (nn-recv soc)))
      (if (not (equal? msg-in "quit"))
          (let* ((dat       (handle-exceptions exn #f (decode msg-in))) ;; unreadable input must not kill the server
                 (fields    (if (list? dat) dat '()))
                 (host-port (alist-ref 'r fields)) ;; this is for the reverse req rep where the server is a client of the original client
                 (params    (alist-ref 'p fields))
                 (command   (let ((c (alist-ref 'c fields)))
                              ;; decode uses `read`, so the command may arrive
                              ;; as a symbol or as a string
                              (cond
                               ((symbol? c) c)
                               ((string? c) (string->symbol c))
                               (else #f))))
                 (all-good  (and host-port params command (hash-table-exists? *commands* command))))
            (if all-good
                (let ((cmddat (make-qitem
                               command:   command
                               host-port: host-port
                               params:    params)))
                  (queue-push cmddat)       ;; put request into the queue
                  (nn-send soc "queued"))   ;; reply with "queued"
                ;; NOTE(review): no reply is sent for a bad request, so the
                ;; requesting client is presumably left waiting - confirm
                (print "ERROR: BAD request " (or dat msg-in)))
            (loop (nn-recv soc)))))
    (nn-close soc)))

;;======================================================================
;; Gasket layer
;;======================================================================

(define rmt:open-create-db open-create-db)
(define (rmt:create-run . params) |