Overview
Comment: | Disabled fs and got zmq working again (I think) |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | network-only-transport |
Files: | files | file ages | folders |
SHA1: |
9bf6d9d0fe87db09be7e73a391269cfb |
User & Date: | matt on 2013-03-04 22:27:22 |
Other Links: | branch diff | manifest | tags |
Context
2013-03-04
| ||
23:30 | zmq running ok on Ubuntu check-in: 1a3dcbdc46 user: matt tags: network-only-transport | |
22:27 | Disabled fs and got zmq working again (I think) check-in: 9bf6d9d0fe user: matt tags: network-only-transport | |
2013-02-28
| ||
23:28 | Converted couple more calls in runs.scm to full remote check-in: c6cee5cbb1 user: matt tags: network-only-transport | |
Changes
Modified db.scm from [786acaa696] to [d9063fc14a].
︙ | ︙ | |||
1123 1124 1125 1126 1127 1128 1129 | (set! tmp (db:string->obj rawdat)) (vector-ref tmp 2)))) ((zmq) (handle-exceptions exn (begin (thread-sleep! 5) | | | | | 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 | (set! tmp (db:string->obj rawdat)) (vector-ref tmp 2)))) ((zmq) (handle-exceptions exn (begin (thread-sleep! 5) (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params))) (let* ((push-socket (vector-ref serverdat 0)) (sub-socket (vector-ref serverdat 1)) (client-sig (server:get-client-signature)) (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) (res #f) (send-receive (lambda () (debug:print-info 11 "sending message") (send-message push-socket zdat) |
︙ | ︙ | |||
1155 1156 1157 1158 1159 1160 1161 | (if (> numretries 0) (begin (debug:print 2 "WARNING: no reply to query " params ", trying resend") (debug:print-info 11 "re-sending message") (send-message push-socket zdat) (debug:print-info 11 "message re-sent") (loop (- n 1))) | | | 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 | (if (> numretries 0) (begin (debug:print 2 "WARNING: no reply to query " params ", trying resend") (debug:print-info 11 "re-sending message") (send-message push-socket zdat) (debug:print-info 11 "message re-sent") (loop (- n 1))) ;; (apply cdb:client-call *runremote* qtype immediate (- numretries 1) params)) (begin (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") (exit 5)))))))) (debug:print-info 11 "Starting threads") (let ((th1 (make-thread send-receive "send receive")) (th2 (make-thread timeout "timeout"))) (thread-start! th1) |
︙ | ︙ |
Modified launch.scm from [9d5e223a7b] to [ae5b023e35].
︙ | ︙ | |||
58 59 60 61 62 63 64 65 66 67 68 69 70 71 | (let* ((testpath (assoc/default 'testpath cmdinfo)) ;; How is testpath different from work-area ?? (top-path (assoc/default 'toppath cmdinfo)) (work-area (assoc/default 'work-area cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (ezsteps (assoc/default 'ezsteps cmdinfo)) (runremote (assoc/default 'runremote cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) (test-id (assoc/default 'test-id cmdinfo)) (target (assoc/default 'target cmdinfo)) (itemdat (assoc/default 'itemdat cmdinfo)) (env-ovrd (assoc/default 'env-ovrd cmdinfo)) (set-vars (assoc/default 'set-vars cmdinfo)) ;; pre-overrides from -setvar (runname (assoc/default 'runname cmdinfo)) | > | 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | (let* ((testpath (assoc/default 'testpath cmdinfo)) ;; How is testpath different from work-area ?? (top-path (assoc/default 'toppath cmdinfo)) (work-area (assoc/default 'work-area cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (ezsteps (assoc/default 'ezsteps cmdinfo)) (runremote (assoc/default 'runremote cmdinfo)) (transport (assoc/default 'transport cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) (test-id (assoc/default 'test-id cmdinfo)) (target (assoc/default 'target cmdinfo)) (itemdat (assoc/default 'itemdat cmdinfo)) (env-ovrd (assoc/default 'env-ovrd cmdinfo)) (set-vars (assoc/default 'set-vars cmdinfo)) ;; pre-overrides from -setvar (runname (assoc/default 'runname cmdinfo)) |
︙ | ︙ | |||
83 84 85 86 87 88 89 90 91 92 93 94 95 96 | fulln runscript))))) ;; assume it is on the path (rollup-status 0)) (debug:print 2 "Exectuing " test-name " (id: " test-id ") on " (get-host-name)) ;; Setup the *runremote* global var (if *runremote* (debug:print 2 "ERROR: I'm not expecting *runremote* to be set at this time")) (set! *runremote* runremote) (set! keys (cdb:remote-run db:get-keys #f)) (set! keyvals (if run-id (cdb:remote-run db:get-key-vals #f run-id) #f)) ;; apply pre-overrides before other variables. The pre-override vars must not ;; clobbers things from the official sources such as megatest.config and runconfigs.config (if (string? set-vars) (let ((varpairs (string-split set-vars ","))) (debug:print 4 "varpairs: " varpairs) | > | 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | fulln runscript))))) ;; assume it is on the path (rollup-status 0)) (debug:print 2 "Exectuing " test-name " (id: " test-id ") on " (get-host-name)) ;; Setup the *runremote* global var (if *runremote* (debug:print 2 "ERROR: I'm not expecting *runremote* to be set at this time")) (set! *runremote* runremote) (set! *transport-type* transport) (set! keys (cdb:remote-run db:get-keys #f)) (set! keyvals (if run-id (cdb:remote-run db:get-key-vals #f run-id) #f)) ;; apply pre-overrides before other variables. The pre-override vars must not ;; clobbers things from the official sources such as megatest.config and runconfigs.config (if (string? set-vars) (let ((varpairs (string-split set-vars ","))) (debug:print 4 "varpairs: " varpairs) |
︙ | ︙ | |||
577 578 579 580 581 582 583 584 585 586 587 588 589 590 | (create-directory work-area #t) (debug:print 0 "WARNING: No disk work area specified - running in the test directory under tmp_run"))) (set! cmdparms (base64:base64-encode (with-output-to-string (lambda () ;; (list 'hosts hosts) (write (list (list 'testpath test-path) (list 'runremote *runremote*) (list 'toppath *toppath*) (list 'work-area work-area) (list 'test-name test-name) (list 'runscript runscript) (list 'run-id run-id ) (list 'test-id test-id ) (list 'itemdat itemdat ) | > | 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 | (create-directory work-area #t) (debug:print 0 "WARNING: No disk work area specified - running in the test directory under tmp_run"))) (set! cmdparms (base64:base64-encode (with-output-to-string (lambda () ;; (list 'hosts hosts) (write (list (list 'testpath test-path) (list 'runremote *runremote*) (list 'transport *transport-type*) (list 'toppath *toppath*) (list 'work-area work-area) (list 'test-name test-name) (list 'runscript runscript) (list 'run-id run-id ) (list 'test-id test-id ) (list 'itemdat itemdat ) |
︙ | ︙ |
Modified megatest.scm from [cf6054b955] to [ff38c8608a].
︙ | ︙ | |||
101 102 103 104 105 106 107 108 109 110 111 112 113 114 | -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname -transport http|zmq : use http or zmq for transport (default is http) -list-servers : list the servers -repl : start a repl (useful for extending megatest) Spreadsheet generation -extract-ods fname.ods : extract an open document spreadsheet from the database -pathmod path : insert path, i.e. path/runame/itempath/logfile.html will clear the field if no rundir/testname/itempath/logfile if it contains forward slashes the path will be converted to windows style | > | 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname -transport http|zmq : use http or zmq for transport (default is http) -list-servers : list the servers -repl : start a repl (useful for extending megatest) -load file.scm : load and run file.scm Spreadsheet generation -extract-ods fname.ods : extract an open document spreadsheet from the database -pathmod path : insert path, i.e. path/runame/itempath/logfile.html will clear the field if no rundir/testname/itempath/logfile if it contains forward slashes the path will be converted to windows style |
︙ | ︙ | |||
170 171 172 173 174 175 176 177 178 179 180 181 182 183 | "-env2file" "-setvars" "-set-state-status" "-debug" ;; for *verbosity* > 2 "-gen-megatest-test" "-override-timeout" "-test-files" ;; -test-paths is for listing all ) (list "-h" "-version" "-force" "-xterm" "-showkeys" "-test-status" | > | 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 | "-env2file" "-setvars" "-set-state-status" "-debug" ;; for *verbosity* > 2 "-gen-megatest-test" "-override-timeout" "-test-files" ;; -test-paths is for listing all "-load" ;; load and exectute a scheme file ) (list "-h" "-version" "-force" "-xterm" "-showkeys" "-test-status" |
︙ | ︙ | |||
388 389 390 391 392 393 394 | (args:get-arg "-list-db-targets")) (if (setup-for-run) (let* ((db #f) (runpatt (args:get-arg "-list-runs")) (testpatt (if (args:get-arg "-testpatt") (args:get-arg "-testpatt") "%")) | | | | | | 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 | (args:get-arg "-list-db-targets")) (if (setup-for-run) (let* ((db #f) (runpatt (args:get-arg "-list-runs")) (testpatt (if (args:get-arg "-testpatt") (args:get-arg "-testpatt") "%")) (runsdat (cdb:remote-run db:get-runs #f runpatt #f #f '())) (runs (db:get-rows runsdat)) (header (db:get-header runsdat)) (keys (cdb:remote-run db:get-keys #f)) (keynames (map key:get-fieldname keys)) (db-targets (args:get-arg "-list-db-targets")) (seen (make-hash-table))) ;; Each run (for-each (lambda (run) (let ((targetstr (string-intersperse (map (lambda (x) (db:get-value-by-header run header x)) keynames) "/"))) (if db-targets (if (not (hash-table-ref/default seen targetstr #f)) (begin (hash-table-set! seen targetstr #t) ;; (print "[" targetstr "]")))) (print targetstr)))) (if (not db-targets) (let* ((run-id (db:get-value-by-header run header "id")) (tests (cdb:remote-run db:get-tests-for-run #f run-id testpatt '() '()))) (debug:print 1 "Run: " targetstr " status: " (db:get-value-by-header run header "state") " run-id: " run-id ", number tests: " (length tests)) (for-each (lambda (test) (format #t " Test: ~25a State: ~15a Status: ~15a Runtime: ~5@as Time: ~22a Host: ~10a\n" (conc (db:test-get-testname test) |
︙ | ︙ | |||
435 436 437 438 439 440 441 | (begin (print " cpuload: " (db:test-get-cpuload test) "\n diskfree: " (db:test-get-diskfree test) "\n uname: " (db:test-get-uname test) "\n rundir: " (db:test-get-rundir test) ) ;; Each test | | | 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 | (begin (print " cpuload: " (db:test-get-cpuload test) "\n diskfree: " (db:test-get-diskfree test) "\n uname: " (db:test-get-uname test) "\n rundir: " (db:test-get-rundir test) ) ;; Each test (let ((steps (cdb:remote-run db:get-steps-for-test #f (db:test-get-id test)))) (for-each (lambda (step) (format #t " Step: ~20a State: ~10a Status: ~10a Time ~22a\n" (db:step-get-stepname step) (db:step-get-state step) (db:step-get-status step) |
︙ | ︙ | |||
673 674 675 676 677 678 679 680 681 | (begin (launch:execute (args:get-arg "-execute")) (set! *didsomething* #t))) ;;====================================================================== ;; Test commands (i.e. for use inside tests) ;;====================================================================== (if (args:get-arg "-step") | > > > > > > > > > > > > > > > > > > > > > > > > > > > < | | < | < < < < < < < < < < | | | < < < < < < < | < < < | | | | 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 | (begin (launch:execute (args:get-arg "-execute")) (set! *didsomething* #t))) ;;====================================================================== ;; Test commands (i.e. for use inside tests) ;;====================================================================== (define (megatest:step step state status logfile msg) (if (not (getenv "MT_CMDINFO")) (begin (debug:print 0 "ERROR: MT_CMDINFO env var not set, -step must be called *inside* a megatest invoked environment!") (exit 5)) (let* ((cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) (runremote (assoc/default 'runremote cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) (test-id (assoc/default 'test-id cmdinfo)) (itemdat (assoc/default 'itemdat cmdinfo)) (db #f)) (change-directory testpath) (set! *runremote* runremote) (if (not (setup-for-run)) (begin (debug:print 0 "Failed to setup, exiting") (exit 1))) (if (and state status) (open-run-close db:teststep-set-status! db test-id step state status msg logfile) (begin (debug:print 0 "ERROR: You must specify :state and :status with every call to -step") (exit 6)))))) (if (args:get-arg "-step") (begin (megatest:step (args:get-arg "-step") (args:get-arg ":state") (args:get-arg ":status") (args:get-arg "-setlog") (args:get-arg "-m")) ;; (if db (sqlite3:finalize! db)) (set! *didsomething* #t))) (if (or (args:get-arg "-setlog") ;; since setting up is so costly lets piggyback on -test-status (args:get-arg "-set-toplog") (args:get-arg "-test-status") (args:get-arg "-set-values") (args:get-arg "-load-test-data") (args:get-arg "-runstep") (args:get-arg "-summarize-items")) |
︙ | ︙ | |||
885 886 887 888 889 890 891 | (open-run-close runs:update-all-test_meta db) (set! *didsomething* #t))) ;;====================================================================== ;; Start a repl ;;====================================================================== | | > > | > | 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 | (open-run-close runs:update-all-test_meta db) (set! *didsomething* #t))) ;;====================================================================== ;; Start a repl ;;====================================================================== (if (or (args:get-arg "-repl") (args:get-arg "-load")) (let* ((toppath (setup-for-run)) (db (if toppath (open-db) #f))) (if db (begin (set! *db* db) (set! *client-non-blocking-mode* #t) (server:client-setup) (import readline) (import apropos) (gnu-history-install-file-manager (string-append (or (get-environment-variable "HOME") ".") "/.megatest_history")) (current-input-port (make-gnu-readline-port "megatest> ")) (if (args:get-arg "-repl") (repl) (load (args:get-arg "-load")))) (exit)) (set! *didsomething* #t))) ;;====================================================================== ;; Exit and clean up ;;====================================================================== |
︙ | ︙ |
Modified server.scm from [a9e744212f] to [898af852f2].
1 2 3 4 5 6 7 8 9 10 11 12 | ;; Copyright 2006-2012, Matthew Welland. ;; ;; This program is made available under the GNU GPL version 2.0 or ;; greater. See the accompanying file COPYING for details. ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. (require-extension (srfi 18) extras tcp s11n) | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | ;; Copyright 2006-2012, Matthew Welland. ;; ;; This program is made available under the GNU GPL version 2.0 or ;; greater. See the accompanying file COPYING for details. ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. (require-extension (srfi 18) extras tcp s11n) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest zmq) (import (prefix sqlite3 sqlite3:)) (use spiffy uri-common intarweb http-client spiffy-request-vars) (declare (unit server)) (declare (uses common)) |
︙ | ︙ | |||
127 128 129 130 131 132 133 | (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (case *transport-type* ((fs) result) ((http)(db:obj->string (vector success/fail query-sig result))) ((zmq) | > | | | 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (case *transport-type* ((fs) result) ((http)(db:obj->string (vector success/fail query-sig result))) ((zmq) (let ((pub-socket (vector-ref *runremote* 1))) (send-message pub-socket return-addr send-more: #t) (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) (else (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) result))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== |
︙ | ︙ | |||
152 153 154 155 156 157 158 | ;; Not currently used! But, I think it *should* be used!!! (define (server:client-logout serverdat) (let ((ok (and (socket? serverdat) (cdb:logout serverdat *toppath* (server:get-client-signature))))) ok)) | < < < < < < < < < < < < < < < > > > > > | | | | | > > > | | | | | | | | | > | | < | 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 | ;; Not currently used! But, I think it *should* be used!!! (define (server:client-logout serverdat) (let ((ok (and (socket? serverdat) (cdb:logout serverdat *toppath* (server:get-client-signature))))) ok)) ;; Do all the connection work, look up the transport type and set up the ;; connection if required. ;; ;; There are two scenarios. ;; 1. We are a test manager and we received *transport-type* and *runremote* via cmdline ;; 2. We are a run tests, list runs or other interactive process and we mush figure out ;; *transport-type* and *runremote* from the monitor.db ;; (define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (debug:print-info 11 "*transport-type* is " *transport-type* ", *runremote* is " *runremote*) (let* ((hostinfo (if (not *transport-type*) ;; If we dont' already have transport type set then figure it out (open-run-close tasks:get-best-server tasks:open-db) #f))) ;; if have hostinfo then extract the transport type ;; else fall back to fs (debug:print-info 11 "CLIENT SETUP, hostinfo=" hostinfo) (set! *transport-type* (if hostinfo (string->symbol (tasks:hostinfo-get-transport hostinfo)) 'fs)) ;; DEBUG STUFF (if (eq? *transport-type* 'fs)(begin (print "ERROR!!!!!!! refusing to run with transport " *transport-type*)(exit 99))) (debug:print-info 11 "Using transport type of " *transport-type* (if hostinfo (conc " to connect to " hostinfo) "")) (case *transport-type* ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db)))) ((http) (http-transport:client-connect (tasks:hostinfo-get-interface hostinfo) (tasks:hostinfo-get-port hostinfo))) ((zmq) (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo) (tasks:hostinfo-get-port hostinfo) (tasks:hostinfo-get-pubport hostinfo))) (else ;; default to fs (debug:print 0 "ERROR: unrecognised transport type " *transport-type* " attempting to continue with fs") (set! *transport-type* 'fs) (set! *megatest-db* (open-db)))))) ;; all routes though here end in exit ... (define (server:launch transport) (if (not *toppath*) (if (not (setup-for-run)) (begin |
︙ | ︙ |
Modified tasks.scm from [a2249c790f] to [6d66ace21c].
︙ | ︙ | |||
192 193 194 195 196 197 198 199 | (let ((res '()) (best #f)) (sqlite3:for-each-row (lambda (id interface port pubport transport pid hostname) (set! res (cons (vector id interface port pubport transport pid hostname) res)) (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers | > < | | 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 | (let ((res '()) (best #f)) (sqlite3:for-each-row (lambda (id interface port pubport transport pid hostname) (set! res (cons (vector id interface port pubport transport pid hostname) res)) (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb ;; strftime('%s','now')-heartbeat < 10 AND "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers WHERE mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version) ;; for now we are keeping only one server registered in the db, return #f or first server found (if (null? res) #f (car res)))) ;; BUG: This logic is probably needed unless methodology changes completely... ;; ;; (if (null? res) #f ;; (let loop ((hed (car res)) |
︙ | ︙ |
Modified zmq-transport.scm from [9fdff728f3] to [e1f3152a02].
︙ | ︙ | |||
71 72 73 74 75 76 77 | (define (zmq-transport:run hostn) (debug:print 2 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) | > | | 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | (define (zmq-transport:run hostn) (debug:print 2 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) (let* ((db (open-db)) ;; here we *do not* want to be opening and closing the db (zmq-sdat1 #f) (zmq-sdat2 #f) (pull-socket #f) (pub-socket #f) (p1 #f) (p2 #f) (zmq-sockets-dat #f) (iface (if (string=? "-" hostn) |
︙ | ︙ | |||
100 101 102 103 104 105 106 107 108 109 110 111 112 113 | (set! p1 (caddr zmq-sdat1)) (set! zmq-sdat2 (cadr zmq-sockets-dat)) (set! pub-socket (cadr zmq-sdat2)) (set! p2 (caddr zmq-sdat2)) (set! *cache-on* #t) ;; what to do when we quit ;; ;; (on-exit (lambda () ;; (if (and *toppath* *server-info*) ;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) ;; (let loop () | > > | 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 | (set! p1 (caddr zmq-sdat1)) (set! zmq-sdat2 (cadr zmq-sockets-dat)) (set! pub-socket (cadr zmq-sdat2)) (set! p2 (caddr zmq-sdat2)) (set! *cache-on* #t) (set! *runremote* (vector pull-socket pub-socket)) ;; overloading the use of *runremote* BUG!? ;; what to do when we quit ;; ;; (on-exit (lambda () ;; (if (and *toppath* *server-info*) ;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) ;; (let loop () |
︙ | ︙ | |||
134 135 136 137 138 139 140 | (if (not (member qtype '(sync ping))) (begin (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*))) (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin | > | > | 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 | (if (not (member qtype '(sync ping))) (begin (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*))) (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin (db:process-queue-item db packet) ;; (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) (loop '())) (loop (cons packet queue-lst))))))) ;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (zmq-transport:keep-running) |
︙ | ︙ | |||
278 279 280 281 282 283 284 | (debug:print 0 "ERROR: Failed to open socket to " conurl) #f)))) (define (zmq-transport:client-connect iface pullport pubport) (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push)) (sub-socket (zmq-transport:client-socket-connect iface pubport type: 'sub | | | | 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 | (debug:print 0 "ERROR: Failed to open socket to " conurl) #f)))) (define (zmq-transport:client-connect iface pullport pubport) (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push)) (sub-socket (zmq-transport:client-socket-connect iface pubport type: 'sub subscriptions: (list (server:get-client-signature) "all"))) (zmq-sockets (vector push-socket sub-socket)) (login-res #f)) (debug:print-info 11 "zmq-transport:client-connect started. Next is login") (set! login-res (server:client-login zmq-sockets)) (if (and (not (null? login-res)) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") (set! *runremote* zmq-sockets) zmq-sockets) (begin |
︙ | ︙ | |||
379 380 381 382 383 384 385 | ;; )) ;; "Self ping")) (th2 (make-thread (lambda () (zmq-transport:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) | | | | 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 | ;; )) ;; "Self ping")) (th2 (make-thread (lambda () (zmq-transport:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) ;; (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running")) ) (set! *client-non-blocking-mode* #t) ;; (thread-start! th1) (thread-start! th2) ;; (thread-start! th3) (set! *didsomething* #t) ;; (thread-join! th3) (thread-join! th2) ) (debug:print 0 "ERROR: Failed to setup for megatest"))) (define (zmq-transport:client-signal-handler signum) |
︙ | ︙ |