1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
;; Copyright 2006-2012, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
(require-extension (srfi 18) extras tcp s11n)
(use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
;; (use zmq)
(use spiffy uri-common intarweb http-client spiffy-request-vars)
(declare (unit server))
(declare (uses common))
(declare (uses db))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses synchash))
(declare (uses http-transport))
;; (declare (uses zmq-transport))
(declare (uses daemon))
(include "common_records.scm")
(include "db_records.scm")
(define (server:make-server-url hostport)
(if (not hostport)
|
|
>
>
|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
;; Copyright 2006-2012, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
(require-extension (srfi 18) extras tcp s11n)
(use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest directory-utils)
;; (use zmq)
(use spiffy uri-common intarweb http-client spiffy-request-vars)
(declare (unit server))
(declare (uses common))
(declare (uses db))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses synchash))
(declare (uses http-transport))
(declare (uses rpc-transport))
(declare (uses nmsg-transport))
(declare (uses launch))
(declare (uses daemon))
(include "common_records.scm")
(include "db_records.scm")
(define (server:make-server-url hostport)
(if (not hostport)
|
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
;; S E R V E R
;;======================================================================
;; Call this to start the actual server
;;
;; all routes though here end in exit ...
(define (server:launch transport)
(if (not *toppath*)
(if (not (launch:setup-for-run))
(begin
(debug:print 0 "ERROR: cannot find megatest.config, exiting")
(exit))))
(debug:print-info 2 "Starting server using " transport " transport")
(set! *transport-type* transport)
(case transport
((fs) (exit)) ;; there is no "fs" server transport
((http) (http-transport:launch))
((zmq) (zmq-transport:launch))
(else
(debug:print "WARNING: unrecognised transport " transport)
(exit))))
;;======================================================================
;; Q U E U E M A N A G E M E N T
;;======================================================================
;; We don't want to flush the queue if it was just flushed
(define *server:last-write-flush* (current-milliseconds))
;; Flush the queue every third of a second. Can we assume that setup-for-run
;; has already been done?
(define (server:write-queue-handler)
(if (launch:setup-for-run)
(let ((db (open-db)))
(let loop ()
(let ((last-write-flush-time #f))
(mutex-lock! *incoming-mutex*)
(set! last-write-flush-time *server:last-write-flush*)
(mutex-unlock! *incoming-mutex*)
(if (> (- (current-milliseconds) last-write-flush-time) 10)
(begin
(mutex-lock! *db:process-queue-mutex*)
(db:process-cached-writes db)
(mutex-unlock! *db:process-queue-mutex*)
(thread-sleep! 0.005))))
(loop)))
(begin
(debug:print 0 "ERROR: failed to setup for Megatest in server:write-queue-handler")
(exit 1))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; Generate a unique signature for this server
(define (server:mk-signature)
(message-digest-string (md5-primitive)
(with-output-to-string
(lambda ()
(write (list (current-directory)
(argv)))))))
;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;;
(define (server:reply return-addr query-sig success/fail result)
(debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
;; (send-message pubsock target send-more: #t)
;; (send-message pubsock
(case *transport-type*
((fs) result)
((http)(db:obj->string (vector success/fail query-sig result)))
((zmq)
(let ((pub-socket (vector-ref *runremote* 1)))
(send-message pub-socket return-addr send-more: #t)
(send-message pub-socket (db:obj->string (vector success/fail query-sig result)))))
(else
(debug:print 0 "ERROR: unrecognised transport type: " *transport-type*)
result)))
(define (server:ensure-running)
(let loop ((servers (open-run-close tasks:get-best-server tasks:open-db))
(trycount 0))
(if (or (not servers)
(null? servers))
(begin
(if (even? trycount) ;; just do the server start every other time through this loop (every 8 seconds)
(let ((cmdln (conc (if (getenv "MT_MEGATEST") (getenv "MT_MEGATEST") "megatest")
" -server - -daemonize")))
(debug:print 0 "INFO: Starting server (" cmdln ") as none running ...")
;; (server:launch (string->symbol (args:get-arg "-transport" "http"))))
;; no need to use fork, no need to do the list-servers trick. Just start the damn server, it will exit on it's own
;; if there is an existing server
(system cmdln)
(thread-sleep! 3)
;; (process-run (car (argv)) (list "-server" "-" "-daemonize" "-transport" (args:get-arg "-transport" "http")))
)
(begin
(debug:print-info 0 "Waiting for server to start")
(thread-sleep! 4)))
(if (< trycount 10)
(loop (open-run-close tasks:get-best-server tasks:open-db)
(+ trycount 1))
(debug:print 0 "WARNING: Couldn't start or find a server.")))
(debug:print 2 "INFO: Server(s) running " servers)
)))
|
>
>
>
|
<
<
<
<
<
<
|
|
<
|
|
<
<
<
|
<
<
<
|
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
>
>
>
>
>
>
>
>
>
>
>
<
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
|
>
|
>
>
|
<
>
|
>
|
>
>
>
>
>
|
|
|
<
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
|
<
>
|
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
|
>
|
>
>
>
>
|
|
>
>
>
>
>
>
>
>
|
>
|
>
>
>
>
>
>
>
>
>
|
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
;; S E R V E R
;;======================================================================
;; Call this to start the actual server
;;
;; all routes though here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id)
(case *transport-type*
((http)(http-transport:launch run-id))
((nmsg)(nmsg-transport:launch run-id))
((rpc) (rpc-transport:launch run-id))
(else (debug:print 0 "ERROR: unknown server type " *transport-type*))))
;; (else (debug:print 0 "ERROR: No known transport set, transport=" transport ", using rpc")
;; (rpc-transport:launch run-id)))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; Get the transport
(define (server:get-transport)
(if *transport-type*
*transport-type*
(let ((ttype (string->symbol
(or (args:get-arg "-transport")
(configf:lookup *configdat* "server" "transport")
"rpc"))))
(set! *transport-type* ttype)
ttype)))
;; Generate a unique signature for this server
(define (server:mk-signature)
(message-digest-string (md5-primitive)
(with-output-to-string
(lambda ()
(write (list (current-directory)
(argv)))))))
;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;;
(define (server:reply return-addr query-sig success/fail result)
(debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
;; (send-message pubsock target send-more: #t)
;; (send-message pubsock
(case (server:get-transport)
((rpc) (db:obj->string (vector success/fail query-sig result)))
((http) (db:obj->string (vector success/fail query-sig result)))
((zmq)
(let ((pub-socket (vector-ref *runremote* 1)))
(send-message pub-socket return-addr send-more: #t)
(send-message pub-socket (db:obj->string (vector success/fail query-sig result)))))
((fs) result)
(else
(debug:print 0 "ERROR: unrecognised transport type: " *transport-type*)
result)))
;; Given a run id start a server process ### NOTE ### > file 2>&1
;; if the run-id is zero and the target-host is set
;; try running on that host
;;
(define (server:run run-id)
(let* ((curr-host (get-host-name))
(curr-ip (server:get-best-guess-address curr-host))
(target-host (configf:lookup *configdat* "server" "homehost" ))
(testsuite (common:get-testsuite-name))
(logfile (conc *toppath* "/logs/" run-id ".log"))
(cmdln (conc (common:get-megatest-exe)
" -server " (or target-host "-") " -run-id " run-id (if (equal? (configf:lookup *configdat* "server" "daemonize") "yes")
(conc " -daemonize -log " logfile)
"")
" -debug 4 -m testsuite:" testsuite))) ;; (conc " >> " logfile " 2>&1 &")))))
(debug:print 0 "INFO: Starting server (" cmdln ") as none running ...")
(push-directory *toppath*)
(if (not (directory-exists? "logs"))(create-directory "logs"))
;; host.domain.tld match host?
(if (and target-host
;; look at target host, is it host.domain.tld or ip address and does it
;; match current ip or hostname
(not (string-match (conc "("curr-host "|" curr-host"\\..*)") target-host))
(not (equal? curr-ip target-host)))
(begin
(debug:print-info 0 "Starting server on " target-host ", logfile is " logfile)
(setenv "TARGETHOST" target-host)))
(setenv "TARGETHOST_LOGF" logfile)
(common:wait-for-normalized-load 4 " delaying server start due to load") ;; do not try starting servers on an already overloaded machine, just wait forever
(system (conc "nbfake " cmdln))
(unsetenv "TARGETHOST_LOGF")
(if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST"))
;; (system cmdln)
(pop-directory)))
(define (server:get-client-signature)
(if *my-client-signature* *my-client-signature*
(let ((sig (server:mk-signature)))
(set! *my-client-signature* sig)
*my-client-signature*)))
;; kind start up of servers, wait 40 seconds before allowing another server for a given
;; run-id to be launched
(define (server:kind-run run-id)
(let ((last-run-time (hash-table-ref/default *server-kind-run* run-id #f)))
(if (or (not last-run-time)
(> (- (current-seconds) last-run-time) 30))
(begin
(server:run run-id)
(hash-table-set! *server-kind-run* run-id (current-seconds))))))
;; The generic run a server command. Dispatches the call to server 0 if run-id != 0
;;
(define (server:try-running run-id)
(if (eq? run-id 0)
(server:run run-id)
(rmt:start-server run-id)))
(define (server:check-if-running run-id)
(let ((tdbdat (tasks:open-db)))
(let loop ((server (tasks:get-server (db:delay-if-busy tdbdat) run-id))
(trycount 0))
(if server
;; note: client:start will set *runremote*. this needs to be changed
;; also, client:start will login to the server, also need to change that.
;;
;; client:start returns #t if login was successful.
;;
(let ((res (case *transport-type*
((http)(server:ping-server run-id
(tasks:hostinfo-get-interface server)
(tasks:hostinfo-get-port server)))
((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server)
(tasks:hostinfo-get-port server)
timeout: 2)))))
;; if the server didn't respond we must remove the record
(if res
#t
(begin
(debug:print-info 0 "server at " server " not responding, removing record")
(tasks:server-force-clean-running-records-for-run-id (db:delay-if-busy tdbdat) run-id
" server:check-if-running")
res)))
#f))))
;; called in megatest.scm, host-port is string hostname:port
;;
(define (server:ping run-id host:port)
(let ((tdbdat (tasks:open-db)))
(let* ((host-port (let ((slst (string-split host:port ":")))
(if (eq? (length slst) 2)
(list (car slst)(string->number (cadr slst)))
#f)))
(toppath (launch:setup-for-run))
(server-db-dat (if (not host-port)(tasks:get-server (db:delay-if-busy tdbdat) run-id) #f)))
(if (not run-id)
(begin
(debug:print 0 "ERROR: must specify run-id when doing ping, -run-id n")
(print "ERROR: No run-id")
(exit 1))
(if (and (not host-port)
(not server-db-dat))
(begin
(print "ERROR: bad host:port")
(exit 1))
(let* ((iface (if host-port (car host-port) (tasks:hostinfo-get-interface server-db-dat)))
(port (if host-port (cadr host-port)(tasks:hostinfo-get-port server-db-dat)))
(server-dat (http-transport:client-connect iface port))
(login-res (rmt:login-no-auto-client-setup server-dat run-id)))
(if (and (list? login-res)
(car login-res))
(begin
(print "LOGIN_OK")
(exit 0))
(begin
(print "LOGIN_FAILED")
(exit 1)))))))))
;; run ping in separate process, safest way in some cases
;;
(define (server:ping-server run-id iface port)
(with-input-from-pipe
(conc (common:get-megatest-exe) " -run-id " run-id " -ping " (conc iface ":" port))
(lambda ()
(let loop ((inl (read-line))
(res "NOREPLY"))
(if (eof-object? inl)
(case (string->symbol res)
((NOREPLY) #f)
((LOGIN_OK) #t)
(else #f))
(loop (read-line) inl))))))
(define (server:login toppath)
(lambda (toppath)
(set! *last-db-access* (current-seconds))
(if (equal? *toppath* toppath)
(begin
;; (debug:print-info 2 "login successful")
#t)
(begin
;; (debug:print-info 2 "login failed")
#f))))
(define (server:get-timeout)
(let ((tmo (configf:lookup *configdat* "server" "timeout")))
(if (and (string? tmo)
(string->number tmo))
(* 60 60 (string->number tmo))
;; (* 3 24 60 60) ;; default to three days
(* 60 1) ;; default to one minute
;; (* 60 60 25) ;; default to 25 hours
)))
|