Megatest

Check-in [5f757480e6]
Login
Overview
Comment:Added interface to the monitor db and appropriate handling thereof.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk | v1.506
Files: files | file ages | folders
SHA1: 5f757480e6441e210f1f71d1a87ddbd9af482886
User & Date: mrwellan on 2012-11-02 17:36:08
Other Links: manifest | tags
Context
2012-11-02
18:33
borked server heartbeat logic check-in: ece909ab1c user: mrwellan tags: trunk
17:36
Added interface to the monitor db and appropriate handling thereof. check-in: 5f757480e6 user: mrwellan tags: trunk, v1.506
13:19
Made repl use non-blocking client mode check-in: 50f33a00a7 user: mrwellan tags: trunk, v1.5105
Changes

Modified db.scm from [2a2b5ea15a] to [0f75487810].

231
232
233
234
235
236
237

238

239
240
241
242
243
244
245
231
232
233
234
235
236
237
238

239
240
241
242
243
244
245
246







+
-
+







;;======================================================================
;; T E S T   S P E C I F I C   D B 
;;======================================================================

;; Create the sqlite db for the individual test(s)
(define (open-test-db testpath) 
  (debug:print-info 11 "open-test-db " testpath)
  (if (and testpath 
  (if (and (directory? testpath)
	   (directory? testpath)
	   (file-read-access? testpath))
      (let* ((dbpath    (conc testpath "/testdat.db"))
	     (dbexists  (file-exists? dbpath))
	     (db        (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
	     (handler   (make-busy-timeout (if (args:get-arg "-override-timeout")
					       (string->number (args:get-arg "-override-timeout"))
					       136000))))

Modified megatest-version.scm from [0ac622c7df] to [4bf7ad110a].

1
2
3
4
5
6

7
1
2
3
4
5

6
7





-
+

;; Always use two digit decimal
;; 1.01, 1.02...1.10,1.11 ... 1.99,2.00..

(declare (unit megatest-version))

(define megatest-version 1.5105)
(define megatest-version 1.5106)

Modified megatest.scm from [9c15c3f066] to [1433ed4462].

270
271
272
273
274
275
276
277

278
279
280


281
282
283
284
285
286
287
288

289
290
291
292
293





294
295
296
297
298
299
300
301
302
303
304
305
306


307

308
309
310
311
312
313
314
315
316
317




318
319
320
321


322
323

324
325

326
327
328
329
330
331
332
270
271
272
273
274
275
276

277
278


279
280
281
282
283
284
285
286
287
288
289





290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309

310
311
312
313
314
315
316
317
318
319

320
321
322
323
324
325


326
327
328
329
330
331

332
333
334
335
336
337
338
339







-
+

-
-
+
+








+
-
-
-
-
-
+
+
+
+
+













+
+
-
+









-
+
+
+
+


-
-
+
+


+

-
+







      (server:launch)))

(if (or (args:get-arg "-listservers")
	(args:get-arg "-killserver"))
    (let ((tl (setup-for-run)))
      (if tl 
	  (let ((servers (open-run-close tasks:get-all-servers tasks:open-db))
		(fmtstr  "~5a~8a~8a~20a~5a~20a~9a~20a\n")
		(fmtstr  "~5a~8a~8a~20a~20a~10a~20a~10a~10a\n")
		(servers-to-kill '()))
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Port" "Time" "Priority" "State")
	    (format #t fmtstr "==" "=====" "===" "====" "====" "====" "========" "=====")
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "Port" "Time" "Priority" "State")
	    (format #t fmtstr "==" "=====" "===" "====" "=========" "====" "====" "========" "=====")
	    (for-each 
	     (lambda (server)
	       (let* ((killinfo   (args:get-arg "-killserver"))
		      (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		      (kpid       (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f))
		      (id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
		      (hostname   (vector-ref server 2))
		      (interface  (vector-ref server 3))
		      (port       (vector-ref server 3))
		      (start-time (vector-ref server 4))
		      (priority   (vector-ref server 5))
		      (state      (vector-ref server 6))
		      (mt-ver     (vector-ref server 7))
		      (port       (vector-ref server 4))
		      (start-time (vector-ref server 5))
		      (priority   (vector-ref server 6))
		      (state      (vector-ref server 7))
		      (mt-ver     (vector-ref server 8))
		      (status     (open-run-close tasks:server-alive? tasks:open-db hostname port: port))
		      (killed     #f)
		      (zmq-socket (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (or (not status)    ;; no point in keeping dead records in the db
			 (and khost-port ;; kill by host/port
			      (equal? hostname (car khost-port))
			      (equal? port (string->number (cadr khost-port)))))
		     (begin
		       (open-run-close tasks:server-deregister tasks:open-db  hostname port: port)
		       (if status ;; #t means alive
			   (begin
			     (if (equal? hostname (get-host-name))
				 (process-signal pid signal/term)
			     (cdb:kill-server zmq-socket)
				 (cdb:kill-server zmq-socket))
			     (debug:print-info 1 "Killed server by host:port at " hostname ":" port))
			   (debug:print-info 1 "Removing defunct server record for " hostname ":" port))
		       (set! killed #t)))
		 (if (and kpid
			  ;; (equal? hostname (car khost-port))
			  (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!!
		     (begin
		       (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)
		       (set! killed #t)
		       (if status (cdb:kill-server zmq-socket))
		       (if status 
			   (if (equal? hostname (get-host-name))
			       (process-signal pid signal/term)
			       (debug:print 0 "WARNING: Can't kill a dead server on host " hostname)))
		       (debug:print-info 1 "Killed server by pid at " hostname ":" port)))
		 ;; (if zmq-socket (close-socket  zmq-socket))
		 (format #t fmtstr id mt-ver pid hostname port start-time priority 
			 status)))
		 (format #t fmtstr id mt-ver pid hostname interface port start-time priority 
			 (if status "alive" "dead"))))
	     servers)
	    (debug:print-info 1 "Done with listservers")
	    (set! *didsomething* #t)
	    (exit) ;; must do, would have to add checks to many/all calls below
	    (set! *didsomething* #t))
	    )
	  (exit)))
    ;; if not list or kill then start a client (if appropriate)
    (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test")
	    (eq? (length (hash-table-keys args:arg-hash)) 0))
	(debug:print-info 1 "Server connection not needed")
	(server:client-launch do-ping: #t)))

Modified server.scm from [59c1e6d986] to [000964c6b9].

33
34
35
36
37
38
39
40
41


42

43
44
45
46

47
48
49
50
51
52
53
54
55

56
57
58
59
60
61
62
33
34
35
36
37
38
39


40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

57
58
59
60
61
62
63
64







-
-
+
+

+




+








-
+







  (debug:print 0 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-socket     #f)
	 (hostname       (if (string=? "-" hostn)
			     (get-host-name) 
	 (iface          (if (string=? "-" hostn)
			     "*" ;; (get-host-name) 
			     hostn))
	 (hostname       (get-host-name))
	 (ipaddrstr      (let ((ipstr (if (string=? "-" hostn)
					  (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					  #f)))
			   (if ipstr ipstr hostname))))
    ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0))
    (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket 5555 0))
    (set! *cache-on* #t)
    
    ;; what to do when we quit
    ;;
    (on-exit (lambda ()
	       (if (and *toppath* *server-id*)
		   (begin
		     (open-run-close tasks:server-deregister-self tasks:open-db #f))
		     (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr))
		   (let loop () 
		     (let ((queue-len 0))
		       (thread-sleep! (random 5))
		       (mutex-lock! *incoming-mutex*)
		       (set! queue-len (length *incoming-data*))
		       (mutex-unlock! *incoming-mutex*)
		       (if (> queue-len 0)
107
108
109
110
111
112
113
114

115
116


117
118
119
120
121


122
123
124


125
126

127
128
129
130
131

132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149


150
151
152
153
154

155
156
157
158
159
160
161
109
110
111
112
113
114
115

116
117

118
119
120
121
122
123
124
125
126
127


128
129
130

131
132
133
134
135

136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152


153
154
155
156
157
158

159
160
161
162
163
164
165
166







-
+

-
+
+





+
+

-
-
+
+

-
+




-
+
















-
-
+
+




-
+







		(set! *time-to-exit* #t)
		(open-run-close tasks:server-deregister-self tasks:open-db)
		(thread-sleep! 1)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")
		(exit)))))))

(define (server:find-free-port-and-open host s port #!key (trynum 50))
(define (server:find-free-port-and-open iface s port #!key (trynum 50))
  (let ((s (if s s (make-socket 'rep)))
	(p (if (number? port) port 5555)))
	(p (if (number? port) port 5555))
 	(old-handler (current-exception-handler)))
    (handle-exceptions
     exn
     (begin
       (debug:print 0 "Failed to bind to port " p ", trying next port")
       (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
       ;; (old-handler)
       ;; (print-call-chain)
       (if (> trynum 0)
	   (server:find-free-port-and-open host s (+ p 1) trynum: (- trynum 1))
	   (debug:print-info 0 "Tried ports from " (- p trynum) " to " p 
	   (server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1))
	   (debug:print-info 0 "Tried ports up to " p 
			     " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")))
     (let ((zmq-url (conc "tcp://" host ":" p)))
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (print "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (set! *runremote* #f)
       (debug:print 0 "Server started on " zmq-url)
       (set! *server-id* (open-run-close tasks:server-register tasks:open-db (current-process-id) host p 0 'live))
       (set! *server-id* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live))
       s))))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; 
(define (server:client-connect host port #!key (context #f))
  (debug:print 3 "client-connect " host ":" port)
(define (server:client-connect iface port #!key (context #f))
  (debug:print 3 "client-connect " iface ":" port)
  (let ((connect-ok #f)
	(zmq-socket (if context 
			(make-socket 'req context)
			(make-socket 'req)))
	(conurl     (server:make-server-url (list host port))))
	(conurl     (server:make-server-url (list iface port))))
    (if (socket? zmq-socket)
	(begin
	  (connect-socket zmq-socket conurl)
	  zmq-socket)
	#f)))
  

173
174
175
176
177
178
179
180
181
182



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207






















208
209
210
211
212
213
214
178
179
180
181
182
183
184



185
186
187

























188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216







-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db do-ping: do-ping)))
    (if hostinfo
	(let ((host    (car hostinfo))
	      (port    (cadr hostinfo)))
	  ;; (zsocket (caddr hostinfo)))
	(let ((host    (car   hostinfo))
	      (iface   (cadr  hostinfo))
	      (port    (caddr hostinfo)))
	;; (set! *runremote* zsocket))
	  (let* ((host       (car hostinfo))
		 (port       (cadr hostinfo)))
	    (debug:print-info 2 "Setting up to connect to " hostinfo)
	    (handle-exceptions
	     exn
	     (begin
	       (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	       (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	       (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	       (open-run-close tasks:server-deregister tasks:open-db host port: port)
	       #f)
	     (let* ((zmq-socket (server:client-connect host port))
		    (login-res  (server:client-login zmq-socket))
		    (connect-ok (if (null? login-res) #f (car login-res)))
		    (conurl     (server:make-server-url hostinfo)))
	       (if connect-ok
		   (begin
		     (debug:print-info 2 "Logged in and connected to " conurl)
		     (set! *runremote* zmq-socket)
		     #t)
		   (begin
		     (debug:print-info 2 "Failed to login or connect to " conurl)
		     (set! *runremote* #f)
		     #f))))))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (handle-exceptions
	   exn
	   (begin
	     (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	     (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	     (open-run-close tasks:server-deregister tasks:open-db host port: port)
	     #f)
	   (let* ((zmq-socket (server:client-connect iface port))
		  (login-res  (server:client-login zmq-socket))
		  (connect-ok (if (null? login-res) #f (car login-res)))
		  (conurl     (server:make-server-url (list iface port))))
	     (if connect-ok
		 (begin
		   (debug:print-info 2 "Logged in and connected to " conurl)
		   (set! *runremote* zmq-socket)
		   #t)
		 (begin
		   (debug:print-info 2 "Failed to login or connect to " conurl)
		   (set! *runremote* #f)
		   #f)))))
	(if (> numtries 0)
	    (let ((exe (car (argv))))
	      (debug:print-info 1 "No server available, attempting to start one...")
	      (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
	      (sleep 2)
	      ;; not doing ping, assume the server started and registered itself
	      (server:client-setup numtries: (- numtries 1) do-ping: #f))

Modified tasks.scm from [5ae2b507c5] to [3a1458e323].

47
48
49
50
51
52
53

54
55
56
57
58
59
60
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61







+







                                start_time TIMESTAMP,
                                last_update TIMESTAMP,
                                hostname TEXT,
                                username TEXT,
                               CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
                                  pid INTEGER,
                                  interface TEXT,
                                  hostname TEXT,
                                  port INTEGER,
                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,
72
73
74
75
76
77
78
79

80
81
82
83
84



85
86
87
88
89
90
91
92
93
94
95
96
97
98

99
100
101
102
103
104

105
106
107

108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

124
125
126
127
128
129
130
73
74
75
76
77
78
79

80
81
82



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

99
100
101
102
103
104

105
106
107

108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

124
125
126
127
128
129
130
131







-
+


-
-
-
+
+
+













-
+





-
+


-
+















-
+







    mdb))
    
;;======================================================================
;; Server and client management
;;======================================================================

;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid hostname port priority state)
(define (tasks:server-register mdb pid interface port priority state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'));"
   pid hostname port priority (conc state) megatest-version)
  (tasks:server-get-server-id mdb hostname port pid))
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);"
   pid (get-host-name) port priority (conc state) megatest-version interface)
  (tasks:server-get-server-id mdb (get-host-name) port pid))

;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (if pid
      (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)
      (if port
	  (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port)
	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

(define (tasks:server-get-server-id mdb host port pid)
(define (tasks:server-get-server-id mdb hostname port pid)
  (let ((res #f))
    (sqlite3:for-each-row
     (lambda (id)
       (set! res id))
     mdb
     (if (and host  pid)
     (if (and hostname  pid)
	 "SELECT id FROM servers WHERE hostname=? AND pid=?;"
	 "SELECT id FROM servers WHERE hostname=? AND port=?;")
     host (if pid pid port))
     hostname (if pid pid port))
    res))

(define (tasks:server-update-heartbeat mdb server-id)
  (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id))

;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds
(define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f))
  (let* ((server-id  (if server-id 
			 server-id
			 (tasks:server-get-server-id mdb hostname port pid)))
	 (heartbeat-delta 99e9))
    (sqlite3:for-each-row
     (lambda (delta)
       (set! heartbeat-delta delta))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
    (< (- (current-seconds) heartbeat-delta) 10)))
    (> heartbeat-delta 10)))

(define (tasks:client-register mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));")
  (tasks:server-get-server-id mdb)
  pid hostname cmdline)
149
150
151
152
153
154
155
156
157


158
159
160

161
162
163
164
165
166
167




168
169
170
171
172
173












174
175
176
177
178
179
180
181
182
183
184
185


186
187

188
189
190
191
192
193
194
150
151
152
153
154
155
156


157
158
159
160

161
162
163
164
165
166


167
168
169
170
171
172
173
174
175

176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197


198
199
200

201
202
203
204
205
206
207
208







-
-
+
+


-
+





-
-
+
+
+
+





-
+
+
+
+
+
+
+
+
+
+
+
+










-
-
+
+

-
+








;; ping each server in the db and return first found that responds. 
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb #!key (do-ping #f))
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname port)
       (set! res (cons (list hostname port) res))
     (lambda (id hostname interface port pid)
       (set! res (cons (list hostname interface port pid) res))
       (debug:print-info 1 "Found " hostname ":" port))
     mdb
     "SELECT id,hostname,port FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version)
     "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version)
    ;; (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (car hed))
		 (port     (cadr hed))
	  (let* ((host     (car    hed))
		 (iface    (cadr   hed))
		 (port     (caddr  hed))
		 (pid      (cadddr hed))
		 ;; (ping-res (if do-ping (server:ping host port return-socket: #f) '(#t "NO PING" #f)))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db host port: port)) ;; (car ping-res))
		 ;; (reason   (cadr ping-res))
		 ;; (zsocket  (caddr ping-res))
		 )
	    (if alive (list host port)
	    (if alive
		;; (if (server:ping iface port)
		    (list host iface port)
		 ;;    ;; not actually alive, destroy!
		 ;;    (begin
		 ;;      (if (equal? host (get-host-name))
		 ;;          (begin
		 ;;            (debug:print-info 0 "Killing process " pid " on host " host " with signal/term")
		 ;;            (send-signal pid signal/term))
		 ;;          (debug:print 0 "WARNING: Can't kill process " pid " on host " host))
		 ;;      (open-run-close tasks:server-deregister tasks:open-db  host port: port)
		 ;;      #f))
		;; remove defunct server from table
		(begin
		  (open-run-close tasks:server-deregister tasks:open-db  host port: port)
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))

(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname port start-time priority state mt-version)
       (set! res (cons (vector id pid hostname port start-time priority state mt-version) res)))
     (lambda (id pid hostname interface port start-time priority state mt-version)
       (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res)))
     mdb
     "SELECT id,pid,hostname,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")
     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")
    res))
       

;;======================================================================
;; Tasks and Task monitors
;;======================================================================