Megatest

threaded-queue.scm at [977ca5be29]
Login

File example/q/threaded-queue.scm artifact 628e270284 part of check-in 977ca5be29


#!/opt/chicken/bin/csi -s
;; This file is part of Megatest.
;; 
;;     Megatest is free software: you can redistribute it and/or modify
;;     it under the terms of the GNU General Public License as published by
;;     the Free Software Foundation, either version 3 of the License, or
;;     (at your option) any later version.
;; 
;;     Megatest is distributed in the hope that it will be useful,
;;     but WITHOUT ANY WARRANTY; without even the implied warranty of
;;     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;;     GNU General Public License for more details.
;; 
;;     You should have received a copy of the GNU General Public License
;;     along with Megatest.  If not, see <http://www.gnu.org/licenses/>.


(use mailbox-threads typed-records matchable mailbox posix)

;;; create a threaded job queue
;;; submit job
;;;   - command line
;;;   - working dir / default pwd
;;;   - env hash    / default current env
;;;   - callback on exit 0 / default noop
;;;   - callback on nonzero exit / default noop
;; tjq == threaded job queue; a job is a unix command
;; Short alias for get-environment-variable; used for the work-dir default below.
(define getenv get-environment-variable)
;; Record describing one job handled by the threaded job queue.
;; The first group of fields is bookkeeping maintained by the queue itself;
;; the second group holds the options a caller may pass as keywords when
;; submitting a job.
(defstruct tjq:job
    id  ;; job id; supplied by the dispatcher's submit handler when the record is built
  state ;; lifecycle symbol; 'waiting at construction, later set to 'ready, 'running and 'done by the dispatcher
  (pid #f) ;; pid of the child process; stored by the job thread right after it launches the command
  (threadobj #f) ;; thread object that runs this job
  (normal-exit #f) ;; symbol describing how the child ended (e.g. 'normal or 'signal); stored by the job thread
  (exit-code #f) ;; exit code (or signal number) reported by process-wait; stored by the job thread
  (time-entered-waiting #f) ;; (current-seconds) at submit time
  ;; NOTE(review): the three timestamps below are declared but no code in
  ;; this file appears to assign them -- confirm before relying on them.
  (time-entered-ready   #f)
  (time-entered-running #f)
  (time-entered-done    #f)
  ;; following are key options to submit method
  ;; NOTE(review): work-dir and setenvs do not appear to be read by the job
  ;; thread in this file; they look like placeholders -- confirm.
  (work-dir     (getenv "PWD")) ;; where to execute job
  (setenvs     '())            ;; alist of envvars to set when running job
  (cmdline     "/bin/true")    ;; job command line; if string, run in subshell, if list, exec.
  (success-cb  (lambda () #t)) ;; zero-argument procedure meant to fire when exitcode is 0
  (fail-cb     (lambda () #t)));; zero-argument procedure meant to fire when exitcode is not 0

 
             

;; Fatal-error helper for the queue: prints E after the "Exception: "
;; prefix and then terminates the whole process with exit status 1.
(define tjq:exception
  (lambda (e)
    (print "Exception: " e)
    ;;(print-call-chain)
    (exit 1)))

;; Build (but do not start) the worker thread that owns a single job.
;;
;;   job-id          - id of the job; echoed back in every message sent to
;;                     the dispatcher
;;   job             - tjq:job record; this thread fills in its threadobj,
;;                     pid, normal-exit and exit-code fields
;;   dispatch-thread - thread that receives the (job-now-running ID) and
;;                     (job-now-done ID) notifications
;;   timeout-seconds - how long each thread-receive waits before the thread
;;                     falls through to polling
;;
;; Returns the new, unstarted thread object.
;;
;; Messages handled:
;;   'run          - launch the job's cmdline with process-run (a list is
;;                   passed as program + argument list, anything else as a
;;                   single command string), record the pid and notify the
;;                   dispatcher with (job-now-running ID)
;;   (timeout)     - if a child is running, poll it with a non-blocking
;;                   process-wait; once it is gone, record the result,
;;                   send (job-now-done ID) and let the thread finish
;;   anything else - print the message and exit the process with status 1
;;
;; process-wait has been observed to fail with "No child processes" when
;; many jobs run in parallel.  Left unhandled, that exception kills this
;; thread before job-now-done is sent and the dispatcher then never drains.
;; It is therefore caught here: the job is reported as done with
;; normal-exit set to 'unknown and exit-code set to #f.
(define (tjq:job-thread job-id job dispatch-thread timeout-seconds)
  (letrec
      ((this-thread
        (make-thread
         (lambda ()
           (tjq:job-threadobj-set! job this-thread)
           (let loop ((pid #f))
             (match (thread-receive timeout-seconds 'timeout)
               ('timeout
                (cond
                 ((number? pid) ;; a child was launched; check if it is still running
                  (let-values (((pid-or-zero exit-kind exitcode-or-signal)
                                (handle-exceptions exn
                                    (begin
                                      ;; the child can no longer be waited for;
                                      ;; report it instead of dying silently
                                      (print "tjq:job-thread(" job-id
                                             ") process-wait failed for pid " pid ": "
                                             ((condition-property-accessor
                                               'exn 'message "unknown error")
                                              exn))
                                      (values pid 'unknown #f))
                                  (let-values (((reaped normal? status)
                                                (process-wait pid #t)))
                                    (values reaped
                                            (if normal? 'normal 'signal)
                                            status)))))
                    (if (zero? pid-or-zero)
                        ;; non-blocking wait returned 0: child still running
                        (loop pid)
                        ;; child is gone: record the outcome and finish
                        (begin
                          (tjq:job-normal-exit-set! job exit-kind)
                          (tjq:job-exit-code-set! job exitcode-or-signal)
                          (thread-send dispatch-thread
                                       (list 'job-now-done job-id))))))
                 (else
                  ;; not launched yet; idle until the next poll
                  (thread-sleep! timeout-seconds)
                  (loop pid))))
               ('run
                (let* ((cmdline (tjq:job-cmdline job))
                       (newpid (if (list? cmdline)
                                   (process-run (car cmdline) (cdr cmdline))
                                   (process-run cmdline))))
                  (tjq:job-pid-set! job newpid)
                  (thread-send dispatch-thread
                               (list 'job-now-running job-id))
                  (loop newpid)))
               (e
                (print "tjq:job-thread(" job-id ") illegal message received:")
                (pp e)
                (exit 1))))
           #f))))
    this-thread))
       
       
;; Build (but do not start) the long-running dispatcher thread that
;; receives requests and moves jobs through their lifecycle.
;;
;;   qname        - queue name (not used by the dispatcher itself)
;;   job-hash     - hash table mapping job id -> tjq:job record
;;   sync-job-cap - maximum number of jobs launched/running at the same time
;;   obj          - the queue object (not used by the dispatcher itself)
;;   job-thread-timeout-seconds: poll interval handed to every job thread
;;   timeout-seconds:            poll interval of the dispatcher loop
;;
;; Returns the new, unstarted thread object.
;;
;; Messages handled:
;;   (job-now-running ID)        - from a job thread; marks the job 'running
;;   (job-now-done ID)           - from a job thread; marks the job 'done,
;;                                 fires its success-cb (normal exit with
;;                                 code 0) or fail-cb (anything else)
;;   (method ping () MBOX)       - replies 'pong on MBOX
;;   (method submit ARGS MBOX)   - creates a job from the keyword list ARGS
;;                                 and replies its id, or #f when draining
;;   (method drain ARGS MBOX)    - replies 'drain and stops accepting jobs;
;;                                 the thread ends once all queues are empty
;;   (method OTHER ARGS MBOX)    - replies 'missing-method
;;   anything else               - printed, then the process exits with 1
;;
;; Loop state: next-job-id is the id for the next submission, or #f once
;; draining has started.  waiting/ready/running/done are lists of job ids.
;; NOTE(review): submissions are consed onto the front of `waiting` and
;; promoted from the front, so the most recently submitted jobs start
;; first -- confirm whether that ordering is intended.
(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj #!key
                               (job-thread-timeout-seconds 0.1)
                               (timeout-seconds 0.1)
                               )
  (letrec
   ((this-thread
     (make-thread
      (lambda ()
        ;; Every (loop ...) call below is in tail position, so the
        ;; dispatcher does not accumulate continuations per message.
        (let loop ((next-job-id 0)
                   (waiting '())  ;; stay here until count(running) < sync-job-cap
                   (ready   '())  ;; launched on the next timeout
                   (running '())  ;; wait for pid to complete, then move to done
                   (done    '())) ;; finished jobs
          (match (thread-receive timeout-seconds 'timeout)
            ('timeout
             (if (and (not next-job-id) ;; we're draining jobs
                      (null? waiting)
                      (null? ready)
                      (null? running))
                 (begin
                   (print "Drained.  Done.")
                   #f)
                 (begin
                   ;; launch everything that was promoted on the previous tick
                   (for-each (lambda (job-id)
                               (let* ((job (hash-table-ref job-hash job-id))
                                      (job-thread (tjq:job-threadobj job)))
                                 (tjq:job-state-set! job 'ready)
                                 (thread-send job-thread 'run)))
                             ready)
                   ;; promote min(free slots, waiting count) jobs to ready;
                   ;; the free-slot count is clamped so split-at never sees
                   ;; a negative index
                   (let* ((new-running (append ready running))
                          (avail-slots (max 0 (- sync-job-cap (length new-running))))
                          (queueable-count (min avail-slots (length waiting))))
                     (let-values (((new-ready new-waiting)
                                   (split-at waiting queueable-count)))
                       (loop next-job-id new-waiting new-ready new-running done))))))

            (('job-now-running job-id)
             (let ((job (hash-table-ref job-hash job-id)))
               (tjq:job-state-set! job 'running)
               (loop next-job-id waiting ready running done)))

            (('job-now-done job-id)
             (let* ((job (hash-table-ref job-hash job-id))
                    (successful
                     (and
                      (eq? 'normal (tjq:job-normal-exit job))
                      (zero? (tjq:job-exit-code job))))
                    (new-running (filter
                                  (lambda (x) (not (eq? x job-id)))
                                  running))
                    (new-done    (cons job-id done)))
               (tjq:job-state-set! job 'done)
               ;; fire the zero-argument callback matching the outcome
               ((if successful
                    (tjq:job-success-cb job)
                    (tjq:job-fail-cb job)))
               (loop next-job-id waiting ready new-running new-done)))

            (('method 'ping '() return-mbox)
             (print "got ping")
             (mailbox-send! return-mbox 'pong)
             (loop next-job-id waiting ready running done))

            (('method 'submit args return-mbox)
             (if (not next-job-id)
                 (begin
                   (print "refuse to submit new job -- draining jobs now.")
                   (mailbox-send! return-mbox #f)
                   (loop #f waiting ready running done))
                 (let* ((job-id next-job-id)
                        (job (apply
                              make-tjq:job
                              id: job-id
                              time-entered-waiting: (current-seconds)
                              state: 'waiting
                              args))
                        (job-thread (tjq:job-thread job-id job this-thread
                                                    job-thread-timeout-seconds)))
                   ;; record the thread here as well, so a later 'run can be
                   ;; delivered even if the job thread has not run yet
                   (tjq:job-threadobj-set! job job-thread)
                   (hash-table-set! job-hash job-id job)
                   (thread-start! job-thread)
                   (mailbox-send! return-mbox job-id)
                   (loop
                    (add1 next-job-id)
                    (cons job-id waiting)
                    ready running done))))

            ;; Unimplemented stub: only a bare two-element (method kill)
            ;; message matches, and receiving one ends the dispatcher.
            (('method 'kill)
             #f)

            (('method 'drain args return-mbox)
             (mailbox-send! return-mbox 'drain)
             (loop #f waiting ready running done))

            ;; Any other well-formed method request.  This clause has to come
            ;; before the catch-all below, otherwise it can never match.
            (('method x args return-mbox)
             (mailbox-send! return-mbox 'missing-method)
             (loop next-job-id waiting ready running done))

            ;; catch-all: malformed message
            (e
             (print "tjq:dispatcher-thread> no matching pattern.  dispatcher received: ")
             (pp e)
             (exit 1))))
        #f))))
   this-thread))
  

       
;; Create a threaded job queue, start its dispatcher thread, and return
;; the queue object.
;;
;;   qname:        name printed when the queue has drained (default: gensym)
;;   sync-job-cap: maximum number of concurrently running jobs (default 100)
;;
;; The returned object is a procedure called as (q OP . ARGS):
;;   (q 'event-loop) - block until the dispatcher thread ends
;;   (q 'drain)      - ask the dispatcher to drain, block until it ends,
;;                     then return #t
;;   (q OP . ARGS)   - send (method OP ARGS mailbox) to the dispatcher and
;;                     return its reply; a missing-method reply is reported
;;                     through tjq:exception, which exits the process
;;
;; The dispatcher is called with its four positional arguments first and
;; the keyword options last; with a #!key lambda list, keywords placed in
;; front would be bound as the positional parameters.  The dispatcher is
;; created after `obj` so that `obj` is a real value when it is passed in.
(define (tjq:new #!key (qname (gensym)) (sync-job-cap 100))
  (let* ((job-hash (make-hash-table))
         (obj-mbox (make-mailbox))
         (dispatch-thread #f) ;; assigned below, before obj is handed out
         (obj
          (lambda (op . args)
            (cond
             ((eq? op 'event-loop)
              (print "got event-loop")
              (thread-join! dispatch-thread)
              (print "after thread-join dispatch-thread")
              )
             ((eq? op 'drain)
              (thread-send dispatch-thread (list 'method op args obj-mbox))
              (thread-join! dispatch-thread)
              (print "Done with queue "qname)
              #t)
             (else
              (thread-send dispatch-thread (list 'method op args obj-mbox))
              (let* ((res (mailbox-receive! obj-mbox)))
                ;; accept the reply both as a bare symbol and as a
                ;; one-element list
                (if (or (eq? res 'missing-method)
                        (equal? res '(missing-method)))
                    (begin
                      (print "missing method "op" called.")
                      (tjq:exception 'missing-method))
                    res)))))))
    (set! dispatch-thread
          (tjq:dispatcher-thread qname job-hash sync-job-cap obj
                                 job-thread-timeout-seconds: 0.01
                                 timeout-seconds: 0.01))
    (thread-start! dispatch-thread)
    obj))
         
                            
;; Smoke test: create a queue, ping the dispatcher twice, pretty-printing
;; each reply, then pause briefly.
(define (test-tjq-simple)
  (define queue (tjq:new qname: 'test-q sync-job-cap: 3))
  ;(queue 'submit "ls -l")
  ;(queue 'drain)
  (pp (queue 'ping))
  (pp (queue 'ping))
  ;(queue 'event-loop)
  (thread-sleep! 0.1))

;; Submit one shell job and sleep for four seconds.
(define (test-submit)
  (define queue (tjq:new qname: 'test-q sync-job-cap: 3))
  (queue 'submit cmdline: "sleep 2; echo job well done")
  (thread-sleep! 4))


;; Drain a queue that never received a job: create it, wait one second,
;; then request a drain.
(define (test-drain-simple)
  (define queue (tjq:new qname: 'test-q sync-job-cap: 3))
  ;(queue 'submit cmdline: "sleep 2; echo job well done")
  (thread-sleep! 1)
  (queue 'drain))
  

;; Submit six echo jobs (numbered 0..5) to a queue capped at three
;; concurrent jobs, announcing each submission, then drain the queue.
(define (test-submit-bunch)
  (let ((queue (tjq:new qname: 'test-q sync-job-cap: 3)))
    (do ((n 0 (add1 n)))
        ((= n 6))
      (print "submit it--" n)
      (queue 'submit cmdline: (conc "echo did job " n)))
    ;;(thread-sleep! 10)
    ;;(queue 'event-loop)
    (queue 'drain)))

;; Stress test: submit 6000 "/bin/true" jobs to a queue capped at twenty
;; concurrent jobs, then drain the queue.
(define (test-submit-bunch2)
  (let ((queue (tjq:new qname: 'test-q sync-job-cap: 20)))
    (let submit-next ((remaining 6000))
      (when (> remaining 0)
        (queue 'submit cmdline: "/bin/true")
        (submit-next (sub1 remaining))))
    ;;(thread-sleep! 10)
    ;;(queue 'event-loop)
    (queue 'drain)))

;; Script entry point.  Exactly one scenario is enabled; the commented-out
;; calls are the other scenarios defined above and can be swapped in by hand.
;(test-tjq-simple)
;;(test-submit)
;;(test-drain-simple)
(print "top")
(test-submit-bunch)