Overview
Comment: | Added queuefeeder |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.60 |
Files: | files | file ages | folders |
SHA1: |
a095ada3d17b330c5775a2dd02394642 |
User & Date: | matt on 2015-06-18 23:12:18 |
Other Links: | branch diff | manifest | tags |
Context
2015-06-19
| ||
21:17 | Missing changes check-in: 0ee351862a user: matt tags: v1.60 | |
2015-06-18
| ||
23:12 | Added queuefeeder check-in: a095ada3d1 user: matt tags: v1.60 | |
18:00 | Tweaks to loadwatch check-in: d0797d3ec2 user: mrwellan tags: v1.60 | |
Changes
Added loadwatch/bjob-count.sh version [0c8ad639ee].
#!/bin/bash
# Queue-load probe for queuefeeder-server: print field 8 of every bqueues
# line that contains "normal" (presumably the NJOBS column of bqueues'
# default layout -- confirm against the installed LSF/openlava version).
bqueues | awk '/normal/ { print $8 }'
Added loadwatch/launch-many.scm version [72e97c4511].
;; Load generator: submit jobs numbered 0 through 500000 (inclusive), one
;; at a time, through the queuefeeder client. Each job runs
;; ./testopenlava.sh with its number and a random length of 0-29 seconds.
;; system waits for the queuefeeder command to return before the next
;; job is started, so the queuefeeder server's delay paces this loop.
(do ((job 0 (+ job 1)))
    ((> job 500000) (print "DONE"))
  (let ((command (conc "./queuefeeder xena:22022 bsub ./testopenlava.sh "
                       job " " (random 30))))
    (print "Running: " command)
    (system command)))
Added loadwatch/queuefeeder-server.scm version [befbabbb2c].
;;======================================================================
;; Copyright 2015-2015, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
;;======================================================================

;; Queue Feeder. Use a crude droop curve to limit feeding jobs into a queue
;; to prevent slamming the queue

;;======================================================================
;; Methodology
;;
;; Connect to the server, the server delays the appropriate time (if
;; any) and then launch the task.
;;

(use nanomsg posix regex)
;; (use trace)
;; (trace nn-bind nn-socket nn-assert nn-recv nn-send thread-terminate! nn-close )

(define port 22022)

;; get needed stuff from commandline
;;
;;   queuefeeder-server port command
;;
;; cmd is a shell command run periodically; the first datum it writes to
;; stdout is read as the current queue load.
(define cmd '())
(let ((args (argv)))
  (if (> (length args) 2)
      (begin
        (set! port (cadr args))
        (set! cmd (caddr args)))
      ;; wrong arguments: print usage and fail
      (begin
        (print "Usage: queuefeeder-server port command")
        (print " where the command gives an integer on stdout indicating the queue load")
        (exit 1))))

(print "Running queue feeder with port=" port ", command=" cmd)

(define rep (nn-socket 'rep))
(print "connecting, got: " (nn-bind rep (conc "tcp://" "*" ":" port)))

;; Seconds each ordinary request is held before it is answered. Written by
;; the queue-check thread below and read by the server loop; both access it
;; under *current-delay-mutex*.
(define *current-delay* 0)
(define *current-delay-mutex* (make-mutex))

;; Serve requests on the REP socket soc until a "quit" message arrives:
;;   "quit"         -> reply "Ok, quitting" and return
;;   "ping..."      -> reply immediately with this process's pid
;;   anything else  -> sleep *current-delay* seconds, then reply
;; Requests are handled one at a time by this loop, so each sleep also
;; delays every request queued behind it.
(define (server soc)
  (print "server starting")
  (let loop ((msg-in (nn-recv soc))
             (count 0))
    ;; progress log every 1000 messages
    (if (eq? 0 (modulo count 1000))
        (print "server received: " msg-in ", count=" count))
    (cond
     ((equal? msg-in "quit")
      (nn-send soc "Ok, quitting"))
     ((and (>= (string-length msg-in) 4)
           (equal? (substring msg-in 0 4) "ping"))
      (nn-send soc (conc (current-process-id)))
      (loop (nn-recv soc) (+ count 1)))
     (else
      ;; snapshot the delay, then sleep without holding the mutex
      (mutex-lock! *current-delay-mutex*)
      (let ((current-delay *current-delay*))
        (mutex-unlock! *current-delay-mutex*)
        (thread-sleep! current-delay)
        (nn-send soc (conc "hello " msg-in " you waited " current-delay " seconds"))
        ;; wrap the counter so it stays small
        (loop (nn-recv soc) (if (> count 20000000) 0 (+ count 1))))))))

;; Check that a server is answering on host:port by sending "ping" and
;; expecting this process's pid back. A watchdog thread terminates the ping
;; thread if no answer arrives after about 11 seconds.
;; Returns: with return-socket #t (the default), the connected REQ socket on
;; success and #f on failure; with return-socket #f, #t/#f. In every case
;; except a returned socket, the REQ socket is closed before returning.
(define (ping-self host port #!key (return-socket #t))
  (let* ((req         (nn-socket 'req))
         (key         "ping")
         (success     #f)
         (keepwaiting #t)
         (ping        (make-thread
                       (lambda ()
                         (print "ping: sending string \"" key "\", expecting " (current-process-id))
                         (nn-send req key)
                         (let ((result (nn-recv req)))
                           (if (equal? (conc (current-process-id)) result)
                               (begin
                                 (print "ping, success: received \"" result "\"")
                                 (set! success #t))
                               (begin
                                 (print "ping, failed: received key \"" result "\"")
                                 (set! keepwaiting #f)
                                 (set! success #f)))))
                       "ping"))
         (timeout     (make-thread
                       (lambda ()
                         (let loop ((count 0))
                           (thread-sleep! 1)
                           (if keepwaiting
                               (print "still waiting after " (+ count 1) " seconds..."))
                           (if (and keepwaiting (< count 10))
                               (loop (+ count 1))))
                         (if keepwaiting
                             (begin
                               (print "timeout waiting for ping")
                               (thread-terminate! ping))))
                       "timeout")))
    (nn-connect req (conc "tcp://" host ":" port))
    ;; thread-join! raises if the watchdog terminated the ping thread
    (handle-exceptions
     exn
     (begin
       (print-call-chain)
       (print "message: " ((condition-property-accessor 'exn 'message) exn))
       (print "exn=" (condition->list exn))
       (print "ping failed to connect to " host ":" port))
     (thread-start! timeout)
     (thread-start! ping)
     (thread-join! ping)
     (if success (thread-terminate! timeout)))
    (if (and return-socket success)
        req
        (begin
          ;; the socket is of no further use to the caller: don't leak it
          (nn-close req)
          (if return-socket #f success)))))

;; Queue-check thread: every QUEUE_CHK_DELAY seconds (default 60, also used
;; when the variable is not a number) run cmd and read the load it prints.
;; The delay becomes load/50/50 = load/2500 seconds; if cmd does not print a
;; number the delay falls back to 100/50 = 2 seconds.
(thread-start!
 (make-thread
  (lambda ()
    (let* ((env-delay  (get-environment-variable "QUEUE_CHK_DELAY"))
           (delay-time (or (and env-delay (string->number env-delay)) 60)))
      (let loop ()
        (let* ((val       (with-input-from-pipe cmd read))
               (droop-val (if (number? val) (/ val 50) #f)))
          (mutex-lock! *current-delay-mutex*)
          (set! *current-delay* (/ (or droop-val 100) 50))
          (mutex-unlock! *current-delay-mutex*)
          (print "droop-val=" droop-val))
        ;; sleep after the pipe has been closed, not inside
        ;; with-input-from-pipe, so the child does not linger for the interval
        (thread-sleep! delay-time)
        (loop))))
  "queue-check"))

;; Start the server, confirm it answers, then wait until it is told to quit.
(let ((server-thread (make-thread (lambda () (server rep)) "server")))
  (thread-start! server-thread)
  (if (ping-self (get-host-name) port return-socket: #f)
      (begin
        (thread-join! server-thread)
        (nn-close rep))
      (print "ping failed")))

(exit)
Added loadwatch/queuefeeder.scm version [175b252945].
;;======================================================================
;; Copyright 2015-2015, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
;;======================================================================

;; Queue Feeder. Use a crude droop curve to limit feeding jobs into a queue
;; to prevent slamming the queue

;;======================================================================
;; Methodology
;;
;; Connect to the server, the server delays the appropriate time (if
;; any) and then launch the task.
;;

(use nanomsg posix regex)

(define req (nn-socket 'req))

;; get needed stuff from commandline
;;
;;   queuefeeder host:port command params ...
;;
;; hostport is the server endpoint; cmd is the command plus its parameters,
;; exec'd once the server has answered.
(define hostport #f)
(define cmd '())
(let ((args (argv)))
  (if (> (length args) 2)
      (begin
        (set! hostport (cadr args))
        (set! cmd (cddr args)))
      (begin
        (print "Usage: queuefeeder host:port command params ....")
        (exit 1))))

;; Seconds to wait for the server's answer before giving up. Override with
;; QUEUEFEEDER_TIMEOUT; missing or non-numeric values use the default of 20.
(define *server-timeout*
  (let ((env-val (get-environment-variable "QUEUEFEEDER_TIMEOUT")))
    (or (and env-val (string->number env-val)) 20)))

(nn-connect req (conc "tcp://" hostport))

;; Send msg on the REQ socket soc and return the reply.
(define (client-send-receive soc msg)
  (nn-send soc msg)
  (nn-recv soc))

;; (define ((talk-to-server soc))
;;   (let loop ((cnt 200000))
;;     (let ((name (list-ref '("Matt" "Tom" "Bob" "Jill" "James" "Jane")(random 6))))
;;       ;; (print "Sending " name)
;;       ;; (print
;;       (client-send-receive req name)
;;       ;; )
;;       (if (> cnt 0)(loop (- cnt 1)))))
;;   (print (client-send-receive req "quit"))
;;   (nn-close req)
;;   (exit))
;;

;; Watchdog: if no answer arrives within *server-timeout* seconds, give up
;; without running the command. Exit non-zero so the caller can tell the job
;; was not launched.
(thread-start! (lambda ()
                 (thread-sleep! *server-timeout*)
                 (print "Give up on waiting for the server")
                 (nn-close req)
                 (exit 1)))

;; Send user@host and wait for the reply; the server may hold the reply back
;; before answering, which is what paces the launch.
(thread-join! (thread-start! (lambda ()
                               (print (client-send-receive req (conc (current-user-name) "@" (get-host-name)))))))

;; Replace this process with the requested command.
(process-execute (car cmd) (cdr cmd))
Added loadwatch/testopenlava.sh version [1f61657fdf].
#!/bin/bash
# Dummy batch job for exercising the queue feeder: write a START line to
# JOB_ORDER.log, sleep JOB_LENGTH seconds, then append an END line.
#
# Usage: testopenlava.sh JOB_ORDER JOB_LENGTH
#   JOB_ORDER   job identifier; also names the log file JOB_ORDER.log
#   JOB_LENGTH  seconds to sleep (passed to sleep)
if [ "$#" -lt 2 ]; then
    echo "Usage: $0 job_order job_length" >&2
    exit 1
fi

job_order=$1
job_length=$2
log_file="$job_order.log"

# quote expansions so odd job names or empty values are not word-split
echo "START: $job_order" > "$log_file"
sleep "$job_length"
echo "END: $job_order" >> "$log_file"