Overview
Comment: | Interleaving query enablement, server side coded |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | interleaved-queries |
Files: | files | file ages | folders |
SHA1: |
4980ca9f981af54ed0468ee01cb316a8 |
User & Date: | matt on 2012-11-12 00:04:43 |
Other Links: | branch diff | manifest | tags |
Context
2012-11-12
| ||
17:19 | Backed out static only build of zmq libs in installall.sh check-in: a74be59f83 user: matt tags: interleaved-queries | |
00:04 | Interleaving query enablement, server side coded check-in: 4980ca9f98 user: matt tags: interleaved-queries | |
2012-11-06
| ||
09:44 | Testing ordering of loading zmq, fixes to deploy script check-in: 8c476e8627 user: mrwellan tags: trunk | |
Changes
Modified server.scm from [c2beddecc4] to [8e93e2dd1b].
︙ | ︙ | |||
41 42 43 44 45 46 47 48 49 50 51 52 53 54 | (cdb:client-call zsocket 'ping #t) (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!") (mutex-lock! *heartbeat-mutex*) (set! *server-loop-heart-beat* (current-seconds)) (mutex-unlock! *heartbeat-mutex*) (loop)))) (define (server:run hostn) (debug:print 0 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) | > > > > > | > | > > > > | | | | | | | | < < | | | | > > > > | > > | | < < < < | < < < < | 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | (cdb:client-call zsocket 'ping #t) (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!") (mutex-lock! *heartbeat-mutex*) (set! *server-loop-heart-beat* (current-seconds)) (mutex-unlock! *heartbeat-mutex*) (loop)))) (define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) (define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) (define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0)) (define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0)) (define (server:run hostn) (debug:print 0 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) (let* ((zmq-sdat1 #f) (zmq-sdat2 #f) (zmq-socket1 #f) (zmq-socket2 #f) (p1 #f) (p2 #f) (zmq-sockets-dat #f) (iface (if (string=? "-" hostn) "*" ;; (get-host-name) hostn)) (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) (if ipstr ipstr hostname)))) (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port") (string->number (args:get-arg "-port")) (+ 5000 (random 1001))))) (set! zmq-sdat1 (car zmq-socket-dat)) (set! zmq-socket1 (car zmq-sdat1)) (set! p1 (caddr zmq-sdat1)) (set! zmq-sdat2 (cadr zmq-socket-dat)) (set! zmq-socket2 (car zmq-sdat2)) (set! p2 (caddr zmq-sdat2)) (set! *cache-on* #t) ;; (set! th1 (make-thread (lambda () ;; (server:self-ping ipaddrstr actual-port)))) ;; (thread-start! th1) ;; what to do when we quit ;; (on-exit (lambda () (if (and *toppath* *server-info*) (begin (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr p1 p2)) (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) (mutex-lock! *incoming-mutex*) (set! queue-len (length *incoming-data*)) (mutex-unlock! *incoming-mutex*) (if (> queue-len 0) (begin (debug:print-info 0 "Queue not flushed, waiting ...") (loop)))))))) ;; The heavy lifting ;; (let loop () (let* ((rawmsg (receive-message* zmq-socket1)) (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) (res #f)) (debug:print-info 12 "server=> received params=" params) (set! res (cdb:cached-access params)) (debug:print-info 12 "server=> processed res=" res) (send-message zmq-socket (db:obj->string res)) (if (not *time-to-exit*) (loop) (begin |
︙ | ︙ | |||
170 171 172 173 174 175 176 | (set! *time-to-exit* #t) (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) | | | | > | > > > > > > > | | | | | | | 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 | (set! *time-to-exit* #t) (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) (define (server:find-free-port-and-open iface s port stype #!key (trynum 50)) (let ((s (if s s (make-socket stype))) (p (if (number? port) port 5555)) (old-handler (current-exception-handler))) (handle-exceptions exn (begin (debug:print 0 "Failed to bind to port " p ", trying next port") (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) ;; (old-handler) ;; (print-call-chain) (if (> trynum 0) (server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1)) (debug:print-info 0 "Tried ports up to " p " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")) (exit)) ;; To exit or not? That is the question. (let ((zmq-url (conc "tcp://" iface ":" p))) (debug:print 0 "Trying to start server on " zmq-url) (bind-socket s zmq-url) (list iface s port))))) (define (server:setup-ports ipadrstr startport) (let* ((s1 (server:find-free-port-and-open ipadrstr #f startport 'pub)) (p1 (caddr s1)) (s2 (server:find-free-port-and-open ipadrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull)) (p2 (caddr s2))) (set! *runremote* #f) (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and p2") (mutex-lock! *heartbeat-mutex*) (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live)) (mutex-unlock! *heartbeat-mutex*) (list s1 s2))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) |
︙ | ︙ |