Megatest

Check-in [f1f44b078f]
Login
Overview
Comment:Added example of nanomsg parallelization
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.62
Files: files | file ages | folders
SHA1: f1f44b078f7687d1ce26a25b7a4bf616e670c070
User & Date: mrwellan on 2016-11-10 15:13:50
Other Links: branch diff | manifest | tags
Context
2016-11-11
10:59
Added deploy makefile check-in: 9c2e96a8c8 user: jmoon18 tags: v1.62
2016-11-10
15:13
Added example of nanomsg parallelization check-in: f1f44b078f user: mrwellan tags: v1.62
2016-11-07
13:32
Added beginnings of a common context var for passing area specific values to calls check-in: 825534b56a user: mrwellan tags: v1.62
Changes

Added remotediff-nmsg.scm version [c101aad63b].





































































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
(use posix)
(use regex)
(use directory-utils)
(use srfi-18 srfi-69 nanomsg)

;; Send `msg` on the nanomsg socket `soc`, then wait for the reply on the
;; same socket and return it.
(define client-send-receive
  (lambda (soc msg)
    (nn-send soc msg)
    (let ((reply (nn-recv soc)))
      reply)))

;; Run the thunk `proc` as the calling (real) user.
;;
;; When the effective user id differs from the real user id (i.e. the
;; program is running suid) the effective id is switched to the real id
;; for the duration of `proc` and switched back afterwards.
;;
;; Returns whatever `proc` returns. dynamic-wind is used so the original
;; effective uid is restored even when `proc` leaves through an exception
;; or another non-local exit.
(define (do-as-calling-user proc)
  (let* ((eid  (current-effective-user-id))
         (cid  (current-user-id))
         (suid (not (= eid cid)))) ;; running suid
    (if suid
        (dynamic-wind
          (lambda () (set! (current-effective-user-id) cid))
          proc
          (lambda () (set! (current-effective-user-id) eid)))
        (proc))))

;; Compute the checksum of `file` by running the external "shasum" program.
;;
;; mtx  - mutex held while the subprocess is started and while its ports
;;        are closed, so that files are not opened/closed at the same time
;;        by several threads
;; file - path handed to shasum
;;
;; Returns the first whitespace-separated field of the first output line,
;; or #f when shasum produced no usable output.
;; The process ports are closed on every path, including the failure path,
;; so a file that cannot be summed does not leak descriptors.
(define (checksum mtx file)
  (mutex-lock! mtx)
  (let-values (((inp oup pid)
                (process "shasum" (list file))))
    (mutex-unlock! mtx)
    (let ((result (read-line inp)))
      ;; flush out any remaining output so the child is not left blocked
      (let loop ((inl result))
        (if (not (eof-object? inl))
            (loop (read-line inp))))
      (mutex-lock! mtx)
      (close-input-port inp)
      (close-output-port oup)
      (mutex-unlock! mtx)
      (and (string? result)
           (let ((fields (string-split result)))
             ;; a blank first line yields no fields; report that as failure
             (and (pair? fields)
                  (car fields)))))))

;; Upper limit on the number of checksum threads gather-dir-info keeps
;; registered at any one time; a new thread is only launched while the
;; count of registered threads is below this value.
(define *max-running* 40)

;; Client side: walk the tree under `path` and report a checksum for every
;; regular data file to the server over a nanomsg REQ socket.
;;
;; For each file classified as 'data a thread is started which computes the
;; checksum and sends "<path> <checksum>" via client-send-receive. At most
;; *max-running* threads are registered in the `threads` table at once.
;; Entries that are unreadable, directories or symlinks are classified but
;; nothing is sent for them.
;;
;; After the walk the remaining threads are joined, "quit" is sent, the
;; socket is closed and the process exits; this procedure does not return.
(define (gather-dir-info path)
  (let ((mtx1     (make-mutex))
        (threads  (make-hash-table))
        ;; last-num is only referenced by the commented-out debug code below
        (last-num 0)
        (req      (nn-socket 'req)))
    (print "starting client with pid " (current-process-id))
    (nn-connect req
                "tcp://localhost:5559")
                ;; "ipc:///tmp/test-ipc")
    (find-files 
     path 
     ;; test: #t
     action: (lambda (p res)
               ;; classify the entry; only 'data entries are checksummed
               (let ((info (cond
                            ((not (file-read-access? p)) '(cant-read))
                            ((directory? p)              '(dir))
                            ((symbolic-link? p)          (list 'symlink (read-symbolic-link p)))
                            (else                        '(data)))))
                 (if (eq? (car info) 'data)
                     ;; retry loop: keeps going until a thread for p has been launched
                     (let loop ((start-time (current-seconds)))
                       (mutex-lock! mtx1)
                       (let* ((num-threads (hash-table-size threads))
                              (ok-to-run   (> *max-running* num-threads)))
                         ;; (if (> (abs (- num-threads last-num)) 2)
                         ;;     (begin
                         ;;       ;; (print "num-threads:" num-threads)
                         ;;       (set! last-num num-threads)))
                         (mutex-unlock! mtx1)
                         (if ok-to-run
                             (let ((run-time-start (current-seconds)))
                               ;; (print "num threads: " num-threads)
                               ;; the thread is named after the file path p
                               (let ((th1  (make-thread
                                            (lambda ()
                                              ;; run-time is computed but not used
                                              (let ((cksum (checksum mtx1 p))
                                                    (run-time (- (current-seconds) run-time-start)))
                                                ;; the req socket is shared, so send/receive is done under mtx1
                                                (mutex-lock! mtx1)
                                                (client-send-receive req (conc p " " cksum))
                                                (mutex-unlock! mtx1))
                                              ;; The thread is registered in `threads` only after
                                              ;; thread-start!, so it may get here before its entry
                                              ;; exists; poll every 0.5s until the entry is present,
                                              ;; then remove it.
                                              (let loop2 ()
                                                (mutex-lock! mtx1)
                                                (let ((registered (hash-table-exists? threads p)))
                                                  (if registered
                                                      (begin
                                                        ;; (print "deleting thread reference for " p)
                                                        (hash-table-delete! threads p))) ;; delete myself
                                                  (mutex-unlock! mtx1)
                                                  (if (not registered)
                                                      (begin
                                                        (thread-sleep! 0.5)
                                                        (loop2))))))
                                            p)))
                                 (thread-start! th1)
                                 ;; (thread-sleep! 0.05) ;; give things a little time to get going
                                 ;; (thread-join! th1) ;; 
                                 ;; register the running thread under its path
                                 (mutex-lock! mtx1)
                                 (hash-table-set! threads p th1)
                                 (mutex-unlock! mtx1)
                                 )) ;; thread is launched
                             ;; Too many threads registered: back off, sleeping longer the
                             ;; longer we have been waiting, then try again.
                             (let ((run-time (- (current-seconds) start-time))) ;; couldn't launch yet
                               (cond
                                ((< run-time 5)) ;; blast on through
                                ((< run-time 30)(thread-sleep! 0.1))
                                ((< run-time 60)(thread-sleep! 2))
                                ((< run-time 120)(thread-sleep! 3))
                                (else (thread-sleep! 3)))
                               (loop start-time)))))))))
    ;; wait for the threads still registered, then tell the server we are done
    (map thread-join! (hash-table-values threads))
    (client-send-receive req "quit")
    (nn-close req)
    (exit)))

;; Receive and store the file data. Note: this is effectively a *server*,
;; not a client.
;;
;; Binds a nanomsg REP socket on tcp://*:5559, starts one background
;; ./remotediff-nmsg client for each of `path1` and `path2`, and collects
;; the "<file> <checksum>" messages they send until both clients have sent
;; "quit".
;;
;; Every request, including "quit", is answered with "done": the clients
;; use client-send-receive and therefore wait for a reply to each message.
;;
;; Returns (list p1dat p2dat): two hash tables mapping file name to the
;; received checksum text, for files under path1 and for all other files
;; respectively.
(define (compare-directories path1 path2)
  (let ((last-print  (current-seconds))
        (p1dat       (make-hash-table))
        (p2dat       (make-hash-table))
        (numdone     0) ;; incremented for each "quit" received; stop at 2
        (rep         (nn-socket 'rep)))
    (nn-bind    rep
                "tcp://*:5559")
                ;; "ipc:///tmp/test-ipc")
    ;; start clients
    (thread-sleep! 0.1)
    (system (conc "./remotediff-nmsg " path1 " &"))
    (system (conc "./remotediff-nmsg " path2 " &"))
    (let loop ((msg-in (nn-recv rep)))
      (if (equal? msg-in "quit")
          (set! numdone (+ numdone 1))
          (let* ((parts (string-split msg-in))
                 ;; the checksum is the last field; it never contains spaces
                 (finfo (and (> (length parts) 1)
                             (car (reverse parts)))))
            (if finfo
                ;; everything before the final " <checksum>" is the file
                ;; name, so names containing spaces are kept intact
                (let* ((filen (substring msg-in
                                         0
                                         (- (string-length msg-in)
                                            (string-length finfo)
                                            1)))
                       ;; a path1 file is one whose name starts with path1
                       (isp1  (eqv? 0 (substring-index path1 filen))))
                  (if isp1
                      (hash-table-set! p1dat filen finfo)
                      (hash-table-set! p2dat filen finfo))))))
      ;; (print "parts: " parts)
      (nn-send rep "done")
      ;; progress report at most once every 15 seconds
      (if (> (- (current-seconds) last-print) 15)
          (begin
            (set! last-print (current-seconds))
            (print "Processed " (hash-table-size p1dat) ", " (hash-table-size p2dat))))
      (if (< numdone 2)
          (loop (nn-recv rep))))
    (print "p1: " (hash-table-size p1dat) " p2: " (hash-table-size p2dat))
    (list p1dat p2dat)))

;; Command line dispatch. (argv) includes the program name, so:
;;   no path argument -> print usage and exit
;;   one path         -> act as a client and gather info for that path
;;   two (or more)    -> act as the server and compare the first two paths
(let* ((args  (argv))
       (nargs (length args)))
  (cond
   ((< nargs 2)
    (print "Usage: remotediff-nmsg file1 file2")
    (exit))
   ((= nargs 2) ;; given a single path
    (gather-dir-info (cadr args)))
   (else
    (compare-directories (cadr args) (caddr args)))))

(print "Done")