Megatest

Check-in [e1bc6c1905]
Login
Overview
Comment:Adding mock up of dual channel approach
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | interleaved-queries
Files: files | file ages | folders
SHA1: e1bc6c19055c4622e7e7c1e28a7c9a9506c48652
User & Date: matt on 2012-11-17 07:48:04
Other Links: branch diff | manifest | tags
Context
2012-11-17
09:16
mockup works check-in: e576c93a7e user: matt tags: interleaved-queries
07:48
Adding mock up of dual channel approach check-in: e1bc6c1905 user: matt tags: interleaved-queries
2012-11-16
13:10
tweaks eh check-in: aea2d07d89 user: mrwellan tags: interleaved-queries
Changes

Added testzmq/mockupclient.scm version [5ff921ff82].































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
(use zmq)

(define cname "Bob")
(let ((args (argv)))
  (if (< (length args) 3)
      (begin
	(print "Usage: mockupclient clientname")
	(exit))
      (set! cname (cadr args))))
      
(define sub  (make-socket 'sub))
(define push (make-socket 'push))
(socket-option-set! sub 'subscribe cname)
(connect-socket sub "tcp://localhost:5563")
(connect-socket push "tcp://localhost:5564")

(define (dbaccess cmd var val)
  (let ((msg (conc cname ":" cmd ":" (if val (conc var " " val) var))))
    (print "Sending msg: " msg)
    (send-message push msg)
    (receive-message* sub)))

(let loop ()
  (case (random 5)
    ((1)(dbaccess sync "" #f))
    (else
     (thread-sleep! 1)))
  (loop))


Added testzmq/mockupserver.scm version [81fda153ef].






























































































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects.  Also a remaining client may receive tons of
;; messages afterward.

(use zmq srfi-18 sqlite3)

(define pub (make-socket 'pub))
(define pull (make-socket 'pull))

(bind-socket pub "tcp://*:5563")
(bind-socket pull "tcp://*:5564")

(define (open-db)
  (let* ((dbpath    "mockupserver.db")
	 (dbexists  (file-exists? dbpath))
	 (db        (open-database dbpath)) ;; (never-give-up-open-db dbpath))
	 (handler   (make-busy-timeout 10)))
    (set-busy-handler! db handler)
    (if (not dbexists)
	(for-each
	 (lambda (stmt)
	   (execute db stmt))
	 (list
	  "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER);"
	  "CREATE TABLE vars    (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));")))
    db))

(define db (open-db))
;; (define queuelst '())
;; (define mx1 (make-mutex))

(define (process-queue queuelst)
  (for-each
   (lambda (item)
     (let ((cname (vector-ref item 1))
	   (clcmd (vector-ref item 2))
	   (cdata (vector-ref item 3)))
       (send-message pub cname send-more: #t)
       (send-message pub (case clcmd
			   ((setval)
			    (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata))
			    "ok")
			   ((getval)
			    (let ((res "noval"))
			      (for-each-row
			       (lambda (val)
				 (set! res val))
			       db 
			       "SELECT val FROM vars WHERE var=?;" cdata)
			      res))
			   (else (conc "unk cmd: " clcmd))))))
   queuelst))

(define th1 (make-thread 
	     (lambda ()
	       (let ((last-run 0)) ;; current-seconds when run last
		 (let loop ((queuelst '()))
		   (let* ((indat (receive-message* pull))
			  (parts (string-split indat ":"))
			  (cname (car parts))                   ;; client name
			  (clcmd (string->symbol (cadr parts))) ;; client cmd
			  (cdata (caddr parts))                 ;; client data
			  (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue
		     ;; (print "Got indat=" indat)
		     (case clcmd
		       ((sync) ;; just process the queue
			(print "Got sync from " cname)
			(process-queue queuelst)
			(loop '()))
		       ((imediate)
			(process-queue (cons svect queuelst))
			(loop '()))
		       (else
			(loop (cons svect queuelst))))))))
	     "server thread"))

(define push (make-socket 'push))
(connect-socket push "tcp://localhost:5564")

;; send a sync to the pull port
(define th2 (make-thread
	     (lambda ()
	       (let loop ()
		 (thread-sleep! 3)
		 ;; (print "Sending sync from server")
		 (send-message push "server:sync:nodat")
		 (loop)))
	     "sync thread"))

(thread-start! th1)
(thread-start! th2)
(thread-join! th1)

Added testzmq/testmockup.sh version [9fa13ed15b].











1
2
3
4
5
6
7
8
9
10
+
+
+
+
+
+
+
+
+
+
#!/bin/bash

csc mockupserver.scm
csc mockupclient.scm

./mockupserver &

for i in a b;do
  ./mockupclient $i &
done