Megatest

Check-in [e06923ca5c]
Login
Overview
Comment:Pinging servers almost working, have finalizer issues
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | monitor-cleanup
Files: files | file ages | folders
SHA1: e06923ca5c3275aa1ad0e5fee6fd35ca991073d7
User & Date: matt on 2012-11-01 01:16:45
Other Links: branch diff | manifest | tags
Context
2012-11-01
01:41
Removed instrumentation check-in: e7e1e90a39 user: matt tags: monitor-cleanup
01:16
Pinging servers almost working, have finalizer issues check-in: e06923ca5c user: matt tags: monitor-cleanup
2012-10-31
22:11
Reverted dashboard to direct access. check-in: c474722d81 user: matt tags: monitor-cleanup
Changes

Modified common.scm from [aee65c218b] to [12802b14f2].

43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
;; SERVER
(define *my-client-signature* #f)
(define *rpc:listener*      #f) ;; if set up for server communication this will hold the tcp port
(define *runremote*         #f) ;; if set up for server communication this will hold <host port>
(define *last-db-access*    (current-seconds))  ;; update when db is accessed via server
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))


(define *target*            (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN
(define *keys*              (make-hash-table)) ;; cache the keys here
(define *keyvals*           (make-hash-table))
(define *toptest-paths*     (make-hash-table)) ;; cache toptest path settings here
(define *test-paths*        (make-hash-table)) ;; cache test-id to test run paths here
(define *test-ids*          (make-hash-table)) ;; cache run-id, testname, and item-path => test-id







|







43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
;; SERVER
(define *my-client-signature* #f)
(define *rpc:listener*      #f) ;; if set up for server communication this will hold the tcp port
(define *runremote*         #f) ;; if set up for server communication this will hold <host port>
(define *last-db-access*    (current-seconds))  ;; update when db is accessed via server
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)

(define *target*            (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN
(define *keys*              (make-hash-table)) ;; cache the keys here
(define *keyvals*           (make-hash-table))
(define *toptest-paths*     (make-hash-table)) ;; cache toptest path settings here
(define *test-paths*        (make-hash-table)) ;; cache test-id to test run paths here
(define *test-ids*          (make-hash-table)) ;; cache run-id, testname, and item-path => test-id

Modified db.scm from [8e7dce497a] to [c8003a0560].

1161
1162
1163
1164
1165
1166
1167






1168
1169
1170
1171
1172
1173


1174
1175
1176
1177
1178
1179
1180
	       (begin
		 (db:write-cached-data)
		 "WRITTEN")))))))

(define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj))))
(define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize))))







(define (cdb:client-call zmq-socket . params)
  (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params)
  (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params))))
	(res  #f))
    (send-message zmq-socket zdat)
    (set! res (db:string->obj (receive-message zmq-socket zdat)))


    (debug:print-info 11 "zmq-socket " (car params) " res=" res)
    res))
  
(define (cdb:set-verbosity zmq-socket val)
  (cdb:client-call zmq-socket 'set-verbosity #f val))

(define (cdb:login zmq-socket keyval signature)







>
>
>
>
>
>





|
>
>







1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
	       (begin
		 (db:write-cached-data)
		 "WRITTEN")))))))

(define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj))))
(define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize))))

(define (cdb:use-non-blocking-mode proc)
  (set! *client-non-blocking-mode* #t)
  (let ((res (proc)))
    (set! *client-non-blocking-mode* #f)
    res))
  
(define (cdb:client-call zmq-socket . params)
  (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params)
  (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params))))
	(res  #f))
    (send-message zmq-socket zdat)
    (set! res (db:string->obj (if *client-non-blocking-mode* 
				  (receive-message* zmq-socket zdat)
				  (receive-message  zmq-socket zdat))))
    (debug:print-info 11 "zmq-socket " (car params) " res=" res)
    res))
  
(define (cdb:set-verbosity zmq-socket val)
  (cdb:client-call zmq-socket 'set-verbosity #f val))

(define (cdb:login zmq-socket keyval signature)

Modified megatest.scm from [786a96adc0] to [bab637bbe2].

286
287
288
289
290
291
292


293
294
295
296
297
298
299
300
301
302
303
304
305
306


307
308

309
310

311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
		      (id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
		      (hostname   (vector-ref server 2))
		      (port       (vector-ref server 3))
		      (start-time (vector-ref server 4))
		      (priority   (vector-ref server 5))
		      (state      (vector-ref server 6))


		      (numclients #f)
		      (stat-numc  ;; (handle-exceptions
				  ;;  exn
				  ;;  (list #f (conc "EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)))
				   (let ((zmq-socket (server:client-connect hostname port)))
				     (if zmq-socket
					 (if (server:client-login zmq-socket)
					     (let ((numclients (cdb:num-clients zmq-socket))
						   (killed     #f))
					       (if (and khost-port ;; kill by host/port
							(equal? hostname (car khost-port))
							(equal? port (string->number (cadr khost-port))))
						   (begin
						     (open-run-close tasks:server-deregister tasks:open-db  hostname port: port)


						     (cdb:kill-server zmq-socket)
						     (debug:print-info 1 "Killed server by host:port at " hostname ":" port)

						     (set! killed #t))
						   (if (and kpid

							    (equal? kpid pid))
						       (begin
							 (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)
							 (set! killed #t)
							 (cdb:kill-server zmq-socket)
							 (debug:print-info 1 "Killed server by pid at " hostname ":" port))))
					       (if (not killed)(server:client-logout zmq-socket))
					       (close-socket  zmq-socket)
					       (list numclients "ACCESSIBLE")) ;; (server:client-logout zmq-socket)
					     (begin
					       (close-socket zmq-socket)
					       (list #f "CAN'T LOGIN")))
					 (list #f "CAN'T CONNECT"))))) ;; )
		 (format #t fmtstr id pid hostname port start-time priority 
			 (cadr stat-numc)(car stat-numc))))
	     servers)
	    (set! *didsomething* #t))))
    ;; if not list or kill then start a client (if appropriate)
    (if (or (let ((res #f))
	      (for-each
	       (lambda (key)
		 (if (args:get-arg key)(set! res #t)))







>
>
|
<
|
<
|
|
|
|
<
|
|
|
|
|
>
>
|
|
>
|
|
>
|
|
|
|
|
|
<
|
<
<
<
<
<

|







286
287
288
289
290
291
292
293
294
295

296

297
298
299
300

301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319

320





321
322
323
324
325
326
327
328
329
		      (id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
		      (hostname   (vector-ref server 2))
		      (port       (vector-ref server 3))
		      (start-time (vector-ref server 4))
		      (priority   (vector-ref server 5))
		      (state      (vector-ref server 6))
		      (stat-numc  (server:ping hostname port))
		      (status     (car stat-numc))
		      (numclients (cadr stat-numc))

		      (killed     #f)

		      (zmq-socket (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (or (not status)    ;; no point in keeping dead records in the db

			 (and khost-port ;; kill by host/port
			      (equal? hostname (car khost-port))
			      (equal? port (string->number (cadr khost-port)))))
		     (begin
		       (open-run-close tasks:server-deregister tasks:open-db  hostname port: port)
		       (if status ;; #t means alive
			   (begin
			     (cdb:kill-server zmq-socket)
			     (debug:print-info 1 "Killed server by host:port at " hostname ":" port))
			   (debug:print-info 1 "Removing defunct server record for " hostname ":" port))
		       (set! killed #t)))
		 (if (and kpid
			  (equal? hostname (car khost-port))
			  (equal? kpid pid))
		     (begin
		       (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)
		       (set! killed #t)
		       (if status (cdb:kill-server zmq-socket))
		       (debug:print-info 1 "Killed server by pid at " hostname ":" port)))

		 ;; (if zmq-socket (close-socket  zmq-socket))





		 (format #t fmtstr id pid hostname port start-time priority 
			 status numclients)))
	     servers)
	    (set! *didsomething* #t))))
    ;; if not list or kill then start a client (if appropriate)
    (if (or (let ((res #f))
	      (for-each
	       (lambda (key)
		 (if (args:get-arg key)(set! res #t)))
905
906
907
908
909
910
911

912
913
914
915
916
917
918
919
920
      (set! *didsomething* #t)))

;;======================================================================
;; Exit and clean up
;;======================================================================

;; this is the socket if we are a client

(if (socket? *runremote*)
    (close-socket *runremote*))

(if (not *didsomething*)
    (debug:print 0 help))

;; (if *runremote* (rpc:close-all-connections!))
    
(if (not (eq? *globalexitstatus* 0))







>
|
|







902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
      (set! *didsomething* #t)))

;;======================================================================
;; Exit and clean up
;;======================================================================

;; this is the socket if we are a client
;; (if (and *runremote*
;; 	 (socket? *runremote*))
;;     (close-socket *runremote*))

(if (not *didsomething*)
    (debug:print 0 help))

;; (if *runremote* (rpc:close-all-connections!))
    
(if (not (eq? *globalexitstatus* 0))

Modified server.scm from [6908adafb0] to [326eb16dfb].

82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

105
106
107
108
109
110
111
112
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  (let loop ((count 0))
    (thread-sleep! 1) ;; no need to do this very often
    (db:write-cached-data)
    (print "Server running, count is " count)
    (if (< count 10)
	(loop (+ count 1))
	(let ((numrunning (open-run-close db:get-count-tests-running #f)))
	  (if (or (> numrunning 0)
		  (> (+ *last-db-access* 60)(current-seconds)))
	      (begin
		(debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*))
		(loop 0)))
	      (begin
		(debug:print-info 0 "Starting to shutdown the server side")
		;; need to delete only *my* server entry (future use)
		(open-run-close db:del-var #f "SERVER")
		(thread-sleep! 10)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")

		)))))

(define (server:find-free-port-and-open host s port #!key (trynum 50))
  (let ((s (if s s (make-socket 'rep)))
	(p (if (number? port) port 5555)))
    (handle-exceptions
     exn
     (begin







|









|





>
|







82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  (let loop ((count 0))
    (thread-sleep! 1) ;; no need to do this very often
    (db:write-cached-data)
    ;; (print "Server running, count is " count)
    (if (< count 10)
	(loop (+ count 1))
	(let ((numrunning (open-run-close db:get-count-tests-running #f)))
	  (if (or (> numrunning 0)
		  (> (+ *last-db-access* 60)(current-seconds)))
	      (begin
		(debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*))
		(loop 0)))
	      (begin
		(debug:print-info 0 "Starting to shutdown the server.")
		;; need to delete only *my* server entry (future use)
		(open-run-close db:del-var #f "SERVER")
		(thread-sleep! 10)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")
		(open-run-close tasks:server-deregister-self tasks:open-db)
		(exit))))))

(define (server:find-free-port-and-open host s port #!key (trynum 50))
  (let ((s (if s s (make-socket 'rep)))
	(p (if (number? port) port 5555)))
    (handle-exceptions
     exn
     (begin
135
136
137
138
139
140
141

142
143
144


145
146

147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; 
(define (server:client-connect host port)

  (let ((connect-ok #f)
	(zmq-socket (make-socket 'req))
	(conurl     (server:make-server-url (list host port))))


    (connect-socket zmq-socket conurl)
    zmq-socket))

  

(define (server:client-login zmq-socket)
  (cdb:login zmq-socket *toppath* (server:get-client-signature)))

(define (server:client-logout zmq-socket)
  (let ((ok (and (socket? zmq-socket)
		 (cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
    (close-socket zmq-socket)
    ok))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 10))
  (if (not *toppath*)(setup-for-run))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo







>



>
>
|
|
>








|







136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; 
(define (server:client-connect host port)
  (debug:print 3 "client-connect " host ":" port)
  (let ((connect-ok #f)
	(zmq-socket (make-socket 'req))
	(conurl     (server:make-server-url (list host port))))
    (if (socket? zmq-socket)
	(begin
	  (connect-socket zmq-socket conurl)
	  zmq-socket)
	#f)))
  

(define (server:client-login zmq-socket)
  (cdb:login zmq-socket *toppath* (server:get-client-signature)))

(define (server:client-logout zmq-socket)
  (let ((ok (and (socket? zmq-socket)
		 (cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
    ;; (close-socket zmq-socket)
    ok))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 10))
  (if (not *toppath*)(setup-for-run))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
195
196
197
198
199
200
201



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218



































	      (sleep 10)
	      (server:client-setup numtries: (- numtries 1)))
	    (debug:print-info 1 "Too many retries, giving up")))))

(define (server:launch)
  (let* ((toppath (setup-for-run)))
    (debug:print-info 0 "Starting the standalone server")



    (if *toppath* 
	(let* ((th2 (make-thread (lambda ()
				   (server:run (args:get-arg "-server"))))))
	  ;; (th3 (make-thread (lambda ()
	  ;;       		   (server:keep-running)))))
	  (thread-start! th2)
	  ;; (thread-start! th3)
	  (set! *didsomething* #t)
	  (thread-join! th2))
	(debug:print 0 "ERROR: Failed to setup for megatest"))))

(define (server:client-launch)
  (if (server:client-setup)
      (debug:print-info 0 "connected as client")
      (begin
	(debug:print 0 "ERROR: Failed to connect as client")
	(exit))))










































>
>
>
|
|
|
|
|
|
|
|
|
|







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
	      (sleep 10)
	      (server:client-setup numtries: (- numtries 1)))
	    (debug:print-info 1 "Too many retries, giving up")))))

(define (server:launch)
  (let* ((toppath (setup-for-run)))
    (debug:print-info 0 "Starting the standalone server")
    (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
      (if hostinfo
	  (debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	  (if *toppath* 
	      (let* ((th2 (make-thread (lambda ()
					 (server:run (args:get-arg "-server")))))
		     (th3 (make-thread (lambda ()
					 (server:keep-running)))))
		(thread-start! th2)
		(thread-start! th3)
		(set! *didsomething* #t)
		(thread-join! th3))
	      (debug:print 0 "ERROR: Failed to setup for megatest"))))))

(define (server:client-launch)
  (if (server:client-setup)
      (debug:print-info 0 "connected as client")
      (begin
	(debug:print 0 "ERROR: Failed to connect as client")
	(exit))))

;; ping a server and return number of clients or #f (if no response)
(define (server:ping host port #!key (secs 10))
  (cdb:use-non-blocking-mode
   (lambda ()
     (let* ((res #f)
	    (th1 (make-thread
		  (lambda ()
		    (let ((zmq-socket (server:client-connect host port)))
		      (if zmq-socket
			  (if (server:client-login zmq-socket)
			      (let ((numclients (cdb:num-clients zmq-socket)))
				(server:client-logout zmq-socket)
				(close-socket  zmq-socket)
				(set! res (list #t numclients)))
			      (begin
				;; (close-socket zmq-socket)
				(set! res (list #f "CAN'T LOGIN"))))
			  (set! res (list #f "CAN'T CONNECT")))))))
	    (th2 (make-thread
		  (lambda ()
		    (let loop ((count 1))
		      (debug:print-info 1 "Ping " count " server on " host " at port " port)
		      (thread-sleep! 2)
		      (if (< count (/ secs 2))
			  (loop (+ count 1))))
		    ;; (thread-terminate! th1)
		    (set! res (list #f "TIMED OUT"))))))
       (thread-start! th2)
       (thread-start! th1)
       (handle-exceptions
	exn
	(set! res (list #f "TIMED OUT"))
	(thread-join! th1 secs))
       res))))

Modified tasks.scm from [d0c7d4c2b8] to [2d9a79950e].

116
117
118
119
120
121
122


123
124

125
126
127

128
129



130











131
132
133
134
135
136
137
     mdb
     "SELECT id,server_id,pid,hostname,cmdline,login_time,logout_time FROM clients WHERE server_id=?;"
     server-id)))

(define (tasks:have-clients? mdb server-id)
  (null? (tasks:get-logged-in-clients mdb server-id)))



(define (tasks:get-best-server mdb)
  (let ((res #f))

    (sqlite3:for-each-row
     (lambda (id hostname port)
       (set! res (list hostname port)))

     mdb
     "SELECT id,hostname,port FROM servers WHERE state='live' ORDER BY start_time DESC LIMIT 1;")



    res))












(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname port start-time priority state)
       (set! res (cons (vector id pid hostname port start-time priority state) res)))
     mdb







>
>

|
>


|
>


>
>
>
|
>
>
>
>
>
>
>
>
>
>
>







116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
     mdb
     "SELECT id,server_id,pid,hostname,cmdline,login_time,logout_time FROM clients WHERE server_id=?;"
     server-id)))

(define (tasks:have-clients? mdb server-id)
  (null? (tasks:get-logged-in-clients mdb server-id)))

;; ping each server in the db and return first found that responds. 
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb)
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname port)
       (set! res (cons (list hostname port) res))
       (debug:print-info 1 "Found " hostname ":" port))
     mdb
     "SELECT id,hostname,port FROM servers WHERE state='live' ORDER BY start_time DESC LIMIT 1;")
    (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  (print "hed=" hed ", tal=" tal)
	  (let* ((host (car hed))
		 (port (cadr hed))
		 (ping-res (server:ping host port)))
	    (if ping-res hed
		;; remove defunct server from table
		(begin
		  (open-run-close tasks:server-deregister tasks:open-db  host port: port)
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))

(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname port start-time priority state)
       (set! res (cons (vector id pid hostname port start-time priority state) res)))
     mdb

Modified tests/tests.scm from [aa0bbb1e5e] to [1b09dbc8f0].

102
103
104
105
106
107
108


109
110
111
112
113
114
115
	      (car res)))

(test #f #t (socket? *runremote*))

;; (test #f #t (server:client-setup))

(test #f #t (car (cdb:login *runremote* *toppath* *my-client-signature*)))



;;======================================================================
;; C O N F I G   F I L E S 
;;======================================================================

(define conffile #f)
(test "Read a config" #t (hash-table? (read-config "test.config" #f #f)))







>
>







102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
	      (car res)))

(test #f #t (socket? *runremote*))

;; (test #f #t (server:client-setup))

(test #f #t (car (cdb:login *runremote* *toppath* *my-client-signature*)))

(test #f #t (open-run-close tasks:get-best-server tasks:open-db))

;;======================================================================
;; C O N F I G   F I L E S 
;;======================================================================

(define conffile #f)
(test "Read a config" #t (hash-table? (read-config "test.config" #f #f)))