Overview
Comment: | Added example of nanomsg parallelization |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.62 |
Files: | files | file ages | folders |
SHA1: |
f1f44b078f7687d1ce26a25b7a4bf616 |
User & Date: | mrwellan on 2016-11-10 15:13:50 |
Other Links: | branch diff | manifest | tags |
Context
2016-11-11
| ||
10:59 | Added deploy makefile check-in: 9c2e96a8c8 user: jmoon18 tags: v1.62 | |
2016-11-10
| ||
15:13 | Added example of nanomsg parallelization check-in: f1f44b078f user: mrwellan tags: v1.62 | |
2016-11-07
| ||
13:32 | Added beginnings of a common context var for passing area specific values to calls check-in: 825534b56a user: mrwellan tags: v1.62 | |
Changes
Added remotediff-nmsg.scm version [c101aad63b].
;; remotediff-nmsg.scm
;;
;; Example of parallelizing a directory comparison with nanomsg.
;;
;; Invoked with two paths the program acts as the *server*: it binds a REP
;; socket, launches one copy of itself per path, and collects the
;; "<file> <checksum>" messages the copies send back.
;;
;; Invoked with one path the program acts as a *client*: it walks the tree,
;; checksums every regular file in a bounded pool of threads and reports each
;; result to the server over a REQ socket.

(use posix)
(use regex)
(use directory-utils)
(use srfi-18 srfi-69 nanomsg)

;; Send MSG on socket SOC and return whatever nn-recv yields next on it.
(define (client-send-receive soc msg)
  (nn-send soc msg)
  (nn-recv soc))

;; do as calling user
;;
;; Call the thunk PROC with the effective uid switched to the real uid when
;; the two differ (i.e. running suid) and return PROC's result.  dynamic-wind
;; restores the original effective uid even if PROC escapes with an exception.
(define (do-as-calling-user proc)
  (let* ((eid   (current-effective-user-id))
         (cid   (current-user-id))
         (suid? (not (eq? eid cid))))
    (dynamic-wind
      (lambda () (if suid? (set! (current-effective-user-id) cid)))
      proc
      (lambda () (if suid? (set! (current-effective-user-id) eid))))))

;; use mutex to not open/close files at same time
;;
;; Run "shasum" on FILE and return the first whitespace-separated field of
;; its first output line, or #f when no usable line was produced.  MTX is
;; held while the subprocess is spawned and while its ports are closed.
(define (checksum mtx file)
  (mutex-lock! mtx)
  (let-values (((inp oup pid) (process "shasum" (list file))))
    (mutex-unlock! mtx)
    (let ((result (read-line inp)))
      ;; now flush out remaining output
      (let loop ((inl (read-line inp)))
        (if (not (eof-object? inl))
            (loop (read-line inp))))
      ;; Always close both ports, not only on success, so a failed shasum
      ;; does not leak file descriptors.
      (mutex-lock! mtx)
      (close-input-port inp)
      (close-output-port oup)
      (mutex-unlock! mtx)
      (and (string? result)
           (let ((fields (string-split result)))
             (and (pair? fields)
                  (car fields)))))))

;; Upper bound on the number of checksum threads in flight at once.
(define *max-running* 40)

;; Client side: walk PATH, checksum every readable regular file in its own
;; thread (at most *max-running* at a time) and send "<file> <checksum>" to
;; the server for each one.  Sends "quit" when finished and then exits the
;; process.
(define (gather-dir-info path)
  (let ((mtx1    (make-mutex))
        (threads (make-hash-table))
        (req     (nn-socket 'req)))
    (print "starting client with pid " (current-process-id))
    (nn-connect req "tcp://localhost:5559") ;; "ipc:///tmp/test-ipc")
    (find-files
     path
     ;; test: #t
     action:
     (lambda (p res)
       (let ((info (cond
                    ((not (file-read-access? p)) '(cant-read))
                    ((directory? p)              '(dir))
                    ((symbolic-link? p)          (list 'symlink (read-symbolic-link p)))
                    (else                        '(data)))))
         (if (eq? (car info) 'data)
             ;; Retry until there is room for another worker thread.
             (let loop ((start-time (current-seconds)))
               (mutex-lock! mtx1)
               (let ((ok-to-run (> *max-running* (hash-table-size threads))))
                 (mutex-unlock! mtx1)
                 (if ok-to-run
                     (let ((th1 (make-thread
                                 (lambda ()
                                   (let ((cksum (checksum mtx1 p)))
                                     ;; mtx1 serializes use of the shared REQ
                                     ;; socket and guards the thread table.
                                     (mutex-lock! mtx1)
                                     (client-send-receive req (conc p " " cksum))
                                     (hash-table-delete! threads p) ;; delete myself
                                     (mutex-unlock! mtx1)))
                                 p)))
                       ;; Register before starting so the worker never has to
                       ;; poll for its own table entry before removing it.
                       (mutex-lock! mtx1)
                       (hash-table-set! threads p th1)
                       (mutex-unlock! mtx1)
                       (thread-start! th1)) ;; thread is launched
                     (let ((run-time (- (current-seconds) start-time))) ;; couldn't launch yet
                       (cond
                        ((< run-time 5)  (thread-yield!)) ;; blast on through, but let workers run
                        ((< run-time 30) (thread-sleep! 0.1))
                        ((< run-time 60) (thread-sleep! 2))
                        (else            (thread-sleep! 3)))
                       (loop start-time)))))))))
    ;; Snapshot the remaining workers under the lock, then wait for them.
    (mutex-lock! mtx1)
    (let ((pending (hash-table-values threads)))
      (mutex-unlock! mtx1)
      (for-each thread-join! pending))
    (client-send-receive req "quit")
    (nn-close req)
    (exit)))

;; Return the index of the last space character in STR, or #f if none.
(define (last-space-index str)
  (let loop ((i (- (string-length str) 1)))
    (cond
     ((< i 0)                             #f)
     ((char=? (string-ref str i) #\space) i)
     (else                                (loop (- i 1))))))

;; receive and store the file data, note: this is effectively a *server*, not a client.
;;
;; Binds the REP socket, starts one client per path and records every
;; "<file> <checksum>" message in a hash table keyed by file name.  Returns
;; (list p1dat p2dat) once both clients have sent "quit".
(define (compare-directories path1 path2)
  (let ((last-print (current-seconds))
        (p1dat      (make-hash-table))
        (p2dat      (make-hash-table))
        (rep        (nn-socket 'rep)))
    (nn-bind rep "tcp://*:5559") ;; "ipc:///tmp/test-ipc")
    ;; start clients
    (thread-sleep! 0.1)
    ;; Arguments are passed as a list, not through a shell command string, so
    ;; paths containing spaces or shell metacharacters survive intact.
    (process-run "./remotediff-nmsg" (list path1))
    (process-run "./remotediff-nmsg" (list path2))
    ;; numdone counts the clients that have sent "quit"; stop after both.
    (let loop ((numdone 0))
      (if (< numdone 2)
          (let ((msg-in (nn-recv rep)))
            (if (equal? msg-in "quit")
                (begin
                  ;; The client is blocked waiting for a reply to its "quit".
                  (nn-send rep "done")
                  (loop (+ numdone 1)))
                ;; The checksum is the text after the last space, so file
                ;; names that contain spaces stay in one piece.
                (let ((split (last-space-index msg-in)))
                  (if split
                      (let ((filen (substring msg-in 0 split))
                            (finfo (substring msg-in (+ split 1) (string-length msg-in))))
                        (if (substring-index path1 filen 0) ;; is this a path1?
                            (hash-table-set! p1dat filen finfo)
                            (hash-table-set! p2dat filen finfo))))
                  ;; (print "parts: " parts)
                  (nn-send rep "done")
                  ;; Progress report at most once every 15 seconds.
                  (if (> (- (current-seconds) last-print) 15)
                      (begin
                        (set! last-print (current-seconds))
                        (print "Processed " (hash-table-size p1dat) ", " (hash-table-size p2dat))))
                  (loop numdone))))))
    (print "p1: " (hash-table-size p1dat) " p2: " (hash-table-size p2dat))
    (list p1dat p2dat)))

(if (< (length (argv)) 2)
    (begin
      (print "Usage: remotediff-nmsg file1 file2")
      (exit)))

(if (= (length (argv)) 2) ;; given a single path
    (gather-dir-info (cadr (argv)))
    (compare-directories (cadr (argv)) (caddr (argv))))

(print "Done")