Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -111,10 +111,11 @@ ;; (define (api:execute-requests dbstruct dat) (handle-exceptions exn (let ((call-chain (get-call-chain))) + (debug:print 0 *default-log-port* "WARNING: api:execute-requests received an exception from peer") (print-call-chain (current-error-port)) (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) (vector #f (vector exn call-chain dat))) ;; return some stuff for debug if an exception happens (if (not (vector? dat)) ;; it is an error to not receive a vector (vector #f #f "remote must be called with a vector") @@ -239,10 +240,11 @@ ;; TEST DATA ((read-test-data) (apply db:read-test-data dbstruct params)) ;; MISC + ((get-latest-host-load) (apply db:get-latest-host-load dbstruct params)) ((have-incompletes?) (apply db:have-incompletes? dbstruct params)) ((login) (apply db:login dbstruct params)) ((general-call) (let ((stmtname (car params)) (run-id (cadr params)) (realparams (cddr params))) Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -46,17 +46,23 @@ (define (client:connect iface port) (case (server:get-transport) ((rpc) (rpc:client-connect iface port)) ((http) (http:client-connect iface port)) ((zmq) (zmq:client-connect iface port)) - (else (rpc:client-connect iface port)))) + (else + (debug:print 0 *default-log-port* "ERROR: transport " (server:get-transport) " not supported (5)") ;; report the value the case dispatched on; *runremote* may be #f here + (exit)))) (define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0)) (case (server:get-transport) - ((rpc) (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)) ;;(client:setup-rpc run-id)) + ((rpc) (let ((res (client:setup-rpc run-id remaining-tries: remaining-tries))) + (remote-conndat-set! 
*runremote* res) + res)) ((http)(client:setup-http run-id remaining-tries: remaining-tries failed-connects: failed-connects)) - (else (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)))) ;; (client:setup-rpc run-id)))) + (else + (debug:print 0 *default-log-port* "ERROR: transport " (server:get-transport) " not supported (6)") ;; report the value the case dispatched on; *runremote* may be #f here + (exit)))) ;; (client:setup-rpc run-id)))) ;; (define (client:login-no-auto-setup server-info run-id) ;; (case (server:get-transport) ;; ((rpc) (rpc:login-no-auto-client-setup server-info run-id)) ;; ((http) (rmt:login-no-auto-client-setup server-info run-id)) @@ -152,10 +158,30 @@ ;; ;; client:setup ;; ;; lookup_server, need to remove *runremote* stuff ;; + +(define (client:setup-rpc run-id #!key (remaining-tries 10) (failed-connects 0)) + (debug:print-info 2 *default-log-port* "client:setup-rpc remaining-tries=" remaining-tries) + (let* ((server-dat (tasks:get-server (db:delay-if-busy (tasks:open-db)) run-id)) + (num-available (tasks:num-in-available-state (db:delay-if-busy (tasks:open-db)) run-id))) + (cond + ((<= remaining-tries 0) + (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id) + (exit 1)) + (server-dat + (debug:print-info 4 *default-log-port* "client:setup-rpc server-dat=" server-dat ", remaining-tries=" remaining-tries) + + (rpc-transport:client-setup run-id server-dat remaining-tries: remaining-tries)) + (else + (if (< num-available 2) + (server:try-running run-id)) + (thread-sleep! (+ 2 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. 
+ (client:setup run-id remaining-tries: (- remaining-tries 1)))))) + + (define (client:setup-http run-id #!key (remaining-tries 10) (failed-connects 0)) (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries) (let* ((tdbdat (tasks:open-db))) (if (<= remaining-tries 0) (begin @@ -165,22 +191,12 @@ (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) (if server-dat (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) - (start-res (case *transport-type* - ((http)(http-transport:client-connect iface port)) - ;;((nmsg)(nmsg-transport:client-connect hostname port)) - )) - (ping-res (case *transport-type* - ((http)(rmt:login-no-auto-client-setup start-res)) - ;; ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id))) - ;; (if logininfo - ;; (car (vector-ref logininfo 1)) - ;; #f))) - - ))) + (start-res (http-transport:client-connect iface port)) + (ping-res (rmt:login-no-auto-client-setup start-res))) (if (and start-res ping-res) (begin (remote-conndat-set! *runremote* start-res) ;; (hash-table-set! *runremote* run-id start-res) (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) @@ -209,11 +225,11 @@ (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. 
(client:setup run-id remaining-tries: (- remaining-tries 1))))))))) -;; keep this as a function to ease future +;; keep this as a function to ease future ;; this is unused, not porting for rpc -BB (define (client:start run-id server-info) (http-transport:client-connect (tasks:hostinfo-get-interface server-info) (tasks:hostinfo-get-port server-info))) ;; ;; client:signal-handler Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -18,10 +18,12 @@ (import (prefix base64 base64:)) (declare (unit common)) (include "common_records.scm") +(include "thunk-utils.scm") + ;; (require-library margs) ;; (include "margs.scm") ;; (define old-exit exit) @@ -60,13 +62,24 @@ (if (not cxt) (set! cxt (let ((x (make-cxt)))(hash-table-set! *contexts* toppath x) x))) (let ((cxt-mutex (cxt-mutex cxt))) (mutex-unlock! *context-mutex*) (mutex-lock! cxt-mutex) - (let ((res (proc cxt))) - (mutex-unlock! cxt-mutex) - res)))) + ;; here we guard proc with exception handler so + ;; no matter how proc succeeds or fails, + ;; the cxt-mutex will be unlocked afterward. + (let* ((EXCEPTION-SYMBOL (gensym)) ;; use a generated symbol + (guarded-proc ;; to avoid collision + (lambda args + (let* ((res (condition-case + (apply proc args) + [x () (cons EXCEPTION-SYMBOL x)]))) + (mutex-unlock! cxt-mutex) + (if (and (pair? res) (eq? 
(car res) EXCEPTION-SYMBOL)) ;; compare against the gensym sentinel bound above (a bare EXCEPTION is unbound) + (abort (cdr res)) + res))))) + (guarded-proc cxt))))) (define *db-keys* #f) (define *configinfo* #f) ;; raw results from setup, includes toppath and table from megatest.config (define *runconfigdat* #f) ;; run configs data @@ -102,11 +115,22 @@ (define *db-access-mutex* (make-mutex)) (define *db-cache-path* #f) ;; SERVER (define *my-client-signature* #f) -(define *transport-type* 'http) ;; override with [server] transport http|rpc|nmsg +(define *transport-type* #f) ;; override with [server] transport http|rpc|nmsg + +(define *DEFAULT-TRANSPORT* "http") +(define (common:set-transport-type) + (set! *transport-type* + (string->symbol + (or + (args:get-arg "-transport") + (configf:lookup *configdat* "server" "transport") + *DEFAULT-TRANSPORT*))) + *transport-type*) + (define *runremote* #f) ;; if set up for server communication this will hold (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *server-id* #f) (define *server-info* #f) @@ -609,10 +633,11 @@ (loop))) (if (common:low-noise-print 30) (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*))))))) (define (std-exit-procedure) + (let ((no-hurry (if *time-to-exit* ;; hurry up #f (begin (set! *time-to-exit* #t) #t)))) @@ -637,10 +662,19 @@ (thread-sleep! 5) ;; give the clean up few seconds to do it's stuff (thread-sleep! 2)) (debug:print 4 *default-log-port* " ... done") ) "clean exit"))) + + ;; let's try to clean up open sockets + (if *runremote* + (case (remote-transport *runremote*) + ((http) #t) + ((rpc) (rpc:close-all-connections!)) + (else + (debug:print-info 0 *default-log-port* "Transport "(remote-transport *runremote*)" not supported")))) + (thread-start! th1) (thread-start! th2) (thread-join! 
th1)))) (define (std-signal-handler signum) @@ -1084,11 +1118,12 @@ (lambda ()(list (read)(read)(read))))) (with-input-from-file "/proc/loadavg" (lambda ()(list (read)(read)(read)))))) ;; get normalized cpu load by reading from /proc/loadavg and /proc/cpuinfo return all three values and the number of real cpus and the number of threads -;; returns list (normalized-proc-load normalized-core-load 1m 5m 15m ncores nthreads) +;; returns alist '((adj-cpu-load . normalized-proc-load) ... etc. +;; keys: adj-proc-load, adj-core-load, 1m-load, 5m-load, 15m-load ;; (define (common:get-normalized-cpu-load remote-host) (let ((data (if remote-host (with-input-from-pipe (conc "ssh " remote-host " cat /proc/loadavg;cat /proc/cpuinfo;echo end") @@ -1143,51 +1178,89 @@ (let ((res (system (conc "ping -c 1 " hostname " > /dev/null")))) (eq? res 0))) ;; ideally put all this info into the db, no need to preserve it across moving homehost ;; -(define (common:get-least-loaded-host hosts-raw) +;; return list of +;; ( reachable? update-time cpuload ) +(define (common:get-host-info hostname) + (let* ((loadinfo (rmt:get-latest-host-load hostname)) + (load (car loadinfo)) + (load-sample-time (cdr loadinfo)) + (load-sample-age (- (current-seconds) load-sample-time)) + (loadinfo-timeout-seconds 20) + (host-last-update-timeout-seconds 10) + (host-rec (hash-table-ref/default *host-loads* hostname #f)) + ) + (cond + ((< load-sample-age loadinfo-timeout-seconds) + (list #t + load-sample-time + load)) + ((and host-rec + (< (current-seconds) (+ (host-last-update host-rec) host-last-update-timeout-seconds))) + (list #t + (host-last-update host-rec) + (host-last-cpuload host-rec ))) + ((common:unix-ping hostname) + (list #t + (current-seconds) + (alist-ref 'adj-core-load (common:get-normalized-cpu-load hostname)))) + (else + (list #f 0 -1))))) + +(define (common:update-host-loads-table hosts-raw) (let* ((hosts (filter (lambda (x) (string-match (regexp "^\\S+$") x)) hosts-raw))) - (if (null? 
hosts) - #f - ;; - ;; stategy: - ;; sort by last-used and normalized-load - ;; if last-updated > 15 seconds then re-update - ;; take the host with the lowest load with the lowest last-used (i.e. not used for longest time) - ;; - (let ((best-host #f) - (curr-time (current-seconds))) - (for-each - (lambda (hostname) - (let* ((rec (let ((h (hash-table-ref/default *host-loads* hostname #f))) - (if h - h - (let ((h (make-host))) - (hash-table-set! *host-loads* hostname h) - h)))) - ;; if host hasn't been pinged in 15 sec update it's data - (ping-good (if (< (- curr-time (host-last-update rec)) 15) - (host-reachable rec) - (or (host-reachable rec) - (begin - (host-reachable-set! rec (common:unix-ping hostname)) - (host-last-update-set! rec curr-time) - (host-last-cpuload-set! rec (common:get-normalized-cpu-load hostname)) - (host-reachable rec)))))) - (cond - ((not best-host) - (set! best-host hostname)) - ((and ping-good - (< (alist-ref 'adj-core-load (host-last-cpuload rec)) - (alist-ref 'adj-core-load - (host-last-cpuload (hash-table-ref *host-loads* best-host))))) - (set! best-host hostname))))) - hosts) - best-host)))) + (for-each + (lambda (hostname) + (let* ((rec (let ((h (hash-table-ref/default *host-loads* hostname #f))) + (if h + h + (let ((h (make-host))) + (hash-table-set! *host-loads* hostname h) + h)))) + (host-info (common:get-host-info hostname)) + (is-reachable (car host-info)) + (last-reached-time (cadr host-info)) + (load (caddr host-info))) + (host-reachable-set! rec is-reachable) + (host-last-update-set! rec last-reached-time) + (host-last-cpuload-set! 
rec load))) + hosts))) + +(define (common:get-least-loaded-host hosts-raw) + (let* ((hosts (filter (lambda (x) + (string-match (regexp "^\\S+$") x)) + hosts-raw)) + (best-host #f) + (best-load 99999) + (curr-time (current-seconds))) + (common:update-host-loads-table hosts) + (for-each + (lambda (hostname) + (let* ((rec + (let ((h (hash-table-ref/default *host-loads* hostname #f))) + (if h + h + (let ((h (make-host))) + (hash-table-set! *host-loads* hostname h) + h)))) + (reachable (host-reachable rec)) + (load (host-last-cpuload rec))) + (cond + ((not reachable) #f) + ((< (+ load (/ (random 250) 1000)) ;; add a random factor to keep from getting in a rut + (+ best-load (/ (random 250) 1000)) ) + (set! best-load load) + (set! best-host hostname))))) + hosts) + best-host)) + + + (define (common:wait-for-cpuload maxload numcpus waitdelay #!key (count 1000) (msg #f)(remote-host #f)) (let* ((loadavg (common:get-cpu-load remote-host)) (first (car loadavg)) (next (cadr loadavg)) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1497,26 +1497,33 @@ (min-incompleted-ids (map car incompleted)) ;; do 'em all (all-ids (append min-incompleted-ids (map car oldlaunched)))) (if (> (length all-ids) 0) (begin (debug:print 0 *default-log-port* "WARNING: Marking test(s); " (string-intersperse (map conc all-ids) ", ") " as INCOMPLETE") - (sqlite3:execute - db - (conc "UPDATE tests SET state='INCOMPLETE' WHERE run_id=? 
AND id IN (" - (string-intersperse (map conc all-ids) ",") - ");") - run-id)))) - - ;; Now do rollups for the toplevel tests - ;; - ;; (db:delay-if-busy dbdat) - (for-each - (lambda (toptest) - (let ((test-name (list-ref toptest 3))) -;; (run-id (list-ref toptest 5))) - (db:top-test-set-per-pf-counts dbstruct run-id test-name))) - toplevels))) + (for-each + (lambda (test-id) + (db:test-set-status-state dbstruct run-id test-id "COMPLETE" "DEAD" "Test failed to complete")) + all-ids)))))) + +;; ALL REPLACED BY THE BLOCK ABOVE +;; +;; (sqlite3:execute +;; db +;; (conc "UPDATE tests SET state='INCOMPLETE' WHERE run_id=? AND id IN (" +;; (string-intersperse (map conc all-ids) ",") +;; ");") +;; run-id)))) +;; +;; ;; Now do rollups for the toplevel tests +;; ;; +;; ;; (db:delay-if-busy dbdat) +;; (for-each +;; (lambda (toptest) +;; (let ((test-name (list-ref toptest 3))) +;; ;; (run-id (list-ref toptest 5))) +;; (db:top-test-set-per-pf-counts dbstruct run-id test-name))) +;; toplevels))) ;; BUG: Probably broken - does not explicitly use run-id in the query ;; (define (db:top-test-set-per-pf-counts dbstruct run-id test-name) (db:general-call (db:get-db dbstruct run-id) 'top-test-set-per-pf-counts (list test-name test-name test-name test-name test-name test-name test-name test-name test-name test-name test-name test-name test-name test-name test-name test-name test-name))) @@ -3469,10 +3476,28 @@ (set! res (cons (vector state status count) res))) db "SELECT state,status,count(state) FROM tests WHERE run_id=? AND testname=? AND item_path='' GROUP BY state,status;" run-id testname) res)) + + +(define (db:get-latest-host-load dbstruct raw-hostname) + (let* ((hostname (string-substitute "\\..*$" "" raw-hostname)) + (res (cons -1 0)) + (mydb (db:dbdat-get-db (db:get-db dbstruct 0))) + ) + (db:with-db + dbstruct + 0 + #f + (lambda (db) + (sqlite3:for-each-row + (lambda (cpuload update-time) (set! 
res (cons cpuload update-time))) + db + "SELECT tr.cpuload, tr.update_time FROM test_rundat tr, tests t WHERE t.host=? AND tr.cpuload != -1 AND tr.test_id=t.id ORDER BY tr.update_time DESC LIMIT 1;" + hostname))) res )) + (define (db:set-top-level-from-items dbstruct run-id testname) (let* ((dbdat (db:get-db dbstruct run-id)) (db (db:dbdat-get-db dbdat)) (summ (db:get-state-status-summary db run-id testname)) Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -84,11 +84,11 @@ ((equal? (uri-path (request-uri (current-request))) '(/ "api")) (send-response body: (api:process-request *dbstruct-db* $) ;; the $ is the request vars proc headers: '((content-type text/plain))) (mutex-lock! *heartbeat-mutex*) - (set! *db-last-access* (current-seconds)) + (set! *last-db-access* (current-seconds)) ;; same global read by http-transport:main-page and bumped by rpc-transport:api-exec (mutex-unlock! *heartbeat-mutex*)) ((equal? (uri-path (request-uri (current-request))) '(/ "")) (send-response body: (http-transport:main-page))) ((equal? (uri-path (request-uri (current-request))) @@ -217,11 +217,11 @@ (let* ((fullurl (if (vector? 
serverdat) (http-transport:server-dat-get-api-req serverdat) (begin (debug:print 0 *default-log-port* "FATAL ERROR: http-transport:client-api-send-receive called with no server info") (exit 1)))) - (res #f) + (res (vector #f "uninitialized")) (success #t) (sparams (db:obj->string params transport: 'http))) (debug:print-info 11 *default-log-port* "fullurl=" fullurl ", cmd=" cmd ", params=" params ", run-id=" run-id "\n") ;; set up the http-client here (max-retry-attempts 1) @@ -335,11 +335,11 @@ ;; (define (http-transport:client-connect iface port) (let* ((api-url (conc "http://" iface ":" port "/api")) (api-uri (uri-reference (conc "http://" iface ":" port "/api"))) (api-req (make-request method: 'POST uri: api-uri)) - (server-dat (vector iface port api-uri api-url api-req (current-seconds)))) + (server-dat (vector iface port api-uri api-url api-req (current-seconds) 'http))) server-dat)) ;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; @@ -398,12 +398,12 @@ (thread-sleep! 0.5) ;; give some margin for queries to complete before switching from file based access to server based access (set! *dbstruct-db* (db:setup)) ;; run-id)) (set! server-going #t) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") (server:write-dotserver *toppath* (conc iface ":" port)) - (delete-file* (conc *toppath* "/.starting-server"))) - (begin ;; gotta exit nicely + (server:dotserver-starting-remove)) + (begin ;; gotta exit nicely (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "collision") (http-transport:server-shutdown server-id port)))))) ;; when things go wrong we don't want to be doing the various queries too often ;; so we strive to run this stuff only every four seconds or so. @@ -520,14 +520,11 @@ ;; all routes though here end in exit ... ;; ;; start_server? 
;; (define (http-transport:launch run-id) - (with-output-to-file - (conc *toppath* "/.starting-server") - (lambda () - (print (current-process-id) " on " (get-host-name)))) + (server:dotserver-starting) (let* ((tdbdat (tasks:open-db))) (set! *run-id* run-id) (if (args:get-arg "-daemonize") (begin (daemon:ize) @@ -540,23 +537,24 @@ (begin (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") (exit 0)) (begin ;; ok, no server detected, clean out any lingering records (tasks:server-force-clean-running-records-for-run-id (db:delay-if-busy tdbdat) run-id "notresponding"))) - (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) + (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http)) (remtries 4)) (if (not server-id) (if (> remtries 0) (begin (thread-sleep! 2) - (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) + (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http) (- remtries 1))) (begin ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch") - (delete-file* (conc *toppath* "/.starting-server")) + + (server:dotserver-starting-remove) )) (let* ((th2 (make-thread (lambda () (debug:print-info 0 *default-log-port* "Server run thread started") (http-transport:run (if (args:get-arg "-server") @@ -638,11 +636,11 @@ "Average non-cached time " (if (eq? *number-non-write-queries* 0) "n/a (no queries)" (/ *total-non-write-delay* *number-non-write-queries*)) " ms" - "Last access" (seconds->time-string *db-last-access*) "" + "Last access" (seconds->time-string *last-db-access*) "" ""))) (mutex-unlock! 
*heartbeat-mutex*) res)) (define (http-transport:runs linkpath) Index: launch.scm ================================================================== --- launch.scm +++ launch.scm @@ -122,10 +122,22 @@ (call-with-environment-variables (list (cons "PATH" (conc (get-environment-variable "PATH") ":."))) (lambda () ;; (process-run "/bin/bash" "-c" "exec ls -l /tmp/foobar > /tmp/delme-more.log 2>&1") (let* ((cmd (conc stepcmd " > " stepname ".log 2>&1")) ;; >outfile 2>&1 (pid (process-run "/bin/bash" (list "-c" cmd)))) + + (with-output-to-file "Makefile.ezsteps" + (lambda () + (print stepname ".log :") + (print "\t" cmd) + (if (file-exists? (conc stepname ".logpro")) + (print "\tlogpro " stepname ".logpro " stepname ".html < " stepname ".log")) + (print) + (print stepname " : " stepname ".log") + (print)) + #:append) + (rmt:test-set-top-process-pid run-id test-id pid) (let processloop ((i 0)) (let-values (((pid-val exit-status exit-code)(process-wait pid #t))) (mutex-lock! m) (launch:einf-pid-set! exit-info pid) ;; (vector-set! exit-info 0 pid) @@ -316,15 +328,15 @@ (kill-tries 0)) ;; (tests:set-full-meta-info #f test-id run-id (calc-minutes) work-area) ;; (tests:set-full-meta-info test-id run-id (calc-minutes) work-area) (tests:set-full-meta-info #f test-id run-id (calc-minutes) work-area 10) (let loop ((minutes (calc-minutes)) - (cpu-load (get-cpu-load)) + (cpu-load (alist-ref 'adj-core-load (common:get-normalized-cpu-load #f))) (disk-free (get-df (current-directory)))) - (let ((new-cpu-load (let* ((load (get-cpu-load)) + (let ((new-cpu-load (let* ((load (alist-ref 'adj-core-load (common:get-normalized-cpu-load #f))) (delta (abs (- load cpu-load)))) - (if (> delta 0.6) ;; don't bother updating with small changes + (if (> delta 0.1) ;; don't bother updating with small changes load #f))) (new-disk-free (let* ((df (get-df (current-directory))) (delta (abs (- df disk-free)))) (if (> delta 200) ;; ignore changes under 200 Meg @@ -816,10 +828,11 @@ (set! 
*configstatus* 'partial)) (begin (debug:print-error 0 *default-log-port* "No " mtconfig " file found. Giving up.") (exit 2)))))) ;; additional house keeping + (common:set-transport-type) (let* ((linktree (or (getenv "MT_LINKTREE") (if *configdat* (configf:lookup *configdat* "setup" "linktree") #f)))) (if linktree (begin (if (not (file-exists? linktree)) @@ -861,11 +874,11 @@ (if res (cdr res) (begin (if (common:low-noise-print 20 "No valid disks or no disk with enough space") (debug:print-error 0 *default-log-port* "No valid disks found in megatest.config. Please add some to your [disks] section and ensure the directory exists and has enough space!\n You can change minspace in the [setup] section of megatest.config. Current setting is: " minspace)) - (exit 1))))))) + (exit 1))))))) ;; TODO - move the exit to the calling location and return #f ;; Desired directory structure: ;; ;; - - -. ;; | @@ -1053,30 +1066,35 @@ ;; 4. remotely run the test on allocated host ;; - could be ssh to host from hosts table (update regularly with load) ;; - could be netbatch ;; (launch-test db (cadr status) test-conf)) (define (launch-test test-id run-id run-info keyvals runname test-conf test-name test-path itemdat params) + (mutex-lock! *launch-setup-mutex*) ;; setting variables and processing the testconfig is NOT thread-safe, reuse the launch-setup mutex (let* ((item-path (item-list->path itemdat))) (let loop ((delta (- (current-seconds) *last-launch*)) (launch-delay (string->number (or (configf:lookup *configdat* "setup" "launch-delay") "5")))) (if (> launch-delay delta) (begin (debug:print-info 0 *default-log-port* "Delaying launch of " test-name " for " (- launch-delay delta) " seconds") (thread-sleep! (- launch-delay delta)) (loop (- (current-seconds) *last-launch*) launch-delay)))) - (set! 
*last-launch* (current-seconds)) (change-directory *toppath*) (alist->env-vars ;; consolidate this code with the code in megatest.scm for "-execute", *maybe* - the longer they are set the longer each launch takes (must be non-overlapping with the vars) - (list - (list "MT_RUN_AREA_HOME" *toppath*) - (list "MT_TEST_NAME" test-name) - (list "MT_RUNNAME" runname) - (list "MT_ITEMPATH" item-path) - )) - (let* ((tregistry (tests:get-all)) + (append + (list + (list "MT_RUN_AREA_HOME" *toppath*) + (list "MT_TEST_NAME" test-name) + (list "MT_RUNNAME" runname) + (list "MT_ITEMPATH" item-path) + ) + itemdat)) + (let* ((tregistry (tests:get-all)) ;; third param (below) is system-allowed + ;; for tconfig, why do we allow fallback to test-conf? (tconfig (or (tests:get-testconfig test-name tregistry #t force-create: #t) - test-conf)) ;; force re-read now that all vars are set + (begin + (debug:print 0 *default-log-port* "WARNING: falling back to pre-calculated testconfig. This is likely not desired.") + test-conf))) ;; force re-read now that all vars are set (useshell (let ((ush (config-lookup *configdat* "jobtools" "useshell"))) (if ush (if (equal? ush "no") ;; must use "no" to NOT use shell #f ush) @@ -1112,11 +1130,10 @@ (mt-bindir-path #f) (testinfo (rmt:get-test-info-by-id run-id test-id)) (mt_target (string-intersperse (map cadr keyvals) "/")) (debug-param (append (if (args:get-arg "-debug") (list "-debug" (args:get-arg "-debug")) '()) (if (args:get-arg "-logging")(list "-logging") '())))) - ;; (if hosts (set! hosts (string-split hosts))) ;; set the megatest to be called on the remote host (if (not remote-megatest)(set! remote-megatest local-megatest)) ;; "megatest")) (set! mt-bindir-path (pathname-directory remote-megatest)) (if launcher (set! launcher (string-split launcher))) @@ -1130,10 +1147,11 @@ ;; prevent overlapping actions - set to LAUNCHED as early as possible ;; ;; the following call handles waiver propogation. 
cannot yet condense into roll-up-pass-fail (tests:test-set-status! run-id test-id "LAUNCHED" "n/a" #f #f) ;; (if launch-results launch-results "FAILED")) (rmt:roll-up-pass-fail-counts run-id test-name item-path #f "LAUNCHED" #f) + ;; (pp (hash-table->alist tconfig)) (set! diskpath (get-best-disk *configdat* tconfig)) (if diskpath (let ((dat (create-work-area run-id run-info keyvals test-id test-path diskpath test-name itemdat))) (set! work-area (car dat)) (set! toptest-work-area (cadr dat)) @@ -1184,10 +1202,11 @@ ;; (set! fullcmd (list remote-megatest test-sig "-execute" cmdparms (if useshell "&" ""))))) (if (args:get-arg "-xterm")(set! fullcmd (append fullcmd (list "-xterm")))) (debug:print 1 *default-log-port* "Launching " work-area) ;; set pre-launch-env-vars before launching, keep the vars in prevvals and put the envionment back when done (debug:print 4 *default-log-port* "fullcmd: " fullcmd) + (set! *last-launch* (current-seconds)) ;; all that junk above takes time, set this as late as possible. (let* ((commonprevvals (alist->env-vars (hash-table-ref/default *configdat* "env-override" '()))) (miscprevvals (alist->env-vars ;; consolidate this code with the code in megatest.scm for "-execute" (append (list (list "MT_TEST_RUN_DIR" work-area) (list "MT_TEST_NAME" test-name) @@ -1211,10 +1230,11 @@ (conc cmdstr " >> mt_launch.log 2>&1"))) (car fullcmd)) (if useshell '() (cdr fullcmd))))) + (mutex-unlock! *launch-setup-mutex*) ;; yes, really should mutex all the way to here. Need to put this entire process into a fork. (if (not launchwait) ;; give the OS a little time to allow the process to start (thread-sleep! 0.01)) (with-output-to-file "mt_launch.log" (lambda () (print "LAUNCHCMD: " (string-intersperse fullcmd " ")) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -327,10 +327,11 @@ args:arg-hash 0)) ;; Add args that use remargs here ;; + (if (and (not (null? 
remargs)) (not (or (args:get-arg "-runstep") (args:get-arg "-envcap") (args:get-arg "-envdelta") @@ -699,13 +700,11 @@ ;; Server? Start up here. ;; (let ((tl (launch:setup)) ;; (run-id (and (args:get-arg "-run-id") ;; (string->number (args:get-arg "-run-id")))) - (transport-type (string->symbol (or (args:get-arg "-transport") "http")))) - ;; (if run-id - ;; (begin + (transport-type *transport-type* )) (server:launch 0 transport-type) (set! *didsomething* #t))) ;; ;; (debug:print-error 0 *default-log-port* "server requires run-id be specified with -run-id"))) ;; ;; ;; Not a server? This section will decide how to communicate @@ -1986,11 +1985,10 @@ ;;====================================================================== ;; Exit and clean up ;;====================================================================== -(if *runremote* (close-all-connections!)) ;; for http-client (if (not *didsomething*) (debug:print 0 *default-log-port* help)) (set! *time-to-exit* #t) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -6,17 +6,18 @@ ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. ;;====================================================================== - +;; (use format typed-records) ;; RADT => purpose of json format?? (declare (unit rmt)) (declare (uses api)) (declare (uses tdb)) (declare (uses http-transport)) +(declare (uses rpc-transport)) ;;(declare (uses nmsg-transport)) (include "common_records.scm") ;; ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! @@ -133,11 +134,16 @@ ((and (not (cdr (remote-hh-dat *runremote*))) ;; are we on a homehost? (not (remote-conndat *runremote*))) ;; and no connection (debug:print-info 12 *default-log-port* "rmt:send-receive, case 6 hh-dat: " (remote-hh-dat *runremote*) " conndat: " (remote-conndat *runremote*)) (mutex-unlock! 
*rmt-mutex*) (tasks:start-and-wait-for-server (tasks:open-db) 0 15) - (remote-conndat-set! *runremote* (rmt:get-connection-info 0)) ;; calls client:setup which calls client:setup-http + (let* ((cinfo (rmt:get-connection-info 0)) + (transport (if (and (vector? cinfo) (> (vector-length cinfo) 6)) ;; only trust slot 6 when the conndat vector actually has it + (vector-ref cinfo 6) + (server:get-transport)))) ;; TODO: replace with tasks:server-dat-accessor-?? for transport + (remote-conndat-set! *runremote* cinfo) ;; calls client:setup which calls client:setup-http + (remote-transport-set! *runremote* transport)) (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; all set up if get this far, dispatch the query ((cdr (remote-hh-dat *runremote*)) ;; we are on homehost (mutex-unlock! *rmt-mutex*) (debug:print-info 12 *default-log-port* "rmt:send-receive, case 7") @@ -150,20 +156,24 @@ (dat (case (remote-transport *runremote*) ((http) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away (http-transport:client-api-send-receive 0 conninfo cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail" (print-call-chain))))) + ((rpc) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away + (rpc-transport:client-api-send-receive 0 conninfo cmd params) + ((commfail)(vector #f "communications fail")) + ((exn)(vector #f "other fail" (print-call-chain))))) (else - (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported") + (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (1)") (exit)))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? 
conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time (debug:print-info 12 *default-log-port* "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat) (if success (case (remote-transport *runremote*) - ((http) res) + ((http rpc) res) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown") (exit 1))) (begin (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum) @@ -263,14 +273,22 @@ (mutex-unlock! *db-multi-sync-mutex*))))) res)) (define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) (let* ((run-id (if run-id run-id 0)) + (transport (or (and *runremote* (remote-transport *runremote*)) (server:get-transport))) ;; *runremote* may still be #f here; this is evaluated outside handle-exceptions (res (handle-exceptions exn #f - (http-transport:client-api-send-receive run-id connection-info cmd params)))) + (case transport + ((http) (http-transport:client-api-send-receive run-id connection-info cmd params)) + ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) + (else + (debug:print 0 *default-log-port* "ERROR: transport " transport " not supported (2)") + (exit)) + + )))) (if (and res (vector-ref res 0)) (vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!! #f))) ;; ;; Wrap json library for strings (why the ports crap in the first place?) @@ -310,20 +328,30 @@ ;; This login does no retries under the hood - it acts a bit like a ping. ;; Deprecated for nmsg-transport. 
;; (define (rmt:login-no-auto-client-setup connection-info) (case *transport-type* ;; run-id of 0 is just a placeholder - ((http)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*))) + ((http rpc)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*))) + (else + (debug:print 0 *default-log-port* "ERROR: transport " *transport-type* " not supported (3)") ;; report the value the case dispatched on + (exit)) + + ;;((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*))) )) ;; hand off a call to one of the db:queries statements ;; added run-id to make looking up the correct db possible ;; (define (rmt:general-call stmtname run-id . params) (rmt:send-receive 'general-call run-id (append (list stmtname run-id) params))) + +;; given a hostname, return a pair of cpu load and update time representing latest intelligence from tests running on that host +(define (rmt:get-latest-host-load hostname) + (rmt:send-receive 'get-latest-host-load 0 (list hostname))) + ;; (define (rmt:sync-inmem->db run-id) ;; (rmt:send-receive 'sync-inmem->db run-id '())) (define (rmt:sdb-qry qry val run-id) ;; add caching if qry is 'getid or 'getstr Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -1,7 +1,7 @@ -;; Copyright 2006-2012, Matthew Welland. +;; Copyright 2006-2016, Matthew Welland. ;; ;; This program is made available under the GNU GPL version 2.0 or ;; greater. See the accompanying file COPYING for details. ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the @@ -21,208 +21,621 @@ (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. 
(include "common_records.scm") (include "db_records.scm") + +(define *heartbeat-mutex* (make-mutex)) +(define *server-loop-heart-beat* (current-seconds)) + ;; procstr is the name of the procedure to be called as a string -(define (rpc-transport:autoremote procstr params) - (handle-exceptions - exn - (begin - (debug:print 1 *default-log-port* "Remote failed for " proc " " params) - (apply (eval (string->symbol procstr)) params)) - ;; (if *runremote* - ;; (apply (eval (string->symbol (conc "remote:" procstr))) params) - (apply (eval (string->symbol procstr)) params))) +(define (rpc-transport:autoremote procstr params) ;; may be unused, I think api-exec deprecates this one. + (let* ((procsym (if (symbol? procstr) + procstr + (string->symbol (->string procstr)))) + (res + (begin + (apply (eval procsym) params)))) + res)) + + +;; rpc receiver +(define (rpc-transport:api-exec cmd params) + (let* ( (resdat (api:execute-requests *dbstruct-db* (vector cmd params))) ;; #( flag result ) + (flag (vector-ref resdat 0)) + (res (vector-ref resdat 1))) + + (mutex-lock! *heartbeat-mutex*) + + (set! *last-db-access* (current-seconds)) ;; bump *last-db-access*; this will renew keep-running thread's lease on life for another (server:get-timeout) seconds + ;;(BB> "in api-exec; last-db-access updated to "*last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + + res)) + + +;; retry an operation (depends on srfi-18) +;; ================== +;; idea here is to avoid spending time on coding retrying something. Trying to be generic here. +;; +;; Exception handling: +;; ------------------- +;; if evaluating the thunk results in exception, it will be retried. +;; on last try, if final-failure-returns-actual is true, the exception will be re-thrown to caller. +;; +;; look at options below #!key to see how to configure behavior +;; +;; +(define (retry-thunk + the-thunk + #!key ;;;; options below + (accept-result? 
(lambda (x) x)) ;; retry if predicate applied to thunk's result is false + (retries 4) ;; how many tries + (failure-value #f) ;; return this on final failure, unless following option is enabled: + (final-failure-returns-actual #f) ;; on failure, on the last try, just return the result, not failure-value + + (retry-delay 0.1) ;; delay between tries + (back-off-factor 1) ;; multiply retry-delay by this factor on retry + (random-delay 0.1) ;; add a random portion of this value to wait + + (chatty #f) ;; print status as we go, for debugging. + ) + + (when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-")) + (let* ((guarded-thunk ;; we are guarding the thunk against exceptions. We will record whether result of evaluation is an exception or a regular result. + (lambda () + (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision + (res + (condition-case + (the-thunk) ;; this is what we are guarding the execution of + [x () (cons EXCEPTION x)] + ))) + (cond + ((and (pair? res) (eq? (car res) EXCEPTION)) + (if chatty + (print " - the-thunk threw exception >"(cdr res)"<")) + (cons 'exception (cdr res))) + (else + (if chatty + (print " - the-thunk returned result >"res"<")) + (cons 'regular-result res))))))) + + (let loop ((guarded-res (guarded-thunk)) + (retries-left retries) + (fail-wait retry-delay)) + (if chatty (print " ==========")) + (let* ((wait-time (+ fail-wait (+ (* fail-wait back-off-factor) + (* random-delay + (/ (random 1024) 1024) )))) + (res-type (car guarded-res)) + (res-value (cdr guarded-res))) + (cond + ((and (eq? res-type 'regular-result) (accept-result? res-value)) + (if chatty (print " + return result that satisfied accept-result? >"res-value"<")) + res-value) + + ((> retries-left 0) + (if chatty (print " - sleep "wait-time)) + (thread-sleep! wait-time) + (if chatty (print " + retry ["retries-left" tries left]")) + (loop (guarded-thunk) + (sub1 retries-left) + wait-time)) + + ((eq? 
res-type 'regular-result) + (if final-failure-returns-actual + (begin + (if chatty (print " + last try failed- return the result >"res-value"<")) + res-value) + (begin + (if chatty (print " + last try failed- return canned failure value >"failure-value"<")) + failure-value))) + + (else ;; no retries left; result was not accepted and res-type can only be 'exception + (if final-failure-returns-actual + (begin + (if chatty (print " + last try failed with exception- re-throw it >"res-value"<")) + (abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function + (begin + (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<")) + failure-value)))))))) + + +(define (rpc-transport:server-shutdown server-id rpc:listener ) ;;#!key (from-on-exit #f)) + ;;(on-exit (lambda () #t)) ;; turn off on-exit stuff + ;;(tcp-close rpc:listener) ;; gotta exit nicely + ;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped") + + + ;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast! + ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released") + + (set! *time-to-exit* #t) + ;;(if *dbstruct-db* (db:sync-touched *dbstruct-db* *run-id* force-sync: #t)) + + (server:remove-dotserver-file *toppath* "anyhost:anyport" force: #t) + (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete") + + (rpc:close-all-connections!) + ;;(BB> "Before (exit) (from-on-exit="from-on-exit")") + ;;(unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. 
+ ;;(BB> "After") + ;; strace reveals endless: + ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 105880}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 109880}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 201886}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 205886}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 297892}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 301892}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 393898}, ru_stime={0, 60003}, ...}) = 0 + ;; getrusage(RUSAGE_SELF, {ru_utime={414, 397898}, ru_stime={0, 60003}, ...}) = 0 + ;; make a post to chicken-users w/ http://paste.call-cc.org/paste?id=60a4b66a29ccf7d11359ea866db642c970735978 + + + ;; (if from-on-exit + ;; ;; avoid above condition! End current process externally since 1 in 20 (exit)'s result in hung, 100% cpu zombies. (see above) + + (system (conc "kill -9 "(current-process-id))) + ) + ;; all routes though here end in exit ... ;; ;; start_server? ;; (define (rpc-transport:launch run-id) - (let* ((tdbdat (tasks:open-db))) - (BB> "rpc-transport:launch fired for run-id="run-id) - (set! *run-id* run-id) - (if (args:get-arg "-daemonize") - (daemon:ize)) - (if (server:check-if-running run-id) - (begin - (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") - (exit 0))) - (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) - (remtries 4)) - (if (not server-id) - (if (> remtries 0) - (begin - (thread-sleep! 
2) - (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) - (- remtries 1))) - (begin - ;; since we didn't get the server lock we are going to clean up and bail out - (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") - (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " rpc-transport:launch"))) - (begin - (rpc-transport:run (if (args:get-arg "-server")(args:get-arg "-server") "-") run-id server-id) - (exit)))))) + (set! *run-id* run-id) + + ;; ;; send to background if requested + ;; (when (args:get-arg "-daemonize") + ;; (daemon:ize) + ;; (when *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it + ;; (current-error-port *alt-log-file*) + ;; (current-output-port *alt-log-file*))) + + ;; double check we dont alrady have a running server for this run-id + (when (and (server:read-dotserver *toppath*) + (server:check-if-running run-id)) + (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") + (exit 0)) + + ;; did not find server running, let's clean up the table of dead servers + (tasks:server-force-clean-running-records-for-run-id (db:delay-if-busy (tasks:open-db)) run-id "notresponding") + + (server:dotserver-starting) + + + + + + ;; let's get a server-id for this server + ;; if at first we do not suceed, try 3 more times. + (let ((server-id (retry-thunk + (lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc)) + chatty: #f + final-failure-returns-actual: #t + retries: 4))) + (when (not server-id) ;; dang we couldn't get a server-id. 
+ ;; since we didn't get the server lock we are going to clean up and bail out + + + (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") + (tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch") + (server:dotserver-starting-remove) + (exit 1)) + + ;; we got a server-id (and a corresponding entry in servers table in globally shared mdb) + ;; all systems go. Proceed to setup rpc server. + (rpc-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-") + run-id + server-id) + (exit))) + +(define *rpc-listener-port* #f) +(define *rpc-listener-port-bind-timestamp* #f) + +(define *on-exit-flag #f) + +(define (rpc-transport:server-dat-get-iface vec) (vector-ref vec 0)) +(define (rpc-transport:server-dat-get-port vec) (vector-ref vec 1)) +(define (rpc-transport:server-dat-get-last-access vec) (vector-ref vec 5)) +(define (rpc-transport:server-dat-get-transport vec) (vector-ref vec 6)) +(define (rpc-transport:server-dat-update-last-access vec) + (if (vector? vec) + (vector-set! vec 5 (current-seconds)) + (begin + (print-call-chain (current-error-port)) + (debug:print-error 0 *default-log-port* "call to rpc-transport:server-dat-update-last-access with non-vector!!")))) + + +(define *api-exec-ht* (make-hash-table)) +(define *api-exec-mutex* (make-mutex)) +;; let's see if caching the rpc stub curbs thread-profusion on server side +(define (rpc-transport:get-api-exec iface port) + (mutex-lock! *api-exec-mutex*) + (let* ((lu (hash-table-ref/default *api-exec-ht* (cons iface port) #f))) + (if lu + (begin + (mutex-unlock! *api-exec-mutex*) + lu) + (let ((res (rpc:procedure 'api-exec iface port))) + (hash-table-set! *api-exec-ht* (cons iface port) res) + (mutex-unlock! 
*api-exec-mutex*) + res)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; this client-side procedure makes rpc call to server and returns result +;; +(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3)) + ;;(BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries) + (if (not (vector? serverdat)) + (begin + (BB> "WHAT?? for run-id="run-id", serverdat="serverdat) + (print-call-chain) + (rpc:close-all-connections!) + (exit 1))) + (let* ((iface (rpc-transport:server-dat-get-iface serverdat)) + (port (rpc-transport:server-dat-get-port serverdat)) + (res #f) + (api-exec (rpc-transport:get-api-exec iface port)) ;; chached by host/port. may need to clear... + (send-receive (lambda () + (tcp-buffer-size 0) + (set! res (retry-thunk + (lambda () + (condition-case + ;;(vector #t (run-remote cmd params)) + (vector 'success (api-exec cmd params)) + [x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)] + [x () (vector 'other-fail "other fail ["(->string x)"]" x)])) + chatty: #f + accept-result?: (lambda(x) + (and (vector? x) (vector-ref x 0))) + retries: 8 + back-off-factor: 1.5 + random-wait: 0.2 + retry-delay: 0.1 + final-failure-returns-actual: #t)) + ;;(BB> "HEY res="res) + res + )) + (th1 (make-thread send-receive "send-receive")) + (time-out-reached #f) + (time-out (lambda () + (thread-sleep! 45) + (set! time-out-reached #t) + (thread-terminate! th1) + + #f)) + + (th2 (make-thread time-out "time out"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + (thread-terminate! th2) + ;;(BB> "alt got res="res) + (debug:print-info 11 *default-log-port* "got res=" res) + (if (vector? 
res) + (case (vector-ref res 0) + ((success) (vector #t (vector-ref res 1))) + ( + (comms-fail other-fail) + ;;(comms-fail) + (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request >>"res"<<") + ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) + (vector #f (vector-ref res 1))) + (else + (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1)) + (debug:print 0 *default-log-port* " client call chain:") + (print-call-chain (current-error-port)) + (debug:print 0 *default-log-port* " server call chain:") + (pp (vector-ref res 1) (current-error-port)) + (signal (vector-ref res 2)))) + (signal (make-composite-condition + (make-property-condition + 'timeout + 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server")))))) + + (define (rpc-transport:run hostn run-id server-id) (debug:print 2 *default-log-port* "Attempting to start the rpc server ...") ;; (trace rpc:publish-procedure!) - (rpc:publish-procedure! 'server:login server:login) - (rpc:publish-procedure! 'testing (lambda () "Just testing")) + ;;====================================================================== + ;; start of publish-procedure section + ;;====================================================================== + (rpc:publish-procedure! 'server:login server:login) ;; this allows client to validate it is the same megatest instance as the server. No security here, just making sure we're in the right room. + (rpc:publish-procedure! + 'testing + (lambda () + "Just testing")) + + ;; procedure to receive arbitrary API request from client's rpc:send-receive/rpc-transport:client-api-send-receive + (rpc:publish-procedure! 'rpc-transport:autoremote rpc-transport:autoremote) + ;; can use this to run most anything at the remote + (rpc:publish-procedure! 
'api-exec rpc-transport:api-exec) + + + ;;====================================================================== + ;; end of publish-procedure section + ;;====================================================================== + (let* ((db #f) - (hostname (get-host-name)) - (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (hostname (let ((res (get-host-name))) res)) + (server-start-time (current-seconds)) + (server-timeout (server:get-timeout)) + (ipaddrstr (let* ((ipstr (if (string=? "-" hostn) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") (server:get-best-guess-address hostname) - #f))) - (if ipstr ipstr hostn))) ;; hostname))) - (start-port (open-run-close tasks:server-get-next-port tasks:open-db)) + #f)) + (res (if ipstr ipstr hostn))) + res)) ;; hostname))) + (start-port (let ((res (portlogger:open-run-close portlogger:find-port))) ;; BB> TODO: remove portlogger! + res)) (link-tree-path (configf:lookup *configdat* "setup" "linktree")) - (rpc:listener (rpc-transport:find-free-port-and-open (rpc:default-server-port))) + + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + ;; rpc:listener is the tcp-listen result from inside the find-free-port-and-open complex. + ;; It is our handle on the listening tcp port + ;; We will attach this to our rpc server with rpc:make-server in thread th1 . + (rpc:listener (rpc-transport:find-free-port-and-open start-port)) (th1 (make-thread (lambda () - ((rpc:make-server rpc:listener) #t)) + ;;(BB> "BEFORE rpc:make-server") + ((rpc:make-server rpc:listener) #t) + ;;(BB> "BEFORE rpc:make-server") + ) "rpc:server")) - ;; (cute (rpc:make-server rpc:listener) "rpc:server") - ;; 'rpc:server)) - (hostname (if (string=? "-" hostn) + + + (hostname (if (string=? "-" hostn) (get-host-name) hostn)) (ipaddrstr (if (string=? 
"-" hostn) (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") - #f)) - (portnum (rpc:default-server-port)) - (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)) - (tdb (tasks:open-db))) + (string-intersperse + (map number->string + (u8vector->list + (hostname->ip hostn))) ".") + )) + (portnum (let ((res (rpc:default-server-port))) res)) + (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum))) + + (when (not (equal? ipaddrstr (server:get-best-guess-address (get-host-name)))) + + (debug:print 0 *default-log-port* "Error: This host "(ip->string (hostname->ip (get-host-name)))" ("(get-host-name)") is not the homehost "ipaddrstr" ("(ip->hostname (string->ip ipaddrstr))"; Cannot proceed.") + (server:dotserver-starting-remove) + (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port + (exit)) + + (tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum) + + ;;============================================================ + ;; activate thread th1 to attach opened tcp port to rpc server + ;;============================================================= (thread-start! th1) (set! db *dbstruct-db*) - (open-run-close tasks:server-set-interface-port - tasks:open-db - server-id - ipaddrstr portnum) + (debug:print 0 *default-log-port* "Server started on " host:port) - - ;; (trace rpc:publish-procedure!) - ;; (rpc:publish-procedure! 'server:login server:login) - ;; (rpc:publish-procedure! 'testing (lambda () "Just testing")) - - ;;====================================================================== - ;; ;; end of publish-procedure section - ;;====================================================================== - ;; - (on-exit (lambda () - (open-run-close tasks:server-set-state! tasks:open-db server-id "stopped"))) - - (set! *rpc:listener* rpc:listener) - (tasks:server-set-state! tdb server-id "running") - (set! 
*dbstruct-db* (db:setup run-id)) - ;; if none running or if > 20 seconds since - ;; server last used then start shutdown - (let loop ((count 0)) - (thread-sleep! 5) ;; no need to do this very often - (let ((numrunning -1)) ;; (db:get-count-tests-running db))) - (if (or (> numrunning 0) - (> (+ *db-last-access* 60)(current-seconds))) - (begin - (debug:print-info 0 *default-log-port* "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *db-last-access*)) - (loop (+ 1 count))) - (begin - (debug:print-info 0 *default-log-port* "Starting to shutdown the server side") - (open-run-close tasks:server-delete-record tasks:open-db server-id " rpc-transport:try-start-server stop") - (thread-sleep! 10) - (debug:print-info 0 *default-log-port* "Max cached queries was " *max-cache-size*) - (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting") - )))))) - -(define (rpc-transport:find-free-port-and-open port) + ;;(BB> "before SELF-TEST") + (if (retry-thunk (lambda () + (rpc-transport:self-test run-id ipaddrstr portnum)) + final-failure-returns-actual: #t ;; TODO: remove this line + ) + (debug:print 0 *default-log-port* "INFO: rpc self test passed!") + (begin + (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port) + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "dead") + (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port + (rpc-transport:server-shutdown server-id rpc:listener) + (server:dotserver-starting-remove) + (exit))) + + + + + ;;(on-exit (lambda () + ;; (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) + + ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch + (if (not (equal? server-id (tasks:server-am-i-the-server? 
(db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers + (begin ;; i am not the server, another server snuck in and beat this one to the punch + (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision") + (server:dotserver-starting-remove)) + + (begin ;; i am the server + ;; setup the in-memory db + (set! *dbstruct-db* (db:setup run-id)) + (db:get-db *dbstruct-db* run-id) + + ;; at this point, satisfied server has started + ;; let's make it official + (server:write-dotserver *toppath* (conc ipaddrstr ":" portnum)) + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*) + (set! *rpc:listener* rpc:listener) + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry + + + + ;; this let loop will hold open this thread until we want the server to shut down. + ;; if no requests received within the last 20 seconds : + ;; database hasnt changed in ?? + ;; + + + + ;; keep-running loop: polls last-db-access to see if we have timed out. keep running if not. + (let loop ((count 0) + (bad-sync-count 0)) + (BB> "keep running: count = "count) + ;; Use this opportunity to sync the inmemdb to db + + (let ((start-time (current-milliseconds)) + (sync-time #f) + (rem-time #f)) + + ;; following is now done in common:watchdog, commenting out. sync-time will now be 0; can live with that. + ;; ;; inmemddb is a dbstruct + ;; (condition-case + ;; (db:sync-touched *dbstruct-db* *run-id* force-sync: #t) + ;; ((sync-failed)(cond + ;; ((> bad-sync-count 10) ;; time to give up + ;; (rpc-transport:server-shutdown server-id rpc:listener)) + ;; (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop + ;; (thread-sleep! 
5) + ;; (loop count (+ bad-sync-count 1))))) + ;; ((exn) + ;; (debug:print-error 0 *default-log-port* "error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server ") + ;; (rpc-transport:server-shutdown server-id rpc:listener))) + + (set! sync-time (- (current-milliseconds) start-time)) + (set! rem-time (quotient (- 4000 sync-time) 1000)) + (debug:print 4 *default-log-port* "SYNC: time= " sync-time ", rem-time=" rem-time) + + (if (and (<= rem-time 4) + (> rem-time 0)) + (thread-sleep! rem-time) + (thread-sleep! 4))) ;; fallback for if the math is changed ... + + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1) bad-sync-count)) + + ;; BB: don't see how this is possible with RPC + ;; ;; Check that iface and port have not changed (can happen if server port collides) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! sdat *server-info*) + ;; (mutex-unlock! *heartbeat-mutex*) + + ;; (if (or (not (equal? sdat (list iface port))) + ;; (not server-id)) + ;; (begin + ;; (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info") + ;; (set! iface (car sdat)) + ;; (set! port (cadr sdat)))) + + ;; Transfer *last-db-access* to last-access to use in checking that we are still alive + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! 
*heartbeat-mutex*) + + ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout) + ;; + ;; no_traffic, no running tests, if server 0, no running servers + ;; + ;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out) + ;; + (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600)) + (adjusted-timeout (if (> hrs-since-start 1) + (- server-timeout (inexact->exact (round (* hrs-since-start 60)))) ;; subtract 60 seconds per hour + server-timeout))) + (if (common:low-noise-print 120 "server timeout") + (debug:print-info 0 *default-log-port* "Adjusted server timeout: " adjusted-timeout)) + (if (and *server-run* + (> (+ last-access server-timeout) + (current-seconds))) + (begin + (if (common:low-noise-print 120 "server continuing") + (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) + ;; + ;; Consider implementing some smarts here to re-insert the record or kill self is + ;; the db indicates so + ;; + (if (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id) + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running")) + ;; + (loop 0 bad-sync-count)) + (begin + ;;(BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout) + + (rpc-transport:server-shutdown server-id rpc:listener))))) + ;; end new loop + )))) + + +(define (rpc-transport:find-free-port-and-open port #!key ) (handle-exceptions exn - (begin + (begin (print "Failed to bind to port " (rpc:default-server-port) ", trying next port") - (rpc-transport:find-free-port-and-open (+ port 1))) + (rpc-transport:find-free-port-and-open (add1 port))) (rpc:default-server-port port) + (set! 
*rpc-listener-port* port) ;; a bit paranoid about rpc:default-server-port parameter not changing across threads (as params are wont to do). keeping this global in my back pocket in case this causes problems + (set! *rpc-listener-port-bind-timestamp* (current-milliseconds)) ;; may want to test how long it has been since the last bind attempt happened... (tcp-read-timeout 240000) - (tcp-listen (rpc:default-server-port) 10000))) - + (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it. + (tcp-listen (rpc:default-server-port) 10000) + )) + (define (rpc-transport:ping run-id host port) (handle-exceptions exn (begin - (print "SERVER_NOT_FOUND") + (print "SERVER_NOT_FOUND exn="exn) (exit 1)) (let ((login-res ((rpc:procedure 'server:login host port) *toppath*))) - (if (and (list? login-res) - (car login-res)) + (if login-res (begin (print "LOGIN_OK") (exit 0)) (begin (print "LOGIN_FAILED") (exit 1)))))) -(define (rpc-transport:client-setup run-id #!key (remtries 10)) - (if *runremote* - (begin - (debug:print-error 0 *default-log-port* "Attempt to connect to server but already connected") - #f) - (let* ((host-info (hash-table-ref/default *runremote* run-id #f))) ;; (open-run-close db:get-var #f "SERVER")) - (if host-info - (let ((iface (car host-info)) - (port (cadr host-info)) - (ping-res ((rpc:procedure 'server:login host port) *toppath*))) - (if ping-res - (let ((server-dat (list iface port #f #f #f))) - (hash-table-set! *runremote* run-id server-dat) - server-dat) - (begin - (server:try-running run-id) - (thread-sleep! 
2) - (rpc-transport:client-setup run-id (- remtries 1))))) - (let* ((server-db-info (open-run-close tasks:get-server tasks:open-db run-id))) - (debug:print-info 0 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) - (if server-db-info - (let* ((iface (tasks:hostinfo-get-interface server-db-info)) - (port (tasks:hostinfo-get-port server-db-info)) - (server-dat (list iface port #f #f #f)) - (ping-res ((rpc:procedure 'server:login host port) *toppath*))) - (if start-res - (begin - (hash-table-set! *runremote* run-id server-dat) - server-dat) - (begin - (server:try-running run-id) - (thread-sleep! 2) - (rpc-transport:client-setup run-id (- remtries 1))))) - (begin - (server:try-running run-id) - (thread-sleep! 2) - (rpc-transport:client-setup run-id (- remtries 1))))))))) -;; -;; (port (if (and hostinfo (> (length hostdat) 1))(cadr hostdat) #f))) -;; (if (and port -;; (string->number port)) -;; (let ((portn (string->number port))) -;; (debug:print-info 2 *default-log-port* "Setting up to connect to host " host ":" port) -;; (handle-exceptions -;; exn -;; (begin -;; (debug:print-error 0 *default-log-port* "Failed to open a connection to the server at host: " host " port: " port) -;; (debug:print 0 *default-log-port* " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) -;; ;; (open-run-close -;; ;; (lambda (db . param) -;; ;; (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER'")) -;; ;; #f) -;; (set! *runremote* #f)) -;; (if (and (not (args:get-arg "-server")) ;; no point in the server using the server using the server -;; ((rpc:procedure 'server:login host portn) *toppath*)) -;; (begin -;; (debug:print-info 2 *default-log-port* "Logged in and connected to " host ":" port) -;; (set! *runremote* (vector host portn))) -;; (begin -;; (debug:print-info 2 *default-log-port* "Failed to login or connect to " host ":" port) -;; (set! 
*runremote* #f))))) -;; (debug:print-info 2 *default-log-port* "no server available"))))) - +(define (rpc-transport:self-test run-id host port) + (if (not host) + (abort "host not set.")) + (if (not port) + (abort "port not set.")) + (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it. + (let* ((testing-res ((rpc:procedure 'testing host port))) + (login-res ((rpc:procedure 'server:login host port) *toppath*)) + (res (and login-res (equal? testing-res "Just testing")))) + + (if login-res + (begin + ;;(BB> "Self test PASS. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) + #t) + (begin + (BB> "Self test fail. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) + + #f)) + res)) + +(define (rpc-transport:client-setup run-id server-dat #!key (remaining-tries 10)) + ;;(BB> "entered rpc-transport:client-setup with run-id="run-id" and server-dat="server-dat" and retries="remaining-tries) + (tcp-buffer-size 0) + (debug:print-info 0 *default-log-port* "rpc-transport:client-setup run-id="run-id" server-dat=" server-dat ", remaining-tries=" remaining-tries) + (let* ((iface (tasks:hostinfo-get-interface server-dat)) + (hostname (tasks:hostinfo-get-hostname server-dat)) + (port (tasks:hostinfo-get-port server-dat)) + (runremote-server-dat (vector iface port #f #f #f (current-seconds) 'rpc)) ;; http version := (vector iface port api-uri api-url api-req (current-seconds) 'http ) + (ping-res (retry-thunk (lambda () ;; make 3 attempts to ping. + ((rpc:procedure 'server:login iface port) *toppath*)) + chatty: #f + retries: 3))) + ;; we got here from rmt:get-connection-info on the condition that *runremote* has no entry for run-id... 
+ (if ping-res + (begin + (debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat) + runremote-server-dat) + (begin ;; login failed but have a server record, clean out the record and try again + (debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat) + (tasks:kill-server-run-id run-id) + (tasks:server-force-clean-run-record (db:delay-if-busy (tasks:open-db)) run-id iface port + " rpc-transport:client-setup (server-dat = #t)") + (if (> remaining-tries 2) + (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little + (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time + (server:try-running run-id) + (thread-sleep! 5) ;; give server a little time to start up + (client:setup run-id remaining-tries: (sub1 remaining-tries)))))) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -46,20 +46,24 @@ ;; all routes though here end in exit ... ;; ;; start_server ;; -(define (server:launch run-id transport-type) - (BB> "server:launch fired for run-id="run-id" transport-type="transport-type) - (case transport-type - ((http)(http-transport:launch run-id)) - ;;((nmsg)(nmsg-transport:launch run-id)) - ((rpc) (rpc-transport:launch run-id)) - (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type)))) -;; (else (debug:print-error 0 *default-log-port* "No known transport set, transport=" transport ", using rpc") -;; (rpc-transport:launch run-id))))) - +(define (server:launch run-id transport-type-raw) + (let ((transport-type + (cond + ((string? 
transport-type-raw) (string->symbol transport-type-raw)) + (else transport-type-raw)))) + + ;;(BB> "server:launch fired for run-id="run-id" transport-type="transport-type) + + (case transport-type + ((http)(http-transport:launch run-id)) + ;;((nmsg)(nmsg-transport:launch run-id)) + ((rpc) (rpc-transport:launch run-id)) + (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type))))) + ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== ;; Get the transport @@ -67,11 +71,11 @@ (if *transport-type* *transport-type* (let ((ttype (string->symbol (or (args:get-arg "-transport") (configf:lookup *configdat* "server" "transport") - "rpc")))) + *DEFAULT-TRANSPORT*)))) (set! *transport-type* ttype) ttype))) ;; Generate a unique signature for this server (define (server:mk-signature) @@ -112,13 +116,15 @@ (homehost (common:get-homehost)) ;; configf:lookup *configdat* "server" "homehost" )) (target-host (car homehost)) (testsuite (common:get-testsuite-name)) (logfile (conc *toppath* "/logs/server.log")) (cmdln (conc (common:get-megatest-exe) - " -server " (or target-host "-") " -run-id " 0 (if (equal? (configf:lookup *configdat* "server" "daemonize") "yes") - (conc " -daemonize -log " logfile) - "") + " -server " (or target-host "-") " -run-id " 0 + (if (equal? 
(configf:lookup *configdat* "server" "daemonize") "yes") + (conc " -daemonize -log " logfile) + "") + " -transport " (server:get-transport) " -m testsuite:" testsuite)) ;; (conc " >> " logfile " 2>&1 &"))))) (log-rotate (make-thread common:rotate-logs "server run, rotate logs thread"))) ;; we want the remote server to start in *toppath* so push there (push-directory *toppath*) (debug:print 0 *default-log-port* "INFO: Trying to start server (" cmdln ") ...") @@ -187,10 +193,22 @@ dotfile (lambda () (read-line))) #f)))) + +(define (server:dotserver-starting) + (with-output-to-file + (conc *toppath* "/.starting-server") + (lambda () + (print (current-process-id) " on " (get-host-name))))) + +(define (server:dotserver-starting-remove) + (delete-file* (conc *toppath* "/.starting-server"))) + + + ;; write a .server file in *toppath* with hostport ;; return #t on success, #f otherwise ;; (define (server:write-dotserver areapath hostport) (let ((lock-file (conc areapath "/.server.lock")) @@ -206,15 +224,15 @@ (debug:print-info 0 *default-log-port* "server file " server-file " for " hostport " created") (common:simple-file-release-lock lock-file) res) #f))) -(define (server:remove-dotserver-file areapath hostport) +(define (server:remove-dotserver-file areapath hostport #!key (force #f)) (let ((dotserver (server:read-dotserver areapath)) (server-file (conc areapath "/.server")) (lock-file (conc areapath "/.server.lock"))) - (if (and dotserver (string-match (conc ".*:" hostport "$") dotserver)) ;; port matches, good enough info to decide to remove the file + (if (or force (and dotserver (string-match (conc ".*:" hostport "$") dotserver))) ;; port matches, good enough info to decide to remove the file (if (common:simple-file-lock lock-file) (begin (handle-exceptions exn #f @@ -226,11 +244,11 @@ ;; (define (server:check-if-running areapath) (let* ((dotserver (server:read-dotserver areapath))) ;; tdbdat (tasks:open-db))) (if dotserver (let* ((res (case *transport-type* - 
((http)(server:ping-server dotserver)) + ((http rpc)(server:ping-server dotserver)) ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server) ))) (if res dotserver #f)) @@ -265,11 +283,17 @@ (debug:print 0 *default-log-port* "ERROR: bad host:port")) (if do-exit (exit 1)) #f) (let* ((iface (car host-port)) (port (cadr host-port)) - (server-dat (http-transport:client-connect iface port)) + (server-dat + (case (remote-transport *runremote*) + ((http) (http-transport:client-connect iface port)) + ((rpc) (rpc-transport:client-connect iface port)) + (else + (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (4)") + (exit)))) (login-res (rmt:login-no-auto-client-setup server-dat))) (if (and (list? login-res) (car login-res)) (begin (print "LOGIN_OK") Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -170,21 +170,21 @@ (define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3)) (define (tasks:hostinfo-get-transport vec) (vector-ref vec 4)) (define (tasks:hostinfo-get-pid vec) (vector-ref vec 5)) (define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6)) -(define (tasks:server-lock-slot mdb run-id) +(define (tasks:server-lock-slot mdb run-id transport-type) (tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot") (if (< (tasks:num-in-available-state mdb run-id) 4) (begin - (tasks:server-set-available mdb run-id) + (tasks:server-set-available mdb run-id transport-type) (thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed. (tasks:server-am-i-the-server? 
mdb run-id)) #f)) ;; register that this server may come online (first to register goes though with the process) -(define (tasks:server-set-available mdb run-id) +(define (tasks:server-set-available mdb run-id transport-type) (sqlite3:execute mdb "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id) VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);" (current-process-id) ;; pid @@ -194,11 +194,11 @@ (random 1000) ;; priority (used a tiebreaker on get-available) "available" ;; state (common:version-signature) ;; mt_version -1 ;; interface ;; (conc (server:get-transport)) ;; transport - (conc *transport-type*) ;; transport + (symbol->string transport-type) ;; transport run-id )) (define (tasks:num-in-available-state mdb run-id) (let ((res 0)) @@ -323,12 +323,12 @@ (res '())) (sqlite3:for-each-row (lambda (a . b) (set! res (cons (apply vector a b) res))) mdb - (conc "SELECT " selstr " FROM servers WHERE run_id=? 
AND state in ('available','running','dbprep') ORDER BY start_time DESC;") - run-id) + (conc "SELECT " selstr " FROM servers WHERE state in ('available','running','dbprep') ORDER BY start_time DESC;") + ) (vector header res))) (define (tasks:get-server mdb run-id #!key (retries 10)) (let ((res #f) (best #f)) Index: tests/fullrun/megatest.config ================================================================== --- tests/fullrun/megatest.config +++ tests/fullrun/megatest.config @@ -157,11 +157,12 @@ # force use of server always # required yes # Use http instead of direct filesystem access -transport http +transport rpc +# transport http # transport fs # transport nmsg synchronous 0 Index: tests/fullrun/tests/all_toplevel/testconfig ================================================================== --- tests/fullrun/tests/all_toplevel/testconfig +++ tests/fullrun/tests/all_toplevel/testconfig @@ -1,8 +1,8 @@ [ezsteps] calcresults megatest -list-runs $MT_RUNNAME -target $MT_TARGET -check_triggers cat $MT_RUN_AREA_HOME/triggers_$MT_RUN_NAME.dat +check_triggers cat $MT_RUN_AREA_HOME/triggers_$MT_RUNNAME.dat [logpro] check_triggers ;; (expect:error in "LogFileBody" = 0 "No errors" #/error/i) ADDED thunk-utils.scm Index: thunk-utils.scm ================================================================== --- /dev/null +++ thunk-utils.scm @@ -0,0 +1,121 @@ +(use srfi-18) + + +;; wrap a proc with a mutex so that two threads may not call proc simultaneously. +;; will catch exceptions to ensure mutex is unlocked even if exception is thrown. 
+;; will generate a unique mutex for proc unless one is specified with canned-mutex: option +;; +;; example 1: (define thread-safe-+ (make-synchronized-proc +)) +;; example 2: (define thread-safe-plus +;; (make-synchronized-proc +;; (lambda (x y) +;; (+ x y)))) + +(define (make-synchronized-proc proc + #!key (canned-mutex #f)) + (let* ((guard-mutex (if canned-mutex canned-mutex (make-mutex))) + (guarded-proc ;; we are guarding the thunk against exceptions. We will record whether result of evaluation is an exception or a regular result. + (lambda args + (mutex-lock! guard-mutex) + (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision with a proc that returns a pair having the first element be our flag. gensym guarantees the symbol is unique. + (res + (condition-case + (apply proc args) ;; this is what we are guarding the execution of + [x () (cons EXCEPTION x)] + ))) + (mutex-unlock! guard-mutex) + (cond + ((and (pair? res) (eq? (car res) EXCEPTION)) + (raise (cdr res))) + (else + res)))))) + guarded-proc)) + + +;; retry an operation (depends on srfi-18) +;; ================== +;; idea here is to avoid spending time on coding retrying something. Trying to be generic here. +;; +;; Exception handling: +;; ------------------- +;; if evaluating the thunk results in exception, it will be retried. +;; on last try, if final-failure-returns-actual is true, the exception will be re-thrown to caller. +;; +;; look at options below #!key to see how to configure behavior +;; +;; + +(define (retry-thunk + the-thunk + #!key ;;;; options below + (accept-result? 
(lambda (x) x)) ;; retry if predicate applied to thunk's result is false + (retries 4) ;; how many tries + (failure-value #f) ;; return this on final failure, unless following option is enabled: + (final-failure-returns-actual #f) ;; on failure, on the last try, just return the result, not failure-value + + (retry-delay 0.1) ;; delay between tries + (back-off-factor 1) ;; multiply retry-delay by this factor on retry + (random-delay 0.1) ;; add a random portion of this value to wait + + (chatty #f) ;; print status as we go, for debugging. + ) + + (when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-")) + (let* ((guarded-thunk ;; we are guarding the thunk against exceptions. We will record whether result of evaluation is an exception or a regular result. + (lambda () + (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision + (res + (condition-case + (the-thunk) ;; this is what we are guarding the execution of + [x () (cons EXCEPTION x)] + ))) + (cond + ((and (pair? res) (eq? (car res) EXCEPTION)) + (if chatty + (print " - the-thunk threw exception >"(cdr res)"<")) + (cons 'exception (cdr res))) + (else + (if chatty + (print " - the-thunk returned result >"res"<")) + (cons 'regular-result res))))))) + + (let loop ((guarded-res (guarded-thunk)) + (retries-left retries) + (fail-wait retry-delay)) + (if chatty (print " ==========")) + (let* ((wait-time (+ fail-wait (+ (* fail-wait back-off-factor) + (* random-delay + (/ (random 1024) 1024) )))) + (res-type (car guarded-res)) + (res-value (cdr guarded-res))) + (cond + ((and (eq? res-type 'regular-result) (accept-result? res-value)) + (if chatty (print " + return result that satisfied accept-result? >"res-value"<")) + res-value) + + ((> retries-left 0) + (if chatty (print " - sleep "wait-time)) + (thread-sleep! wait-time) + (if chatty (print " + retry ["retries-left" tries left]")) + (loop (guarded-thunk) + (sub1 retries-left) + wait-time)) + + ((eq? 
res-type 'regular-result) + (if final-failure-returns-actual + (begin + (if chatty (print " + last try failed- return the result >"res-value"<")) + res-value) + (begin + (if chatty (print " + last try failed- return canned failure value >"failure-value"<")) + failure-value))) + + (else ;; no retries left; result was not accepted and res-type can only be 'exception + (if final-failure-returns-actual + (begin + (if chatty (print " + last try failed with exception- re-throw it >"res-value"<")) + (abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function + (begin + (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<")) + failure-value)))))))) + Index: utils/installall.sh ================================================================== --- utils/installall.sh +++ utils/installall.sh @@ -10,14 +10,13 @@ # This program is distributed WITHOUT ANY WARRANTY; without even the # implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR # PURPOSE. echo You may need to do the following first: -echo sudo apt-get install libreadline-dev -echo sudo apt-get install libwebkitgtk-dev +echo sudo apt-get install libreadline-dev libsqlite3-dev libwebkitgtk-dev echo sudo apt-get install libpangox-1.0-0 zlib1g-dev libfreetype6-dev cmake -echo sudo apt-get install libssl-dev +echo sudo apt-get install libssl-dev uuid-dev libglu1-mesa-dev echo sudo apt-get install libmotif3 -OR- set KTYPE=26g4 echo echo Set OPTION to std, currently OPTION=$OPTION echo echo Additionally, if you want mysql-client, you will need to make sure @@ -25,21 +24,37 @@ echo echo You are using PREFIX=$PREFIX echo You are using proxy="$proxy" echo echo "Set additional_libpath to help find gtk or other libraries, don't forget a leading :" + +if [[ "$OPTION"x == "x" ]];then + OPTION=std +fi SYSTEM_TYPE=$(lsb_release -irs |tr ' ' '_' |tr '\n' '-')$(uname -i)-$OPTION + +# default chicken version variables. 
Override in case statement as appropriate +CHICKEN_VERSION=4.10.0 +CHICKEN_BASEVER=4.10.0 # Set up variables # case $SYSTEM_TYPE in Ubuntu-16.04-x86_64-std) KTYPE=32 CDVER=5.10 IUPVER=3.17 IMVER=3.11 + ;; +Ubuntu-16.04-x86_64-new) + KTYPE=32 + CDVER=5.10 + IUPVER=3.17 + IMVER=3.11 + CHICKEN_VERSION=4.10.0 + CHICKEN_BASEVER=4.10.0 ;; Ubuntu-16.04-i686-std) KTYPE=32 CDVER=5.10 IUPVER=3.17 @@ -103,12 +118,10 @@ # Put all the downloaded tar files in tgz mkdir -p tgz # http://code.call-cc.org/releases/4.8.0/chicken-4.8.0.5.tar.gz -export CHICKEN_VERSION=4.11.0 -export CHICKEN_BASEVER=4.11.0 chicken_targz=chicken-${CHICKEN_VERSION}.tar.gz if ! [[ -e tgz/$chicken_targz ]]; then wget http://code.call-cc.org/releases/${CHICKEN_BASEVER}/${chicken_targz} mv $chicken_targz tgz fi @@ -177,11 +190,11 @@ cd $BUILDHOME # Some eggs are quoted since they are reserved to Bash # for f in matchable readline apropos base64 regex-literals format "regex-case" "test" coops trace csv dot-locking posix-utils posix-extras directory-utils hostinfo tcp rpc csv-xml fmt json md5; do # $CHICKEN_INSTALL $PROX -keep-installed matchable readline apropos base64 regex-literals format "regex-case" "test" coops trace csv dot-locking posix-utils posix-extras directory-utils hostinfo tcp rpc csv-xml fmt json md5 awful http-client spiffy uri-common intarweb http-client spiffy-request-vars md5 message-digest http-client spiffy-directory-listing -for egg in matchable readline apropos base64 regex-literals format "regex-case" "test" \ +for egg in matchable readline apropos dbi base64 regex-literals format "regex-case" "test" \ coops trace csv dot-locking posix-utils posix-extras directory-utils hostinfo \ tcp rpc csv-xml fmt json md5 awful http-client spiffy uri-common intarweb http-client \ spiffy-request-vars s md5 message-digest spiffy-directory-listing ssax sxml-serializer \ sxml-modifications logpro z3 call-with-environment-variables \ pathname-expand typed-records simple-exceptions numbers crypt parley 
srfi-42 \ @@ -302,12 +315,12 @@ cd histstore $PREFIX/bin/csc histstore.scm -o hs cp -f hs $PREFIX/bin/hs cd ../mutils $PREFIX/bin/chicken-install - cd ../dbi - $PREFIX/bin/chicken-install + # cd ../dbi + # $PREFIX/bin/chicken-install cd ../margs $PREFIX/bin/chicken-install fi cd $BUILDHOME