1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
-
+
|
;; Copyright 2006-2012, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
(require-extension (srfi 18) extras tcp s11n)
(use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest directory-utils)
;; (use zmq)
(use spiffy uri-common intarweb http-client spiffy-request-vars)
(declare (unit server))
(declare (uses common))
|
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
+
+
+
-
+
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
+
+
+
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
-
-
-
-
-
+
+
-
-
-
-
+
+
+
-
-
-
+
+
+
+
+
-
-
+
-
-
-
+
+
+
+
|
;; S E R V E R
;;======================================================================
;; Call this to start the actual server
;;
;; all routes though here end in exit ...
;;
;; start_server
;;
(define (server:launch transport)
(define (server:launch run-id)
(if (not *toppath*)
(if (not (setup-for-run))
(begin
(debug:print 0 "ERROR: cannot find megatest.config, exiting")
(exit))))
(debug:print-info 2 "Starting server using " transport " transport")
(set! *transport-type* transport)
(case transport
((fs) (exit)) ;; there is no "fs" server transport
((http) (http-transport:launch))
(http-transport:launch run-id))
((zmq) (zmq-transport:launch))
(else
(debug:print "WARNING: unrecognised transport " transport)
(exit))))
;;======================================================================
;; Q U E U E M A N A G E M E N T
;;======================================================================
;; We don't want to flush the queue if it was just flushed
(define *server:last-write-flush* (current-milliseconds))
;; Flush the queue every third of a second. Can we assume that setup-for-run
;; has already been done?
(define (server:write-queue-handler)
(if (setup-for-run)
(let ((db (open-db)))
(let loop ()
(let ((last-write-flush-time #f))
(mutex-lock! *incoming-mutex*)
(set! last-write-flush-time *server:last-write-flush*)
(mutex-unlock! *incoming-mutex*)
(if (> (- (current-milliseconds) last-write-flush-time) 10)
(begin
(mutex-lock! *db:process-queue-mutex*)
(db:process-cached-writes db)
(mutex-unlock! *db:process-queue-mutex*)
(thread-sleep! 0.005))))
(loop)))
(begin
(debug:print 0 "ERROR: failed to setup for Megatest in server:write-queue-handler")
(exit 1))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; Generate a unique signature for this server
(define (server:mk-signature)
(message-digest-string (md5-primitive)
(with-output-to-string
(lambda ()
(write (list (current-directory)
(argv)))))))
;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;;
(define (server:reply return-addr query-sig success/fail result)
(debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
;; (send-message pubsock target send-more: #t)
;; (send-message pubsock
(case *transport-type*
((fs) result)
((http)(db:obj->string (vector success/fail query-sig result)))
((zmq)
(let ((pub-socket (vector-ref *runremote* 1)))
(send-message pub-socket return-addr send-more: #t)
(send-message pub-socket (db:obj->string (vector success/fail query-sig result)))))
(else
(debug:print 0 "ERROR: unrecognised transport type: " *transport-type*)
result)))
(db:obj->string (vector success/fail query-sig result)))
;; Given a run id start a server process ### NOTE ### > file 2>&1
;; if the run-id is zero and the target-host is set
;; try running on that host
;;
(define (server:run run-id)
(let* ((curr-host (get-host-name))
(target-host (configf:lookup *configdat* "server" "homehost" ))
(logfile (conc *toppath* "/db/" run-id ".log"))
(cmdln (conc (common:get-megatest-exe)
" -server " (or target-host "-") " -run-id " run-id " >> " logfile " 2>&1 &")))
(debug:print 0 "INFO: Starting server (" cmdln ") as none running ...")
(push-directory *toppath*)
;; host.domain.tld match host?
(if (and target-host (not (string-match (conc "("curr-host "|" curr-host"\\..*)") target-host)))
(begin
(debug:print-info 0 "Starting server on " target-host ", logfile is " logfile)
(setenv "TARGETHOST" target-host)
(setenv "TARGETHOST_LOGF" logfile)
(system (conc "nbfake " cmdln)))
(system cmdln))
(pop-directory)))
;; kind start up of servers, wait 40 seconds before allowing another server for a given
;; run-id to be launched
(define (server:kind-run run-id)
(let ((last-run-time (hash-table-ref/default *server-kind-run* run-id #f)))
(if (or (not last-run-time)
(> (- (current-seconds) last-run-time) 40))
(begin
(server:run run-id)
(hash-table-set! *server-kind-run* run-id (current-seconds))))))
;; The generic run a server command. Dispatches the call to server 0 if run-id != 0
;;
(define (server:ensure-running)
(let loop ((servers (open-run-close tasks:get-best-server tasks:open-db))
(define (server:try-running run-id)
(if (eq? run-id 0)
(server:run run-id)
(rmt:start-server run-id)))
(define (server:check-if-running run-id)
(let loop ((server (open-run-close tasks:get-server tasks:open-db run-id))
(trycount 0))
(if (or (not servers)
(null? servers))
(begin
(if server
;; note: client:start will set *runremote*. this needs to be changed
;; also, client:start will login to the server, also need to change that.
;;
;; client:start returns #t if login was successful.
;;
(let ((res (server:ping-server run-id (vector-ref server 1)(vector-ref server 0))))
;; if the server didn't respond we must remove the record
(if res
#t
(begin
(if (even? trycount) ;; just do the server start every other time through this loop (every 8 seconds)
(let ((cmdln (conc (if (getenv "MT_MEGATEST") (getenv "MT_MEGATEST") "megatest")
" -server - -daemonize")))
(open-run-close tasks:server-force-clean-running-records-for-run-id tasks:open-db run-id
" server:check-if-running")
(debug:print 0 "INFO: Starting server (" cmdln ") as none running ...")
;; (server:launch (string->symbol (args:get-arg "-transport" "http"))))
;; no need to use fork, no need to do the list-servers trick. Just start the damn server, it will exit on it's own
;; if there is an existing server
(system cmdln)
res)))
#f)))
(thread-sleep! 3)
;; (process-run (car (argv)) (list "-server" "-" "-daemonize" "-transport" (args:get-arg "-transport" "http")))
)
(begin
(define (server:ping-server run-id iface port)
(with-input-from-pipe
(debug:print-info 0 "Waiting for server to start")
(thread-sleep! 4)))
(if (< trycount 10)
(conc (common:get-megatest-exe) " -run-id " run-id " -ping " (conc iface ":" port))
(lambda ()
(let loop ((inl (read-line))
(res "NOREPLY"))
(if (eof-object? inl)
(loop (open-run-close tasks:get-best-server tasks:open-db)
(+ trycount 1))
(case (string->symbol res)
(debug:print 0 "WARNING: Couldn't start or find a server.")))
(debug:print 2 "INFO: Server(s) running " servers)
)))
((NOREPLY) #f)
((LOGIN_OK) #t)
(else #f))
(loop (read-line) inl))))))
|