Overview
Comment: | wip |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.64-synclaunch-threaded-q |
Files: | files | file ages | folders |
SHA1: |
2a3c07bb4e190c5aab566e5d46bb8557 |
User & Date: | bb on 2018-02-08 15:38:38 |
Other Links: | branch diff | manifest | tags |
Context
2018-02-09
| ||
00:22 | wip check-in: 9d523bb50a user: bb tags: v1.64-synclaunch-threaded-q | |
2018-02-08
| ||
15:38 | wip check-in: 2a3c07bb4e user: bb tags: v1.64-synclaunch-threaded-q | |
2018-02-06
| ||
05:59 | Rebuilt manual check-in: c57a166878 user: matt tags: v1.65 | |
Changes
Added example/q/threaded-queue.scm version [32eb4584d7].
#!/opt/chicken/bin/csi -s

(use mailbox-threads typed-records matchable mailbox)
;; NOTE(review): make-hash-table / hash-table-set! are used below; confirm
;; srfi-69 is in scope via one of the eggs above in your CHICKEN install.

;;; create a threaded job queue
;;; submit job
;;;   - command line
;;;   - working dir / default pwd
;;;   - env hash / default current env
;;;   - callback on exit 0 / default noop
;;;   - callback on nonzero exit / default noop

;; tjq == threaded job queue; a job is a unix command

(define getenv get-environment-variable)

;; Record describing one submitted job.  Built with the keyword
;; constructor make-tjq:job (see the 'submit handler in the dispatcher).
(defstruct tjq:job
  id     ;; assigned at construction time when added to waiting q
  state  ;; assigned at construction time when added to waiting q
  (pid #f)        ;; assigned when moved from ready q to running q
  (exit-code #f)  ;; assigned when moved from running to done
  (time-entered-waiting #f)
  (time-entered-ready #f)
  (time-entered-running #f)
  (time-entered-done #f)

  ;; following are key options to submit method
  (work-dir (getenv "PWD"))  ;; where to execute job
  (setenvs '())              ;; alist of envvars to set when running job
  (cmdline "/bin/true")      ;; job command line; if string, run in subshell, if list, exec.
  (success-cb (lambda () #t))  ;; fires when exitcode is 0
  (fail-cb (lambda () #t)))    ;; fires when exitcode is not 0

;; Report a fatal condition E on stdout and terminate with exit status 1.
(define (tjq:exception e)
  (print "Exception: " e)
  ;;(print-call-chain)
  (exit 1))

;; Build (but do not start) the thread that represents one job.
;; The thread body is currently a stub: it only logs start and finish and
;; returns #f.  JOB and THIS-THREAD are accepted but not used yet.
(define (tjq:job-thread job-id job this-thread)
  (print "job-thread setup for jobid " job-id)
  (make-thread
   (lambda ()
     (print "job-thread started for jobid " job-id)
     (print "job-thread finished for jobid " job-id)
     #f)))

;; Build (but do not start) the dispatcher thread for a queue.
;;
;;   qname        - used as the name of the created thread
;;   job-hash     - hash table filled with job-id -> tjq:job records
;;   sync-job-cap - intended cap on concurrently running jobs (not yet
;;                  consulted by the code below)
;;   obj          - the queue's front-end procedure (not yet used here)
;;
;; The thread receives messages of the form
;;   (method <op> <args> <return-mbox>)
;; and answers each one by posting a value to <return-mbox>.
(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj)
  (letrec (;; options to configure behavior of dispatcher
           (timeout-seconds 0.01)

           ;; define long-running thread which receives requests and
           ;; coordinates job execution
           (this-thread
            (make-thread
             (lambda ()
               (let loop ((next-job-id 0)
                          ;; four job queues holding job mbox-type threads
                          ;; they advance from one to the next
                          ;; once in done, the thread has completed.
                          (waiting '())  ;; stay here until count(running) < sync-job-cap
                          (ready '())    ;; launch jobs in here
                          (running '())  ;; wait for pid to complete, then move to done
                          (done '()))
                 ;;(print "loop top")
                 ;; Every clause ends in a tail call to loop (or exits), so
                 ;; the dispatcher runs in constant space.
                 (match (thread-receive timeout-seconds 'timeout)
                   ('timeout
                    ;;*** when timeout happens, examine job queues
                    ;;    and move jobs thru their lifecycle
                    ;;** scan for dones
                    ;;     count dones, subtract from running total
                    ;;** count waitings
                    ;;     move min(sync-job-cap - running total, waiting total) to ready
                    ;;     foreach ready, thread & fork
                    (loop next-job-id waiting ready running done))

                   (('method 'ping '() return-mbox)
                    (print "got ping")
                    (mailbox-send! return-mbox 'pong)
                    (loop next-job-id waiting ready running done))

                   (('method 'submit args return-mbox)
                    (let* ((job-id next-job-id)
                           (job (apply make-tjq:job
                                       id: job-id
                                       time-entered-waiting: (current-seconds)
                                       state: 'waiting
                                       args))
                           (job-thread (tjq:job-thread job-id job this-thread)))
                      (hash-table-set! job-hash job-id job)
                      (thread-start! job-thread)
                      (mailbox-send! return-mbox job-id)
                      (loop (add1 next-job-id)
                            (cons job-thread waiting)
                            ready running done)))

                   ;; Well-formed request for an unknown method: reply with
                   ;; the bare symbol, which is what the front end tests
                   ;; with eq?.
                   (('method x args return-mbox)
                    (mailbox-send! return-mbox 'missing-method)
                    (loop next-job-id waiting ready running done))

                   ;; Catch-all must stay last: match takes the first
                   ;; clause that fits, and this pattern fits everything.
                   (e
                    (print "tjq:dispatcher-thread> no matching pattern. dispatcher received: ")
                    (pp e)
                    (exit 1)))))
             qname)))
    this-thread))

;; Create a threaded job queue and start its dispatcher thread.
;;
;; Returns a procedure (obj op . args):
;;   (obj 'event-loop)  joins the dispatcher thread
;;   (obj op arg ...)   sends (method op args mbox) to the dispatcher and
;;                      returns its reply; a 'missing-method reply is
;;                      reported and ends the process via tjq:exception.
(define (tjq:new #!key (qname (gensym)) (sync-job-cap 100))
  (let* ((job-hash (make-hash-table))
         (obj-mbox (make-mailbox))
         ;; filled in below, once obj exists and can be handed over
         (dispatch-thread #f)
         (obj
          (lambda (op . args)
            (cond
             ((eq? op 'event-loop)
              (thread-join! dispatch-thread))
             (else
              (thread-send dispatch-thread (list 'method op args obj-mbox))
              (let* ((res (mailbox-receive! obj-mbox)))
                (if (eq? res 'missing-method)
                    (begin
                      (print "missing method " op " called.")
                      (tjq:exception 'missing-method))
                    res)))))))
    ;; obj only dereferences dispatch-thread when called, so creating the
    ;; thread after obj avoids reading an uninitialized binding.
    (set! dispatch-thread
          (tjq:dispatcher-thread qname job-hash sync-job-cap obj))
    (thread-start! dispatch-thread)
    obj))

;; Smoke test: ping the dispatcher twice and print the replies.
(define (test-tjq-simple)
  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
    ;(q 'submit "ls -l")
    ;(q 'drain)
    (pp (q 'ping))
    (pp (q 'ping))
    (thread-sleep! 0.1)
    ;(q 'event-loop)
    ))

;; Smoke test: submit one shell job, then give the threads time to run.
(define (test-submit)
  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
    (q 'submit cmdline: "sleep 2; echo job well done")
    (thread-sleep! 4)))

;(test-tjq-simple)
(test-submit)