;; Copyright 2006-2013, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
(use sqlite3 srfi-18)
(import (prefix sqlite3 sqlite3:))
(declare (unit lock-queue))
(declare (uses common))
(declare (uses tasks))
;;======================================================================
;; attempt to prevent overlapping updates of rollup files by queueing
;; update requests in an sqlite db
;;======================================================================
;;======================================================================
;; db record, <vector db path-to-db>
;;======================================================================
(define (make-lock-queue:db-dat)(make-vector 3))
(define-inline (lock-queue:db-dat-get-db vec) (vector-ref vec 0))
(define-inline (lock-queue:db-dat-get-path vec) (vector-ref vec 1))
(define-inline (lock-queue:db-dat-set-db! vec val)(vector-set! vec 0 val))
(define-inline (lock-queue:db-dat-set-path! vec val)(vector-set! vec 1 val))
(define (lock-queue:delete-lock-db dbdat)
(let ((fname (lock-queue:db-dat-get-path dbdat)))
(system (conc "rm -f " fname "*"))))
(define (lock-queue:open-db fname #!key (count 10))
(let* ((actualfname (conc fname ".lockdb"))
(dbexists (file-exists? actualfname))
(db (sqlite3:open-database actualfname))
(handler (make-busy-timeout 136000)))
(if dbexists
(vector db actualfname)
(begin
(common:debug-handle-exceptions #t
exn
(begin
(thread-sleep! 10)
(if (> count 0)
(lock-queue:open-db fname count: (- count 1))
(vector db actualfname)))
(sqlite3:with-transaction
db
(lambda ()
(sqlite3:execute
db
"CREATE TABLE IF NOT EXISTS queue (
id INTEGER PRIMARY KEY,
test_id INTEGER,
start_time INTEGER,
state TEXT,
CONSTRAINT queue_constraint UNIQUE (test_id));")
(sqlite3:execute
db
"CREATE TABLE IF NOT EXISTS runlocks (
id INTEGER PRIMARY KEY,
test_id INTEGER,
run_lock TEXT,
CONSTRAINT runlock_constraint UNIQUE (run_lock));"))))))
(sqlite3:set-busy-handler! db handler)
(vector db actualfname)))
(define (lock-queue:set-state dbdat test-id newstate #!key (remtries 10))
(tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200)
(common:debug-handle-exceptions #t
exn
(if (> remtries 0)
(begin
(debug:print 0 *default-log-port* "WARNING: exception on lock-queue:set-state. Trying again in 30 seconds.")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(thread-sleep! 30)
(lock-queue:set-state dbdat test-id newstate remtries: (- remtries 1)))
(begin
(debug:print-error 0 *default-log-port* " Failed to set lock state for test with id " test-id ", error: " ((condition-property-accessor 'exn 'message) exn) ", giving up.")
#f))
(sqlite3:execute (lock-queue:db-dat-get-db dbdat) "UPDATE queue SET state=? WHERE test_id=?;"
newstate
test-id)))
(define (lock-queue:any-younger? dbdat mystart test-id #!key (remtries 10))
;; no need to wait on journal on read only queries
;; (tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200)
(common:debug-handle-exceptions #t
exn
(if (> remtries 0)
(begin
(debug:print 0 *default-log-port* "WARNING: exception on lock-queue:any-younger. Removing lockdb and trying again in 5 seconds.")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(thread-sleep! 5)
(lock-queue:delete-lock-db dbdat)
(lock-queue:any-younger? dbdat mystart test-id remtries: (- remtries 1)))
(begin
(debug:print-error 0 *default-log-port* " Failed to find younger locks for test with id " test-id ", error: " ((condition-property-accessor 'exn 'message) exn) ", giving up.")
#f))
(let ((res #f))
(sqlite3:for-each-row
(lambda (tid)
;; Actually this should not be needed as mystart cannot be simultaneously less than and test-id same as
(if (not (equal? tid test-id))
(set! res tid)))
(lock-queue:db-dat-get-db dbdat)
"SELECT test_id FROM queue WHERE start_time > ?;" mystart)
res)))
(define (lock-queue:get-lock dbdat test-id #!key (count 10)(waiting-msg #f))
(tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200 remove: #t waiting-msg: "lock-queue:get-lock, waiting on journal")
(let* ((res #f)
(db (lock-queue:db-dat-get-db dbdat))
(lckqry (sqlite3:prepare db "SELECT test_id,run_lock FROM runlocks WHERE run_lock='locked';"))
(mklckqry (sqlite3:prepare db "INSERT INTO runlocks (test_id,run_lock) VALUES (?,'locked');")))
(let ((result
(common:debug-handle-exceptions #t
exn
(begin
(debug:print 0 *default-log-port* "WARNING: failed to get queue lock. Removing lock db and returning fail") ;; Will try again in a few seconds")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(thread-sleep! 10)
;; (if (> count 0)
;; #f ;; (lock-queue:get-lock dbdat test-id count: (- count 1)) - give up on retries
;; (begin ;; never recovered, remote the lock file and return #f, no lock obtained
(lock-queue:delete-lock-db dbdat)
#f)
(sqlite3:with-transaction
db
(lambda ()
(sqlite3:for-each-row (lambda (tid lockstate)
(set! res (list tid lockstate)))
lckqry)
(if res
(if (equal? (car res) test-id)
#t ;; already have the lock
#f)
(begin
(sqlite3:execute mklckqry test-id)
;; if no error handled then return #t for got the lock
#t)))))))
(sqlite3:finalize! lckqry)
(sqlite3:finalize! mklckqry)
result)))
(define (lock-queue:release-lock fname test-id #!key (count 10))
(let* ((dbdat (lock-queue:open-db fname)))
(tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200 "lock-queue:release-lock; waiting on journal")
(common:debug-handle-exceptions #t
exn
(begin
(debug:print 0 *default-log-port* "WARNING: Failed to release queue lock. Will try again in few seconds")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(thread-sleep! (/ count 10))
(if (> count 0)
(begin
(sqlite3:finalize! (lock-queue:db-dat-get-db dbdat))
(lock-queue:release-lock fname test-id count: (- count 1)))
(let ((journal (conc fname "-journal")))
;; If we've tried ten times and failed there is a serious problem
;; try to remove the lock db and allow it to be recreated
(common:debug-handle-exceptions #t
exn
#f
(if (file-exists? journal)(delete-file journal))
(if (file-exists? fname) (delete-file fname))
#f))))
(sqlite3:execute (lock-queue:db-dat-get-db dbdat) "DELETE FROM runlocks WHERE test_id=?;" test-id)
(sqlite3:finalize! (lock-queue:db-dat-get-db dbdat)))))
(define (lock-queue:steal-lock dbdat test-id #!key (count 10))
(debug:print-info 0 *default-log-port* "Attempting to steal lock at " (lock-queue:db-dat-get-path dbdat))
(tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200 "lock-queue:steal-lock; waiting on journal")
(common:debug-handle-exceptions #t
exn
(begin
(debug:print 0 *default-log-port* "WARNING: Failed to steal queue lock. Will try again in few seconds")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(thread-sleep! 10)
(if (> count 0)
(lock-queue:steal-lock dbdat test-id count: (- count 1))
#f))
(sqlite3:execute (lock-queue:db-dat-get-db dbdat) "DELETE FROM runlocks WHERE run_lock='locked';"))
(lock-queue:get-lock dbdat test-it))
;; returns #f if ok to skip the task
;; returns #t if ok to proceed with task
;; otherwise waits
;;
(define (lock-queue:wait-turn fname test-id #!key (count 10)(waiting-msg #f))
(let* ((dbdat (lock-queue:open-db fname))
(mystart (current-seconds))
(db (lock-queue:db-dat-get-db dbdat)))
;; (tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200 waiting-msg: "lock-queue:wait-turn; waiting on journal file")
(common:debug-handle-exceptions #t
exn
(begin
(debug:print 0 *default-log-port* "WARNING: Failed to find out if it is ok to skip the wait queue. Will try again in few seconds")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(print-call-chain (current-error-port))
(thread-sleep! 10)
(if (> count 0)
(begin
(sqlite3:finalize! db)
(lock-queue:wait-turn fname test-id count: (- count 1)))
(begin
(debug:print 0 *default-log-port* "Giving up calls to lock-queue:wait-turn for test-id " test-id " at path " fname ", printing call chain")
(print-call-chain (current-error-port))
#f)))
;; wait 10 seconds and then check to see if someone is already updating the html
(thread-sleep! 10)
(if (not (lock-queue:any-younger? dbdat mystart test-id)) ;; no processing in flight, must try to start processing
(begin
(tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200 waiting-msg: "lock-queue:wait-turn; waiting on journal file")
(sqlite3:execute
db
"INSERT OR REPLACE INTO queue (test_id,start_time,state) VALUES (?,?,'waiting');"
test-id mystart)
;; (thread-sleep! 1) ;; give other tests a chance to register
(let ((result
(let loop ((younger-waiting (lock-queue:any-younger? dbdat mystart test-id)))
(if younger-waiting
(begin
;; no need for us to wait. mark in the lock queue db as skipping
;; no point in marking anything in the queue - simply never register this
;; test as it is *covered* by a previously started update to the html file
;; (lock-queue:set-state dbdat test-id "skipping")
#f) ;; let the calling process know that nothing needs to be done
(if (lock-queue:get-lock dbdat test-id)
#t
(if (> (- (current-seconds) mystart) 36000) ;; waited too long, steal the lock
(lock-queue:steal-lock dbdat test-id)
(begin
(thread-sleep! 1)
(loop (lock-queue:any-younger? dbdat mystart test-id)))))))))
(sqlite3:finalize! db)
result))))))
;; (use trace)
;; (trace lock-queue:get-lock lock-queue:release-lock lock-queue:wait-turn lock-queue:any-younger? lock-queue:set-state)