1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects. Also a remaining client may receive tons of
;; messages afterward.
(use zmq srfi-18 sqlite3)
(define pub (make-socket 'pub))
(define pull (make-socket 'pull))
(bind-socket pub "tcp://*:5563")
(bind-socket pull "tcp://*:5564")
(define (open-db)
(let* ((dbpath "mockup.db")
(dbexists (file-exists? dbpath))
(db (open-database dbpath)) ;; (never-give-up-open-db dbpath))
(handler (make-busy-timeout 10)))
(set-busy-handler! db handler)
(if (not dbexists)
(for-each
(lambda (stmt)
(execute db stmt))
(list
"CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);"
"CREATE TABLE vars (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));")))
db))
(define cid-cache (make-hash-table))
(define (get-client-id db cname)
|
>
>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects. Also a remaining client may receive tons of
;; messages afterward.
(use zmq srfi-18 sqlite3)
(define pub (make-socket 'pub))
(define pull (make-socket 'pull))
(define cname "server")
(bind-socket pub "tcp://*:5563")
(bind-socket pull "tcp://*:5564")
(define (open-db)
(let* ((dbpath "mockup.db")
(dbexists (file-exists? dbpath))
(db (open-database dbpath)) ;; (never-give-up-open-db dbpath))
(handler (make-busy-timeout 10)))
(set-busy-handler! db handler)
(if (not dbexists)
(for-each
(lambda (stmt)
(execute db stmt))
(list
"PRAGMA SYNCHRONOUS=0;"
"CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);"
"CREATE TABLE vars (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));")))
db))
(define cid-cache (make-hash-table))
(define (get-client-id db cname)
|
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
(for-each
(lambda (item)
(let ((cname (vector-ref item 1))
(clcmd (vector-ref item 2))
(cdata (vector-ref item 3)))
(send-message pub cname send-more: #t)
(send-message pub (case clcmd
((set)
(apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata))
"ok")
((get)
(let ((res "noval"))
(for-each-row
(lambda (val)
|
>
>
|
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
(for-each
(lambda (item)
(let ((cname (vector-ref item 1))
(clcmd (vector-ref item 2))
(cdata (vector-ref item 3)))
(send-message pub cname send-more: #t)
(send-message pub (case clcmd
((sync)
"ok")
((set)
(apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata))
"ok")
((get)
(let ((res "noval"))
(for-each-row
(lambda (val)
|
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
(clcmd (string->symbol (cadr parts))) ;; client cmd
(cdata (caddr parts)) ;; client data
(svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue
(count-client db cname)
(case clcmd
((sync) ;; just process the queue
(print "Got sync from " cname)
(process-queue queuelst)
(loop '()))
((imediate)
(process-queue (cons svect queuelst))
(loop '()))
(else
(loop (cons svect queuelst))))))))
"server thread"))
(define push (make-socket 'push))
(connect-socket push "tcp://localhost:5564")
;; send a sync to the pull port
(define th2 (make-thread
(lambda ()
(let loop ()
(thread-sleep! 5)
;; (print "Sending sync from server")
(send-message push "server:sync:nodat")
(loop)))
"sync thread"))
(thread-start! th1)
(thread-start! th2)
(thread-join! th1)
|
|
|
|
<
|
|
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
(clcmd (string->symbol (cadr parts))) ;; client cmd
(cdata (caddr parts)) ;; client data
(svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue
(count-client db cname)
(case clcmd
((sync) ;; just process the queue
(print "Got sync from " cname)
(process-queue (cons svect queuelst))
(loop '()))
((get)
(process-queue (cons svect queuelst))
(loop '()))
(else
(loop (cons svect queuelst))))))))
"server thread"))
(include "mockupclientlib.scm")
;; send a sync to the pull port
(define th2 (make-thread
(lambda ()
(let loop ()
(thread-sleep! 5)
;; (print "Sending sync from server")
(dbaccess "server" 'sync "nada" #f)
(loop)))
"sync thread"))
(thread-start! th1)
(thread-start! th2)
(thread-join! th1)
|