;;======================================================================
;; Copyright 2006-2013, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;
;;======================================================================
(declare (unit api))
(declare (uses db))
(declare (uses debugprint))
(declare (uses commonmod))
(declare (uses dbmod))
(declare (uses dbfile))
(declare (uses tasks))
(declare (uses tcp-transportmod))
(import commonmod)
(import dbmod)
(import dbfile)
(import debugprint)
(import tcp-transportmod)
(use srfi-69
srfi-18
posix
matchable
s11n
typed-records)
;; allow these queries through without starting a server
;;
;; Quoted list of command symbols classified as read-only.
;;
;; NOTE(review): nothing in this part of the file consults this list;
;; api:dispatch-request only checks api:write-queries. Presumably the
;; client/transport side uses it - confirm against callers.
;;
;; NOTE(review): get-previous-test-run-record and
;; get-tests-for-runs-mindata are listed here, but their clauses in
;; api:dispatch-request are commented out, so sending either one ends
;; up in the "bad api call" fallback.
;;
(define api:read-only-queries
'(get-key-val-pairs
get-var
get-keys
get-key-vals
test-toplevel-num-items
get-test-info-by-id
get-test-state-status-by-id
get-steps-info-by-id
get-data-info-by-id
test-get-rundir-from-test-id
get-count-tests-running-for-testname
get-count-tests-running
get-count-tests-running-in-jobgroup
get-previous-test-run-record
get-matching-previous-test-run-records
test-get-logfile-info
test-get-records-for-index-file
get-testinfo-state-status
test-get-top-process-pid
test-get-paths-matching-keynames-target-new
get-prereqs-not-met
get-count-tests-running-for-run-id
get-run-info
get-run-status
get-run-state
get-run-stats
get-run-times
get-target
get-targets
;; register-run
get-tests-tags
get-test-times
get-tests-for-run
get-tests-for-run-state-status
get-test-id
get-tests-for-runs-mindata
get-tests-for-run-mindata
get-run-name-from-id
get-runs
simple-get-runs
get-num-runs
get-runs-cnt-by-patt
get-all-run-ids
get-prev-run-ids
get-run-ids-matching-target
get-runs-by-patt
get-steps-data
get-steps-for-test
read-test-data
read-test-data-varpatt
login
tasks-get-last
testmeta-get-record
have-incompletes?
get-changed-record-ids
get-all-runids
get-changed-record-test-ids
get-changed-record-run-ids
get-run-record-ids
get-not-completed-cnt))
;; Quoted list of command symbols classified as writes.
;;
;; api:dispatch-request checks membership in this list: while
;; *api-halt-writes* is true, a command found here is held back (for a
;; bounded time) before it is dispatched.
;;
;; The dispatcher handles the step deletion command under the name
;; delete-steps-for-test! (with the bang), so that spelling is listed
;; here; the bang-less spelling is kept for backward compatibility with
;; any code that looks it up in this list.
;;
(define api:write-queries
  '(
    get-keys-write                        ;; dummy "write" query to force server start

    ;; SERVERS
    ;; start-server
    ;; kill-server

    ;; TESTS
    test-set-state-status-by-id
    delete-test-records
    delete-old-deleted-test-records
    test-set-state-status
    test-set-top-process-pid
    set-state-status-and-roll-up-items
    update-pass-fail-counts
    top-test-set-per-pf-counts ;; (db:top-test-set-per-pf-counts (db:get-db *db* 5) 5 "runfirst")

    ;; RUNS
    register-run
    set-tests-state-status
    delete-run
    lock/unlock-run
    update-run-event_time
    mark-incomplete
    set-state-status-and-roll-up-run

    ;; STEPS
    teststep-set-status!
    delete-steps-for-test
    delete-steps-for-test!   ;; the name api:dispatch-request actually handles

    ;; TEST DATA
    test-data-rollup
    csv->test-data

    ;; MISC
    sync-cachedb->db
    drop-all-triggers
    create-all-triggers
    update-tesdata-on-repilcate-db ;; spelling matches the dispatcher clause; do not "fix" one without the other

    ;; TESTMETA
    testmeta-add-record
    testmeta-update-field

    ;; TASKS
    tasks-add
    tasks-set-state-given-param-key
    ))
;; Module-level state.
;;
;; *db-write-mutexes* : hash table; not referenced anywhere else in this
;;                      part of the file.
;; *server-signature* : #f until a handler maker fills it in from
;;                      (tt:mk-signature *toppath*); it is what a ping
;;                      request gets back as its result.
;; *api-threads*      : list of (thread . registration-time-in-seconds)
;;                      pairs, maintained by api:register-thread and
;;                      api:unregister-thread.
(define *db-write-mutexes* (make-hash-table))
(define *server-signature* #f)
(define *api-threads* '())
;; Record TH-IN as an in-flight api thread by pushing a
;; (thread . current-seconds) pair onto the front of *api-threads*.
(define (api:register-thread th-in)
  (let ((entry (cons th-in (current-seconds))))
    (set! *api-threads* (cons entry *api-threads*))))
;; Drop every *api-threads* entry whose thread is eq? to TH-IN.
(define (api:unregister-thread th-in)
  (let ((other-thread? (lambda (entry)
                         (not (eq? th-in (car entry))))))
    (set! *api-threads* (filter other-thread? *api-threads*))))
;; Prune *api-threads*, keeping only entries whose thread-state is
;; neither 'terminated nor 'dead.
(define (api:remove-dead-or-terminated)
  (define (still-running? entry)
    (not (member (thread-state (car entry)) '(terminated dead))))
  (set! *api-threads* (filter still-running? *api-threads*)))
;; Return the number of entries currently in *api-threads*.
;;
;; This only counts registrations; it does not inspect thread state, so
;; entries for threads that ended without unregistering are included
;; until api:remove-dead-or-terminated prunes them.
(define (api:get-count-threads-alive)
(length *api-threads*))
;; Time (current-seconds) of the most recent stats print; 0 = never.
(define *api:last-stats-print* 0)
;; Held while checking/updating *api:last-stats-print*.
(define *api-print-db-stats-mutex* (make-mutex))

;; Periodic db stats printer. Never returns: wakes every 5 seconds and
;; calls rmt:print-db-stats whenever more than 15 seconds have passed
;; since the previous print. Intended to be run in its own thread.
(define (api:print-db-stats)
  (debug:print-info 0 *default-log-port* "Started periodic db stats printer")
  (let poll ()
    (mutex-lock! *api-print-db-stats-mutex*)
    (let ((elapsed (- (current-seconds) *api:last-stats-print*)))
      (when (> elapsed 15)
        (rmt:print-db-stats)
        (set! *api:last-stats-print* (current-seconds))))
    (mutex-unlock! *api-print-db-stats-mutex*)
    (thread-sleep! 5)
    (poll)))
;; Request-queue state used by api:add-queue-item / api:get-queue-item.
;;
;; *api:queue-mutex* : intended to guard *api:queue-id* and *api:in-queue*
;; *api:queue-id*    : last id handed out; incremented per queued item
;; *api:in-queue*    : pending items, newest first (items are consed onto
;;                     the front)
;; *api:results*     : per the original note, maps id -> queue-item.
;;                     NOTE(review): nothing visible here stores into it.
(define *api:queue-mutex* (make-mutex))
(define *api:queue-id* 0)
(define *api:in-queue* '())
(define *api:results* (make-hash-table)) ;; id->queue-item
;; Placeholder for the queue worker. The body currently does nothing
;; and simply returns #t; the comments inside list the planned steps.
(define (api:start-queue-processor)
;; Planned (not yet implemented):
;; look for work in in-queue
;; process it
;; put result in out-queue
;; sleep 20ms
#t)
;; Record describing one queued api request.
;;
;; Defined with typed-records' defstruct (the original spelling
;; "defstuct" is not a defined form), which provides the keyword
;; constructor make-api:queue-item used by api:add-queue-item.
;;
;;   proc       - procedure to run for the request
;;   cmd        - command symbol
;;   params     - parameter list for the command
;;   start-time - defaults to (current-seconds)
;;   end-time   - #f until set
;;   id         - queue id assigned by api:add-queue-item
(defstruct api:queue-item
  (proc #f)
  (cmd #f)
  (params #f)
  (start-time (current-seconds))
  (end-time #f)
  (id #f))
;; Queue a request and return its id.
;;
;; Under *api:queue-mutex*: bumps *api:queue-id*, wraps PROC, CMD and
;; PARAMS in an api:queue-item carrying the new id, and conses it onto
;; the front of *api:in-queue*.
;;
;; Returns the id so the calling proc can find the result in
;; *api:results*.
;;
;; Fixes relative to the previous version: the srfi-18 procedures are
;; mutex-lock! / mutex-unlock! (with the bang), and the unlock now names
;; *api:queue-mutex* instead of the misspelled *apt:queue-mutex*.
(define (api:add-queue-item proc cmd params)
  (mutex-lock! *api:queue-mutex*)
  (set! *api:queue-id* (+ *api:queue-id* 1))
  (let ((id *api:queue-id*)) ;; capture while still holding the lock
    (set! *api:in-queue*
          (cons (make-api:queue-item
                 proc:   proc
                 cmd:    cmd
                 params: params
                 id:     id)
                *api:in-queue*))
    (mutex-unlock! *api:queue-mutex*)
    id))
;; Remove and return the oldest item in *api:in-queue*, or #f when the
;; queue is empty.
;;
;; Items are consed onto the front when added, so the oldest one is the
;; last element; the list is reversed to take it and reversed back so
;; the remaining items keep their newest-first order.
;;
;; Fixes relative to the previous version: uses the srfi-18
;; mutex-lock! / mutex-unlock! names, and refers to the reversed list by
;; one consistent name (it was bound as revlst but read as revlist).
(define (api:get-queue-item)
  (mutex-lock! *api:queue-mutex*)
  (let ((res (if (null? *api:in-queue*)
                 #f
                 (let ((revlst (reverse *api:in-queue*)))
                   (set! *api:in-queue* (reverse (cdr revlst)))
                   (car revlst)))))
    (mutex-unlock! *api:queue-mutex*)
    res))
;; Build and return a request-handler closure for DBSTRUCT ("new" variant).
;;
;; At construction time this asserts that *toppath* is set and, if
;; *server-signature* is still #f, computes it with tt:mk-signature.
;; The returned lambda takes INDAT, expected to match
;; (cmd run-id params meta), and returns the list
;; (status errmsg result meta).
;;
;; NOTE(review): the three comments at the top of the body describe a
;; queue-based design that this body does not implement; it never touches
;; *api:in-queue* or *api:results*. It also never calls
;; api:register-thread / api:unregister-thread, so numthreads only
;; reflects registrations made elsewhere.
;;
(define (api:tcp-dispatch-request-make-handler-new dbstruct) ;; cmd run-id params)
;; put proc into in-queue
;; poll out-queue for result every 25ms
;; time out with big message
(assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.")
(if (not *server-signature*)
(set! *server-signature* (tt:mk-signature *toppath*)))
(lambda (indat)
(let* ((result
;; delay-wait is how far numthreads exceeds 10 (0 when it does not);
;; it is sent back in the meta alist for non-ping commands.
(let* ((numthreads (api:get-count-threads-alive))
(delay-wait (if (> numthreads 10)
(- numthreads 10)
0))
;; normal-proc: ping is answered with the server signature without
;; touching the db; everything else goes to api:dispatch-request.
(normal-proc (lambda (cmd run-id params)
(case cmd
((ping) *server-signature*)
(else
(api:dispatch-request dbstruct cmd run-id params))))))
(set! *api-process-request-count* numthreads)
(set! *db-last-access* (current-seconds))
;; (if (not (eq? numthreads numthreads))
;; (begin
;; (api:remove-dead-or-terminated)
;; (let ((threads-now (api:get-count-threads-alive)))
;; (debug:print 0 *default-log-port* "WARNING: numthreads="numthreads", numthreads="numthreads", remaining="threads-now)
;; (set! numthreads threads-now))))
(match indat
((cmd run-id params meta)
(let* ((start-t (current-milliseconds))
;; db-ok is never read afterwards; the binding exists for its assert,
;; which fires for any non-ping command whose run-id maps to a db file
;; name different from the one held by dbstruct.
(db-ok (let* ((dbfname (dbmod:run-id->dbfname run-id))
(ok (equal? dbfname (dbr:dbstruct-dbfname dbstruct))))
(case cmd
((ping) #t) ;; we are fine
(else
(assert ok "FATAL: database file and run-id not aligned.")))))
(ttdat *server-info*)
(server-state (tt-state ttdat))
(maxthreads 20) ;; make this a parameter?
;; status is 'busy only when over maxthreads AND the random draw exceeds 70.
;; NOTE(review): the original comment read "allow a 30% probability to go
;; through", but as written only the draws above 70 are rejected as 'busy;
;; the majority of over-limit requests still go through - confirm intent.
(status (cond
((and (> numthreads maxthreads)
(> (random 100) 70)) ;; let some requests through even when overloaded so we can figure out what is going wrong in main.db server.
'busy)
;; ((> numthreads 5) 'loaded) ;; this gets transmitted to the client which calls tt:backoff-incr to slow stuff down.
(else 'ok)))
;; the 'loaded clauses below cannot be reached while the 'loaded cond
;; clause above stays commented out.
(errmsg (case status
((busy) (conc "Server overloaded, "numthreads" threads in flight"))
((loaded) (conc "Server loaded, "numthreads" threads in flight"))
(else #f)))
(result (case status
((busy)
(if (eq? cmd 'ping)
(normal-proc cmd run-id params)
;; only reached when numthreads > maxthreads, so the value is positive
(* 0.1 (- numthreads maxthreads)) ;; was 15 - return a number for the remote to delay
)) ;; (- numthreads 29)) ;; call back in as many seconds
((loaded)
;; (if (eq? (rmt:transport-mode) 'tcp)
;; (thread-sleep! 0.5))
(normal-proc cmd run-id params))
(else
(normal-proc cmd run-id params))))
;; reply meta: server state for ping, suggested wait for everything else
(meta (case cmd
((ping) `((sstate . ,server-state)))
(else `((wait . ,delay-wait)))))
(payload (list status errmsg result meta)))
;; (cmd run-id params meta)
;; record elapsed milliseconds for this command
(db:add-stats cmd run-id params (- (current-milliseconds) start-t))
payload))
(else
(assert #f "FATAL: failed to deserialize indat "indat))))))
result)))
;; Public entry point: returns the request handler closure for DBSTRUCT.
;;
;; Currently forwards to api:tcp-dispatch-request-make-handler-old.
;; This is a forwarding procedure rather than a plain alias because the
;; -old procedure is defined further down in this file: an alias would
;; read that variable at load time, before it has a value, whereas here
;; the name is only looked up when a handler is actually requested.
(define (api:tcp-dispatch-request-make-handler dbstruct)
  (api:tcp-dispatch-request-make-handler-old dbstruct))
;; Build and return a request-handler closure for DBSTRUCT.
;;
;; At construction time: asserts *toppath* is set and, if
;; *server-signature* is still #f, computes it with tt:mk-signature.
;;
;; The returned lambda takes INDAT, which is (cmd run-id params meta),
;; and returns the list (status errmsg result meta):
;;   status - 'busy or 'ok
;;   errmsg - overload message string when busy, else #f
;;   result - value from the dispatched call; for a busy non-ping request
;;            a number the remote should use as a delay
;;   meta   - ((sstate . <server-state>)) for ping, else ((wait . <n>))
;;
;; The calling thread is registered in *api-threads* for the duration of
;; the request. Registration is done with dynamic-wind so the thread is
;; unregistered even when an assert or the dispatched call escapes with
;; an exception; previously such an escape left a stale entry behind and
;; permanently inflated the in-flight thread count.
;;
;; WARNING: Do not print anything in the lambda of this function as it
;;          reads/writes to current in/out port
;;
(define (api:tcp-dispatch-request-make-handler-old dbstruct) ;; cmd run-id params)
  (assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.")
  (if (not *server-signature*)
      (set! *server-signature* (tt:mk-signature *toppath*)))
  (lambda (indat)
    (dynamic-wind
      (lambda () (api:register-thread (current-thread)))
      (lambda ()
        ;; numthreads is read after registering, so it includes this thread.
        ;; delay-wait is how far numthreads exceeds 10 (0 when it does not).
        (let* ((numthreads  (api:get-count-threads-alive))
               (delay-wait  (if (> numthreads 10)
                                (- numthreads 10)
                                0))
               ;; ping is answered with the server signature without touching
               ;; the db; everything else goes to api:dispatch-request.
               (normal-proc (lambda (cmd run-id params)
                              (case cmd
                                ((ping) *server-signature*)
                                (else
                                 (api:dispatch-request dbstruct cmd run-id params))))))
          (set! *api-process-request-count* numthreads)
          (set! *db-last-access* (current-seconds))
          ;; (if (not (eq? numthreads numthreads))
          ;;     (begin
          ;;       (api:remove-dead-or-terminated)
          ;;       (let ((threads-now (api:get-count-threads-alive)))
          ;;         (debug:print 0 *default-log-port* "WARNING: numthreads="numthreads", numthreads="numthreads", remaining="threads-now)
          ;;         (set! numthreads threads-now))))
          (match indat
            ((cmd run-id params meta)
             (let* ((start-t      (current-milliseconds))
                    ;; db-ok is not read afterwards; the binding exists for
                    ;; its assert, which fires for any non-ping command whose
                    ;; run-id maps to a different db file than dbstruct's.
                    (db-ok        (let* ((dbfname (dbmod:run-id->dbfname run-id))
                                         (ok      (equal? dbfname (dbr:dbstruct-dbfname dbstruct))))
                                    (case cmd
                                      ((ping) #t) ;; we are fine
                                      (else
                                       (assert ok "FATAL: database file and run-id not aligned.")))))
                    (ttdat        *server-info*)
                    (server-state (tt-state ttdat))
                    (maxthreads   20) ;; make this a parameter?
                    ;; 'busy only when over maxthreads AND the random draw
                    ;; exceeds 70, so most over-limit requests still go
                    ;; through; that is deliberate, so we can figure out what
                    ;; is going wrong in main.db server.
                    (status       (cond
                                   ((and (> numthreads maxthreads)
                                         (> (random 100) 70))
                                    'busy)
                                   ;; ((> numthreads 5) 'loaded) ;; this gets transmitted to the client which calls tt:backoff-incr to slow stuff down.
                                   (else 'ok)))
                    ;; the 'loaded clauses below are unreachable while the
                    ;; 'loaded cond clause above stays commented out.
                    (errmsg       (case status
                                    ((busy)   (conc "Server overloaded, "numthreads" threads in flight"))
                                    ((loaded) (conc "Server loaded, "numthreads" threads in flight"))
                                    (else #f)))
                    (result       (case status
                                    ((busy)
                                     (if (eq? cmd 'ping)
                                         (normal-proc cmd run-id params)
                                         ;; only reached when numthreads > maxthreads,
                                         ;; so this is a positive number for the
                                         ;; remote to delay by (was 15)
                                         (* 0.1 (- numthreads maxthreads))))
                                    ((loaded)
                                     ;; (if (eq? (rmt:transport-mode) 'tcp)
                                     ;;     (thread-sleep! 0.5))
                                     (normal-proc cmd run-id params))
                                    (else
                                     (normal-proc cmd run-id params))))
                    (meta         (case cmd
                                    ((ping) `((sstate . ,server-state)))
                                    (else   `((wait . ,delay-wait)))))
                    (payload      (list status errmsg result meta)))
               ;; record elapsed milliseconds for this command
               (db:add-stats cmd run-id params (- (current-milliseconds) start-t))
               payload))
            (else
             (assert #f "FATAL: failed to deserialize indat "indat)))))
      ;; runs on normal return and on non-local exit alike
      (lambda () (api:unregister-thread (current-thread))))))
;; When true, api:dispatch-request holds back commands listed in
;; api:write-queries (sleeping in 0.2 second steps) before running them.
;; NOTE(review): nothing in this part of the file sets it to true;
;; presumably the sync code does - confirm.
(define *api-halt-writes* #f)
;; Run the api command CMD against DBSTRUCT and return its result.
;;
;;   dbstruct - database handle passed as first argument to the db: calls
;;   cmd      - command symbol selecting the clause below
;;   run-id   - run id; used for stats and by insert-test
;;   params   - list of arguments; for most commands it is applied after
;;              dbstruct (or after *no-sync-db* for the no-sync commands)
;;
;; Before dispatching:
;;   - opens the no-sync db if *no-sync-db* is not yet set
;;   - if CMD is in api:write-queries and *api-halt-writes* is true,
;;     waits in 0.2 second steps until writes are released or roughly
;;     five seconds have passed, then proceeds regardless
;;   - records the time spent in that wait via db:add-stats (this is
;;     recorded for every command, including reads)
;;
;; An unknown CMD is logged and answered with the string
;; "ERROR: BAD api call <cmd>".
;;
;; Fixes relative to the previous version:
;;   - the wait loop called the undefined (loop-time) instead of
;;     re-entering the named let (loop), so a halted write raised an
;;     error instead of waiting
;;   - get-all-runids used (apply db:get-all-runids dbstruct), which
;;     hands apply a non-list as its final argument; it now applies
;;     params after dbstruct like the neighbouring clauses
;;
(define (api:dispatch-request dbstruct cmd run-id params)
  (if (not *no-sync-db*)
      (db:open-no-sync-db))
  (let* ((start-time (current-milliseconds)))
    (if (member cmd api:write-queries)
        (let loop ()
          (if *api-halt-writes*
              (begin
                (thread-sleep! 0.2)
                (if (< (- (current-milliseconds) start-time)
                       5000) ;; hope it don't take more than five seconds to sync
                    (loop)
                    ;; timed out: fall through and run the write anyway.
                    ;; (debug:print 0 *default-log-port* "ERROR: writes halted for more than 5 seconds, sync might be taking too long")
                    )))))
    (db:add-stats 'api-write-blocking-for-sync run-id params (- (current-milliseconds) start-time)))
  (case cmd
    ;;===============================================
    ;; READ/WRITE QUERIES
    ;;===============================================

    ((get-keys-write)                  (db:get-keys dbstruct)) ;; force a dummy "write" query to force server; for debug in -repl

    ;; SERVERS
    ((start-server)                    (apply tt:server-process-run params))
    ((kill-server)                     (set! *server-run* #f))

    ;; TESTS

    ;;((test-set-state-status-by-id)     (apply mt:test-set-state-status-by-id dbstruct params))
    ;;BB - commented out above because it was calling below, eventually, incorrectly (dbstruct passed to mt:test-set-state-status-by-id, which previously did more, but now only passes thru to db:set-state-status-and-roll-up-items.
    ((test-set-state-status-by-id)
     ;; (define (db:set-state-status-and-roll-up-items dbstruct run-id test-name item-path state status comment)
     (db:set-state-status-and-roll-up-items
      dbstruct
      (list-ref params 0) ; run-id
      (list-ref params 1) ; test-name
      #f                  ; item-path
      (list-ref params 2) ; state
      (list-ref params 3) ; status
      (list-ref params 4) ; comment
      ))

    ((delete-test-records)             (apply db:delete-test-records dbstruct params))
    ((delete-old-deleted-test-records) (apply db:delete-old-deleted-test-records dbstruct params))
    ((test-set-state-status)           (apply db:test-set-state-status dbstruct params))
    ((test-set-top-process-pid)        (apply db:test-set-top-process-pid dbstruct params))
    ((set-state-status-and-roll-up-items) (apply db:set-state-status-and-roll-up-items dbstruct params))
    ((set-state-status-and-roll-up-run) (apply db:set-state-status-and-roll-up-run dbstruct params))
    ((top-test-set-per-pf-counts)      (apply db:top-test-set-per-pf-counts dbstruct params))
    ((test-set-archive-block-id)       (apply db:test-set-archive-block-id dbstruct params))

    ((insert-test)                     (db:insert-test dbstruct run-id params))

    ;; RUNS
    ((register-run)                    (apply db:register-run dbstruct params))
    ((set-tests-state-status)          (apply db:set-tests-state-status dbstruct params))
    ((delete-run)                      (apply db:delete-run dbstruct params))
    ((lock/unlock-run)                 (apply db:lock/unlock-run dbstruct params))
    ((update-run-event_time)           (apply db:update-run-event_time dbstruct params))
    ((update-run-stats)                (apply db:update-run-stats dbstruct params))
    ((set-var)                         (apply db:set-var dbstruct params))
    ((inc-var)                         (apply db:inc-var dbstruct params))
    ((dec-var)                         (apply db:dec-var dbstruct params))
    ((del-var)                         (apply db:del-var dbstruct params))
    ((add-var)                         (apply db:add-var dbstruct params))

    ((insert-run)                      (apply db:insert-run dbstruct params))

    ;; STEPS
    ((teststep-set-status!)            (apply db:teststep-set-status! dbstruct params))
    ((delete-steps-for-test!)          (apply db:delete-steps-for-test! dbstruct params))

    ;; TEST DATA
    ((test-data-rollup)                (apply db:test-data-rollup dbstruct params))
    ((csv->test-data)                  (apply db:csv->test-data dbstruct params))

    ;; MISC
    ;; note: the run-id used here is the first element of params, not the
    ;; run-id argument of api:dispatch-request
    ((sync-cachedb->db)                (let ((run-id (car params)))
                                         (db:sync-touched dbstruct run-id db:initialize-main-db force-sync: #t)))
    ((get-toplevels-and-incompletes)   (apply db:get-toplevels-and-incompletes dbstruct params))
    ;; mark-incomplete is currently a no-op that returns #f; the threaded
    ;; implementation is kept below for reference.
    ((mark-incomplete)                 #f);;(thread-start! (make-thread (lambda () ;; no need to block on this one
    ;;                                       (apply db:find-and-mark-incomplete dbstruct params)
    ;;                                       #t))))
    ((create-all-triggers)             (db:create-all-triggers dbstruct))
    ((drop-all-triggers)               (db:drop-all-triggers dbstruct))

    ;; TESTMETA
    ((testmeta-add-record)             (apply db:testmeta-add-record dbstruct params))
    ((testmeta-update-field)           (apply db:testmeta-update-field dbstruct params))
    ((get-tests-tags)                  (db:get-tests-tags dbstruct))

    ;; TASKS
    ((tasks-add)                       (apply tasks:add dbstruct params))
    ((tasks-set-state-given-param-key) (apply tasks:set-state-given-param-key dbstruct params))
    ((tasks-get-last)                  (apply tasks:get-last dbstruct params))

    ;; NO SYNC DB
    ((no-sync-set)                     (apply db:no-sync-set *no-sync-db* params))
    ((no-sync-get/default)             (apply db:no-sync-get/default *no-sync-db* params))
    ((no-sync-del!)                    (apply db:no-sync-del! *no-sync-db* params))
    ((no-sync-get-lock)                (apply db:no-sync-get-lock *no-sync-db* params))

    ;; NO SYNC DB PROCESSES
    ((register-process)                (apply dbfile:register-process *no-sync-db* params))
    ((set-process-done)                (apply dbfile:set-process-done *no-sync-db* params))
    ((set-process-status)              (apply dbfile:set-process-status *no-sync-db* params))
    ((get-process-options)             (apply dbfile:get-process-options *no-sync-db* params))

    ;; ARCHIVES
    ;; ((archive-get-allocations)
    ((archive-register-disk)           (apply db:archive-register-disk dbstruct params))
    ((archive-register-block-name)     (apply db:archive-register-block-name dbstruct params))
    ;; ((archive-allocate-testsuite/area-to-block)(apply db:archive-allocate-testsuite/area-to-block dbstruct block-id testsuite-name areakey))

    ;;======================================================================
    ;; READ ONLY QUERIES
    ;;======================================================================

    ;; KEYS
    ((get-key-val-pairs)               (apply db:get-key-val-pairs dbstruct params))
    ((get-keys)                        (db:get-keys dbstruct))
    ((get-key-vals)                    (apply db:get-key-vals dbstruct params))
    ((get-target)                      (apply db:get-target dbstruct params))
    ((get-targets)                     (db:get-targets dbstruct))

    ;; ARCHIVES
    ((test-get-archive-block-info)     (apply db:test-get-archive-block-info dbstruct params))

    ;; TESTS
    ((test-toplevel-num-items)         (apply db:test-toplevel-num-items dbstruct params))
    ((get-test-info-by-id)             (apply db:get-test-info-by-id dbstruct params))
    ((get-test-state-status-by-id)     (apply db:get-test-state-status-by-id dbstruct params))
    ((test-get-rundir-from-test-id)    (apply db:test-get-rundir-from-test-id dbstruct params))
    ((get-count-tests-running-for-testname) (apply db:get-count-tests-running-for-testname dbstruct params))
    ((get-count-tests-running)         (apply db:get-count-tests-running dbstruct params))
    ((get-count-tests-running-in-jobgroup) (apply db:get-count-tests-running-in-jobgroup dbstruct params))
    ;; ((delete-test-step-records)     (apply db:delete-test-step-records dbstruct params))
    ;; ((get-previous-test-run-record) (apply db:get-previous-test-run-record dbstruct params))
    ((get-matching-previous-test-run-records)(apply db:get-matching-previous-test-run-records dbstruct params))
    ((test-get-logfile-info)           (apply db:test-get-logfile-info dbstruct params))
    ((test-get-records-for-index-file) (apply db:test-get-records-for-index-file dbstruct params))
    ((get-testinfo-state-status)       (apply db:get-testinfo-state-status dbstruct params))
    ((test-get-top-process-pid)        (apply db:test-get-top-process-pid dbstruct params))
    ((test-get-paths-matching-keynames-target-new) (apply db:test-get-paths-matching-keynames-target-new dbstruct params))
    ((get-prereqs-not-met)             (apply db:get-prereqs-not-met dbstruct params))
    ((get-count-tests-running-for-run-id) (apply db:get-count-tests-running-for-run-id dbstruct params))
    ((get-not-completed-cnt)           (apply db:get-not-completed-cnt dbstruct params))
    ((get-raw-run-stats)               (apply db:get-raw-run-stats dbstruct params))
    ((get-test-times)                  (apply db:get-test-times dbstruct params))

    ;; RUNS
    ((get-run-info)                    (apply db:get-run-info dbstruct params))
    ((get-run-status)                  (apply db:get-run-status dbstruct params))
    ((get-run-state)                   (apply db:get-run-state dbstruct params))
    ((get-run-state-status)            (apply db:get-run-state-status dbstruct params))
    ((set-run-status)                  (apply db:set-run-status dbstruct params))
    ((set-run-state-status)            (apply db:set-run-state-status dbstruct params))
    ((update-tesdata-on-repilcate-db)  (apply db:update-tesdata-on-repilcate-db dbstruct params))
    ((get-tests-for-run)               (apply db:get-tests-for-run dbstruct params))
    ((get-tests-for-run-state-status)  (apply db:get-tests-for-run-state-status dbstruct params))
    ((get-test-id)                     (apply db:get-test-id dbstruct params))
    ((get-tests-for-run-mindata)       (apply db:get-tests-for-run-mindata dbstruct params))
    ;; ((get-tests-for-runs-mindata)   (apply db:get-tests-for-runs-mindata dbstruct params))
    ((get-runs)                        (apply db:get-runs dbstruct params))
    ((simple-get-runs)                 (apply db:simple-get-runs dbstruct params))
    ((get-num-runs)                    (apply db:get-num-runs dbstruct params))
    ((get-runs-cnt-by-patt)            (apply db:get-runs-cnt-by-patt dbstruct params))
    ((get-all-run-ids)                 (db:get-all-run-ids dbstruct))
    ((get-prev-run-ids)                (apply db:get-prev-run-ids dbstruct params))
    ((get-run-ids-matching-target)     (apply db:get-run-ids-matching-target dbstruct params))
    ((get-runs-by-patt)                (apply db:get-runs-by-patt dbstruct params))
    ((get-run-name-from-id)            (apply db:get-run-name-from-id dbstruct params))
    ((get-main-run-stats)              (apply db:get-main-run-stats dbstruct params))
    ((get-var)                         (apply db:get-var dbstruct params))
    ((get-run-stats)                   (apply db:get-run-stats dbstruct params))
    ((get-run-times)                   (apply db:get-run-times dbstruct params))

    ;; STEPS
    ((get-steps-data)                  (apply db:get-steps-data dbstruct params))
    ((get-steps-for-test)              (apply db:get-steps-for-test dbstruct params))
    ((get-steps-info-by-id)            (apply db:get-steps-info-by-id dbstruct params))

    ;; TEST DATA
    ((read-test-data)                  (apply db:read-test-data dbstruct params))
    ((read-test-data-varpatt)          (apply db:read-test-data-varpatt dbstruct params))
    ((get-data-info-by-id)             (apply db:get-data-info-by-id dbstruct params))

    ;; MISC
    ((get-latest-host-load)            (apply db:get-latest-host-load dbstruct params))
    ((have-incompletes?)               (apply db:have-incompletes? dbstruct params))
    ((login)                           (apply db:login dbstruct params))
    ;; general-call: params is (stmtname run-id . realparams)
    ((general-call)                    (let ((stmtname   (car params))
                                             (run-id     (cadr params))
                                             (realparams (cddr params)))
                                         (db:general-call dbstruct run-id stmtname realparams)))
    ((sdb-qry)                         (apply sdb:qry params))
    ((ping)                            (current-process-id))
    ((get-changed-record-ids)          (apply db:get-changed-record-ids dbstruct params))
    ((get-changed-record-test-ids)     (apply db:get-changed-record-test-ids dbstruct params))
    ((get-changed-record-run-ids)      (apply db:get-changed-record-run-ids dbstruct params))
    ((get-run-record-ids)              (apply db:get-run-record-ids dbstruct params))
    ;; the final argument to apply must be a list, hence params
    ((get-all-runids)                  (apply db:get-all-runids dbstruct params))

    ;; TESTMETA
    ((testmeta-get-record)             (apply db:testmeta-get-record dbstruct params))

    ;; TASKS
    ((find-task-queue-records)         (apply tasks:find-task-queue-records dbstruct params))
    (else
     (debug:print 0 *default-log-port* "ERROR: bad api call " cmd)
     (conc "ERROR: BAD api call " cmd))))