Megatest

Check-in [2a3c07bb4e]
Login
Overview
Comment:wip
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.64-synclaunch-threaded-q
Files: files | file ages | folders
SHA1: 2a3c07bb4e190c5aab566e5d46bb85577f356e59
User & Date: bb on 2018-02-08 15:38:38
Other Links: branch diff | manifest | tags
Context
2018-02-09
00:22
wip check-in: 9d523bb50a user: bb tags: v1.64-synclaunch-threaded-q
2018-02-08
15:38
wip check-in: 2a3c07bb4e user: bb tags: v1.64-synclaunch-threaded-q
2018-02-06
05:59
Rebuilt manual check-in: c57a166878 user: matt tags: v1.65
Changes

Added example/q/threaded-queue.scm version [32eb4584d7].



















































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/opt/chicken/bin/csi -s

(use mailbox-threads typed-records matchable mailbox)

;;; create a threaded job queue
;;; submit job
;;;   - command line
;;;   - working dir / default pwd
;;;   - env hash    / default current env
;;;   - callback on exit 0 / default noop
;;;   - callback on nonzero exit / default noop
;; tjq == threaded job queue; a job is a unix command
(define getenv get-environment-variable)
(defstruct tjq:job
    id  ;; assigned at construction time when added to waiting q
  state ;; assigned at construction time when added to waiting q
  (pid #f) ;; assigned when moved from ready q to running q
  (exit-code #f) ;; assigned when moved from running to done
  (time-entered-waiting #f)
  (time-entered-ready   #f)
  (time-entered-running #f)
  (time-entered-done    #f)
  ;; following are key options to submit method
  (work-dir     (getenv "PWD")) ;; where to execute job
  (setenvs     '())            ;; alist of envvars to set when running job
  (cmdline     "/bin/true")    ;; job command line; if string, run in subshell, if list, exec.
  (success-cb  (lambda () #t)) ;; fires when exitcode is 0
  (fail-cb     (lambda () #t)));; fires when exitcode is not 0

 
             

(define (tjq:exception e)
  (print "Exception: "e)
  ;;(print-call-chain)
  (exit 1))

(define (tjq:job-thread job-id job this-thread)
  (print "job-thread setup for jobid "job-id)
  (make-thread
   (lambda ()
     (print "job-thread started for jobid "job-id)
     (print "job-thread finished for jobid "job-id)
     #f)))


(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj)
  (letrec
   (;; options to configure behavior of dispatcher
    (timeout-seconds 0.01)
    ;; define long-running thread which receives requests and coordinates
    ;; job execution
    (this-thread
     (make-thread
      (lambda ()
        (let loop ((next-job-id 0)
                   ;; four job queues holding job mbox-type threads
                   ;; they advance from one to the next
                   ;; once in done, the thread has completed.
                   (waiting '()) ;; stay here until count(running) < sync-job-cap
                   (ready   '()) ;; launch jobs in here
                   (running '()) ;; wait for pid to complete, then move do done
                   (done    '())) ;; 
          ;;(print "loop top")
          (match (thread-receive timeout-seconds 'timeout)
            ('timeout
             ;;*** when timeout happens, examine job queues
             ;;    and move jobs thru their lifecycle
             ;;** scan for dones
             ;; count dones, subtract from running total
             ;;** count waitings
             ;; move min(sync-job-cap - running total, waiting total) to ready
             ;; foreach ready, thread & fork
             
             
             (loop next-job-id waiting ready running done))
            (('method 'ping '() return-mbox)
             (print "got ping")
             (mailbox-send! return-mbox 'pong)
             (loop next-job-id waiting ready running done))
            
            (('method 'submit args return-mbox)
             (let* ((job-id next-job-id)
                    (job (apply
                          make-tjq:job
                          id: job-id
                          time-entered-waiting: (current-seconds)
                          state: 'waiting
                          args))
                    (job-thread (tjq:job-thread job-id job this-thread)))
               (hash-table-set! job-hash job-id job)
               (thread-start! job-thread)
               (mailbox-send! return-mbox job-id)
               (loop
                (add1 next-job-id)
                (cons job-thread waiting)
                ready running done)))
                    
            (e
             (print "tjq:dispatcher-thread> no matching pattern.  dispatcher received: ")
             (pp e)
             (exit 1))
            (('method x args return-mbox)
             (mailbox-send! return-mbox (list 'missing-method))
             (loop next-job-id waiting ready running done))
            ) ;; end match
          
          qname)))))
   this-thread))
  

       
(define (tjq:new #!key (qname (gensym)) (sync-job-cap 100))
  (let* ((job-hash (make-hash-table))
         (obj-mbox (make-mailbox)))
    (letrec
        ((dispatch-thread (tjq:dispatcher-thread qname job-hash sync-job-cap obj))
         (obj
          (lambda (op . args)
            (cond
             ((eq? op 'event-loop)
              (thread-join! dispatch-thread))
             (else
              (thread-send dispatch-thread (list 'method op args obj-mbox))
              (let* ((res (mailbox-receive! obj-mbox)))
                (if (eq? res 'missing-method)
                    (begin
                      (print "missing method "op" called.")
                      (tjq:exception 'missing-method))
                    res)))))))
      (thread-start! dispatch-thread)
      obj)))
         
                            
(define (test-tjq-simple)
  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
    ;(q 'submit "ls -l")
    ;(q 'drain)

    (pp (q 'ping))
    (pp (q 'ping))
    (thread-sleep! 0.1)
    ;(q 'event-loop)
    )
  )

(define (test-submit)
  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
    (q 'submit cmdline: "sleep 2; echo job well done")
    (thread-sleep! 4)))

;(test-tjq-simple)
(test-submit)