89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
(define (tasks:hostinfo-get-port vec) (vector-ref vec 2))
(define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3))
(define (tasks:hostinfo-get-transport vec) (vector-ref vec 4))
(define (tasks:hostinfo-get-pid vec) (vector-ref vec 5))
(define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6))
(define (tasks:server-lock-slot mdb run-id)
(tasks:server-clean-out-old-records-for-run-id mdb run-id)
(if (< (tasks:num-in-available-state mdb run-id) 4)
(begin
(tasks:server-set-available mdb run-id)
(thread-sleep! 2) ;; Try removing this. It may not be needed.
(tasks:server-am-i-the-server? mdb run-id))
#f))
|
|
|
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
(define (tasks:hostinfo-get-port vec) (vector-ref vec 2))
(define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3))
(define (tasks:hostinfo-get-transport vec) (vector-ref vec 4))
(define (tasks:hostinfo-get-pid vec) (vector-ref vec 5))
(define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6))
(define (tasks:server-lock-slot mdb run-id)
(tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot")
(if (< (tasks:num-in-available-state mdb run-id) 4)
(begin
(tasks:server-set-available mdb run-id)
(thread-sleep! 2) ;; Try removing this. It may not be needed.
(tasks:server-am-i-the-server? mdb run-id))
#f))
|
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
(lambda (num-in-queue)
(set! res num-in-queue))
mdb
"SELECT count(id) FROM servers WHERE run_id=? AND state = 'available';"
run-id)
res))
(define (tasks:server-clean-out-old-records-for-run-id mdb run-id)
(sqlite3:execute mdb "UPDATE servers SET state='defunct' WHERE state in ('available','shutting-down') AND (strftime('%s','now') - start_time) > 300 AND run_id=?;" run-id))
(define (tasks:server-force-clean-running-records-for-run-id mdb run-id)
(sqlite3:execute mdb "UPDATE servers SET state='defunct' WHERE state = 'running' AND run_id=?;" run-id))
(define (tasks:server-force-clean-run-record mdb run-id iface port)
(sqlite3:execute mdb "UPDATE servers SET state='defunct' WHERE state = 'running' AND run_id=? AND interface=? AND port=?;"
run-id iface port))
(define (tasks:server-delete-records-for-this-pid mdb)
(sqlite3:execute mdb "UPDATE servers SET state='defunct' WHERE hostname=? AND pid=?;" (get-host-name) (current-process-id)))
(define (tasks:server-delete-record mdb server-id)
(sqlite3:execute mdb "UPDATE servers SET state='defunct' WHERE id=?;" server-id)
;; use this opportuntity to clean out records over one month old
(sqlite3:execute mdb "DELETE FROM servers WHERE state not in ('running','shutting-down') AND (strftime('%s','now') - start_time) > 2628000;"))
(define (tasks:server-set-state! mdb server-id state)
(sqlite3:execute mdb "UPDATE servers SET state=? WHERE id=?;" state server-id))
(define (tasks:server-set-interface-port mdb server-id interface port)
(sqlite3:execute mdb "UPDATE servers SET interface=?,port=? WHERE id=?;" interface port server-id))
;; Get random port not used in long time
;;
(define (tasks:server-get-next-port mdb)
(let* ((lownum 30000)
(highnum 64000)
(used-ports '())
|
|
|
>
|
|
>
|
|
|
|
|
>
|
|
>
|
|
|
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
(lambda (num-in-queue)
(set! res num-in-queue))
mdb
"SELECT count(id) FROM servers WHERE run_id=? AND state = 'available';"
run-id)
res))
(define (tasks:server-clean-out-old-records-for-run-id mdb run-id tag)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state in ('available','shutting-down') AND (strftime('%s','now') - start_time) > 300 AND run_id=?;"
(conc "defunct" tag) run-id))
(define (tasks:server-force-clean-running-records-for-run-id mdb run-id tag)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state = 'running' AND run_id=?;"
(conc "defunct" tag) run-id))
(define (tasks:server-force-clean-run-record mdb run-id iface port tag)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state = 'running' AND run_id=? AND interface=? AND port=?;"
(conc "defunct" tag) run-id iface port))
(define (tasks:server-delete-records-for-this-pid mdb tag)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE hostname=? AND pid=?;"
(conc "defunct" tag) (get-host-name) (current-process-id)))
(define (tasks:server-delete-record mdb server-id tag)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE id=?;"
(conc "defunct" tag) server-id)
;; use this opportuntity to clean out records over one month old
(sqlite3:execute mdb "DELETE FROM servers WHERE state not in ('running','shutting-down') AND (strftime('%s','now') - start_time) > 2628000;"))
(define (tasks:server-set-state! mdb server-id state)
(sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE id=?;" state server-id))
(define (tasks:server-set-interface-port mdb server-id interface port)
(sqlite3:execute mdb "UPDATE servers SET interface=?,port=?,heartbeat=strftime('%s','now') WHERE id=?;" interface port server-id))
;; Get random port not used in long time
;;
(define (tasks:server-get-next-port mdb)
(let* ((lownum 30000)
(highnum 64000)
(used-ports '())
|
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
|
WHERE run_id=? AND state='running'
ORDER BY start_time DESC LIMIT 1;" run-id) ;; (common:version-signature) run-id)
res))
(define (tasks:get-all-servers mdb)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport)
(set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport) res)))
mdb
"SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;")
res))
(define (tasks:kill-server status hostname port pid)
(debug:print-info 1 "Removing defunct server record for " hostname ":" port)
(if port
(open-run-close tasks:server-deregister tasks:open-db hostname port: port)
(open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))
|
|
>
|
|
|
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
|
WHERE run_id=? AND state='running'
ORDER BY start_time DESC LIMIT 1;" run-id) ;; (common:version-signature) run-id)
res))
(define (tasks:get-all-servers mdb)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id)
;; 0 1 2 3 4 5 6 7 8 9 10 11 12
(set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) res)))
mdb
"SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport,run_id FROM servers WHERE state NOT LIKE 'defunct%' ORDER BY start_time DESC;")
res))
(define (tasks:kill-server status hostname port pid)
(debug:print-info 1 "Removing defunct server record for " hostname ":" port)
(if port
(open-run-close tasks:server-deregister tasks:open-db hostname port: port)
(open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))
|