1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
;;;; rpc-demo.scm
;;;; Simple database server / client
;;; start server thusly: ./rpctest server test.db
;;; you will need to init test.db:
;;; sqlite3 test.db "CREATE TABLE foo (id INTEGER PRIMARY KEY, var TEXT, val TEXT);"
(require-extension (srfi 18) extras tcp rpc sqlite3)
;;; Common things
(define total-queries 0)
(define start-time (current-seconds))
(define operation (string->symbol (car (command-line-arguments))))
(define param (cadr (command-line-arguments)))
(print "Operation: " operation ", param: " param)
(define rpc:listener
(if (eq? operation 'server)
(tcp-listen (rpc:default-server-port))
(tcp-listen 0)))
;; Start server thread
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
;;;; rpc-demo.scm
;;;; Simple database server / client
;;; start server thusly: ./rpctest server test.db
;;; you will need to init test.db:
;;; sqlite3 test.db "CREATE TABLE foo (id INTEGER PRIMARY KEY, var TEXT, val TEXT);"
(require-extension (srfi 18) extras tcp rpc sql-de-lite)
;;; Common things
(define total-queries 0)
(define start-time (current-seconds))
(define operation (string->symbol (car (command-line-arguments))))
(define param (cadr (command-line-arguments)))
(print "Operation: " operation ", param: " param)
;; have a pool of db's to pick from
(define *dbpool* '())
(define *pool-mutex* (make-mutex))
(define (get-db)
(mutex-lock! *pool-mutex*)
(if (null? *dbpool*)
(begin
(mutex-unlock! *pool-mutex*)
(let ((db (open-database param)))
(set-busy-handler! db (busy-timeout 10000))
(exec (sql db "PRAGMA synchronous=0;"))
db))
(let ((res (car *dbpool*)))
(set! *dbpool* (cdr *dbpool*))
(mutex-unlock! *pool-mutex*)
res)))
(define (return-db db)
(mutex-lock! *pool-mutex*)
(set! *dbpool* (cons db *dbpool* ))
(let ((res (length *dbpool*)))
(mutex-unlock! *pool-mutex*)
res))
(define rpc:listener
(if (eq? operation 'server)
(tcp-listen (rpc:default-server-port))
(tcp-listen 0)))
;; Start server thread
|
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
(define (server)
(rpc:publish-procedure!
'change-response-port
(lambda (port)
(rpc:default-server-port port))
#f)
(let ((db (open-database param)))
(set-finalizer! db finalize!)
(rpc:publish-procedure!
'query
(lambda (sql callback)
(set! total-queries (+ total-queries 1))
(print "Executing query '" sql "' ...")
(for-each-row
callback
db sql)
(print "Query rate: " (/ total-queries (/ (- (current-seconds) start-time) 60)) " per minute")
)))
(thread-join! rpc:server))
;;; Client side
(define (callback1 . columns)
(let loop ((c columns) (i 0))
|
|
|
|
|
|
|
|
>
|
|
|
>
|
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
(define (server)
(rpc:publish-procedure!
'change-response-port
(lambda (port)
(rpc:default-server-port port))
#f)
;;(let ((db (get-db))(open-database param)))
;; (set-finalizer! db finalize!)
(rpc:publish-procedure!
'query
(lambda (sqlstmt callback)
(set! total-queries (+ total-queries 1))
(print "Executing query '" sqlstmt "' ...")
(let ((db (get-db)))
(query (for-each-row
callback)
(sql db sqlstmt))
(print "Query rate: " (/ total-queries (/ (- (current-seconds) start-time) 60)) " per minute")
(print "num dbs: " (return-db db))
)))
(thread-join! rpc:server))
;;; Client side
(define (callback1 . columns)
(let loop ((c columns) (i 0))
|
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
(define (client)
((rpc:procedure 'change-response-port "localhost")
(tcp-listener-port rpc:listener))
((rpc:procedure 'query "localhost") param callback1)
(rpc:publish-procedure! 'callback2 callback2)
((rpc:procedure 'query "localhost") param callback2)
(pp callback2-results))
;;; Run it
(if (eq? operation 'server)
(server)
(client))
|
|
>
|
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
(define (client)
((rpc:procedure 'change-response-port "localhost")
(tcp-listener-port rpc:listener))
((rpc:procedure 'query "localhost") param callback1)
(rpc:publish-procedure! 'callback2 callback2)
((rpc:procedure 'query "localhost") param callback2)
(pp callback2-results)
(rpc:close-connection! "localhost" (rpc:default-server-port)))
;;; Run it
(if (eq? operation 'server)
(server)
(client))
|