Megatest

Check-in [d0462389b2]
Login
Overview
Comment:Bit's 'n pieces
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | interleaved-queries
Files: files | file ages | folders
SHA1: d0462389b21f69d6a310466de51d6b1994e9539c
User & Date: matt on 2012-11-15 01:44:51
Other Links: branch diff | manifest | tags
Context
2012-11-15
23:42
Partially complete, just taking a snapshot check-in: 81e546a994 user: matt tags: interleaved-queries
01:44
Bit's 'n pieces check-in: d0462389b2 user: matt tags: interleaved-queries
2012-11-12
22:34
Added outline for interleaved queries check-in: a72841295a user: matt tags: interleaved-queries
Changes

Modified db.scm from [108dfe0472] to [b0135e31f1].

1179
1180
1181
1182
1183
1184
1185
1186
1187



1188
1189
1190
1191
1192
1193
1194
1195
1196

1197
1198


1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
(define (cdb:use-non-blocking-mode proc)
  (set! *client-non-blocking-mode* #t)
  (let ((res (proc)))
    (set! *client-non-blocking-mode* #f)
    res))
  
;; params = 'target cached remparams
(define (cdb:client-call zmq-socket . params)
  (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params)



  (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params))))
	(res  #f))
    ;; (signal-mask! signal/int)
    (set! *received-response* #f)
    (send-message zmq-socket zdat)
    ;; (signal-unmask! signal/int)
    (set! res (db:string->obj (if *client-non-blocking-mode* 
				  (receive-message* zmq-socket)
				  (receive-message  zmq-socket))))

    (set! *received-response* #t)
    (debug:print-info 11 "zmq-socket " (car params) " res=" res)


    res))
  
(define (cdb:set-verbosity zmq-socket val)
  (cdb:client-call zmq-socket 'set-verbosity #f val))

(define (cdb:login zmq-socket keyval signature)
  (cdb:client-call zmq-socket 'login #t keyval megatest-version signature))

(define (cdb:logout zmq-socket keyval signature)
  (cdb:client-call zmq-socket 'logout #t keyval signature))

(define (cdb:num-clients zmq-socket)
  (cdb:client-call zmq-socket 'numclients #t))








|
|
>
>
>
|
|
<
|
<
<
|
|
|
>
|
|
>
>
|




|
|







1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192

1193


1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
(define (cdb:use-non-blocking-mode proc)
  (set! *client-non-blocking-mode* #t)
  (let ((res (proc)))
    (set! *client-non-blocking-mode* #f)
    res))
  
;; params = 'target cached remparams
(define (cdb:client-call zmq-sockets . params)
  (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets " params=" params)
  (let* ((push-socket (vector-ref zmq-sockets 0))
	 (sub-socket  (vector-ref zmq-sockets 1))
	 (query-id (conc (server:get-client-signature) "-" (message-digest-string (md5-primitive) (conc params))))
	 (zdat (db:obj->string (vector query-id params))) ;; (with-output-to-string (lambda ()(serialize params))))
	 (res  #f)

	 (get-res (lambda ()


		    (db:string->obj (if *client-non-blocking-mode* 
					(receive-message* zmq-socket)
					(receive-message  zmq-socket))))))
    (send-message zmq-socket zdat)
    (let loop ((res (get-res)))
      (if res res
	  (begin 
	    (thread-sleep! 0.5)
	    (get-res))))))
  
(define (cdb:set-verbosity zmq-socket val)
  (cdb:client-call zmq-socket 'set-verbosity #f val))

(define (cdb:login zmq-sockets keyval signature)
  (cdb:client-call zmq-sockets 'login #t keyval megatest-version signature))

(define (cdb:logout zmq-socket keyval signature)
  (cdb:client-call zmq-socket 'logout #t keyval signature))

(define (cdb:num-clients zmq-socket)
  (cdb:client-call zmq-socket 'numclients #t))

Modified server.scm from [20ea8c6ce6] to [129e174847].

31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
;;   1. client sends request to server via push to the pull port
;;   2. server puts request in queue or processes immediately as appropriate
;;   3. server puts responses from completed requests into pub port 
;;
;; TODO
;;
;; Done Tested
;; [ ]  [ ]    1. Add columns pullport pubport to servers table
;; [ ]  [ ]    2. Add rm of monitor.db if older than 11/12/2012 
;; [x]  [ ]    3. Add create of pullport and pubport with finding of available ports
;; [ ]  [ ]    4. Add client compose of request
;; [ ]  [ ]        - name of client: testname/itempath-test_id-hostname 
;; [ ]  [ ]        - name of request: callname, params
;; [ ]  [ ]        - request key: f(clientname, callname, params)
;; [ ]  [ ]    5. Add processing of subscription hits
;; [ ]  [ ]        - done when get key 







|
|







31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
;;   1. client sends request to server via push to the pull port
;;   2. server puts request in queue or processes immediately as appropriate
;;   3. server puts responses from completed requests into pub port 
;;
;; TODO
;;
;; Done Tested
;; [x]  [ ]    1. Add columns pullport pubport to servers table
;; [x]  [ ]    2. Add rm of monitor.db if older than 11/12/2012 
;; [x]  [ ]    3. Add create of pullport and pubport with finding of available ports
;; [ ]  [ ]    4. Add client compose of request
;; [ ]  [ ]        - name of client: testname/itempath-test_id-hostname 
;; [ ]  [ ]        - name of request: callname, params
;; [ ]  [ ]        - request key: f(clientname, callname, params)
;; [ ]  [ ]    5. Add processing of subscription hits
;; [ ]  [ ]        - done when get key 
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288

289
290
291
292
293
294
295
296
297
298
299
300
301


302
303
304


305

306
307
308
309
310
311
312
(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; 
(define (server:client-connect iface port #!key (context #f))
  (debug:print-info 3 "client-connect " iface ":" port)
  (let ((connect-ok #f)
	(zmq-socket (if context 
			(make-socket 'req context)
			(make-socket 'req)))
	(conurl     (server:make-server-url (list iface port))))
    (if (socket? zmq-socket)
	(begin
	  (connect-socket zmq-socket conurl)
	  zmq-socket)
	#f)))
  

(define (server:client-login zmq-socket)
  (cdb:login zmq-socket *toppath* (server:get-client-signature)))

(define (server:client-logout zmq-socket)
  (let ((ok (and (socket? zmq-socket)
		 (cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
    ;; (close-socket zmq-socket)
    ok))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host    (car   hostinfo))
	      (iface   (cadr  hostinfo))
	      (port    (caddr hostinfo)))

	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (handle-exceptions
	   exn
	   (begin
	     ;; something went wrong in connecting to the server. In this scenario it is ok
	     ;; to try again
	     (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	     (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	     (open-run-close tasks:server-deregister tasks:open-db host port: port)
	     (server:client-setup (- numtries 1))
	     #f)
	   (let* ((zmq-socket (server:client-connect iface port))


		  (login-res  (server:client-login zmq-socket))
		  (connect-ok (if (null? login-res) #f (car login-res)))
		  (conurl     (server:make-server-url (list iface port))))


	     (if connect-ok

		 (begin
		   (debug:print-info 2 "Logged in and connected to " conurl)
		   (set! *runremote* zmq-socket)
		   #t)
		 (begin
		   (debug:print-info 2 "Failed to login or connect to " conurl)
		   (set! *runremote* #f)







|



|
|








|
|
















|
|
|
>












|
>
>
|
|

>
>
|
>







247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; 
(define (server:client-connect iface port #!key (context #f)(type 'req))
  (debug:print-info 3 "client-connect " iface ":" port)
  (let ((connect-ok #f)
	(zmq-socket (if context 
			(make-socket type context)
			(make-socket type)))
	(conurl     (server:make-server-url (list iface port))))
    (if (socket? zmq-socket)
	(begin
	  (connect-socket zmq-socket conurl)
	  zmq-socket)
	#f)))
  

(define (server:client-login zmq-sockets)
  (cdb:login zmq-sockets *toppath* (server:get-client-signature)))

(define (server:client-logout zmq-socket)
  (let ((ok (and (socket? zmq-socket)
		 (cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
    ;; (close-socket zmq-socket)
    ok))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host     (list-ref hostinfo 0))
	      (iface    (list-ref hostinfo 1))
	      (pullport (list-ref hostinfo 2))
	      (pubport  (list-ref hostinfo 3)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (handle-exceptions
	   exn
	   (begin
	     ;; something went wrong in connecting to the server. In this scenario it is ok
	     ;; to try again
	     (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	     (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	     (open-run-close tasks:server-deregister tasks:open-db host port: port)
	     (server:client-setup (- numtries 1))
	     #f)
	   (let* ((push-socket (server:client-connect iface pullport 'push))
		  (sub-socket  (server:client-connect iface pubport  'sub))
		  (zmq-sockets (vector push-socket sub-socket))
		  (login-res   #f)
		  ;; (connect-ok 
		  (conurl     (server:make-server-url (list iface port))))
	     (socket-option-set! sub-socket 'subscribe  (server:get-client-signature))
	     (set! login-res (server:client-login zmq-sockets))
	     (if (and (not (null? login-res))
		      (car login-res))
		 (begin
		   (debug:print-info 2 "Logged in and connected to " conurl)
		   (set! *runremote* zmq-socket)
		   #t)
		 (begin
		   (debug:print-info 2 "Failed to login or connect to " conurl)
		   (set! *runremote* #f)

Modified tasks.scm from [53e3d6e8bd] to [90e00b3daa].

20
21
22
23
24
25
26
27




28
29
30
31
32
33
34

;;======================================================================
;; Tasks db
;;======================================================================

(define (tasks:open-db)
  (let* ((dbpath  (conc *toppath* "/monitor.db"))
	 (exists  (file-exists? dbpath))




	 (mdb     (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
	 (handler (make-busy-timeout 36000)))
    (sqlite3:set-busy-handler! mdb handler)
    (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;"))
    (if (not exists)
	(begin
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,







|
>
>
>
>







20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

;;======================================================================
;; Tasks db
;;======================================================================

(define (tasks:open-db)
  (let* ((dbpath  (conc *toppath* "/monitor.db"))
	 (exists  (if (file-exists? dbpath)
		      ;; BUGGISHNESS: Remove this code in six months. Today is 11/13/2012
		      (if (< (file-change-time dbpath) 1352851396.0)
			  (begin (delete-file dbpath) #f)
			  #t) #t))
	 (mdb     (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
	 (handler (make-busy-timeout 36000)))
    (sqlite3:set-busy-handler! mdb handler)
    (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;"))
    (if (not exists)
	(begin
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
50
51
52
53
54
55
56
57

58
59
60
61
62
63
64
                                hostname TEXT,
                                username TEXT,
                               CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
                                  pid INTEGER,
                                  interface TEXT,
                                  hostname TEXT,
                                  port INTEGER,

                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,
                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,







|
>







54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
                                hostname TEXT,
                                username TEXT,
                               CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
                                  pid INTEGER,
                                  interface TEXT,
                                  hostname TEXT,
                                  pullport INTEGER,
                                  pubport  INTEGER,
                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,
                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174

175
176
177
178
179
180
181
182
183
184


185
186
187
188







189
190
191
192
193
194
195
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface port pid)
       (set! res (cons (list hostname interface port pid) res))
       (debug:print-info 2 "Found existing server " hostname ":" port " registered in db"))
     mdb
     "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version)
    ;; (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (car    hed))
		 (iface    (cadr   hed))

		 (port     (caddr  hed))
		 (pid      (cadddr hed))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port)))
	    (if alive
		(begin
		  (debug:print-info 2 "Found an existing, alive, server " host ":" port ".")
		  (list host iface port))
		(begin
		  (debug:print-info 1 "Removing " host ":" port " from server registry as it appears to be dead")
		  (tasks:kill-server #f host port pid)


		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))








(define (tasks:kill-server status hostname port pid)
  (debug:print-info 1 "Removing defunct server record for " hostname ":" port)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid:  pid))
  (if status ;; #t means alive
      (begin







|





|
|
>
|
|
|


|
|

|
|
>
>




>
>
>
>
>
>
>







165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface port pid)
       (set! res (cons (list hostname interface port pid) res))
       (debug:print-info 2 "Found existing server " hostname ":" port " registered in db"))
     mdb
     "SELECT id,hostname,interface,pullport,pubport,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version)
    ;; (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (list-ref hed 0))
		 (iface    (list-ref hed 1))
		 (pullport (list-ref hed 2))
		 (pubport  (list-ref hed 3))
		 (pid      (list-ref hed 4))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db #f hostname: host pullport: pullport)))
	    (if alive
		(begin
		  (debug:print-info 2 "Found an existing, alive, server " host ", " pullport " and " pubport ".")
		  (list host iface pullport pubport))
		(begin
		  (debug:print-info 1 "Marking " host ":" pullport " as dead in server registry.")
		  (if port
		      (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
		      (open-run-close tasks:server-deregister tasks:open-db host pid:  pid))
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))

(define (tasks:mark-server hostname pullport pid state)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid:  pid)))


;; NOTE: NOT PORTED TO WORK WITH pullport/pubport
(define (tasks:kill-server status hostname port pid)
  (debug:print-info 1 "Removing defunct server record for " hostname ":" port)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid:  pid))
  (if status ;; #t means alive
      (begin