Overview
Comment: | ping and login working |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | try-nanomsg |
Files: | files | file ages | folders |
SHA1: |
5f6b43d51b5f155518d8c37550bbaf9b |
User & Date: | matt on 2014-11-23 23:13:11 |
Other Links: | branch diff | manifest | tags |
Context
2014-11-23
| ||
23:29 | Adding trace with mods to print origin. Too useful to wait for offical release check-in: 377d0f07a4 user: matt tags: try-nanomsg | |
23:13 | ping and login working check-in: 5f6b43d51b user: matt tags: try-nanomsg | |
20:56 | Basic server functioning, responding to ping and login check-in: eab23b866a user: matt tags: try-nanomsg | |
Changes
Modified api.scm from [cd180ba59c] to [923a47fb5f].
︙ | ︙ | |||
49 50 51 52 53 54 55 | testmeta-get-record)) ;; These are called by the server on recipt of /api calls ;; - keep it simple, only return the actual result of the call, i.e. no meta info here ;; (define (api:execute-requests dbstruct cmd params) (let ((res | > > | | 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | testmeta-get-record)) ;; These are called by the server on recipt of /api calls ;; - keep it simple, only return the actual result of the call, i.e. no meta info here ;; (define (api:execute-requests dbstruct cmd params) (let ((res (case (if (symbol? cmd) cmd (string->symbol cmd)) ;; SERVERS ((start-server) (apply server:kind-run params)) ((kill-server) (set! *server-run* #f)) ;; KEYS ((get-key-val-pairs) (apply db:get-key-val-pairs dbstruct params)) ((get-keys) (db:get-keys dbstruct)) |
︙ | ︙ |
Modified client.scm from [6d1c8717b3] to [aa964f7d14].
︙ | ︙ | |||
59 60 61 62 63 64 65 | (debug:print-info 2 "client:setup remaining-tries=" remaining-tries) (let* ((tdbdat (tasks:open-db))) (if (<= remaining-tries 0) (begin (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id) (exit 1)) (let ((host-info (hash-table-ref/default *runremote* run-id #f))) | | > | > > > | > > < < < > | > > | > > | > > | | 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | (debug:print-info 2 "client:setup remaining-tries=" remaining-tries) (let* ((tdbdat (tasks:open-db))) (if (<= remaining-tries 0) (begin (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id) (exit 1)) (let ((host-info (hash-table-ref/default *runremote* run-id #f))) (if host-info ;; this is a bit circular. the host-info *is* the start-res FIXME (let* ((iface (http-transport:server-dat-get-iface host-info)) (port (http-transport:server-dat-get-port host-info)) (start-res (case *transport-type* ((http)(http-transport:client-connect iface port)) ((nmsg) host-info) ;; (http-transport:server-dat-get-socket host-info)) (else #f))) (ping-res (case *transport-type* ((http)(rmt:login-no-auto-client-setup start-res run-id)) ((nmsg)(nmsg-transport:ping iface port timeout: 2 socket: )) (else #f)))) (if ping-res ;; sucessful login? (begin (debug:print-info 2 "client:setup, ping is good using host-info=" host-info ", remaining-tries=" remaining-tries) start-res) ;; return the server info ;; have host info but no ping. shutdown the current connection and try again (begin ;; login failed (debug:print-info 1 "client:setup, ping is bad for start-res=" start-res " and *runremote*=" host-info) (case *transport-type* ((http)(http-transport:close-connections run-id))) (hash-table-delete! *runremote* run-id) (if (< remaining-tries 8) (thread-sleep! 5) (thread-sleep! 1)) (client:setup run-id remaining-tries: (- remaining-tries 1))))) ;; YUK: rename server-dat here (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) (debug:print-info 4 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) (if server-dat (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) (start-res (case *transport-type* ((http)(http-transport:client-connect iface port)) ((nmsg)(nmsg-transport:client-connect hostname port)))) (ping-res (case *transport-type* ((http)(rmt:login-no-auto-client-setup start-res run-id)) ((nmsg)(http-transport:server-dat-get-socket start-res))))) ;; socket is the result of a ping (if (and start-res ping-res) (begin (hash-table-set! *runremote* run-id start-res) (debug:print-info 2 "connected to " (http-transport:server-dat-make-url start-res)) start-res) (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) (case *transport-type* ((http)(http-transport:close-connections run-id))) (hash-table-delete! *runremote* run-id) (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id (tasks:hostinfo-get-interface server-dat) (tasks:hostinfo-get-port server-dat) " client:setup (server-dat = #t)") (thread-sleep! 2) |
︙ | ︙ |
Modified common.scm from [96d40f50d8] to [0598bb95e7].
︙ | ︙ | |||
63 64 65 66 67 68 69 | (define *inmemdb* #f) (define *task-db* #f) ;; (vector db path-to-db) (define *db-access-allowed* #t) ;; flag to allow access (define *db-access-mutex* (make-mutex)) ;; SERVER (define *my-client-signature* #f) | | | 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | (define *inmemdb* #f) (define *task-db* #f) ;; (vector db path-to-db) (define *db-access-allowed* #t) ;; flag to allow access (define *db-access-mutex* (make-mutex)) ;; SERVER (define *my-client-signature* #f) (define *transport-type* 'nmsg) (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold <host port> (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) (define *server-info* #f) (define *time-to-exit* #f) |
︙ | ︙ |
Modified http-transport.scm from [d5c0bd2a5f] to [37e9804757].
︙ | ︙ | |||
316 317 318 319 320 321 322 | (if (vector? server-dat) (let ((api-dat (http-transport:server-dat-get-api-uri server-dat))) (close-connection! api-dat) #t) #f))) | | > | 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 | (if (vector? server-dat) (let ((api-dat (http-transport:server-dat-get-api-uri server-dat))) (close-connection! api-dat) #t) #f))) (define (make-http-transport:server-dat)(make-vector 6)) (define (http-transport:server-dat-get-iface vec) (vector-ref vec 0)) (define (http-transport:server-dat-get-port vec) (vector-ref vec 1)) (define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) (define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3)) (define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4)) (define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5)) (define (http-transport:server-dat-get-socket vec) (vector-ref vec 6)) (define (http-transport:server-dat-make-url vec) (if (and (http-transport:server-dat-get-iface vec) (http-transport:server-dat-get-port vec)) (conc "http://" (http-transport:server-dat-get-iface vec) ":" |
︙ | ︙ |
Modified nmsg-transport.scm from [5a0952db48] to [987e090f8d].
︙ | ︙ | |||
68 69 70 71 72 73 74 | (let* ((start-port (portlogger:open-run-close portlogger:find-port)) (server-thread (make-thread (lambda () (nmsg-transport:try-start-server dbstruct run-id start-port server-id)) "server thread")) (tdbdat (tasks:open-db))) (thread-start! server-thread) (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id)) | | > | 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | (let* ((start-port (portlogger:open-run-close portlogger:find-port)) (server-thread (make-thread (lambda () (nmsg-transport:try-start-server dbstruct run-id start-port server-id)) "server thread")) (tdbdat (tasks:open-db))) (thread-start! server-thread) (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id)) (let ((interface (if (equal? hostn "-")(get-host-name) hostn))) (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access (set! *inmemdb* dbstruct) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") (thread-start! (make-thread (lambda ()(nmsg-transport:keep-running server-id)) |
︙ | ︙ | |||
140 141 142 143 144 145 146 | (begin (debug:print-info 0 "Another server took the slot, exiting") (exit 0)))) (begin ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch") | | > | | 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | (begin (debug:print-info 0 "Another server took the slot, exiting") (exit 0)))) (begin ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch") )) ;; locked in a server id, try to start up (nmsg-transport:run dbstruct hostn run-id server-id)) (set! *didsomething* #t) (exit)))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== |
︙ | ︙ | |||
165 166 167 168 169 170 171 | ;;====================================================================== ;; ping the server at host:port ;; return the open socket if successful (return-socket == #t) ;; expect the key expected-key returned in payload ;; send our-key or #f as payload ;; | | | | 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 | ;;====================================================================== ;; ping the server at host:port ;; return the open socket if successful (return-socket == #t) ;; expect the key expected-key returned in payload ;; send our-key or #f as payload ;; (define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f)) ;; send a random number along with pid and check that we get it back (let* ((req (or socket (nn-socket 'req))) (host (if (or (not hostn) (equal? hostn "-")) ;; use localhost (get-host-name) hostn)) (success #f) (keepwaiting #t) (dat (db:obj->string (vector "ping" our-key) transport: 'nmsg)) |
︙ | ︙ | |||
201 202 203 204 205 206 207 | (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate (loop (+ count 1)))) (if keepwaiting (begin (print "timeout waiting for ping") (thread-terminate! ping)))) "timeout"))) | | | < < < < < < < < < < < < < < < < < | 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 | (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate (loop (+ count 1)))) (if keepwaiting (begin (print "timeout waiting for ping") (thread-terminate! ping)))) "timeout"))) (if (not socket)(nn-connect req (conc "tcp://" host ":" port))) (handle-exceptions exn (begin ;; (print-call-chain) ;; (print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) ;; (print "exn=" (condition->list exn)) (debug:print-info 1 "ping failed to connect to " host ":" port)) (thread-start! timeout) (thread-start! ping) (thread-join! ping) (if success (thread-terminate! timeout))) (if return-socket (if success req #f) (begin (nn-close req) ;; should it be closed if we were handed a socket? success)))) ;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (nmsg-transport:keep-running server-id) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive |
︙ | ︙ | |||
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 | (debug:print-info 0 "Starting to shutdown the server.") (set! *time-to-exit* #t) (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running") (debug:print-info 0 "Server shutdown complete. Exiting") ;; (exit) )))))) (define (nmsg-transport:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") (let ((th1 (make-thread (lambda () (if (not *received-response*) (receive-message* *runremote*))) ;; flush out last call if applicable | > > > > > > > > > > > > > > > > > > > | 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 | (debug:print-info 0 "Starting to shutdown the server.") (set! *time-to-exit* #t) (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running") (debug:print-info 0 "Server shutdown complete. Exiting") ;; (exit) )))))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== (define (nmsg-transport:client-connect iface portnum) (let* ((reqsoc (nmsg-transport:ping iface portnum return-socket: #t))) (vector iface portnum #f #f #f (current-seconds) reqsoc))) (define (nmsg-transport:client-api-send-receive run-id connection-info cmd param) (let ((packet (vector cmd param)) (reqsoc (http-transport:server-dat-get-socket connection-info))) (nn-send reqsoc (db:obj->string packet transport: 'nmsg)) (db:string->obj (nn-recv reqsoc) transport: 'nmsg))) ;;====================================================================== ;; J U N K ;;====================================================================== ;; DO NOT USE ;; (define (nmsg-transport:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") (let ((th1 (make-thread (lambda () (if (not *received-response*) (receive-message* *runremote*))) ;; flush out last call if applicable |
︙ | ︙ |
Modified rmt.scm from [db091f3ce4] to [1fd758a7a4].
︙ | ︙ | |||
80 81 82 83 84 85 86 87 88 89 90 | ;; SHOULD CLOSE THE CONNECTION HERE (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) (mutex-unlock! *db-multi-sync-mutex*) (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info run-id)) (jparams (db:obj->string params))) (if connection-info ;; use the server if have connection info (let* ((dat (case *transport-type* ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams)) | > | > | > | 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | ;; SHOULD CLOSE THE CONNECTION HERE (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) (mutex-unlock! *db-multi-sync-mutex*) (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info run-id)) (jparams (db:obj->string params))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info (let* ((dat (case *transport-type* ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams)) ((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info cmd params)) (else (exit)))) (res (if (vector? dat) (vector-ref dat 1) #f)) (success (if (vector? dat) (vector-ref dat 0) #f))) (http-transport:server-dat-update-last-access connection-info) (if success (case *transport-type* ((http)(db:string->obj res)) ((nmsg) res)) (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.") (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection ;; no longer killing the server in http-transport:client-api-send-receive ;; may kill it here but what are the criteria? ;; start with three calls then kill server |
︙ | ︙ |
Modified server.scm from [a2c69bc84d] to [57814a5cf8].
︙ | ︙ | |||
48 49 50 51 52 53 54 | ;; all routes though here end in exit ... ;; ;; start_server ;; (define (server:launch run-id) (case *transport-type* ((http)(http-transport:launch run-id)) | | | 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | ;; all routes though here end in exit ... ;; ;; start_server ;; (define (server:launch run-id) (case *transport-type* ((http)(http-transport:launch run-id)) ((nmsg)(nmsg-transport:launch run-id)) (else (debug:print 0 "ERROR: unknown server type " *transport-type*)))) ;;====================================================================== ;; Q U E U E M A N A G E M E N T ;;====================================================================== ;; We don't want to flush the queue if it was just flushed |
︙ | ︙ |
Modified tasks.scm from [7c5174d4f3] to [84082aa618].
︙ | ︙ | |||
183 184 185 186 187 188 189 | (get-host-name) ;; hostname -1 ;; port -1 ;; pubport (random 1000) ;; priority (used a tiebreaker on get-available) "available" ;; state (common:version-signature) ;; mt_version -1 ;; interface | | | 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 | (get-host-name) ;; hostname -1 ;; port -1 ;; pubport (random 1000) ;; priority (used a tiebreaker on get-available) "available" ;; state (common:version-signature) ;; mt_version -1 ;; interface (conc *transport-type*) ;; transport run-id )) (define (tasks:num-in-available-state mdb run-id) (let ((res 0)) (sqlite3:for-each-row (lambda (num-in-queue) |
︙ | ︙ |