Megatest

Diff
Login

Differences From Artifact [efc84088da]:

To Artifact [6c83278de0]:


78
79
80
81
82
83
84

85



86
87
88
89
90
91


92
93


94
95
96
97
98
99
100
78
79
80
81
82
83
84
85

86
87
88
89
90
91
92
93

94
95
96

97
98
99
100
101
102
103
104
105







+
-
+
+
+





-
+
+

-
+
+








;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);"
   pid (get-host-name) port priority (conc state) megatest-version interface)
  (list 
  (tasks:server-get-server-id mdb (get-host-name) port pid))
   (tasks:server-get-server-id mdb (get-host-name) port pid)
   interface
   port))

;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (if pid
      (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)
      ;; (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)
      (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)
      (if port
	  (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port)
	  ;; (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port)
	  (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port)
	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

(define (tasks:server-get-server-id mdb hostname port pid)
  (let ((res #f))
117
118
119
120
121
122
123

124
125
126
127
128
129
130

131
132
133
134
135
136
137
122
123
124
125
126
127
128
129
130
131
132
133
134
135

136
137
138
139
140
141
142
143







+






-
+







			 server-id
			 (tasks:server-get-server-id mdb hostname port pid)))
	 (heartbeat-delta 99e9))
    (sqlite3:for-each-row
     (lambda (delta)
       (set! heartbeat-delta delta))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
    (debug:print 1 "Found heartbeat-delta of " heartbeat-delta " for server with id " server-id)
    (< heartbeat-delta 10)))

(define (tasks:client-register mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));")
  (tasks:server-get-server-id mdb)
  (tasks:server-get-server-id mdb hostname #f pid)
  pid hostname cmdline)

(define (tasks:client-logout mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "UPDATE clients SET logout_time=strftime('%s','now') WHERE pid=? AND hostname=? AND cmdline=?;"
   pid hostname cmdline))
146
147
148
149
150
151
152
153

154
155
156
157
158
159

160
161
162
163
164
165
166
167
168
169
170
171
172

173
174
175
176

177
178


179
180
181
182
183
184
185
186
187
188
189

190

191
192
193
194



























195
196
197
198
199
200
201
152
153
154
155
156
157
158

159
160
161
162
163
164

165
166
167
168
169
170
171
172
173
174
175
176


177



178
179


180
181










182
183

184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222







-
+





-
+











-
-
+
-
-
-

+
-
-
+
+
-
-
-
-
-
-
-
-
-
-

+
-
+




+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







     server-id)))

(define (tasks:have-clients? mdb server-id)
  (null? (tasks:get-logged-in-clients mdb server-id)))

;; ping each server in the db and return first found that responds. 
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb #!key (do-ping #f))
(define (tasks:get-best-server mdb)
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface port pid)
       (set! res (cons (list hostname interface port pid) res))
       (debug:print-info 1 "Found " hostname ":" port))
       (debug:print-info 1 "Found existing server " hostname ":" port " registered in db"))
     mdb
     "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version)
    ;; (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (car    hed))
		 (iface    (cadr   hed))
		 (port     (caddr  hed))
		 (pid      (cadddr hed))
		 ;; (ping-res (if do-ping (server:ping host port return-socket: #f) '(#t "NO PING" #f)))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db host port: port)) ;; (car ping-res))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port)))
		 ;; (reason   (cadr ping-res))
		 ;; (zsocket  (caddr ping-res))
		 )
	    (if alive
		(begin
		;; (if (server:ping iface port)
		    (list host iface port)
		  (debug:print 1 "Found an existing, alive, server " host ":" port ".")
		  (list host iface port))
		 ;;    ;; not actually alive, destroy!
		 ;;    (begin
		 ;;      (if (equal? host (get-host-name))
		 ;;          (begin
		 ;;            (debug:print-info 0 "Killing process " pid " on host " host " with signal/term")
		 ;;            (send-signal pid signal/term))
		 ;;          (debug:print 0 "WARNING: Can't kill process " pid " on host " host))
		 ;;      (open-run-close tasks:server-deregister tasks:open-db  host port: port)
		 ;;      #f))
		;; remove defunct server from table
		(begin
		  (debug:print-info 1 "Removing " host ":" port " from server registry as it appears to be dead")
		  (open-run-close tasks:server-deregister tasks:open-db  host port: port)
		  (tasks:kill-server #f host port pid)
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))

(define (tasks:kill-server status hostname port pid)
  (debug:print-info 1 "Removing defunct server record for " hostname ":" port)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db  hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))
  
  (if status ;; #t means alive
      (begin
	(if (equal? hostname (get-host-name))
	    (begin
	      (debug:print 1 "Sending signal/term to " pid " on " hostname)
	      (process-signal pid signal/term)
	      (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
	      (process-signal pid signal/kill)) ;; local machine, send sig term
	    (begin
	      (debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
	      (cdb:kill-server zmq-socket))))    ;; remote machine, try telling server to commit suicide
      (begin
	(if status 
	    (if (equal? hostname (get-host-name))
		(begin
		  (debug:print-info 1 "Sending signal/term to " pid " on " hostname)
		  (process-signal pid signal/term)  ;; local machine, send sig term
		  (thread-sleep! 5)                 ;; give it five seconds to die peacefully then do a brutal kill
		  (process-signal pid signal/kill)) 
		(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))

(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname interface port start-time priority state mt-version)
       (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res)))
     mdb
     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")