Overview
Comment: | Cleaned up server starting. Should be no run-away starting of too many servers now. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.62-no-rpc |
Files: | files | file ages | folders |
SHA1: |
dc09eb179bb5ff963c2a223e9f2846b8 |
User & Date: | mrwellan on 2016-12-01 08:43:50 |
Other Links: | branch diff | manifest | tags |
Context
2016-12-01
| ||
15:58 | server fixes check-in: f0c98a8cd8 user: mrwellan tags: v1.62-no-rpc | |
08:43 | Cleaned up server starting. Should be no run-away starting of too many servers now. check-in: dc09eb179b user: mrwellan tags: v1.62-no-rpc | |
2016-11-30
| ||
23:10 | Basic server code, removed some junk and corrected couple typos check-in: bb626804c7 user: matt tags: v1.62-no-rpc | |
Changes
Modified db.scm from [77088c1205] to [fef1f3f2e3].
︙ | ︙ | |||
37 38 39 40 41 42 43 44 | (define *number-non-write-queries* 0) ;;====================================================================== ;; R E C O R D S ;;====================================================================== ;; each db entry is a pair ( db . dbfilepath ) (defstruct dbr:dbstruct | > > | | | > > > > | 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | (define *number-non-write-queries* 0) ;;====================================================================== ;; R E C O R D S ;;====================================================================== ;; each db entry is a pair ( db . dbfilepath ) ;; I propose this record evolves into the area record ;; (defstruct dbr:dbstruct (tmpdb #f) (mtdb #f) (refndb #f) (homehost #f) ;; not used yet (on-homehost #f) ;; not used yet ) ;; goal is to converge on one struct for an area but for now it is too confusing ;; record for keeping state,status and count for doing roll-ups in ;; iterated tests ;; (defstruct dbr:counts (state #f) (status #f) |
︙ | ︙ |
Modified remotediff-nmsg.scm from [90308a45f2] to [50100144d4].
︙ | ︙ | |||
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | (mutex-unlock! mtx) (car (string-split result))) #f) (loop (read-line inp))))))) (define *max-running* 40) (define (gather-dir-info path) (let ((mtx1 (make-mutex)) (threads (make-hash-table)) (last-num 0) (req (nn-socket 'req))) (print "starting client with pid " (current-process-id)) (nn-connect req ;; "tcp://localhost:5559") "ipc:///tmp/test-ipc") (find-files path ;; test: #t action: (lambda (p res) (let ((info (cond ((not (file-read-access? p)) '(cant-read)) ((directory? p) '(dir)) ((symbolic-link? p) (list 'symlink (read-symbolic-link p))) (else '(data))))) (if (eq? (car info) 'data) (let loop ((start-time (current-seconds))) | > > > > > | | | | | | | | | 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | (mutex-unlock! mtx) (car (string-split result))) #f) (loop (read-line inp))))))) (define *max-running* 40) (define my-mutex-lock! conc) (define my-mutex-unlock! conc) ;; (define my-mutex-lock! mutex-lock!) ;; (define my-mutex-unlock! mutex-unlock!) (define (gather-dir-info path) (let ((mtx1 (make-mutex)) (threads (make-hash-table)) (last-num 0) (req (nn-socket 'req))) (print "starting client with pid " (current-process-id)) (nn-connect req ;; "tcp://localhost:5559") "ipc:///tmp/test-ipc") (find-files path ;; test: #t action: (lambda (p res) (let ((info (cond ((not (file-read-access? p)) '(cant-read)) ((directory? p) '(dir)) ((symbolic-link? p) (list 'symlink (read-symbolic-link p))) (else '(data))))) (if (eq? (car info) 'data) (let loop ((start-time (current-seconds))) (my-mutex-lock! mtx1) (let* ((num-threads (hash-table-size threads)) (ok-to-run (> *max-running* num-threads))) ;; (if (> (abs (- num-threads last-num)) 2) ;; (begin ;; ;; (print "num-threads:" num-threads) ;; (set! last-num num-threads))) (my-mutex-unlock! mtx1) (if ok-to-run (let ((run-time-start (current-seconds))) ;; (print "num threads: " num-threads) (let ((th1 (make-thread (lambda () (let ((cksum (checksum mtx1 p cmd: "md5sum")) (run-time (- (current-seconds) run-time-start))) (my-mutex-lock! mtx1) (client-send-receive req (conc p " " cksum)) (my-mutex-unlock! mtx1)) (let loop2 () (my-mutex-lock! mtx1) (let ((registered (hash-table-exists? threads p))) (if registered (begin ;; (print "deleting thread reference for " p) (hash-table-delete! threads p))) ;; delete myself (my-mutex-unlock! mtx1) (if (not registered) (begin (thread-sleep! 0.5) (loop2)))))) p))) (thread-start! th1) ;; (thread-sleep! 0.05) ;; give things a little time to get going ;; (thread-join! th1) ;; (my-mutex-lock! mtx1) (hash-table-set! threads p th1) (my-mutex-unlock! mtx1) )) ;; thread is launched (let ((run-time (- (current-seconds) start-time))) ;; couldn't launch yet (cond ((< run-time 5)) ;; blast on through ((< run-time 30)(thread-sleep! 0.1)) ((< run-time 60)(thread-sleep! 2)) ((< run-time 120)(thread-sleep! 3)) |
︙ | ︙ |
Modified rmt.scm from [f6fac119b0] to [8fc869d3b5].
︙ | ︙ | |||
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 | (debug:print 0 *default-log-port* "ERROR: 15 tries to start/connect to server. Giving up.") (exit 1)) ;; ensure we have a record for our connection for given area ((not *runremote*) (set! *runremote* (make-remote)) (mutex-unlock! *rmt-mutex*) (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; no server contact made and this is a write, try starting a server ((and (not (remote-server-url *runremote*)) (not (member cmd api:read-only-queries))) (let ((serverconn (server:check-if-running *toppath*))) (if serverconn (remote-server-url-set! *runremote* serverconn) ;; the string can be consumed by the client setup if needed (if (not (server:start-attempted? *toppath*)) (server:kind-run *toppath*)))) | > > > > > > > > > > > > > > > > > > > | > > > | | | 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | (debug:print 0 *default-log-port* "ERROR: 15 tries to start/connect to server. Giving up.") (exit 1)) ;; ensure we have a record for our connection for given area ((not *runremote*) (set! *runremote* (make-remote)) (mutex-unlock! *rmt-mutex*) (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; ensure we have a homehost record ((not (pair? (remote-hh-dat *runremote*))) ;; have a homehost record? (thread-sleep! 0.1) ;; since we shouldn't get here, delay a little (remote-hh-dat-set! *runremote* (common:get-homehost)) (mutex-unlock! *rmt-mutex*) (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; on homehost and this is a read ((and (cdr (remote-hh-dat *runremote*)) ;; on homehost (member cmd api:read-only-queries)) ;; this is a read (mutex-unlock! *rmt-mutex*) (rmt:open-qry-close-locally cmd 0 params)) ;; on homehost and this is a write, we already have a server ((and (cdr (remote-hh-dat *runremote*)) ;; on homehost (not (member cmd api:read-only-queries)) ;; this is a write (remote-server-url *runremote*)) ;; have a server (mutex-unlock! *rmt-mutex*) (rmt:open-qry-close-locally cmd 0 params)) ;; no server contact made and this is a write, try starting a server ((and (not (remote-server-url *runremote*)) (not (member cmd api:read-only-queries))) (let ((serverconn (server:check-if-running *toppath*))) (if serverconn (remote-server-url-set! *runremote* serverconn) ;; the string can be consumed by the client setup if needed (if (not (server:start-attempted? *toppath*)) (server:kind-run *toppath*)))) (if (cdr (remote-hh-dat *runremote*)) ;; we are on the homehost, just do the call (begin (mutex-unlock! *rmt-mutex*) (rmt:open-qry-close-locally cmd 0 params)) (begin (mutex-unlock! *rmt-mutex*) (rmt:send-receive cmd rid params attemptnum: attemptnum)))) ;; if not on homehost ensure we have a connection to a live server ((or (not (pair? (remote-hh-dat *runremote*))) ;; have a homehost record? (not (cdr (remote-hh-dat *runremote*))) ;; have record, are we on a homehost? (not (remote-conndat *runremote*))) ;; do we not have a connection? (remote-hh-dat-set! *runremote* (common:get-homehost)) (remote-conndat-set! *runremote* (rmt:get-connection-info 0)) (mutex-unlock! *rmt-mutex*) (server:kind-run *toppath*) ;; we need a sever (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; all set up if get this far, dispatch the query ((cdr (remote-hh-dat *runremote*)) ;; we are on homehost (mutex-unlock! *rmt-mutex*) (rmt:open-qry-close-locally cmd (if rid rid 0) params)) ;; reset the connection if it has been unused too long ((and (remote-conndat *runremote*) (let ((expire-time (- start-time (remote-server-timeout *runremote*)))) (< (http-transport:server-dat-get-last-access connection) expire-time))) (remote-conndatr *runremote* #f)) ;; not on homehost, do server query (else |
︙ | ︙ | |||
129 130 131 132 133 134 135 | (exit 1))) (begin (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum) (remote-conndat-set! *runremote* #f) (server-url-set! *runremote* #f) (tasks:start-and-wait-for-server (tasks:open-db) 0 15) (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))))))))) | < | 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | (exit 1))) (begin (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum) (remote-conndat-set! *runremote* #f) (server-url-set! *runremote* #f) (tasks:start-and-wait-for-server (tasks:open-db) 0 15) (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))))))))) (define (rmt:update-db-stats run-id rawcmd params duration) (mutex-lock! *db-stats-mutex*) (handle-exceptions exn (begin (debug:print 0 *default-log-port* "WARNING: stats collection failed in update-db-stats") |
︙ | ︙ |