#!/opt/chicken/bin/csi -s
(use mailbox-threads typed-records matchable mailbox posix)
;;; create a threaded job queue
;;; submit job
;;; - command line
;;; - working dir / default pwd
;;; - env hash / default current env
;;; - callback on exit 0 / default noop
;;; - callback on nonzero exit / default noop
;; tjq == threaded job queue; a job is a unix command
;; Short local name for looking up an environment variable.
;; Returns the variable's value as a string, or #f when it is not set.
(define (getenv name)
  (get-environment-variable name))
;; Record describing one job (a unix command) tracked by a tjq queue.
;; Instances are built with the keyword constructor make-tjq:job; the
;; fields from work-dir onward are the options a caller may pass to the
;; queue's submit method.
(defstruct tjq:job
  id                        ;; job id, assigned at construction time when added to the waiting q
  state                     ;; lifecycle symbol: 'waiting, 'ready, 'running or 'done
  (pid #f)                  ;; process id of the launched command; #f until it has been started
  (threadobj #f)            ;; the job's own thread object
  (normal-exit #f)          ;; symbol describing how the process ended (e.g. 'normal or 'signal)
  (exit-code #f)            ;; exit code (or signal number), assigned when moved from running to done
  (time-entered-waiting #f) ;; timestamp slots, one per lifecycle state
  (time-entered-ready #f)
  (time-entered-running #f)
  (time-entered-done #f)
  ;; following are key options to submit method
  ;; where to execute job: $PWD when set, otherwise the process's actual
  ;; working directory, so the default is never #f when PWD is absent
  ;; from the environment.
  (work-dir (or (getenv "PWD") (current-directory)))
  (setenvs '())             ;; alist of envvars to set when running job
  (cmdline "/bin/true")     ;; job command line; if string, run in subshell, if list, exec.
  (success-cb (lambda () #t)) ;; zero-argument procedure; fires when exitcode is 0
  (fail-cb (lambda () #t)))   ;; zero-argument procedure; fires when exitcode is not 0
;; Fatal-error helper: prints the given value prefixed with "Exception: "
;; and terminates the whole process with exit status 1.
(define tjq:exception
  (lambda (e)
    (print "Exception: " e)
    ;;(print-call-chain)
    (exit 1)))
;; Non-blocking status poll for child process PID.
;; Returns a three-element list (pid-or-zero exit-kind code):
;;   - pid-or-zero is 0 while the child is still running;
;;   - exit-kind is 'normal or 'signal as reported by process-wait;
;;   - code is the exit code or signal number.
;; When process-wait itself raises (the "No child processes" failure that
;; has been observed with many parallel jobs), the exit status can no
;; longer be obtained; the result is then (PID unknown #f) so the caller
;; can still retire the job instead of dying with an unhandled exception.
(define (tjq:poll-child pid)
  (handle-exceptions exn
      (list pid 'unknown #f)
    (receive (pid-or-zero normal-exit exitcode-or-signal) (process-wait pid #t)
      (list pid-or-zero
            (if normal-exit 'normal 'signal)
            exitcode-or-signal))))

;; Build (but do not start) the thread that owns a single job.
;;
;;   job-id           id of the job; echoed back in messages to the dispatcher
;;   job              the tjq:job record to launch and update
;;   dispatch-thread  thread that receives (job-now-running ID) and
;;                    (job-now-done ID) notifications
;;   timeout-seconds  polling interval for the thread's mailbox
;;
;; Protocol: the thread waits for the message 'run, launches the job's
;; cmdline with process-run (a list is split into program + arguments, a
;; string is passed as-is), stores the pid in the job and reports
;; job-now-running.  Afterwards every mailbox timeout polls the child;
;; once it has finished, normal-exit and exit-code are stored in the job,
;; job-now-done is reported and the thread ends.  Any other message is
;; treated as fatal and exits the process.
;; Note: the job's work-dir and setenvs fields are not applied here.
;; Returns the new, unstarted thread object.
(define (tjq:job-thread job-id job dispatch-thread timeout-seconds)
  (letrec
      ((this-thread
        (make-thread
         (lambda ()
           (tjq:job-threadobj-set! job this-thread)
           (let loop ((pid #f))
             (match (thread-receive timeout-seconds 'timeout)
               ('timeout
                (if (number? pid)
                    ;; launched: check whether the child is still running
                    (let* ((status (tjq:poll-child pid))
                           (pid-or-zero (car status)))
                      (if (zero? pid-or-zero)
                          (loop pid)
                          (begin
                            (tjq:job-normal-exit-set! job (cadr status))
                            (tjq:job-exit-code-set! job (caddr status))
                            (thread-send dispatch-thread
                                         (list 'job-now-done job-id)))))
                    ;; not launched yet: nothing to do but wait
                    (begin
                      (thread-sleep! timeout-seconds)
                      (loop pid))))
               ('run
                (let* ((cmdline (tjq:job-cmdline job))
                       (newpid (if (list? cmdline)
                                   (process-run (car cmdline) (cdr cmdline))
                                   (process-run cmdline))))
                  (tjq:job-pid-set! job newpid)
                  (thread-send dispatch-thread
                               (list 'job-now-running job-id))
                  (loop newpid)))
               (e
                (print "tjq:job-thread(" job-id ") illegal message received:")
                (pp e)
                (exit 1))))
           #f))))
    this-thread))
;; Build (but do not start) the long-running dispatcher thread of a queue.
;;
;;   qname         queue name (accepted for the caller's benefit; not used here)
;;   job-hash      hash table mapping job id -> tjq:job record; filled by submit
;;   sync-job-cap  maximum number of jobs allowed in ready+running at once
;;   obj           the queue's front-end procedure (not used here)
;;   job-thread-timeout-seconds:  polling interval handed to each job thread
;;   timeout-seconds:             polling interval of the dispatcher's own mailbox
;;
;; Messages understood (received via thread-receive):
;;   (job-now-running ID) / (job-now-done ID)   -- sent by job threads
;;   (method ping () MBOX)       -> replies 'pong
;;   (method submit ARGS MBOX)   -> creates a job from keyword list ARGS and
;;                                  replies its id, or #f when draining
;;   (method drain ARGS MBOX)    -> replies 'drain, refuses further submits and
;;                                  ends the thread once all queues are empty
;;   (method kill)               -> ends the dispatcher loop without a reply
;;   (method OTHER ARGS MBOX)    -> replies the symbol 'missing-method
;; Anything else is fatal and exits the process.
;;
;; On every mailbox timeout jobs are moved through their lifecycle:
;; jobs in the ready list are told to run and join the running list, and
;; the oldest waiting jobs are promoted to ready while slots are free.
;; Returns the new, unstarted thread object.
(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj #!key
                               (job-thread-timeout-seconds 0.1)
                               (timeout-seconds 0.1))
  (letrec
      (;; Invoke a job's completion callback; a failing callback must not
       ;; take the dispatcher down, because callers would then block
       ;; forever waiting for replies.
       (run-callback
        (lambda (job-id cb)
          (handle-exceptions exn
              (begin
                (print "tjq:dispatcher-thread> callback for job " job-id
                       " raised an exception; ignored.")
                #f)
            (cb))))
       (this-thread
        (make-thread
         (lambda ()
           ;; The match form is the last expression of the loop body so
           ;; that every (loop ...) call is a tail call; the thread's
           ;; result #f is produced after the loop has ended.
           (let loop ((next-job-id 0) ;; #f once draining has been requested
                      ;; four job-id lists; a job advances from one to the next
                      (waiting '())   ;; newest first; stay here until a slot is free
                      (ready '())     ;; launched on the next timeout
                      (running '())   ;; wait for pid to complete, then move to done
                      (done '()))
             (match (thread-receive timeout-seconds 'timeout)
               ('timeout
                (if (and (not next-job-id) ;; we're draining jobs
                         (null? waiting)
                         (null? ready)
                         (null? running))
                    (print "Drained. Done.") ;; leaving the loop ends the thread
                    (begin
                      ;; launch everything that was promoted on the previous tick
                      (for-each
                       (lambda (job-id)
                         (let* ((job (hash-table-ref job-hash job-id))
                                (job-thread (tjq:job-threadobj job)))
                           (tjq:job-state-set! job 'ready)
                           (tjq:job-time-entered-ready-set! job (current-seconds))
                           (thread-send job-thread 'run)))
                       ready)
                      (let* ((new-running (append ready running))
                             ;; never negative, even for a zero/negative cap
                             (avail-slots (max 0 (- sync-job-cap (length new-running))))
                             (queueable-count (min avail-slots (length waiting)))
                             (keep-count (- (length waiting) queueable-count)))
                        ;; waiting is newest-first, so the oldest jobs sit at
                        ;; the tail: promote the tail to dispatch in
                        ;; submission order.
                        (let-values (((new-waiting promoted)
                                      (split-at waiting keep-count)))
                          (loop next-job-id
                                new-waiting
                                (reverse promoted)
                                new-running
                                done))))))
               (('job-now-running job-id)
                (let ((job (hash-table-ref job-hash job-id)))
                  (tjq:job-state-set! job 'running)
                  (tjq:job-time-entered-running-set! job (current-seconds))
                  (loop next-job-id waiting ready running done)))
               (('job-now-done job-id)
                (let* ((job (hash-table-ref job-hash job-id))
                       (successful
                        (and (eq? 'normal (tjq:job-normal-exit job))
                             (eqv? 0 (tjq:job-exit-code job))))
                       (new-running (filter
                                     (lambda (x) (not (eqv? x job-id)))
                                     running))
                       (new-done (cons job-id done)))
                  (tjq:job-state-set! job 'done)
                  (tjq:job-time-entered-done-set! job (current-seconds))
                  ;; fire the completion callback matching the outcome
                  (run-callback job-id
                                (if successful
                                    (tjq:job-success-cb job)
                                    (tjq:job-fail-cb job)))
                  (loop next-job-id waiting ready new-running new-done)))
               (('method 'ping '() return-mbox)
                (print "got ping")
                (mailbox-send! return-mbox 'pong)
                (loop next-job-id waiting ready running done))
               (('method 'submit args return-mbox)
                (if (not next-job-id)
                    (begin
                      (print "refuse to submit new job -- draining jobs now.")
                      (mailbox-send! return-mbox #f)
                      (loop #f waiting ready running done))
                    (let* ((job-id next-job-id)
                           (job (apply
                                 make-tjq:job
                                 id: job-id
                                 time-entered-waiting: (current-seconds)
                                 state: 'waiting
                                 args))
                           (job-thread (tjq:job-thread job-id job this-thread
                                                       job-thread-timeout-seconds)))
                      ;; record the thread right away so launching the job
                      ;; never depends on the job thread having run first
                      (tjq:job-threadobj-set! job job-thread)
                      (hash-table-set! job-hash job-id job)
                      (thread-start! job-thread)
                      (mailbox-send! return-mbox job-id)
                      (loop (add1 next-job-id)
                            (cons job-id waiting)
                            ready running done))))
               (('method 'kill)
                ;; no (loop ...) here: the dispatcher simply stops.
                ;; Running jobs are not signalled.
                #f)
               (('method 'drain args return-mbox)
                (mailbox-send! return-mbox 'drain)
                (loop #f waiting ready running done))
               ;; any other well-formed method request: tell the caller.
               ;; This clause must come before the catch-all below.
               (('method _ _ return-mbox)
                (mailbox-send! return-mbox 'missing-method)
                (loop next-job-id waiting ready running done))
               (e
                (print "tjq:dispatcher-thread> no matching pattern. dispatcher received: ")
                (pp e)
                (exit 1))))
           #f))))
    this-thread))
;; Create a new threaded job queue and start its dispatcher thread.
;;
;;   qname:         name used in log output (default: a fresh gensym)
;;   sync-job-cap:  maximum number of jobs in flight at once (default 100)
;;
;; Returns a procedure (obj OP . ARGS) acting as the queue object:
;;   (obj 'event-loop)      block until the dispatcher thread finishes
;;   (obj 'drain)           stop accepting jobs, wait for all jobs and the
;;                          dispatcher to finish, then return #t
;;   (obj OP ARG ...)       any other OP is forwarded to the dispatcher as
;;                          (method OP ARGS MAILBOX); the dispatcher's reply
;;                          is returned.  A missing-method reply is fatal
;;                          (reported through tjq:exception).
(define (tjq:new #!key (qname (gensym)) (sync-job-cap 100))
  (define job-hash (make-hash-table))
  (define obj-mbox (make-mailbox))
  ;; assigned below, once obj exists; obj only touches it when called
  (define dispatch-thread #f)

  ;; the dispatcher may answer with the bare symbol or a one-element list
  (define (missing-method? res)
    (or (eq? res 'missing-method)
        (equal? res '(missing-method))))

  (define (obj op . args)
    (case op
      ((event-loop)
       (print "got event-loop")
       (thread-join! dispatch-thread)
       (print "after thread-join dispatch-thread"))
      ((drain)
       (thread-send dispatch-thread (list 'method op args obj-mbox))
       (thread-join! dispatch-thread)
       (print "Done with queue " qname)
       #t)
      (else
       (thread-send dispatch-thread (list 'method op args obj-mbox))
       (let ((res (mailbox-receive! obj-mbox)))
         (if (missing-method? res)
             (begin
               (print "missing method " op " called.")
               (tjq:exception 'missing-method))
             res)))))

  ;; Required arguments must precede the #!key arguments: the callee binds
  ;; its first four parameters positionally, so leading keywords would be
  ;; taken for qname/job-hash/sync-job-cap/obj.
  (set! dispatch-thread
        (tjq:dispatcher-thread qname job-hash sync-job-cap obj
                               job-thread-timeout-seconds: 0.01
                               timeout-seconds: 0.01))
  (thread-start! dispatch-thread)
  obj)
;; Smoke test: create a queue, ping the dispatcher twice (pretty-printing
;; each reply), then sleep briefly.
(define (test-tjq-simple)
  (define q (tjq:new qname: 'test-q sync-job-cap: 3))
  ;(q 'submit "ls -l")
  ;(q 'drain)
  (pp (q 'ping))
  (pp (q 'ping))
  ;(q 'event-loop)
  (thread-sleep! 0.1))
;; Submit one shell job that sleeps two seconds and echoes a message,
;; then keep the main thread asleep for four seconds.
(define (test-submit)
  (define q (tjq:new qname: 'test-q sync-job-cap: 3))
  (q 'submit cmdline: "sleep 2; echo job well done")
  (thread-sleep! 4))
;; Drain a queue that never received a job: wait one second, then
;; request drain.
(define (test-drain-simple)
  (define q (tjq:new qname: 'test-q sync-job-cap: 3))
  ;(q 'submit cmdline: "sleep 2; echo job well done")
  (thread-sleep! 1)
  (q 'drain))
;; Submit six echo jobs (numbered 0..5) to a queue capped at three
;; concurrent jobs, announcing each submission, then drain the queue.
(define (test-submit-bunch)
  (define q (tjq:new qname: 'test-q sync-job-cap: 3))
  (do ((n 0 (add1 n)))
      ((= n 6))
    (print "submit it--" n)
    (q 'submit cmdline: (string-append "echo did job " (number->string n))))
  ;;(thread-sleep! 10)
  ;;(q 'event-loop)
  (q 'drain))
;; Stress variant: submit 6000 "/bin/true" jobs to a queue capped at
;; twenty concurrent jobs, then drain the queue.
(define (test-submit-bunch2)
  (define q (tjq:new qname: 'test-q sync-job-cap: 20))
  (let submit-next ((remaining 6000))
    (when (> remaining 0)
      (q 'submit cmdline: "/bin/true")
      (submit-next (sub1 remaining))))
  ;;(thread-sleep! 10)
  ;;(q 'event-loop)
  (q 'drain))
;; Script entry point: runs when the file is executed.
;; The commented-out calls below are alternative scenarios; enable one by
;; removing its comment prefix.
;(test-tjq-simple)
;;(test-submit)
;;(test-drain-simple)
;; Print a start marker, then run the six-job submit-and-drain scenario.
(print "top")
(test-submit-bunch)