Megatest

Check-in [02ca954846]
Login
Overview
Comment:zmq transport, registration in monitor.db fix
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk | This is * a % #^$@ test of tagging
Files: files | file ages | folders
SHA1: 02ca954846fdcc42c7fd9ee66ea8059386c01ad1
User & Date: matt on 2013-01-31 22:41:28
Other Links: manifest | tags
Context
2013-02-04
21:19
Decrease some noise. Added more instrumentation check-in: 1398d61951 user: matt tags: trunk
2013-01-31
22:41
zmq transport, registration in monitor.db fix check-in: 02ca954846 user: matt tags: trunk, This is * a % #^$@ test of tagging
2013-01-30
23:32
Changed backoff rate and recover rate when no jobs can run but there are still jobs to run check-in: edda18813d user: mrwellan tags: trunk, v1.5212
Changes

Modified db.scm from [56cbe800e8] to [11ded6638d].

1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386







1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397


1398
1399
1400
1401
1402
1403
1404
1373
1374
1375
1376
1377
1378
1379







1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406







-
-
-
-
-
-
-
+
+
+
+
+
+
+











+
+







   #f))


;; The queue is a list of vectors where the zeroth slot indicates the type of query to
;; apply and the second slot is the time of the query and the third entry is a list of 
;; values to be applied
;;
(define (db:process-queue db pubsock indata)
  (let* ((data       (sort indata (lambda (a b)
				    (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b))))))
    (for-each
     (lambda (item)
       (db:process-queue-item db pubsock item))
     data)))
;; (define (db:process-queue db pubsock indata)
;;   (let* ((data       (sort indata (lambda (a b)
;; 				    (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b))))))
;;     (for-each
;;      (lambda (item)
;;        (db:process-queue-item db pubsock item))
;;      data)))

(define (db:process-queue-item db item)
  (let* ((stmt-key       (cdb:packet-get-qtype item))
	 (qry-sig        (cdb:packet-get-query-sig item))
	 (return-address (cdb:packet-get-client-sig item))
	 (params         (cdb:packet-get-params item))
	 (query          (let ((q (alist-ref stmt-key db:queries)))
			   (if q (car q) #f))))
    (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", query=" query ", params=" params)
    (cond
     (query
      ;; transactionize needed here.

      (apply sqlite3:execute db query params)
      (server:reply return-address qry-sig #t #t))
     ((member stmt-key db:special-queries)
      (debug:print-info 11 "Handling special statement " stmt-key)
      (case stmt-key
	((immediate)
	 (let ((proc      (car params))

Modified zmq-transport.scm from [84997153bf] to [20e9358f02].

158
159
160
161
162
163
164

165
166
167
168
169
170
171
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172







+







				(sleep 4)
				(loop))))))
	 (iface       (cadr server-info))
	 (pullport    (caddr server-info))
	 (pubport     (cadddr server-info)) ;; id interface pullport pubport)
	 (zmq-sockets (zmq-transport:client-connect iface pullport pubport))
	 (last-access 0))
    (debug:print-info 11 "heartbeat started for zmq server on " iface " " pullport " " pubport)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1)))
      ;; (print "Server running, count is " count)
	(if (< count 1) ;; 3x3 = 9 secs aprox
	    (loop (+ count 1)))
222
223
224
225
226
227
228
229








230
231
232
233
234
235
236
223
224
225
226
227
228
229

230
231
232
233
234
235
236
237
238
239
240
241
242
243
244







-
+
+
+
+
+
+
+
+







  (let* ((s1 (zmq-transport:find-free-port-and-open ipaddrstr #f startport 'pull))
         (p1 (caddr s1))
         (s2 (zmq-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
         (p2 (caddr s2)))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live 'zmq))
    (set! *server-info* (open-run-close tasks:server-register 
					tasks:open-db 
					(current-process-id) 
					ipaddrstr p1 
					0 
					'live
					'zmq
					pubport: p2))
    (mutex-unlock! *heartbeat-mutex*)
    (list s1 s2)))

(define (zmq-transport:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
269
270
271
272
273
274
275

276
277
278
279
280
281
282
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291







+







(define (zmq-transport:client-connect iface pullport pubport)
  (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push))
	 (sub-socket  (zmq-transport:client-socket-connect iface pubport
						    type: 'sub
						    subscriptions: (list (zmq-transport:get-client-signature) "all")))
	 (zmq-sockets (vector push-socket sub-socket))
	 (login-res   #f))
    (debug:print-info 11 "zmq-transport:client-connect started. Next is login")
    (set! login-res (zmq-transport:client-login zmq-sockets))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
	  (set! *runremote* zmq-sockets)
	  zmq-sockets)