Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -4,14 +4,15 @@ INSTALL=install SRCFILES = common.scm items.scm launch.scm \ ods.scm runconfig.scm server.scm configf.scm \ db.scm keys.scm margs.scm megatest-version.scm \ process.scm runs.scm tasks.scm tests.scm genexample.scm \ - http-transport.scm filedb.scm \ + http-transport.scm nmsg-transport.scm filedb.scm \ client.scm gutils.scm synchash.scm daemon.scm mt.scm dcommon.scm \ tree.scm ezsteps.scm lock-queue.scm sdb.scm \ - rmt.scm api.scm tdb.scm portlogger.scm + rmt.scm api.scm tdb.scm rpc-transport.scm \ + portlogger.scm # Eggs to install (straightforward ones) EGGS=matchable readline apropos base64 regex-literals format regex-case test coops trace csv \ dot-locking posix-utils posix-extras directory-utils hostinfo tcp-server rpc csv-xml fmt \ json md5 awful http-client spiffy uri-common intarweb spiffy-request-vars \ @@ -58,11 +59,11 @@ tests.o runs.o dashboard.o dashboard-tests.o dashboard-main.o : run_records.scm db.o ezsteps.o keys.o launch.o megatest.o monitor.o runs-for-ref.o runs.o tests.o : key_records.scm tests.o tasks.o dashboard-tasks.o : task_records.scm runs.o : test_records.scm megatest.o : megatest-fossil-hash.scm -client.scm common.scm configf.scm dashboard-guimonitor.scm dashboard-tests.scm dashboard.scm db.scm dcommon.scm ezsteps.scm fs-transport.scm http-transport.scm index-tree.scm items.scm keys.scm launch.scm megatest.scm monitor.scm mt.scm newdashboard.scm runconfig.scm runs.scm server.scm tdb.scm tests.scm tree.scm zmq-transport.scm : common_records.scm +client.scm common.scm configf.scm dashboard-guimonitor.scm dashboard-tests.scm dashboard.scm db.scm dcommon.scm ezsteps.scm fs-transport.scm http-transport.scm index-tree.scm items.scm keys.scm launch.scm megatest.scm monitor.scm mt.scm newdashboard.scm runconfig.scm runs.scm server.scm tdb.scm tests.scm tree.scm zmq-transport.scm : common_records.scm rpc-transport.scm # 
Temporary while transitioning to new routine # runs.o : run-tests-queue-classic.scm run-tests-queue-new.scm megatest-fossil-hash.scm : $(SRCFILES) megatest.scm *_records.scm Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -10,10 +10,11 @@ ;;====================================================================== (declare (unit api)) (declare (uses rmt)) (declare (uses db)) +(declare (uses tasks)) ;; allow these queries through without starting a server ;; (define api:read-only-queries '(get-key-val-pairs @@ -44,94 +45,182 @@ get-prev-run-ids get-run-ids-matching-target get-runs-by-patt get-steps-data login - testmeta-get-record)) + testmeta-get-record + have-incompletes? + )) + +(define api:write-queries + '( + ;; SERVERS + start-server + kill-server + + ;; TESTS + test-set-state-status-by-id + delete-test-records + delete-old-deleted-test-records + test-set-status-state + test-set-top-process-pid + roll-up-pass-fail-counts + update-fail-pass-counts + + ;; RUNS + register-run + set-tests-state-status + delete-run + lock/unlock-run + update-run-event_time + mark-incomplete + + ;; STEPS + teststep-set-status! + + ;; TEST DATA + test-data-rollup + csv->test-data + + ;; MISC + sync-inmem->db + + ;; TESTMETA + testmeta-add-record + testmeta-update-field + + ;; TASKS + tasks-add + tasks-set-state-given-param-key + )) ;; These are called by the server on recipt of /api calls ;; - keep it simple, only return the actual result of the call, i.e. no meta info here ;; -(define (api:execute-requests dbstruct cmd params) - (case (string->symbol cmd) - ;; SERVERS - ((start-server) (apply server:kind-run params)) - ((kill-server) (set! 
*server-run* #f)) - - ;; KEYS - ((get-key-val-pairs) (apply db:get-key-val-pairs dbstruct params)) - ((get-keys) (db:get-keys dbstruct)) - - ;; TESTS - ((test-toplevel-num-items) (apply db:test-toplevel-num-items dbstruct params)) - ((get-test-info-by-id) (apply db:get-test-info-by-id dbstruct params)) - ((test-get-rundir-from-test-id) (apply db:test-get-rundir-from-test-id dbstruct params)) - ((test-set-state-status-by-id) (apply db:test-set-state-status-by-id dbstruct params)) - ((get-count-tests-running) (apply db:get-count-tests-running dbstruct params)) - ((get-count-tests-running-in-jobgroup) (apply db:get-count-tests-running-in-jobgroup dbstruct params)) - ((delete-test-records) (apply db:delete-test-records dbstruct params)) - ;; ((delete-test-step-records) (apply db:delete-test-step-records dbstruct params)) - ((delete-old-deleted-test-records) (apply db:delete-old-deleted-test-records dbstruct params)) - ((test-set-status-state) (apply db:test-set-status-state dbstruct params)) - ((get-previous-test-run-record) (apply db:get-previous-test-run-record dbstruct params)) - ((get-matching-previous-test-run-records)(apply db:get-matching-previous-test-run-records dbstruct params)) - ((test-get-logfile-info) (apply db:test-get-logfile-info dbstruct params)) - ((test-get-records-for-index-file) (apply db:test-get-records-for-index-file dbstruct params)) - ((get-testinfo-state-status) (apply db:get-testinfo-state-status dbstruct params)) - ((test-set-top-process-pid) (apply db:test-set-top-process-pid dbstruct params)) - ((test-get-top-process-pid) (apply db:test-get-top-process-pid dbstruct params)) - ((test-get-paths-matching-keynames-target-new) (apply db:test-get-paths-matching-keynames-target-new dbstruct params)) - ((get-prereqs-not-met) (apply db:get-prereqs-not-met dbstruct params)) - ((roll-up-pass-fail-counts) (apply db:roll-up-pass-fail-counts dbstruct params)) - ((update-fail-pass-counts) (apply db:general-call dbstruct 'update-pass-fail-counts 
params)) - ((get-count-tests-running-for-run-id) (apply db:get-count-tests-running-for-run-id dbstruct params)) - - ;; RUNS - ((get-run-info) (apply db:get-run-info dbstruct params)) - ((get-run-status) (apply db:get-run-status dbstruct params)) - ((set-run-status) (apply db:set-run-status dbstruct params)) - ((register-run) (apply db:register-run dbstruct params)) - ((set-tests-state-status) (apply db:set-tests-state-status dbstruct params)) - ((get-tests-for-run) (apply db:get-tests-for-run dbstruct params)) - ((get-test-id) (apply db:get-test-id dbstruct params)) +;; - returns #( flag result ) +;; +(define (api:execute-requests dbstruct dat) + (handle-exceptions + exn + (let ((call-chain (get-call-chain))) + (print-call-chain (current-error-port)) + (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) + (vector #f (vector exn call-chain dat))) ;; return some stuff for debug if an exception happens + (if (not (vector? dat)) ;; it is an error to not receive a vector + (vector #f #f "remote must be called with a vector") + (vector ;; return a vector + the returned data structure + #t + (let ((cmd (vector-ref dat 0)) + (params (vector-ref dat 1))) + (case (if (symbol? cmd) + cmd + (string->symbol cmd)) + + ;;=============================================== + ;; READ/WRITE QUERIES + ;;=============================================== + + ;; SERVERS + ((start-server) (apply server:kind-run params)) + ((kill-server) (set! 
*server-run* #f)) + + ;; TESTS + ((test-set-state-status-by-id) (apply db:test-set-state-status-by-id dbstruct params)) + ((delete-test-records) (apply db:delete-test-records dbstruct params)) + ((delete-old-deleted-test-records) (apply db:delete-old-deleted-test-records dbstruct params)) + ((test-set-status-state) (apply db:test-set-status-state dbstruct params)) + ((test-set-top-process-pid) (apply db:test-set-top-process-pid dbstruct params)) + ((roll-up-pass-fail-counts) (apply db:roll-up-pass-fail-counts dbstruct params)) + ((update-fail-pass-counts) (apply db:general-call dbstruct 'update-pass-fail-counts params)) + + ;; RUNS + ((register-run) (apply db:register-run dbstruct params)) + ((set-tests-state-status) (apply db:set-tests-state-status dbstruct params)) + ((delete-run) (apply db:delete-run dbstruct params)) + ((lock/unlock-run) (apply db:lock/unlock-run dbstruct params)) + ((update-run-event_time) (apply db:update-run-event_time dbstruct params)) + + ;; STEPS + ((teststep-set-status!) (apply db:teststep-set-status! 
dbstruct params)) + + ;; TEST DATA + ((test-data-rollup) (apply db:test-data-rollup dbstruct params)) + ((csv->test-data) (apply db:csv->test-data dbstruct params)) + + ;; MISC + ((sync-inmem->db) (let ((run-id (car params))) + (db:sync-touched dbstruct run-id force-sync: #t))) + ((mark-incomplete) (apply db:find-and-mark-incomplete dbstruct params)) + + ;; TESTMETA + ((testmeta-add-record) (apply db:testmeta-add-record dbstruct params)) + ((testmeta-update-field) (apply db:testmeta-update-field dbstruct params)) + + ;; TASKS + ((tasks-add) (apply tasks:add dbstruct params)) + ((tasks-set-state-given-param-key) (apply tasks:set-state-given-param-key dbstruct params)) + + ;;====================================================================== + ;; READ ONLY QUERIES + ;;====================================================================== + + ;; KEYS + ((get-key-val-pairs) (apply db:get-key-val-pairs dbstruct params)) + ((get-keys) (db:get-keys dbstruct)) + + ;; TESTS + ((test-toplevel-num-items) (apply db:test-toplevel-num-items dbstruct params)) + ((get-test-info-by-id) (apply db:get-test-info-by-id dbstruct params)) + ((test-get-rundir-from-test-id) (apply db:test-get-rundir-from-test-id dbstruct params)) + ((get-count-tests-running) (apply db:get-count-tests-running dbstruct params)) + ((get-count-tests-running-in-jobgroup) (apply db:get-count-tests-running-in-jobgroup dbstruct params)) + ;; ((delete-test-step-records) (apply db:delete-test-step-records dbstruct params)) + ((get-previous-test-run-record) (apply db:get-previous-test-run-record dbstruct params)) + ((get-matching-previous-test-run-records)(apply db:get-matching-previous-test-run-records dbstruct params)) + ((test-get-logfile-info) (apply db:test-get-logfile-info dbstruct params)) + ((test-get-records-for-index-file) (apply db:test-get-records-for-index-file dbstruct params)) + ((get-testinfo-state-status) (apply db:get-testinfo-state-status dbstruct params)) + ((test-get-top-process-pid) (apply 
db:test-get-top-process-pid dbstruct params)) + ((test-get-paths-matching-keynames-target-new) (apply db:test-get-paths-matching-keynames-target-new dbstruct params)) + ((get-prereqs-not-met) (apply db:get-prereqs-not-met dbstruct params)) + ((get-count-tests-running-for-run-id) (apply db:get-count-tests-running-for-run-id dbstruct params)) + + ;; RUNS + ((get-run-info) (apply db:get-run-info dbstruct params)) + ((get-run-status) (apply db:get-run-status dbstruct params)) + ((set-run-status) (apply db:set-run-status dbstruct params)) + ((get-tests-for-run) (apply db:get-tests-for-run dbstruct params)) + ((get-test-id) (apply db:get-test-id dbstruct params)) ((get-tests-for-run-mindata) (apply db:get-tests-for-run-mindata dbstruct params)) - ((get-run-name-from-id) (apply db:get-run-name-from-id dbstruct params)) - ((delete-run) (apply db:delete-run dbstruct params)) - ((get-runs) (apply db:get-runs dbstruct params)) - ((get-all-run-ids) (db:get-all-run-ids dbstruct)) - ((get-prev-run-ids) (apply db:get-prev-run-ids dbstruct params)) - ((get-run-ids-matching-target) (apply db:get-run-ids-matching-target dbstruct params)) - ((get-runs-by-patt) (apply db:get-runs-by-patt dbstruct params)) - ((lock/unlock-run) (apply db:lock/unlock-run dbstruct params)) - ((update-run-event_time) (apply db:update-run-event_time dbstruct params)) - ((find-and-mark-incomplete) (apply db:find-and-mark-incomplete dbstruct params)) - - ;; STEPS - ((teststep-set-status!) (apply db:teststep-set-status! 
dbstruct params)) - - ;; TEST DATA - ((test-data-rollup) (apply db:test-data-rollup dbstruct params)) - ((csv->test-data) (apply db:csv->test-data dbstruct params)) - ((get-steps-data) (apply db:get-steps-data dbstruct params)) - - ;; MISC - ((login) (apply db:login dbstruct params)) - ((general-call) (let ((stmtname (car params)) - (run-id (cadr params)) - (realparams (cddr params))) - (db:with-db dbstruct run-id #t ;; these are all for modifying the db - (lambda (db) - (db:general-call db stmtname realparams))))) - ((sync-inmem->db) (db:sync-touched dbstruct run-id force-sync: #t)) - ((sdb-qry) (apply sdb:qry params)) - - ;; TESTMETA - ((testmeta-get-record) (apply db:testmeta-get-record dbstruct params)) - ((testmeta-add-record) (apply db:testmeta-add-record dbstruct params)) - ((testmeta-update-field) (apply db:testmeta-update-field dbstruct params)) - (else - (list "ERROR" 0)))) + ((get-runs) (apply db:get-runs dbstruct params)) + ((get-all-run-ids) (db:get-all-run-ids dbstruct)) + ((get-prev-run-ids) (apply db:get-prev-run-ids dbstruct params)) + ((get-run-ids-matching-target) (apply db:get-run-ids-matching-target dbstruct params)) + ((get-runs-by-patt) (apply db:get-runs-by-patt dbstruct params)) + ((get-run-name-from-id) (apply db:get-run-name-from-id dbstruct params)) + + ;; STEPS + ((get-steps-data) (apply db:get-steps-data dbstruct params)) + + ;; MISC + ((have-incompletes?) (apply db:have-incompletes? 
dbstruct params)) + ((login) (apply db:login dbstruct params)) + ((general-call) (let ((stmtname (car params)) + (run-id (cadr params)) + (realparams (cddr params))) + (db:with-db dbstruct run-id #t ;; these are all for modifying the db + (lambda (db) + (db:general-call db stmtname realparams))))) + ((sdb-qry) (apply sdb:qry params)) + ((ping) (current-process-id)) + + ;; TESTMETA + ((testmeta-get-record) (apply db:testmeta-get-record dbstruct params)) + + ;; TASKS + ((find-task-queue-records) (apply tasks:find-task-queue-records dbstruct params)))))))) + ;; http-server send-response ;; api:process-request ;; db:* ;; @@ -138,18 +227,19 @@ ;; NB// Runs on the server as part of the server loop ;; (define (api:process-request dbstruct $) ;; the $ is the request vars proc (let* ((cmd ($ 'cmd)) (paramsj ($ 'params)) - (params (db:string->obj paramsj)) ;; (rmt:json-str->dat paramsj)) - (res (api:execute-requests dbstruct cmd params))) + (params (db:string->obj paramsj transport: 'http)) ;; (rmt:json-str->dat paramsj)) + (resdat (api:execute-requests dbstruct (vector cmd params))) ;; #( flag result ) + (res (vector-ref resdat 1))) ;; This can be here but needs controls to ensure it doesn't run more than every 4 seconds ;; (rmt:dat->json-str ;; (if (or (string? res) ;; (list? res) ;; (number? res) ;; (boolean? res)) ;; res ;; (list "ERROR, not string, list, number or boolean" 1 cmd params res))))) - (db:obj->string res))) + (db:obj->string res transport: 'http))) Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -41,10 +41,109 @@ (define (client:logout serverdat) (let ((ok (and (socket? 
serverdat) (cdb:logout serverdat *toppath* (client:get-signature))))) ok)) +(define (client:connect iface port) + (case (server:get-transport) + ((rpc) (rpc:client-connect iface port)) + ((http) (http:client-connect iface port)) + ((zmq) (zmq:client-connect iface port)) + (else (rpc:client-connect iface port)))) + +(define (client:login-no-auto-setup server-info run-id) + (case (server:get-transport) + ((rpc) (rpc:login-no-auto-client-setup server-info run-id)) + ((http) (rmt:login-no-auto-client-setup server-info run-id)) + (else (rpc:login-no-auto-client-setup server-info run-id)))) + +(define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0)) + (case (server:get-transport) + ((rpc) (rpc-transport:client-setup run-id)) ;;(client:setup-rpc run-id)) + ((http)(client:setup-http run-id)) + (else (rpc-transport:client-setup run-id)))) ;; (client:setup-rpc run-id)))) + +;; (define (client:setup-rpc run-id) +;; (debug:print 0 "INFO: client:setup remaining-tries=" remaining-tries) +;; (if (<= remaining-tries 0) +;; (begin +;; (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id) +;; (exit 1)) +;; (let ((host-info (hash-table-ref/default *runremote* run-id #f))) +;; (debug:print-info 0 "client:setup host-info=" host-info ", remaining-tries=" remaining-tries) +;; (if host-info +;; (let* ((iface (car host-info)) +;; (port (cadr host-info)) +;; (start-res (client:connect iface port)) +;; ;; (ping-res (server:ping-server run-id iface port)) +;; (ping-res (client:login-no-auto-setup start-res run-id))) +;; (if ping-res ;; sucessful login? +;; (begin +;; (hash-table-set! *runremote* run-id start-res) +;; start-res) ;; return the server info +;; (if (member remaining-tries '(3 4 6)) +;; (begin ;; login failed +;; (debug:print 25 "INFO: client:setup start-res=" start-res ", run-id=" run-id ", server-dat=" host-info) +;; (hash-table-delete! 
*runremote* run-id) +;; (open-run-close tasks:server-force-clean-run-record +;; tasks:open-db +;; run-id +;; (car host-info) +;; (cadr host-info) +;; " client:setup (host-info=#t)") +;; (thread-sleep! 5) +;; (client:setup run-id remaining-tries: 10)) ;; (- remaining-tries 1))) +;; (begin +;; (debug:print 25 "INFO: client:setup failed to connect, start-res=" start-res ", run-id=" run-id ", host-info=" host-info) +;; (thread-sleep! 5) +;; (client:setup run-id remaining-tries: (- remaining-tries 1)))))) +;; ;; YUK: rename server-dat here +;; (let* ((server-dat (open-run-close tasks:get-server tasks:open-db run-id))) +;; (debug:print-info 0 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) +;; (if server-dat +;; (let* ((iface (tasks:hostinfo-get-interface server-dat)) +;; (port (tasks:hostinfo-get-port server-dat)) +;; (start-res (http-transport:client-connect iface port)) +;; ;; (ping-res (server:ping-server run-id iface port)) +;; (ping-res (rmt:login-no-auto-client-setup start-res run-id))) +;; (if start-res +;; (begin +;; (hash-table-set! *runremote* run-id start-res) +;; start-res) +;; (if (member remaining-tries '(2 5)) +;; (begin ;; login failed +;; (debug:print 25 "INFO: client:setup start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) +;; (hash-table-delete! *runremote* run-id) +;; (open-run-close tasks:server-force-clean-run-record +;; tasks:open-db +;; run-id +;; (tasks:hostinfo-get-interface server-dat) +;; (tasks:hostinfo-get-port server-dat) +;; " client:setup (server-dat = #t)") +;; (thread-sleep! 2) +;; (server:try-running run-id) +;; (thread-sleep! 10) ;; give server a little time to start up +;; (client:setup run-id remaining-tries: 10)) ;; (- remaining-tries 1))) +;; (begin +;; (debug:print 25 "INFO: client:setup start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) +;; (thread-sleep! 
5) +;; (client:setup run-id remaining-tries: (- remaining-tries 1)))))) +;; (begin ;; no server registered +;; (if (eq? remaining-tries 2) +;; (begin +;; ;; (open-run-close tasks:server-clean-out-old-records-for-run-id tasks:open-db run-id " client:setup (server-dat=#f)") +;; (client:setup run-id remaining-tries: 10)) +;; (begin +;; (thread-sleep! 2) +;; (debug:print 25 "INFO: client:setup start-res (not defined here), run-id=" run-id ", server-dat=" server-dat) +;; (if (< (open-run-close tasks:num-in-available-state tasks:open-db run-id) 3) +;; (begin +;; ;; (open-run-close tasks:server-clean-out-old-records-for-run-id tasks:open-db run-id " client:setup (server-dat=#f)") +;; (server:try-running run-id))) +;; (thread-sleep! 10) ;; give server a little time to start up +;; (client:setup run-id remaining-tries: (- remaining-tries 1))))))))))) + ;; Do all the connection work, look up the transport type and set up the ;; connection if required. ;; ;; There are two scenarios. ;; 1. We are a test manager and we received *transport-type* and *runremote* via cmdline @@ -53,75 +152,95 @@ ;; ;; client:setup ;; ;; lookup_server, need to remove *runremote* stuff ;; -(define (client:setup run-id #!key (remaining-tries 100) (failed-connects 0)) +(define (client:setup-http run-id #!key (remaining-tries 10) (failed-connects 0)) (debug:print-info 2 "client:setup remaining-tries=" remaining-tries) (let* ((tdbdat (tasks:open-db))) (if (<= remaining-tries 0) (begin (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id) (exit 1)) - (let ((host-info (hash-table-ref/default *runremote* run-id #f))) - (if host-info - (let* ((iface (http-transport:server-dat-get-iface host-info)) - (port (http-transport:server-dat-get-port host-info)) - (start-res (http-transport:client-connect iface port)) - (ping-res (rmt:login-no-auto-client-setup start-res run-id))) - (if ping-res ;; sucessful login? 
- (begin - (debug:print-info 2 "client:setup, ping is good using host-info=" host-info ", remaining-tries=" remaining-tries) - ;; Why add the close-connections here? - ;; (http-transport:close-connections run-id) - (hash-table-set! *runremote* run-id start-res) - start-res) ;; return the server info - ;; have host info but no ping. shutdown the current connection and try again - (begin ;; login failed - (debug:print-info 1 "client:setup, ping is bad for start-res=" start-res " and *runremote*=" host-info) - (http-transport:close-connections run-id) - (hash-table-delete! *runremote* run-id) - (if (< remaining-tries 8) - (thread-sleep! 5) - (thread-sleep! 1)) - (client:setup run-id remaining-tries: (- remaining-tries 1))))) - ;; YUK: rename server-dat here - (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) - (debug:print-info 4 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) - (if server-dat - (let* ((iface (tasks:hostinfo-get-interface server-dat)) - (port (tasks:hostinfo-get-port server-dat)) - (start-res (http-transport:client-connect iface port)) - (ping-res (rmt:login-no-auto-client-setup start-res run-id))) - (if (and start-res - ping-res) - (begin - (hash-table-set! *runremote* run-id start-res) - (debug:print-info 2 "connected to " (http-transport:server-dat-make-url start-res)) - start-res) - (begin ;; login failed but have a server record, clean out the record and try again - (debug:print-info 0 "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) - (http-transport:close-connections run-id) - (hash-table-delete! *runremote* run-id) - (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) - run-id - (tasks:hostinfo-get-interface server-dat) - (tasks:hostinfo-get-port server-dat) - " client:setup (server-dat = #t)") - (thread-sleep! 2) - (server:try-running run-id) - (thread-sleep! 
10) ;; give server a little time to start up - (client:setup run-id remaining-tries: (- remaining-tries 1))))) - (begin ;; no server registered - (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) - (debug:print-info 0 "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) - (thread-sleep! 2) - (if (< num-available 2) - (begin - (server:try-running run-id))) - (thread-sleep! 10) ;; give server a little time to start up - (client:setup run-id remaining-tries: (- remaining-tries 1))))))))))) + (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) + (debug:print-info 4 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) + (if server-dat + (let* ((iface (tasks:hostinfo-get-interface server-dat)) + (hostname (tasks:hostinfo-get-hostname server-dat)) + (port (tasks:hostinfo-get-port server-dat)) + (start-res (case *transport-type* + ((http)(http-transport:client-connect iface port)) + ((nmsg)(nmsg-transport:client-connect hostname port)))) + (ping-res (case *transport-type* + ((http)(rmt:login-no-auto-client-setup start-res run-id)) + ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id))) + (if logininfo + (car (vector-ref logininfo 1)) + #f)))))) + (if (and start-res + ping-res) + (begin + (hash-table-set! *runremote* run-id start-res) + (debug:print-info 2 "connected to " (http-transport:server-dat-make-url start-res)) + start-res) + (begin ;; login failed but have a server record, clean out the record and try again + (debug:print-info 0 "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) + (case *transport-type* + ((http)(http-transport:close-connections run-id))) + (hash-table-delete! 
*runremote* run-id) + (tasks:kill-server-run-id run-id) + (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) + run-id + (tasks:hostinfo-get-interface server-dat) + (tasks:hostinfo-get-port server-dat) + " client:setup (server-dat = #t)") + (if (> remaining-tries 8) + (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little + (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time + (server:try-running run-id) + (thread-sleep! 5) ;; give server a little time to start up + (client:setup run-id remaining-tries: (- remaining-tries 1)) + ))) + (begin ;; no server registered + (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) + (debug:print-info 0 "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) + (if (< num-available 2) + (server:try-running run-id)) + (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. + (client:setup run-id remaining-tries: (- remaining-tries 1))))))))) + +;; (let ((host-info (hash-table-ref/default *runremote* run-id #f))) +;; (if host-info ;; this is a bit circular. the host-info *is* the start-res FIXME +;; (let* ((iface (http-transport:server-dat-get-iface host-info)) +;; (port (http-transport:server-dat-get-port host-info)) +;; (start-res (case *transport-type* +;; ((http)(http-transport:client-connect iface port)) +;; ((nmsg)(nmsg-transport:client-connect iface port)) ;; (http-transport:server-dat-get-socket host-info)) +;; (else #f))) +;; (ping-res (case *transport-type* +;; ((http)(rmt:login-no-auto-client-setup start-res run-id)) +;; ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id))) +;; (if logininfo +;; (vector-ref (vector-ref logininfo 1) 1) +;; #f))) +;; (else #f)))) +;; (if ping-res ;; sucessful login? 
+;; (begin +;; (debug:print-info 2 "client:setup, ping is good using host-info=" host-info ", remaining-tries=" remaining-tries) +;; start-res) ;; return the server info +;; ;; have host info but no ping. shutdown the current connection and try again +;; (begin ;; login failed +;; (debug:print-info 1 "client:setup, ping is bad for start-res=" start-res " and *runremote*=" host-info) +;; (case *transport-type* +;; ((http)(http-transport:close-connections run-id))) +;; (hash-table-delete! *runremote* run-id) +;; (if (< remaining-tries 8) +;; (thread-sleep! 5) +;; (thread-sleep! 1)) +;; (client:setup run-id remaining-tries: (- remaining-tries 1))))) +;; ;; YUK: rename server-dat here +;; ;; keep this as a function to ease future (define (client:start run-id server-info) (http-transport:client-connect (tasks:hostinfo-get-interface server-info) (tasks:hostinfo-get-port server-info))) Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -66,11 +66,11 @@ (define *db-access-mutex* (make-mutex)) ;; SERVER (define *my-client-signature* #f) (define *transport-type* 'http) -(define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port +(define *transport-type* 'http) ;; override with [server] transport http|rpc|nmsg (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) @@ -226,10 +226,11 @@ (let ((run-ids (hash-table-keys *db-local-sync*))) (if (and (not (null? run-ids)) (configf:lookup *configdat* "setup" "megatest-db")) (db:multi-db-sync run-ids 'new2old))) (if *dbstruct-db* (db:close-all *dbstruct-db*)) + (if *inmemdb* (db:close-all *inmemdb*)) (if (and *megatest-db* (sqlite3:database? *megatest-db*)) (begin (sqlite3:interrupt! *megatest-db*) (sqlite3:finalize! 
*megatest-db* #t) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -296,11 +296,11 @@ (refdb (dbr:dbstruct-get-refdb dbstruct)) (olddb (dbr:dbstruct-get-olddb dbstruct)) ;; (runid (dbr:dbstruct-get-run-id dbstruct)) ) (debug:print-info 4 "Syncing for run-id: " run-id) - (mutex-lock! *http-mutex*) + ;; (mutex-lock! *http-mutex*) (if (eq? run-id 0) ;; runid equal to 0 is main.db (if maindb (if (or (not (number? mtime)) (not (number? stime)) @@ -325,16 +325,16 @@ (> mtime stime) force-sync) (begin (db:delay-if-busy rundb) (db:delay-if-busy olddb) + (dbr:dbstruct-set-stime! dbstruct (current-milliseconds)) (let ((num-synced (db:sync-tables db:sync-tests-only inmem refdb rundb olddb))) - (dbr:dbstruct-set-stime! dbstruct (current-milliseconds)) - (mutex-unlock! *http-mutex*) + ;; (mutex-unlock! *http-mutex*) num-synced) (begin - (mutex-unlock! *http-mutex*) + ;; (mutex-unlock! *http-mutex*) 0)))))) (define (db:close-main dbstruct) (let ((maindb (dbr:dbstruct-get-main dbstruct))) (if maindb @@ -617,10 +617,11 @@ ;; (define (db:multi-db-sync run-ids . options) (let* ((toppath (launch:setup-for-run)) (dbstruct (if toppath (make-dbr:dbstruct path: toppath) #f)) (mtdb (if toppath (db:open-megatest-db))) + (allow-cleanup (if run-ids #f #t)) (run-ids (if run-ids run-ids (if toppath (begin (db:delay-if-busy mtdb) (db:get-all-run-ids mtdb))))) @@ -663,21 +664,47 @@ (db:replace-test-records dbstruct run-id testrecs) (sqlite3:finalize! (db:dbdat-get-db (dbr:dbstruct-get-rundb dbstruct))))) run-ids))) ;; now ensure all newdb data are synced to megatest.db + ;; do not use the run-ids list passed in to the function + ;; (if (member 'new2old options) - (for-each - (lambda (run-id) - (let* ((fromdb (if toppath (make-dbr:dbstruct path: toppath local: #t) #f)) - (frundb (db:dbdat-get-db (db:get-db fromdb run-id)))) - ;; (db:delay-if-busy frundb) - ;; (db:delay-if-busy mtdb) - (if (eq? 
run-id 0) - (db:sync-tables (db:sync-main-list dbstruct) (db:get-db fromdb #f) mtdb) - (db:sync-tables db:sync-tests-only (db:get-db fromdb run-id) mtdb)))) - (cons 0 run-ids))) + (let* ((maindb (make-dbr:dbstruct path: toppath local: #t)) + (src-run-ids (db:get-all-run-ids (db:dbdat-get-db (db:get-db maindb 0)))) + (all-run-ids (sort (delete-duplicates (cons 0 src-run-ids)) <)) + (count 1) + (total (length all-run-ids)) + (dead-runs '())) + (for-each + (lambda (run-id) + (debug:print 0 "Processing run " (if (eq? run-id 0) " main.db " run-id) ", " count " of " total) + (set! count (+ count 1)) + (let* ((fromdb (if toppath (make-dbr:dbstruct path: toppath local: #t) #f)) + (frundb (db:dbdat-get-db (db:get-db fromdb run-id)))) + ;; (db:delay-if-busy frundb) + ;; (db:delay-if-busy mtdb) + ;; (db:clean-up frundb) + (if (eq? run-id 0) + (begin + (db:sync-tables (db:sync-main-list dbstruct) (db:get-db fromdb #f) mtdb) + (set! dead-runs (db:clean-up-maindb (db:get-db fromdb #f)))) + (begin + ;; NB// must sync first to ensure deleted tests get marked as such in megatest.db + (db:sync-tables db:sync-tests-only (db:get-db fromdb run-id) mtdb) + (db:clean-up-rundb (db:get-db fromdb run-id)) + )))) + all-run-ids) + ;; removed deleted runs + (let ((dbdir (tasks:get-task-db-path))) + (for-each (lambda (run-id) + (let ((fullname (conc dbdir "/" run-id ".db"))) + (if (file-exists? fullname) + (begin + (debug:print 0 "Removing database file for deleted run " fullname) + (delete-file fullname))))) + dead-runs)))) ;; (db:close-all dbstruct) ;; (sqlite3:finalize! 
mdb) )) ;; keeping it around for debugging purposes only @@ -769,10 +796,23 @@ avg_runtime REAL, avg_disk REAL, tags TEXT DEFAULT '', jobgroup TEXT DEFAULT 'default', CONSTRAINT test_meta_constraint UNIQUE (testname));") + (sqlite3:execute db "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY, + action TEXT DEFAULT '', + owner TEXT, + state TEXT DEFAULT 'new', + target TEXT DEFAULT '', + name TEXT DEFAULT '', + testpatt TEXT DEFAULT '', + keylock TEXT, + params TEXT, + creation_time TIMESTAMP, + execution_time TIMESTAMP);") + ;; move this clean up call somewhere else + (sqlite3:execute db "DELETE FROM tasks_queue WHERE state='done' AND creation_time < ?;" (- (current-seconds)(* 24 60 60))) ;; remove older than 24 hrs (sqlite3:execute db (conc "CREATE INDEX IF NOT EXISTS runs_index ON runs (runname" (if havekeys "," "") keystr ");")) ;; (sqlite3:execute db "CREATE VIEW runs_tests AS SELECT * FROM runs INNER JOIN tests ON runs.id=tests.run_id;") (sqlite3:execute db "CREATE TABLE IF NOT EXISTS extradat (id INTEGER PRIMARY KEY, run_id INTEGER, key TEXT, val TEXT);") (sqlite3:execute db "CREATE TABLE IF NOT EXISTS metadat (id INTEGER PRIMARY KEY, var TEXT, val TEXT, CONSTRAINT metadat_constraint UNIQUE (var));") @@ -895,10 +935,64 @@ ;;====================================================================== ;;====================================================================== ;; M A I N T E N A N C E ;;====================================================================== + +(define (db:have-incompletes? dbstruct run-id ovr-deadtime) + (let* ((dbdat (db:get-db dbstruct run-id)) + (db (db:dbdat-get-db dbdat)) + (incompleted '()) + (oldlaunched '()) + (toplevels '()) + (deadtime-str (configf:lookup *configdat* "setup" "deadtime")) + (deadtime (if (and deadtime-str + (string->number deadtime-str)) + (string->number deadtime-str) + 7200))) ;; two hours + (if (number? ovr-deadtime)(set! 
deadtime ovr-deadtime)) + + ;; in RUNNING or REMOTEHOSTSTART for more than 10 minutes + ;; + ;; HOWEVER: this code in run:test seems to work fine + ;; (> (- (current-seconds)(+ (db:test-get-event_time testdat) + ;; (db:test-get-run_duration testdat))) + ;; 600) + (db:delay-if-busy dbdat) + (sqlite3:for-each-row + (lambda (test-id run-dir uname testname item-path) + (if (and (equal? uname "n/a") + (equal? item-path "")) ;; this is a toplevel test + ;; what to do with toplevel? call rollup? + (begin + (set! toplevels (cons (list test-id run-dir uname testname item-path run-id) toplevels)) + (debug:print-info 0 "Found old toplevel test in RUNNING state, test-id=" test-id)) + (set! incompleted (cons (list test-id run-dir uname testname item-path run-id) incompleted)))) + db + "SELECT id,rundir,uname,testname,item_path FROM tests WHERE run_id=? AND (strftime('%s','now') - event_time) > (run_duration + ?) AND state IN ('RUNNING','REMOTEHOSTSTART');" + run-id deadtime) + + ;; in LAUNCHED for more than one day. Could be long due to job queues TODO/BUG: Need override for this in config + ;; + (db:delay-if-busy dbdat) + (sqlite3:for-each-row + (lambda (test-id run-dir uname testname item-path) + (if (and (equal? uname "n/a") + (equal? item-path "")) ;; this is a toplevel test + ;; what to do with toplevel? call rollup? + (set! toplevels (cons (list test-id run-dir uname testname item-path run-id) toplevels)) + (set! oldlaunched (cons (list test-id run-dir uname testname item-path run-id) oldlaunched)))) + db + "SELECT id,rundir,uname,testname,item_path FROM tests WHERE run_id=? AND (strftime('%s','now') - event_time) > 86400 AND state IN ('LAUNCHED');" + run-id) + + (debug:print-info 18 "Found " (length oldlaunched) " old LAUNCHED items, " (length toplevels) " old LAUNCHED toplevel tests and " (length incompleted) " tests marked RUNNING but apparently dead.") + (if (and (null? incompleted) + (null? oldlaunched) + (null? 
toplevels)) + #f + #t))) ;; select end_time-now from ;; (select testname,item_path,event_time+run_duration as ;; end_time,strftime('%s','now') as now from tests where state in ;; ('RUNNING','REMOTEHOSTSTART','LAUNCED')); @@ -993,11 +1087,11 @@ ;; 2. Look at run records ;; a. If have tests that are not deleted, set state='unknown' ;; b. .... ;; (define (db:clean-up dbdat) - (debug:print 0 "WARNING: db clean up not fully ported to v1.60, cleanup action will be on megatest.db") + ;; (debug:print 0 "WARNING: db clean up not fully ported to v1.60, cleanup action will be on megatest.db") (let* ((db (db:dbdat-get-db dbdat)) (count-stmt (sqlite3:prepare db "SELECT (SELECT count(id) FROM tests)+(SELECT count(id) FROM runs);")) (statements (map (lambda (stmt) (sqlite3:prepare db stmt)) @@ -1027,10 +1121,99 @@ (map sqlite3:finalize! statements) (sqlite3:finalize! count-stmt) ;; (db:find-and-mark-incomplete db) (db:delay-if-busy dbdat) (sqlite3:execute db "VACUUM;"))) + +;; Clean out old junk and vacuum the database +;; +;; Ultimately do something like this: +;; +;; 1. Look at test records either deleted or part of deleted run: +;; a. If test dir exists, set the the test to state='UNKNOWN', Set the run to 'unknown' +;; b. If test dir gone, delete the test record +;; 2. Look at run records +;; a. If have tests that are not deleted, set state='unknown' +;; b. .... 
+;; +(define (db:clean-up-rundb dbdat) + ;; (debug:print 0 "WARNING: db clean up not fully ported to v1.60, cleanup action will be on megatest.db") + (let* ((db (db:dbdat-get-db dbdat)) + (count-stmt (sqlite3:prepare db "SELECT (SELECT count(id) FROM tests);")) + (statements + (map (lambda (stmt) + (sqlite3:prepare db stmt)) + (list + ;; delete all tests that belong to runs that are 'deleted' + ;; (conc "DELETE FROM tests WHERE run_id NOT IN (" (string-intersperse (map conc valid-runs) ",") ");") + ;; delete all tests that are 'DELETED' + "DELETE FROM tests WHERE state='DELETED';" + )))) + (db:delay-if-busy dbdat) + (sqlite3:with-transaction + db + (lambda () + (sqlite3:for-each-row (lambda (tot) + (debug:print-info 0 "Records count before clean: " tot)) + count-stmt) + (map sqlite3:execute statements) + (sqlite3:for-each-row (lambda (tot) + (debug:print-info 0 "Records count after clean: " tot)) + count-stmt))) + (map sqlite3:finalize! statements) + (sqlite3:finalize! count-stmt) + ;; (db:find-and-mark-incomplete db) + (db:delay-if-busy dbdat) + (sqlite3:execute db "VACUUM;"))) + +;; Clean out old junk and vacuum the database +;; +;; Ultimately do something like this: +;; +;; 1. Look at test records either deleted or part of deleted run: +;; a. If test dir exists, set the the test to state='UNKNOWN', Set the run to 'unknown' +;; b. If test dir gone, delete the test record +;; 2. Look at run records +;; a. If have tests that are not deleted, set state='unknown' +;; b. .... 
+;; +(define (db:clean-up-maindb dbdat) + ;; (debug:print 0 "WARNING: db clean up not fully ported to v1.60, cleanup action will be on megatest.db") + (let* ((db (db:dbdat-get-db dbdat)) + (count-stmt (sqlite3:prepare db "SELECT (SELECT count(id) FROM runs);")) + (statements + (map (lambda (stmt) + (sqlite3:prepare db stmt)) + (list + ;; delete all tests that belong to runs that are 'deleted' + ;; (conc "DELETE FROM tests WHERE run_id NOT IN (" (string-intersperse (map conc valid-runs) ",") ");") + ;; delete all tests that are 'DELETED' + "DELETE FROM runs WHERE state='deleted';" + ))) + (dead-runs '())) + (sqlite3:for-each-row + (lambda (run-id) + (set! dead-runs (cons run-id dead-runs))) + db + "SELECT id FROM runs WHERE state='deleted';") + (db:delay-if-busy dbdat) + (sqlite3:with-transaction + db + (lambda () + (sqlite3:for-each-row (lambda (tot) + (debug:print-info 0 "Records count before clean: " tot)) + count-stmt) + (map sqlite3:execute statements) + (sqlite3:for-each-row (lambda (tot) + (debug:print-info 0 "Records count after clean: " tot)) + count-stmt))) + (map sqlite3:finalize! statements) + (sqlite3:finalize! 
count-stmt) + ;; (db:find-and-mark-incomplete db) + (db:delay-if-busy dbdat) + (sqlite3:execute db "VACUUM;") + dead-runs)) ;;====================================================================== ;; M E T A G E T A N D S E T V A R S ;;====================================================================== @@ -2296,26 +2479,26 @@ ;;====================================================================== ;; QUEUE UP META, TEST STATUS AND STEPS REMOTE ACCESS ;;====================================================================== ;; NOTE: Can remove the regex and base64 encoding for zmq -(define (db:obj->string obj) - (case *transport-type* +(define (db:obj->string obj #!key (transport 'http)) + (case transport ;; ((fs) obj) ((http fs) (string-substitute (regexp "=") "_" (base64:base64-encode (z3:encode-buffer (with-output-to-string (lambda ()(serialize obj))))) #t)) - ((zmq)(with-output-to-string (lambda ()(serialize obj)))) + ((zmq nmsg)(with-output-to-string (lambda ()(serialize obj)))) (else obj))) -(define (db:string->obj msg) - (case *transport-type* +(define (db:string->obj msg #!key (transport 'http)) + (case transport ;; ((fs) msg) ((http fs) (if (string? msg) (with-input-from-string (z3:decode-buffer @@ -2323,12 +2506,12 @@ (string-substitute (regexp "_") "=" msg #t))) (lambda ()(deserialize))) (begin (debug:print 0 "ERROR: reception failed. 
Received " msg " but cannot translate it.") - #f))) ;; crude reply for when things go awry - ((zmq)(with-input-from-string msg (lambda ()(deserialize)))) + msg))) ;; crude reply for when things go awry + ((zmq nmsg)(with-input-from-string msg (lambda ()(deserialize)))) (else msg))) (define (db:test-set-status-state dbstruct run-id test-id status state msg) (let ((dbdat (db:get-db dbstruct run-id))) (if (member state '("LAUNCHED" "REMOTEHOSTSTART")) Index: docs/manual/megatest_manual.html ================================================================== --- docs/manual/megatest_manual.html +++ docs/manual/megatest_manual.html @@ -1,1284 +1,1361 @@ - - - - - -The Megatest Users Manual - - - - - -
-
-

Preface

-
-

This book is organised as three sub-books; getting started, writing tests and reference.

-
-

Why Megatest?

-

The Megatest project was started for two reasons, the first was an -immediate and pressing need for a generalized tool to manage a suite -of regression tests and the second was the fact that the author had -written or maintained several such tools at different companies over -the years and it seemed a good thing to have a single open source -tool, flexible enough to meet the needs of any team doing continuous -integrating and or running a complex suite of tests for release -qualification.

-
-
-

Megatest Design Philosophy

-

Megatest is intended to provide the minimum needed resources to make -writing a suite of tests and tasks for implementing continuous build -for software, design engineering or process control (via owlfs for -example) without being specialized for any specific problem -space. Megatest in of itself does not know what constitutes a PASS or -FAIL of a test. In most cases megatest is best used in conjunction -with logpro or a similar tool to parse, analyze and decide on the test -outcome.

-
-
-

Megatest Architecture

-

All data to specify the tests and configure the system is stored in -plain text files. All system state is stored in an sqlite3 -database. Tests are launched using the launching system available for -the distributed compute platform in use. A template script is provided -which can launch jobs on local and remote Linux hosts. Currently -megatest uses the network filesystem to call home to your master -sqlite3 database.

-
-
-
-

Road Map

-

Note 1: This road-map is tentative and subject to change without notice.

-

Note 2: Starting over. Old plan is commented out.

-
-

Current Items

-
-
-

ww05 - migrate to inmem-db

-

Keep as much the same as possible. Add internal reference to almost -eliminate contention on db(s).

-
    -
  1. -

    -Add internal reference db -

    -
  2. -
  3. -

    -Verify that actions are accessing correct db -

    -
      -
    1. -

      --runtests - inmem -

      -
    2. -
    3. -

      --list-runs - local (but not megatest.db) -

      -
    4. -
    5. -

      -dashboard - local (but not megatest.db) -

      -
    6. -
    -
  4. -
  5. -

    -Mirror db to /var/tmp… -

    -
  6. -
  7. -

    -Dashboard read db from per-run db. -

    -
  8. -
  9. -

    -Dashboard read db from /var/tmp -

    -
  10. -
  11. -

    -Runs register in tasks table in monitor.db -

    -
  12. -
  13. -

    -Server polls tasks table for next action (in addition?) -

    -
  14. -
  15. -

    -Change run loop to execute in server, triggered by call to polling of tasks table -

    -
  16. -
-
-
-
-

Getting Started

-
-
Getting started with Megatest
-
-

How to install Megatest and set it up for running your regressions and continuous integration process.

-
-
-

Installation

-
-
-

Dependencies

-

Chicken scheme and a number of "eggs" are required for building -Megatest. See the script installall.sch in the utils directory of the -distribution for a mostly automated way to install everything needed -for building Megatest on Linux.

-


[An example footnote.]

-

And now for something completely different: monkeys, lions and -tigers (Bengal and Siberian) using the alternative syntax index -entries. - - - -Note that multi-entry terms generate separate index entries.

-

Here are a couple of image examples: an -images/smallnew.png - -example inline image followed by an example block image:

-
-
-Tiger image -
-
Figure 1. Tiger block image
-
-

Followed by an example table:

-
- - --- - - - - - - - - - - - - - - - -
Table 1. An example table
Option Description

-a USER GROUP

Add USER to GROUP.

-R GROUP

Disables access to GROUP.

-
-
-
Example 1. An example example
-
-

Lorum ipum…

-
-
-
-

Sub-section with Anchor

-

Sub-section at level 2.

-
-

Chapter Sub-section

-

Sub-section at level 3.

-
-
Chapter Sub-section
-

Sub-section at level 4.

-

This is the maximum sub-section depth supported by the distributed -AsciiDoc configuration. -
[A second example footnote.]

-
-
-
-
-
-
-

The Second Chapter

-
-

An example link to anchor at start of the first sub-section.

-

An example link to a bibliography entry [taoup].

-
-
-

Writing Tests

-
-

The First Chapter of the Second Part

-
-

Chapters grouped into book parts are at level 1 and can contain -sub-sections.

-
-
-

How To Do Things

-
-

Tricks

-
-

This section is a compendium of a various useful tricks for debugging, -configuring and generally getting the most out of Megatest.

-
-
-
-

Limiting your running jobs

-
-

The following example will limit a test in the jobgroup "group1" to no more than 10 tests simultaneously.

-

In your testconfig:

-
-
-
[test_meta]
-jobgroup group1
-
-

In your megatest.config:

-
-
-
[jobgroups]
-group1 10
-custdes 4
-
-
-
-
-

Debugging Tricks

-
-
-

Examining The Environment

-
-

During Config File Processing

-
-
-

Organising Your Tests and Tasks

-
-
-
[tests-paths]
-1 #{get misc parent}/simplerun/tests
-
-
-
-
[setup]
-
-

The runscript method is a brute force way to run scripts where the -user is responsible for setting STATE and STATUS

-
-
-
runscript main.csh
-
-
-
-
-

Debugging Server Problems

-
-
-
sudo lsof -i
-sudo netstat -lptu
-sudo netstat -tulpn
-
-
-
-
-

Reference

-
-

The First Chapter of the Second Part

-
-

Chapters grouped into book parts are at level 1 and can contain -sub-sections.

-
-
-
-

The testconfig File

-
-
-

Setup section

-
-

Header

-
-
-
[setup]
-
-

The runscript method is a brute force way to run scripts where the -user is responsible for setting STATE and STATUS

-
-
-
runscript main.csh
-
-
-
-
-

Requirements section

-
-

Header

-
-
-
[requirements]
-
-
-
-

Wait on Other Tests

-
-
-
# A normal waiton waits for the prior tests to be COMPLETED
-# and PASS, CHECK or WAIVED
-waiton test1 test2
-
-
-
-

Mode

-

The default (i.e. if mode is not specified) is normal. All pre-dependent tests -must be COMPLETED and PASS, CHECK or WAIVED before the test will start

-
-
-
mode   normal
-
-

The toplevel mode requires only that the prior tests are COMPLETED.

-
-
-
mode toplevel
-
-

A item based waiton will start items in a test when the -same-named item is COMPLETED and PASS, CHECK or WAIVED -in the prior test

-
-
-
mode itemmatch
-
-
-
-
# With a toplevel test you may wish to generate your list
-# of tests to run dynamically
-#
-# waiton #{shell get-valid-tests-to-run.sh}
-
-
-
-

Run time limit

-
-
-
runtimelim 1h 2m 3s  # this will automatically kill the test if it runs for more than 1h 2m and 3s
-
-
-
-

Skip

-
-
-

Header

-
-
-
[skip]
-
-
-
-

Skip on Still-running Tests

-
-
-
# NB// If the prevrunning line exists with *any* value the test will
-# automatically SKIP if the same-named test is currently RUNNING
-
-prevrunning x
-
-
-
-

Skip if a File Exists

-
-
-
fileexists /path/to/a/file # skip if /path/to/a/file exists
-
-
-
-

Controlled waiver propagation

-

If test is FAIL and previous test in run with same MT_TARGET is WAIVED then apply the following rules from the testconfig: -If a waiver check is specified in the testconfig apply the check and if it passes then set this FAIL to WAIVED

-

Waiver check has two parts, 1) a list of waiver, rulename, filepatterns and 2) the rulename script spec (note that "diff" and "logpro" are predefined)

-
-
-
###### EXAMPLE FROM testconfig #########
-# matching file(s) will be diff'd with previous run and logpro applied
-# if PASS or WARN result from logpro then WAIVER state is set
-#
-[waivers]
-# logpro_file    rulename      input_glob
-waiver_1         logpro        lookittmp.log
-
-[waiver_rules]
-
-# This builtin rule is the default if there is no <waivername>.logpro file
-# diff   diff %file1% %file2%
-
-# This builtin rule is applied if a <waivername>.logpro file exists
-# logpro diff %file1% %file2% | logpro %waivername%.logpro %waivername%.html
-
-
-
-
-

Ezsteps

-

To transfer the environment to the next step you can do the following:

-
-
-
$MT_MEGATEST -env2file .ezsteps/${stepname}
-
-
-
-

Triggers

-

In your testconfig triggers can be specified

-
-
-
[triggers]
-
-# Call script running.sh when test goes to state=RUNNING, status=PASS
-RUNNING/PASS running.sh
-
-# Call script running.sh any time state goes to RUNNING
-RUNNING/ running.sh
-
-# Call script onpass.sh any time status goes to PASS
-PASS/ onpass.sh
-
-

Scripts called will have; test-id test-rundir trigger, added to the commandline.

-

HINT

-

To start an xterm (useful for debugging), use a command line like the following:

-
-
-
[triggers]
-COMPLETED/ xterm -e bash -s --
-
-
- - - -
-
Note
-
There is a trailing space after the --
-
-
-
-

Megatest Internals

-
-
-server.png -
-
-
-
-
-
-

Appendix A: Example Appendix

-
-

One or more optional appendixes go here at section level zero.

-
-

Appendix Sub-section

-
- - - -
-
Note
-
Preface and appendix subsections start out of sequence at level -2 (level 1 is skipped). This only applies to multi-part book -documents.
-
-
-
-
-
-

Example Bibliography

-
-

The bibliography list is a style of AsciiDoc bulleted list.

-
    -
  • -

    -[taoup] Eric Steven Raymond. The Art of Unix - Programming. Addison-Wesley. ISBN 0-13-142901-9. -

    -
  • -
  • -

    -[walsh-muellner] Norman Walsh & Leonard Muellner. - DocBook - The Definitive Guide. O’Reilly & Associates. 1999. - ISBN 1-56592-580-7. -

    -
  • -
-
-
-
-

Example Glossary

-
-

Glossaries are optional. Glossaries entries are an example of a style -of AsciiDoc labeled lists.

-
-
-A glossary term -
-
-

- The corresponding (indented) definition. -

-
-
-A second glossary term -
-
-

- The corresponding (indented) definition. -

-
-
-
-
-
-

Example Colophon

-
-

Text at the end of a book describing facts about its production.

-
-
-
-

Example Index

-
-
-
-
-

- - - + + + + + +The Megatest Users Manual + + + + + +
+
+

Preface

+
+

This book is organised as three sub-books: getting started, writing tests and reference.

+
+

Why Megatest?

+

The Megatest project was started for two reasons: the first was an +immediate and pressing need for a generalized tool to manage a suite +of regression tests, and the second was the fact that the author had +written or maintained several such tools at different companies over +the years and it seemed a good thing to have a single open source +tool, flexible enough to meet the needs of any team doing continuous +integration and/or running a complex suite of tests for release +qualification.

+
+
+

Megatest Design Philosophy

+

Megatest is intended to provide the minimum needed resources to make +writing a suite of tests and tasks for implementing continuous build +for software, design engineering or process control (via owlfs for +example) possible without being specialized for any specific problem +space. Megatest in and of itself does not know what constitutes a PASS or +FAIL of a test. In most cases megatest is best used in conjunction +with logpro or a similar tool to parse, analyze and decide on the test +outcome.

+
+
+

Megatest Architecture

+

All data to specify the tests and configure the system is stored in +plain text files. All system state is stored in an sqlite3 +database. Tests are launched using the launching system available for +the distributed compute platform in use. A template script is provided +which can launch jobs on local and remote Linux hosts. Currently +megatest uses the network filesystem to call home to your master +sqlite3 database.

+
+
+
+

Road Map

+

Note 1: This road-map is tentative and subject to change without notice.

+

Note 2: Starting over. Old plan is commented out.

+
+

Current Items

+
+
+

ww05 - migrate to inmem-db

+

Keep as much the same as possible. Add internal reference to almost +eliminate contention on db(s).

+
    +
  1. +

    +Add internal reference db +

    +
  2. +
  3. +

    +Verify that actions are accessing correct db +

    +
      +
    1. +

      +-runtests - inmem +

      +
    2. +
    3. +

      +-list-runs - local (but not megatest.db) +

      +
    4. +
    5. +

      +dashboard - local (but not megatest.db) +

      +
    6. +
    +
  4. +
  5. +

    +Mirror db to /var/tmp… +

    +
  6. +
  7. +

    +Dashboard read db from per-run db. +

    +
  8. +
  9. +

    +Dashboard read db from /var/tmp +

    +
  10. +
  11. +

    +Runs register in tasks table in monitor.db +

    +
  12. +
  13. +

    +Server polls tasks table for next action (in addition?) +

    +
  14. +
  15. +

    +Change run loop to execute in server, triggered by call to polling of tasks table +

    +
  16. +
+
+
+
+

Getting Started

+
+
Getting started with Megatest
+
+

How to install Megatest and set it up for running your regressions and continuous integration process.

+
+
+

Installation

+
+
+

Dependencies

+

Chicken scheme and a number of "eggs" are required for building +Megatest. See the script installall.sch in the utils directory of the +distribution for a mostly automated way to install everything needed +for building Megatest on Linux.

+


[An example footnote.]

+

And now for something completely different: monkeys, lions and +tigers (Bengal and Siberian) using the alternative syntax index +entries. + + + +Note that multi-entry terms generate separate index entries.

+

Here are a couple of image examples: an +images/smallnew.png + +example inline image followed by an example block image:

+
+
+Tiger image +
+
Figure 1. Tiger block image
+
+

Followed by an example table:

+
+ + +++ + + + + + + + + + + + + + + + +
Table 1. An example table
Option Description

-a USER GROUP

Add USER to GROUP.

-R GROUP

Disables access to GROUP.

+
+
+
Example 1. An example example
+
+

Lorum ipum…

+
+
+
+

Sub-section with Anchor

+

Sub-section at level 2.

+
+

Chapter Sub-section

+

Sub-section at level 3.

+
+
Chapter Sub-section
+

Sub-section at level 4.

+

This is the maximum sub-section depth supported by the distributed +AsciiDoc configuration. +
[A second example footnote.]

+
+
+
+
+
+
+

The Second Chapter

+
+

An example link to anchor at start of the first sub-section.

+

An example link to a bibliography entry [taoup].

+
+
+

Writing Tests

+
+

The First Chapter of the Second Part

+
+

Chapters grouped into book parts are at level 1 and can contain +sub-sections.

+
+
+

How To Do Things

+
+

Tricks

+
+

This section is a compendium of various useful tricks for debugging, +configuring and generally getting the most out of Megatest.

+
+
+
+

Limiting your running jobs

+
+

The following example will limit a test in the jobgroup "group1" to no more than 10 tests simultaneously.

+

In your testconfig:

+
+
+
[test_meta]
+jobgroup group1
+
+

In your megatest.config:

+
+
+
[jobgroups]
+group1 10
+custdes 4
+
+
+
+
+

Debugging Tricks

+
+
+

Examining The Environment

+
+

During Config File Processing

+
+
+

Organising Your Tests and Tasks

+
+
+
[tests-paths]
+1 #{get misc parent}/simplerun/tests
+
+
+
+
[setup]
+
+

The runscript method is a brute force way to run scripts where the +user is responsible for setting STATE and STATUS

+
+
+
runscript main.csh
+
+
+
+
+

Debugging Server Problems

+
+
+
sudo lsof -i
+sudo netstat -lptu
+sudo netstat -tulpn
+
+
+
+
+

Reference

+
+

The testconfig File

+
+
+

Setup section

+
+

Header

+
+
+
[setup]
+
+

The runscript method is a brute force way to run scripts where the +user is responsible for setting STATE and STATUS

+
+
+
runscript main.csh
+
+
+
+
+

Requirements section

+
+

Header

+
+
+
[requirements]
+
+
+
+

Wait on Other Tests

+
+
+
# A normal waiton waits for the prior tests to be COMPLETED
+# and PASS, CHECK or WAIVED
+waiton test1 test2
+
+
+
+

Mode

+

The default (i.e. if mode is not specified) is normal. All pre-dependent tests +must be COMPLETED and PASS, CHECK or WAIVED before the test will start

+
+
+
mode   normal
+
+

The toplevel mode requires only that the prior tests are COMPLETED.

+
+
+
mode toplevel
+
+

An item-based waiton will start items in a test when the +same-named item is COMPLETED and PASS, CHECK or WAIVED +in the prior test.

+
+
+
mode itemmatch
+
+
+
+
# With a toplevel test you may wish to generate your list
+# of tests to run dynamically
+#
+# waiton #{shell get-valid-tests-to-run.sh}
+
+
+
+

Run time limit

+
+
+
runtimelim 1h 2m 3s  # this will automatically kill the test if it runs for more than 1h 2m and 3s
+
+
+
+

Skip

+
+
+

Header

+
+
+
[skip]
+
+
+
+

Skip on Still-running Tests

+
+
+
# NB// If the prevrunning line exists with *any* value the test will
+# automatically SKIP if the same-named test is currently RUNNING
+
+prevrunning x
+
+
+
+

Skip if a File Exists

+
+
+
fileexists /path/to/a/file # skip if /path/to/a/file exists
+
+
+
+

Controlled waiver propagation

+

If test is FAIL and previous test in run with same MT_TARGET is WAIVED then apply the following rules from the testconfig: +If a waiver check is specified in the testconfig apply the check and if it passes then set this FAIL to WAIVED

+

Waiver check has two parts, 1) a list of waiver, rulename, filepatterns and 2) the rulename script spec (note that "diff" and "logpro" are predefined)

+
+
+
###### EXAMPLE FROM testconfig #########
+# matching file(s) will be diff'd with previous run and logpro applied
+# if PASS or WARN result from logpro then WAIVER state is set
+#
+[waivers]
+# logpro_file    rulename      input_glob
+waiver_1         logpro        lookittmp.log
+
+[waiver_rules]
+
+# This builtin rule is the default if there is no <waivername>.logpro file
+# diff   diff %file1% %file2%
+
+# This builtin rule is applied if a <waivername>.logpro file exists
+# logpro diff %file1% %file2% | logpro %waivername%.logpro %waivername%.html
+
+
+
+
+

Ezsteps

+

To transfer the environment to the next step you can do the following:

+
+
+
$MT_MEGATEST -env2file .ezsteps/${stepname}
+
+
+
+

Triggers

+

In your testconfig triggers can be specified

+
+
+
[triggers]
+
+# Call script running.sh when test goes to state=RUNNING, status=PASS
+RUNNING/PASS running.sh
+
+# Call script running.sh any time state goes to RUNNING
+RUNNING/ running.sh
+
+# Call script onpass.sh any time status goes to PASS
+PASS/ onpass.sh
+
+

Scripts called will have test-id, test-rundir and trigger added to the command line.

+

HINT

+

To start an xterm (useful for debugging), use a command line like the following:

+
+
+
[triggers]
+COMPLETED/ xterm -e bash -s --
+
+
+ + + +
+
Note
+
There is a trailing space after the --
+
+
+
+
+
+

Programming API

+
+

These routines can be called from the megatest repl.

+
+ + +++++ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Table 2. API Server Management Calls
API Call Purpose comments Returns Comments

(rmt:login run-id)

Verify that the version, testsuite area etc. are correct.

#( #t "successful login" )

(rmt:start-server run-id)

#( success/fail n/a )

(rmt:kill-server run-id)

#( success/fail n/a )

Works only if the server is still reachable

+
+
+ + +++++ + + + + + + + + + + + + + + + + + + + + + + + +
Table 3. API Keys Related Calls
API Call Purpose comments Returns Comments

(rmt:get-key-val-pairs run-id)

#t=success/#f=fail

Works only if the server is still reachable

(rmt:get-keys run-id)

( key1 key2 … )

+
+
+

Megatest Internals

+
+
+server.png +
+
+
+
+
+
+

Appendix A: Example Appendix

+
+

One or more optional appendixes go here at section level zero.

+
+

Appendix Sub-section

+
+ + + +
+
Note
+
Preface and appendix subsections start out of sequence at level +2 (level 1 is skipped). This only applies to multi-part book +documents.
+
+
+
+
+
+

Example Bibliography

+
+

The bibliography list is a style of AsciiDoc bulleted list.

+
    +
  • +

    +[taoup] Eric Steven Raymond. The Art of Unix + Programming. Addison-Wesley. ISBN 0-13-142901-9. +

    +
  • +
  • +

    +[walsh-muellner] Norman Walsh & Leonard Muellner. + DocBook - The Definitive Guide. O’Reilly & Associates. 1999. + ISBN 1-56592-580-7. +

    +
  • +
+
+
+
+

Example Glossary

+
+

Glossaries are optional. Glossary entries are an example of a style +of AsciiDoc labeled lists.

+
+
+A glossary term +
+
+

+ The corresponding (indented) definition. +

+
+
+A second glossary term +
+
+

+ The corresponding (indented) definition. +

+
+
+
+
+
+

Example Colophon

+
+

Text at the end of a book describing facts about its production.

+
+
+
+

Example Index

+
+
+
+
+

+ + + Index: docs/manual/reference.txt ================================================================== --- docs/manual/reference.txt +++ docs/manual/reference.txt @@ -1,15 +1,9 @@ Reference ========= -The First Chapter of the Second Part ------------------------------------- - -Chapters grouped into book parts are at level 1 and can contain -sub-sections. - The testconfig File ------------------- Setup section ~~~~~~~~~~~~~ @@ -178,8 +172,31 @@ [triggers] COMPLETED/ xterm -e bash -s -- ----------------- NOTE: There is a trailing space after the -- + +Programming API +--------------- + +These routines can be called from the megatest repl. + +.API Server Management Calls +[width="70%",cols="^,2m,2m,2m",frame="topbot",options="header,footer"] +|====================== +|API Call | Purpose comments | Returns | Comments +|(rmt:start-server run-id) | | #( success/fail n/a ) | +|(rmt:kill-server run-id) | | #( success/fail n/a ) | Works only if the server is still reachable +|(rmt:login run-id) | Verify the the version, testsuite area etc. are correct. | #( #t "successful login" ) | +|====================== + +.API Keys Related Calls +[width="70%",cols="^,2m,2m,2m",frame="topbot",options="header,footer"] +|====================== +|API Call | Purpose comments | Returns | Comments +|(rmt:get-keys run-id) | | ( key1 key2 ... ) | +| (rmt:get-key-val-pairs run-id) | | #t=success/#f=fail | Works only if the server is still reachable +|====================== + :numbered!: ADDED fs-transport.scm Index: fs-transport.scm ================================================================== --- /dev/null +++ fs-transport.scm @@ -0,0 +1,44 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. 
+ +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use spiffy uri-common intarweb http-client spiffy-request-vars) + +(tcp-buffer-size 2048) + +(declare (unit fs-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. + +(include "common_records.scm") +(include "db_records.scm") + + +;;====================================================================== +;; F S T R A N S P O R T S E R V E R +;;====================================================================== + +;; There is no "server" per se but a convience routine to make it non +;; necessary to be reopening the db over and over again. +;; + +(define (fs:process-queue-item packet) + (if (not *megatest-db*) ;; we will require that (setup-for-run) has already been called + (set! *megatest-db* (open-db))) + (debug:print-info 11 "fs:process-queue-item called with packet=" packet) + (db:process-queue-item *megatest-db* packet)) + Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -76,10 +76,16 @@ (debug:print-info 0 "portlogger recommended port: " start-port) (root-path (if link-tree-path link-tree-path (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! (handle-directory spiffy-directory-listing) + (handle-exception (lambda (exn chain) + (signal (make-composite-condition + (make-property-condition + 'server + 'message "server error"))))) + ;; http-transport:handle-directory) ;; simple-directory-handler) ;; Setup the web server and a /ctrl interface ;; (vhost-map `(((* any) . ,(lambda (continue) ;; open the db on the first call @@ -93,31 +99,10 @@ (send-response body: (api:process-request *inmemdb* $) ;; the $ is the request vars proc headers: '((content-type text/plain))) (mutex-lock! 
*heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*)) - ;; This is the /ctrl path where data is handed to the server and - ;; responses - ;; ((equal? (uri-path (request-uri (current-request))) - ;; '(/ "ctrl")) - ;; (let* ((packet (db:string->obj dat)) - ;; (qtype (cdb:packet-get-qtype packet))) - ;; (debug:print-info 12 "server=> received packet=" packet) - ;; (if (not (member qtype '(sync ping))) - ;; (begin - ;; (mutex-lock! *heartbeat-mutex*) - ;; (set! *last-db-access* (current-seconds)) - ;; (mutex-unlock! *heartbeat-mutex*))) - ;; ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex - ;; ;; (set! res (open-run-close db:process-queue-item open-db packet)) - ;; (set! res (db:process-queue-item db packet)) - ;; ;; (mutex-unlock! *db:process-queue-mutex*) - ;; (debug:print-info 11 "Return value from db:process-queue-item is " res) - ;; (send-response body: (conc "ctrl data\n" - ;; res - ;; "") - ;; headers: '((content-type text/plain))))) ((equal? (uri-path (request-uri (current-request))) '(/ "")) (send-response body: (http-transport:main-page))) ((equal? (uri-path (request-uri (current-request))) '(/ "runs")) @@ -243,28 +228,30 @@ (http-transport:server-dat-get-api-req serverdat) (begin (debug:print 0 "FATAL ERROR: http-transport:client-api-send-receive called with no server info") (exit 1)))) (res #f) - (success #t)) - (handle-exceptions - exn - (if (> numretries 0) - (begin - (mutex-unlock! *http-mutex*) - (thread-sleep! 1) - (handle-exceptions - exn - (debug:print 0 "WARNING: closing connections failed. Server at " fullurl " almost certainly dead") - (close-all-connections!)) - (debug:print 0 "WARNING: Failed to communicate with server, trying again, numretries left: " numretries) - (http-transport:client-api-send-receive run-id serverdat cmd params numretries: (- numretries 1))) - (begin - (mutex-unlock! 
*http-mutex*) - (tasks:kill-server-run-id run-id) - #f)) - (begin + (success #t) + (sparams (db:obj->string params transport: 'http))) +;; (condition-case +;; handle-exceptions +;; exn +;; (if (> numretries 0) +;; (begin +;; (mutex-unlock! *http-mutex*) +;; (thread-sleep! 1) +;; (handle-exceptions +;; exn +;; (debug:print 0 "WARNING: closing connections failed. Server at " fullurl " almost certainly dead") +;; (close-all-connections!)) +;; (debug:print 0 "WARNING: Failed to communicate with server, trying again, numretries left: " numretries) +;; (http-transport:client-api-send-receive run-id serverdat cmd sparams numretries: (- numretries 1))) +;; (begin +;; (mutex-unlock! *http-mutex*) +;; (tasks:kill-server-run-id run-id) +;; #f)) +;; (begin (debug:print-info 11 "fullurl=" fullurl ", cmd=" cmd ", params=" params ", run-id=" run-id "\n") ;; set up the http-client here (max-retry-attempts 1) ;; consider all requests indempotent (retry-request? (lambda (request) @@ -276,25 +263,30 @@ (mutex-lock! *http-mutex*) ;; (condition-case (with-input-from-request "http://localhost"; #f read-lines) ;; ((exn http client-error) e (print e))) (set! res (vector success - (handle-exceptions - exn - (begin - (set! success #f) - (debug:print 0 "WARNING: failure in with-input-from-request to " fullurl ". Killing associated server to allow clean retry.") - (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) - (hash-table-delete! *runremote* run-id) - ;; (tasks:kill-server-run-id run-id) ;; better to kill the server in the logic that called this routine. - #f) - (with-input-from-request ;; was dat - fullurl - (list (cons 'key "thekey") - (cons 'cmd cmd) - (cons 'params params)) - read-string)))) + (db:string->obj + (handle-exceptions + exn + (begin + (set! success #f) + (debug:print 0 "WARNING: failure in with-input-from-request to " fullurl ".") + (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) + (hash-table-delete! 
*runremote* run-id) + ;; Killing associated server to allow clean retry.") + (tasks:kill-server-run-id run-id) ;; better to kill the server in the logic that called this routine? + ;; (signal (make-composite-condition + ;; (make-property-condition 'commfail 'message "failed to connect to server"))) + "communications failed") + (with-input-from-request ;; was dat + fullurl + (list (cons 'key "thekey") + (cons 'cmd cmd) + (cons 'params sparams)) + read-string)) + transport: 'http))) ;; Shouldn't this be a call to the managed call-all-connections stuff above? (close-all-connections!) (mutex-unlock! *http-mutex*) )) (time-out (lambda () @@ -305,11 +297,24 @@ (thread-start! th1) (thread-start! th2) (thread-join! th1) (thread-terminate! th2) (debug:print-info 11 "got res=" res) - res))))) + (if (vector? res) + (if (vector-ref res 0) + res + (begin ;; note: this code also called in nmsg-transport - consider consolidating it + (debug:print 0 "ERROR: error occured at server, info=" (vector-ref res 2)) + (debug:print 0 " client call chain:") + (print-call-chain (current-error-port)) + (debug:print 0 " server call chain:") + (pp (vector-ref res 1) (current-error-port)) + (signal (vector-ref result 0)))) + (signal (make-composite-condition + (make-property-condition + 'timeout + 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server"))))))) ;; careful closing of connections stored in *runremote* ;; (define (http-transport:close-connections run-id) (let* ((server-dat (hash-table-ref/default *runremote* run-id #f))) @@ -318,17 +323,18 @@ (close-connection! 
api-dat) #t) #f))) -(define (make-http-transport:server-dat)(make-vector 5)) +(define (make-http-transport:server-dat)(make-vector 6)) (define (http-transport:server-dat-get-iface vec) (vector-ref vec 0)) (define (http-transport:server-dat-get-port vec) (vector-ref vec 1)) (define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) (define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3)) (define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4)) (define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5)) +(define (http-transport:server-dat-get-socket vec) (vector-ref vec 6)) (define (http-transport:server-dat-make-url vec) (if (and (http-transport:server-dat-get-iface vec) (http-transport:server-dat-get-port vec)) (conc "http://" @@ -355,10 +361,11 @@ ;; (define (http-transport:keep-running server-id run-id) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive + (debug:print-info 0 "Starting the sync-back, keep alive thread in server for run-id=" run-id) (let* ((tdbdat (tasks:open-db)) (server-info (let loop ((start-time (current-seconds)) (changed #t) (last-sdat "not this")) (let ((sdat #f)) @@ -383,20 +390,15 @@ (equal? sdat last-sdat) sdat))))))) (iface (car server-info)) (port (cadr server-info)) (last-access 0) - (server-timeout (let ((tmo (configf:lookup *configdat* "server" "timeout"))) - (if (and (string? 
tmo) - (string->number tmo)) - (* 60 60 (string->number tmo)) - ;; (* 3 24 60 60) ;; default to three days - (* 60 1) ;; default to one minute - ;; (* 60 60 25) ;; default to 25 hours - )))) + (server-timeout (server:get-timeout))) (let loop ((count 0) (server-state 'available)) + + ;; Use this opportunity to sync the inmemdb to db (let ((start-time (current-milliseconds)) (sync-time #f) (rem-time #f)) ;; inmemdb is a dbstruct @@ -445,18 +447,12 @@ ;; no_traffic, no running tests, if server 0, no running servers ;; ;; (let ((wait-on-running (configf:lookup *configdat* "server" "wait-on-running"))) ;; wait on running tasks (if not true then exit on time out) ;; (if (and *server-run* - ;; (or (> (+ last-access server-timeout) (current-seconds))) -;; (and (eq? run-id 0) -;; (> (tasks:num-servers-non-zero-running tdb) 0)) -;; (and (not (eq? run-id 0)) ;; only makes sense in non-zero run-id servers -;; (> (db:get-count-tests-actually-running *inmemdb* run-id) 0)) -;; )) (begin (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) ;; ;; Consider implementing some smarts here to re-insert the record or kill self is ;; the db indicates so @@ -545,10 +541,22 @@ (thread-sleep! 0.25) ;; give the server time to settle before starting the keep-running monitor. (thread-start! th3) (set! *didsomething* #t) (thread-join! th2) (exit)))))) + +(define (http:ping run-id host-port) + (let* ((server-dat (http-transport:client-connect (car host-port)(cadr host-port))) + (login-res (rmt:login-no-auto-client-setup server-dat run-id))) + (if (and (list? login-res) + (car login-res)) + (begin + (print "LOGIN_OK") + (exit 0)) + (begin + (print "LOGIN_FAILED") + (exit 1))))) (define (http-transport:server-signal-handler signum) (signal-mask! 
signum) (handle-exceptions exn Index: launch.scm ================================================================== --- launch.scm +++ launch.scm @@ -458,10 +458,17 @@ environ-patt: "env-override" given-toppath: (get-environment-variable "MT_RUN_AREA_HOME") pathenvvar: "MT_RUN_AREA_HOME")) (set! *configdat* (if (car *configinfo*)(car *configinfo*) #f)) (set! *toppath* (if (car *configinfo*)(cadr *configinfo*) #f)) + (let* ((tmptransport (configf:lookup *configdat* "server" "transport")) + (transport (if tmptransport (string->symbol tmptransport) 'http))) + (if (member transport '(http rpc nmsg)) + (set! *transport-type* transport) + (begin + (debug:print 0 "ERROR: Unrecognised transport " transport) + (exit)))) (let ((linktree (configf:lookup *configdat* "setup" "linktree"))) ;; link tree is critical (if linktree (if (not (file-exists? linktree)) (begin (handle-exceptions Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -8,15 +8,16 @@ ;; PURPOSE. ;; (include "common.scm") ;; (include "megatest-version.scm") -(use sqlite3 srfi-1 posix regex regex-case srfi-69 base64 format readline apropos json - http-client directory-utils z3 srfi-18) ;; extras) +(use sqlite3 srfi-1 posix regex regex-case srfi-69 base64 format readline apropos json http-client directory-utils rpc ;; (srfi 18) extras) + http-client srfi-18) ;; zmq extras) (import (prefix sqlite3 sqlite3:)) (import (prefix base64 base64:)) +(import (prefix rpc rpc:)) ;; (use zmq) (declare (uses common)) (declare (uses megatest-version)) @@ -27,12 +28,11 @@ (declare (uses client)) (declare (uses tests)) (declare (uses genexample)) (declare (uses daemon)) (declare (uses db)) -;; (declare (uses sdb)) -;; (declare (uses filedb)) + (declare (uses tdb)) (declare (uses mt)) (declare (uses api)) (declare (uses tasks)) ;; only used for debugging. 
@@ -131,10 +131,11 @@ -update-meta : update the tests metadata for all tests -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname + -transport http|zmq : use http or zmq for transport (default is http) -daemonize : fork into background and disconnect from stdin/out -log logfile : send stdout and stderr to logfile -list-servers : list the servers -stop-server id : stop server specified by id (see output of -list-servers), use 0 to kill all @@ -206,10 +207,12 @@ ":units" ;; misc "-start-dir" "-server" "-stop-server" + "-transport" + "-kill-server" "-port" "-extract-ods" "-pathmod" "-env2file" "-setvars" @@ -283,15 +286,18 @@ args:arg-hash 0)) ;; The watchdog is to keep an eye on things like db sync etc. ;; +(define *time-zero* (current-seconds)) (define *watchdog* (make-thread (lambda () (thread-sleep! 0.05) ;; delay for startup - (let ((legacy-sync (configf:lookup *configdat* "setup" "megatest-db"))) + (let ((legacy-sync (configf:lookup *configdat* "setup" "megatest-db")) + (debug-mode (debug:debug-mode 1)) + (last-time (current-seconds))) (let loop () ;; sync for filesystem local db writes ;; (let ((start-time (current-seconds)) (servers-started (make-hash-table))) @@ -310,17 +316,22 @@ ;; (begin ;; (debug:print-info 0 "Sync is taking a long time, start up a server to assist for run " run-id) ;; (server:kind-run run-id))))) (hash-table-delete! *db-local-sync* run-id))) (mutex-unlock! *db-multi-sync-mutex*)) - (hash-table-keys *db-local-sync*))) - + (hash-table-keys *db-local-sync*)) + (if (and debug-mode + (> (- start-time last-time) 14)) + (begin + (set! 
last-time start-time) + (debug:print-info 0 "timestamp -> " (seconds->time-string (current-seconds)) ", time since start -> " (seconds->hr-min-sec (- (current-seconds) *time-zero*)))))) + ;; keep going unless time to exit ;; (if (not *time-to-exit*) (begin - (thread-sleep! 1) ;; wait one second before syncing again + (thread-sleep! 5) ;; wait five seconds before syncing again, we'll also sync on exit (loop))))) "Watchdog thread"))) (thread-start! *watchdog*) @@ -475,10 +486,18 @@ (if (args:get-arg "-ping") (let* ((run-id (string->number (args:get-arg "-run-id"))) (host:port (args:get-arg "-ping"))) (server:ping run-id host:port))) +;; (set! *did-something* #t) +;; (begin +;; (print ((rpc:procedure 'testing (car host-port)(cadr host-port)))) +;; (case (server:get-transport) +;; ((http)(http:ping run-id host-port)) +;; ((rpc) (rpc:procedure 'server:login (car host-port)(cadr host-port));; *toppath*)) ;; (rpc-transport:ping run-id (car host-port)(cadr host-port))) +;; (else (debug:print 0 "ERROR: No transport set")(exit))))) + ;;====================================================================== ;; Start the server - can be done in conjunction with -runall or -runtests (one day...) ;; we start the server if not running else start the client thread ;;====================================================================== @@ -502,11 +521,12 @@ equal? (hash-table-keys args:arg-hash) '("-list-servers" "-stop-server" "-show-cmdinfo" - "-list-runs"))) + "-list-runs" + "-ping"))) (if (launch:setup-for-run) (let ((run-id (and (args:get-arg "-run-id") (string->number (args:get-arg "-run-id"))))) ;; (set! 
*fdb* (filedb:open-db (conc *toppath* "/db/paths.db"))) ;; if not list or kill then start a client (if appropriate) @@ -908,10 +928,11 @@ (if (or (args:get-arg "-test-files")(args:get-arg "-test-paths")) ;; if we are in a test use the MT_CMDINFO data (if (getenv "MT_CMDINFO") (let* ((startingdir (current-directory)) (cmdinfo (common:read-encoded-string (getenv "MT_CMDINFO"))) + (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -955,10 +976,11 @@ (if (args:get-arg "-archive") ;; if we are in a test use the MT_CMDINFO data (if (getenv "MT_CMDINFO") (let* ((startingdir (current-directory)) (cmdinfo (common:read-encoded-string (getenv "MT_CMDINFO"))) + (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -1033,10 +1055,11 @@ (if (not (getenv "MT_CMDINFO")) (begin (debug:print 0 "ERROR: MT_CMDINFO env var not set, -step must be called *inside* a megatest invoked environment!") (exit 5)) (let* ((cmdinfo (common:read-encoded-string (getenv "MT_CMDINFO"))) + (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -1079,10 +1102,11 @@ (begin (debug:print 0 "ERROR: MT_CMDINFO env var not set, commands -test-status, -runstep and -setlog must be called *inside* a megatest environment!") (exit 5)) (let* ((startingdir (current-directory)) (cmdinfo (common:read-encoded-string (getenv "MT_CMDINFO"))) + (transport (assoc/default 
'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) ADDED mlaunch.scm Index: mlaunch.scm ================================================================== --- /dev/null +++ mlaunch.scm @@ -0,0 +1,26 @@ +;; Copyright 2006-2014, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +;; strftime('%m/%d/%Y %H:%M:%S','now','localtime') + +;;====================================================================== +;; MLAUNCH +;; +;; take jobs from the given queue and keep launching them keeping +;; the cpu load at the targeted level +;; +;;====================================================================== + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 dot-locking format) +(import (prefix sqlite3 sqlite3:)) + +(declare (unit mlaunch)) +(declare (uses db)) +(declare (uses common)) + Index: newdashboard.scm ================================================================== --- newdashboard.scm +++ newdashboard.scm @@ -78,10 +78,14 @@ ;; (begin ;; (set! *runremote* (string-split (args:get-arg "-host" ":"))) ;; (client:launch)) ;; (client:launch)) +;; ease debugging by loading ~/.dashboardrc +(let ((debugcontrolf (conc (get-environment-variable "HOME") "/.dashboardrc"))) + (if (file-exists? 
debugcontrolf) + (load debugcontrolf))) (define *dbdir* (conc (configf:lookup *configdat* "setup" "linktree") "/.db")) (define *dbstruct-local* (make-dbr:dbstruct path: *dbdir* local: #t)) (define *db-file-path* (db:dbfile-path 0)) ADDED nmsg-transport.scm Index: nmsg-transport.scm ================================================================== --- /dev/null +++ nmsg-transport.scm @@ -0,0 +1,358 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use nanomsg) + +(declare (unit nmsg-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses server)) + +(include "common_records.scm") +(include "db_records.scm") + +;; Transition to pub --> sub with pull <-- push +;; +;; 1. client sends request to server via push to the pull port +;; 2. server puts request in queue or processes immediately as appropriate +;; 3. server puts responses from completed requests into pub port +;; +;; TODO +;; +;; Done Tested +;; [x] [ ] 1. Add columns pullport pubport to servers table +;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 +;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports +;; [x] [ ] 4. Add client compose of request +;; [x] [ ] - name of client: testname/itempath-test_id-hostname +;; [x] [ ] - name of request: callname, params +;; [x] [ ] - request key: f(clientname, callname, params) +;; [x] [ ] 5. 
Add processing of subscription hits +;; [x] [ ] - done when get key +;; [x] [ ] - return results +;; [x] [ ] 6. Add timeout processing +;; [x] [ ] - after 60 seconds +;; [ ] [ ] i. check server alive, connect to new if necessary +;; [ ] [ ] ii. resend request +;; [ ] [ ] 7. Turn self ping back on + +(define (nmsg-transport:make-server-url hostport #!key (bindall #f)) + (if (not hostport) + #f + (conc "tcp://" (if bindall "*" (car hostport)) ":" (cadr hostport)))) + +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +(define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000)) + (debug:print 2 "Attempting to start the server ...") + (let* ((start-port (portlogger:open-run-close portlogger:find-port)) + (server-thread (make-thread (lambda () + (nmsg-transport:try-start-server dbstruct run-id start-port server-id)) + "server thread")) + (tdbdat (tasks:open-db))) + (thread-start! server-thread) + (thread-sleep! 0.1) + (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id)) + (let ((interface (if (equal? hostn "-")(get-host-name) hostn))) + (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port) + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") + (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running + (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access + ;; (set! *inmemdb* dbstruct) + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") + (thread-start! (make-thread + (lambda ()(nmsg-transport:keep-running server-id run-id)) + "keep running")) + (thread-join! 
server-thread)) + (if (> retrynum 0) + (begin + (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.") + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") + (portlogger:open-run-close portlogger:set-failed start-port) + (nmsg-transport:run dbstruct hostn run-id server-id)) + (begin + (debug:print 0 "ERROR: could not find an open port to start server on. Giving up") + (exit 1)))))) + +(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id) + (let ((repsoc (nn-socket 'rep))) + (nn-bind repsoc (conc "tcp://*:" portnum)) + (let loop ((msg-in (nn-recv repsoc))) + (let* ((dat (db:string->obj msg-in transport: 'nmsg))) + (debug:print 0 "server, received: " dat) + (let ((result (api:execute-requests dbstruct dat))) + (debug:print 0 "server, sending: " result) + (nn-send repsoc (db:obj->string result transport: 'nmsg))) + (loop (nn-recv repsoc)))))) + +;; all routes though here end in exit ... +;; +(define (nmsg-transport:launch run-id) + (let* ((tdbdat (tasks:open-db)) + (dbstruct (db:setup run-id)) + (hostn (or (args:get-arg "-server") "-"))) + (set! *run-id* run-id) + (set! *inmemdb* dbstruct) + ;; with nbfake daemonize isn't really needed + ;; + ;; (if (args:get-arg "-daemonize") + ;; (begin + ;; (daemon:ize) + ;; (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it + ;; (begin + ;; (current-error-port *alt-log-file*) + ;; (current-output-port *alt-log-file*))))) + (if (server:check-if-running run-id) + (begin + (debug:print-info 0 "Server for run-id " run-id " already running") + (exit 0))) + (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) + (remtries 4)) + (if (not server-id) + (if (> remtries 0) + (begin + (thread-sleep! 
2) + (if (not (server:check-if-running run-id)) + (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) + (- remtries 1)) + (begin + (debug:print-info 0 "Another server took the slot, exiting") + (exit 0)))) + (begin + ;; since we didn't get the server lock we are going to clean up and bail out + (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") + (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch") + )) + ;; locked in a server id, try to start up + (nmsg-transport:run dbstruct hostn run-id server-id)) + (set! *didsomething* #t) + (exit)))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +(define (nmsg-transport:mk-signature) + (message-digest-string (md5-primitive) + (with-output-to-string + (lambda () + (write (list (current-directory) + (argv))))))) + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +;; ping the server at host:port +;; return the open socket if successful (return-socket == #t) +;; expect the key expected-key returned in payload +;; send our-key or #f as payload +;; +(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f)) + ;; send a random number along with pid and check that we get it back + (let* ((host (if (or (not hostn) + (equal? hostn "-")) ;; use localhost + (get-host-name) + hostn)) + (req (or socket + (let ((soc (nn-socket 'req))) + (nn-connect soc (conc "tcp://" host ":" port)) + soc))) + (success #t) + (dat (vector "ping" our-key)) + (result (condition-case + (nmsg-transport:client-api-send-receive-raw req dat timeout: timeout) + ((timeout)(set! 
success #f) #f))) + (key (if success + (vector-ref result 1) + #f))) + (debug:print 0 "success=" success ", key=" key ", expected-key=" expected-key ", equal? " (equal? key expected-key)) + (if (and success + (or (not expected-key) ;; just getting a reply is good enough then + (equal? key expected-key))) + (if return-socket + req + (begin + (if (not socket)(nn-close req)) ;; don't want a side effect of closing socket if handed it + #t)) + (begin + (if (not socket)(nn-close req)) ;; failed to ping, close socket as side effect + #f)))) + +;; send data to server, wait max of timeout seconds for a response. +;; return #( success/fail result ) +;; +;; for effiency it is easier to do the obj->string and string->obj here. +;; +(define (nmsg-transport:client-api-send-receive-raw socreq indat #!key (enable-send #t)(timeout 25)) + (let* ((success #f) + (result #f) + (keepwaiting #t) + (dat (db:obj->string indat transport: 'nmsg)) + (send-recv (make-thread + (lambda () + (nn-send socreq dat) + (let* ((res (nn-recv socreq))) + (set! success #t) + (set! result (db:string->obj res transport: 'nmsg)))) + "send-recv")) + (timeout (make-thread + (lambda () + (let loop ((count 0)) + (thread-sleep! 1) + (debug:print-info 1 "send-receive-raw, still waiting after " count " seconds...") + (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate + (loop (+ count 1)))) + (if keepwaiting + (begin + (print "timeout waiting for ping") + (thread-terminate! send-recv)))) + "timeout"))) + ;; replace with condition-case? + (handle-exceptions + exn + (set! result "timeout") + (thread-start! timeout) + (thread-start! send-recv) + (thread-join! send-recv) + (if success (thread-terminate! timeout))) + ;; raise timeout error if timed out + (if success + (if (and (vector? result) + (vector-ref result 0)) ;; did it fail at the server? 
+ result ;; nope, all good + (begin + (debug:print 0 "ERROR: error occured at server, info=" (vector-ref result 2)) + (debug:print 0 " client call chain:") + (print-call-chain (current-error-port)) + (debug:print 0 " server call chain:") + (pp (vector-ref result 1) (current-error-port)) + (signal (vector-ref result 0)))) + (signal (make-composite-condition + (make-property-condition 'timeout 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server")))))) + +;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (nmsg-transport:keep-running server-id run-id) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat + (begin + (debug:print-info 0 "keep-running got sdat=" sdat) + sdat) + (begin + (thread-sleep! 0.5) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdbdat (tasks:open-db)) + (server-timeout (let ((tmo (configf:lookup *configdat* "server" "timeout"))) + (if (and (string? tmo) + (string->number tmo)) + (* 60 60 (string->number tmo)) + ;; (* 3 24 60 60) ;; default to three days + (* 60 1) ;; default to one minute + ;; (* 60 60 25) ;; default to 25 hours + )))) + (print "Keep-running got server pid " server-id ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! 
*heartbeat-mutex*) + (db:sync-touched *inmemdb* run-id force-sync: #t) + (if (and *server-run* + (> (+ last-access server-timeout) + (current-seconds))) + (begin + (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + (set! *time-to-exit* #t) + (db:sync-touched *inmemdb* run-id force-sync: #t) + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running") + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit) + )))))) + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +(define (nmsg-transport:client-connect iface portnum) + (let* ((reqsoc (nmsg-transport:ping iface portnum return-socket: #t))) + (vector iface portnum #f #f #f (current-seconds) reqsoc))) + +;; returns result, there is no sucess/fail flag - handled via excpections +;; +(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param #!key (remtries 5)) + ;; NB// In the html version of this routine there is a call to + ;; tasks:kill-server-run-id when there is an exception + (mutex-lock! *http-mutex*) + (let* ((packet (vector cmd param)) + (reqsoc (http-transport:server-dat-get-socket connection-info)) + (res (nmsg-transport:client-api-send-receive-raw reqsoc packet))) +;; (status (vector-ref rawres 0)) +;; (result (vector-ref rawres 1))) + (mutex-unlock! *http-mutex*) + res)) ;; (vector status (if status (db:string->obj result transport: 'nmsg) result)))) + +;;====================================================================== +;; J U N K +;;====================================================================== + +;; DO NOT USE +;; +(define (nmsg-transport:client-signal-handler signum) + (handle-exceptions + exn + (debug:print " ... 
exiting ...") + (let ((th1 (make-thread (lambda () + (if (not *received-response*) + (receive-message* *runremote*))) ;; flush out last call if applicable + "eat response")) + (th2 (make-thread (lambda () + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep! 3) ;; give the flush three seconds to do it's stuff + (debug:print 0 " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) + Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -13,10 +13,11 @@ (declare (unit rmt)) (declare (uses api)) (declare (uses tdb)) (declare (uses http-transport)) +(declare (uses nmsg-transport)) ;; ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! ;; @@ -32,11 +33,18 @@ ;;====================================================================== ;; S U P P O R T F U N C T I O N S ;;====================================================================== -;; #t means - please start a server! +(define (rmt:call-transport run-id connection-info cmd jparams) + (case (server:get-transport) + ((rpc) ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams)) + ((http) (http-transport:client-api-send-receive run-id connection-info cmd jparams)) + ((fs) ( fs-transport:client-api-send-receive run-id connection-info cmd jparams)) + ((zmq) (zmq-transport:client-api-send-receive run-id connection-info cmd jparams)) + (else ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams)))) + ;; (define (rmt:write-frequency-over-limit? cmd run-id) (and (not (member cmd api:read-only-queries)) (let* ((tmprec (hash-table-ref/default *write-frequency* run-id #f)) (record (if tmprec tmprec @@ -63,63 +71,86 @@ ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) (if (tasks:server-running-or-starting? 
(db:delay-if-busy (tasks:open-db)) run-id) (client:setup run-id) #f)))) -;; cmd is a symbol -;; vars is a json string encoding the parameters for the call -;; -(define (rmt:send-receive cmd rid params #!key (attemptnum 0)) +(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id +(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected ;; clean out old connections (mutex-lock! *db-multi-sync-mutex*) - (let ((expire-time (- (current-seconds) 60))) + (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin (for-each (lambda (run-id) (let ((connection (hash-table-ref/default *runremote* run-id #f))) - (if (and connection - (< (http-transport:server-dat-get-last-access connection) expire-time)) - (begin - (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses") - (hash-table-delete! *runremote* run-id))))) + (if (and connection + (< (http-transport:server-dat-get-last-access connection) expire-time)) + (begin + (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses") + ;; SHOULD CLOSE THE CONNECTION HERE + (case *transport-type* + ((nmsg)(nn-close (http-transport:server-dat-get-socket + (hash-table-ref *runremote* run-id))))) + (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) (mutex-unlock! *db-multi-sync-mutex*) + ;; (mutex-lock! *send-receive-mutex*) (let* ((run-id (if rid rid 0)) - (connection-info (rmt:get-connection-info run-id)) - (jparams (db:obj->string params))) + (connection-info (rmt:get-connection-info run-id))) + ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info - (let* ((dat (http-transport:client-api-send-receive run-id connection-info cmd jparams)) - (res (if (and dat (vector? 
dat)) (vector-ref dat 1) #f)) - (success (if (and dat (vector? dat)) (vector-ref dat 0) #f))) + (let* ((dat (case *transport-type* + ((http)(condition-case + (http-transport:client-api-send-receive run-id connection-info cmd params) + ((commfail)(vector #f "communications fail")))) + ((nmsg)(condition-case + (nmsg-transport:client-api-send-receive run-id connection-info cmd params) + ((timeout)(vector #f "timeout talking to server")))) + (else (exit)))) + (success (if (and dat (vector? dat)) (vector-ref dat 0) #f)) + (res (if (and dat (vector? dat)) (vector-ref dat 1) #f))) (http-transport:server-dat-update-last-access connection-info) (if success - (db:string->obj res) - ;; (if (< attemptnum 100) - ;; (begin - ;; (hash-table-delete! *runremote* run-id) - ;; (thread-sleep! 0.5) - ;; (rmt:send-receive cmd rid params attempnum: (+ attemptnum 1))) - ;; (begin - ;; (print-call-chain (current-error-port)) - ;; (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over") - ;; (exit 1))))) + (begin + ;; (mutex-unlock! *send-receive-mutex*) + (case *transport-type* + ((http) res) ;; (db:string->obj res)) + ((nmsg) res))) ;; (vector-ref res 1))) (begin ;; let ((new-connection-info (client:setup run-id))) - (debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.") + (debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.") + ;; (case *transport-type* + ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection + ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. + ;; (if (eq? (modulo attemptnum 5) 0) + ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) + ;; (mutex-unlock! 
*send-receive-mutex*) ;; close the mutex here to allow other threads access to communications + (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) + ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) ;; no longer killing the server in http-transport:client-api-send-receive ;; may kill it here but what are the criteria? ;; start with three calls then kill server - (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) - (thread-sleep! 2) + ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) + ;; (thread-sleep! 2) (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))) - (if (and (< attemptnum 10) - (tasks:need-server run-id)) + ;; no connection info? try to start a server + (if (and (< attemptnum 15) + (member cmd api:write-queries)) (begin + (hash-table-delete! *runremote* run-id) + ;; (mutex-unlock! *send-receive-mutex*) (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) - (rmt:send-receive cmd rid params (+ attemptnum 1))) - (rmt:open-qry-close-locally cmd run-id params))))) + ;; (client:setup run-id) ;; client setup happens in rmt:get-connection-info + (thread-sleep! (random 5)) ;; give some time to settle and minimize collison? + (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) + (begin + ;; (debug:print 0 "ERROR: Communication failed!") + ;; (mutex-unlock! *send-receive-mutex*) + ;; (exit) + (rmt:open-qry-close-locally cmd run-id params) + ))))) (define (rmt:update-db-stats run-id rawcmd params duration) (mutex-lock! *db-stats-mutex*) (handle-exceptions exn @@ -184,11 +215,12 @@ (set! *dbstruct-db* db) db))) (db-file-path (db:dbfile-path 0))) ;; (read-only (not (file-read-access? 
db-file-path))) (let* ((start (current-milliseconds)) - (res (api:execute-requests dbstruct-local (symbol->string cmd) params)) + (resdat (api:execute-requests dbstruct-local (vector (symbol->string cmd) params))) + (res (vector-ref resdat 1)) (duration (- (current-milliseconds) start))) (rmt:update-db-stats run-id cmd params duration) ;; mark this run as dirty if this was a write (if (not (member cmd api:read-only-queries)) (let ((start-time (current-seconds))) @@ -199,17 +231,19 @@ (mutex-unlock! *db-multi-sync-mutex*))) res))) (define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) (let* ((run-id (if run-id run-id 0)) - (jparams (db:obj->string params)) ;; (rmt:dat->json-str params)) - (dat (http-transport:client-api-send-receive run-id connection-info cmd jparams))) - (if (and dat (vector-ref dat 0)) - (db:string->obj (vector-ref dat 1)) - (begin - (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat) - dat)))) + ;; (jparams (db:obj->string params)) ;; (rmt:dat->json-str params)) + (res (http-transport:client-api-send-receive run-id connection-info cmd params))) + (if (and res (vector-ref res 0)) + res + #f))) +;; (db:string->obj (vector-ref dat 1)) +;; (begin +;; (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat) +;; dat)))) ;; Wrap json library for strings (why the ports crap in the first place?) (define (rmt:dat->json-str dat) (with-output-to-string (lambda () @@ -242,14 +276,17 @@ (define (rmt:login run-id) (rmt:send-receive 'login run-id (list *toppath* megatest-version run-id *my-client-signature*))) ;; This login does no retries under the hood - it acts a bit like a ping. +;; Deprecated for nmsg-transport. 
;; (define (rmt:login-no-auto-client-setup connection-info run-id) - (rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*))) - + (case *transport-type* + ((http)(rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*))) + ((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*))))) + ;; hand off a call to one of the db:queries statements ;; added run-id to make looking up the correct db possible ;; (define (rmt:general-call stmtname run-id . params) (rmt:send-receive 'general-call run-id (append (list stmtname run-id) params))) @@ -325,33 +362,31 @@ run-ids (rmt:get-all-run-ids))) (result '())) (if (null? run-id-list) '() - (for-each - (lambda (th) - - (thread-join! th)) ;; I assume that joining completed threads just moves on - (let loop ((hed (car run-id-list)) - (tal (cdr run-id-list)) - (threads '())) - (let* ((newthread (make-thread - (lambda () - (let ((res (rmt:send-receive 'get-tests-for-run-mindata hed (list hed testpatt states status not-in)))) - (if (list? res) - (begin - (mutex-lock! multi-run-mutex) - (set! result (append result res)) - (mutex-unlock! multi-run-mutex)) - (debug:print 0 "ERROR: get-tests-for-run-mindata failed for run-id " hed ", testpatt " testpatt ", states " states ", status " status ", not-in " not-in)))) - (conc "multi-run-thread for run-id " hed))) - (newthreads (cons newthread threads))) - (thread-start! newthread) - (thread-sleep! 0.5) ;; give that thread some time to start - (if (null? 
tal) - newthreads - (loop (car tal)(cdr tal) newthreads)))))) + (let loop ((hed (car run-id-list)) + (tal (cdr run-id-list)) + (threads '())) + (if (> (length threads) 5) + (loop hed tal (filter (lambda (th)(not (member (thread-state th) '(terminated dead)))) threads)) + (let* ((newthread (make-thread + (lambda () + (let ((res (rmt:send-receive 'get-tests-for-run-mindata hed (list hed testpatt states status not-in)))) + (if (list? res) + (begin + (mutex-lock! multi-run-mutex) + (set! result (append result res)) + (mutex-unlock! multi-run-mutex)) + (debug:print 0 "ERROR: get-tests-for-run-mindata failed for run-id " hed ", testpatt " testpatt ", states " states ", status " status ", not-in " not-in)))) + (conc "multi-run-thread for run-id " hed))) + (newthreads (cons newthread threads))) + (thread-start! newthread) + (thread-sleep! 0.5) ;; give that thread some time to start + (if (null? tal) + newthreads + (loop (car tal)(cdr tal) newthreads)))))) result)) ;; ;; IDEA: Threadify these - they spend a lot of time waiting ... ;; ;; ;; (define (rmt:get-tests-for-runs-mindata run-ids testpatt states status not-in) @@ -482,11 +517,12 @@ (define (rmt:get-runs-by-patt keys runnamepatt targpatt offset limit) (rmt:send-receive 'get-runs-by-patt #f (list keys runnamepatt targpatt offset limit))) (define (rmt:find-and-mark-incomplete run-id ovr-deadtime) - (rmt:send-receive 'find-and-mark-incomplete run-id (list run-id ovr-deadtime))) + (if (rmt:send-receive 'have-incompletes? 
run-id (list run-id ovr-deadtime)) + (rmt:send-receive 'mark-incomplete run-id (list run-id ovr-deadtime)))) ;;====================================================================== ;; M U L T I R U N Q U E R I E S ;;====================================================================== @@ -573,5 +609,18 @@ (define (rmt:test-data-rollup run-id test-id status) (rmt:send-receive 'test-data-rollup run-id (list run-id test-id status))) (define (rmt:csv->test-data run-id test-id csvdata) (rmt:send-receive 'csv->test-data run-id (list run-id test-id csvdata))) + +;;====================================================================== +;; T A S K S +;;====================================================================== + +(define (rmt:tasks-find-task-queue-records target run-name test-patt state-patt action-patt) + (rmt:send-receive 'find-task-queue-records #f (list target run-name test-patt state-patt action-patt))) + +(define (rmt:tasks-add action owner target runname testpatt params) + (rmt:send-receive 'tasks-add #f (list action owner target runname testpatt params))) + +(define (rmt:tasks-set-state-given-param-key param-key new-state) + (rmt:send-receive 'tasks-set-state-given-param-key #f (list param-key new-state))) ADDED rpc-transport.scm Index: rpc-transport.scm ================================================================== --- /dev/null +++ rpc-transport.scm @@ -0,0 +1,226 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. 
+ +(require-extension (srfi 18) extras tcp s11n rpc) +(import (prefix rpc rpc:)) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(declare (unit rpc-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. + +(include "common_records.scm") +(include "db_records.scm") + +;; procstr is the name of the procedure to be called as a string +(define (rpc-transport:autoremote procstr params) + (handle-exceptions + exn + (begin + (debug:print 1 "Remote failed for " proc " " params) + (apply (eval (string->symbol procstr)) params)) + ;; (if *runremote* + ;; (apply (eval (string->symbol (conc "remote:" procstr))) params) + (apply (eval (string->symbol procstr)) params))) + +;; all routes though here end in exit ... +;; +;; start_server? +;; +(define (rpc-transport:launch run-id) + (set! *run-id* run-id) + (if (args:get-arg "-daemonize") + (daemon:ize)) + (if (server:check-if-running run-id) + (begin + (debug:print 0 "INFO: Server for run-id " run-id " already running") + (exit 0))) + (let loop ((server-id (open-run-close tasks:server-lock-slot tasks:open-db run-id)) + (remtries 4)) + (if (not server-id) + (if (> remtries 0) + (begin + (thread-sleep! 
2) + (loop (open-run-close tasks:server-lock-slot tasks:open-db run-id) + (- remtries 1))) + (begin + ;; since we didn't get the server lock we are going to clean up and bail out + (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") + (open-run-close tasks:server-delete-records-for-this-pid tasks:open-db " rpc-transport:launch"))) + (begin + (rpc-transport:run (if (args:get-arg "-server")(args:get-arg "-server") "-") run-id server-id) + (exit))))) + +(define (rpc-transport:run hostn run-id server-id) + (debug:print 2 "Attempting to start the rpc server ...") + ;; (trace rpc:publish-procedure!) + + (rpc:publish-procedure! 'server:login server:login) + (rpc:publish-procedure! 'testing (lambda () "Just testing")) + + (let* ((db #f) + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + (server:get-best-guess-address hostname) + #f))) + (if ipstr ipstr hostn))) ;; hostname))) + (start-port (open-run-close tasks:server-get-next-port tasks:open-db)) + (link-tree-path (configf:lookup *configdat* "setup" "linktree")) + (rpc:listener (rpc-transport:find-free-port-and-open (rpc:default-server-port))) + (th1 (make-thread + (lambda () + ((rpc:make-server rpc:listener) #t)) + "rpc:server")) + ;; (cute (rpc:make-server rpc:listener) "rpc:server") + ;; 'rpc:server)) + (hostname (if (string=? "-" hostn) + (get-host-name) + hostn)) + (ipaddrstr (if (string=? "-" hostn) + (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f)) + (portnum (rpc:default-server-port)) + (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)) + (tdb (tasks:open-db))) + (thread-start! th1) + (set! 
db *inmemdb*) + (open-run-close tasks:server-set-interface-port + tasks:open-db + server-id + ipaddrstr portnum) + (debug:print 0 "Server started on " host:port) + + ;; (trace rpc:publish-procedure!) + ;; (rpc:publish-procedure! 'server:login server:login) + ;; (rpc:publish-procedure! 'testing (lambda () "Just testing")) + + ;;====================================================================== + ;; ;; end of publish-procedure section + ;;====================================================================== + ;; + (on-exit (lambda () + (open-run-close tasks:server-set-state! tasks:open-db server-id "stopped"))) + + (set! *rpc:listener* rpc:listener) + (tasks:server-set-state! tdb server-id "running") + (set! *inmemdb* (db:setup run-id)) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + (let loop ((count 0)) + (thread-sleep! 5) ;; no need to do this very often + (let ((numrunning -1)) ;; (db:get-count-tests-running db))) + (if (or (> numrunning 0) + (> (+ *last-db-access* 60)(current-seconds))) + (begin + (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) + (loop (+ 1 count))) + (begin + (debug:print-info 0 "Starting to shutdown the server side") + (open-run-close tasks:server-delete-record tasks:open-db server-id " rpc-transport:try-start-server stop") + (thread-sleep! 10) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. 
Exiting") + )))))) + +(define (rpc-transport:find-free-port-and-open port) + (handle-exceptions + exn + (begin + (print "Failed to bind to port " (rpc:default-server-port) ", trying next port") + (rpc-transport:find-free-port-and-open (+ port 1))) + (rpc:default-server-port port) + (tcp-read-timeout 240000) + (tcp-listen (rpc:default-server-port) 10000))) + +(define (rpc-transport:ping run-id host port) + (handle-exceptions + exn + (begin + (print "SERVER_NOT_FOUND") + (exit 1)) + (let ((login-res ((rpc:procedure 'server:login host port) *toppath*))) + (if (and (list? login-res) + (car login-res)) + (begin + (print "LOGIN_OK") + (exit 0)) + (begin + (print "LOGIN_FAILED") + (exit 1)))))) + +(define (rpc-transport:client-setup run-id #!key (remtries 10)) + (if *runremote* + (begin + (debug:print 0 "ERROR: Attempt to connect to server but already connected") + #f) + (let* ((host-info (hash-table-ref/default *runremote* run-id #f))) ;; (open-run-close db:get-var #f "SERVER")) + (if host-info + (let ((iface (car host-info)) + (port (cadr host-info)) + (ping-res ((rpc:procedure 'server:login host port) *toppath*))) + (if ping-res + (let ((server-dat (list iface port #f #f #f))) + (hash-table-set! *runremote* run-id server-dat) + server-dat) + (begin + (server:try-running run-id) + (thread-sleep! 2) + (rpc-transport:client-setup run-id (- remtries 1))))) + (let* ((server-db-info (open-run-close tasks:get-server tasks:open-db run-id))) + (debug:print-info 0 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) + (if server-db-info + (let* ((iface (tasks:hostinfo-get-interface server-db-info)) + (port (tasks:hostinfo-get-port server-db-info)) + (server-dat (list iface port #f #f #f)) + (ping-res ((rpc:procedure 'server:login host port) *toppath*))) + (if start-res + (begin + (hash-table-set! *runremote* run-id server-dat) + server-dat) + (begin + (server:try-running run-id) + (thread-sleep! 
2) + (rpc-transport:client-setup run-id (- remtries 1))))) + (begin + (server:try-running run-id) + (thread-sleep! 2) + (rpc-transport:client-setup run-id (- remtries 1))))))))) +;; +;; (port (if (and hostinfo (> (length hostdat) 1))(cadr hostdat) #f))) +;; (if (and port +;; (string->number port)) +;; (let ((portn (string->number port))) +;; (debug:print-info 2 "Setting up to connect to host " host ":" port) +;; (handle-exceptions +;; exn +;; (begin +;; (debug:print 0 "ERROR: Failed to open a connection to the server at host: " host " port: " port) +;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) +;; ;; (open-run-close +;; ;; (lambda (db . param) +;; ;; (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER'")) +;; ;; #f) +;; (set! *runremote* #f)) +;; (if (and (not (args:get-arg "-server")) ;; no point in the server using the server using the server +;; ((rpc:procedure 'server:login host portn) *toppath*)) +;; (begin +;; (debug:print-info 2 "Logged in and connected to " host ":" port) +;; (set! *runremote* (vector host portn))) +;; (begin +;; (debug:print-info 2 "Failed to login or connect to " host ":" port) +;; (set! *runremote* #f))))) +;; (debug:print-info 2 "no server available"))))) + Index: runs.scm ================================================================== --- runs.scm +++ runs.scm @@ -223,17 +223,17 @@ (set-signal-handler! signal/int (lambda (signum) (signal-mask! signum) (print "Received signal " signum ", cleaning up before exit. Please wait...") (let ((tdbdat (tasks:open-db))) - (tasks:set-state-given-param-key (db:delay-if-busy tdbdat) task-key "killed")) + (rmt:tasks-set-state-given-param-key task-key "killed")) (print "Killed by signal " signum ". 
Exiting") (exit))) ;; register this run in monitor.db - (tasks:add (db:delay-if-busy tdbdat) "run-tests" user target runname test-patts task-key) ;; params) - (tasks:set-state-given-param-key (db:delay-if-busy tdbdat) task-key "running") + (rmt:tasks-add "run-tests" user target runname test-patts task-key) ;; params) + (rmt:tasks-set-state-given-param-key task-key "running") (runs:set-megatest-env-vars run-id inkeys: keys inrunname: runname) ;; these may be needed by the launching process (if (file-exists? runconfigf) (setup-env-defaults runconfigf run-id *already-seen-runconfig-info* keyvals target) (debug:print 0 "WARNING: You do not have a run config file: " runconfigf)) @@ -394,11 +394,11 @@ (if (not (hash-table-ref/default flags "-rerun" #f)) (hash-table-set! flags "-rerun" "STUCK/DEAD,n/a,ZERO_ITEMS")) (runs:run-tests target runname test-patts user flags run-count: (- run-count 1))))) (debug:print-info 0 "No tests to run"))) (debug:print-info 4 "All done by here") - (tasks:set-state-given-param-key (db:delay-if-busy tdbdat) task-key "done") + (rmt:tasks-set-state-given-param-key task-key "done") ;; (sqlite3:finalize! tasks-db) )) ;; loop logic. These are used in runs:run-tests-queue to make it a bit more readable. @@ -944,11 +944,11 @@ (num-running (rmt:get-count-tests-running-for-run-id run-id))) ;; every couple minutes verify the server is there for this run (if (and (common:low-noise-print 60 "try start server" run-id) (tasks:need-server run-id)) - (tasks:start-and-wait-for-server tdbdat run-id 10)) + (tasks:start-and-wait-for-server tdbdat run-id 10)) ;; NOTE: delay and wait is done under the hood (if (> num-running 0) (set! last-time-some-running (current-seconds))) (if (> (current-seconds)(+ last-time-some-running 240)) @@ -1444,11 +1444,11 @@ (case action ((remove-runs) (if (tasks:need-server run-id)(tasks:start-and-wait-for-server tdbdat run-id 10)) ;; seek and kill in flight -runtests with % as testpatt here (if (equal? 
testpatt "%") - (tasks:kill-runner (db:delay-if-busy tdbdat) target run-name) + (tasks:kill-runner target run-name) (debug:print 0 "not attempting to kill any run launcher processes as testpatt is " testpatt)) (debug:print 1 "Removing tests for run: " runkey " " (db:get-value-by-header run header "runname"))) ((set-state-status) (if (tasks:need-server run-id)(tasks:start-and-wait-for-server tdbdat run-id 10)) (debug:print 1 "Modifying state and staus for tests for run: " runkey " " (db:get-value-by-header run header "runname"))) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -20,12 +20,13 @@ (declare (uses common)) (declare (uses db)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses synchash)) (declare (uses http-transport)) +(declare (uses rpc-transport)) +(declare (uses nmsg-transport)) (declare (uses launch)) -;; (declare (uses zmq-transport)) (declare (uses daemon)) (include "common_records.scm") (include "db_records.scm") @@ -47,37 +48,59 @@ ;; all routes though here end in exit ... 
;; ;; start_server ;; (define (server:launch run-id) - (http-transport:launch run-id)) - -;;====================================================================== -;; Q U E U E M A N A G E M E N T -;;====================================================================== - -;; We don't want to flush the queue if it was just flushed -(define *server:last-write-flush* (current-milliseconds)) + (case *transport-type* + ((http)(http-transport:launch run-id)) + ((nmsg)(nmsg-transport:launch run-id)) + ((rpc) (rpc-transport:launch run-id)) + (else (debug:print 0 "ERROR: unknown server type " *transport-type*)))) +;; (else (debug:print 0 "ERROR: No known transport set, transport=" transport ", using rpc") +;; (rpc-transport:launch run-id))))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== +;; Get the transport +(define (server:get-transport) + (if *transport-type* + *transport-type* + (let ((ttype (string->symbol + (or (args:get-arg "-transport") + (configf:lookup *configdat* "server" "transport") + "rpc")))) + (set! 
*transport-type* ttype) + ttype))) + ;; Generate a unique signature for this server (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) - ;; When using zmq this would send the message back (two step process) ;; with spiffy or rpc this simply returns the return data to be returned ;; (define (server:reply return-addr query-sig success/fail result) - (db:obj->string (vector success/fail query-sig result))) + (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) + ;; (send-message pubsock target send-more: #t) + ;; (send-message pubsock + (case (server:get-transport) + ((rpc) (db:obj->string (vector success/fail query-sig result))) + ((http) (db:obj->string (vector success/fail query-sig result))) + ((zmq) + (let ((pub-socket (vector-ref *runremote* 1))) + (send-message pub-socket return-addr send-more: #t) + (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) + ((fs) result) + (else + (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) + result))) ;; Given a run id start a server process ### NOTE ### > file 2>&1 ;; if the run-id is zero and the target-host is set ;; try running on that host ;; @@ -108,10 +131,16 @@ (system (conc "nbfake " cmdln)) (unsetenv "TARGETHOST_LOGF") (if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST")) ;; (system cmdln) (pop-directory))) + +(define (server:get-client-signature) + (if *my-client-signature* *my-client-signature* + (let ((sig (server:mk-signature))) + (set! *my-client-signature* sig) + *my-client-signature*))) ;; kind start up of servers, wait 40 seconds before allowing another server for a given ;; run-id to be launched (define (server:kind-run run-id) (let ((last-run-time (hash-table-ref/default *server-kind-run* run-id #f))) @@ -136,13 +165,17 @@ ;; note: client:start will set *runremote*. 
this needs to be changed ;; also, client:start will login to the server, also need to change that. ;; ;; client:start returns #t if login was successful. ;; - (let ((res (server:ping-server run-id - (tasks:hostinfo-get-interface server) - (tasks:hostinfo-get-port server)))) + (let ((res (case *transport-type* + ((http)(server:ping-server run-id + (tasks:hostinfo-get-interface server) + (tasks:hostinfo-get-port server))) + ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server) + (tasks:hostinfo-get-port server) + timeout: 2))))) ;; if the server didn't respond we must remove the record (if res #t (begin (debug:print-info 0 "server at " server " not responding, removing record") @@ -196,5 +229,27 @@ (case (string->symbol res) ((NOREPLY) #f) ((LOGIN_OK) #t) (else #f)) (loop (read-line) inl)))))) + +(define (server:login toppath) + (lambda (toppath) + (set! *last-db-access* (current-seconds)) + (if (equal? *toppath* toppath) + (begin + ;; (debug:print-info 2 "login successful") + #t) + (begin + ;; (debug:print-info 2 "login failed") + #f)))) + +(define (server:get-timeout) + (let ((tmo (configf:lookup *configdat* "server" "timeout"))) + (if (and (string? tmo) + (string->number tmo)) + (* 60 60 (string->number tmo)) + ;; (* 3 24 60 60) ;; default to three days + (* 60 1) ;; default to one minute + ;; (* 60 60 25) ;; default to 25 hours + ))) + Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -12,10 +12,11 @@ (use sqlite3 srfi-1 posix regex regex-case srfi-69 dot-locking format) (import (prefix sqlite3 sqlite3:)) (declare (unit tasks)) (declare (uses db)) +(declare (uses rmt)) (declare (uses common)) (include "task_records.scm") ;;====================================================================== @@ -103,21 +104,24 @@ (db:set-sync mdb) ;; (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;")) ;; (if (or (and (not exists) ;; (file-write-access? *toppath*)) ;; (not (file-read-access? 
dbpath))) ;; (begin - (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY, - action TEXT DEFAULT '', - owner TEXT, - state TEXT DEFAULT 'new', - target TEXT DEFAULT '', - name TEXT DEFAULT '', - testpatt TEXT DEFAULT '', - keylock TEXT, - params TEXT, - creation_time TIMESTAMP, - execution_time TIMESTAMP);") + ;; + ;; TASKS QUEUE MOVED TO main.db + ;; + ;; (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY, + ;; action TEXT DEFAULT '', + ;; owner TEXT, + ;; state TEXT DEFAULT 'new', + ;; target TEXT DEFAULT '', + ;; name TEXT DEFAULT '', + ;; testpatt TEXT DEFAULT '', + ;; keylock TEXT, + ;; params TEXT, + ;; creation_time TIMESTAMP, + ;; execution_time TIMESTAMP);") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY, pid INTEGER, start_time TIMESTAMP, last_update TIMESTAMP, hostname TEXT, @@ -145,11 +149,10 @@ login_time TIMESTAMP, logout_time TIMESTAMP DEFAULT -1, CONSTRAINT clients_constraint UNIQUE (pid,hostname));") ;)) - (sqlite3:execute mdb "DELETE FROM tasks_queue WHERE state='done' AND creation_time < ?;" (- (current-seconds)(* 24 60 60))) ;; remove older than 24 hrs (set! *task-db* (cons mdb dbpath)) *task-db*)))) ;;====================================================================== ;; Server and client management @@ -167,29 +170,30 @@ (define (tasks:server-lock-slot mdb run-id) (tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot") (if (< (tasks:num-in-available-state mdb run-id) 4) (begin (tasks:server-set-available mdb run-id) - ;; (thread-sleep! 2) ;; Try removing this. It may not be needed. + (thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed. (tasks:server-am-i-the-server? 
mdb run-id)) #f)) ;; register that this server may come online (first to register goes though with the process) (define (tasks:server-set-available mdb run-id) (sqlite3:execute mdb "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id) VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);" - (current-process-id) ;; pid - (get-host-name) ;; hostname - -1 ;; port - -1 ;; pubport - (random 1000) ;; priority (used a tiebreaker on get-available) - "available" ;; state - (common:version-signature) ;; mt_version - -1 ;; interface - "http" ;; transport + (current-process-id) ;; pid + (get-host-name) ;; hostname + -1 ;; port + -1 ;; pubport + (random 1000) ;; priority (used a tiebreaker on get-available) + "available" ;; state + (common:version-signature) ;; mt_version + -1 ;; interface + ;; (conc (server:get-transport)) ;; transport + (conc *transport-type*) ;; transport run-id )) (define (tasks:num-in-available-state mdb run-id) (let ((res 0)) @@ -361,15 +365,15 @@ (maxqry (cdr (rmt:get-max-query-average run-id))) (threshold (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") "10")))) (cond (forced (if (common:low-noise-print 60 run-id "server required is set") - (debug:print-info 0 "Server required is set, starting server.")) + (debug:print-info 0 "Server required is set, starting server for run-id " run-id ".")) #t) ((> maxqry threshold) (if (common:low-noise-print 60 run-id "Max query time execeeded") - (debug:print-info 0 "Max avg query time of " maxqry "ms exceeds limit of " threshold "ms, starting server.")) + (debug:print-info 0 "Max avg query time of " maxqry "ms exceeds limit of " threshold "ms, server needed for run-id " run-id ".")) #t) (else #f)))) ;; try to start a server and wait for it to be available @@ -381,12 +385,13 @@ (if (and (not server-dat) (< delay-time delay-max-tries)) (begin (if (common:low-noise-print 60 "tasks:start-and-wait-for-server" 
run-id) (debug:print 0 "Try starting server for run-id " run-id)) + (thread-sleep! (/ (random 2000) 1000)) (server:kind-run run-id) - (thread-sleep! (min delay-time 5)) + (thread-sleep! (min delay-time 1)) (loop (tasks:get-server (db:delay-if-busy tdbdat) run-id)(+ delay-time 1)))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row @@ -422,167 +427,18 @@ (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id tag) ) (debug:print-info 0 "No server found for run-id " run-id ", nothing to kill")) ;; (sqlite3:finalize! tdb) )) - -;;====================================================================== -;; Tasks and Task monitors -;;====================================================================== - - -;;====================================================================== -;; Tasks -;;====================================================================== - - - -;;====================================================================== -;; Task Monitors -;;====================================================================== - -(define (tasks:register-monitor db mdb) - (let* ((pid (current-process-id)) - (hostname (get-host-name)) - (userinfo (user-information (current-user-id))) - (username (car userinfo))) - (print "Register monitor, pid: " pid ", hostname: " hostname ", username: " username) - (sqlite3:execute mdb "INSERT INTO monitors (pid,start_time,last_update,hostname,username) VALUES (?,strftime('%s','now'),strftime('%s','now'),?,?);" - pid hostname username))) - -(define (tasks:get-num-alive-monitors mdb) - (let ((res 0)) - (sqlite3:for-each-row - (lambda (count) - (set! 
res count)) - mdb - "SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;" - (car (user-information (current-user-id)))) - res)) - -;; register a task -(define (tasks:add mdb action owner target runname testpatt params) - (sqlite3:execute mdb "INSERT INTO tasks_queue (action,owner,state,target,name,testpatt,params,creation_time,execution_time) - VALUES (?,?,'new',?,?,?,?,strftime('%s','now'),0);" - action - owner - target - runname - testpatt - (if params params ""))) - -(define (keys:key-vals-hash->target keys key-params) - (let ((tmp (hash-table-ref/default key-params (vector-ref (car keys) 0) ""))) - (if (> (length keys) 1) - (for-each (lambda (key) - (set! tmp (conc tmp "/" (hash-table-ref/default key-params (vector-ref key 0) "")))) - (cdr keys))) - tmp)) - -;; for use from the gui -(define (tasks:add-from-params mdb action keys key-params var-params) - (let ((target (keys:key-vals-hash->target keys key-params)) - (owner (car (user-information (current-user-id)))) - (runname (hash-table-ref/default var-params "runname" #f)) - (testpatts (hash-table-ref/default var-params "testpatts" "%")) - (params (hash-table-ref/default var-params "params" ""))) - (tasks:add mdb action owner target runname testpatts params))) - -;; return one task from those who are 'new' OR 'waiting' AND more than 10sec old -;; -(define (tasks:snag-a-task mdb) - (let ((res #f) - (keytxt (conc (current-process-id) "-" (get-host-name) "-" (car (user-information (current-user-id)))))) - - ;; first randomly set a new to pid-hostname-hostname - (sqlite3:execute - mdb - "UPDATE tasks_queue SET keylock=? WHERE id IN - (SELECT id FROM tasks_queue - WHERE state='new' OR - (state='waiting' AND (strftime('%s','now')-execution_time) > 10) OR - state='reset' - ORDER BY RANDOM() LIMIT 1);" keytxt) - - (sqlite3:for-each-row - (lambda (id . rem) - (set! 
res (apply vector id rem))) - mdb - "SELECT id,action,owner,state,target,name,test,item,params,creation_time,execution_time FROM tasks_queue WHERE keylock=? ORDER BY execution_time ASC LIMIT 1;" keytxt) - (if res ;; yep, have work to be done - (begin - (sqlite3:execute mdb "UPDATE tasks_queue SET state='inprogress',execution_time=strftime('%s','now') WHERE id=?;" - (tasks:task-get-id res)) - res) - #f))) - -(define (tasks:reset-stuck-tasks mdb) - (let ((res '())) - (sqlite3:for-each-row - (lambda (id delta) - (set! res (cons id res))) - mdb - "SELECT id,strftime('%s','now')-execution_time AS delta FROM tasks_queue WHERE state='inprogress' AND delta>700 ORDER BY delta DESC LIMIT 2;") - (sqlite3:execute - mdb - (conc "UPDATE tasks_queue SET state='reset' WHERE id IN ('" (string-intersperse (map conc res) "','") "');")))) - -;; return all tasks in the tasks_queue table -;; -(define (tasks:get-tasks mdb types states) - (let ((res '())) - (sqlite3:for-each-row - (lambda (id . rem) - (set! res (cons (apply vector id rem) res))) - mdb - (conc "SELECT id,action,owner,state,target,name,test,item,params,creation_time,execution_time - FROM tasks_queue " - ;; WHERE - ;; state IN " statesstr " AND - ;; action IN " actionsstr - " ORDER BY creation_time DESC;")) - res)) - -;; remove tasks given by a string of numbers comma separated -(define (tasks:remove-queue-entries mdb task-ids) - (sqlite3:execute mdb (conc "DELETE FROM tasks_queue WHERE id IN (" task-ids ");"))) - -;; -(define (tasks:start-monitor db mdb) - (if (> (tasks:get-num-alive-monitors mdb) 2) ;; have two running, no need for more - (debug:print-info 1 "Not starting monitor, already have more than two running") - (let* ((megatestdb (conc *toppath* "/megatest.db")) - (monitordbf (conc (configf:lookup *configdat* "setup" "linktree") "/.db/monitor.db")) - (last-db-update 0)) ;; (file-modification-time megatestdb))) - (task:register-monitor mdb) - (let loop ((count 0) - (next-touch 0)) ;; next-touch is the time where we 
need to update last_update - ;; if the db has been modified we'd best look at the task queue - (let ((modtime (file-modification-time megatestdbpath ))) - (if (> modtime last-db-update) - (tasks:process-queue db mdb last-db-update megatestdb next-touch)) - ;; WARNING: Possible race conditon here!! - ;; should this update be immediately after the task-get-action call above? - (if (> (current-seconds) next-touch) - (begin - (tasks:monitors-update mdb) - (loop (+ count 1)(+ (current-seconds) 240))) - (loop (+ count 1) next-touch))))))) - -(define (tasks:process-queue db mdb) - (let* ((task (tasks:snag-a-task mdb)) - (action (if task (tasks:task-get-action task) #f))) - (if action (print "tasks:process-queue task: " task)) - (if action - (case (string->symbol action) - ((run) (tasks:start-run db mdb task)) - ((remove) (tasks:remove-runs db mdb task)) - ((lock) (tasks:lock-runs db mdb task)) - ;; ((monitor) (tasks:start-monitor db task)) - ((rollup) (tasks:rollup-runs db mdb task)) - ((updatemeta)(tasks:update-meta db mdb task)) - ((kill) (tasks:kill-monitors db mdb task)))))) +;;====================================================================== +;; M O N I T O R S +;;====================================================================== + +(define (tasks:remove-monitor-record mdb) + (sqlite3:execute mdb "DELETE FROM monitors WHERE pid=? AND hostname=?;" + (current-process-id) + (get-host-name))) (define (tasks:get-monitors mdb) (let ((res '())) (sqlite3:for-each-row (lambda (a . 
rem) @@ -590,27 +446,10 @@ mdb "SELECT id,pid,strftime('%m/%d/%Y %H:%M',datetime(start_time,'unixepoch'),'localtime'),strftime('%m/%d/%Y %H:%M:%S',datetime(last_update,'unixepoch'),'localtime'),hostname,username FROM monitors ORDER BY last_update ASC;") (reverse res) )) -(define (tasks:tasks->text tasks) - (let ((fmtstr "~10a~10a~10a~12a~20a~12a~12a~10a")) - (conc (format #f fmtstr "id" "action" "owner" "state" "target" "runname" "testpatts" "params") "\n" - (string-intersperse - (map (lambda (task) - (format #f fmtstr - (tasks:task-get-id task) - (tasks:task-get-action task) - (tasks:task-get-owner task) - (tasks:task-get-state task) - (tasks:task-get-target task) - (tasks:task-get-name task) - (tasks:task-get-test task) - ;; (tasks:task-get-item task) - (tasks:task-get-params task))) - tasks) "\n")))) - (define (tasks:monitors->text-table monitors) (let ((fmtstr "~4a~8a~20a~20a~10a~10a")) (conc (format #f fmtstr "id" "pid" "start time" "last update" "hostname" "user") "\n" (string-intersperse (map (lambda (monitor) @@ -637,65 +476,259 @@ (set! deadlist (cons id deadlist))) mdb "SELECT id,pid,hostname,last_update,strftime('%s','now')-last_update AS delta FROM monitors WHERE delta > 700;") (sqlite3:execute mdb (conc "DELETE FROM monitors WHERE id IN ('" (string-intersperse (map conc deadlist) "','") "');"))) ) - -(define (tasks:remove-monitor-record mdb) - (sqlite3:execute mdb "DELETE FROM monitors WHERE pid=? AND hostname=?;" - (current-process-id) - (get-host-name))) - -(define (tasks:set-state mdb task-id state) - (sqlite3:execute mdb "UPDATE tasks_queue SET state=? 
WHERE id=?;" - state - task-id)) +(define (tasks:register-monitor db mdb) + (let* ((pid (current-process-id)) + (hostname (get-host-name)) + (userinfo (user-information (current-user-id))) + (username (car userinfo))) + (print "Register monitor, pid: " pid ", hostname: " hostname ", username: " username) + (sqlite3:execute mdb "INSERT INTO monitors (pid,start_time,last_update,hostname,username) VALUES (?,strftime('%s','now'),strftime('%s','now'),?,?);" + pid hostname username))) + +(define (tasks:get-num-alive-monitors mdb) + (let ((res 0)) + (sqlite3:for-each-row + (lambda (count) + (set! res count)) + mdb + "SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;" + (car (user-information (current-user-id)))) + res)) + +;; +(define (tasks:start-monitor db mdb) + (if (> (tasks:get-num-alive-monitors mdb) 2) ;; have two running, no need for more + (debug:print-info 1 "Not starting monitor, already have more than two running") + (let* ((megatestdb (conc *toppath* "/megatest.db")) + (monitordbf (conc (configf:lookup *configdat* "setup" "linktree") "/.db/monitor.db")) + (last-db-update 0)) ;; (file-modification-time megatestdb))) + (task:register-monitor mdb) + (let loop ((count 0) + (next-touch 0)) ;; next-touch is the time where we need to update last_update + ;; if the db has been modified we'd best look at the task queue + (let ((modtime (file-modification-time megatestdbpath ))) + (if (> modtime last-db-update) + (tasks:process-queue db mdb last-db-update megatestdb next-touch)) + ;; WARNING: Possible race conditon here!! + ;; should this update be immediately after the task-get-action call above? 
+ (if (> (current-seconds) next-touch) + (begin + (tasks:monitors-update mdb) + (loop (+ count 1)(+ (current-seconds) 240))) + (loop (+ count 1) next-touch))))))) + +;;====================================================================== +;; T A S K S Q U E U E +;; +;; NOTE:: These operate on task_queue which is in main.db +;; +;;====================================================================== + +;; NOTE: It might be good to add one more layer of checking to ensure +;; that no task gets run in parallel. + + + +;; register a task +(define (tasks:add dbstruct action owner target runname testpatt params) + (db:with-db + dbstruct #f #t + (lambda (db) + (sqlite3:execute db "INSERT INTO tasks_queue (action,owner,state,target,name,testpatt,params,creation_time,execution_time) + VALUES (?,?,'new',?,?,?,?,strftime('%s','now'),0);" + action + owner + target + runname + testpatt + (if params params ""))))) + +(define (keys:key-vals-hash->target keys key-params) + (let ((tmp (hash-table-ref/default key-params (vector-ref (car keys) 0) ""))) + (if (> (length keys) 1) + (for-each (lambda (key) + (set! 
tmp (conc tmp "/" (hash-table-ref/default key-params (vector-ref key 0) "")))) + (cdr keys))) + tmp)) + +;; for use from the gui, not ported +;; +;; (define (tasks:add-from-params mdb action keys key-params var-params) +;; (let ((target (keys:key-vals-hash->target keys key-params)) +;; (owner (car (user-information (current-user-id)))) +;; (runname (hash-table-ref/default var-params "runname" #f)) +;; (testpatts (hash-table-ref/default var-params "testpatts" "%")) +;; (params (hash-table-ref/default var-params "params" ""))) +;; (tasks:add mdb action owner target runname testpatts params))) + +;; return one task from those who are 'new' OR 'waiting' AND more than 10sec old +;; +(define (tasks:snag-a-task dbstruct) + (let ((res #f) + (keytxt (conc (current-process-id) "-" (get-host-name) "-" (car (user-information (current-user-id)))))) + (db:with-db + dbstruct #f #t + (lambda (db) + ;; first randomly set a new to pid-hostname-hostname + (sqlite3:execute + db + "UPDATE tasks_queue SET keylock=? WHERE id IN + (SELECT id FROM tasks_queue + WHERE state='new' OR + (state='waiting' AND (strftime('%s','now')-execution_time) > 10) OR + state='reset' + ORDER BY RANDOM() LIMIT 1);" keytxt) + + (sqlite3:for-each-row + (lambda (id . rem) + (set! res (apply vector id rem))) + db + "SELECT id,action,owner,state,target,name,test,item,params,creation_time,execution_time FROM tasks_queue WHERE keylock=? ORDER BY execution_time ASC LIMIT 1;" keytxt) + (if res ;; yep, have work to be done + (begin + (sqlite3:execute db "UPDATE tasks_queue SET state='inprogress',execution_time=strftime('%s','now') WHERE id=?;" + (tasks:task-get-id res)) + res) + #f))))) + +(define (tasks:reset-stuck-tasks dbstruct) + (let ((res '())) + (db:with-db + dbstruct #f #t + (lambda (db) + (sqlite3:for-each-row + (lambda (id delta) + (set! 
res (cons id res))) + db + "SELECT id,strftime('%s','now')-execution_time AS delta FROM tasks_queue WHERE state='inprogress' AND delta>700 ORDER BY delta DESC LIMIT 2;") + (sqlite3:execute + db + (conc "UPDATE tasks_queue SET state='reset' WHERE id IN ('" (string-intersperse (map conc res) "','") "');") + ))))) + +;; return all tasks in the tasks_queue table +;; +(define (tasks:get-tasks dbstruct types states) + (let ((res '())) + (db:with-db + dbstruct #f #f + (lambda (db) + (sqlite3:for-each-row + (lambda (id . rem) + (set! res (cons (apply vector id rem) res))) + db + (conc "SELECT id,action,owner,state,target,name,test,item,params,creation_time,execution_time + FROM tasks_queue " + ;; WHERE + ;; state IN " statesstr " AND + ;; action IN " actionsstr + " ORDER BY creation_time DESC;")) + res)))) + +;; remove tasks given by a string of numbers comma separated +(define (tasks:remove-queue-entries dbstruct task-ids) + (db:with-db + dbstruct #f #t + (lambda (db) + (sqlite3:execute db (conc "DELETE FROM tasks_queue WHERE id IN (" task-ids ");"))))) + +(define (tasks:process-queue dbstruct) + (let* ((task (tasks:snag-a-task dbstruct)) + (action (if task (tasks:task-get-action task) #f))) + (if action (print "tasks:process-queue task: " task)) + (if action + (case (string->symbol action) + ((run) (tasks:start-run dbstruct task)) + ((remove) (tasks:remove-runs dbstruct task)) + ((lock) (tasks:lock-runs dbstruct task)) + ;; ((monitor) (tasks:start-monitor db task)) + ((rollup) (tasks:rollup-runs dbstruct task)) + ((updatemeta)(tasks:update-meta dbstruct task)) + ((kill) (tasks:kill-monitors dbstruct task)))))) + +(define (tasks:tasks->text tasks) + (let ((fmtstr "~10a~10a~10a~12a~20a~12a~12a~10a")) + (conc (format #f fmtstr "id" "action" "owner" "state" "target" "runname" "testpatts" "params") "\n" + (string-intersperse + (map (lambda (task) + (format #f fmtstr + (tasks:task-get-id task) + (tasks:task-get-action task) + (tasks:task-get-owner task) + (tasks:task-get-state 
task) + (tasks:task-get-target task) + (tasks:task-get-name task) + (tasks:task-get-test task) + ;; (tasks:task-get-item task) + (tasks:task-get-params task))) + tasks) "\n")))) + +(define (tasks:set-state dbstruct task-id state) + (db:with-db + dbstruct #f #t + (lambda (db) + (sqlite3:execute db "UPDATE tasks_queue SET state=? WHERE id=?;" + state + task-id)))) ;;====================================================================== ;; Access using task key (stored in params; (hash-table->alist flags) hostname pid ;;====================================================================== -(define (tasks:param-key->id mdb task-params) - (handle-exceptions - exn - #f - (sqlite3:first-result mdb "SELECT id FROM tasks_queue WHERE params LIKE ?;" task-params))) - -(define (tasks:set-state-given-param-key mdb param-key new-state) - (sqlite3:execute mdb "UPDATE tasks_queue SET state=? WHERE params LIKE ?;" new-state param-key)) - -(define (tasks:get-records-given-param-key mdb param-key state-patt action-patt test-patt) - (handle-exceptions - exn - '() - (sqlite3:first-row mdb "SELECT id,action,owner,state,target,name,testpatt,keylock,params WHERE - params LIKE ? AND state LIKE ? AND action LIKE ? AND testpatt LIKE ?;" - param-key state-patt action-patt test-patt))) - - -;;====================================================================== -;; Rogue items, no place to put these yet -;;====================================================================== - -(define (tasks:find-task-queue-records mdb target run-name test-patt state-patt action-patt) +(define (tasks:param-key->id dbstruct task-params) + (db:with-db + dbstruct #f #f + (lambda (db) + (handle-exceptions + exn + #f + (sqlite3:first-result db "SELECT id FROM tasks_queue WHERE params LIKE ?;" + task-params))))) + +(define (tasks:set-state-given-param-key dbstruct param-key new-state) + (db:with-db + dbstruct #f #t + (lambda (db) + (sqlite3:execute db "UPDATE tasks_queue SET state=? 
WHERE params LIKE ?;" new-state param-key)))) + +(define (tasks:get-records-given-param-key dbstruct param-key state-patt action-patt test-patt) + (db:with-db + dbstruct #f #f + (lambda (db) + (handle-exceptions + exn + '() + (sqlite3:first-row db "SELECT id,action,owner,state,target,name,testpatt,keylock,params WHERE + params LIKE ? AND state LIKE ? AND action LIKE ? AND testpatt LIKE ?;" + param-key state-patt action-patt test-patt))))) + + +(define (tasks:find-task-queue-records dbstruct target run-name test-patt state-patt action-patt) ;; (handle-exceptions ;; exn ;; '() ;; (sqlite3:first-row - (let ((res '())) + (let ((db (db:delay-if-busy (db:get-db dbstruct #f))) + (res '())) (sqlite3:for-each-row (lambda (a . b) (set! res (cons (cons a b) res))) - mdb "SELECT id,action,owner,state,target,name,testpatt,keylock,params FROM tasks_queue - WHERE - target = ? AND name = ? AND state LIKE ? AND action LIKE ? AND testpatt LIKE ?;" + db "SELECT id,action,owner,state,target,name,testpatt,keylock,params FROM tasks_queue + WHERE + target = ? AND name = ? AND state LIKE ? AND action LIKE ? AND testpatt LIKE ?;" target run-name state-patt action-patt test-patt) res)) ;; ) - -(define (tasks:kill-runner mdb target run-name) - (let ((records (tasks:find-task-queue-records mdb target run-name "%" "running" "run-tests")) +;; kill any runner processes (i.e. processes handling -runtests) that match target/runname +;; +;; do a remote call to get the task queue info but do the killing as self here. +;; +(define (tasks:kill-runner target run-name) + (let ((records (rmt:tasks-find-task-queue-records target run-name "%" "running" "run-tests")) (hostpid-rx (regexp "\\s+(\\w+)\\s+(\\d+)$"))) ;; host pid is at end of param string (if (null? 
records) (debug:print 0 "No run launching processes found for " target " / " run-name) (debug:print 0 "Found " (length records) " run(s) to kill.")) (for-each @@ -725,45 +758,38 @@ (setenv "TARGETHOST_LOGF" "server-kills.log") (system (conc "nbfake kill " pid)) (if old-targethost (setenv "TARGETHOST" old-targethost)) (unsetenv "TARGETHOST") (unsetenv "TARGETHOST_LOGF")))) - (debug:print 0 "ERROR: no record or improper record for " target "/" run-name " in tasks_queue in monitor.db")))) + (debug:print 0 "ERROR: no record or improper record for " target "/" run-name " in tasks_queue in main.db")))) records))) - -;;====================================================================== -;; The routines to process tasks -;;====================================================================== - -;; NOTE: It might be good to add one more layer of checking to ensure -;; that no task gets run in parallel. - -(define (tasks:start-run db mdb task) - (let ((flags (make-hash-table))) - (hash-table-set! flags "-rerun" "NOT_STARTED") - (if (not (string=? (tasks:task-get-params task) "")) - (hash-table-set! flags "-setvars" (tasks:task-get-params task))) - (print "Starting run " task) - ;; sillyness, just call the damn routine with the task vector and be done with it. FIXME SOMEDAY - (runs:run-tests db - (tasks:task-get-target task) - (tasks:task-get-name task) - (tasks:task-get-test task) - (tasks:task-get-item task) - (tasks:task-get-owner task) - flags) - (tasks:set-state mdb (tasks:task-get-id task) "waiting"))) - -(define (tasks:rollup-runs db mdb task) - (let* ((flags (make-hash-table)) - (keys (db:get-keys db)) - (keyvals (keys:target-keyval keys (tasks:task-get-target task)))) - ;; (hash-table-set! flags "-rerun" "NOT_STARTED") - (print "Starting rollup " task) - ;; sillyness, just call the damn routine with the task vector and be done with it. 
FIXME SOMEDAY - (runs:rollup-run db - keys - keyvals - (tasks:task-get-name task) - (tasks:task-get-owner task)) - (tasks:set-state mdb (tasks:task-get-id task) "waiting"))) +;; (define (tasks:start-run dbstruct mdb task) +;; (let ((flags (make-hash-table))) +;; (hash-table-set! flags "-rerun" "NOT_STARTED") +;; (if (not (string=? (tasks:task-get-params task) "")) +;; (hash-table-set! flags "-setvars" (tasks:task-get-params task))) +;; (print "Starting run " task) +;; ;; sillyness, just call the damn routine with the task vector and be done with it. FIXME SOMEDAY +;; (runs:run-tests db +;; (tasks:task-get-target task) +;; (tasks:task-get-name task) +;; (tasks:task-get-test task) +;; (tasks:task-get-item task) +;; (tasks:task-get-owner task) +;; flags) +;; (tasks:set-state mdb (tasks:task-get-id task) "waiting"))) +;; +;; (define (tasks:rollup-runs db mdb task) +;; (let* ((flags (make-hash-table)) +;; (keys (db:get-keys db)) +;; (keyvals (keys:target-keyval keys (tasks:task-get-target task)))) +;; ;; (hash-table-set! flags "-rerun" "NOT_STARTED") +;; (print "Starting rollup " task) +;; ;; sillyness, just call the damn routine with the task vector and be done with it. 
FIXME SOMEDAY +;; (runs:rollup-run db +;; keys +;; keyvals +;; (tasks:task-get-name task) +;; (tasks:task-get-owner task)) +;; (tasks:set-state mdb (tasks:task-get-id task) "waiting"))) + ADDED testnanomsg/basic-req-rep.scm Index: testnanomsg/basic-req-rep.scm ================================================================== --- /dev/null +++ testnanomsg/basic-req-rep.scm @@ -0,0 +1,3 @@ +(use nanomsg srfi-18 sqlite3 numbers) + +(define resp (nn-socket 'rep)) ADDED testnanomsg/mockupclient.scm Index: testnanomsg/mockupclient.scm ================================================================== --- /dev/null +++ testnanomsg/mockupclient.scm @@ -0,0 +1,42 @@ +(use zmq posix numbers) + +(define cname "Bob") +(define runtime 10) +(let ((args (argv))) + (if (< (length args) 3) + (begin + (print "Usage: mockupclient clientname runtime") + (exit)) + (begin + (set! cname (cadr args)) + (set! runtime (string->number (caddr args)))))) + +;; (define start-delay (/ (random 100) 9)) +;; (define runtime (+ 1 (/ (random 200) 2))) + +(print "Starting client " cname " with runtime " runtime) + +(include "mockupclientlib.scm") + +(set! endtime (+ (current-seconds) runtime)) + +;; first ping the server to ensure we have a connection +(if (server-ping cname 5) + (print "SUCCESS: Client " cname " connected to server") + (begin + (print "ERROR: Client " cname " failed ping of server, exiting") + (exit))) + +(let loop () + (let ((x (random 15)) + (varname (list-ref (list "hello" "goodbye" "saluton" "kiaorana")(random 4)))) + (case x + ;; ((1)(dbaccess cname 'sync "nodat" #f)) + ((2 3 4 5)(dbaccess cname 'set varname (random 999))) + ((6 7 8 9 10)(print cname ": Get \"" varname "\" " (dbaccess cname 'get varname #f))) + (else + (thread-sleep! 
0.011))) + (if (< (current-seconds) endtime) + (loop)))) + +(print "Client " cname " all done!!") ADDED testnanomsg/mockupclientlib.scm Index: testnanomsg/mockupclientlib.scm ================================================================== --- /dev/null +++ testnanomsg/mockupclientlib.scm @@ -0,0 +1,58 @@ +(define reqs (nn-socket 'req)) + +(connect-socket reqs "tcp://localhost:6563") + +(thread-sleep! 0.2) + +(define (server-ping cname timeout) + (let ((msg (conc cname ":ping:" timeout)) + (maxtime (+ (current-seconds) timeout))) + (print "pinging server from " cname " with timeout " timeout) + (let loop ((res #f)) + (if (< maxtime (current-seconds)) + #f ;; failed to ping + (if (equal? res "Got ping") + #t + (begin + (print "Ping received from server " res) + (send-message push msg) + (thread-sleep! 0.1) + (loop (receive-message sub non-blocking: #t)))))))) + +(define (dbaccess cname cmd var val #!key (numtries 20)) + (let* ((msg (conc cname ":" cmd ":" (if val (conc var " " val) var))) + (res #f) + (mtx1 (make-mutex)) + (do-access (lambda () + (let ((tmpres #f)) + (print "Sending msg: " msg) + (send-message push msg) + (print "Message " msg " sent") + (print "Client " cname " waiting for response to " msg) + (print "Client " cname " received address " (receive-message* sub)) + (set! tmpres (receive-message* sub)) + (mutex-lock! mtx1) + (set! res tmpres) + (mutex-unlock! mtx1)))) + (th1 (make-thread do-access "do access")) + (th2 (make-thread (lambda () + (let ((result #f)) + (mutex-lock! mtx1) + (set! result res) + (mutex-unlock! mtx1) + (thread-sleep! 5) + (if (not result) + (if (> numtries 0) + (begin + (print "WARNING: access timed out for " cname ", trying again. Trys remaining=" numtries) + (dbaccess cname cmd var val numtries: (- numtries 1))) + (begin + (print "ERROR: dbaccess timed out. Exiting") + (exit))))) + "timeout thread")))) + (thread-start! th1) + (thread-start! th2) + (thread-join! 
th1) + (if res (print "SUCCESS: received " res " with " numtries " remaining possible attempts")) + res)) + ADDED testnanomsg/mockupserver.scm Index: testnanomsg/mockupserver.scm ================================================================== --- /dev/null +++ testnanomsg/mockupserver.scm @@ -0,0 +1,146 @@ +;; pub/sub with envelope address +;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon +;; as a client disconnects. Also a remaining client may receive tons of +;; messages afterward. + +(use nanomsg srfi-18 sqlite3 numbers) + +(define resp (nn-socket 'rep)) +(define cname "server") +(define total-db-accesses 0) +(define start-time (current-seconds)) + +(nn-bind resp "tcp://*:6563") + +(thread-sleep! 0.2) + +(define (open-db) + (let* ((dbpath "mockup.db") + (dbexists (file-exists? dbpath)) + (db (open-database dbpath)) ;; (never-give-up-open-db dbpath)) + (handler (make-busy-timeout 10))) + (set-busy-handler! db handler) + (if (not dbexists) + (for-each + (lambda (stmt) + (execute db stmt)) + (list + "PRAGMA SYNCHRONOUS=0;" + "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);" + "CREATE TABLE vars (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));"))) + db)) + +(define cid-cache (make-hash-table)) + +(define (get-client-id db cname) + (let ((cid (hash-table-ref/default cid-cache cname #f))) + (if cid + cid + (begin + (execute db "INSERT OR REPLACE INTO clients (name) VALUES(?);" cname) + (for-each-row + (lambda (id) + (set! cid id)) + db + "SELECT id FROM clients WHERE name=?;" cname) + (hash-table-set! cid-cache cname cid) + (set! total-db-accesses (+ total-db-accesses 2)) + cid)))) + +(define (count-client db cname) + (let ((cid (get-client-id db cname))) + (execute db "UPDATE clients SET num_accesses=num_accesses+1 WHERE id=?;" cid) + (set! 
total-db-accesses (+ total-db-accesses 1)) + )) + +(define db (open-db)) +;; (define queuelst '()) +;; (define mx1 (make-mutex)) + +(define max-queue-len 0) + +(define (process-queue queuelst) + (let ((queuelen (length queuelst))) + (if (> queuelen max-queue-len) + (set! max-queue-len queuelen)) + (for-each + (lambda (item) + (let ((cname (vector-ref item 1)) + (clcmd (vector-ref item 2)) + (cdata (vector-ref item 3))) + (send-message pub cname send-more: #t) + (send-message pub (case clcmd + ((sync) + (conc queuelen)) + ((set) + (set! total-db-accesses (+ total-db-accesses 1)) + (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata)) + "ok") + ((get) + (set! total-db-accesses (+ total-db-accesses 1)) + (let ((res "noval")) + (for-each-row + (lambda (val) + (set! res val)) + db + "SELECT val FROM vars WHERE var=?;" cdata) + res)) + (else (conc "unk cmd: " clcmd)))))) + queuelst))) + +;; SERVER THREAD +(define th1 (make-thread + (lambda () + (let ((last-run 0)) ;; current-seconds when run last + (let loop ((queuelst '())) + (let* ((indat (receive-message* pull)) + (parts (string-split indat ":")) + (cname (car parts)) ;; client name + (clcmd (string->symbol (cadr parts))) ;; client cmd + (cdata (caddr parts)) ;; client data + (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue + ;; (print "Server received message: " indat) + (count-client db cname) + (case clcmd + ((ping) + (print "Got ping from " cname) + (send-message pub cname send-more: #t) + (send-message pub "Got ping") + (loop queuelst)) + ((sync) ;; just process the queue + (print "Got sync from " cname) + (process-queue (cons svect queuelst)) + (loop '())) + ((get) + (process-queue (cons svect queuelst)) + (loop '())) + (else + (loop (cons svect queuelst)))))))) + "server thread")) + +(include "mockupclientlib.scm") + +;; SYNC THREAD +;; send a sync to the pull port +(define th2 (make-thread + (lambda () + (let ((last-action-time 
(current-seconds))) + (let loop () + (thread-sleep! 5) + (let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f))) + (last-action-delta #f)) + (if (> queuelen 1)(set! last-action-time (current-seconds))) + (set! last-action-delta (- (current-seconds) last-action-time)) + (print "Server: Got queuelen=" queuelen ", last-action-delta=" last-action-delta) + (if (< last-action-delta 60) + (loop) + (print "Server exiting, 25 seconds since last access")))))) + "sync thread")) + +(thread-start! th1) +(thread-start! th2) +(thread-join! th2) + +(let* ((run-time (- (current-seconds) start-time)) + (queries/second (/ total-db-accesses run-time))) + (print "Server exited! Total db accesses=" total-db-accesses " in " run-time " seconds for " queries/second " queries/second with max queue length of: " max-queue-len)) ADDED testnanomsg/pipeline.scm Index: testnanomsg/pipeline.scm ================================================================== --- /dev/null +++ testnanomsg/pipeline.scm @@ -0,0 +1,25 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +(define push (nn-socket 'push)) +(define pull1 (nn-socket 'pull)) +(define pull2 (nn-socket 'pull)) + +(nn-bind push "inproc://test") +(nn-connect pull1 "inproc://test") +(nn-connect pull2 "inproc://test") + +(nn-send push "a") +(nn-send push "b") +(nn-send push "c") +(nn-send push "d") + +(define ((th sock)) + (print (current-thread) ": " (nn-recv sock)) + (print (current-thread) ": " (nn-recv sock)) + (print (current-thread) " is done")) + +(thread-start! (th pull1)) +(thread-start! (th pull2)) + +(thread-sleep! 1) ADDED testnanomsg/req-rep-client.scm Index: testnanomsg/req-rep-client.scm ================================================================== --- /dev/null +++ testnanomsg/req-rep-client.scm @@ -0,0 +1,30 @@ +;; watch nanomsg's pipeline load-balancer in action. 
+(use nanomsg) + +(define req (nn-socket 'req)) + +(nn-connect req "tcp://localhost:22022") + +;; (with-output-to-string (lambda ()(serialize obj))) +(define (client-send-receive soc msg) + (nn-send soc msg) + (nn-recv soc)) + +(define ((talk-to-server soc)) + (let loop ((cnt 20)) + (let ((name (list-ref '("Matt" "Tom" "Bob" "Jill" "James" "Jane")(random 6)))) + (print "Sending " name) + (print (client-send-receive req name)) + (if (> cnt 0)(loop (- cnt 1))))) + (print (client-send-receive req "quit")) + (nn-close req) + (exit)) + +;; (thread-start! (lambda () +;; (thread-sleep! 20) +;; (print "Give up on waiting for the server") +;; (nn-close req) +;; (exit))) + +(thread-join! (thread-start! (talk-to-server req))) + ADDED testnanomsg/req-rep-server.scm Index: testnanomsg/req-rep-server.scm ================================================================== --- /dev/null +++ testnanomsg/req-rep-server.scm @@ -0,0 +1,90 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +;; (use trace) +;; (trace nn-bind nn-socket nn-assert nn-recv nn-send thread-terminate! nn-close ) + +(define port 22022) +(define host "127.0.0.1") + +(define rep (nn-socket 'rep)) + +(print "connecting, got: " (nn-bind rep (conc "tcp://" "*" ":" port))) + +(define (server soc) + (print "server starting") + (let loop ((msg-in (nn-recv soc))) + (print "server received: " msg-in) + (cond + ((equal? msg-in "quit") + (nn-send soc "Ok, quitting")) + ((and (>= (string-length msg-in) 4) + (equal? (substring msg-in 0 4) "ping")) + (nn-send soc (conc (current-process-id))) + (loop (nn-recv soc))) + ;;((and (>= (string-length msg-in) + (else + (let ((this-task (random 15))) + (thread-sleep! 
this-task) + (nn-send soc (conc "hello " msg-in " this task took " this-task " seconds to complete")) + (loop (nn-recv soc))))))) + +(define (ping-self host port #!key (return-socket #t)) + ;; send a random number along with pid and check that we get it back + (let* ((req (nn-socket 'req)) + (key "ping") + (success #f) + (keepwaiting #t) + (ping (make-thread + (lambda () + (print "ping: sending string \"" key "\", expecting " (current-process-id)) + (nn-send req key) + (let ((result (nn-recv req))) + (if (equal? (conc (current-process-id)) result) + (begin + (print "ping, success: received \"" result "\"") + (set! success #t)) + (begin + (print "ping, failed: received key \"" result "\"") + (set! keepwaiting #f) + (set! success #f))))) + "ping")) + (timeout (make-thread (lambda () + (let loop ((count 0)) + (thread-sleep! 1) + (print "still waiting after count seconds...") + (if (and keepwaiting (< count 10)) + (loop (+ count 1)))) + (if keepwaiting + (begin + (print "timeout waiting for ping") + (thread-terminate! ping)))) + "timeout"))) + (nn-connect req (conc "tcp://" host ":" port)) + (handle-exceptions + exn + (begin + (print-call-chain) + (print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) + (print "exn=" (condition->list exn)) + (print "ping failed to connect to " host ":" port)) + (thread-start! timeout) + (thread-start! ping) + (thread-join! ping) + (if success (thread-terminate! timeout))) + (if return-socket + (if success req #f) + (begin + (nn-close req) + success)))) + +(let ((server-thread (make-thread (lambda ()(server rep)) "server"))) + (thread-start! server-thread) + ;; (thread-sleep! 1) + (if (ping-self host port) + (begin + (thread-join! 
server-thread) + (nn-close rep)) + (print "ping failed"))) + +(exit) ADDED testnanomsg/req-rep.scm Index: testnanomsg/req-rep.scm ================================================================== --- /dev/null +++ testnanomsg/req-rep.scm @@ -0,0 +1,30 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +(define req (nn-socket 'req)) +(define rep (nn-socket 'rep)) + +(nn-bind rep "inproc://test") +(nn-connect req "inproc://test") + +(define (client-send-receive soc msg) + (nn-send soc msg) + (nn-recv soc)) + +(define ((server soc)) + (let loop ((msg-in (nn-recv soc))) + (if (not (equal? msg-in "quit")) + (begin + (nn-send soc (conc "hello " msg-in)) + (loop (nn-recv soc)))))) + +(thread-start! (server rep)) + +(print (client-send-receive req "Matt")) +(print (client-send-receive req "Tom")) + +;; (client-send-receive req "quit") + +(nn-close req) +(nn-close rep) +(exit) ADDED testrpc/client.scm Index: testrpc/client.scm ================================================================== --- /dev/null +++ testrpc/client.scm @@ -0,0 +1,8 @@ +;;;; client.scm +(use rpc posix) + +(define call (rpc:procedure 'foo "localhost")) + +(do ((i 10 (sub1 i))) + ((zero? i)) + (print "-> " (call (random 100)))) ADDED testrpc/server.scm Index: testrpc/server.scm ================================================================== --- /dev/null +++ testrpc/server.scm @@ -0,0 +1,15 @@ +;;;; server.scm +(use rpc) + +(rpc:publish-procedure! + 'foo + (lambda (x) + (print "foo: " x) + #f)) + +(rpc:publish-procedure! + 'fini + (lambda () (print "fini") (thread-start! (lambda () (thread-sleep! 
3) (print "terminate") (exit))) #f)) + +((rpc:make-server (tcp-listen (rpc:default-server-port))) #t) + Index: tests/Makefile ================================================================== --- tests/Makefile +++ tests/Makefile @@ -34,11 +34,11 @@ cd ..;make -j && make install cd fullrun;$(MEGATEST) -stop-server 0 repl : cd ..;make -j && make install - cd fullrun;$(MEGATEST) -repl + cd fullrun;$(MEGATEST) -:b -repl test0 : cleanprep cd simplerun ; $(MEGATEST) -server - -debug $(DEBUG) test1 : cleanprep @@ -69,10 +69,11 @@ test4a : cleanprep cd fullrun;time $(MEGATEST) -debug $(DEBUG) -preclean -runtests all_toplevel -reqtarg ubuntu/nfs/none :runname $(RUNNAME)_b -m "This is a comment specific to a run" -v $(LOGGING) # NOTE: Only one instance can be a server test5 : cleanprep + rm -f fullrun/a*.log fullrun/logs/* @echo "WARNING: No longer running fullprep, test converage may be lessened" cd fullrun;sleep 0;$(MEGATEST) -preclean -runtests % -target $(TARGET) :runname $(RUNNAME)_aa -debug $(DEBUG) $(LOGGING) > aa.log 2> aa.log & cd fullrun;sleep 0;$(MEGATEST) -preclean -runtests % -target $(TARGET) :runname $(RUNNAME)_ab -debug $(DEBUG) $(LOGGING) > ab.log 2> ab.log & cd fullrun;sleep 5;$(MEGATEST) -preclean -runtests % -target $(TARGET) :runname $(RUNNAME)_ac -debug $(DEBUG) $(LOGGING) > ac.log 2> ac.log & cd fullrun;sleep 8;$(MEGATEST) -preclean -runtests % -target $(TARGET) :runname $(RUNNAME)_ad -debug $(DEBUG) $(LOGGING) > ad.log 2> ad.log & @@ -116,27 +117,27 @@ test9 : minsetup test9a test9b test9c test9d test9e test9a : @echo Run super-simple mintest e, no waitons. 
cd mintest;$(DASHBOARD)& - cd mintest;$(MEGATEST) -preclean -runtests e -target $(VER) :runname `date +%H.%M.%S` -debug $(DEBUG) + cd mintest;$(MEGATEST) -preclean -runtests e -target $(VER) -runname $(shell date +%H.%M.%S) -debug $(DEBUG) test9b : @echo Run simple mintest d with one waiton c - cd mintest;$(MEGATEST) -preclean -runtests d -target $(VER) :runname `date +%H.%M.%S` -debug $(DEBUG) + cd mintest;$(MEGATEST) -preclean -runtests d -target $(VER) -runname `date +%H.%M.%S` -debug $(DEBUG) test9c : @echo Run mintest a with full waiton chain a -> b -> c -> d -> e - cd mintest;$(MEGATEST) -preclean -runtests a -target $(VER) :runname `date +%H.%M.%S` -debug $(DEBUG) + cd mintest;$(MEGATEST) -preclean -runtests a -target $(VER) -runname `date +%H.%M.%S` -debug $(DEBUG) test9d : @echo Run an itemized test with no items - cd mintest;$(MEGATEST) -preclean -runtests g -target $(VER) :runname `date +%H.%M.%S` -debug $(DEBUG) + cd mintest;$(MEGATEST) -preclean -runtests g -target $(VER) -runname `date +%H.%M.%S` -debug $(DEBUG) test9e : @echo Run mintest a1 with full waiton chain with d1fail: a1 -> b1 -> c1 -> d1fail -> e1 - cd mintest;$(MEGATEST) -preclean -runtests a1 -target $(VER) :runname `date +%H.%M.%S` -debug $(DEBUG) + cd mintest;$(MEGATEST) -preclean -runtests a1 -target $(VER) -runname `date +%H.%M.%S` -debug $(DEBUG) test10 : @echo Run a bunch of different targets simultaneously (cd fullrun;$(MEGATEST) -server - ;sleep 2)& for targ in mint/btrfs/mintdir sunos/sshfs/loc; do \ Index: tests/fullrun/megatest.config ================================================================== --- tests/fullrun/megatest.config +++ tests/fullrun/megatest.config @@ -31,13 +31,10 @@ # yes, anything else is no run-wait yes -# Use http instead of direct filesystem access -# transport http -# transport fs # If set to "default" the old code is used. Otherwise defaults to 200 or uses # numeric value given. 
# runqueue 20 @@ -124,27 +121,33 @@ MAX_ALLOWED_LOAD 200 # XTERM [system xterm] # RUNDEAD [system exit 56] [server] + +# Use http instead of direct filesystem access +transport http +# transport fs +# transport nmsg + synchronous 0 # If the server can't be started on this port it will try the next port until # it succeeds -port 8080 +port 9080 # This server will keep running this number of hours after last access. # Three minutes is 0.05 hours # timeout 0.025 -timeout 0.1 +timeout 0.061 # Server is required - slower but more resistant to Sqlite issues. -# required yes +required yes # Start server when average query takes longer than this +# server-query-threshold 55500 server-query-threshold 100 -# 55500 # daemonize yes # hostname #{scheme (get-host-name)} ## disks are: @@ -166,5 +169,32 @@ sqlite3 6 blockz 10 # to your jobgroups where N is the number of parallel runs you are likely to see # +#====================================================================== +# Machine flavors +# +# These specify lists of hosts or scripts to use or call for various +# flavors of task. 
+# +#====================================================================== + +[flavors] + +plain hosts: xena, phoebe +strong command: NBFAKE_HOST=zeus nbfake +arm hosts: cubian + +[archive] + +# use machines of these flavor +useflavors plain +targsize 2G + +[archive-disks] + +# Archives will be organised under these paths like this: +# / +# Within the archive the data is structured like this: +# /// +disk0 /mfs/archives Index: tests/mintest/megatest.config ================================================================== --- tests/mintest/megatest.config +++ tests/mintest/megatest.config @@ -1,11 +1,11 @@ [fields] X TEXT [setup] max_concurrent_jobs 50 -linktree #{getenv PWD}/linktree +linktree #{getenv MT_RUN_AREA_HOME}/linktree transport http [server] port 8090 Index: tests/unittests/basicserver.scm ================================================================== --- tests/unittests/basicserver.scm +++ tests/unittests/basicserver.scm @@ -14,15 +14,16 @@ ;; NON Server tests go here (test #f #f (db:dbdat-get-path *db*)) (test #f #f (db:get-run-name-from-id *db* run-id)) -(test #f '("SYSTEM" "RELEASE") (rmt:get-keys)) +;; (test #f '("SYSTEM" "RELEASE") (rmt:get-keys)) ;; (exit) ;; Server tests go here +(for-each (lambda (run-id) (test #f #f (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)) (server:kind-run run-id) (test "did server start within 20 seconds?" #t (let loop ((remtries 20) @@ -31,11 +32,11 @@ run-id))) (if running (> running 0) (if (> remtries 0) (begin - (thread-sleep! 1.1) + (thread-sleep! 1) (loop (- remtries 1) (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id))))))) @@ -48,11 +49,12 @@ (if (> remtries 0) (begin (thread-sleep! 
1.1) (loop (- remtries 1)(tasks:get-server (db:delay-if-busy (tasks:open-db)) run-id))) res))))) - +) +(list 0 1)) (define user (current-user-name)) (define runname "mytestrun") (define keys (rmt:get-keys)) (define runinfo #f) (define keyvals '(("SYSTEM" "abc")("RELEASE" "def"))) @@ -100,16 +102,18 @@ (db:get-header run-info) "runname"))) ;; (vector header (vector "abc" "def" 1 "mytestrun" "new" "n/a" "matt" 1416280640.0)) +(for-each (lambda (run-id) ;; test killing server ;; (tasks:kill-server-run-id run-id) (test #f #f (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)) - +) +(list 0 1)) ;; (test #f #f (client:setup run-id)) ;; (set! *transport-type* 'http) ;; ;; (test "setup for run" #t (begin (launch:setup-for-run) Index: utils/Makefile.installall ================================================================== --- utils/Makefile.installall +++ utils/Makefile.installall @@ -145,10 +145,26 @@ cd sqlite-autoconf-$(SQLITE3_VERSION);./configure --prefix=$(PREFIX);make;make install $(PREFIX)/lib/sqlite3.so : $(PREFIX)/bin/sqlite3 CSC_OPTIONS="-I$(PREFIX)/include -L$(PREFIX)/lib" $(CHICKEN_INSTALL) $(PROX) sqlite3 +#====================================================================== +# N A N O M S G +#====================================================================== + +nanomsg-0.5-beta.tar.gz : + wget http://download.nanomsg.org/nanomsg-0.5-beta.tar.gz + +nanomsg-0.5-beta/COPYING : nanomsg-0.5-beta.tar.gz + tar xfvz nanomsg-0.5-beta.tar.gz + +$(PREFIX)/bin/nanocat : nanomsg-0.5-beta/COPYING + cd nanomsg-0.5-beta;./configure --prefix=$(PREFIX);make;make install + +$(PREFIX)/lib/nanomsg.so : $(PREFIX)/bin/nanocat + CSC_OPTIONS="-I$(PREFIX)/include -L$(PREFIX)/lib" $(CHICKEN_INSTALL) $(PROX) nanomsg + #====================================================================== # M A T T S U T I L S #====================================================================== opensrc.fossil : Index: utils/plot-code.scm 
================================================================== --- utils/plot-code.scm +++ utils/plot-code.scm @@ -1,7 +1,14 @@ #!/mfs/pkgs/chicken/4.8.0.5/bin/csi -nbq +;; Coming soon (right?) Usage: plot-code file1.scm,file2.scm "fun1,fun2,x*" *.scm > plot.dot +;; Usage: plot-code file1.scm,file2.scm *.scm > plot.dot +;; dot -Tpdf plot.dot > plot.pdf +;; first param is comma separated list of files to include in the map, use - to do all +;; second param is list of regexs for functions to include in the map +;; third param is list of files to scan + (use regex srfi-69 srfi-13) (define targs #f) (define files (cddddr (argv))) @@ -15,10 +22,12 @@ (define defn-rx (regexp "^\\s*\\(define\\s+\\(([^\\s\\)]+).*")) (define all-regexs (make-hash-table)) (define all-fns '()) + +;; for the se (define (print-err . data) (with-output-to-port (current-error-port) (lambda () (apply print data)))) @@ -81,11 +90,32 @@ newres))) (if (null? tal) res (loop (car tal)(cdr tal) res))))) '())) - + +;; (define mm-header #< +;; +;; +;; MMHEADER +;; +;; (define (add-node text) +;; +;; ) +;; +;; minimal mindmap file +;; +;; +;; +;; +;; +;; +;; +;; +;; + ;; Gather the usages (print "digraph G {") (define curr-cluster-num 0) (define function-calls '()) ADDED utils/trace/trace.import.scm Index: utils/trace/trace.import.scm ================================================================== --- /dev/null +++ utils/trace/trace.import.scm @@ -0,0 +1,32 @@ +;;;; trace.import.scm - GENERATED BY CHICKEN 4.9.0.1 -*- Scheme -*- + +(eval '(import + scheme + chicken + csi + advice + extras + ports + data-structures + (except srfi-1 break) + miscmacros)) +(##sys#register-compiled-module + 'trace + (list) + '((breakpoint . trace#breakpoint) + (trace . trace#trace) + (untrace . trace#untrace) + (break . trace#break) + (unbreak . trace#unbreak) + (trace-output-port . trace#trace-output-port) + (continue . trace#continue) + (c . trace#c) + (traced? . trace#traced?) + (trace-module . 
trace#trace-module) + (untrace-module . trace#untrace-module) + (trace-verbose . trace#trace-verbose) + (trace/untrace . trace#trace/untrace)) + (list) + (list)) + +;; END OF FILE ADDED utils/trace/trace.meta Index: utils/trace/trace.meta ================================================================== --- /dev/null +++ utils/trace/trace.meta @@ -0,0 +1,10 @@ +;;;; trace.meta -*- Scheme -*- + + +((category tools) + (synopsis "tracing and breakpoints") + (author "felix winkelmann") + (license "public domain") + (needs advice ; don't we all? + miscmacros) + (files "tests/run.scm" "trace.meta" "trace.release-info" "trace.scm" "trace.setup") ) ADDED utils/trace/trace.scm Index: utils/trace/trace.scm ================================================================== --- /dev/null +++ utils/trace/trace.scm @@ -0,0 +1,259 @@ +;;;; trace.scm + + +(module trace (breakpoint + trace untrace + break unbreak + trace-output-port + continue c + traced? + trace-module untrace-module + trace-verbose + trace/untrace) + +(import scheme chicken csi) + +(use advice extras ports data-structures) +(require-library srfi-1) +(import (except srfi-1 break) miscmacros) + + +(define *last-breakpoint* #f) +(define *traced-procedures* '()) +(define *broken-procedures* '()) +(define *trace-indent-level* 0) + +(define trace-output-port (make-parameter (current-output-port))) +(define trace-verbose (make-parameter #t)) + +(define (break-entry name args) + ;; Does _not_ unwind! + (##sys#call-with-current-continuation + (lambda (c) + (let ((exn (##sys#make-structure + 'condition + '(exn breakpoint) + (list '(exn . message) "*** breakpoint ***" + '(exn . arguments) (list (cons name args)) + '(exn . location) name + '(exn . continuation) c) ) ) ) + (set! *last-breakpoint* exn) + (signal exn) ) ) ) ) + +(define (break-resume exn) + (let ((a (member '(exn . 
continuation) (##sys#slot exn 2)))) + (if a + ((cadr a) (void)) + (error "condition has no continuation" exn) ) ) ) + +(define (breakpoint #!optional (name 'breakpoint)) + (break-entry name '()) ) + +(define (trace-indent) + (let ((port (trace-output-port))) + (do ((i (fxmin 3 *trace-indent-level*) (fx- i 1))) + ((fx<= i 0)) + (write-char #\space port) ) + (fprintf port "[~a] " *trace-indent-level*) ) ) + +(define (traced-procedure-entry name args) + (let ((port (trace-output-port))) + (trace-indent) + (set! *trace-indent-level* (fx+ 1 *trace-indent-level*)) + (write (cons name args) port) + (write ", Called from: " port) + (write (conc (car (reverse (get-call-chain)))) port) ;; keep the whole entry record on the trace port + (write-char #\newline port) + (flush-output port) ) ) + +(define (traced-procedure-exit name results) + (let ((port (trace-output-port))) + (set! *trace-indent-level* (fx- *trace-indent-level* 1)) + (trace-indent) + (fprintf port "~a -> " name) + (if results + (for-each + (lambda (x) + (write x port) + (write-char #\space port) ) + results) + (display "(escaping)" port)) + (write-char #\newline port) + (flush-output port) ) ) + +(define (procedure-name proc) + (cond ((procedure-information proc) => + (lambda (info) + (if (pair? info) (car info) info) ) ) + (else 'unknown)) ) ;; fallback name when no procedure information is available + +(define (do-trace procs) + (for-each + (lambda (s) + (ensure procedure? s) + (cond ((traced? s) + (warning "procedure already traced" s) ) + (else + (let ((name (procedure-name s))) + (when (trace-verbose) + (fprintf (current-error-port) "; tracing ~a~%" name)) + (set! *traced-procedures* (cons (cons s name) *traced-procedures*)) + (advise + 'around s + (lambda (next args) + (let ((results #f)) + (dynamic-wind + (cut traced-procedure-entry name args) + (lambda () + (call-with-values (cut apply next args) + (lambda rs + (set!
results rs) + (apply values rs)))) + (cut traced-procedure-exit name results)))) + '*trace*))))) + procs) ) + +(define (do-untrace-all) + (define (unadvise* p) + (ignore-errors (unadvise p '*trace*))) + (for-each + (lambda (proc) + (let ((proc (car proc))) + (when (trace-verbose) + (fprintf (current-error-port) "; untracing ~a~%" (procedure-name proc))) + (unadvise* proc))) ;; always remove the advice, not only when trace-verbose is on + *traced-procedures*) + (set! *traced-procedures* '())) + +(define (do-untrace procs) + (for-each + (lambda (s) + (ensure procedure? s) + (let ((p (assq s *traced-procedures*)) + (name (procedure-name s))) + (cond ((not p) (warning "procedure not traced" name)) + (else + (when (trace-verbose) + (fprintf (current-error-port) "; untracing ~a~%" name)) + (ignore-errors (unadvise s '*trace*)) + (set! *traced-procedures* + (delete + p *traced-procedures* + eq?)))))) + procs) ) + +(define (do-break procs) + (for-each + (lambda (s) + (let ((name (procedure-name s))) + (ensure procedure? s) + (cond ((assq s *broken-procedures*) + (warning "procedure already has break-point" name)) + (else + (when (trace-verbose) + (fprintf (current-error-port) "; setting break-point in ~a~%" name)) + (set! *broken-procedures* (cons (cons s name) *broken-procedures*)) + (advise + 'before s + (lambda (args) + (break-entry name args) ) + '*break*) ) ))) + procs) ) + +(define (do-unbreak procs) + (for-each + (lambda (s) + (ensure procedure? s) + (let ((p (assq s *broken-procedures*)) + (name (procedure-name s))) + (cond ((not p) (warning "procedure has no breakpoint" name)) + (else + (when (trace-verbose) + (fprintf (current-error-port) "; removing break-point in ~a~%" name)) + (ignore-errors (unadvise s '*break*)) + (set! *broken-procedures* (delete p *broken-procedures* eq?) ) ) ) ) ) + procs) ) + +(define (do-unbreak-all) + (for-each + (lambda (bp) + (ignore-errors (unadvise (car bp) '*break*))) + *broken-procedures*) + (set! *broken-procedures* '()) + (void)) + +(define (trace . procs) + (cond ((null?
procs) + (when (pair? *traced-procedures*) + (printf "Traced:~%~%") + (for-each (lambda (p) (printf " ~a~%" (cdr p))) *traced-procedures*)) ) + (else + (do-trace procs) ) ) ) + +(define (untrace . procs) + (cond ((null? procs) (do-untrace-all)) + (else (do-untrace procs))) + (void)) + +(define (break . procs) + (cond ((null? procs) + (when (pair? *broken-procedures*) + (printf "Breakpoints:~%~%") + (for-each (lambda (p) (printf " ~a~%" (cdr p))) *broken-procedures*)) ) + (else + (do-break procs) ) ) ) + +(define (unbreak . procs) + (cond ((null? procs) (do-unbreak-all)) + (else (do-unbreak procs)))) + +(define (continue #!optional (bp *last-breakpoint*)) + (cond (*last-breakpoint* + (let ((exn *last-breakpoint*)) + (set! *last-breakpoint* #f) + (break-resume exn) ) ) + (else (display "no breakpoint pending\n") ) ) ) + +(define c continue) + +(define (traced? proc) + (assq proc *traced-procedures*)) + +(define (trace/untrace . procs) + (for-each + (lambda (proc) + ((if (traced? proc) do-untrace do-trace) (list proc))) + procs)) + +(define (walk-module mname proc) + (let* ((m (##sys#find-module mname)) + (exps (nth-value 1 (##sys#module-exports m)))) + (for-each + (lambda (exp) + (let* ((realname (cdr exp)) + (prim (get realname '##core#primitive))) + (if prim + (warning "export is a core-library primitive - not traced" (car exp)) + (when (##sys#symbol-has-toplevel-binding? realname) + (let ((val (##sys#slot realname 0))) + (when (procedure? val) + (proc val))))))) + exps))) + +(define (trace-module . mnames) + (for-each + (lambda (mname) + (walk-module mname trace)) + mnames)) + +(define (untrace-module . mnames) + (for-each + (lambda (mname) + (walk-module + mname + (lambda (proc) + (when (traced? 
proc) + (do-untrace (list proc)))))) + mnames)) + +) ADDED utils/trace/trace.setup Index: utils/trace/trace.setup ================================================================== --- /dev/null +++ utils/trace/trace.setup @@ -0,0 +1,9 @@ +;;;; trace.setup -*- Scheme -*- + + +(compile -s trace.scm -O3 -d1 -j trace) +(compile -s trace.import.scm -O3 -d0) + +(install-extension + 'trace + '("trace.so" "trace.import.so")) ADDED zmq-transport.scm Index: zmq-transport.scm ================================================================== --- /dev/null +++ zmq-transport.scm @@ -0,0 +1,493 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use zmq) + +(declare (unit zmq-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses server)) + +(include "common_records.scm") +(include "db_records.scm") + +;; Transition to pub --> sub with pull <-- push +;; +;; 1. client sends request to server via push to the pull port +;; 2. server puts request in queue or processes immediately as appropriate +;; 3. server puts responses from completed requests into pub port +;; +;; TODO +;; +;; Done Tested +;; [x] [ ] 1. Add columns pullport pubport to servers table +;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 +;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports +;; [x] [ ] 4. 
Add client compose of request +;; [x] [ ] - name of client: testname/itempath-test_id-hostname +;; [x] [ ] - name of request: callname, params +;; [x] [ ] - request key: f(clientname, callname, params) +;; [x] [ ] 5. Add processing of subscription hits +;; [x] [ ] - done when get key +;; [x] [ ] - return results +;; [x] [ ] 6. Add timeout processing +;; [x] [ ] - after 60 seconds +;; [ ] [ ] i. check server alive, connect to new if necessary +;; [ ] [ ] ii. resend request +;; [ ] [ ] 7. Turn self ping back on + +(define (zmq-transport:make-server-url hostport) + (if (not hostport) + #f + (conc "tcp://" (car hostport) ":" (cadr hostport)))) + +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +(define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) +(define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) +(define-inline (zmqsock:set-pub! dat s)(vector-set! dat 0 s)) ;; slot 0, matching zmqsock:get-pub +(define-inline (zmqsock:set-pull! dat s)(vector-set! dat 1 s)) ;; slot 1, matching zmqsock:get-pull + +(define (zmq-transport:run hostn) + (debug:print 2 "Attempting to start the server ...") + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") + (exit)))) + (let* ((db (open-db)) ;; here we *do not* want to be opening and closing the db + (zmq-sdat1 #f) + (zmq-sdat2 #f) + (pull-socket #f) + (pub-socket #f) + (p1 #f) + (p2 #f) + (zmq-sockets-dat #f) + (iface (if (string=? "-" hostn) + "*" ;; (get-host-name) + hostn)) + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostname))) + (last-run 0)) + (set!
zmq-sockets-dat (zmq-transport:setup-ports ipaddrstr (if (args:get-arg "-port") + (string->number (args:get-arg "-port")) + (+ 5000 (random 1001))))) + + (set! zmq-sdat1 (car zmq-sockets-dat)) + (set! pull-socket (cadr zmq-sdat1)) ;; (iface s port) + (set! p1 (caddr zmq-sdat1)) + + (set! zmq-sdat2 (cadr zmq-sockets-dat)) + (set! pub-socket (cadr zmq-sdat2)) + (set! p2 (caddr zmq-sdat2)) + + (set! *cache-on* #t) + + (set! *runremote* (vector pull-socket pub-socket)) ;; overloading the use of *runremote* BUG!? + + ;; what to do when we quit + ;; +;; (on-exit (lambda () +;; (if (and *toppath* *server-info*) +;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) +;; (let loop () +;; (let ((queue-len 0)) +;; (thread-sleep! (random 5)) +;; (mutex-lock! *incoming-mutex*) +;; (set! queue-len (length *incoming-data*)) +;; (mutex-unlock! *incoming-mutex*) +;; (if (> queue-len 0) +;; (begin +;; (debug:print-info 0 "Queue not flushed, waiting ...") +;; (loop)))))))) + + ;; The heavy lifting + ;; + ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime + ;; + (debug:print-info 11 "Server setup complete, start listening for messages") + (let loop ((queue-lst '())) + (let* ((rawmsg (receive-message* pull-socket)) + (packet (db:string->obj rawmsg)) + (qtype (cdb:packet-get-qtype packet))) + (debug:print-info 12 "server=> received packet=" packet) + (if (not (member qtype '(sync ping))) + (begin + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*))) + (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue + (begin + (db:process-queue-item db packet) + ;; (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) + + (loop '())) + (loop (cons packet queue-lst))))))) + +;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. 
+;; +(define (zmq-transport:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (debug:print 12 "WARNING: server not started yet, waiting few seconds before trying again") + (sleep 4) + (loop)))))) + (iface (cadr server-info)) + (pullport (caddr server-info)) + (pubport (cadddr server-info)) ;; id interface pullport pubport) + ;; (zmq-sockets (zmq-transport:client-connect iface pullport pubport)) + (last-access 0)) + (debug:print-info 11 "heartbeat started for zmq server on " iface " " pullport " " pubport) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + ;; GET REAL QUEUE LENGTH FROM THE VARIABLE + (let ((queue-len 0)) ;; FOR NOW DO NOT DO THIS (cdb:client-call zmq-sockets 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! 
*time-to-exit* #t) + (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +(define (zmq-transport:find-free-port-and-open iface s port stype #!key (trynum 50)) + (let ((s (if s s (make-socket stype))) + (p (if (number? port) port 5555)) + (old-handler (current-exception-handler))) + (handle-exceptions + exn + (begin + (debug:print 0 "Failed to bind to port " p ", trying next port") + (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) + ;; (old-handler) + ;; (print-call-chain) + (if (> trynum 0) + (zmq-transport:find-free-port-and-open iface s (+ p 1) stype trynum: (- trynum 1)) ;; pass stype along; a successful retry is the result + (begin (debug:print-info 0 "Tried ports up to " p + " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use") + (exit)))) ;; exit only once every retry has been used up + (let ((zmq-url (conc "tcp://" iface ":" p))) + (debug:print 2 "Trying to start server on " zmq-url) + (bind-socket s zmq-url) + (list iface s p))))) ;; report the port actually bound, not the one requested + +(define (zmq-transport:setup-ports ipaddrstr startport) + (let* ((s1 (zmq-transport:find-free-port-and-open ipaddrstr #f startport 'pull)) + (p1 (caddr s1)) + (s2 (zmq-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub)) + (p2 (caddr s2))) + (set! *runremote* #f) + (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) + (mutex-lock! *heartbeat-mutex*) + (set! *server-info* (open-run-close tasks:server-register + tasks:open-db + (current-process-id) + ipaddrstr p1 + 0 + 'live + 'zmq + pubport: p2)) + (debug:print-info 11 "*server-info* set to " *server-info*) + (mutex-unlock!
*heartbeat-mutex*) + (list s1 s2))) + +(define (zmq-transport:mk-signature) + (message-digest-string (md5-primitive) + (with-output-to-string + (lambda () + (write (list (current-directory) + (argv))))))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +;; +(define (zmq-transport:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) + (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions) + (let ((connect-ok #f) + (zmq-socket (if context + (make-socket type context) + (make-socket type))) + (conurl (zmq-transport:make-server-url (list iface port)))) + (if (socket? zmq-socket) + (begin + ;; first apply subscriptions + (for-each (lambda (subscription) + (debug:print 2 "Subscribing to " subscription) + (socket-option-set! zmq-socket 'subscribe subscription)) + subscriptions) + (connect-socket zmq-socket conurl) + zmq-socket) + (begin + (debug:print 0 "ERROR: Failed to open socket to " conurl) + #f)))) + +(define (zmq-transport:client-connect iface pullport pubport) + (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push)) + (sub-socket (zmq-transport:client-socket-connect iface pubport + type: 'sub + subscriptions: (list (server:get-client-signature) "all"))) + (zmq-sockets (vector push-socket sub-socket)) + (login-res #f)) + (debug:print-info 11 "zmq-transport:client-connect started. Next is login") + (set! login-res (server:client-login zmq-sockets)) + (if (and (not (null? login-res)) + (car login-res)) + (begin + (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") + (set! 
*runremote* zmq-sockets) + zmq-sockets) + (begin + (debug:print-info 2 "Failed to login or connect to " iface ":" pullport "/" pubport) ;; conurl is not bound in this procedure + (set! *runremote* #f) + #f)))) + +;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (zmq-transport:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *runremote*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdb (tasks:open-db)) + (spid (tasks:server-get-server-id tdb #f iface port #f))) + (print "Keep-running got server pid " spid ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (tasks:server-update-heartbeat tdb spid) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set!
*time-to-exit* #t) + (tasks:server-deregister-self tdb (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +;; all routes through here end in exit ... +(define (zmq-transport:launch) + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, exiting") + (exit)))) + (debug:print-info 2 "Starting zmq server") + (if *toppath* + (let* (;; (th1 (make-thread (lambda () + ;; (let ((server-info #f)) + ;; ;; wait for the server to be online and available + ;; (let loop () + ;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat") + ;; (thread-sleep! 2) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! server-info *server-info* ) + ;; (mutex-unlock! *heartbeat-mutex*) + ;; (if (not server-info)(loop))) + ;; (debug:print 2 "Server alive, starting self-ping") + ;; (zmq-transport:self-ping server-info) + ;; )) + ;; "Self ping")) + (th2 (make-thread (lambda () + (zmq-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-"))) "Server run")) + ;; (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running")) + ) + (set! *client-non-blocking-mode* #t) + ;; (thread-start! th1) + (thread-start! th2) + ;; (thread-start! th3) + (set! *didsomething* #t) + ;; (thread-join! th3) + (thread-join! th2) + ) + (debug:print 0 "ERROR: Failed to setup for megatest"))) + +(define (zmq-transport:client-signal-handler signum) + (handle-exceptions + exn + (debug:print 0 " ... exiting ...") ;; debug:print takes the verbosity level as its first argument + (let ((th1 (make-thread (lambda () + (if (not *received-response*) + (receive-message* *runremote*))) ;; flush out last call if applicable + "eat response")) + (th2 (make-thread (lambda () + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep!
3) ;; give the flush three seconds to do it's stuff + (debug:print 0 " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) + +(define (zmq-transport:client-launch) + (set-signal-handler! signal/int zmq-transport:client-signal-handler) + (if (zmq-transport:client-setup) + (debug:print-info 2 "connected as client") + (begin + (debug:print 0 "ERROR: Failed to connect as client") + (exit)))) + +;;====================================================================== +;; Defunct functions +;;====================================================================== + +;; ping a server and return number of clients or #f (if no response) +;; NOT IN USE! +(define (zmq-transport:ping host port #!key (secs 10)(return-socket #f)) + (cdb:use-non-blocking-mode + (lambda () + (let* ((res #f) + (th1 (make-thread + (lambda () + (let* ((zmq-context (make-context 1)) + (zmq-socket (zmq-transport:client-connect host port context: zmq-context))) + (if zmq-socket + (if (zmq-transport:client-login zmq-socket) + (let ((numclients (cdb:num-clients zmq-socket))) + (if (not return-socket) + (begin + (zmq-transport:client-logout zmq-socket) + (close-socket zmq-socket))) + (set! res (list #t numclients (if return-socket zmq-socket #f)))) + (begin + ;; (close-socket zmq-socket) + (set! res (list #f "CAN'T LOGIN" #f)))) + (set! res (list #f "CAN'T CONNECT" #f))))) + "Ping: th1")) + (th2 (make-thread + (lambda () + (let loop ((count 1)) + (debug:print-info 1 "Ping " count " server on " host " at port " port) + (thread-sleep! 2) + (if (< count (/ secs 2)) + (loop (+ count 1)))) + ;; (thread-terminate! th1) + (set! res (list #f "TIMED OUT" #f))) + "Ping: th2"))) + (thread-start! th2) + (thread-start! th1) + (handle-exceptions + exn + (set! res (list #f "TIMED OUT" #f)) + (thread-join! 
th1 secs)) + res)))) + +;; (define (zmq-transport:self-ping server-info) +;; ;; server-info: server-id interface pullport pubport +;; (let ((iface (list-ref server-info 1)) +;; (pullport (list-ref server-info 2)) +;; (pubport (list-ref server-info 3))) +;; (zmq-transport:client-connect iface pullport pubport) +;; (let loop () +;; (thread-sleep! 2) +;; (cdb:client-call *runremote* 'ping #t) +;; (debug:print 4 "zmq-transport:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *server-loop-heart-beat* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*) +;; (loop)))) + +(define (zmq-transport:reply pubsock target query-sig success/fail result) + (debug:print-info 11 "zmq-transport:reply target=" target ", result=" result) + (send-message pubsock target send-more: #t) + (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) +