21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
|
;;======================================================================
;; Tasks db
;;======================================================================
(define (tasks:open-db)
(let* ((dbpath (conc *toppath* "/monitor.db"))
(exists (file-exists? dbpath))
(tdb (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
(handler (make-busy-timeout 36000)))
(sqlite3:set-busy-handler! tdb handler)
(if (not exists)
(begin
(sqlite3:execute tdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
action TEXT DEFAULT '',
owner TEXT,
state TEXT DEFAULT 'new',
target TEXT DEFAULT '',
name TEXT DEFAULT '',
test TEXT DEFAULT '',
item TEXT DEFAULT '',
keylock TEXT,
params TEXT,
creation_time TIMESTAMP,
execution_time TIMESTAMP);")
(sqlite3:execute tdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY,
pid INTEGER,
start_time TIMESTAMP,
last_update TIMESTAMP,
hostname TEXT,
username TEXT,
CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")))
tdb))
;;======================================================================
;; Tasks and Task monitors
;;======================================================================
;;======================================================================
;; Tasks
;;======================================================================
;;======================================================================
;; Task Monitors
;;======================================================================
(define (tasks:register-monitor db tdb)
(let* ((pid (current-process-id))
(hostname (get-host-name))
(userinfo (user-information (current-user-id)))
(username (car userinfo)))
(print "Register monitor, pid: " pid ", hostname: " hostname ", username: " username)
(sqlite3:execute tdb "INSERT INTO monitors (pid,start_time,last_update,hostname,username) VALUES (?,strftime('%s','now'),strftime('%s','now'),?,?);"
pid hostname username)))
(define (tasks:get-num-alive-monitors tdb)
(let ((res 0))
(sqlite3:for-each-row
(lambda (count)
(set! res count))
tdb
"SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;"
(car (user-information (current-user-id))))
res))
;; register a task
(define (tasks:add tdb action owner target runname test item params)
(sqlite3:execute tdb "INSERT INTO tasks_queue (action,owner,state,target,name,test,item,params,creation_time,execution_time)
VALUES (?,?,'new',?,?,?,?,?,strftime('%s','now'),0);"
action
owner
target
runname
test
item
(if params params "")))
(define (keys:key-vals-hash->target keys key-params)
(let ((tmp (hash-table-ref/default key-params (vector-ref (car keys) 0) "")))
(if (> (length keys) 1)
(for-each (lambda (key)
(set! tmp (conc tmp "/" (hash-table-ref/default key-params (vector-ref key 0) ""))))
(cdr keys)))
tmp))
;; for use from the gui
(define (tasks:add-from-params tdb action keys key-params var-params)
(let ((target (keys:key-vals-hash->target keys key-params))
(owner (car (user-information (current-user-id))))
(runname (hash-table-ref/default var-params "runname" #f))
(testpatts (hash-table-ref/default var-params "testpatts" "%"))
(itempatts (hash-table-ref/default var-params "itempatts" "%"))
(params (hash-table-ref/default var-params "params" "")))
(tasks:add tdb action owner target runname testpatts itempatts params)))
;; return one task from those who are 'new' OR 'waiting' AND more than 10sec old
;;
(define (tasks:snag-a-task tdb)
(let ((res #f)
(keytxt (conc (current-process-id) "-" (get-host-name) "-" (car (user-information (current-user-id))))))
;; first randomly set a new to pid-hostname-hostname
(sqlite3:execute
tdb
"UPDATE tasks_queue SET keylock=? WHERE id IN
(SELECT id FROM tasks_queue
WHERE state='new' OR
(state='waiting' AND (strftime('%s','now')-execution_time) > 10) OR
state='reset'
ORDER BY RANDOM() LIMIT 1);" keytxt)
(sqlite3:for-each-row
(lambda (id . rem)
(set! res (apply vector id rem)))
tdb
"SELECT id,action,owner,state,target,name,test,item,params,creation_time,execution_time FROM tasks_queue WHERE keylock=? ORDER BY execution_time ASC LIMIT 1;" keytxt)
(if res ;; yep, have work to be done
(begin
(sqlite3:execute tdb "UPDATE tasks_queue SET state='inprogress',execution_time=strftime('%s','now') WHERE id=?;"
(tasks:task-get-id res))
res)
#f)))
(define (tasks:reset-stuck-tasks tdb)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id delta)
(set! res (cons id res)))
tdb
"SELECT id,strftime('%s','now')-execution_time AS delta FROM tasks_queue WHERE state='inprogress' AND delta>700 ORDER BY delta DESC LIMIT 2;")
(sqlite3:execute
tdb
(conc "UPDATE tasks_queue SET state='reset' WHERE id IN ('" (string-intersperse (map conc res) "','") "');"))))
;; return all tasks in the tasks_queue table
;;
(define (tasks:get-tasks tdb types states)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id . rem)
(set! res (cons (apply vector id rem) res)))
tdb
(conc "SELECT id,action,owner,state,target,name,test,item,params,creation_time,execution_time
FROM tasks_queue "
;; WHERE
;; state IN " statesstr " AND
;; action IN " actionsstr
" ORDER BY creation_time DESC;"))
res))
;; remove tasks given by a string of numbers comma separated
(define (tasks:remove-queue-entries tdb task-ids)
(sqlite3:execute tdb (conc "DELETE FROM tasks_queue WHERE id IN (" task-ids ");")))
;;
(define (tasks:start-monitor db tdb)
(if (> (tasks:get-num-alive-monitors tdb) 2) ;; have two running, no need for more
(debug:print-info 1 "Not starting monitor, already have more than two running")
(let* ((megatestdb (conc *toppath* "/megatest.db"))
(monitordbf (conc *toppath* "/monitor.db"))
(last-db-update 0)) ;; (file-modification-time megatestdb)))
(task:register-monitor tdb)
(let loop ((count 0)
(next-touch 0)) ;; next-touch is the time where we need to update last_update
;; if the db has been modified we'd best look at the task queue
(let ((modtime (file-modification-time megatestdbpath )))
(if (> modtime last-db-update)
(tasks:process-queue db tdb last-db-update megatestdb next-touch))
;; WARNING: Possible race conditon here!!
;; should this update be immediately after the task-get-action call above?
(if (> (current-seconds) next-touch)
(begin
(tasks:monitors-update tdb)
(loop (+ count 1)(+ (current-seconds) 240)))
(loop (+ count 1) next-touch)))))))
(define (tasks:process-queue db tdb)
(let* ((task (tasks:snag-a-task tdb))
(action (if task (tasks:task-get-action task) #f)))
(if action (print "tasks:process-queue task: " task))
(if action
(case (string->symbol action)
((run) (tasks:start-run db tdb task))
((remove) (tasks:remove-runs db tdb task))
((lock) (tasks:lock-runs db tdb task))
;; ((monitor) (tasks:start-monitor db task))
((rollup) (tasks:rollup-runs db tdb task))
((updatemeta)(tasks:update-meta db tdb task))
((kill) (tasks:kill-monitors db tdb task))))))
(define (tasks:get-monitors tdb)
(let ((res '()))
(sqlite3:for-each-row
(lambda (a . rem)
(set! res (cons (apply vector a rem) res)))
tdb
"SELECT id,pid,strftime('%m/%d/%Y %H:%M',datetime(start_time,'unixepoch'),'localtime'),strftime('%m/%d/%Y %H:%M:%S',datetime(last_update,'unixepoch'),'localtime'),hostname,username FROM monitors ORDER BY last_update ASC;")
(reverse res)
))
(define (tasks:tasks->text tasks)
(let ((fmtstr "~10a~10a~10a~12a~20a~12a~12a~12a~10a"))
(conc (format #f fmtstr "id" "action" "owner" "state" "target" "runname" "testpatts" "itempatts" "params") "\n"
|
|
|
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
|
;;======================================================================
;; Tasks db
;;======================================================================
(define (tasks:open-db)
(let* ((dbpath (conc *toppath* "/monitor.db"))
(exists (file-exists? dbpath))
(mdb (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
(handler (make-busy-timeout 36000)))
(sqlite3:set-busy-handler! mdb handler)
(if (not exists)
(begin
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
action TEXT DEFAULT '',
owner TEXT,
state TEXT DEFAULT 'new',
target TEXT DEFAULT '',
name TEXT DEFAULT '',
test TEXT DEFAULT '',
item TEXT DEFAULT '',
keylock TEXT,
params TEXT,
creation_time TIMESTAMP,
execution_time TIMESTAMP);")
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY,
pid INTEGER,
start_time TIMESTAMP,
last_update TIMESTAMP,
hostname TEXT,
username TEXT,
CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
pid INTEGER,
hostname TEXT,
port INTEGER,
start_time TIMESTAMP,
priority INTEGER,
state TEXT,
CONSTRAINT servers_constraint UNIQUE (pid,hostname));")
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
server_id INTEGER,
pid INTEGER,
hostname TEXT,
cmdline TEXT,
login_time TIMESTAMP,
logout_time TIMESTAMP DEFAULT -1,
CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
))
mdb))
;;======================================================================
;; Server and client management
;;======================================================================
;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid hostname port priority state)
(sqlite3:execute
mdb
"INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state) VALUES(?,?,?,strftime('%s','now'),?,?);"
pid hostname port priority (conc state)))
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f))
(if pid
(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND pid=?;" hostname pid)
(if port
(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port)
(debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))
(define (tasks:server-deregister-self mdb)
(tasks:server-deregister mdb (get-host-name) pid: (current-process-id)))
(define (tasks:server-get-server-id mdb)
;; dunno yet
0)
(define (tasks:client-register mdb pid hostname cmdline)
(sqlite3:execute
mdb
"INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));")
(tasks:server-get-server-id mdb)
pid hostname cmdline)
(define (tasks:client-logout mdb pid hostname cmdline)
(sqlite3:execute
mdb
"UPDATE clients SET logout_time=strftime('%s','now') WHERE pid=? AND hostname=? AND cmdline=?;"
pid hostname cmdline))
(define (tasks:get-logged-in-clients mdb server-id)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id server-id pid hostname cmdline login-time logout-time)
(set! res (cons (vector id server-id pid hostname cmdline login-time lougout-time) res)))
mdb
"SELECT id,server_id,pid,hostname,cmdline,login_time,logout_time FROM clients WHERE server_id=?;"
server-id)))
(define (tasks:have-clients? mdb server-id)
(null? (tasks:get-logged-in-clients mdb server-id)))
(define (tasks:get-best-server mdb)
(let ((res #f))
(sqlite3:for-each-row
(lambda (id hostname port)
(set! res (list hostname port)))
mdb
"SELECT id,hostname,port FROM servers WHERE state='live' ORDER BY start_time DESC LIMIT 1;")
res))
(define (tasks:get-all-servers mdb)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id pid hostname port start-time priority state)
(set! res (cons (vector id pid hostname port start-time priority state) res)))
mdb
"SELECT id,pid,hostname,port,start_time,priority,state FROM servers ORDER BY start_time ASC;")
res))
;;======================================================================
;; Tasks and Task monitors
;;======================================================================
;;======================================================================
;; Tasks
;;======================================================================
;;======================================================================
;; Task Monitors
;;======================================================================
(define (tasks:register-monitor db mdb)
(let* ((pid (current-process-id))
(hostname (get-host-name))
(userinfo (user-information (current-user-id)))
(username (car userinfo)))
(print "Register monitor, pid: " pid ", hostname: " hostname ", username: " username)
(sqlite3:execute mdb "INSERT INTO monitors (pid,start_time,last_update,hostname,username) VALUES (?,strftime('%s','now'),strftime('%s','now'),?,?);"
pid hostname username)))
(define (tasks:get-num-alive-monitors mdb)
(let ((res 0))
(sqlite3:for-each-row
(lambda (count)
(set! res count))
mdb
"SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;"
(car (user-information (current-user-id))))
res))
;; register a task
(define (tasks:add mdb action owner target runname test item params)
(sqlite3:execute mdb "INSERT INTO tasks_queue (action,owner,state,target,name,test,item,params,creation_time,execution_time)
VALUES (?,?,'new',?,?,?,?,?,strftime('%s','now'),0);"
action
owner
target
runname
test
item
(if params params "")))
(define (keys:key-vals-hash->target keys key-params)
(let ((tmp (hash-table-ref/default key-params (vector-ref (car keys) 0) "")))
(if (> (length keys) 1)
(for-each (lambda (key)
(set! tmp (conc tmp "/" (hash-table-ref/default key-params (vector-ref key 0) ""))))
(cdr keys)))
tmp))
;; for use from the gui
(define (tasks:add-from-params mdb action keys key-params var-params)
(let ((target (keys:key-vals-hash->target keys key-params))
(owner (car (user-information (current-user-id))))
(runname (hash-table-ref/default var-params "runname" #f))
(testpatts (hash-table-ref/default var-params "testpatts" "%"))
(itempatts (hash-table-ref/default var-params "itempatts" "%"))
(params (hash-table-ref/default var-params "params" "")))
(tasks:add mdb action owner target runname testpatts itempatts params)))
;; return one task from those who are 'new' OR 'waiting' AND more than 10sec old
;;
(define (tasks:snag-a-task mdb)
(let ((res #f)
(keytxt (conc (current-process-id) "-" (get-host-name) "-" (car (user-information (current-user-id))))))
;; first randomly set a new to pid-hostname-hostname
(sqlite3:execute
mdb
"UPDATE tasks_queue SET keylock=? WHERE id IN
(SELECT id FROM tasks_queue
WHERE state='new' OR
(state='waiting' AND (strftime('%s','now')-execution_time) > 10) OR
state='reset'
ORDER BY RANDOM() LIMIT 1);" keytxt)
(sqlite3:for-each-row
(lambda (id . rem)
(set! res (apply vector id rem)))
mdb
"SELECT id,action,owner,state,target,name,test,item,params,creation_time,execution_time FROM tasks_queue WHERE keylock=? ORDER BY execution_time ASC LIMIT 1;" keytxt)
(if res ;; yep, have work to be done
(begin
(sqlite3:execute mdb "UPDATE tasks_queue SET state='inprogress',execution_time=strftime('%s','now') WHERE id=?;"
(tasks:task-get-id res))
res)
#f)))
(define (tasks:reset-stuck-tasks mdb)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id delta)
(set! res (cons id res)))
mdb
"SELECT id,strftime('%s','now')-execution_time AS delta FROM tasks_queue WHERE state='inprogress' AND delta>700 ORDER BY delta DESC LIMIT 2;")
(sqlite3:execute
mdb
(conc "UPDATE tasks_queue SET state='reset' WHERE id IN ('" (string-intersperse (map conc res) "','") "');"))))
;; return all tasks in the tasks_queue table
;;
(define (tasks:get-tasks mdb types states)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id . rem)
(set! res (cons (apply vector id rem) res)))
mdb
(conc "SELECT id,action,owner,state,target,name,test,item,params,creation_time,execution_time
FROM tasks_queue "
;; WHERE
;; state IN " statesstr " AND
;; action IN " actionsstr
" ORDER BY creation_time DESC;"))
res))
;; remove tasks given by a string of numbers comma separated
(define (tasks:remove-queue-entries mdb task-ids)
(sqlite3:execute mdb (conc "DELETE FROM tasks_queue WHERE id IN (" task-ids ");")))
;;
(define (tasks:start-monitor db mdb)
(if (> (tasks:get-num-alive-monitors mdb) 2) ;; have two running, no need for more
(debug:print-info 1 "Not starting monitor, already have more than two running")
(let* ((megatestdb (conc *toppath* "/megatest.db"))
(monitordbf (conc *toppath* "/monitor.db"))
(last-db-update 0)) ;; (file-modification-time megatestdb)))
(task:register-monitor mdb)
(let loop ((count 0)
(next-touch 0)) ;; next-touch is the time where we need to update last_update
;; if the db has been modified we'd best look at the task queue
(let ((modtime (file-modification-time megatestdbpath )))
(if (> modtime last-db-update)
(tasks:process-queue db mdb last-db-update megatestdb next-touch))
;; WARNING: Possible race conditon here!!
;; should this update be immediately after the task-get-action call above?
(if (> (current-seconds) next-touch)
(begin
(tasks:monitors-update mdb)
(loop (+ count 1)(+ (current-seconds) 240)))
(loop (+ count 1) next-touch)))))))
(define (tasks:process-queue db mdb)
(let* ((task (tasks:snag-a-task mdb))
(action (if task (tasks:task-get-action task) #f)))
(if action (print "tasks:process-queue task: " task))
(if action
(case (string->symbol action)
((run) (tasks:start-run db mdb task))
((remove) (tasks:remove-runs db mdb task))
((lock) (tasks:lock-runs db mdb task))
;; ((monitor) (tasks:start-monitor db task))
((rollup) (tasks:rollup-runs db mdb task))
((updatemeta)(tasks:update-meta db mdb task))
((kill) (tasks:kill-monitors db mdb task))))))
(define (tasks:get-monitors mdb)
(let ((res '()))
(sqlite3:for-each-row
(lambda (a . rem)
(set! res (cons (apply vector a rem) res)))
mdb
"SELECT id,pid,strftime('%m/%d/%Y %H:%M',datetime(start_time,'unixepoch'),'localtime'),strftime('%m/%d/%Y %H:%M:%S',datetime(last_update,'unixepoch'),'localtime'),hostname,username FROM monitors ORDER BY last_update ASC;")
(reverse res)
))
(define (tasks:tasks->text tasks)
(let ((fmtstr "~10a~10a~10a~12a~20a~12a~12a~12a~10a"))
(conc (format #f fmtstr "id" "action" "owner" "state" "target" "runname" "testpatts" "itempatts" "params") "\n"
|