Megatest

Diff
Login

Differences From Artifact [49da501f89]:

To Artifact [881c1c2827]:


1
2
3
4
5
6
7
8
9
10

11
12
13
14
15
16
17
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18










+







;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects.  Also a remaining client may receive tons of
;; messages afterward.

(use zmq srfi-18 sqlite3)

(define pub (make-socket 'pub))
(define pull (make-socket 'pull))
(define cname "server")
(define total-db-accesses 0)

(bind-socket pub "tcp://*:5563")
(bind-socket pull "tcp://*:5564")

(define (open-db)
  (let* ((dbpath    "mockup.db")
	 (dbexists  (file-exists? dbpath))
38
39
40
41
42
43
44

45
46
47
48
49



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

67
68
69

70
71
72
73
74
75
76
39
40
41
42
43
44
45
46
47
48
49
50

51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82







+




-
+
+
+

















+



+







	  (execute db "INSERT OR REPLACE INTO clients (name) VALUES(?);" cname)
	  (for-each-row 
	   (lambda (id)
	     (set! cid id))
	   db
	   "SELECT id FROM clients WHERE name=?;" cname)
	  (hash-table-set! cid-cache cname cid)
	  (set! total-db-accesses (+ total-db-accesses 2))
	  cid))))

(define (count-client db cname)
  (let ((cid (get-client-id db cname)))
    (execute db "UPDATE clients SET num_accesses=num_accesses+1 WHERE id=?;" cid)))
    (execute db "UPDATE clients SET num_accesses=num_accesses+1 WHERE id=?;" cid)
    (set! total-db-accesses (+ total-db-accesses 1))
    ))

(define db (open-db))
;; (define queuelst '())
;; (define mx1 (make-mutex))

(define (process-queue queuelst)
  (let ((queuelen (length queuelst)))
    (for-each
     (lambda (item)
       (let ((cname (vector-ref item 1))
	     (clcmd (vector-ref item 2))
	     (cdata (vector-ref item 3)))
	 (send-message pub cname send-more: #t)
	 (send-message pub (case clcmd
			     ((sync)
			      (conc queuelen))
			     ((set)
			      (set! total-db-accesses (+ total-db-accesses 1))
			      (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata))
			      "ok")
			     ((get)
			      (set! total-db-accesses (+ total-db-accesses 1))
			      (let ((res "noval"))
				(for-each-row
				 (lambda (val)
				   (set! res val))
				 db 
				 "SELECT val FROM vars WHERE var=?;" cdata)
				res))
105
106
107
108
109
110
111


112

113
114
115

116
117

118
119
120
121
122
123
124

111
112
113
114
115
116
117
118
119

120
121


122
123

124
125
126
127
128
129
130

131







+
+
-
+

-
-
+

-
+






-
+
;; send a sync to the pull port
(define th2 (make-thread
	     (lambda ()
	       (let ((last-action-time (current-seconds)))
		 (let loop ()
		   (thread-sleep! 5)
		   (let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f)))
			 (last-action-delta #f))
		     (if (> queuelen 1)(set! last-action-time (current-seconds)))
			 (last-action-delta (- (current-seconds) last-action-time)))
		     (set! last-action-delta (- (current-seconds) last-action-time))
		     (print "Server: Got queuelen=" queuelen ", last-action-delta=" last-action-delta)
		     (if (> queuelen 1)(set! last-action-time (current-seconds)))
		     (if (< last-action-delta 15)
		     (if (< last-action-delta 25)
			 (loop)
			 (print "Server exiting, 15 seconds since last access"))))))
			 (print "Server exiting, 25 seconds since last access"))))))
	     "sync thread"))

(thread-start! th1)
(thread-start! th2)
(thread-join! th2)

(print "Server exited!")
(print "Server exited! Total db accesses=" total-db-accesses)