#!/opt/chicken/bin/csi -s
(use mailbox-threads typed-records matchable mailbox)
;;; create a threaded job queue
;;; submit job
;;; - command line
;;; - working dir / default pwd
;;; - env hash / default current env
;;; - callback on exit 0 / default noop
;;; - callback on nonzero exit / default noop
;; tjq == threaded job queue; a job is a unix command
;; Short alias: `getenv` is bound to the same procedure as
;; `get-environment-variable`, so (getenv "NAME") looks up an environment variable.
(define getenv get-environment-variable)
;; Record describing one job (a unix command) tracked by a job queue.
;; Instances are built with the keyword constructor `make-tjq:job`; the
;; dispatcher's 'submit handler supplies id:, state: and time-entered-waiting:
;; and passes the caller's keyword arguments through for the remaining fields.
;; NOTE(review): the lifecycle described on pid, exit-code and the later
;; time-entered-* fields is the intended design; no code visible in this file
;; assigns them yet.
(defstruct tjq:job
id ;; job id; supplied by the dispatcher when the job is submitted
state ;; lifecycle state symbol; set to 'waiting when the job is submitted
(pid #f) ;; intended: assigned when the job moves from the ready q to the running q
(exit-code #f) ;; intended: assigned when the job moves from running to done
(time-entered-waiting #f) ;; set from (current-seconds) when the job is submitted
(time-entered-ready #f) ;; presumably a timestamp for entering the ready q -- TODO confirm
(time-entered-running #f) ;; presumably a timestamp for entering the running q -- TODO confirm
(time-entered-done #f) ;; presumably a timestamp for entering the done q -- TODO confirm
;; the following fields are the keyword options accepted by the submit method
(work-dir (getenv "PWD")) ;; where to execute the job; defaults to the PWD environment variable
(setenvs '()) ;; alist of envvars to set when running the job
(cmdline "/bin/true") ;; job command line; intended: a string runs in a subshell, a list is exec'd
(success-cb (lambda () #t)) ;; zero-argument callback meant to fire when the exit code is 0
(fail-cb (lambda () #t)));; zero-argument callback meant to fire when the exit code is not 0
;; Fatal-error handler for the job queue: report `e` on standard output
;; and terminate the whole process with exit status 1.
(define (tjq:exception e)
  (let ((failure-status 1))
    (print "Exception: " e)
    ;; (print-call-chain) ; enable for a backtrace while debugging
    (exit failure-status)))
;; Create (without starting) the thread that represents one job.
;;   job-id      - id of the job; only used in the log messages
;;   job         - the job record; currently unused
;;   this-thread - the dispatcher thread; currently unused
;; Logs a setup message immediately and returns the new thread. When the
;; thread is run, its body logs "started" and "finished" and yields #f.
(define (tjq:job-thread job-id job this-thread)
  ;; Thread body: placeholder that only logs the start and end of the job.
  (define (run-job)
    (print "job-thread started for jobid " job-id)
    (print "job-thread finished for jobid " job-id)
    #f)
  (print "job-thread setup for jobid " job-id)
  (make-thread run-job))
;; Build (but do not start) the dispatcher thread of one job queue.
;;   qname        - used as the name of the created thread
;;   job-hash     - hash table the dispatcher fills with job-id -> job record
;;   sync-job-cap - intended cap on concurrently running jobs (not used yet)
;;   obj          - the queue's front-end closure (not used yet)
;; Returns the new, unstarted thread.
;;
;; The thread loops on its own thread mailbox. Requests have the shape
;;   (method <op> <args> <return-mbox>)
;; and exactly one reply is posted to <return-mbox>:
;;   ping    (args must be '()) -> the symbol pong
;;   submit  (args = keyword list for make-tjq:job) -> the new job id
;;   any other op -> the symbol missing-method, which is the value
;;                   tjq:new compares against
;; A message that is not a well-formed method request is printed and the
;; process exits with status 1.
(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj)
  (letrec
      (;; options to configure behavior of dispatcher
       (timeout-seconds 0.01)
       ;; long-running thread which receives requests and coordinates
       ;; job execution
       (this-thread
        (make-thread
         (lambda ()
           ;; Every (loop ...) below is the last expression of its match
           ;; clause and the match is the last expression of the loop body,
           ;; so each iteration is a tail call and the loop runs in
           ;; constant space.
           (let loop ((next-job-id 0)
                      ;; four job queues holding job threads; jobs advance
                      ;; from one to the next, and once in done the job has
                      ;; completed
                      (waiting '()) ;; stay here until count(running) < sync-job-cap
                      (ready '())   ;; launch jobs in here
                      (running '()) ;; wait for pid to complete, then move to done
                      (done '()))
             (match (thread-receive timeout-seconds 'timeout)
               ('timeout
                ;; TODO: when a timeout happens, examine the job queues and
                ;; move jobs through their lifecycle:
                ;;  - scan for dones; count them, subtract from running total
                ;;  - count waitings; move
                ;;    min(sync-job-cap - running total, waiting total) to ready
                ;;  - for each ready job, thread & fork
                (loop next-job-id waiting ready running done))
               (('method 'ping '() return-mbox)
                (print "got ping")
                (mailbox-send! return-mbox 'pong)
                (loop next-job-id waiting ready running done))
               (('method 'submit args return-mbox)
                (let* ((job-id next-job-id)
                       (job (apply make-tjq:job
                                   id: job-id
                                   time-entered-waiting: (current-seconds)
                                   state: 'waiting
                                   args))
                       (job-thread (tjq:job-thread job-id job this-thread)))
                  (hash-table-set! job-hash job-id job)
                  (thread-start! job-thread)
                  (mailbox-send! return-mbox job-id)
                  (loop (add1 next-job-id)
                        (cons job-thread waiting)
                        ready running done)))
               ;; Well-formed request for an unknown method. This clause must
               ;; come before the catch-all below, otherwise it can never
               ;; match.
               (('method _ _ return-mbox)
                (mailbox-send! return-mbox 'missing-method)
                (loop next-job-id waiting ready running done))
               ;; Anything else is a malformed message: report it and abort.
               (e
                (print "tjq:dispatcher-thread> no matching pattern. dispatcher received: ")
                (pp e)
                (exit 1)))))
         qname)))
    this-thread))
;; Create a job queue and start its dispatcher thread.
;;   qname:        name handed to the dispatcher (default: a fresh gensym)
;;   sync-job-cap: handed to the dispatcher as the cap on concurrently
;;                 running jobs (default 100)
;; Returns a closure used as (q op arg ...):
;;   (q 'event-loop) joins the dispatcher thread;
;;   any other op is sent to the dispatcher as (method op args reply-mailbox)
;;   and the call blocks until the reply arrives, which is then returned.
;; If the dispatcher reports a missing method, a message is printed and
;; tjq:exception is called, which exits the process.
(define (tjq:new #!key (qname (gensym)) (sync-job-cap 100))
  (let* ((job-hash (make-hash-table))
         (obj-mbox (make-mailbox))
         ;; Set just below. The closure must exist before the dispatcher is
         ;; built so the dispatcher receives the real closure rather than a
         ;; not-yet-initialised binding.
         (dispatch-thread #f)
         ;; The missing-method reply is recognised both as the bare symbol
         ;; and as a one-element list carrying it.
         (missing-method?
          (lambda (res)
            (or (eq? res 'missing-method)
                (equal? res '(missing-method)))))
         (obj
          (lambda (op . args)
            (cond
             ((eq? op 'event-loop)
              (thread-join! dispatch-thread))
             (else
              (thread-send dispatch-thread (list 'method op args obj-mbox))
              (let ((res (mailbox-receive! obj-mbox)))
                (if (missing-method? res)
                    (begin
                      (print "missing method "op" called.")
                      (tjq:exception 'missing-method))
                    res)))))))
    (set! dispatch-thread
          (tjq:dispatcher-thread qname job-hash sync-job-cap obj))
    (thread-start! dispatch-thread)
    obj))
;; Smoke test: create a queue, ping the dispatcher twice (pretty-printing
;; each reply), then sleep 0.1 s.
(define (test-tjq-simple)
  (let ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
    ;; not exercised yet: (q 'submit "ls -l"), (q 'drain), (q 'event-loop)
    (let ping-again ((remaining 2))
      (when (> remaining 0)
        (pp (q 'ping))
        (ping-again (sub1 remaining))))
    (thread-sleep! 0.1)))
;; Smoke test: create a queue, submit a single shell-command job, then
;; sleep 4 s so the background threads get time to run.
(define (test-submit)
  (define q (tjq:new qname: 'test-q sync-job-cap: 3))
  (q 'submit cmdline: "sleep 2; echo job well done")
  (thread-sleep! 4))
;; Script entry point: runs when the file is loaded. Only the submit test
;; is enabled; uncomment the line below to run the ping test as well.
;(test-tjq-simple)
(test-submit)