51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
(begin
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
action TEXT DEFAULT '',
owner TEXT,
state TEXT DEFAULT 'new',
target TEXT DEFAULT '',
name TEXT DEFAULT '',
test TEXT DEFAULT '',
item TEXT DEFAULT '',
keylock TEXT,
params TEXT,
creation_time TIMESTAMP,
execution_time TIMESTAMP);")
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY,
pid INTEGER,
start_time TIMESTAMP,
|
|
<
|
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
(begin
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
action TEXT DEFAULT '',
owner TEXT,
state TEXT DEFAULT 'new',
target TEXT DEFAULT '',
name TEXT DEFAULT '',
testpatt TEXT DEFAULT '',
keylock TEXT,
params TEXT,
creation_time TIMESTAMP,
execution_time TIMESTAMP);")
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY,
pid INTEGER,
start_time TIMESTAMP,
|
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
|
pubport INTEGER,
start_time TIMESTAMP,
priority INTEGER,
state TEXT,
mt_version TEXT,
heartbeat TIMESTAMP,
transport TEXT,
CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
server_id INTEGER,
pid INTEGER,
hostname TEXT,
cmdline TEXT,
login_time TIMESTAMP,
logout_time TIMESTAMP DEFAULT -1,
CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
))
mdb))
;;======================================================================
;; Server and client management
;;======================================================================
;; make-vector-record tasks hostinfo id interface port pubport transport pid hostname
(define (tasks:hostinfo-get-id vec) (vector-ref vec 0))
(define (tasks:hostinfo-get-interface vec) (vector-ref vec 1))
(define (tasks:hostinfo-get-port vec) (vector-ref vec 2))
(define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3))
(define (tasks:hostinfo-get-transport vec) (vector-ref vec 4))
(define (tasks:hostinfo-get-pid vec) (vector-ref vec 5))
(define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6))
;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state transport #!key (pubport -1))
(debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state)
(sqlite3:execute
mdb
"INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport)
VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);"
pid (get-host-name) port pubport priority (conc state)
(common:version-signature)
interface
(conc transport))
(vector
(tasks:server-get-server-id mdb (get-host-name) interface port pid)
interface
port
pubport
transport
))
;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'delete))
(debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
(if *db-write-access*
(if pid
(case action
((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid))
(else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)))
(if port
(case action
((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE (interface=? or hostname=?) AND port=?;" hostname hostname port))
(else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE (interface=? or hostname=?) AND port=?;" hostname hostname port)))
(debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))))
(define (tasks:server-deregister-self mdb hostname)
(tasks:server-deregister mdb hostname pid: (current-process-id)))
;; need a simple call for robustly removing records given host and port
(define (tasks:server-delete mdb hostname port)
(tasks:server-deregister mdb hostname port: port action: 'delete))
(define (tasks:server-get-server-id mdb hostname iface port pid)
(debug:print-info 12 "tasks:server-get-server-id " mdb " " hostname " " iface " " port " " pid)
(let ((res #f))
(sqlite3:for-each-row
(lambda (id)
(set! res id))
mdb
(cond
((and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;")
((and iface port) "SELECT id FROM servers WHERE interface=? AND port=?;")
((and hostname port) "SELECT id FROM servers WHERE hostname=? AND port=?;")
(else
(begin
(debug:print 0 "ERROR: tasks:server-get-server-id needs (hostname and pid) OR (iface and port) OR (hostname and port)")
"SELECT id FROM servers WHERE pid=-999;")))
(if hostname hostname iface)(if pid pid port))
res))
(define (tasks:server-update-heartbeat mdb server-id)
(debug:print-info 1 "Heart beat update of server id=" server-id)
(handle-exceptions
exn
(begin
(debug:print 0 "WARNING: probable timeout on monitor.db access")
(thread-sleep! 1)
(tasks:server-update-heartbeat mdb server-id))
(sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)))
;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds
(define (tasks:server-alive? mdb server-id #!key (iface #f)(hostname #f)(port #f)(pid #f))
(let* ((server-id (if server-id
server-id
(tasks:server-get-server-id mdb hostname iface port pid)))
(heartbeat-delta 99e9))
(sqlite3:for-each-row
(lambda (delta)
(set! heartbeat-delta delta))
mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
(< heartbeat-delta 10)))
(define (tasks:client-register mdb pid hostname cmdline)
(sqlite3:execute
mdb
"INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));")
(tasks:server-get-server-id mdb hostname #f #f pid)
pid hostname cmdline)
(define (tasks:client-logout mdb pid hostname cmdline)
(sqlite3:execute
mdb
"UPDATE clients SET logout_time=strftime('%s','now') WHERE pid=? AND hostname=? AND cmdline=?;"
pid hostname cmdline))
(define (tasks:get-logged-in-clients mdb server-id)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id server-id pid hostname cmdline login-time logout-time)
(set! res (cons (vector id server-id pid hostname cmdline login-time lougout-time) res)))
mdb
"SELECT id,server_id,pid,hostname,cmdline,login_time,logout_time FROM clients WHERE server_id=?;"
server-id)))
(define (tasks:have-clients? mdb server-id)
(null? (tasks:get-logged-in-clients mdb server-id)))
;; ping each server in the db and return first found that responds.
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb)
(let ((res '())
(best #f)
(transport (if (and *transport-type*
(not (eq? *transport-type* 'fs)))
(conc *transport-type*)
"%")))
(sqlite3:for-each-row
(lambda (id interface port pubport transport pid hostname)
(set! res (cons (vector id interface port pubport transport pid hostname) res))
;;(debug:print-info 2 "Found existing server " hostname ":" port " registered in db"))
)
mdb
"SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
WHERE strftime('%s','now')-heartbeat < 10
AND mt_version=? AND transport LIKE ?
ORDER BY start_time DESC LIMIT 1;" (common:version-signature) transport)
;; for now we are keeping only one server registered in the db, return #f or first server found
(if (null? res) #f (car res))))
;; BUG: This logic is probably needed unless methodology changes completely...
;;
;; (if (null? res) #f
;; (let loop ((hed (car res))
;; (tal (cdr res)))
;; ;; (print "hed=" hed ", tal=" tal)
;; (let* ((host (list-ref hed 0))
;; (iface (list-ref hed 1))
;; (port (list-ref hed 2))
;; (pid (list-ref hed 4))
;; (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port)))
;; (if alive
;; (begin
;; (debug:print-info 2 "Found an existing, alive, server " host ", " port ".")
;; (list host iface port))
;; (begin
;; (debug:print-info 1 "Marking " host ":" port " as dead in server registry.")
;; (if port
;; (open-run-close tasks:server-deregister tasks:open-db host port: port)
;; (open-run-close tasks:server-deregister tasks:open-db host pid: pid))
;; (if (null? tal)
;; #f
;; (loop (car tal)(cdr tal))))))))))
(define (tasks:remove-server-records mdb)
(sqlite3:execute mdb "DELETE FROM servers;"))
(define (tasks:mark-server hostname port pid state transport)
(if port
(open-run-close tasks:server-deregister tasks:open-db hostname port: port)
(open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)))
(define (tasks:kill-server status hostname port pid transport)
(debug:print-info 1 "Removing defunct server record for " hostname ":" port)
(if port
(open-run-close tasks:server-deregister tasks:open-db hostname port: port)
(open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))
(if status ;; #t means alive
(begin
(if (equal? hostname (get-host-name))
(handle-exceptions
exn
(debug:print-info 0 "server may or may not be dead, check for megatest -server running as pid " pid "\n"
" EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 1 "Sending signal/term to " pid " on " hostname)
(process-signal pid signal/term)
(thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
;;(process-signal pid signal/kill)
) ;; local machine, send sig term
(begin
;;(debug:print-info 1 "Stopping remote servers not yet supported."))))
(debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
(let ((serverdat (list hostname port)))
(case (if (string? transport) (string->symbol transport) transport)
((http)(http-transport:client-connect hostname port))
(else (debug:print "ERROR: remote stopping servers of type " transport " not supported yet")))
(cdb:kill-server serverdat pid))))) ;; remote machine, try telling server to commit suicide
(begin
(if status
(if (equal? hostname (get-host-name))
(begin
(debug:print-info 1 "Sending signal/term to " pid " on " hostname)
(process-signal pid signal/term) ;; local machine, send sig term
(thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
(process-signal pid signal/kill))
(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))
(define (tasks:get-all-servers mdb)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport)
(set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport) res)))
mdb
"SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;")
res))
;;======================================================================
;; Tasks and Task monitors
;;======================================================================
;;======================================================================
|
>
|
|
>
>
>
>
>
>
>
>
<
|
>
>
>
>
>
>
>
|
>
>
|
|
>
|
>
>
>
>
|
|
|
<
<
<
|
<
<
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
|
>
>
>
|
>
>
>
|
>
>
>
|
>
>
>
|
>
>
>
>
>
|
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
>
|
<
>
>
>
>
>
>
>
>
|
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
>
>
>
|
>
>
>
>
>
>
>
>
|
<
|
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
>
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
|
pubport INTEGER,
start_time TIMESTAMP,
priority INTEGER,
state TEXT,
mt_version TEXT,
heartbeat TIMESTAMP,
transport TEXT,
run_id INTEGER);")
;; CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
server_id INTEGER,
pid INTEGER,
hostname TEXT,
cmdline TEXT,
login_time TIMESTAMP,
logout_time TIMESTAMP DEFAULT -1,
CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
))
mdb))
(define (tasks:get-db)
(if *task-db*
(vector-ref *task-db* 0)
(let ((db (tasks:open-db))
(pth (tasks:get-task-db-path)))
(set! *task-db* (vector db pth))
db)))
;;======================================================================
;; Server and client management
;;======================================================================
;; make-vector-record tasks hostinfo id interface port pubport transport pid hostname
(define (tasks:hostinfo-get-id vec) (vector-ref vec 0))
(define (tasks:hostinfo-get-interface vec) (vector-ref vec 1))
(define (tasks:hostinfo-get-port vec) (vector-ref vec 2))
(define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3))
(define (tasks:hostinfo-get-transport vec) (vector-ref vec 4))
(define (tasks:hostinfo-get-pid vec) (vector-ref vec 5))
(define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6))
(define (tasks:server-lock-slot mdb run-id)
(tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot")
(if (< (tasks:num-in-available-state mdb run-id) 4)
(begin
(tasks:server-set-available mdb run-id)
(thread-sleep! 2) ;; Try removing this. It may not be needed.
(tasks:server-am-i-the-server? mdb run-id))
#f))
;; register that this server may come online (first to register goes though with the process)
(define (tasks:server-set-available mdb run-id)
(sqlite3:execute
mdb
"INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id)
VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);"
(current-process-id) ;; pid
(get-host-name) ;; hostname
-1 ;; port
-1 ;; pubport
(random 1000) ;; priority (used a tiebreaker on get-available)
"available" ;; state
(common:version-signature) ;; mt_version
-1 ;; interface
"http" ;; transport
run-id
))
(define (tasks:num-in-available-state mdb run-id)
(let ((res 0))
(sqlite3:for-each-row
(lambda (num-in-queue)
(set! res num-in-queue))
mdb
"SELECT count(id) FROM servers WHERE run_id=? AND state = 'available' AND (strftime('%s','now') - start_time) < 30 ;"
run-id)
res))
(define (tasks:num-servers-non-zero-running mdb)
(let ((res 0))
(sqlite3:for-each-row
(lambda (num-running)
(set! res num-running))
mdb
"SELECT count(id) FROM servers WHERE run_id != 0 AND state = 'running';")
res))
(define (tasks:server-clean-out-old-records-for-run-id mdb run-id tag)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state in ('available','dbprep','shutting-down') AND (strftime('%s','now') - start_time) > 50 AND run_id=?;"
(conc "defunct" tag) run-id))
(define (tasks:server-force-clean-running-records-for-run-id mdb run-id tag)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state = 'running' AND run_id=?;"
(conc "defunct" tag) run-id))
(define (tasks:server-force-clean-run-record mdb run-id iface port tag)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state = 'running' AND run_id=? AND interface=? AND port=?;"
(conc "defunct" tag) run-id iface port))
(define (tasks:server-delete-records-for-this-pid mdb tag)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE hostname=? AND pid=?;"
(conc "defunct" tag) (get-host-name) (current-process-id)))
(define (tasks:server-delete-record mdb server-id tag)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE id=?;"
(conc "defunct" tag) server-id)
;; use this opportuntity to clean out records over one month old or over 10 minutes old with port = -1 (i.e. a never used placeholder)
(sqlite3:execute mdb "DELETE FROM servers WHERE state not in ('running','shutting-down','dbprep') AND (strftime('%s','now') - start_time) > 2628000;")
(sqlite3:execute mdb "DELETE FROM servers WHERE state like 'defunct%' AND port=-1 AND (strftime('%s','now') - start_time) > 600;")
)
(define (tasks:server-set-state! mdb server-id state)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE id=?;" state server-id))
(define (tasks:server-set-interface-port mdb server-id interface port)
(sqlite3:execute mdb "UPDATE servers SET interface=?,port=?,heartbeat=strftime('%s','now') WHERE id=?;" interface port server-id))
;; Get random port not used in long time
;;
(define (tasks:server-get-next-port mdb)
(let* ((lownum 30000)
(highnum 64000)
(used-ports '())
(get-rand-port (lambda ()
(+ lownum (random (- highnum lownum)))))
(port-param (if (and (args:get-arg "-port")
(string->number (args:get-arg "-port")))
(string->number (args:get-arg "-port"))
#f))
;; (config-port (if (and (config-lookup *configdat* "server" "port")
;; (string->number (config-lookup *configdat* "server" "port")))
;; (string->number (config-lookup *configdat* "server" "port"))
;; #f))
)
(sqlite3:for-each-row
(lambda (port)
(set! used-ports (cons port used-ports)))
mdb
"SELECT port FROM servers;")
(cond
((and port-param res) (if (> res port-param) res port-param))
(port-param port-param)
;; ((and config-port res) (if (> res config-port) res config-port))
;; (config-port config-port)
(else
(let loop ((port (get-rand-port))
(remtries 100))
(if (member port used-ports)
(if (> remtries 0)
(loop (get-rand-port)(- remtries 1))
(get-rand-port))
port))))))
(define (tasks:server-am-i-the-server? mdb run-id)
(let* ((all (tasks:server-get-servers-vying-for-run-id mdb run-id))
(first (if (null? all)
(begin (debug:print 0 "ERROR: no servers listed, should be at least one by now.")
(sqlite3:finalize! mdb)
(exit 1))
(car (db:get-rows all))))
(header (db:get-header all))
(id (db:get-value-by-header first header "id"))
(hostname (db:get-value-by-header first header "hostname"))
(pid (db:get-value-by-header first header "pid"))
(priority (db:get-value-by-header first header "priority")))
(debug:print 0 "INFO: am-i-the-server got record " first)
;; for now a basic check. add tiebreaking by priority later
(if (and (equal? hostname (get-host-name))
(equal? pid (current-process-id)))
id
#f)))
;; Use: (db:get-value-by-header (car (db:get-rows dat)) (db:get-header dat) "fieldname")
;; to extract info from the structure returned
;;
(define (tasks:server-get-servers-vying-for-run-id mdb run-id)
(let* ((header (list "id" "hostname" "pid" "interface" "port" "pubport" "state" "run_id" "priority" "start_time"))
(selstr (string-intersperse header ","))
(res '()))
(sqlite3:for-each-row
(lambda (a . b)
(set! res (cons (apply vector a b) res)))
mdb
(conc "SELECT " selstr " FROM servers WHERE run_id=? AND state in ('available','running','dbprep') ORDER BY start_time DESC;")
run-id)
(vector header res)))
(define (tasks:get-server mdb run-id)
(let ((res #f)
(best #f))
(sqlite3:for-each-row
(lambda (id interface port pubport transport pid hostname)
(set! res (vector id interface port pubport transport pid hostname)))
mdb
;; removed:
;; strftime('%s','now')-heartbeat < 10 AND mt_version = ?
"SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
WHERE run_id=? AND state='running'
ORDER BY start_time DESC LIMIT 1;" run-id) ;; (common:version-signature) run-id)
res))
(define (tasks:server-running-or-starting? mdb run-id)
(let ((res #f))
(sqlite3:for-each-row
(lambda (id)
(set! res id))
mdb ;; NEEDS dbprep ADDED
"SELECT id FROM servers WHERE run_id=? AND (state = 'running' OR (state = 'dbprep' AND (strftime('%s','now') - start_time) < 60));" run-id)
res))
(define (tasks:get-all-servers mdb)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id)
;; 0 1 2 3 4 5 6 7 8 9 10 11 12
(set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) res)))
mdb
"SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport,run_id FROM servers WHERE state NOT LIKE 'defunct%' ORDER BY start_time DESC;")
res))
;; no elegance here ...
;;
(define (tasks:kill-server hostname pid)
(debug:print-info 0 "Attempting to kill server process " pid " on host " hostname)
(setenv "TARGETHOST" hostname)
(setenv "TARGETHOST_LOGF" "server-kills.log")
(system (conc "nbfake kill " pid))
(unsetenv "TARGETHOST_LOGF")
(unsetenv "TARGETHOST"))
;; look up a server by run-id and send it a kill, also delete the record for that server
;;
(define (tasks:kill-server-run-id run-id)
(let* ((tdb (tasks:open-db))
(sdat (tasks:get-server mdb run-id)))
(if sdat
(let ((hostname (vector-ref sdat 6))
(pid (vector-ref sdat 5)))
(debug:print-info 0 "Killing server for run-id " run-id " on host " hostname " with pid " pid)
(tasks:kill-server hostname pid)
(tasks:server-delete-record mdb server-id tag) )
(debug:print-info 0 "No server found for run-id " run-id ", nothing to kill"))))
;; (if status ;; #t means alive
;; (begin
;; (if (equal? hostname (get-host-name))
;; (handle-exceptions
;; exn
;; (debug:print-info 0 "server may or may not be dead, check for megatest -server running as pid " pid "\n"
;; " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
;; (debug:print 1 "Sending signal/term to " pid " on " hostname)
;; (process-signal pid signal/term)
;; (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
;; ;;(process-signal pid signal/kill)
;; ) ;; local machine, send sig term
;; (begin
;; ;;(debug:print-info 1 "Stopping remote servers not yet supported."))))
;; (debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
;; (let ((serverdat (list hostname port)))
;; (hash-table-set! *runremote* run-id (http-transport:client-connect hostname port))
;; (cdb:kill-server serverdat pid))))) ;; remote machine, try telling server to commit suicide
;; (begin
;; (if status
;; (if (equal? hostname (get-host-name))
;; (begin
;; (debug:print-info 1 "Sending signal/term to " pid " on " hostname)
;; (process-signal pid signal/term) ;; local machine, send sig term
;; (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
;; (process-signal pid signal/kill))
;; (debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))
;;======================================================================
;; Tasks and Task monitors
;;======================================================================
;;======================================================================
|
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
|
(set! res count))
mdb
"SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;"
(car (user-information (current-user-id))))
res))
;; register a task
(define (tasks:add mdb action owner target runname test item params)
(sqlite3:execute mdb "INSERT INTO tasks_queue (action,owner,state,target,name,test,item,params,creation_time,execution_time)
VALUES (?,?,'new',?,?,?,?,?,strftime('%s','now'),0);"
action
owner
target
runname
test
item
(if params params "")))
(define (keys:key-vals-hash->target keys key-params)
(let ((tmp (hash-table-ref/default key-params (vector-ref (car keys) 0) "")))
(if (> (length keys) 1)
(for-each (lambda (key)
(set! tmp (conc tmp "/" (hash-table-ref/default key-params (vector-ref key 0) ""))))
|
|
|
|
|
<
|
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
|
(set! res count))
mdb
"SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;"
(car (user-information (current-user-id))))
res))
;; register a task
(define (tasks:add mdb action owner target runname testpatt params)
(sqlite3:execute mdb "INSERT INTO tasks_queue (action,owner,state,target,name,testpatt,params,creation_time,execution_time)
VALUES (?,?,'new',?,?,?,?,strftime('%s','now'),0);"
action
owner
target
runname
testpatt
(if params params "")))
(define (keys:key-vals-hash->target keys key-params)
(let ((tmp (hash-table-ref/default key-params (vector-ref (car keys) 0) "")))
(if (> (length keys) 1)
(for-each (lambda (key)
(set! tmp (conc tmp "/" (hash-table-ref/default key-params (vector-ref key 0) ""))))
|