Megatest

Diff
Login

Differences From Artifact [c9fadd13bc]:

To Artifact [db6ec670d8]:


37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
	(begin
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
                                action TEXT DEFAULT '',
                                owner TEXT,
                                state TEXT DEFAULT 'new',
                                target TEXT DEFAULT '',
                                name TEXT DEFAULT '',
                                test TEXT DEFAULT '',
                                item TEXT DEFAULT '',
                                keylock TEXT,
                                params TEXT,
                                creation_time TIMESTAMP,
                                execution_time TIMESTAMP);")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY,
                                pid INTEGER,
                                start_time TIMESTAMP,







|
<







37
38
39
40
41
42
43
44

45
46
47
48
49
50
51
	(begin
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
                                action TEXT DEFAULT '',
                                owner TEXT,
                                state TEXT DEFAULT 'new',
                                target TEXT DEFAULT '',
                                name TEXT DEFAULT '',
                                testpatt TEXT DEFAULT '',

                                keylock TEXT,
                                params TEXT,
                                creation_time TIMESTAMP,
                                execution_time TIMESTAMP);")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY,
                                pid INTEGER,
                                start_time TIMESTAMP,
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
                                  pubport INTEGER,
                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,
                                  transport TEXT,
                                  run_id INTEGER,
                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
                                  server_id INTEGER,
                                  pid INTEGER,
                                  hostname TEXT,
                                  cmdline TEXT,
                                  login_time TIMESTAMP,
                                  logout_time TIMESTAMP DEFAULT -1,







|
|







61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
                                  pubport INTEGER,
                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,
                                  transport TEXT,
                                  run_id INTEGER);")
;;                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
                                  server_id INTEGER,
                                  pid INTEGER,
                                  hostname TEXT,
                                  cmdline TEXT,
                                  login_time TIMESTAMP,
                                  logout_time TIMESTAMP DEFAULT -1,
89
90
91
92
93
94
95
96
97






98


99
100
101
102

103




104
105
106
107
108
109
110
111
112
113
114
115
116

117

118
119
120
121
122
123
124
125
126
127

128
129
130

131
132
133
134
















135



136
137


138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
;; Accessors for a hostinfo vector. Slots 1..6 hold, in order:
;; interface, port, pubport, transport, pid, hostname -- the same layout
;; that tasks:get-best-server builds from a servers row.
;; NOTE(review): slot 0 is presumably the servers.id value -- confirm
;; against the accessor defined just above this group.
(define (tasks:hostinfo-get-interface   vec)    (vector-ref  vec 1))
(define (tasks:hostinfo-get-port        vec)    (vector-ref  vec 2))
(define (tasks:hostinfo-get-pubport     vec)    (vector-ref  vec 3))
(define (tasks:hostinfo-get-transport   vec)    (vector-ref  vec 4))
(define (tasks:hostinfo-get-pid         vec)    (vector-ref  vec 5))
(define (tasks:hostinfo-get-hostname    vec)    (vector-ref  vec 6))

;; Register (or re-register) a server process in the servers table.
;;
;;   mdb       - sqlite3 database handle holding the servers table
;;   pid       - process id of the server
;;   interface - interface/address the server is reachable on
;;   port      - port the server listens on
;;   priority  - stored verbatim in the priority column
;;   state     - 'live, 'shutting-down or 'dead (stored as a string via conc)
;;   transport - transport type (stored as a string via conc)
;;   pubport   - optional keyword argument, defaults to -1
;;
;; start_time and heartbeat are both set to the current time by the INSERT.
;;
;; Returns a hostinfo vector
;;   #(id interface port pubport transport pid hostname)
;; The trailing pid and hostname slots make the result usable with every
;; tasks:hostinfo-get-* accessor (tasks:hostinfo-get-pid reads slot 5 and
;; tasks:hostinfo-get-hostname reads slot 6); the first five slots are
;; unchanged from the previous five-element result.
(define (tasks:server-register mdb pid interface port priority state transport #!key (pubport -1))
  (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state)
  ;; Resolve the host name once so the inserted row and the returned
  ;; vector are guaranteed to agree.
  (let ((hostname (get-host-name)))
    (sqlite3:execute
     mdb
     "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport)
                             VALUES(?,  ?,       ?,   ?,  strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);"
     pid hostname port pubport priority (conc state)
     (common:version-signature)
     interface
     (conc transport))
    (vector
     (tasks:server-get-server-id mdb hostname interface port pid)
     interface
     port
     pubport
     transport
     pid
     hostname)))

;; Remove a server record from the servers table, or flag it as dead.
;;
;;   hostname - host name or interface address of the server
;;   port:    - when given (and pid: is not), select by
;;              (interface or hostname) and port
;;   pid:     - when given, select by pid alone (takes precedence over port:)
;;   action:  - 'delete (default) deletes the row(s); any other value
;;              sets state='dead' instead
;;
;; Nothing is written unless *db-write-access* is true.
;;
;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'delete))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (let ((remove? (eq? action 'delete)))
    (when *db-write-access*
      (cond
       (pid
        (sqlite3:execute
         mdb
         (if remove?
             "DELETE FROM servers WHERE pid=?;"
             "UPDATE servers SET state='dead' WHERE pid=?;")
         pid))
       (port
        (sqlite3:execute
         mdb
         (if remove?
             "DELETE FROM servers WHERE (interface=? or hostname=?) AND port=?;"
             "UPDATE servers SET state='dead' WHERE (interface=? or hostname=?) AND port=?;")
         hostname hostname port))
       (else
        (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))))


;; Deregister the calling process: delegates to tasks:server-deregister
;; selecting by the current process id (default action, i.e. delete).
;; Note that the pid-based path of tasks:server-deregister matches on pid
;; only; hostname is passed through but not used in that query.
(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))


;; Simple call for removing server records given host and port.
;; Delegates to tasks:server-deregister with action: 'delete, which
;; matches rows whose interface OR hostname equals `hostname` and whose
;; port equals `port`.
(define (tasks:server-delete mdb hostname port)
  (tasks:server-deregister mdb hostname port: port action: 'delete))




















;; Look up the id of a servers row.
;;
;; The row is selected by the first of these argument pairs that is
;; fully supplied (non-#f):
;;   1. hostname and pid
;;   2. iface    and port
;;   3. hostname and port
;;
;; Returns the id of the matching row (the last one visited if several
;; match) or #f when nothing matches or no usable pair was supplied; in
;; the latter case an error is logged and no query is run.
;;
;; Each query is paired with exactly the values it expects, so an
;; (iface, port) lookup is bound to iface even when hostname is also
;; given, and the no-usable-arguments case never binds values to a
;; statement that has no placeholders.
(define (tasks:server-get-server-id mdb hostname iface port pid)
  (debug:print-info 12 "tasks:server-get-server-id " mdb " " hostname " " iface " " port " " pid)
  (let ((res #f))
    ;; Run a two-placeholder lookup and remember the id that comes back.
    (define (lookup! query first-arg second-arg)
      (sqlite3:for-each-row
       (lambda (id)
         (set! res id))
       mdb
       query
       first-arg
       second-arg))
    (cond
     ((and hostname pid)
      (lookup! "SELECT id FROM servers WHERE hostname=?  AND pid=?;" hostname pid))
     ((and iface port)
      (lookup! "SELECT id FROM servers WHERE interface=? AND port=?;" iface port))
     ((and hostname port)
      (lookup! "SELECT id FROM servers WHERE hostname=?  AND port=?;" hostname port))
     (else
      (debug:print 0 "ERROR: tasks:server-get-server-id needs (hostname and pid) OR (iface and port) OR (hostname and port)")))
    res))

(define (tasks:server-update-heartbeat mdb server-id)
  (debug:print-info 1 "Heart beat update of server id=" server-id)
  (handle-exceptions
   exn
   (begin
     (debug:print 0 "WARNING: probable timeout on monitor.db access")







<
|
>
>
>
>
>
>
|
>
>


|
|
>
|
>
>
>
>
|
|
|
<
<
<
|
<
<


<
|
>
|
>
|
|
<
|
|
|
|
|
|
|
>

|
<
>

<
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
|
<
>
>
|

|
|

<
|
<
<
<
|
<
<
<
|







88
89
90
91
92
93
94

95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117



118


119
120

121
122
123
124
125
126

127
128
129
130
131
132
133
134
135
136

137
138

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161

162
163
164
165
166
167
168

169



170



171
172
173
174
175
176
177
178
;; Accessors for a hostinfo vector. Slots 1..6 hold, in order:
;; interface, port, pubport, transport, pid, hostname -- the same layout
;; that tasks:get-best-server builds from a servers row.
;; NOTE(review): slot 0 is presumably the servers.id value -- confirm
;; against the accessor defined just above this group.
(define (tasks:hostinfo-get-interface   vec)    (vector-ref  vec 1))
(define (tasks:hostinfo-get-port        vec)    (vector-ref  vec 2))
(define (tasks:hostinfo-get-pubport     vec)    (vector-ref  vec 3))
(define (tasks:hostinfo-get-transport   vec)    (vector-ref  vec 4))
(define (tasks:hostinfo-get-pid         vec)    (vector-ref  vec 5))
(define (tasks:hostinfo-get-hostname    vec)    (vector-ref  vec 6))


;; Try to claim the server slot for run-id.
;;
;; Steps: purge stale server records for the run, register this process
;; with state "available", pause two seconds, then ask
;; tasks:server-am-i-the-server? whether this process is the chosen one.
;;
;; Returns whatever tasks:server-am-i-the-server? returns: the id of this
;; process's servers row when it is the chosen server, otherwise #f.
(define (tasks:server-lock-slot mdb run-id)
  (tasks:server-clean-out-old-records-for-run-id mdb run-id)
  (tasks:server-set-available mdb run-id)
  (thread-sleep! 2) ;; Try removing this. It may not be needed.
  (tasks:server-am-i-the-server? mdb run-id))
	
;; register that this server may come online (first to register goes through with the process)
;;
;; Inserts a servers row for the current process and host with state
;; "available", transport "http", and -1 placeholders for port, pubport
;; and interface. start_time and heartbeat are set to the current time
;; by the statement itself.
(define (tasks:server-set-available mdb run-id)
  (let ((insert-sql
         "INSERT INTO servers (pid,hostname,port,pubport,start_time,      priority,state,mt_version,heartbeat,   interface,transport,run_id)
                   VALUES(?, ?,       ?,   ?, strftime('%s','now'), ?,       ?,    ?, strftime('%s','now'),?,        ?,        ?);")
        (unset     -1)              ;; placeholder for port, pubport and interface
        (tiebreak  (random 1000)))  ;; priority (used as a tiebreaker on get-available)
    (sqlite3:execute
     mdb
     insert-sql
     (current-process-id)           ;; pid
     (get-host-name)                ;; hostname
     unset                          ;; port
     unset                          ;; pubport
     tiebreak                       ;; priority
     "available"                    ;; state
     (common:version-signature)     ;; mt_version
     unset                          ;; interface
     "http"                         ;; transport
     run-id)))


;; Purge stale server records belonging to run-id:
;;   - rows in state 'available' or 'shutting-down' whose start_time is
;;     more than 30 seconds in the past
;;   - rows in state 'running' whose heartbeat is more than 10 seconds old
(define (tasks:server-clean-out-old-records-for-run-id mdb run-id)
  (let ((stale-pending-sql
         "DELETE FROM servers WHERE state in ('available','shutting-down') AND (strftime('%s','now') - start_time) > 30 AND run_id=?;")
        (stale-running-sql
         "DELETE FROM servers WHERE state='running' AND (strftime('%s','now') - heartbeat)  > 10 AND run_id=?;"))
    (sqlite3:execute mdb stale-pending-sql run-id)
    (sqlite3:execute mdb stale-running-sql run-id)))
  


;; Set the state column of the servers row whose id is server-id.
(define (tasks:server-set-state! mdb server-id state)
  (sqlite3:execute mdb "UPDATE servers SET state=? WHERE id=?;" state server-id))

;; Delete the servers row whose id is server-id.
(define (tasks:server-delete-record! mdb server-id)
  (sqlite3:execute mdb "DELETE FROM servers WHERE id=?;" server-id))

;; Delete every servers row registered by the calling process, matched
;; on both the local host name and the current process id.
(define (tasks:server-delete-records-for-this-pid mdb)
  (sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND pid=?;" (get-host-name) (current-process-id)))

;; Record the interface and port for the servers row whose id is
;; server-id (tasks:server-set-available initially stores -1 for both).
(define (tasks:server-set-interface-port mdb server-id interface port)

  (sqlite3:execute mdb "UPDATE servers SET interface=?,port=? WHERE id=?;" interface port server-id))


;; Decide whether the calling process is the server for run-id.
;;
;; Fetches the candidate records via
;; tasks:server-get-servers-vying-for-run-id and inspects the first row
;; of the result. Returns that row's id when its hostname and pid match
;; the current host and process, otherwise #f.
;;
;; If there are no candidate rows at all an error is logged, the
;; database is finalized and the process exits with status 1. The
;; emptiness test is made on the row list (db:get-rows) because the
;; result structure itself is a (header rows) vector and is never null.
(define (tasks:server-am-i-the-server? mdb run-id)
  (let* ((all    (tasks:server-get-servers-vying-for-run-id mdb run-id))
	 (rows   (db:get-rows all))
	 (first  (if (null? rows)
		     (begin (debug:print 0 "ERROR: no servers listed, should be at least one by now.") 
			    (sqlite3:finalize! mdb)
			    (exit 1))
		     (car rows)))
	 (header   (db:get-header all))
	 (id       (db:get-value-by-header first header "id"))
	 (hostname (db:get-value-by-header first header "hostname"))
	 (pid      (db:get-value-by-header first header "pid"))
	 (priority (db:get-value-by-header first header "priority")))
    (debug:print 0 "INFO: am-i-the-server got record " first)
    ;; for now a basic check. add tiebreaking by priority later
    (if (and (equal? hostname (get-host-name))
	     (equal? pid      (current-process-id)))
	id
	#f)))
	     
;; Use: (db:get-value-by-header (car (db:get-rows dat)) (db:get-header dat) "fieldname")
;;  to extract info from the structure returned
;;
;; Fetch every servers row registered for run-id.
;;
;; Returns a two-slot vector #(header rows): header is the list of
;; column names, rows is a list of vectors with one value per column in
;; header order. The query orders by start_time DESC and each row is
;; consed onto the front of the accumulator, so the returned list is in
;; the reverse of query order.
(define (tasks:server-get-servers-vying-for-run-id mdb run-id)
  (let* ((columns (list "id" "hostname" "pid" "interface" "port" "pubport" "state" "run_id" "priority" "start_time"))
         (query   (conc "SELECT " (string-intersperse columns ",") " FROM servers WHERE run_id=? ORDER BY start_time DESC;"))
         (rows    '()))
    (sqlite3:for-each-row
     (lambda fields
       (set! rows (cons (list->vector fields) rows)))
     mdb
     query
     run-id)
    (vector columns rows)))

(define (tasks:server-update-heartbeat mdb server-id)
  (debug:print-info 1 "Heart beat update of server id=" server-id)
  (handle-exceptions
   exn
   (begin
     (debug:print 0 "WARNING: probable timeout on monitor.db access")
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224

225
226
227
228
229
230
231
232
233
234

235
236
237
238
239
240
241
242
243
244

245
246
247
248

249
250
251
252
253
254
255



256
257
258
259
260
261
262
	 (heartbeat-delta 99e9))
    (sqlite3:for-each-row
     (lambda (delta)
       (set! heartbeat-delta delta))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
    (< heartbeat-delta 10)))

;; Record a client login in the clients table.
;;
;;   pid      - process id of the client
;;   hostname - host the client runs on
;;   cmdline  - command line of the client
;;
;; server_id is resolved with tasks:server-get-server-id using the
;; (hostname, pid) pair; it is NULL/#f when no such servers row exists.
;; NOTE(review): the lookup uses the client's own hostname and pid --
;; confirm that this is the intended way to find the owning server.
;;
;; All four values are bound to the INSERT's placeholders (previously
;; the statement was executed with no bound values at all).
(define (tasks:client-register mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));"
   (tasks:server-get-server-id mdb hostname #f #f pid)
   pid hostname cmdline))

;; Stamp logout_time with the current time on every clients row that
;; matches pid, hostname and cmdline.
(define (tasks:client-logout mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "UPDATE clients SET logout_time=strftime('%s','now') WHERE pid=? AND hostname=? AND cmdline=?;"
   pid hostname cmdline))

;; Return the clients rows attached to server-id as a list of vectors
;;   #(id server-id pid hostname cmdline login-time logout-time)
;; The list is empty when there are no matching rows.
;;
;; NOTE(review): the query does not filter on logout_time, so clients
;; that have already logged out are included -- confirm whether that is
;; intended given the function name.
(define (tasks:get-logged-in-clients mdb server-id)
  (let ((res '()))
    (sqlite3:for-each-row 
     (lambda (id server-id pid hostname cmdline login-time logout-time)
       (set! res (cons (vector id server-id pid hostname cmdline login-time logout-time) res)))
     mdb
     "SELECT id,server_id,pid,hostname,cmdline,login_time,logout_time FROM clients WHERE server_id=?;"
     server-id)
    ;; hand the accumulated rows back to the caller
    res))

;; #t when at least one clients row is attached to server-id, else #f.
;; pair? is used rather than (not (null? ...)) so that any non-list
;; result from tasks:get-logged-in-clients is treated as "no clients".
(define (tasks:have-clients? mdb server-id)
  (pair? (tasks:get-logged-in-clients mdb server-id)))

;; Return the most recently started server record that looks alive, or
;; #f when there is none.
;;
;; A record qualifies when its heartbeat is less than 10 seconds old,
;; its mt_version equals (common:version-signature) and its transport
;; matches: *transport-type* is used as the pattern unless it is unset
;; or 'fs, in which case any transport ("%") is accepted.
;;
;; The result is a hostinfo vector
;;   #(id interface port pubport transport pid hostname)
;;
;; NOTE: no ping is performed here and no other records are removed;
;; the selection relies on the heartbeat age only.
(define (tasks:get-best-server mdb)
  (let ((res '())
	(transport-pattern (if (and *transport-type*
				    (not (eq? *transport-type* 'fs)))
			       (conc *transport-type*)
			       "%")))
    (sqlite3:for-each-row
     (lambda (id interface port pubport row-transport pid hostname)
       (set! res (cons (vector id interface port pubport row-transport pid hostname) res)))
     mdb
     "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
          WHERE strftime('%s','now')-heartbeat < 10 
          AND mt_version=? AND transport LIKE ? 
          ORDER BY start_time DESC LIMIT 1;" (common:version-signature) transport-pattern)
    ;; for now we are keeping only one server registered in the db, return #f or first server found
    (if (null? res) #f (car res))))

;; BUG: This logic is probably needed unless methodology changes completely...

;;
;;     (if (null? res) #f
;; 	(let loop ((hed (car res))
;; 		   (tal (cdr res)))
;; 	  ;; (print "hed=" hed ", tal=" tal)
;; 	  (let* ((host     (list-ref hed 0))
;; 		 (iface    (list-ref hed 1))
;; 		 (port     (list-ref hed 2))
;; 		 (pid      (list-ref hed 4))
;; 		 (alive    (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port)))

;; 	    (if alive
;; 		(begin
;; 		  (debug:print-info 2 "Found an existing, alive, server " host ", " port ".")
;; 		  (list host iface port))
;; 		(begin
;; 		  (debug:print-info 1 "Marking " host ":" port " as dead in server registry.")
;; 		  (if port
;; 		      (open-run-close tasks:server-deregister tasks:open-db host port: port)
;; 		      (open-run-close tasks:server-deregister tasks:open-db host pid:  pid))
;; 		  (if (null? tal)

;; 		      #f
;; 		      (loop (car tal)(cdr tal))))))))))

;; Delete ALL rows from the servers table (no filtering of any kind).
(define (tasks:remove-server-records mdb)

  (sqlite3:execute mdb "DELETE FROM servers;"))

;; Deregister a server record, selecting by port when one is given and
;; by pid otherwise. The database is opened and closed around the call
;; via open-run-close / tasks:open-db.
;;
;; NOTE(review): `state` and `transport` are accepted but not used, and
;; tasks:server-deregister is called with its default action ('delete),
;; so the record is removed rather than marked with `state` -- confirm
;; whether that is the intended behavior for a function of this name.
(define (tasks:mark-server hostname port pid state transport)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid:  pid)))





(define (tasks:kill-server status hostname port pid transport)
  (debug:print-info 1 "Removing defunct server record for " hostname ":" port)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid:  pid))
  (if status ;; #t means alive







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
|
<
<
<
<


|
<
<

<


|
|
<
|

<
>
|
|
<
<
<
|
<
|
|
<
>
|
|
<
<
<
<
<
<
<
<
>
|
<

|
>
|
|
<
<
<
<
|
>
>
>







188
189
190
191
192
193
194



























195
196
197




198
199
200


201

202
203
204
205

206
207

208
209
210



211

212
213

214
215
216








217
218

219
220
221
222
223




224
225
226
227
228
229
230
231
232
233
234
	 (heartbeat-delta 99e9))
    (sqlite3:for-each-row
     (lambda (delta)
       (set! heartbeat-delta delta))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
    (< heartbeat-delta 10)))




























;; Return the running server record for run-id, or #f when there is none.
;;
;; A record qualifies when its state is 'running', its run_id equals
;; run-id, its mt_version equals (common:version-signature) and its
;; heartbeat is less than 10 seconds old. When several qualify the most
;; recently started one is returned (ORDER BY start_time DESC LIMIT 1).
;;
;; The result is a hostinfo vector
;;   #(id interface port pubport transport pid hostname)
(define (tasks:get-server mdb run-id)
  (let ((res #f))
    (sqlite3:for-each-row
     (lambda (id interface port pubport transport pid hostname)
       (set! res (vector id interface port pubport transport pid hostname)))
     mdb
     "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
          WHERE strftime('%s','now')-heartbeat < 10 
          AND mt_version=? AND run_id=? AND state='running'
          ORDER BY start_time DESC LIMIT 1;" (common:version-signature) run-id)
    res))


;; (define (tasks:get-all-servers mdb)
;;   (let ((res  '()))
;;     (sqlite3:for-each-row



;;      (lambda (id interface port pubport transport pid hostname)

;;        (set! res (cons (vector id interface port pubport transport pid hostname) res)))
;;      mdb

;;      "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
;;           WHERE strftime('%s','now')-heartbeat < 10 
;;           AND mt_version=? 








;;           ORDER BY start_time DESC;" (common:version-signature))
;;     res))


;; Return every row of the servers table as a list of vectors
;;   #(id pid hostname interface port pubport start-time priority
;;     state mt-version last-update transport)
;; where last-update is the number of seconds since the row's heartbeat.
;; The query orders by start_time DESC and rows are consed onto the
;; accumulator, so the returned list is in the reverse of query order.
(define (tasks:get-all-servers mdb)
  (let ((servers '())
        (query "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;"))
    (sqlite3:for-each-row
     (lambda columns
       (set! servers (cons (list->vector columns) servers)))
     mdb
     query)
    servers))

(define (tasks:kill-server status hostname port pid transport)
  (debug:print-info 1 "Removing defunct server record for " hostname ":" port)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid:  pid))
  (if status ;; #t means alive
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
		(begin
		  (debug:print-info 1 "Sending signal/term to " pid " on " hostname)
		  (process-signal pid signal/term)  ;; local machine, send sig term
		  (thread-sleep! 5)                 ;; give it five seconds to die peacefully then do a brutal kill
		  (process-signal pid signal/kill)) 
		(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))



;; Return every row of the servers table as a list of vectors
;;   #(id pid hostname interface port pubport start-time priority
;;     state mt-version last-update transport)
;; where last-update is the number of seconds since the row's heartbeat.
;; The query orders by start_time DESC and rows are consed onto the
;; accumulator, so the returned list is in the reverse of query order.
(define (tasks:get-all-servers mdb)
  (let ((servers '())
        (query "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;"))
    (sqlite3:for-each-row
     (lambda columns
       (set! servers (cons (list->vector columns) servers)))
     mdb
     query)
    servers))
       

;;======================================================================
;; Tasks and Task monitors
;;======================================================================


;;======================================================================
;; Tasks







<
<
<
<
<
<
<
<
<
<
<
<







257
258
259
260
261
262
263












264
265
266
267
268
269
270
		(begin
		  (debug:print-info 1 "Sending signal/term to " pid " on " hostname)
		  (process-signal pid signal/term)  ;; local machine, send sig term
		  (thread-sleep! 5)                 ;; give it five seconds to die peacefully then do a brutal kill
		  (process-signal pid signal/kill)) 
		(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))













;;======================================================================
;; Tasks and Task monitors
;;======================================================================


;;======================================================================
;; Tasks
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
       (set! res count))
     mdb
     "SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;"
     (car (user-information (current-user-id))))
    res))

;; register a task
(define (tasks:add mdb action owner target runname test item params)
  (sqlite3:execute mdb "INSERT INTO tasks_queue (action,owner,state,target,name,test,item,params,creation_time,execution_time)
                       VALUES (?,?,'new',?,?,?,?,?,strftime('%s','now'),0);" 
		   action
		   owner
		   target
		   runname
		   test
		   item







|
|







292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
       (set! res count))
     mdb
     "SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;"
     (car (user-information (current-user-id))))
    res))

;; register a task
(define (tasks:add mdb action owner target runname testpatt params)
  (sqlite3:execute mdb "INSERT INTO tasks_queue (action,owner,state,target,name,testpatt,params,creation_time,execution_time)
                       VALUES (?,?,'new',?,?,?,?,?,strftime('%s','now'),0);" 
		   action
		   owner
		   target
		   runname
		   test
		   item