Megatest

Check-in [452be75fb9]
Login
Overview
Comment:Partial fix for run-id of zero server refusing to start when other servers are in the available state
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | re-re-factor-server
Files: files | file ages | folders
SHA1: 452be75fb9a93e2350fd6a887fd80570819e5c0a
User & Date: matt on 2014-02-16 23:42:04
Other Links: branch diff | manifest | tags
Context
2014-02-17
18:26
Partially completed rework of server/client logic check-in: 2b3405f60c user: matt tags: re-re-factor-server
2014-02-16
23:42
Partial fix for run-id of zero server refusing to start when other servers are in the available state check-in: 452be75fb9 user: matt tags: re-re-factor-server
22:26
Removed check for megatest version on connecting to server. Can't have more than one server and api should be tolerant to minor version differences. check-in: b6474c4a62 user: matt tags: re-re-factor-server
Changes

Modified http-transport.scm from [c086471463] to [ab4a44cd0c].

262
263
264
265
266
267
268


269
270
271
272
273
274
275
			      (mutex-unlock! *http-mutex*)))
	      (time-out     (lambda ()
			      (thread-sleep! 45)
			      (if (not res)
				  (begin
				    (debug:print 0 "WARNING: communication with the server timed out.")
				    (mutex-unlock! *http-mutex*)


				    (http-transport:client-send-receive serverdat msg numretries: (- numretries 1))
				    (if (< numretries 3) ;; on last try just exit
					(begin
					  (debug:print 0 "ERROR: communication with the server timed out. Giving up.")
					  (exit 1)))))))
	      (th1 (make-thread send-recieve "with-input-from-request"))
	      (th2 (make-thread time-out     "time out")))







>
>







262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
			      (mutex-unlock! *http-mutex*)))
	      (time-out     (lambda ()
			      (thread-sleep! 45)
			      (if (not res)
				  (begin
				    (debug:print 0 "WARNING: communication with the server timed out.")
				    (mutex-unlock! *http-mutex*)
				    ;; Maybe the server died? Try starting it up.
				    (server:ensure-running run-id)
				    (http-transport:client-send-receive serverdat msg numretries: (- numretries 1))
				    (if (< numretries 3) ;; on last try just exit
					(begin
					  (debug:print 0 "ERROR: communication with the server timed out. Giving up.")
					  (exit 1)))))))
	      (th1 (make-thread send-recieve "with-input-from-request"))
	      (th2 (make-thread time-out     "time out")))
283
284
285
286
287
288
289







290
291
292
293
294
295
296
297
298
299
300
301
302
303

304


305
306
307
308
309
310
311
	   (let ((final (cadr match)))
	     (debug:print-info 11 "final=" final)
	     final)))))))

;; Send "cmd" with json payload "params" to serverdat and receive result
;;
(define (http-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 30))







  (let* ((fullurl    (if (list? serverdat)
			 (cadddr serverdat) ;; this is the uri for /api
			 (begin
			   (debug:print 0 "FATAL ERROR: http-transport:client-send-receive called with no server info")
			   (exit 1))))
	 (res        #f))
    (handle-exceptions
     exn
     (begin
       ;; TODO: Send this output to a log file so it isn't lost when running as daemon
       (if (> numretries 0)
	   ;; on the zeroeth retry do not print the error message - this allows the call to be used as a ping (no junk on output).
	   (begin
	     (print "ERROR IN http-transport:client-send-receive " ((condition-property-accessor 'exn 'message) exn))

	     (if (> (random 100) 80)(server:ensure-running run-id)) ;; every so often try starting a server


	     (http-transport:client-api-send-receive run-id serverdat cmd params numretries: (- numretries 1)))
	   #f))
     (begin
       (debug:print-info 11 "fullurl=" fullurl ", cmd=" cmd ", params=" params ", run-id=" run-id "\n")
       ;; set up the http-client here
       (max-retry-attempts 5)
       ;; consider all requests idempotent







>
>
>
>
>
>
>













|
>
|
>
>







285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
	   (let ((final (cadr match)))
	     (debug:print-info 11 "final=" final)
	     final)))))))

;; Send "cmd" with json payload "params" to serverdat and receive result
;;
(define (http-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 30))
  (if (not serverdat) ;; get #f, something went wrong. try starting the server again and reconnecting
      (begin
	;; try to restart the server and then reconnect
	(server:ensure-running run-id)
	(hash-table-delete! *runremote* run-id)
	(client:setup run-id)
	(set! serverdat (hash-table-ref/default *runremote* run-id #f))))
  (let* ((fullurl    (if (list? serverdat)
			 (cadddr serverdat) ;; this is the uri for /api
			 (begin
			   (debug:print 0 "FATAL ERROR: http-transport:client-send-receive called with no server info")
			   (exit 1))))
	 (res        #f))
    (handle-exceptions
     exn
     (begin
       ;; TODO: Send this output to a log file so it isn't lost when running as daemon
       (if (> numretries 0)
	   ;; on the zeroeth retry do not print the error message - this allows the call to be used as a ping (no junk on output).
	   (begin
	     (print "ERROR IN http-transport:client-api-send-receive " ((condition-property-accessor 'exn 'message) exn))
	     ;; try to restart the server and then reconnect
	     (server:ensure-running run-id)
	     (hash-table-delete! *runremote* run-id)
	     (client:setup run-id)
	     (http-transport:client-api-send-receive run-id serverdat cmd params numretries: (- numretries 1)))
	   #f))
     (begin
       (debug:print-info 11 "fullurl=" fullurl ", cmd=" cmd ", params=" params ", run-id=" run-id "\n")
       ;; set up the http-client here
       (max-retry-attempts 5)
       ;; consider all requests idempotent
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
				  ;;      (set! res dat)))))))
	      (time-out     (lambda ()
			      (thread-sleep! 45)
			      (if (not res)
				  (begin
				    (debug:print 0 "WARNING: communication with the server timed out.")
				    (mutex-unlock! *http-mutex*)
				    (http-transport:client-api-send-receive serverdat cmd params numretries: (- numretries 1))
				    (if (< numretries 3) ;; on last try just exit
					(begin
					  (debug:print 0 "ERROR: communication with the server timed out. Giving up.")
					  (exit 1)))))))
	      (th1 (make-thread send-recieve "with-input-from-request"))
	      (th2 (make-thread time-out     "time out")))
	 (thread-start! th1)







|







360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
				  ;;      (set! res dat)))))))
	      (time-out     (lambda ()
			      (thread-sleep! 45)
			      (if (not res)
				  (begin
				    (debug:print 0 "WARNING: communication with the server timed out.")
				    (mutex-unlock! *http-mutex*)
				    (http-transport:client-api-send-receive run-id serverdat cmd params numretries: (- numretries 1))
				    (if (< numretries 3) ;; on last try just exit
					(begin
					  (debug:print 0 "ERROR: communication with the server timed out. Giving up.")
					  (exit 1)))))))
	      (th1 (make-thread send-recieve "with-input-from-request"))
	      (th2 (make-thread time-out     "time out")))
	 (thread-start! th1)

Modified megatest.scm from [9958005d06] to [51576793bd].

337
338
339
340
341
342
343

344

345
346
347
348
349
350
351

    ;; Server? Start up here.
    ;;
    (let ((tl        (setup-for-run))
	  (run-id    (and (args:get-arg "-run-id")
			  (string->number (args:get-arg "-run-id")))))
      (if run-id

	  (server:launch run-id)

	  (debug:print 0 "ERROR: server requires run-id be specified with -run-id")))

    ;; Not a server? This section will decide how to communicate
    ;;
    ;;  Setup client for all except those listed here
    (if (null? (lset-intersection 
		     equal?







>
|
>







337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353

    ;; Server? Start up here.
    ;;
    (let ((tl        (setup-for-run))
	  (run-id    (and (args:get-arg "-run-id")
			  (string->number (args:get-arg "-run-id")))))
      (if run-id
	  (begin
	    (server:launch run-id)
	    (set! *didsomething* #t))
	  (debug:print 0 "ERROR: server requires run-id be specified with -run-id")))

    ;; Not a server? This section will decide how to communicate
    ;;
    ;;  Setup client for all except those listed here
    (if (null? (lset-intersection 
		     equal?

Modified server.scm from [6c4eab0178] to [e93cd93c50].

48
49
50
51
52
53
54





55
56
57
58
59
60
61
;; start_server
;;
;; Launch the http-transport server for run-id. When server:check-if-running
;; reports a server for this run-id the process exits instead of launching.
(define (server:launch run-id)
  (cond
   ((server:check-if-running run-id) (exit)) ;; a server is already running
   (else (http-transport:launch run-id))))






;;======================================================================
;; Q U E U E   M A N A G E M E N T
;;======================================================================

;; We don't want to flush the queue if it was just flushed.
;; Holds a millisecond timestamp; initialised to (current-milliseconds) at
;; the moment this definition is evaluated.
(define *server:last-write-flush* (current-milliseconds))







>
>
>
>
>







48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
;; start_server
;;
;; Launch the http-transport server for run-id. When server:check-if-running
;; reports a server for this run-id the process exits instead of launching.
(define (server:launch run-id)
  (cond
   ((server:check-if-running run-id) (exit)) ;; a server is already running
   (else (http-transport:launch run-id))))

;; Launch the http-transport server for run-id without ever exiting the
;; process. Returns #t when server:check-if-running reports a server for
;; run-id; otherwise returns whatever http-transport:launch returns.
(define (server:launch-no-exit run-id)
  (cond
   ((server:check-if-running run-id) #t) ;; if running
   (else (http-transport:launch run-id))))

;;======================================================================
;; Q U E U E   M A N A G E M E N T
;;======================================================================

;; We don't want to flush the queue if it was just flushed.
;; Holds a millisecond timestamp; initialised to (current-milliseconds) at
;; the moment this definition is evaluated.
(define *server:last-write-flush* (current-milliseconds))

Modified tasks.scm from [6d5f34e707] to [fe9409354b].

89
90
91
92
93
94
95
96



97
98
99
100
101
102




103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
;; Accessors for a hostinfo vector. Each one returns the element stored at
;; a fixed index of the vector it is given.
(define tasks:hostinfo-get-port
  (lambda (vec) (vector-ref vec 2)))      ;; index 2
(define tasks:hostinfo-get-pubport
  (lambda (vec) (vector-ref vec 3)))      ;; index 3
(define tasks:hostinfo-get-transport
  (lambda (vec) (vector-ref vec 4)))      ;; index 4
(define tasks:hostinfo-get-pid
  (lambda (vec) (vector-ref vec 5)))      ;; index 5
(define tasks:hostinfo-get-hostname
  (lambda (vec) (vector-ref vec 6)))      ;; index 6

;; Attempt to claim the server slot for run-id.
;;
;; Steps, in order:
;;   1. clean out old server records for run-id
;;   2. if tasks:less-than-two-available says so, register this process
;;      as an available server
;;   3. sleep two seconds
;;   4. return the result of tasks:server-am-i-the-server?
;;
;; mdb    - database handle passed through to the tasks: helpers
;; run-id - run this server would serve
(define (tasks:server-lock-slot mdb run-id)
  ;; The former `res` and `best` let-bindings were never referenced and
  ;; have been dropped.
  (tasks:server-clean-out-old-records-for-run-id mdb run-id)
  (if (tasks:less-than-two-available mdb run-id)
      (tasks:server-set-available mdb run-id))
  (thread-sleep! 2) ;; Try removing this. It may not be needed.
  (tasks:server-am-i-the-server? mdb run-id))




	
;; register that this server may come online (first to register goes through with the process)
;;
;; Inserts one row into the servers table for the current process with
;; state "available", transport "http", and -1 for port, pubport, heartbeat
;; and interface.
(define (tasks:server-set-available mdb run-id)
  (let ((insert-sql
         "INSERT INTO servers (pid,hostname,port,pubport,start_time,      priority,state,mt_version,heartbeat,   interface,transport,run_id)
                   VALUES(?, ?,       ?,   ?, strftime('%s','now'), ?,       ?,    ?,-1,?,        ?,        ?);")
        (unset -1)) ;; value stored for the port, pubport and interface columns
    (sqlite3:execute
     mdb
     insert-sql
     (current-process-id)       ;; pid
     (get-host-name)            ;; hostname
     unset                      ;; port
     unset                      ;; pubport
     (random 1000)              ;; priority (used as a tiebreaker on get-available)
     "available"                ;; state
     (common:version-signature) ;; mt_version
     unset                      ;; interface
     "http"                     ;; transport
     run-id)))

;; Returns #t when the servers table holds fewer than three rows for run-id.
;; NOTE(review): despite the name, the threshold is three and the query
;; counts rows in every state, not only 'available' -- confirm intent.
(define (tasks:less-than-two-available mdb run-id)
  (let ((row-count 0))
    (sqlite3:for-each-row
     (lambda (n) (set! row-count n))
     mdb
     "SELECT count(id) FROM servers WHERE run_id=?;"
     run-id)
    (> 3 row-count)))

;; Remove server records for run-id:
;;   - always: rows in state 'available' or 'shutting-down' whose start_time
;;     is more than 30 seconds in the past
;;   - additionally, when server:check-if-running returns true: every row
;;     for run-id
;; NOTE(review): deleting all rows when a server is reported running looks
;; surprising -- confirm this is intended.
(define (tasks:server-clean-out-old-records-for-run-id mdb run-id)
  (let ((purge-stale "DELETE FROM servers WHERE state in ('available','shutting-down') AND (strftime('%s','now') - start_time) > 30 AND run_id=?;")
        (purge-all   "DELETE FROM servers WHERE run_id=?;"))
    (sqlite3:execute mdb purge-stale run-id)
    (when (server:check-if-running run-id)
      (sqlite3:execute mdb purge-all run-id))))

;; Unconditionally delete every servers row for run-id whose state is 'running'.
(define (tasks:server-force-clean-running-records-for-run-id mdb run-id)
  (let ((purge-running "DELETE FROM servers WHERE state = 'running' AND run_id=?;"))
    (sqlite3:execute mdb purge-running run-id)))

(define (tasks:server-set-state! mdb server-id state)







|
>
>
>
|
|
|
|
|
|
>
>
>
>



















|







|


|







89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
;; Accessors for a hostinfo vector. Each one returns the element stored at
;; a fixed index of the vector it is given.
(define tasks:hostinfo-get-port
  (lambda (vec) (vector-ref vec 2)))      ;; index 2
(define tasks:hostinfo-get-pubport
  (lambda (vec) (vector-ref vec 3)))      ;; index 3
(define tasks:hostinfo-get-transport
  (lambda (vec) (vector-ref vec 4)))      ;; index 4
(define tasks:hostinfo-get-pid
  (lambda (vec) (vector-ref vec 5)))      ;; index 5
(define tasks:hostinfo-get-hostname
  (lambda (vec) (vector-ref vec 6)))      ;; index 6

;; Attempt to claim the server slot for run-id, making at most five tries.
;;
;; Each try:
;;   1. cleans out old server records for run-id
;;   2. registers this process as available when fewer than four records
;;      are reported by tasks:num-in-available-state
;;   3. sleeps two seconds
;;   4. asks tasks:server-am-i-the-server? whether this process won
;;
;; Returns the first true result of tasks:server-am-i-the-server?, or #f
;; when all five tries come back false.
(define (tasks:server-lock-slot mdb run-id)
  (let attempt ((granted #f)
                (tries   0))
    (cond
     ;; stop as soon as a try succeeded or the budget is used up
     ((or granted (>= tries 5)) granted)
     (else
      (tasks:server-clean-out-old-records-for-run-id mdb run-id)
      (when (< (tasks:num-in-available-state mdb run-id) 4)
        (tasks:server-set-available mdb run-id))
      (thread-sleep! 2) ;; Try removing this. It may not be needed.
      (attempt (tasks:server-am-i-the-server? mdb run-id)
               (+ tries 1))))))
	
      
	
;; register that this server may come online (first to register goes through with the process)
;;
;; Inserts one row into the servers table for the current process with
;; state "available", transport "http", and -1 for port, pubport, heartbeat
;; and interface.
(define (tasks:server-set-available mdb run-id)
  (let ((insert-sql
         "INSERT INTO servers (pid,hostname,port,pubport,start_time,      priority,state,mt_version,heartbeat,   interface,transport,run_id)
                   VALUES(?, ?,       ?,   ?, strftime('%s','now'), ?,       ?,    ?,-1,?,        ?,        ?);")
        (unset -1)) ;; value stored for the port, pubport and interface columns
    (sqlite3:execute
     mdb
     insert-sql
     (current-process-id)       ;; pid
     (get-host-name)            ;; hostname
     unset                      ;; port
     unset                      ;; pubport
     (random 1000)              ;; priority (used as a tiebreaker on get-available)
     "available"                ;; state
     (common:version-signature) ;; mt_version
     unset                      ;; interface
     "http"                     ;; transport
     run-id)))

;; Return the number of rows in the servers table for run-id that are in
;; the 'available' state.
;;
;; mdb    - sqlite3 database handle
;; run-id - run whose candidate servers are counted
;;
;; The query previously counted every row for the run-id regardless of
;; state, so 'running' or 'shutting-down' records were counted as if they
;; were available candidates. It now filters on state='available' to match
;; the procedure's name.
(define (tasks:num-in-available-state mdb run-id)
  (let ((res 0))
    (sqlite3:for-each-row
     (lambda (num-in-queue)
       (set! res num-in-queue))
     mdb
     "SELECT count(id) FROM servers WHERE state='available' AND run_id=?;"
     run-id)
    res))

;; Remove server records for run-id:
;;   - always: rows in state 'available' or 'shutting-down' whose start_time
;;     is more than 10 seconds in the past
;;   - additionally, when server:check-if-running returns true: every row
;;     for run-id
;; NOTE(review): deleting all rows when a server is reported running looks
;; surprising -- confirm this is intended.
(define (tasks:server-clean-out-old-records-for-run-id mdb run-id)
  (let ((purge-stale "DELETE FROM servers WHERE state in ('available','shutting-down') AND (strftime('%s','now') - start_time) > 10 AND run_id=?;")
        (purge-all   "DELETE FROM servers WHERE run_id=?;"))
    (sqlite3:execute mdb purge-stale run-id)
    (when (server:check-if-running run-id)
      (sqlite3:execute mdb purge-all run-id))))

;; Unconditionally delete every servers row for run-id whose state is 'running'.
(define (tasks:server-force-clean-running-records-for-run-id mdb run-id)
  (let ((purge-running "DELETE FROM servers WHERE state = 'running' AND run_id=?;"))
    (sqlite3:execute mdb purge-running run-id)))

(define (tasks:server-set-state! mdb server-id state)