168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
|
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
|
-
+
-
+
-
+
-
+
|
(define (tasks:hostinfo-get-interface vec) (vector-ref vec 1))
(define (tasks:hostinfo-get-port vec) (vector-ref vec 2))
(define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3))
(define (tasks:hostinfo-get-transport vec) (vector-ref vec 4))
(define (tasks:hostinfo-get-pid vec) (vector-ref vec 5))
(define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6))
(define (tasks:server-lock-slot mdb run-id)
(define (tasks:server-lock-slot mdb run-id transport-type)
(tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot")
(if (< (tasks:num-in-available-state mdb run-id) 4)
(begin
(tasks:server-set-available mdb run-id)
(tasks:server-set-available mdb run-id transport-type)
(thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed.
(tasks:server-am-i-the-server? mdb run-id))
#f))
;; register that this server may come online (first to register goes though with the process)
(define (tasks:server-set-available mdb run-id)
(define (tasks:server-set-available mdb run-id transport-type)
(sqlite3:execute
mdb
"INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id)
VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);"
(current-process-id) ;; pid
(get-host-name) ;; hostname
-1 ;; port
-1 ;; pubport
(random 1000) ;; priority (used a tiebreaker on get-available)
"available" ;; state
(common:version-signature) ;; mt_version
-1 ;; interface
;; (conc (server:get-transport)) ;; transport
(conc *transport-type*) ;; transport
(symbol->string transport-type) ;; transport
run-id
))
(define (tasks:num-in-available-state mdb run-id)
(let ((res 0))
(sqlite3:for-each-row
(lambda (num-in-queue)
|
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
|
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
|
-
+
-
+
|
(let* ((header (list "id" "hostname" "pid" "interface" "port" "pubport" "state" "run_id" "priority" "start_time"))
(selstr (string-intersperse header ","))
(res '()))
(sqlite3:for-each-row
(lambda (a . b)
(set! res (cons (apply vector a b) res)))
mdb
(conc "SELECT " selstr " FROM servers WHERE run_id=? AND state in ('available','running','dbprep') ORDER BY start_time DESC;")
(conc "SELECT " selstr " FROM servers WHERE state in ('available','running','dbprep') ORDER BY start_time DESC;")
run-id)
)
(vector header res)))
(define (tasks:get-server mdb run-id #!key (retries 10))
(let ((res #f)
(best #f))
(handle-exceptions
exn
|