17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
|
(declare (uses common))
(include "task_records.scm")
;;======================================================================
;; Tasks db
;;======================================================================
;; Open the monitor db and return the sqlite3 handle.
;;
;; Which database is opened:
;;   *toppath* is writable            ==> *toppath*/monitor.db
;;   else monitor.db is readable      ==> *toppath*/monitor.db
;;   otherwise                        ==> an in-memory db
;;
;; When the file exists but is not writable the global *db-write-access*
;; is set to #f.  The tables are created when the file did not exist
;; (and *toppath* is writable) or when the file is not readable.
;;
(define (tasks:open-db)
  (let* ((db-file      (conc *toppath* "/monitor.db"))
         (db-exists    (file-exists? db-file))
         (db-writable  (file-write-access? db-file))
         (mdb          (if (or (file-write-access? *toppath*)
                               (file-read-access? db-file))
                           (sqlite3:open-database db-file)
                           (sqlite3:open-database ":memory:"))) ;; (never-give-up-open-db dbpath))
         (busy-handler (make-busy-timeout 36000)))
    ;; only ever unset the flag so other db's can also use this control
    (if (and db-exists (not db-writable))
        (set! *db-write-access* #f))
    (sqlite3:set-busy-handler! mdb busy-handler)
    (sqlite3:execute mdb "PRAGMA synchronous = 0;")
    (if (or (and (not db-exists)
                 (file-write-access? *toppath*))
            (not (file-read-access? db-file)))
        ;; run every schema statement, in order, against the fresh db
        (for-each
         (lambda (create-stmt)
           (sqlite3:execute mdb create-stmt))
         (list
          "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
action TEXT DEFAULT '',
owner TEXT,
state TEXT DEFAULT 'new',
target TEXT DEFAULT '',
name TEXT DEFAULT '',
test TEXT DEFAULT '',
item TEXT DEFAULT '',
keylock TEXT,
params TEXT,
creation_time TIMESTAMP,
execution_time TIMESTAMP);"
          "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY,
pid INTEGER,
start_time TIMESTAMP,
last_update TIMESTAMP,
hostname TEXT,
username TEXT,
CONSTRAINT monitors_constraint UNIQUE (pid,hostname));"
          "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
pid INTEGER,
interface TEXT,
hostname TEXT,
port INTEGER,
pubport INTEGER,
start_time TIMESTAMP,
priority INTEGER,
state TEXT,
mt_version TEXT,
heartbeat TIMESTAMP,
transport TEXT,
CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));"
          "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
server_id INTEGER,
pid INTEGER,
hostname TEXT,
cmdline TEXT,
login_time TIMESTAMP,
logout_time TIMESTAMP DEFAULT -1,
CONSTRAINT clients_constraint UNIQUE (pid,hostname));")))
    mdb))
;;======================================================================
;; Server and client management
;;======================================================================
;; make-vector-record tasks hostinfo id interface port pubport transport pid hostname
;; Positional accessors for a hostinfo vector laid out as
;;   #(id interface port pubport transport pid hostname)
(define tasks:hostinfo-get-id        (lambda (hostinfo) (vector-ref hostinfo 0)))
(define tasks:hostinfo-get-interface (lambda (hostinfo) (vector-ref hostinfo 1)))
(define tasks:hostinfo-get-port      (lambda (hostinfo) (vector-ref hostinfo 2)))
(define tasks:hostinfo-get-pubport   (lambda (hostinfo) (vector-ref hostinfo 3)))
(define tasks:hostinfo-get-transport (lambda (hostinfo) (vector-ref hostinfo 4)))
(define tasks:hostinfo-get-pid       (lambda (hostinfo) (vector-ref hostinfo 5)))
(define tasks:hostinfo-get-hostname  (lambda (hostinfo) (vector-ref hostinfo 6)))
;; state: 'live, 'shutting-down, 'dead
;; Insert (or replace) the servers row for a server process and return
;; its hostinfo vector
;;   #(id interface port pubport transport pid hostname)
;;
;; mdb       - open sqlite3 handle on the monitor db
;; pid       - process id of the server
;; interface - interface the server listens on
;; port      - port the server listens on
;; priority  - stored as-is in the priority column
;; state     - stored as a string (via conc)
;; transport - stored as a string (via conc)
;; pubport   - optional, defaults to -1
;;
;; The hostname column is always the local host (get-host-name);
;; start_time and heartbeat are set to the current time.
;;
;; The returned vector now carries pid and hostname in slots 5 and 6 so
;; that tasks:hostinfo-get-pid and tasks:hostinfo-get-hostname work on
;; it; slots 0-4 are unchanged.
(define (tasks:server-register mdb pid interface port priority state transport #!key (pubport -1))
  (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state)
  (let ((hostname (get-host-name)))
    (sqlite3:execute
     mdb
     "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport)
VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);"
     pid hostname port pubport priority (conc state)
     (common:version-signature)
     interface
     (conc transport))
    (vector
     (tasks:server-get-server-id mdb hostname interface port pid)
     interface
     port
     pubport
     transport
     pid
     hostname)))
;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
;; Remove or retire rows in the servers table.
;;   pid given   ==> match on pid only (hostname is not part of the match)
;;   port given  ==> match on port where interface or hostname equals hostname
;;   neither     ==> log an error
;; action 'delete deletes the rows; any other action sets state='dead'.
;; Nothing is touched when *db-write-access* is false.
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'delete))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (let ((deleting (eqv? action 'delete)))
    (if *db-write-access*
        (cond
         (pid
          (if deleting
              (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)
              (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)))
         (port
          (if deleting
              (sqlite3:execute mdb "DELETE FROM servers WHERE (interface=? or hostname=?) AND port=?;" hostname hostname port)
              (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE (interface=? or hostname=?) AND port=?;" hostname hostname port)))
         (else
          (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))))
;; Deregister the calling process, identified by its own pid.
(define (tasks:server-deregister-self mdb hostname)
  (let ((my-pid (current-process-id)))
    (tasks:server-deregister mdb hostname pid: my-pid)))
;; need a simple call for robustly removing records given host and port
;; Delete the server rows matching the given host (or interface) and port.
(define (tasks:server-delete mdb hostname port)
  (let ((mode 'delete))
    (tasks:server-deregister mdb hostname port: port action: mode)))
;; Look up the id of a servers row.  The first usable pair of keys wins:
;;   hostname + pid, then iface + port, then hostname + port.
;; Returns the id, or #f when no row matches or no usable pair was given
;; (an error is logged in the latter case).
;;
;; Each query is issued together with the values that belong to its own
;; placeholders.  Previously the bind values were picked independently of
;; the query text, so e.g. an interface=? query could be bound to the
;; hostname, and the fallback query (no placeholders) was still handed
;; two bind values.
(define (tasks:server-get-server-id mdb hostname iface port pid)
  (debug:print-info 12 "tasks:server-get-server-id " mdb " " hostname " " iface " " port " " pid)
  (let* ((res    #f)
         (keep   (lambda (id) (set! res id)))
         (lookup (lambda (sql key1 key2)
                   (sqlite3:for-each-row keep mdb sql key1 key2))))
    (cond
     ((and hostname pid)
      (lookup "SELECT id FROM servers WHERE hostname=? AND pid=?;" hostname pid))
     ((and iface port)
      (lookup "SELECT id FROM servers WHERE interface=? AND port=?;" iface port))
     ((and hostname port)
      (lookup "SELECT id FROM servers WHERE hostname=? AND port=?;" hostname port))
     (else
      (debug:print 0 "ERROR: tasks:server-get-server-id needs (hostname and pid) OR (iface and port) OR (hostname and port)")))
    res))
;; Stamp the heartbeat column of the given server row with the current time.
;; Any exception is treated as a probable db timeout: a warning is printed,
;; the thread sleeps one second and the update is attempted again (there is
;; no retry limit).
(define (tasks:server-update-heartbeat mdb server-id)
  (debug:print-info 1 "Heart beat update of server id=" server-id)
  (let ((stamp-sql "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;"))
    (handle-exceptions
     exn
     (begin
       (debug:print 0 "WARNING: probable timeout on monitor.db access")
       (thread-sleep! 1)
       (tasks:server-update-heartbeat mdb server-id))
     (sqlite3:execute mdb stamp-sql server-id))))
;; alive servers keep the heartbeat field up to date, refreshing it every 6 or so seconds
;; #t when the server's heartbeat is less than 10 seconds old.
;; When server-id is #f the id is looked up from the keyword arguments
;; via tasks:server-get-server-id.  If no row is found the age keeps its
;; huge default and the result is #f.
(define (tasks:server-alive? mdb server-id #!key (iface #f)(hostname #f)(port #f)(pid #f))
  (let ((sid (or server-id
                 (tasks:server-get-server-id mdb hostname iface port pid)))
        (seconds-since-beat 99e9))
    (sqlite3:for-each-row
     (lambda (age)
       (set! seconds-since-beat age))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" sid)
    (< seconds-since-beat 10)))
;; Record a client login in the clients table.
;;
;; mdb      - open sqlite3 handle on the monitor db
;; pid      - process id of the client
;; hostname - host the client runs on
;; cmdline  - command line of the client
;;
;; server_id is looked up from hostname + pid with
;; tasks:server-get-server-id; login_time is set to the current time.
;;
;; Fix: the closing paren of sqlite3:execute used to sit directly after
;; the SQL string, so the statement ran with no bind values and the four
;; values were evaluated as separate, useless expressions.
(define (tasks:client-register mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));"
   (tasks:server-get-server-id mdb hostname #f #f pid)
   pid hostname cmdline))
;; Set logout_time to the current time on the clients row(s) matching
;; pid, hostname and cmdline.
(define (tasks:client-logout mdb pid hostname cmdline)
  (let ((logout-sql "UPDATE clients SET logout_time=strftime('%s','now') WHERE pid=? AND hostname=? AND cmdline=?;"))
    (sqlite3:execute mdb logout-sql pid hostname cmdline)))
;; Return a list of vectors
;;   #(id server-id pid hostname cmdline login-time logout-time)
;; one per clients row recorded for server-id ('() when there are none).
;;
;; Fixes: the row lambda referenced the misspelled, unbound variable
;; lougout-time, and the accumulated list was never returned.
;;
;; NOTE(review): the query does not filter on logout_time, so rows of
;; clients that already logged out are included - confirm whether that
;; is intended given the procedure name.
(define (tasks:get-logged-in-clients mdb server-id)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id server-id pid hostname cmdline login-time logout-time)
       (set! res (cons (vector id server-id pid hostname cmdline login-time logout-time) res)))
     mdb
     "SELECT id,server_id,pid,hostname,cmdline,login_time,logout_time FROM clients WHERE server_id=?;"
     server-id)
    res))

;; #t when at least one clients row exists for server-id.
;; Fix: the test was inverted (it returned the result of null?).
(define (tasks:have-clients? mdb server-id)
  (not (null? (tasks:get-logged-in-clients mdb server-id))))
;; ping each server in the db and return first found that responds.
;; remove any others. will not necessarily remove all!
;; Return the hostinfo vector
;;   #(id interface port pubport transport pid hostname)
;; of the most recently started server whose heartbeat is under 10 seconds
;; old and whose mt_version matches (common:version-signature), or #f.
;; The transport column must match *transport-type*; when that is unset
;; or 'fs the LIKE pattern "%" accepts any transport.
(define (tasks:get-best-server mdb)
  (let ((found '())
        (transport-pattern (if (and *transport-type*
                                    (not (eq? *transport-type* 'fs)))
                               (conc *transport-type*)
                               "%")))
    (sqlite3:for-each-row
     (lambda (id interface port pubport transport pid hostname)
       (set! found (cons (vector id interface port pubport transport pid hostname) found)))
     mdb
     "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
WHERE strftime('%s','now')-heartbeat < 10
AND mt_version=? AND transport LIKE ?
ORDER BY start_time DESC LIMIT 1;" (common:version-signature) transport-pattern)
    ;; for now only one server is kept registered in the db: return #f or the first one found
    (if (pair? found)
        (car found)
        #f)))
;; BUG: This logic is probably needed unless methodology changes completely...
;;
;; (if (null? res) #f
;; (let loop ((hed (car res))
;; (tal (cdr res)))
;; ;; (print "hed=" hed ", tal=" tal)
;; (let* ((host (list-ref hed 0))
;; (iface (list-ref hed 1))
;; (port (list-ref hed 2))
;; (pid (list-ref hed 4))
;; (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port)))
;; (if alive
;; (begin
;; (debug:print-info 2 "Found an existing, alive, server " host ", " port ".")
;; (list host iface port))
;; (begin
;; (debug:print-info 1 "Marking " host ":" port " as dead in server registry.")
;; (if port
;; (open-run-close tasks:server-deregister tasks:open-db host port: port)
;; (open-run-close tasks:server-deregister tasks:open-db host pid: pid))
;; (if (null? tal)
;; #f
;; (loop (car tal)(cdr tal))))))))))
;; Delete every row from the servers table.
(define (tasks:remove-server-records mdb)
  (let ((wipe-sql "DELETE FROM servers;"))
    (sqlite3:execute mdb wipe-sql)))
;; Deregister a server through open-run-close, matching on port when one
;; is given and on pid otherwise.  The state and transport arguments are
;; accepted but not used.
(define (tasks:mark-server hostname port pid state transport)
  (cond
   (port (open-run-close tasks:server-deregister tasks:open-db hostname port: port))
   (else (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))))
;; Deregister a server record and, when status is true, try to stop the
;; server process.
;;
;; status    - #t means the server is believed to be alive
;; hostname  - host the server runs on
;; port      - server port; when #f the record is matched on pid instead
;; pid       - server process id
;; transport - transport type, symbol or string
;;
(define (tasks:kill-server status hostname port pid transport)
(debug:print-info 1 "Removing defunct server record for " hostname ":" port)
;; first drop the registry record, by port if known, else by pid
(if port
(open-run-close tasks:server-deregister tasks:open-db hostname port: port)
(open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))
(if status ;; #t means alive
(begin
(if (equal? hostname (get-host-name))
;; local server: send signal/term; any exception raised while doing so is only reported
(handle-exceptions
exn
(debug:print-info 0 "server may or may not be dead, check for megatest -server running as pid " pid "\n"
" EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 1 "Sending signal/term to " pid " on " hostname)
(process-signal pid signal/term)
(thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
;;(process-signal pid signal/kill)
) ;; local machine, send sig term
;; remote server: connect (http only) and ask it to shut itself down
(begin
;;(debug:print-info 1 "Stopping remote servers not yet supported."))))
(debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
(let ((serverdat (list hostname port)))
(case (if (string? transport) (string->symbol transport) transport)
((http)(http-transport:client-connect hostname port))
;; NOTE(review): unlike the other debug:print calls here, this one has no numeric level as first argument - confirm
(else (debug:print "ERROR: remote stopping servers of type " transport " not supported yet")))
(cdb:kill-server serverdat pid))))) ;; remote machine, try telling server to commit suicide
;; NOTE(review): this is the status = #f branch, so the (if status ...) just below
;; can never be true and nothing in this branch runs - confirm the intended
;; behaviour for a frozen server before changing it.
(begin
(if status
(if (equal? hostname (get-host-name))
(begin
(debug:print-info 1 "Sending signal/term to " pid " on " hostname)
(process-signal pid signal/term) ;; local machine, send sig term
(thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
(process-signal pid signal/kill))
(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))
;; Return every servers row as a vector
;;   #(id pid hostname interface port pubport start-time priority state
;;     mt-version last-update transport)
;; where last-update is the number of seconds since the last heartbeat.
;; The query orders by start_time DESC and each row is consed onto the
;; front of the result list.
(define (tasks:get-all-servers mdb)
  (let ((servers '()))
    (sqlite3:for-each-row
     ;; collect all twelve selected columns of a row into one vector
     (lambda row-fields
       (set! servers (cons (list->vector row-fields) servers)))
     mdb
     "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;")
    servers))
;;======================================================================
;; Tasks and Task monitors
;;======================================================================
;;======================================================================
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
|
|
|
>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<
|
|
>
|
|
|
|
>
>
|
|
<
|
>
>
>
>
>
>
>
|
>
>
|
|
>
|
>
>
>
>
|
|
|
<
<
<
|
<
<
>
>
>
>
>
>
>
>
>
|
|
>
>
>
>
>
>
>
|
>
>
>
|
>
>
|
|
>
>
>
|
>
|
>
|
>
>
>
>
>
>
>
|
>
>
|
>
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
|
<
|
<
<
<
<
<
<
<
|
<
|
<
<
<
<
<
<
|
<
|
<
<
<
|
<
<
>
|
<
<
<
<
|
<
<
<
<
<
<
|
<
|
<
<
<
|
<
<
<
<
<
<
<
<
|
<
<
|
<
<
<
<
|
<
|
<
|
<
<
<
<
|
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
|
<
<
<
<
|
|
<
<
|
<
<
<
>
|
<
<
<
<
<
>
|
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
|
>
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
|
(declare (uses common))
(include "task_records.scm")
;;======================================================================
;; Tasks db
;;======================================================================
;; wait up to approximately n seconds for a journal to go away
;;
;; Poll once per second, at most n more times, until the file
;; <path>-journal no longer exists.
;;
;; path        - db path; must be a string, otherwise an error is logged
;; n           - maximum number of one second waits
;; remove      - when true and the journal is still there after n waits,
;;               remove it with "rm -rf"
;; waiting-msg - optional message printed while waiting
;;
;; Returns #t when the journal is gone (or never existed), #f when it was
;; still present after the last wait.  Any exception is logged and treated
;; as "journal gone" (#t) so callers can carry on.
;;
;; Fix: the message throttle used (modulo n 30).  n never changes, so the
;; message was printed either on every pass or never; it now follows the
;; countdown and is printed whenever the remaining count is a multiple of 30.
(define (tasks:wait-on-journal path n #!key (remove #f)(waiting-msg #f))
  (if (not (string? path))
      (debug:print 0 "ERROR: Called tasks:wait-on-journal with path=" path " (not a string)")
      (let ((fullpath (conc path "-journal")))
        (handle-exceptions
         exn
         (begin
           (print-call-chain (current-error-port))
           (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
           (debug:print 0 " exn=" (condition->list exn))
           (debug:print 0 "tasks:wait-on-journal failed. Continuing on, you can ignore this call-chain")
           #t) ;; if stuff goes wrong just allow it to move on
         (let loop ((journal-exists (file-exists? fullpath))
                    (count          n))
           (if journal-exists
               (begin
                 (if (and waiting-msg
                          (eq? (modulo count 30) 0))
                     (debug:print 0 waiting-msg))
                 (if (> count 0)
                     (begin
                       (thread-sleep! 1)
                       (loop (file-exists? fullpath)
                             (- count 1)))
                     (begin
                       (if remove (system (conc "rm -rf " fullpath)))
                       #f)))
               #t))))))
;; Directory holding the task db: the [setup] linktree setting with
;; "/.db" appended.
(define (tasks:get-task-db-path)
  (conc (configf:lookup *configdat* "setup" "linktree") "/.db"))
;; Open the task/monitor db, or return the cached handle.
;;
;; Returns *task-db*, a pair (sqlite3-handle . db-directory).  Once opened
;; the pair is cached in *task-db* and handed back on later calls.
;;
;; Which database is opened:
;;   *toppath* is a string and writable ==> <db-directory>/monitor.db
;;   else the db directory is readable  ==> <db-directory>/monitor.db
;;   otherwise                          ==> an in-memory db
;;
;; The tables are created if missing, and tasks_queue rows in state 'done'
;; older than 24 hours are purged.
;;
;; numretries - how many more times to retry after an exception, sleeping
;;              one second between attempts.  When retries are exhausted
;;              the exception is logged and #f is returned.
;;
;; Fix: the retry used to call (tasks:open-db numretries (- numretries 1)),
;; passing the count positionally instead of as the numretries: keyword, so
;; the remaining-retries count was not passed on to the next attempt.
;;
;; NOTE(review): tasks:wait-on-journal, exists and write-access are all
;; given the db *directory* rather than the monitor.db file - confirm that
;; this is intended.
;;
(define (tasks:open-db #!key (numretries 4))
  (if *task-db*
      *task-db*
      (handle-exceptions
       exn
       (begin
         (print-call-chain (current-error-port))
         (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
         (debug:print 0 " exn=" (condition->list exn))
         (if (> numretries 0)
             (begin
               (thread-sleep! 1)
               (tasks:open-db numretries: (- numretries 1)))
             #f))
       (let* ((dbpath       (tasks:get-task-db-path))
              (dbfile       (conc dbpath "/monitor.db"))
              (avail        (tasks:wait-on-journal dbpath 10)) ;; wait up to about 10 seconds for the journal to go away
              (exists       (file-exists? dbpath))
              (write-access (file-write-access? dbpath))
              (mdb          (cond ;; what the hek is *toppath* doing here?
                             ((and (string? *toppath*)(file-write-access? *toppath*))
                              (sqlite3:open-database dbfile))
                             ((file-read-access? dbpath) (sqlite3:open-database dbfile))
                             (else (sqlite3:open-database ":memory:")))) ;; (never-give-up-open-db dbpath))
              (handler      (make-busy-timeout 36000)))
         (if (and exists
                  (not write-access))
             (set! *db-write-access* write-access)) ;; only unset so other db's also can use this control
         (sqlite3:set-busy-handler! mdb handler)
         (db:set-sync mdb) ;; (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;"))
         (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
action TEXT DEFAULT '',
owner TEXT,
state TEXT DEFAULT 'new',
target TEXT DEFAULT '',
name TEXT DEFAULT '',
testpatt TEXT DEFAULT '',
keylock TEXT,
params TEXT,
creation_time TIMESTAMP,
execution_time TIMESTAMP);")
         (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY,
pid INTEGER,
start_time TIMESTAMP,
last_update TIMESTAMP,
hostname TEXT,
username TEXT,
CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
         (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
pid INTEGER,
interface TEXT,
hostname TEXT,
port INTEGER,
pubport INTEGER,
start_time TIMESTAMP,
priority INTEGER,
state TEXT,
mt_version TEXT,
heartbeat TIMESTAMP,
transport TEXT,
run_id INTEGER);")
         ;; CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
         (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
server_id INTEGER,
pid INTEGER,
hostname TEXT,
cmdline TEXT,
login_time TIMESTAMP,
logout_time TIMESTAMP DEFAULT -1,
CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
         (sqlite3:execute mdb "DELETE FROM tasks_queue WHERE state='done' AND creation_time < ?;" (- (current-seconds)(* 24 60 60))) ;; remove older than 24 hrs
         (set! *task-db* (cons mdb dbpath))
         *task-db*))))
;;======================================================================
;; Server and client management
;;======================================================================
;; make-vector-record tasks hostinfo id interface port pubport transport pid hostname
;; Positional accessors for a hostinfo vector laid out as
;;   #(id interface port pubport transport pid hostname)
(define tasks:hostinfo-get-id        (lambda (hostinfo) (vector-ref hostinfo 0)))
(define tasks:hostinfo-get-interface (lambda (hostinfo) (vector-ref hostinfo 1)))
(define tasks:hostinfo-get-port      (lambda (hostinfo) (vector-ref hostinfo 2)))
(define tasks:hostinfo-get-pubport   (lambda (hostinfo) (vector-ref hostinfo 3)))
(define tasks:hostinfo-get-transport (lambda (hostinfo) (vector-ref hostinfo 4)))
(define tasks:hostinfo-get-pid       (lambda (hostinfo) (vector-ref hostinfo 5)))
(define tasks:hostinfo-get-hostname  (lambda (hostinfo) (vector-ref hostinfo 6)))
;; Try to claim the server slot for run-id.
;; Stale records for the run are retired first.  If fewer than 4 recent
;; 'available' records exist, this process adds its own and the result of
;; tasks:server-am-i-the-server? is returned (server id or #f); otherwise
;; the result is #f.
(define (tasks:server-lock-slot mdb run-id)
  (tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot")
  (and (< (tasks:num-in-available-state mdb run-id) 4)
       (begin
         (tasks:server-set-available mdb run-id)
         ;; (thread-sleep! 2) ;; Try removing this. It may not be needed.
         (tasks:server-am-i-the-server? mdb run-id))))
;; register that this server may come online (first to register goes through with the process)
;; Insert a placeholder servers row in state "available" for this process
;; and run-id.  port, pubport and interface are stored as -1, heartbeat as
;; -1, transport as "http" and priority as a random number below 1000.
(define (tasks:server-set-available mdb run-id)
  (let ((my-pid      (current-process-id))
        (my-host     (get-host-name))
        (unset       -1)              ;; port, pubport and interface are not known yet
        (tie-breaker (random 1000))   ;; priority (used a tiebreaker on get-available)
        (version     (common:version-signature)))
    (sqlite3:execute
     mdb
     "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id)
VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);"
     my-pid
     my-host
     unset        ;; port
     unset        ;; pubport
     tie-breaker
     "available"  ;; state
     version      ;; mt_version
     unset        ;; interface
     "http"       ;; transport
     run-id)))
;; Count the servers rows for run-id that are in state 'available' and
;; were started less than 30 seconds ago.
(define (tasks:num-in-available-state mdb run-id)
  (let ((tally 0))
    (sqlite3:for-each-row
     (lambda (n)
       (set! tally n))
     mdb
     "SELECT count(id) FROM servers WHERE run_id=? AND state = 'available' AND (strftime('%s','now') - start_time) < 30 ;"
     run-id)
    tally))
;; Count the servers rows in state 'running' whose run_id is not 0.
(define (tasks:num-servers-non-zero-running mdb)
  (let ((tally 0))
    (sqlite3:for-each-row
     (lambda (n)
       (set! tally n))
     mdb
     "SELECT count(id) FROM servers WHERE run_id != 0 AND state = 'running';")
    tally))
;; Retire stale records for run-id: rows in state available, dbprep or
;; shutting-down that were started more than 50 seconds ago get their
;; state set to "defunct" followed by tag.
(define (tasks:server-clean-out-old-records-for-run-id mdb run-id tag)
  (let ((retired-state (conc "defunct" tag)))
    (sqlite3:execute
     mdb
     "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state in ('available','dbprep','shutting-down') AND (strftime('%s','now') - start_time) > 50 AND run_id=?;"
     retired-state
     run-id)))
;; Set every 'running' record for run-id to state "defunct" followed by tag.
(define (tasks:server-force-clean-running-records-for-run-id mdb run-id tag)
  (let ((retired-state (conc "defunct" tag)))
    (sqlite3:execute
     mdb
     "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state = 'running' AND run_id=?;"
     retired-state
     run-id)))
;; Set the 'running' record matching run-id, interface and port to state
;; "defunct" followed by tag.
(define (tasks:server-force-clean-run-record mdb run-id iface port tag)
  (let ((retired-state (conc "defunct" tag)))
    (sqlite3:execute
     mdb
     "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state = 'running' AND run_id=? AND interface=? AND port=?;"
     retired-state
     run-id
     iface
     port)))
;; Mark every servers row of this process (local hostname + own pid) as
;; "defunct" followed by tag.  Despite the name the rows are updated, not
;; deleted.
(define (tasks:server-delete-records-for-this-pid mdb tag)
  (let ((retired-state (conc "defunct" tag))
        (my-host       (get-host-name))
        (my-pid        (current-process-id)))
    (sqlite3:execute
     mdb
     "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE hostname=? AND pid=?;"
     retired-state
     my-host
     my-pid)))
;; Mark the servers row server-id as "defunct" followed by tag, then purge
;; old rows:
;;   - rows not in running/shutting-down/dbprep started over 2628000
;;     seconds (about one month) ago
;;   - defunct rows with port = -1 (never used placeholders) started over
;;     600 seconds ago
(define (tasks:server-delete-record mdb server-id tag)
  (let ((retired-state (conc "defunct" tag)))
    (sqlite3:execute
     mdb
     "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE id=?;"
     retired-state
     server-id))
  ;; housekeeping, run in this order on every call
  (for-each
   (lambda (purge-sql)
     (sqlite3:execute mdb purge-sql))
   (list
    "DELETE FROM servers WHERE state not in ('running','shutting-down','dbprep') AND (strftime('%s','now') - start_time) > 2628000;"
    "DELETE FROM servers WHERE state like 'defunct%' AND port=-1 AND (strftime('%s','now') - start_time) > 600;")))
;; Store state on the servers row server-id and refresh its heartbeat.
(define (tasks:server-set-state! mdb server-id state)
  (let ((update-sql "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE id=?;"))
    (sqlite3:execute mdb update-sql state server-id)))
;; Store interface and port on the servers row server-id and refresh its
;; heartbeat.
(define (tasks:server-set-interface-port mdb server-id interface port)
  (let ((update-sql "UPDATE servers SET interface=?,port=?,heartbeat=strftime('%s','now') WHERE id=?;"))
    (sqlite3:execute mdb update-sql interface port server-id)))
;; Get random port not used in long time
;;
;; Pick the port a new server should listen on.
;;
;; If the -port command line argument holds a number, that number is
;; returned.  Otherwise a random port in [30000, 64000) that does not
;; appear in the servers table is chosen; after 100 failed draws a random
;; port is returned without checking it.
;;
;; Fix: the first cond clause referenced a variable named res that is not
;; bound anywhere in this procedure, so supplying -port raised an unbound
;; variable error.  The table is now only read when a random port is
;; actually needed.
(define (tasks:server-get-next-port mdb)
  (let* ((lownum        30000)
         (highnum       64000)
         (get-rand-port (lambda ()
                          (+ lownum (random (- highnum lownum)))))
         (port-arg      (args:get-arg "-port"))
         (port-param    (and port-arg (string->number port-arg))))
    (or port-param
        (let ((used-ports '()))
          (sqlite3:for-each-row
           (lambda (port)
             (set! used-ports (cons port used-ports)))
           mdb
           "SELECT port FROM servers;")
          (let loop ((port     (get-rand-port))
                     (remtries 100))
            (if (member port used-ports)
                (if (> remtries 0)
                    (loop (get-rand-port) (- remtries 1))
                    (get-rand-port))
                port))))))
;; Decide whether this process holds the server slot for run-id.
;;
;; Takes the first row returned by
;; tasks:server-get-servers-vying-for-run-id and returns that row's id
;; when its hostname and pid are those of this process, else #f.
;; If there are no rows at all an error is logged, the db is finalized
;; and the process exits with status 1.
;;
;; Fix: the emptiness test was (null? all), but all is the
;; (vector header rows) structure and therefore never null; with no rows
;; the code went on to take the car of an empty list.  The row list itself
;; is tested now.
(define (tasks:server-am-i-the-server? mdb run-id)
  (let* ((all      (tasks:server-get-servers-vying-for-run-id mdb run-id))
         (rows     (db:get-rows all))
         (first    (if (null? rows)
                       (begin (debug:print 0 "ERROR: no servers listed, should be at least one by now.")
                              (sqlite3:finalize! mdb)
                              (exit 1))
                       (car rows)))
         (header   (db:get-header all))
         (id       (db:get-value-by-header first header "id"))
         (hostname (db:get-value-by-header first header "hostname"))
         (pid      (db:get-value-by-header first header "pid")))
    (debug:print 0 "INFO: am-i-the-server got record " first)
    ;; for now a basic check. add tiebreaking by priority later
    (if (and (equal? hostname (get-host-name))
             (equal? pid (current-process-id)))
        id
        #f)))
;; Use: (db:get-value-by-header (car (db:get-rows dat)) (db:get-header dat) "fieldname")
;; to extract info from the structure returned
;;
;; Return (vector header rows) for the servers rows of run-id that are in
;; state available, running or dbprep.  header is the list of column names
;; and each row is a vector of the matching values.  The query orders by
;; start_time DESC and each row is consed onto the front of the list.
(define (tasks:server-get-servers-vying-for-run-id mdb run-id)
  (let* ((columns (list "id" "hostname" "pid" "interface" "port" "pubport" "state" "run_id" "priority" "start_time"))
         (query   (conc "SELECT "
                        (string-intersperse columns ",")
                        " FROM servers WHERE run_id=? AND state in ('available','running','dbprep') ORDER BY start_time DESC;"))
         (rows    '()))
    (sqlite3:for-each-row
     ;; all selected columns of one row arrive as the argument list
     (lambda row-fields
       (set! rows (cons (list->vector row-fields) rows)))
     mdb
     query
     run-id)
    (vector columns rows)))
;; Return the hostinfo vector
;;   #(id interface port pubport transport pid hostname)
;; of the most recently started server in state 'running' for run-id, or
;; #f when there is none.
;;
;; retries - how many more times to retry after a db exception, sleeping
;;           10 seconds before each retry.  When retries are exhausted a
;;           message is logged and #f ("no server found") is returned.
;;
;; Fixes: the retry passed (- retries 0), so the count never went down and
;; a persistent db error retried forever; the give-up branch returned the
;; result of debug:print instead of #f; the unused local best and the
;; duplicated call-chain print were dropped.
(define (tasks:get-server mdb run-id #!key (retries 10))
  (let ((res #f))
    (handle-exceptions
     exn
     (begin
       (print-call-chain (current-error-port))
       (debug:print 0 "WARNING: tasks:get-server db access error.")
       (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
       (debug:print 0 " for run " run-id)
       (if (> retries 0)
           (begin
             (debug:print 0 " trying call to tasks:get-server again in 10 seconds")
             (thread-sleep! 10)
             (tasks:get-server mdb run-id retries: (- retries 1)))
           (begin
             (debug:print 0 "10 tries of tasks:get-server all crashed and burned. Giving up and returning \"no server found\"")
             #f)))
     (sqlite3:for-each-row
      (lambda (id interface port pubport transport pid hostname)
        (set! res (vector id interface port pubport transport pid hostname)))
      mdb
      ;; removed:
      ;; strftime('%s','now')-heartbeat < 10 AND mt_version = ?
      "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
WHERE run_id=? AND state='running'
ORDER BY start_time DESC LIMIT 1;" run-id) ;; (common:version-signature) run-id)
     res)))
;; Return the id of a server for run-id that is either 'running' or in
;; 'dbprep' and started less than 60 seconds ago, or #f when none exists.
;; If several rows match, the id of the last row returned wins.
(define (tasks:server-running-or-starting? mdb run-id)
  (let ((server-id #f))
    (sqlite3:for-each-row
     (lambda (row-id)
       (set! server-id row-id))
     mdb
     "SELECT id FROM servers WHERE run_id=? AND (state = 'running' OR (state = 'dbprep' AND (strftime('%s','now') - start_time) < 60));" run-id)
    server-id))
;; Return the id of a server in state 'running' for run-id, or #f when
;; none exists.  If several rows match, the id of the last row returned
;; wins.
(define (tasks:server-running? mdb run-id)
  (let ((server-id #f))
    (sqlite3:for-each-row
     (lambda (row-id)
       (set! server-id row-id))
     mdb ;; NEEDS dbprep ADDED
     "SELECT id FROM servers WHERE run_id=? AND state = 'running';" run-id)
    server-id))
;; Decide whether a server should be started for run-id.
;; Returns #t when the [server] required setting is set, or when the cdr of
;; (rmt:get-max-query-average run-id) exceeds the [server]
;; server-query-threshold setting (default "10"); otherwise #f.
;; The informational message is only printed when common:low-noise-print
;; allows it.
(define (tasks:need-server run-id)
  (let ((required  (configf:lookup *configdat* "server" "required"))
        (avg-query (cdr (rmt:get-max-query-average run-id)))
        (limit     (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") "10"))))
    (cond
     (required
      (if (common:low-noise-print 60 run-id "server required is set")
          (debug:print-info 0 "Server required is set, starting server."))
      #t)
     ((> avg-query limit)
      (if (common:low-noise-print 60 run-id "Max query time execeeded")
          (debug:print-info 0 "Max avg query time of " avg-query "ms exceeds limit of " limit "ms, starting server."))
      #t)
     (else #f))))
;; try to start a server and wait for it to be available
;;
;; Keep asking server:kind-run to start a server for run-id until
;; tasks:get-server finds one or delay-max-tries attempts have been made.
;; The sleep after each attempt grows by one second per attempt and is
;; capped at 5 seconds.
(define (tasks:start-and-wait-for-server tdbdat run-id delay-max-tries)
  (let ((find-server (lambda ()
                       (tasks:get-server (db:delay-if-busy tdbdat) run-id))))
    (let loop ((found    (find-server))
               (attempts 0))
      (if (and (not found)
               (< attempts delay-max-tries))
          (begin
            (if (common:low-noise-print 60 "tasks:start-and-wait-for-server" run-id)
                (debug:print 0 "Try starting server for run-id " run-id))
            (server:kind-run run-id)
            (thread-sleep! (min attempts 5))
            (loop (find-server) (+ attempts 1)))))))
;; Return every servers row whose state does not start with "defunct" as
;; a vector
;;   #(id pid hostname interface port pubport start-time priority state
;;     mt-version last-update transport run-id)
;;     0  1   2        3         4    5       6          7        8
;;     9          10          11        12
;; where last-update is the number of seconds since the last heartbeat.
;; The query orders by start_time DESC and each row is consed onto the
;; front of the result list.
(define (tasks:get-all-servers mdb)
  (let ((servers '()))
    (sqlite3:for-each-row
     ;; collect all thirteen selected columns of a row into one vector
     (lambda row-fields
       (set! servers (cons (list->vector row-fields) servers)))
     mdb
     "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport,run_id FROM servers WHERE state NOT LIKE 'defunct%' ORDER BY start_time DESC;")
    servers))
;; no elegance here ...
;;
;; Run "nbfake kill <pid>" with the environment variables TARGETHOST
;; (set to hostname) and TARGETHOST_LOGF (set to "server-kills.log") in
;; place for the duration of the command; both are unset afterwards.
(define (tasks:kill-server hostname pid)
  (debug:print-info 0 "Attempting to kill server process " pid " on host " hostname)
  (let ((kill-cmd (conc "nbfake kill " pid)))
    (setenv "TARGETHOST" hostname)
    (setenv "TARGETHOST_LOGF" "server-kills.log")
    (system kill-cmd)
    (unsetenv "TARGETHOST_LOGF")
    (unsetenv "TARGETHOST")))
;; look up a server by run-id and send it a kill, also delete the record for that server
;;
;; Find the running server for run-id; if there is one, set its state to
;; "killed", send it a kill via tasks:kill-server and retire its record
;; with tasks:server-delete-record (using tag).  When no server is found
;; only an informational message is printed.
(define (tasks:kill-server-run-id run-id #!key (tag "default"))
  (let* ((tdbdat  (tasks:open-db))
         (srvinfo (tasks:get-server (db:delay-if-busy tdbdat) run-id)))
    (cond
     (srvinfo
      ;; hostinfo layout: 0 = id, 5 = pid, 6 = hostname
      (let ((srv-host (vector-ref srvinfo 6))
            (srv-pid  (vector-ref srvinfo 5))
            (srv-id   (vector-ref srvinfo 0)))
        (tasks:server-set-state! (db:delay-if-busy tdbdat) srv-id "killed")
        (debug:print-info 0 "Killing server " srv-id " for run-id " run-id " on host " srv-host " with pid " srv-pid)
        (tasks:kill-server srv-host srv-pid)
        (tasks:server-delete-record (db:delay-if-busy tdbdat) srv-id tag)))
     (else
      (debug:print-info 0 "No server found for run-id " run-id ", nothing to kill")))
    ;; (sqlite3:finalize! tdb)
    ))
;;======================================================================
;; Tasks and Task monitors
;;======================================================================
;;======================================================================
|
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
|
(set! res count))
mdb
"SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;"
(car (user-information (current-user-id))))
res))
;; register a task
;; Queue a task: insert a tasks_queue row in state 'new' with
;; creation_time set to the current time and execution_time 0.
;; A false params is stored as the empty string.
(define (tasks:add mdb action owner target runname test item params)
  (let ((stored-params (or params "")))
    (sqlite3:execute mdb "INSERT INTO tasks_queue (action,owner,state,target,name,test,item,params,creation_time,execution_time)
VALUES (?,?,'new',?,?,?,?,?,strftime('%s','now'),0);"
                     action
                     owner
                     target
                     runname
                     test
                     item
                     stored-params)))
(define (keys:key-vals-hash->target keys key-params)
(let ((tmp (hash-table-ref/default key-params (vector-ref (car keys) 0) "")))
(if (> (length keys) 1)
(for-each (lambda (key)
(set! tmp (conc tmp "/" (hash-table-ref/default key-params (vector-ref key 0) ""))))
|
|
|
|
|
<
|
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
|
(set! res count))
mdb
"SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;"
(car (user-information (current-user-id))))
res))
;; register a task
;; Queue a task: insert a tasks_queue row in state 'new' with
;; creation_time set to the current time and execution_time 0.
;; A false params is stored as the empty string.
(define (tasks:add mdb action owner target runname testpatt params)
  (let ((stored-params (or params "")))
    (sqlite3:execute mdb "INSERT INTO tasks_queue (action,owner,state,target,name,testpatt,params,creation_time,execution_time)
VALUES (?,?,'new',?,?,?,?,strftime('%s','now'),0);"
                     action
                     owner
                     target
                     runname
                     testpatt
                     stored-params)))
(define (keys:key-vals-hash->target keys key-params)
(let ((tmp (hash-table-ref/default key-params (vector-ref (car keys) 0) "")))
(if (> (length keys) 1)
(for-each (lambda (key)
(set! tmp (conc tmp "/" (hash-table-ref/default key-params (vector-ref key 0) ""))))
|