Overview
Comment: | switched default transport back to http |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.62-rpc |
Files: | files | file ages | folders |
SHA1: |
37f53d58abfc53de3635d5456dc37f2a |
User & Date: | bjbarcla on 2016-12-09 17:25:06 |
Other Links: | branch diff | manifest | tags |
Context
2016-12-09
| ||
17:33 |
merged v1.62-rpc. Adds rpc transport capability but leaves default http.
NOTE: This commit appears to have broken the db sync-back to megatest.db. Moving this to side branch for repairs. check-in: c9ec425fc4 user: bjbarcla tags: v1.63-fix-db-sync | |
17:25 | switched default transport back to http Closed-Leaf check-in: 37f53d58ab user: bjbarcla tags: v1.62-rpc | |
2016-12-08
| ||
14:54 | made rpc default transport, not http. check-in: ee51b46291 user: bjbarcla tags: v1.62-rpc | |
Changes
Modified common.scm from [98bfc33987] to [298d65055e].
︙ | ︙ | |||
115 116 117 118 119 120 121 122 123 124 125 126 127 | (define *db-access-mutex* (make-mutex)) (define *db-cache-path* #f) ;; SERVER (define *my-client-signature* #f) (define *transport-type* #f) ;; override with [server] transport http|rpc|nmsg (define (common:set-transport-type) (set! *transport-type* (string->symbol (or (args:get-arg "-transport") (configf:lookup *configdat* "server" "transport") | > | | 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 | (define *db-access-mutex* (make-mutex)) (define *db-cache-path* #f) ;; SERVER (define *my-client-signature* #f) (define *transport-type* #f) ;; override with [server] transport http|rpc|nmsg (define *DEFAULT-TRANSPORT* "http") (define (common:set-transport-type) (set! *transport-type* (string->symbol (or (args:get-arg "-transport") (configf:lookup *configdat* "server" "transport") *DEFAULT-TRANSPORT*))) *transport-type*) (define *runremote* #f) ;; if set up for server communication this will hold <host port> (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *server-id* #f) (define *server-info* #f) |
︙ | ︙ | |||
654 655 656 657 658 659 660 | (debug:print 4 *default-log-port* " ... done") ) "clean exit"))) ;; let's try to clean up open sockets (if *runremote* (case (remote-transport *runremote*) | | | 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 | (debug:print 4 *default-log-port* " ... done") ) "clean exit"))) ;; let's try to clean up open sockets (if *runremote* (case (remote-transport *runremote*) ((http) #t) ((rpc) (rpc:close-all-connections!)) (else (debug:print-info 0 *default-log-port* "Transport "(remote-transport *runremote*)" not supported")))) (thread-start! th1) (thread-start! th2) (thread-join! th1)))) |
︙ | ︙ |
Modified http-transport.scm from [67f16e95fd] to [b18d5a6f65].
︙ | ︙ | |||
82 83 84 85 86 87 88 | (res #f)) (cond ((equal? (uri-path (request-uri (current-request))) '(/ "api")) (send-response body: (api:process-request *dbstruct-db* $) ;; the $ is the request vars proc headers: '((content-type text/plain))) (mutex-lock! *heartbeat-mutex*) | | | 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | (res #f)) (cond ((equal? (uri-path (request-uri (current-request))) '(/ "api")) (send-response body: (api:process-request *dbstruct-db* $) ;; the $ is the request vars proc headers: '((content-type text/plain))) (mutex-lock! *heartbeat-mutex*) (set! *db-lastaccess* (current-seconds)) (mutex-unlock! *heartbeat-mutex*)) ((equal? (uri-path (request-uri (current-request))) '(/ "")) (send-response body: (http-transport:main-page))) ((equal? (uri-path (request-uri (current-request))) '(/ "json_api")) (send-response body: (http-transport:main-page))) |
︙ | ︙ | |||
424 425 426 427 428 429 430 | (if (or (not (equal? sdat (list iface port))) (not server-id)) (begin (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info") (set! iface (car sdat)) (set! port (cadr sdat)))) | | | | 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 | (if (or (not (equal? sdat (list iface port))) (not server-id)) (begin (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info") (set! iface (car sdat)) (set! port (cadr sdat)))) ;; Transfer *db-last-access* to last-access to use in checking that we are still alive (mutex-lock! *heartbeat-mutex*) (set! last-access *db-last-access*) (mutex-unlock! *heartbeat-mutex*) ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout) ;; ;; no_traffic, no running tests, if server 0, no running servers ;; ;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out) |
︙ | ︙ |
Modified rmt.scm from [a4b1443352] to [55635aac65].
︙ | ︙ | |||
133 134 135 136 137 138 139 | ;; NOTE: we *have* a homehost record by now ((and (not (cdr (remote-hh-dat *runremote*))) ;; are we on a homehost? (not (remote-conndat *runremote*))) ;; and no connection (debug:print-info 12 *default-log-port* "rmt:send-receive, case 6 hh-dat: " (remote-hh-dat *runremote*) " conndat: " (remote-conndat *runremote*)) (mutex-unlock! *rmt-mutex*) (tasks:start-and-wait-for-server (tasks:open-db) 0 15) (let* ((cinfo (rmt:get-connection-info 0)) | > > | | 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | ;; NOTE: we *have* a homehost record by now ((and (not (cdr (remote-hh-dat *runremote*))) ;; are we on a homehost? (not (remote-conndat *runremote*))) ;; and no connection (debug:print-info 12 *default-log-port* "rmt:send-receive, case 6 hh-dat: " (remote-hh-dat *runremote*) " conndat: " (remote-conndat *runremote*)) (mutex-unlock! *rmt-mutex*) (tasks:start-and-wait-for-server (tasks:open-db) 0 15) (let* ((cinfo (rmt:get-connection-info 0)) (transport (if cinfo (vector-ref cinfo 6) (server:get-transport)))) ;; TODO: replace with tasks:server-dat-accessor-?? for transport (remote-conndat-set! *runremote* cinfo) ;; calls client:setup which calls client:setup-http (remote-transport-set! *runremote* transport)) (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; all set up if get this far, dispatch the query ((cdr (remote-hh-dat *runremote*)) ;; we are on homehost (mutex-unlock! *rmt-mutex*) (debug:print-info 12 *default-log-port* "rmt:send-receive, case 7") |
︙ | ︙ | |||
269 270 271 272 273 274 275 276 277 278 | (mutex-lock! *db-multi-sync-mutex*) (set! *db-last-write* start-time) ;; the oldest "write" (mutex-unlock! *db-multi-sync-mutex*))))) res)) (define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) (let* ((run-id (if run-id run-id 0)) (res (handle-exceptions exn #f | > | | 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 | (mutex-lock! *db-multi-sync-mutex*) (set! *db-last-write* start-time) ;; the oldest "write" (mutex-unlock! *db-multi-sync-mutex*))))) res)) (define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) (let* ((run-id (if run-id run-id 0)) (transport (or (remote-transport *runremote*) (server:get-transport))) (res (handle-exceptions exn #f (case transport ((http) (http-transport:client-api-send-receive run-id connection-info cmd params)) ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (2)") (exit)) )))) |
︙ | ︙ |
Modified rpc-transport.scm from [1a12c353bd] to [a3a8e96335].
︙ | ︙ | |||
152 153 154 155 156 157 158 | (set! *time-to-exit* #t) ;;(if *dbstruct-db* (db:sync-touched *dbstruct-db* *run-id* force-sync: #t)) (server:remove-dotserver-file *toppath* "anyhost:anyport" force: #t) (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete") | | | 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | (set! *time-to-exit* #t) ;;(if *dbstruct-db* (db:sync-touched *dbstruct-db* *run-id* force-sync: #t)) (server:remove-dotserver-file *toppath* "anyhost:anyport" force: #t) (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete") (rpc:close-all-connections!) ;;(BB> "Before (exit) (from-on-exit="from-on-exit")") ;;(unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. ;;(BB> "After") ;; strace reveals endless: ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0 ;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0 |
︙ | ︙ | |||
274 275 276 277 278 279 280 281 282 283 284 285 286 287 | ;; (define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3)) ;;(BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries) (if (not (vector? serverdat)) (begin (BB> "WHAT?? for run-id="run-id", serverdat="serverdat) (print-call-chain) (exit 1))) (let* ((iface (rpc-transport:server-dat-get-iface serverdat)) (port (rpc-transport:server-dat-get-port serverdat)) (res #f) (api-exec (rpc-transport:get-api-exec iface port)) ;; chached by host/port. may need to clear... (send-receive (lambda () (tcp-buffer-size 0) | > | 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 | ;; (define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3)) ;;(BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries) (if (not (vector? serverdat)) (begin (BB> "WHAT?? for run-id="run-id", serverdat="serverdat) (print-call-chain) (rpc:close-all-connections!) (exit 1))) (let* ((iface (rpc-transport:server-dat-get-iface serverdat)) (port (rpc-transport:server-dat-get-port serverdat)) (res #f) (api-exec (rpc-transport:get-api-exec iface port)) ;; chached by host/port. may need to clear... (send-receive (lambda () (tcp-buffer-size 0) |
︙ | ︙ | |||
305 306 307 308 309 310 311 312 313 314 315 316 317 318 | )) (th1 (make-thread send-receive "send-receive")) (time-out-reached #f) (time-out (lambda () (thread-sleep! 45) (set! time-out-reached #t) (thread-terminate! th1) #f)) (th2 (make-thread time-out "time out"))) (thread-start! th1) (thread-start! th2) (thread-join! th1) (thread-terminate! th2) | > | 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 | )) (th1 (make-thread send-receive "send-receive")) (time-out-reached #f) (time-out (lambda () (thread-sleep! 45) (set! time-out-reached #t) (thread-terminate! th1) #f)) (th2 (make-thread time-out "time out"))) (thread-start! th1) (thread-start! th2) (thread-join! th1) (thread-terminate! th2) |
︙ | ︙ |
Modified server.scm from [3d61545655] to [9cba9551df].
︙ | ︙ | |||
69 70 71 72 73 74 75 | ;; Get the transport (define (server:get-transport) (if *transport-type* *transport-type* (let ((ttype (string->symbol (or (args:get-arg "-transport") (configf:lookup *configdat* "server" "transport") | | | 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | ;; Get the transport (define (server:get-transport) (if *transport-type* *transport-type* (let ((ttype (string->symbol (or (args:get-arg "-transport") (configf:lookup *configdat* "server" "transport") *DEFAULT-TRANSPORT*)))) (set! *transport-type* ttype) ttype))) ;; Generate a unique signature for this server (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string |
︙ | ︙ |