1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects. Also a remaining client may receive tons of
;; messages afterward.
(use zmq srfi-18 sqlite3)
;; --- Global server state and socket setup ---------------------------------

;; Publish socket (ZeroMQ 'pub type).  Per the file header this is used for
;; pub/sub with an envelope address.
(define pub (make-socket 'pub))
;; Pull socket (ZeroMQ 'pull type).  The server thread (th1) reads incoming
;; client messages from it with receive-message*.
(define pull (make-socket 'pull))
;; Name string for this process.
;; NOTE(review): th1 binds its own local `cname` per message, which shadows
;; this global inside that thread.
(define cname "server")
;; Counter initialised to zero; presumably incremented by the db access code
;; outside this view -- TODO confirm.
(define total-db-accesses 0)
;; Value of (current-seconds) captured when this file is loaded.
(define start-time (current-seconds))
;; Bind both sockets on all interfaces: pub on TCP port 5563, pull on 5564.
(bind-socket pub "tcp://*:5563")
(bind-socket pull "tcp://*:5564")
(define (open-db)
(let* ((dbpath "mockup.db")
(dbexists (file-exists? dbpath))
(db (open-database dbpath)) ;; (never-give-up-open-db dbpath))
(handler (make-busy-timeout 10)))
(set-busy-handler! db handler)
|
>
>
>
|
|
>
>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects. Also a remaining client may receive tons of
;; messages afterward.
(use zmq srfi-18 sqlite3)
;; --- Global server state and socket setup ---------------------------------

;; Publish socket (ZeroMQ 'pub type).  The server thread (th1) sends its
;; ping replies on it as a two-part message: client name, then payload.
(define pub (make-socket 'pub))
;; Pull socket (ZeroMQ 'pull type).  The server thread (th1) reads incoming
;; client messages from it with receive-message*.
(define pull (make-socket 'pull))
;; Name string for this process.
;; NOTE(review): th1 binds its own local `cname` per message, which shadows
;; this global inside that thread.
(define cname "server")
;; Counter initialised to zero; presumably incremented by the db access code
;; outside this view -- TODO confirm.
(define total-db-accesses 0)
;; Value of (current-seconds) captured when this file is loaded.
(define start-time (current-seconds))
;; Set the 'hwm option to 1000 on both sockets before binding.
;; NOTE(review): presumably the ZeroMQ high-water mark (cap on queued
;; messages per socket) -- confirm against the zmq egg documentation.
(socket-option-set! pub 'hwm 1000)
(socket-option-set! pull 'hwm 1000)
;; Bind both sockets on all interfaces: pub on TCP port 6563, pull on 6564.
(bind-socket pub "tcp://*:6563")
(bind-socket pull "tcp://*:6564")
;; Pause 0.2 seconds after binding.
;; NOTE(review): possibly the sleep the file header comment refers to
;; (avoiding the SIGPIPE crash) -- confirm.
(thread-sleep! 0.2)
(define (open-db)
(let* ((dbpath "mockup.db")
(dbexists (file-exists? dbpath))
(db (open-database dbpath)) ;; (never-give-up-open-db dbpath))
(handler (make-busy-timeout 10)))
(set-busy-handler! db handler)
|
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
(set! res val))
db
"SELECT val FROM vars WHERE var=?;" cdata)
res))
(else (conc "unk cmd: " clcmd))))))
queuelst)))
;; Server thread.
;;
;; Loops forever reading messages from the `pull` socket.  Each message is
;; expected to look like "cname:cmd:data"; it is split on ":" into the client
;; name, the command (converted to a symbol) and the data string.  A record
;; #(timestamp cname clcmd cdata) is built for every well-formed message and
;; count-client is called for the sending client.
;;
;; Dispatch on the command:
;;   sync - print a note, hand the pending queue (with this record consed on
;;          the front) to process-queue, then continue with an empty queue
;;   get  - same as sync, without the print
;;   else - cons the record onto the pending queue and keep receiving
;;
;; A message with fewer than three ":"-separated fields is reported and
;; dropped; previously cadr/caddr raised an error on such input, which
;; terminated the thread.  Note that string-split discards empty fields by
;; default, so a message with empty data (e.g. "name:cmd:") also counts as
;; malformed.
;; NOTE(review): data containing ":" is still truncated at the first ":",
;; exactly as before -- confirm whether clients can ever send such data.
(define th1 (make-thread
             (lambda ()
               (let loop ((queuelst '()))
                 (let* ((indat (receive-message* pull))
                        (parts (string-split indat ":")))
                   (if (< (length parts) 3)
                       (begin
                         ;; Not enough fields: keep the queue as it is.
                         (print "Ignoring malformed message: " indat)
                         (loop queuelst))
                       (let* ((cname (car parts))                   ;; client name
                              (clcmd (string->symbol (cadr parts))) ;; client cmd
                              (cdata (caddr parts))                 ;; client data
                              ;; record for the queue
                              (svect (vector (current-seconds) cname clcmd cdata)))
                         (count-client db cname)
                         (case clcmd
                           ((sync) ;; just process the queue
                            (print "Got sync from " cname)
                            (process-queue (cons svect queuelst))
                            (loop '()))
                           ((get)
                            (process-queue (cons svect queuelst))
                            (loop '()))
                           (else
                            (loop (cons svect queuelst)))))))))
             "server thread"))
(include "mockupclientlib.scm")
;; send a sync to the pull port
(define th2 (make-thread
(lambda ()
(let ((last-action-time (current-seconds)))
(let loop ()
(thread-sleep! 5)
(let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f)))
|
>
>
>
>
>
>
>
>
|
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
(set! res val))
db
"SELECT val FROM vars WHERE var=?;" cdata)
res))
(else (conc "unk cmd: " clcmd))))))
queuelst)))
;; SERVER THREAD
;; Loops forever reading messages from the `pull` socket.  Each message is
;; expected to look like "cname:cmd:data"; it is split on ":" into the client
;; name, the command (converted to a symbol) and the data string.  A record
;; #(timestamp cname clcmd cdata) is built for every well-formed message and
;; count-client is called for the sending client.
;;
;; Dispatch on the command:
;;   ping - print a note and publish a two-part reply on `pub` (client name
;;          as the first part, then "Got ping"); the pending queue is kept
;;   sync - print a note, hand the pending queue (with this record consed on
;;          the front) to process-queue, then continue with an empty queue
;;   get  - same as sync, without the print
;;   else - cons the record onto the pending queue and keep receiving
;;
;; A message with fewer than three ":"-separated fields is reported and
;; dropped; previously cadr/caddr raised an error on such input, which
;; terminated the thread.  Note that string-split discards empty fields by
;; default, so a message with empty data (e.g. "name:cmd:") also counts as
;; malformed.
;; NOTE(review): data containing ":" is still truncated at the first ":",
;; exactly as before -- confirm whether clients can ever send such data.
(define th1 (make-thread
             (lambda ()
               (let loop ((queuelst '()))
                 (let* ((indat (receive-message* pull))
                        (parts (string-split indat ":")))
                   ;; (print "Server received message: " indat)
                   (if (< (length parts) 3)
                       (begin
                         ;; Not enough fields: keep the queue as it is.
                         (print "Ignoring malformed message: " indat)
                         (loop queuelst))
                       (let* ((cname (car parts))                   ;; client name
                              (clcmd (string->symbol (cadr parts))) ;; client cmd
                              (cdata (caddr parts))                 ;; client data
                              ;; record for the queue
                              (svect (vector (current-seconds) cname clcmd cdata)))
                         (count-client db cname)
                         (case clcmd
                           ((ping)
                            (print "Got ping from " cname)
                            ;; First part is the client name, second the payload.
                            (send-message pub cname send-more: #t)
                            (send-message pub "Got ping")
                            (loop queuelst))
                           ((sync) ;; just process the queue
                            (print "Got sync from " cname)
                            (process-queue (cons svect queuelst))
                            (loop '()))
                           ((get)
                            (process-queue (cons svect queuelst))
                            (loop '()))
                           (else
                            (loop (cons svect queuelst)))))))))
             "server thread"))
(include "mockupclientlib.scm")
;; SYNC THREAD
;; send a sync to the pull port
(define th2 (make-thread
(lambda ()
(let ((last-action-time (current-seconds)))
(let loop ()
(thread-sleep! 5)
(let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f)))
|