Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -157,10 +157,15 @@ (params (vector-ref dat 1)) (start-t (current-milliseconds)) (readonly-mode (dbr:dbstruct-read-only dbstruct)) (readonly-command (member cmd api:read-only-queries)) (writecmd-in-readonly-mode (and readonly-mode (not readonly-command))) + (foo (begin + (common:telemetry-log (conc "api-in:"(->string cmd)) + payload: `((params . ,params))) + + #t)) (res (if writecmd-in-readonly-mode (conc "attempt to run write command "cmd" on a read-only database") (case cmd ;;=============================================== @@ -327,19 +332,28 @@ ;; TASKS ((find-task-queue-records) (apply tasks:find-task-queue-records dbstruct params)) (else (debug:print 0 *default-log-port* "ERROR: bad api call " cmd) (conc "ERROR: BAD api call " cmd)))))) + ;; save all stats (let ((delta-t (- (current-milliseconds) start-t))) (hash-table-set! *db-api-call-time* cmd (cons delta-t (hash-table-ref/default *db-api-call-time* cmd '())))) (if writecmd-in-readonly-mode - (vector #f res) - (vector #t res))))))) + (begin + (common:telemetry-log (conc "api-out:"(->string cmd)) + payload: `((params . ,params) + (ok-res . #t))) + (vector #f res)) + (begin + (common:telemetry-log (conc "api-out:"(->string cmd)) + payload: `((params . ,params) + (ok-res . #f))) + (vector #t res)))))))) ;; http-server send-response ;; api:process-request ;; db:* ;; Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -17,11 +17,11 @@ ;; along with Megatest. If not, see . ;;====================================================================== (use srfi-1 data-structures posix regex-case (prefix base64 base64:) - format dot-locking csv-xml z3 ;; sql-de-lite + format dot-locking csv-xml z3 udp ;; sql-de-lite hostinfo md5 message-digest typed-records directory-utils stack matchable regex posix (srfi 18) extras ;; tcp (prefix nanomsg nmsg:) (prefix sqlite3 sqlite3:) pkts (prefix dbi dbi:) @@ -81,10 +81,11 @@ (length (glob (conc "/proc/" pid "/fd/*"))) (length (filter identity (map socket? (glob (conc "/proc/" pid "/fd/*"))))) ) ) + ;; GLOBALS ;; CONTEXTS (defstruct cxt @@ -887,10 +888,11 @@ (debug:print-info 13 *default-log-port* "watchdog done.")) (debug:print-info 13 *default-log-port* "no need for watchdog on non-homehost")))) (define (std-exit-procedure) + ;;(common:telemetry-log-close) (on-exit (lambda () 0)) ;;(debug:print-info 13 *default-log-port* "std-exit-procedure called; *time-to-exit*="*time-to-exit*) (let ((no-hurry (if *time-to-exit* ;; hurry up #f (begin @@ -3049,5 +3051,68 @@ exn #t ;; just ignore it, it might have died in the meantime so joining it will throw an exception (thread-join! thread)) ))) (hash-table-keys *common:thread-punchlist*))) + +(define *common:telemetry-log-state* 'startup) +(define *common:telemetry-log-socket* #f) + +(define (common:telemetry-log-open) + (if (eq? *common:telemetry-log-state* 'startup) + (let* ((serverhost (configf:lookup *configdat* "telemetry" "host")) + (serverport (configf:lookup-number *configdat* "telemetry" "port")) + (user (or (get-environment-variable "USER") "unknown")) + (host (or (get-environment-variable "HOST") "unknown"))) + (set! *common:telemetry-log-state* + (handle-exceptions + exn + (begin + (debug:print-info 0 *default-log-port* "common-telemetry-log open udp port failure") + 'broken) + (if (and serverhost serverport user host) + (let* ((s (udp-open-socket))) + ;;(udp-bind! s #f 0) + (udp-connect! s serverhost serverport) + (set! *common:telemetry-log-socket* s) + 'open) + 'not-needed)))))) + +(define (common:telemetry-log event #!key (payload '())) + (if (eq? *common:telemetry-log-state* 'startup) + (common:telemetry-log-open)) + + (if (eq? 'open *common:telemetry-log-state*) + (handle-exceptions + exn + (begin + (debug:print-info 0 *default-log-port* "common-telemetry-log comms failure ; disabled (no server?)") + ;;(define *common:telemetry-log-state* 'broken-or-no-server-preclose) + ;;(common:telemetry-log-close) + (define *common:telemetry-log-state* 'broken-or-no-server) + (set! *common:telemetry-log-socket* #f) + ) + (if (and *common:telemetry-log-socket* event) + (let* ((user (or (get-environment-variable "USER") "unknown")) + (host (or (get-environment-variable "HOST") "unknown")) + (start (conc "[megatest "event"]")) + (toppath (or *toppath* "/dev/null")) + (payload-serialized + (base64:base64-encode + (z3:encode-buffer + (with-output-to-string (lambda () (pp payload)))))) + (msg (conc user":"host":"start":"(current-process-id)":"(car (argv))":" + toppath":"payload-serialized))) + (udp-send *common:telemetry-log-socket* msg)))))) + +(define (common:telemetry-log-close) + (when (or (member *common:telemetry-log-state* '(broken-or-no-server-preclose open)) *common:telemetry-log-socket*) + (handle-exceptions + exn + (begin + (define *common:telemetry-log-state* 'closed-fail) + (debug:print-info 0 *default-log-port* "common-telemetry-log closure failure") + ) + (begin + (define *common:telemetry-log-state* 'closed) + (udp-close-socket *common:telemetry-log-socket*) + (set! *common:telemetry-log-socket* #f))))) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -55,10 +55,15 @@ ;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname)) ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected + (common:telemetry-log (conc "rmt:"(->string cmd)) + payload: `((rid . ,rid) + (params . ,params))) + + ;;DOT digraph megatest_state_status { ;;DOT ranksep=0; ;;DOT // rankdir=LR; ;;DOT node [shape="box"]; ;;DOT "rmt:send-receive" -> MUTEXLOCK; Index: runs.scm ================================================================== --- runs.scm +++ runs.scm @@ -451,10 +451,17 @@ ;; register this run in monitor.db (rmt:tasks-add "run-tests" user target runname test-patts task-key) ;; params) (rmt:tasks-set-state-given-param-key task-key "running") + (common:telemetry-log "run-tests" + payload: + `( (target . ,target) + (run-name . ,runname) + (test-patts . ,test-patts) ) ) + + ;; Now generate all the tests lists (set! all-tests-registry (tests:get-all)) ;; hash of testname => path-to-test (set! all-test-names (hash-table-keys all-tests-registry)) ;; filter first for allowed-tests (from -tagexpr) then for test-patts. (set! test-names (tests:filter-test-names ADDED telemetry-daemon Index: telemetry-daemon ================================================================== --- /dev/null +++ telemetry-daemon @@ -0,0 +1,265 @@ +#!/usr/bin/env python +# -*- Mode: Python; -*- +## Tiny Syslog Server in Python. +## +## This is a tiny syslog server that is able to receive UDP based syslog +## entries on a specified port and save them to a file. +## That's it... it does nothing else... + + +import os +import sys, os, time, atexit +from signal import SIGTERM +import logging +import logging.handlers +import SocketServer +import datetime +from subprocess import call +import argparse +import os +import socket + +## code to determine this host's IP on non-loopback interface +if os.name != "nt": + import fcntl + import struct + + def get_interface_ip(ifname): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', + ifname[:15]))[20:24]) + +def get_lan_ip(): + ip = socket.gethostbyname(socket.gethostname()) + if ip.startswith("127.") and os.name != "nt": + interfaces = [ + "eth0", + "eth1", + "eth2", + "wlan0", + "wlan1", + "wifi0", + "ath0", + "ath1", + "ppp0", + ] + for ifname in interfaces: + try: + ip = get_interface_ip(ifname) + break + except IOError: + pass + return ip + +class Daemon(object): + """ + A generic daemon class. + + Usage: subclass the Daemon class and override the run() method + """ + def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): + self.stdin = stdin + self.stdout = stdout + self.stderr = stderr + self.pidfile = pidfile + + def daemonize(self): + """ + do the UNIX double-fork magic, see Stevens' "Advanced + Programming in the UNIX Environment" for details (ISBN 0201563177) + http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 + """ + try: + pid = os.fork() + if pid > 0: + # exit first parent + sys.exit(0) + except OSError, e: + sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + # decouple from parent environment + os.chdir("/") + os.setsid() + os.umask(0) + + # do second fork + try: + pid = os.fork() + if pid > 0: + # exit from second parent + sys.exit(0) + except OSError, e: + sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + # redirect standard file descriptors + sys.stdout.flush() + sys.stderr.flush() + si = file(self.stdin, 'r') + so = file(self.stdout, 'a+') + se = file(self.stderr, 'a+', 0) + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + + # write pidfile + atexit.register(self.delpid) + pid = str(os.getpid()) + file(self.pidfile,'w+').write("%s\n" % pid) + + def delpid(self): + os.remove(self.pidfile) + + def start(self): + """ + Start the daemon + """ + # Check for a pidfile to see if the daemon already runs + try: + pf = file(self.pidfile,'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if pid: + message = "pidfile %s already exist. Daemon already running?\n" + sys.stderr.write(message % self.pidfile) + sys.exit(1) + + # Start the daemon + self.daemonize() + self.run() + + def stop(self): + """ + Stop the daemon + """ + # Get the pid from the pidfile + try: + pf = file(self.pidfile,'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if not pid: + message = "pidfile %s does not exist. Daemon not running?\n" + sys.stderr.write(message % self.pidfile) + return # not an error in a restart + + # Try killing the daemon process + try: + while 1: + os.kill(pid, SIGTERM) + time.sleep(0.1) + except OSError, err: + err = str(err) + if err.find("No such process") > 0: + if os.path.exists(self.pidfile): + os.remove(self.pidfile) + else: + print str(err) + sys.exit(1) + + def restart(self): + """ + Restart the daemon + """ + self.stop() + self.start() + + def run(self): + """ + You should override this method when you subclass Daemon. It will be called after the process has been + daemonized by start() or restart(). + """ + +# setup logging module so that the log can be moved aside and will reopen for append +def log_setup(logfile): + log_handler = logging.handlers.WatchedFileHandler(logfile) + formatter = logging.Formatter( + '%(message)s','') + log_handler.setFormatter(formatter) + logger = logging.getLogger() + logger.addHandler(log_handler) + logger.setLevel(logging.INFO) + + +class SyslogUDPHandler(SocketServer.BaseRequestHandler): + def handle(self): + data = bytes.decode(self.request[0].strip()) + socket = self.request[1] + print( "%s : " % self.client_address[0], str(data)) + timestamp = datetime.datetime.now().isoformat() + logline = timestamp + ":"+self.client_address[0] + ":" + str(data) + logging.info(str(logline)) + + + +class TelemetryLogDaemon(Daemon): + def __init__(self, pidfile, logfile, server_ip, server_port): + self.logfile = logfile + self.server_ip = server_ip + self.server_port = server_port + super(TelemetryLogDaemon, self).__init__(pidfile) + + def run(self): + log_setup(self.logfile) + server = SocketServer.UDPServer((self.server_ip,int(self.server_port)), SyslogUDPHandler) + server.serve_forever(poll_interval=0.5) + + +def main(): + default_log_file = os.environ['PWD'] + "/telemetry.log" + + parser = argparse.ArgumentParser(description = 'telemetry-daemon') + actions="start,restart,stop,nodaemon".split(",") + + parser.add_argument("-a", "--action", required=True, choices=actions, help="manage daemon: start stop or restart") + parser.add_argument("-p", "--server-port", default="5929", help="specify alternate udp port number, default is 5929") + parser.add_argument("-i", "--server-ip", default=get_lan_ip(), help="specify IP if heuristics to get local host lan ip fails") + parser.add_argument("-l", "--log-file", default=default_log_file, help="specify log file to write") + parser.add_argument("-z", "--pid-file", default=default_log_file + ".pidfile", help="specify pidfile") + opts = parser.parse_args() + + tld = TelemetryLogDaemon(opts.pid_file, opts.log_file, opts.server_ip, opts.server_port) + + if opts.action == "start": + print "Info: Starting server" + print """Example addition to megatest.config to enable telemetry: + +[telemetry] +host %s +port %s +want-events ALL + + """ % (opts.server_ip, opts.server_port) + tld.start() + + elif opts.action == "stop": + tld.stop() + elif opts.action == "restart": + + print "Info: Restarting server" + print """Example addition to megatest.config to enable telemetry: + +[telemetry] +host %s +port %s +want-events ALL + + """ % (opts.server_ip, opts.server_port) + tld.restart() + elif opts.action == "nodaemon": + log_setup(opts.log_file) + server = SocketServer.UDPServer((opts.server_ip,int(opts.server_port)), SyslogUDPHandler) + server.serve_forever(poll_interval=0.5) + +if __name__ == '__main__': + main() + + + +