Overview
Comment: | Adding mock up of dual channel approach |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | interleaved-queries |
Files: | files | file ages | folders |
SHA1: |
e1bc6c19055c4622e7e7c1e28a7c9a95 |
User & Date: | matt on 2012-11-17 07:48:04 |
Other Links: | branch diff | manifest | tags |
Context
2012-11-17
| ||
09:16 | mockup works check-in: e576c93a7e user: matt tags: interleaved-queries | |
07:48 | Adding mock up of dual channel approach check-in: e1bc6c1905 user: matt tags: interleaved-queries | |
2012-11-16
| ||
13:10 | tweaks eh check-in: aea2d07d89 user: mrwellan tags: interleaved-queries | |
Changes
Added testzmq/mockupclient.scm version [5ff921ff82].
;; Mock-up client for the dual channel approach.
;;
;; Usage: mockupclient clientname
;;
;; Requests are pushed to tcp://localhost:5564 as "clientname:cmd:data".
;; Replies are read from tcp://localhost:5563 on a subscription that is
;; filtered by the client name.

(use zmq srfi-18) ;; srfi-18 provides thread-sleep!

(define cname "Bob")

;; (argv) includes the program name as its first element, so a single
;; clientname argument yields a list of length 2.
(let ((args (argv)))
  (if (< (length args) 2)
      (begin
        (print "Usage: mockupclient clientname")
        (exit))
      (set! cname (cadr args))))

(define sub  (make-socket 'sub))
(define push (make-socket 'push))

;; Only receive messages whose envelope starts with our client name.
(socket-option-set! sub 'subscribe cname)
(connect-socket sub  "tcp://localhost:5563")
(connect-socket push "tcp://localhost:5564")

;; Send one request to the server and wait for a message on the sub socket.
;;
;;   cmd - command, a symbol or string (e.g. 'sync)
;;   var - data field; must be non-empty because the server splits the
;;         message on ":" and empty fields are dropped by string-split
;;   val - optional value appended to var after a space, or #f for none
;;
;; Returns whatever receive-message* returns for the next message on `sub`.
;; NOTE(review): the server only publishes replies for queued commands, so a
;; bare sync with an empty queue may never be answered and this call would
;; then wait indefinitely -- confirm this is the intended mock-up behaviour.
(define (dbaccess cmd var val)
  (let ((msg (conc cname ":" cmd ":" (if val (conc var " " val) var))))
    (print "Sending msg: " msg)
    (send-message push msg)
    (receive-message* sub)))

;; Roughly one iteration in five sends a sync; otherwise sleep for a second.
(let loop ()
  (case (random 5)
    ;; 'sync must be quoted (it is a command name, not a variable) and the
    ;; data field uses the same "nodat" placeholder the server sends itself.
    ((1) (dbaccess 'sync "nodat" #f))
    (else (thread-sleep! 1)))
  (loop))
Added testzmq/mockupserver.scm version [81fda153ef].
;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects.  Also a remaining client may receive tons of
;; messages afterward.
;;
;; Mock-up server for the dual channel approach: requests arrive on a pull
;; socket (port 5564) as "clientname:cmd:data" and replies are published on
;; a pub socket (port 5563) using the client name as the envelope.

(use zmq srfi-18 sqlite3)

(define pub  (make-socket 'pub))
(define pull (make-socket 'pull))

(bind-socket pub  "tcp://*:5563")
(bind-socket pull "tcp://*:5564")

;; Open mockupserver.db, creating the schema when the file did not exist
;; before the call. Returns the database handle.
(define (open-db)
  (let* ((dbpath   "mockupserver.db")
         (dbexists (file-exists? dbpath)) ;; must be checked before opening
         (db       (open-database dbpath)) ;; (never-give-up-open-db dbpath))
         (handler  (make-busy-timeout 10)))
    (set-busy-handler! db handler)
    (if (not dbexists)
        (for-each
         (lambda (stmt)
           (execute db stmt))
         (list
          "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER);"
          "CREATE TABLE vars (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));")))
    db))

(define db (open-db))
;; (define queuelst '())
;; (define mx1 (make-mutex))

;; Execute every queued request, in list order, and publish one reply per
;; request. Each item is a vector #(timestamp cname clcmd cdata). The reply
;; is sent as two parts: the client name (envelope) and the result string.
(define (process-queue queuelst)
  (for-each
   (lambda (item)
     (let ((cname (vector-ref item 1))
           (clcmd (vector-ref item 2))
           (cdata (vector-ref item 3)))
       (send-message pub cname send-more: #t)
       (send-message
        pub
        (case clcmd
          ((setval)
           ;; cdata is expected to be "var val"; anything else would make
           ;; the two-parameter INSERT raise, so report it instead.
           (let ((var+val (string-split cdata)))
             (if (= (length var+val) 2)
                 (begin
                   (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" var+val)
                   "ok")
                 (conc "bad setval data: " cdata))))
          ((getval)
           (let ((res "noval"))
             (for-each-row
              (lambda (val)
                (set! res val))
              db
              "SELECT val FROM vars WHERE var=?;" cdata)
             res))
          (else (conc "unk cmd: " clcmd))))))
   queuelst))

;; Receiver thread: reads requests from the pull socket and either queues
;; them or, on sync/immediate, flushes the queue.
(define th1
  (make-thread
   (lambda ()
     ;; queuelst is built with cons, so it holds the newest request first.
     (let loop ((queuelst '()))
       (let* ((indat (receive-message* pull))
              (parts (string-split indat ":")))
         (if (< (length parts) 2)
             ;; Need at least "clientname:cmd"; skip anything shorter
             ;; rather than letting car/cadr kill the thread.
             (begin
               (print "Ignoring malformed message: " indat)
               (loop queuelst))
             (let* ((cname (car parts))                               ;; client name
                    (clcmd (string->symbol (cadr parts)))             ;; client cmd
                    ;; string-split drops empty fields, so the data part
                    ;; may be absent; treat that as empty data.
                    (cdata (if (null? (cddr parts)) "" (caddr parts))) ;; client data
                    (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue
               ;; (print "Got indat=" indat)
               (case clcmd
                 ((sync) ;; just process the queue
                  (print "Got sync from " cname)
                  ;; reverse so requests run in arrival order
                  (process-queue (reverse queuelst))
                  (loop '()))
                 ;; "imediate" is the original spelling and is kept for
                 ;; compatibility with existing clients.
                 ((imediate immediate)
                  (process-queue (reverse (cons svect queuelst)))
                  (loop '()))
                 (else
                  (loop (cons svect queuelst)))))))))
   "server thread"))

(define push (make-socket 'push))
(connect-socket push "tcp://localhost:5564")

;; send a sync to the pull port
(define th2
  (make-thread
   (lambda ()
     (let loop ()
       (thread-sleep! 3)
       ;; (print "Sending sync from server")
       (send-message push "server:sync:nodat")
       (loop)))
   "sync thread"))

(thread-start! th1)
(thread-start! th2)
(thread-join! th1)
Added testzmq/testmockup.sh version [9fa13ed15b].
#!/bin/bash

# Build the mock-up server and client, then start one server and two
# clients (named "a" and "b") in the background.

# Stop if either compile fails so stale or missing binaries are not launched.
csc mockupserver.scm || exit 1
csc mockupclient.scm || exit 1

./mockupserver &

for i in a b; do
  ./mockupclient "$i" &
done