Overview
Comment: | Partial resurrection of nmsg transport |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA1: |
3e20c90912c3a94d71eac564e628025d |
User & Date: | matt on 2015-09-16 00:56:17 |
Other Links: | manifest | tags |
Context
2015-09-16
| ||
23:58 | tweaks check-in: 399a5a4dfd user: matt tags: trunk | |
00:56 | Partial resurrection of nmsg transport check-in: 3e20c90912 user: matt tags: trunk | |
2015-09-15
| ||
22:00 | Start v1.70 branch of development check-in: 1905bd503c user: matt tags: trunk | |
Changes
Modified Makefile from [0976c16632] to [8917f4f980].
1 2 3 4 5 6 7 8 9 10 11 | # make install CSCOPTS='-accumulate-profile -profile-name $(PWD)/profile-ww$(shell date +%V.%u)' PREFIX=$(PWD) CSCOPTS= INSTALL=install SRCFILES = common.scm items.scm launch.scm \ ods.scm runconfig.scm server.scm configf.scm \ db.scm keys.scm margs.scm megatest-version.scm \ process.scm runs.scm tasks.scm tests.scm genexample.scm \ http-transport.scm nmsg-transport.scm filedb.scm \ client.scm gutils.scm synchash.scm daemon.scm mt.scm dcommon.scm \ tree.scm ezsteps.scm lock-queue.scm sdb.scm \ | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | # make install CSCOPTS='-accumulate-profile -profile-name $(PWD)/profile-ww$(shell date +%V.%u)' PREFIX=$(PWD) CSCOPTS= INSTALL=install SRCFILES = common.scm items.scm launch.scm \ ods.scm runconfig.scm server.scm configf.scm \ db.scm keys.scm margs.scm megatest-version.scm \ process.scm runs.scm tasks.scm tests.scm genexample.scm \ http-transport.scm nmsg-transport.scm filedb.scm \ client.scm gutils.scm synchash.scm daemon.scm mt.scm dcommon.scm \ tree.scm ezsteps.scm lock-queue.scm sdb.scm \ rmt.scm api.scm tdb.scm \ portlogger.scm archive.scm # Eggs to install (straightforward ones) EGGS=matchable readline apropos base64 regex-literals format regex-case test coops trace csv \ dot-locking posix-utils posix-extras directory-utils hostinfo tcp-server rpc csv-xml fmt \ json md5 awful http-client spiffy uri-common intarweb spiffy-request-vars \ spiffy-directory-listing ssax sxml-serializer sxml-modifications iup canvas-draw sqlite3 |
︙ | ︙ | |||
58 59 60 61 62 63 64 | tests.o db.o launch.o runs.o dashboard-tests.o dashboard-guimonitor.o dashboard-main.o monitor.o dashboard.o \ archive.o megatest.o : db_records.scm tests.o runs.o dashboard.o dashboard-tests.o dashboard-main.o : run_records.scm db.o ezsteps.o keys.o launch.o megatest.o monitor.o runs-for-ref.o runs.o tests.o : key_records.scm tests.o tasks.o dashboard-tasks.o : task_records.scm runs.o : test_records.scm megatest.o : megatest-fossil-hash.scm | | | 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | tests.o db.o launch.o runs.o dashboard-tests.o dashboard-guimonitor.o dashboard-main.o monitor.o dashboard.o \ archive.o megatest.o : db_records.scm tests.o runs.o dashboard.o dashboard-tests.o dashboard-main.o : run_records.scm db.o ezsteps.o keys.o launch.o megatest.o monitor.o runs-for-ref.o runs.o tests.o : key_records.scm tests.o tasks.o dashboard-tasks.o : task_records.scm runs.o : test_records.scm megatest.o : megatest-fossil-hash.scm client.scm common.scm configf.scm dashboard-guimonitor.scm dashboard-tests.scm dashboard.scm db.scm dcommon.scm ezsteps.scm fs-transport.scm http-transport.scm index-tree.scm items.scm keys.scm launch.scm megatest.scm monitor.scm mt.scm newdashboard.scm runconfig.scm runs.scm server.scm tdb.scm tests.scm tree.scm zmq-transport.scm : common_records.scm # Temporary while transitioning to new routine # runs.o : run-tests-queue-classic.scm run-tests-queue-new.scm megatest-fossil-hash.scm : $(SRCFILES) megatest.scm *_records.scm echo "(define megatest-fossil-hash \"$(MTESTHASH)\")" > megatest-fossil-hash.new if ! diff -q megatest-fossil-hash.new megatest-fossil-hash.scm ; then echo copying .new to .scm;cp -f megatest-fossil-hash.new megatest-fossil-hash.scm;fi |
︙ | ︙ |
Modified client.scm from [ecbc2f1355] to [15e90907e5].
︙ | ︙ | |||
41 42 43 44 45 46 47 | (define (client:logout serverdat) (let ((ok (and (socket? serverdat) (cdb:logout serverdat *toppath* (client:get-signature))))) ok)) (define (client:connect iface port) (case (server:get-transport) | | | | > > | > | | 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | (define (client:logout serverdat) (let ((ok (and (socket? serverdat) (cdb:logout serverdat *toppath* (client:get-signature))))) ok)) (define (client:connect iface port) (case (server:get-transport) ((nmsg) (nmsg-transport:client-connect iface port)) ((http) (http:client-connect iface port)) ;; ((zmq) (zmq:client-connect iface port)) (else (debug:print 0 "ERROR: Unknown transport " (server:get-transport)) (exit 1)))) (define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0)) (case (server:get-transport) ((nmsg) (client:setup-nmsg run-id)) ((http)(client:setup-http run-id)) (else (debug:print 0 "ERROR: Unknown transport " (server:get-transport))))) ;; (define (client:login-no-auto-setup server-info run-id) ;; (case (server:get-transport) ;; ((rpc) (rpc:login-no-auto-client-setup server-info run-id)) ;; ((http) (rmt:login-no-auto-client-setup server-info run-id)) ;; (else (rpc:login-no-auto-client-setup server-info run-id)))) ;; |
︙ | ︙ | |||
154 155 156 157 158 159 160 161 162 163 164 165 166 167 | ;; ;; lookup_server, need to remove *runremote* stuff ;; (define (client:setup-http run-id #!key (remaining-tries 10) (failed-connects 0)) (debug:print-info 2 "client:setup remaining-tries=" remaining-tries) (let* ((tdbdat (tasks:open-db))) (if (<= remaining-tries 0) (begin (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id) (exit 1)) (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) (debug:print-info 4 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) (if server-dat (let* ((iface (tasks:hostinfo-get-interface server-dat)) | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 | ;; ;; lookup_server, need to remove *runremote* stuff ;; (define (client:setup-http run-id #!key (remaining-tries 10) (failed-connects 0)) (debug:print-info 2 "client:setup remaining-tries=" remaining-tries) (let* ((tdbdat (tasks:open-db))) (if (<= remaining-tries 0) (begin (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id) (exit 1)) (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) (debug:print-info 4 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) (if server-dat (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) (start-res (case *transport-type* ((http)(http-transport:client-connect iface port)) ((nmsg)(nmsg-transport:client-connect hostname port)))) (ping-res (case *transport-type* ((http)(rmt:login-no-auto-client-setup start-res run-id)) 
((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id))) (if logininfo (car (vector-ref logininfo 1)) #f)))))) (if (and start-res ping-res) (begin (hash-table-set! *runremote* run-id start-res) (debug:print-info 2 "connected to " (http-transport:server-dat-make-url start-res)) start-res) (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) (case *transport-type* ((http)(http-transport:close-connections run-id))) (hash-table-delete! *runremote* run-id) (tasks:kill-server-run-id run-id) (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id (tasks:hostinfo-get-interface server-dat) (tasks:hostinfo-get-port server-dat) " client:setup (server-dat = #t)") (if (> remaining-tries 8) (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time (server:try-running run-id) (thread-sleep! 5) ;; give server a little time to start up (client:setup run-id remaining-tries: (- remaining-tries 1)) ))) (begin ;; no server registered (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) (debug:print-info 0 "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. 
(client:setup run-id remaining-tries: (- remaining-tries 1))))))))) ;; for nanomsg ;; (define (client:setup-nmsg run-id #!key (remaining-tries 10) (failed-connects 0)) (debug:print-info 2 "client:setup remaining-tries=" remaining-tries) (let* ((tdbdat (tasks:open-db))) (if (<= remaining-tries 0) (begin (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id) (exit 1)) (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) (debug:print-info 4 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) (if server-dat (let* ((iface (tasks:hostinfo-get-interface server-dat)) |
︙ | ︙ |
Modified megatest.scm from [0816f72c8d] to [8006136d14].
︙ | ︙ | |||
143 144 145 146 147 148 149 | -import-megatest.db : migrate a database from v1.55 series to v1.60 series -sync-to-megatest.db : migrate data back to megatest.db -update-meta : update the tests metadata for all tests -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname | | | 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | -import-megatest.db : migrate a database from v1.55 series to v1.60 series -sync-to-megatest.db : migrate data back to megatest.db -update-meta : update the tests metadata for all tests -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname -transport http|nmsg : use http or nanomsg for transport (default is http) -daemonize : fork into background and disconnect from stdin/out -log logfile : send stdout and stderr to logfile -list-servers : list the servers -stop-server id : stop server specified by id (see output of -list-servers), use 0 to kill all -repl : start a repl (useful for extending megatest) -load file.scm : load and run file.scm |
︙ | ︙ |
Modified nmsg-transport.scm from [c28712df60] to [81ad0e0130].
︙ | ︙ | |||
9 10 11 12 13 14 15 | ;; PURPOSE. (require-extension (srfi 18) extras tcp s11n) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) | | > > > > > > > > > > > > > > > > > > > | 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | ;; PURPOSE. (require-extension (srfi 18) extras tcp s11n) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use nanomsg) (declare (unit nmsg-transport)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses server)) (declare (uses portlogger)) (include "common_records.scm") (include "db_records.scm") (define (make-http-transport:server-dat)(make-vector 6)) (define (http-transport:server-dat-get-iface vec) (vector-ref vec 0)) (define (http-transport:server-dat-get-port vec) (vector-ref vec 1)) (define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) (define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3)) (define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4)) (define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5)) (define (http-transport:server-dat-get-socket vec) (vector-ref vec 6)) (define (http-transport:server-dat-make-url vec) (if (and (http-transport:server-dat-get-iface vec) (http-transport:server-dat-get-port vec)) (conc "http://" (http-transport:server-dat-get-iface vec) ":" (http-transport:server-dat-get-port vec)) #f)) ;; Transition to pub --> sub with pull <-- push ;; ;; 1. client sends request to server via push to the pull port ;; 2. server puts request in queue or processes immediately as appropriate ;; 3. server puts responses from completed requests into pub port ;; |
︙ | ︙ | |||
54 55 56 57 58 59 60 61 62 63 64 65 66 67 | (define (nmsg-transport:make-server-url hostport #!key (bindall #f)) (if (not hostport) #f (conc "tcp://" (if bindall "*" (car hostport)) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) ;;====================================================================== ;; S E R V E R ;;====================================================================== (define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000)) (debug:print 2 "Attempting to start the server ...") | > | 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | (define (nmsg-transport:make-server-url hostport #!key (bindall #f)) (if (not hostport) #f (conc "tcp://" (if bindall "*" (car hostport)) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) (define *nmsg-mutex* (make-mutex)) ;;====================================================================== ;; S E R V E R ;;====================================================================== (define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000)) (debug:print 2 "Attempting to start the server ...") |
︙ | ︙ | |||
278 279 280 281 282 283 284 | (if (and (string? tmo) (string->number tmo)) (* 60 60 (string->number tmo)) ;; (* 3 24 60 60) ;; default to three days (* 60 1) ;; default to one minute ;; (* 60 60 25) ;; default to 25 hours )))) | | | 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 | (if (and (string? tmo) (string->number tmo)) (* 60 60 (string->number tmo)) ;; (* 3 24 60 60) ;; default to three days (* 60 1) ;; default to one minute ;; (* 60 60 25) ;; default to 25 hours )))) (print "Keep-running got server pid " server-id ", using iface " iface " and port " port " and transport of " *transport-type*) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) |
︙ | ︙ | |||
319 320 321 322 323 324 325 | (vector iface portnum #f #f #f (current-seconds) reqsoc))) ;; returns result, there is no sucess/fail flag - handled via excpections ;; (define (nmsg-transport:client-api-send-receive run-id connection-info cmd param #!key (remtries 5)) ;; NB// In the html version of this routine there is a call to ;; tasks:kill-server-run-id when there is an exception | | | | 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 | (vector iface portnum #f #f #f (current-seconds) reqsoc))) ;; returns result, there is no sucess/fail flag - handled via excpections ;; (define (nmsg-transport:client-api-send-receive run-id connection-info cmd param #!key (remtries 5)) ;; NB// In the html version of this routine there is a call to ;; tasks:kill-server-run-id when there is an exception (mutex-lock! *nmsg-mutex*) (let* ((packet (vector cmd param)) (reqsoc (http-transport:server-dat-get-socket connection-info)) (res (nmsg-transport:client-api-send-receive-raw reqsoc packet))) ;; (status (vector-ref rawres 0)) ;; (result (vector-ref rawres 1))) (mutex-unlock! *nmsg-mutex*) res)) ;; (vector status (if status (db:string->obj result transport: 'nmsg) result)))) ;;====================================================================== ;; J U N K ;;====================================================================== ;; DO NOT USE |
︙ | ︙ |
Modified server.scm from [2936193c4a] to [f78f96d212].
︙ | ︙ | |||
17 18 19 20 21 22 23 | (declare (unit server)) (declare (uses common)) (declare (uses db)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses synchash)) | | | | 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | (declare (unit server)) (declare (uses common)) (declare (uses db)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses synchash)) ;; (declare (uses http-transport)) ;; (declare (uses rpc-transport)) (declare (uses nmsg-transport)) (declare (uses launch)) (declare (uses daemon)) (include "common_records.scm") (include "db_records.scm") |
︙ | ︙ | |||
49 50 51 52 53 54 55 | ;; ;; start_server ;; (define (server:launch run-id) (case *transport-type* ((http)(http-transport:launch run-id)) ((nmsg)(nmsg-transport:launch run-id)) | | | | | 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | ;; ;; start_server ;; (define (server:launch run-id) (case *transport-type* ((http)(http-transport:launch run-id)) ((nmsg)(nmsg-transport:launch run-id)) ;; ((rpc) (rpc-transport:launch run-id)) (else (debug:print 0 "ERROR: unknown server type " *transport-type*)))) ;; (else (debug:print 0 "ERROR: No known transport set, transport=" transport ", using rpc") ;; (rpc-transport:launch run-id))))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== ;; Get the transport (define (server:get-transport) (if *transport-type* *transport-type* (let ((ttype (string->symbol (or (args:get-arg "-transport") (configf:lookup *configdat* "server" "transport") "http")))) (set! 
*transport-type* ttype) ttype))) ;; Generate a unique signature for this server (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) ;; When using zmq this would send the message back (two step process) ;; with spiffy or rpc this simply returns the return data to be returned ;; (define (server:reply return-addr query-sig success/fail result) (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (case (server:get-transport) ;; ((rpc) (db:obj->string (vector success/fail query-sig result))) ((http) (db:obj->string (vector success/fail query-sig result))) ((zmq) (let ((pub-socket (vector-ref *runremote* 1))) (send-message pub-socket return-addr send-more: #t) (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) ((fs) result) (else |
︙ | ︙ |
Modified tests/fullrun/megatest.config from [007216e935] to [a6a13f154d].
︙ | ︙ | |||
143 144 145 146 147 148 149 | # XTERM [system xterm] # RUNDEAD [system exit 56] [server] # force use of server always | | | | | | | 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 | # XTERM [system xterm] # RUNDEAD [system exit 56] [server] # force use of server always required yes # Use http instead of direct filesystem access # transport http # transport fs transport nmsg synchronous 0 # If the server can't be started on this port it will try the next port until # it succeeds port 9080 # This server will keep running this number of hours after last access. # Three minutes is 0.05 hours # timeout 0.025 timeout 0.01 # faststart; unless no, start server but proceed with writes until server started faststart no # faststart yes # Start server when average query takes longer than this # server-query-threshold 55500 server-query-threshold 1000 # daemonize yes # hostname #{scheme (get-host-name)} |
︙ | ︙ |