15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
(use spiffy uri-common intarweb http-client spiffy-request-vars)
(declare (unit server))
(declare (uses common))
(declare (uses db))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses http-transport))
(declare (uses zmq-transport))
(include "common_records.scm")
(include "db_records.scm")
(define (server:make-server-url hostport)
|
>
|
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
(use spiffy uri-common intarweb http-client spiffy-request-vars)
(declare (unit server))
(declare (uses common))
(declare (uses db))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses synchash))
(declare (uses http-transport))
(declare (uses zmq-transport))
(include "common_records.scm")
(include "db_records.scm")
(define (server:make-server-url hostport)
|
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
;;======================================================================
;; S E R V E R
;;======================================================================
;; Call this to start the actual server
;;
;; Mutex for guarding db queue processing. In server:run the lock/unlock
;; calls around db:process-queue-item are currently commented out.
(define *db:process-queue-mutex* (make-mutex))
;; Start the web server for this Megatest area and serve requests.
;;
;; hostn - host/address string to serve on. When it is the string "-" the
;;         dotted IP address obtained from the local host name is used
;;         instead.
;;
;; If *toppath* is not set and setup-for-run fails, an error is printed and
;; the process exits. The port is taken from the -port argument when given,
;; otherwise it is picked at random from 5000 through 6000.
(define (server:run hostn)
(debug:print 2 "Attempting to start the server ...")
(if (not *toppath*)
(if (not (setup-for-run))
(begin
(debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
(exit))))
(let* (;; (iface (if (string=? "-" hostn)
;; #f ;; (get-host-name)
;; hostn))
;; db starts out as #f and is opened by the request handler on its first call
(db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecessarily
(hostname (get-host-name))
;; "-" means: resolve the local host name and render its address as a dotted string
(ipaddrstr (let ((ipstr (if (string=? "-" hostn)
(string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
#f)))
(if ipstr ipstr hostn))) ;; hostname)))
(start-port (if (args:get-arg "-port")
(string->number (args:get-arg "-port"))
(+ 5000 (random 1001))))
(link-tree-path (config-lookup *configdat* "setup" "linktree")))
(set! *cache-on* #t)
;; NOTE(review): root-path and vhost-map look like the spiffy parameters
;; (spiffy is loaded at the top of this file) -- confirm.
;; The root is the configured linktree when present, else the current directory.
(root-path (if link-tree-path
link-tree-path
(current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!
;; Setup the web server and a /ctrl interface
;;
;; NOTE(review): the (* any) pattern presumably matches every host -- confirm.
(vhost-map `(((* any) . ,(lambda (continue)
;; open the db on the first call
(if (not db)(set! db (open-db)))
;; $ looks up request variables; dat carries the encoded packet for /ctrl
(let* (($ (request-vars source: 'both))
(dat ($ 'dat))
(res #f))
(cond
;; /hey answers with a fixed greeting
((equal? (uri-path (request-uri (current-request)))
'(/ "hey"))
(send-response body: "hey there!\n"
headers: '((content-type text/plain))))
;; This is the /ctrl path where data is handed to the server and
;; responses
((equal? (uri-path (request-uri (current-request)))
'(/ "ctrl"))
(let* ((packet (db:string->obj dat))
(qtype (cdb:packet-get-qtype packet)))
(debug:print-info 12 "server=> received packet=" packet)
;; every query type except sync and ping refreshes *last-db-access*,
;; updated while holding *heartbeat-mutex*
(if (not (member qtype '(sync ping)))
(begin
(mutex-lock! *heartbeat-mutex*)
(set! *last-db-access* (current-seconds))
(mutex-unlock! *heartbeat-mutex*)))
;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex
;; (set! res (open-run-close db:process-queue-item open-db packet))
(set! res (db:process-queue-item db packet))
;; (mutex-unlock! *db:process-queue-mutex*)
(debug:print-info 11 "Return value from db:process-queue-item is " res)
;; the result is returned wrapped in <head>/<body> markers
(send-response body: (conc "<head>ctrl data</head>\n<body>"
res
"</body>")
headers: '((content-type text/plain)))))
;; any other path is passed on via the continue procedure
(else (continue))))))))
(server:try-start-server ipaddrstr start-port)
;; lite3:finalize! db)))
))
;; Build a signature string for this process: the MD5 digest of the written
;; form of a list holding the current directory and the argument vector.
(define (server:mk-signature)
  (let* ((sig-source (list (current-directory) (argv)))
         (sig-text   (with-output-to-string
                       (lambda () (write sig-source)))))
    (message-digest-string (md5-primitive) sig-text)))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;;
;; Deliver (or just hand back) the result of a query, depending on
;; *transport-type*:
;;   fs   - result is returned unchanged
;;   http - returns the encoded vector #(success/fail query-sig result)
;;   zmq  - sends return-addr, then the encoded vector, on the socket stored
;;          in slot 1 of *runremote*
;; Any other transport type logs an error and returns result unchanged.
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
  (let ((transport    *transport-type*)
        ;; encoding is deferred so it only happens on the branches that use it
        (encode-reply (lambda ()
                        (db:obj->string (vector success/fail query-sig result)))))
    (cond
     ((eqv? transport 'fs) result)
     ((eqv? transport 'http) (encode-reply))
     ((eqv? transport 'zmq)
      (let ((pub-socket (vector-ref *runremote* 1)))
        (send-message pub-socket return-addr send-more: #t)
        (send-message pub-socket (encode-reply))))
     (else
      (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*)
      result))))
;; all routes through here end in exit ...
;; Launch the server using the requested transport.
;;
;; transport - symbol naming the transport: http or zmq start the matching
;;             transport launcher; fs and any unrecognised value exit.
;;
;; If *toppath* is not set and setup-for-run fails, an error is printed and
;; the process exits. The chosen transport is recorded in *transport-type*
;; before dispatching.
(define (server:launch transport)
  (if (and (not *toppath*)
           (not (setup-for-run)))
      (begin
        (debug:print 0 "ERROR: cannot find megatest.config, exiting")
        (exit)))
  (debug:print-info 2 "Starting server using " transport " transport")
  (set! *transport-type* transport)
  (case transport
    ((fs)   (exit)) ;; there is no "fs" transport
    ((http) (http-transport:launch))
    ((zmq)  (zmq-transport:launch))
    (else
     ;; debug:print takes the debug level first, as in every other call in
     ;; this file; the level was missing here.
     (debug:print 0 "WARNING: unrecognised transport " transport)
     (exit))))
|
<
|
|
<
|
|
|
|
|
|
|
|
>
|
<
<
<
<
<
<
<
<
<
<
>
|
>
|
<
<
<
<
<
<
>
|
<
|
|
>
|
|
<
|
|
|
|
>
>
|
|
<
<
<
|
<
|
|
<
<
<
<
<
|
<
|
>
>
|
|
>
>
>
>
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
;;======================================================================
;; S E R V E R
;;======================================================================
;; Call this to start the actual server
;;
;; all routes through here end in exit ...
;; Launch the server using the requested transport.
;;
;; transport - symbol naming the transport: http or zmq start the matching
;;             transport launcher; fs and any unrecognised value exit.
;;
;; If *toppath* is not set and setup-for-run fails, an error is printed and
;; the process exits. The chosen transport is recorded in *transport-type*
;; before dispatching.
(define (server:launch transport)
  (if (and (not *toppath*)
           (not (setup-for-run)))
      (begin
        (debug:print 0 "ERROR: cannot find megatest.config, exiting")
        (exit)))
  (debug:print-info 2 "Starting server using " transport " transport")
  (set! *transport-type* transport)
  (case transport
    ((fs)   (exit)) ;; there is no "fs" transport
    ((http) (http-transport:launch))
    ((zmq)  (zmq-transport:launch))
    (else
     ;; debug:print takes the debug level first, as in every other call in
     ;; this file; the level was missing here.
     (debug:print 0 "WARNING: unrecognised transport " transport)
     (exit))))
;;======================================================================
;; Q U E U E M A N A G E M E N T
;;======================================================================
;; We don't want to flush the queue if it was just flushed
;; Millisecond timestamp, initialised when this file is loaded.
;; server:write-queue-handler reads it while holding *incoming-mutex* and only
;; flushes cached writes once more than 400 ms have passed since this time.
;; NOTE(review): nothing in this section updates it after a flush; presumably
;; that happens elsewhere -- confirm.
(define *server:last-write-flush* (current-milliseconds))
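;; Flush the queue periodically: cached writes are processed once more than
;; 400 ms have passed since *server:last-write-flush*, followed by a 0.5 s sleep.
;; Can we assume that setup-for-run has already been done?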
;; Endless loop that flushes cached db writes; never returns normally.
;;
;; Runs setup-for-run first. On failure an error is printed and the process
;; exits with status 1. Otherwise a db handle is opened once and reused for
;; every flush.
;;
;; Each pass reads *server:last-write-flush* under *incoming-mutex*. When more
;; than 400 ms have passed since that time, db:process-cached-writes is called
;; while holding *db:process-queue-mutex*, followed by a 0.5 s sleep.
(define (server:write-queue-handler)
  (if (setup-for-run)
      (let ((db (open-db)))
        (let loop ()
          (let ((last-write-flush-time #f))
            ;; take a snapshot of the shared timestamp under its mutex
            (mutex-lock! *incoming-mutex*)
            (set! last-write-flush-time *server:last-write-flush*)
            (mutex-unlock! *incoming-mutex*)
            (if (> (- (current-milliseconds) last-write-flush-time) 400)
                (begin
                  (mutex-lock! *db:process-queue-mutex*)
                  (db:process-cached-writes db)
                  (mutex-unlock! *db:process-queue-mutex*)
                  (thread-sleep! 0.5))
                ;; Not yet time to flush: sleep briefly rather than looping
                ;; straight back, which would spin the CPU until the 400 ms
                ;; threshold is crossed.
                (thread-sleep! 0.05)))
          (loop)))
      (begin
        (debug:print 0 "ERROR: failed to setup for Megatest in server:write-queue-handler")
        (exit 1))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; Generate a unique signature for this server
;; Build a signature string for this process: the MD5 digest of the written
;; form of a list holding the current directory and the argument vector.
(define (server:mk-signature)
  (let* ((sig-source (list (current-directory) (argv)))
         (sig-text   (with-output-to-string
                       (lambda () (write sig-source)))))
    (message-digest-string (md5-primitive) sig-text)))
;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;;
;; Deliver (or just hand back) the result of a query, depending on
;; *transport-type*:
;;   fs   - result is returned unchanged
;;   http - returns the encoded vector #(success/fail query-sig result)
;;   zmq  - sends return-addr, then the encoded vector, on the socket stored
;;          in slot 1 of *runremote*
;; Any other transport type logs an error and returns result unchanged.
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
  (let ((transport    *transport-type*)
        ;; encoding is deferred so it only happens on the branches that use it
        (encode-reply (lambda ()
                        (db:obj->string (vector success/fail query-sig result)))))
    (cond
     ((eqv? transport 'fs) result)
     ((eqv? transport 'http) (encode-reply))
     ((eqv? transport 'zmq)
      (let ((pub-socket (vector-ref *runremote* 1)))
        (send-message pub-socket return-addr send-more: #t)
        (send-message pub-socket (encode-reply))))
     (else
      (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*)
      result))))
|