19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
(include "task_records.scm")
;;======================================================================
;; Tasks db
;;======================================================================
(define (tasks:open-db)
(let* ((dbpath (conc *toppath* "/monitor.db"))
(exists (file-exists? dbpath))
(mdb (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
(handler (make-busy-timeout 36000)))
(sqlite3:set-busy-handler! mdb handler)
(sqlite3:execute mdb (conc "PRAGMA synchronous = 1;"))
(if (not exists)
(begin
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
action TEXT DEFAULT '',
owner TEXT,
state TEXT DEFAULT 'new',
target TEXT DEFAULT '',
|
|
|
>
|
|
>
>
>
|
|
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
(include "task_records.scm")
;;======================================================================
;; Tasks db
;;======================================================================
(define (tasks:open-db)
(let* ((dbpath (conc *toppath* "/monitor.db"))
(exists (file-exists? dbpath))
(write-access (file-write-access? dbpath))
(mdb (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
(handler (make-busy-timeout 36000)))
(if (and exists
(not write-access))
(set! *db-write-access* write-access)) ;; only unset so other db's also can use this control
(sqlite3:set-busy-handler! mdb handler)
(sqlite3:execute mdb (conc "PRAGMA synchronous = 0;"))
(if (not exists)
(begin
(sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
action TEXT DEFAULT '',
owner TEXT,
state TEXT DEFAULT 'new',
target TEXT DEFAULT '',
|
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state transport #!key (pubport -1))
(debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state)
(sqlite3:execute
mdb
"INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport)
VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);"
pid (get-host-name) port pubport priority (conc state) megatest-version interface (conc transport))
(vector
(tasks:server-get-server-id mdb (get-host-name) interface port pid)
interface
port
pubport
transport
))
;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead))
(debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
(if pid
(case action
((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid))
(else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)))
(if port
(case action
((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port))
(else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port)))
(debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))
(define (tasks:server-deregister-self mdb hostname)
(tasks:server-deregister mdb hostname pid: (current-process-id)))
;; need a simple call for robustly removing records given host and port
(define (tasks:server-delete mdb hostname port)
(tasks:server-deregister mdb hostname port: port action: 'delete))
|
|
>
>
>
|
>
|
|
|
|
|
|
|
|
|
|
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state transport #!key (pubport -1))
(debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state)
(sqlite3:execute
mdb
"INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport)
VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);"
pid (get-host-name) port pubport priority (conc state)
(common:version-signature)
interface
(conc transport))
(vector
(tasks:server-get-server-id mdb (get-host-name) interface port pid)
interface
port
pubport
transport
))
;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'delete))
(debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
(if *db-write-access*
(if pid
(case action
((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid))
(else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)))
(if port
(case action
((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE (interface=? or hostname=?) AND port=?;" hostname hostname port))
(else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE (interface=? or hostname=?) AND port=?;" hostname hostname port)))
(debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))))
(define (tasks:server-deregister-self mdb hostname)
(tasks:server-deregister mdb hostname pid: (current-process-id)))
;; need a simple call for robustly removing records given host and port
(define (tasks:server-delete mdb hostname port)
(tasks:server-deregister mdb hostname port: port action: 'delete))
|
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
(begin
(debug:print 0 "ERROR: tasks:server-get-server-id needs (hostname and pid) OR (iface and port) OR (hostname and port)")
"SELECT id FROM servers WHERE pid=-999;")))
(if hostname hostname iface)(if pid pid port))
res))
(define (tasks:server-update-heartbeat mdb server-id)
(debug:print-info 0 "Heart beat update of server id=" server-id)
(sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id))
;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds
(define (tasks:server-alive? mdb server-id #!key (iface #f)(hostname #f)(port #f)(pid #f))
(let* ((server-id (if server-id
server-id
(tasks:server-get-server-id mdb hostname iface port pid)))
(heartbeat-delta 99e9))
|
|
>
>
>
>
>
>
|
|
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
(begin
(debug:print 0 "ERROR: tasks:server-get-server-id needs (hostname and pid) OR (iface and port) OR (hostname and port)")
"SELECT id FROM servers WHERE pid=-999;")))
(if hostname hostname iface)(if pid pid port))
res))
(define (tasks:server-update-heartbeat mdb server-id)
(debug:print-info 1 "Heart beat update of server id=" server-id)
(handle-exceptions
exn
(begin
(debug:print 0 "WARNING: probable timeout on monitor.db access")
(thread-sleep! 1)
(tasks:server-update-heartbeat mdb server-id))
(sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)))
;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds
(define (tasks:server-alive? mdb server-id #!key (iface #f)(hostname #f)(port #f)(pid #f))
(let* ((server-id (if server-id
server-id
(tasks:server-get-server-id mdb hostname iface port pid)))
(heartbeat-delta 99e9))
|
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
|
(define (tasks:have-clients? mdb server-id)
(null? (tasks:get-logged-in-clients mdb server-id)))
;; ping each server in the db and return first found that responds.
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb)
(let ((res '())
(best #f))
(sqlite3:for-each-row
(lambda (id interface port pubport transport pid hostname)
(set! res (cons (vector id interface port pubport transport pid hostname) res))
;;(debug:print-info 2 "Found existing server " hostname ":" port " registered in db"))
)
mdb
"SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
WHERE strftime('%s','now')-heartbeat < 10
AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version)
;; for now we are keeping only one server registered in the db, return #f or first server found
(if (null? res) #f (car res))))
;; BUG: This logic is probably needed unless methodology changes completely...
;;
;; (if (null? res) #f
;; (let loop ((hed (car res))
|
|
>
>
>
>
>
|
|
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
|
(define (tasks:have-clients? mdb server-id)
(null? (tasks:get-logged-in-clients mdb server-id)))
;; ping each server in the db and return first found that responds.
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb)
(let ((res '())
(best #f)
(transport (if (and *transport-type*
(not (eq? *transport-type* 'fs)))
(conc *transport-type*)
"%")))
(sqlite3:for-each-row
(lambda (id interface port pubport transport pid hostname)
(set! res (cons (vector id interface port pubport transport pid hostname) res))
;;(debug:print-info 2 "Found existing server " hostname ":" port " registered in db"))
)
mdb
"SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
WHERE strftime('%s','now')-heartbeat < 10
AND mt_version=? AND transport LIKE ?
ORDER BY start_time DESC LIMIT 1;" (common:version-signature) transport)
;; for now we are keeping only one server registered in the db, return #f or first server found
(if (null? res) #f (car res))))
;; BUG: This logic is probably needed unless methodology changes completely...
;;
;; (if (null? res) #f
;; (let loop ((hed (car res))
|
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
|
" EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 1 "Sending signal/term to " pid " on " hostname)
(process-signal pid signal/term)
(thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
;;(process-signal pid signal/kill)
) ;; local machine, send sig term
(begin
(debug:print-info 1 "Stopping remote servers not yet supported."))))
;; (debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
;; (let ((serverdat (list hostname port)))
;; (case (string->symbol transport)
;; ((http)(http-transport:client-connect hostname port))
;; (else (debug:print "ERROR: remote stopping servers of type " transport " not supported yet")))
;; (cdb:kill-server serverdat))))) ;; remote machine, try telling server to commit suicide
(begin
(if status
(if (equal? hostname (get-host-name))
(begin
(debug:print-info 1 "Sending signal/term to " pid " on " hostname)
(process-signal pid signal/term) ;; local machine, send sig term
(thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
|
|
|
|
|
|
|
|
|
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
|
" EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 1 "Sending signal/term to " pid " on " hostname)
(process-signal pid signal/term)
(thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
;;(process-signal pid signal/kill)
) ;; local machine, send sig term
(begin
;;(debug:print-info 1 "Stopping remote servers not yet supported."))))
(debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
(let ((serverdat (list hostname port)))
(case (if (string? transport) (string->symbol transport) transport)
((http)(http-transport:client-connect hostname port))
(else (debug:print "ERROR: remote stopping servers of type " transport " not supported yet")))
(cdb:kill-server serverdat pid))))) ;; remote machine, try telling server to commit suicide
(begin
(if status
(if (equal? hostname (get-host-name))
(begin
(debug:print-info 1 "Sending signal/term to " pid " on " hostname)
(process-signal pid signal/term) ;; local machine, send sig term
(thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
|
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
|
(tasks:task-get-owner task)
flags)
(tasks:set-state mdb (tasks:task-get-id task) "waiting")))
(define (tasks:rollup-runs db mdb task)
(let* ((flags (make-hash-table))
(keys (db:get-keys db))
(keyvallst (keys:target->keyval keys (tasks:task-get-target task))))
;; (hash-table-set! flags "-rerun" "NOT_STARTED")
(print "Starting rollup " task)
;; sillyness, just call the damn routine with the task vector and be done with it. FIXME SOMEDAY
(runs:rollup-run db
keys
keyvallst
(tasks:task-get-name task)
(tasks:task-get-owner task))
(tasks:set-state mdb (tasks:task-get-id task) "waiting")))
|
|
|
|
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
|
(tasks:task-get-owner task)
flags)
(tasks:set-state mdb (tasks:task-get-id task) "waiting")))
(define (tasks:rollup-runs db mdb task)
(let* ((flags (make-hash-table))
(keys (db:get-keys db))
(keyvals (keys:target-keyval keys (tasks:task-get-target task))))
;; (hash-table-set! flags "-rerun" "NOT_STARTED")
(print "Starting rollup " task)
;; sillyness, just call the damn routine with the task vector and be done with it. FIXME SOMEDAY
(runs:rollup-run db
keys
keyvals
(tasks:task-get-name task)
(tasks:task-get-owner task))
(tasks:set-state mdb (tasks:task-get-id task) "waiting")))
|