Changes In Branch v1.65-telemetry Through [c31defc65c] Excluding Merge-Ins
This is equivalent to a diff from 2aaccbd409 to c31defc65c
2019-02-14
| ||
15:02 | merged in trunk to get docs/manual updates check-in: 73cb4bf58e user: bjbarcla tags: v1.65 | |
14:18 | defend against issue in test execution >>Error: (hash-table->alist) bad argument type - not a structure of the required type << check-in: ca98414851 user: bjbarcla tags: v1.65-telemetry | |
2019-02-13
| ||
19:08 | cleanup telemetry-daemon check-in: c31defc65c user: bjbarcla tags: v1.65-telemetry | |
19:03 | ensured telemetry logging does not happen if no mention in megatest.config check-in: c4fac28c5c user: bjbarcla tags: v1.65-telemetry | |
00:51 | added telemetry logging func common:telemetry-log check-in: 76975179f6 user: bjbarcla tags: v1.65-telemetry | |
2019-02-11
| ||
11:35 | merged brute force syncer check-in: 2aaccbd409 user: bjbarcla tags: v1.65, v1.6524 | |
11:30 | made server messages such that sync handily summarized by watch "grep -h SYNC server-*.log | sort | tail -30" Closed-Leaf check-in: ef2ec4a2aa user: bjbarcla tags: v1.65-dump-for-sync | |
2019-02-07
| ||
17:16 | coordinate multiple servers such that only one server will be syncing exclusively at any given momemt\ check-in: 31c8ca7f78 user: bjbarcla tags: v1.65 | |
Modified api.scm from [1541791de9] to [cf3fabb928].
︙ | ︙ | |||
155 156 157 158 159 160 161 162 163 164 165 166 167 168 | cmd-in (string->symbol cmd-in))) (params (vector-ref dat 1)) (start-t (current-milliseconds)) (readonly-mode (dbr:dbstruct-read-only dbstruct)) (readonly-command (member cmd api:read-only-queries)) (writecmd-in-readonly-mode (and readonly-mode (not readonly-command))) (res (if writecmd-in-readonly-mode (conc "attempt to run write command "cmd" on a read-only database") (case cmd ;;=============================================== ;; READ/WRITE QUERIES ;;=============================================== | > > > > > | 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | cmd-in (string->symbol cmd-in))) (params (vector-ref dat 1)) (start-t (current-milliseconds)) (readonly-mode (dbr:dbstruct-read-only dbstruct)) (readonly-command (member cmd api:read-only-queries)) (writecmd-in-readonly-mode (and readonly-mode (not readonly-command))) (foo (begin (common:telemetry-log (conc "api-in:"(->string cmd)) payload: `((params . ,params))) #t)) (res (if writecmd-in-readonly-mode (conc "attempt to run write command "cmd" on a read-only database") (case cmd ;;=============================================== ;; READ/WRITE QUERIES ;;=============================================== |
︙ | ︙ | |||
325 326 327 328 329 330 331 332 333 334 335 336 337 338 | ((testmeta-get-record) (apply db:testmeta-get-record dbstruct params)) ;; TASKS ((find-task-queue-records) (apply tasks:find-task-queue-records dbstruct params)) (else (debug:print 0 *default-log-port* "ERROR: bad api call " cmd) (conc "ERROR: BAD api call " cmd)))))) ;; save all stats (let ((delta-t (- (current-milliseconds) start-t))) (hash-table-set! *db-api-call-time* cmd (cons delta-t (hash-table-ref/default *db-api-call-time* cmd '())))) (if writecmd-in-readonly-mode | > > > > > | > > > > | | 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 | ((testmeta-get-record) (apply db:testmeta-get-record dbstruct params)) ;; TASKS ((find-task-queue-records) (apply tasks:find-task-queue-records dbstruct params)) (else (debug:print 0 *default-log-port* "ERROR: bad api call " cmd) (conc "ERROR: BAD api call " cmd)))))) ;; save all stats (let ((delta-t (- (current-milliseconds) start-t))) (hash-table-set! *db-api-call-time* cmd (cons delta-t (hash-table-ref/default *db-api-call-time* cmd '())))) (if writecmd-in-readonly-mode (begin (common:telemetry-log (conc "api-out:"(->string cmd)) payload: `((params . ,params) (ok-res . #t))) (vector #f res)) (begin (common:telemetry-log (conc "api-out:"(->string cmd)) payload: `((params . ,params) (ok-res . #f))) (vector #t res)))))))) ;; http-server send-response ;; api:process-request ;; db:* ;; ;; NB// Runs on the server as part of the server loop ;; |
︙ | ︙ |
Modified common.scm from [b6c40dc319] to [91f2f49c15].
︙ | ︙ | |||
15 16 17 18 19 20 21 | ;; ;; You should have received a copy of the GNU General Public License ;; along with Megatest. If not, see <http://www.gnu.org/licenses/>. ;;====================================================================== (use srfi-1 data-structures posix regex-case (prefix base64 base64:) | | | 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | ;; ;; You should have received a copy of the GNU General Public License ;; along with Megatest. If not, see <http://www.gnu.org/licenses/>. ;;====================================================================== (use srfi-1 data-structures posix regex-case (prefix base64 base64:) format dot-locking csv-xml z3 udp ;; sql-de-lite hostinfo md5 message-digest typed-records directory-utils stack matchable regex posix (srfi 18) extras ;; tcp (prefix nanomsg nmsg:) (prefix sqlite3 sqlite3:) pkts (prefix dbi dbi:) ) |
︙ | ︙ | |||
79 80 81 82 83 84 85 86 87 88 89 90 91 92 | (define (get-file-descriptor-count #!key (pid (current-process-id ))) (list (length (glob (conc "/proc/" pid "/fd/*"))) (length (filter identity (map socket? (glob (conc "/proc/" pid "/fd/*"))))) ) ) ;; GLOBALS ;; CONTEXTS (defstruct cxt (taskdb #f) (cmutex (make-mutex))) | > | 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 | (define (get-file-descriptor-count #!key (pid (current-process-id ))) (list (length (glob (conc "/proc/" pid "/fd/*"))) (length (filter identity (map socket? (glob (conc "/proc/" pid "/fd/*"))))) ) ) ;; GLOBALS ;; CONTEXTS (defstruct cxt (taskdb #f) (cmutex (make-mutex))) |
︙ | ︙ | |||
885 886 887 888 889 890 891 892 893 894 895 896 897 898 | (debug:print-info 13 *default-log-port* "loading writable-watchdog.") (server:writable-watchdog dbstruct))) (debug:print-info 13 *default-log-port* "watchdog done.")) (debug:print-info 13 *default-log-port* "no need for watchdog on non-homehost")))) (define (std-exit-procedure) (on-exit (lambda () 0)) ;;(debug:print-info 13 *default-log-port* "std-exit-procedure called; *time-to-exit*="*time-to-exit*) (let ((no-hurry (if *time-to-exit* ;; hurry up #f (begin (set! *time-to-exit* #t) #t)))) | > | 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 | (debug:print-info 13 *default-log-port* "loading writable-watchdog.") (server:writable-watchdog dbstruct))) (debug:print-info 13 *default-log-port* "watchdog done.")) (debug:print-info 13 *default-log-port* "no need for watchdog on non-homehost")))) (define (std-exit-procedure) ;;(common:telemetry-log-close) (on-exit (lambda () 0)) ;;(debug:print-info 13 *default-log-port* "std-exit-procedure called; *time-to-exit*="*time-to-exit*) (let ((no-hurry (if *time-to-exit* ;; hurry up #f (begin (set! *time-to-exit* #t) #t)))) |
︙ | ︙ | |||
3047 3048 3049 3050 3051 3052 3053 | (if thread (handle-exceptions exn #t ;; just ignore it, it might have died in the meantime so joining it will throw an exception (thread-join! thread)) ))) (hash-table-keys *common:thread-punchlist*))) | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 | (if thread (handle-exceptions exn #t ;; just ignore it, it might have died in the meantime so joining it will throw an exception (thread-join! thread)) ))) (hash-table-keys *common:thread-punchlist*))) (define *common:telemetry-log-state* 'startup) (define *common:telemetry-log-socket* #f) (define (common:telemetry-log-open) (if (eq? *common:telemetry-log-state* 'startup) (let* ((serverhost (configf:lookup *configdat* "telemetry" "host")) (serverport (configf:lookup-number *configdat* "telemetry" "port")) (user (or (get-environment-variable "USER") "unknown")) (host (or (get-environment-variable "HOST") "unknown"))) (set! *common:telemetry-log-state* (handle-exceptions exn (begin (debug:print-info 0 *default-log-port* "common-telemetry-log open udp port failure") 'broken) (if (and serverhost serverport user host) (let* ((s (udp-open-socket))) ;;(udp-bind! s #f 0) (udp-connect! s serverhost serverport) (set! *common:telemetry-log-socket* s) 'open) 'not-needed)))))) (define (common:telemetry-log event #!key (payload '())) (if (eq? *common:telemetry-log-state* 'startup) (common:telemetry-log-open)) (if (eq? 'open *common:telemetry-log-state*) (handle-exceptions exn (begin (debug:print-info 0 *default-log-port* "common-telemetry-log comms failure ; disabled (no server?)") ;;(define *common:telemetry-log-state* 'broken-or-no-server-preclose) ;;(common:telemetry-log-close) (define *common:telemetry-log-state* 'broken-or-no-server) (set! *common:telemetry-log-socket* #f) ) (if (and *common:telemetry-log-socket* event) (let* ((user (or (get-environment-variable "USER") "unknown")) (host (or (get-environment-variable "HOST") "unknown")) (start (conc "[megatest "event"]")) (toppath (or *toppath* "/dev/null")) (payload-serialized (base64:base64-encode (z3:encode-buffer (with-output-to-string (lambda () (pp payload)))))) (msg (conc user":"host":"start":"(current-process-id)":"(car (argv))":" toppath":"payload-serialized))) (udp-send *common:telemetry-log-socket* msg)))))) (define (common:telemetry-log-close) (when (or (member *common:telemetry-log-state* '(broken-or-no-server-preclose open)) *common:telemetry-log-socket*) (handle-exceptions exn (begin (define *common:telemetry-log-state* 'closed-fail) (debug:print-info 0 *default-log-port* "common-telemetry-log closure failure") ) (begin (define *common:telemetry-log-state* 'closed) (udp-close-socket *common:telemetry-log-socket*) (set! *common:telemetry-log-socket* #f))))) |
Modified rmt.scm from [0a05f35135] to [bc89e0120c].
︙ | ︙ | |||
53 54 55 56 57 58 59 60 61 62 63 64 65 66 | (define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id ;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname)) ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected ;;DOT digraph megatest_state_status { ;;DOT ranksep=0; ;;DOT // rankdir=LR; ;;DOT node [shape="box"]; ;;DOT "rmt:send-receive" -> MUTEXLOCK; ;;DOT { edge [style=invis];"case 1" -> "case 2" -> "case 3" -> "case 4" -> "case 5" -> "case 6" -> "case 7" -> "case 8" -> "case 9" -> "case 10" -> "case 11"; } ;; do all the prep locked under the rmt-mutex | > > > > > | 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 | (define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id ;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname)) ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected (common:telemetry-log (conc "rmt:"(->string cmd)) payload: `((rid . ,rid) (params . ,params))) ;;DOT digraph megatest_state_status { ;;DOT ranksep=0; ;;DOT // rankdir=LR; ;;DOT node [shape="box"]; ;;DOT "rmt:send-receive" -> MUTEXLOCK; ;;DOT { edge [style=invis];"case 1" -> "case 2" -> "case 3" -> "case 4" -> "case 5" -> "case 6" -> "case 7" -> "case 8" -> "case 9" -> "case 10" -> "case 11"; } ;; do all the prep locked under the rmt-mutex |
︙ | ︙ |
Modified runs.scm from [4560e73753] to [9ffac6688c].
︙ | ︙ | |||
449 450 451 452 453 454 455 456 457 458 459 460 461 462 | (debug:print-info 0 *default-log-port* "filtering initial test list with tagexpr: " (args:get-arg "-tagexpr") " => " allowed-tests) ));; tests will be ANDed with this list ;; register this run in monitor.db (rmt:tasks-add "run-tests" user target runname test-patts task-key) ;; params) (rmt:tasks-set-state-given-param-key task-key "running") ;; Now generate all the tests lists (set! all-tests-registry (tests:get-all)) ;; hash of testname => path-to-test (set! all-test-names (hash-table-keys all-tests-registry)) ;; filter first for allowed-tests (from -tagexpr) then for test-patts. (set! test-names (tests:filter-test-names (if allowed-tests (tests:filter-test-names all-test-names allowed-tests) | > > > > > > > | 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 | (debug:print-info 0 *default-log-port* "filtering initial test list with tagexpr: " (args:get-arg "-tagexpr") " => " allowed-tests) ));; tests will be ANDed with this list ;; register this run in monitor.db (rmt:tasks-add "run-tests" user target runname test-patts task-key) ;; params) (rmt:tasks-set-state-given-param-key task-key "running") (common:telemetry-log "run-tests" payload: `( (target . ,target) (run-name . ,runname) (test-patts . ,test-patts) ) ) ;; Now generate all the tests lists (set! all-tests-registry (tests:get-all)) ;; hash of testname => path-to-test (set! all-test-names (hash-table-keys all-tests-registry)) ;; filter first for allowed-tests (from -tagexpr) then for test-patts. (set! test-names (tests:filter-test-names (if allowed-tests (tests:filter-test-names all-test-names allowed-tests) |
︙ | ︙ |
Added telemetry-daemon version [a2b1d26b8f].
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 | #!/usr/bin/env python # -*- Mode: Python; -*- ## Tiny Syslog Server in Python. ## ## This is a tiny syslog server that is able to receive UDP based syslog ## entries on a specified port and save them to a file. ## That's it... it does nothing else... import os import sys, os, time, atexit from signal import SIGTERM import logging import logging.handlers import SocketServer import datetime from subprocess import call import argparse import os import socket ## code to determine this host's IP on non-loopback interface if os.name != "nt": import fcntl import struct def get_interface_ip(ifname): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', ifname[:15]))[20:24]) def get_lan_ip(): ip = socket.gethostbyname(socket.gethostname()) if ip.startswith("127.") and os.name != "nt": interfaces = [ "eth0", "eth1", "eth2", "wlan0", "wlan1", "wifi0", "ath0", "ath1", "ppp0", ] for ifname in interfaces: try: ip = get_interface_ip(ifname) break except IOError: pass return ip class Daemon(object): """ A generic daemon class. Usage: subclass the Daemon class and override the run() method """ def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): self.stdin = stdin self.stdout = stdout self.stderr = stderr self.pidfile = pidfile def daemonize(self): """ do the UNIX double-fork magic, see Stevens' "Advanced Programming in the UNIX Environment" for details (ISBN 0201563177) http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 """ try: pid = os.fork() if pid > 0: # exit first parent sys.exit(0) except OSError, e: sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) sys.exit(1) # decouple from parent environment os.chdir("/") os.setsid() os.umask(0) # do second fork try: pid = os.fork() if pid > 0: # exit from second parent sys.exit(0) except OSError, e: sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) sys.exit(1) # redirect standard file descriptors sys.stdout.flush() sys.stderr.flush() si = file(self.stdin, 'r') so = file(self.stdout, 'a+') se = file(self.stderr, 'a+', 0) os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) # write pidfile atexit.register(self.delpid) pid = str(os.getpid()) file(self.pidfile,'w+').write("%s\n" % pid) def delpid(self): os.remove(self.pidfile) def start(self): """ Start the daemon """ # Check for a pidfile to see if the daemon already runs try: pf = file(self.pidfile,'r') pid = int(pf.read().strip()) pf.close() except IOError: pid = None if pid: message = "pidfile %s already exist. Daemon already running?\n" sys.stderr.write(message % self.pidfile) sys.exit(1) # Start the daemon self.daemonize() self.run() def stop(self): """ Stop the daemon """ # Get the pid from the pidfile try: pf = file(self.pidfile,'r') pid = int(pf.read().strip()) pf.close() except IOError: pid = None if not pid: message = "pidfile %s does not exist. Daemon not running?\n" sys.stderr.write(message % self.pidfile) return # not an error in a restart # Try killing the daemon process try: while 1: os.kill(pid, SIGTERM) time.sleep(0.1) except OSError, err: err = str(err) if err.find("No such process") > 0: if os.path.exists(self.pidfile): os.remove(self.pidfile) else: print str(err) sys.exit(1) def restart(self): """ Restart the daemon """ self.stop() self.start() def run(self): """ You should override this method when you subclass Daemon. It will be called after the process has been daemonized by start() or restart(). """ # setup logging module so that the log can be moved aside and will reopen for append def log_setup(logfile): log_handler = logging.handlers.WatchedFileHandler(logfile) formatter = logging.Formatter( '%(message)s','') log_handler.setFormatter(formatter) logger = logging.getLogger() logger.addHandler(log_handler) logger.setLevel(logging.INFO) class SyslogUDPHandler(SocketServer.BaseRequestHandler): def handle(self): data = bytes.decode(self.request[0].strip()) socket = self.request[1] print( "%s : " % self.client_address[0], str(data)) timestamp = datetime.datetime.now().isoformat() logline = timestamp + ":"+self.client_address[0] + ":" + str(data) logging.info(str(logline)) class TelemetryLogDaemon(Daemon): def __init__(self, pidfile, logfile, server_ip, server_port): self.logfile = logfile self.server_ip = server_ip self.server_port = server_port super(TelemetryLogDaemon, self).__init__(pidfile) def run(self): log_setup(self.logfile) server = SocketServer.UDPServer((self.server_ip,int(self.server_port)), SyslogUDPHandler) server.serve_forever(poll_interval=0.5) def main(): default_log_file = os.environ['PWD'] + "/telemetry.log" parser = argparse.ArgumentParser(description = 'telemetry-daemon') actions="start,restart,stop,nodaemon".split(",") parser.add_argument("-a", "--action", required=True, choices=actions, help="manage daemon: start stop or restart") parser.add_argument("-p", "--server-port", default="5929", help="specify alternate udp port number, default is 5929") parser.add_argument("-i", "--server-ip", default=get_lan_ip(), help="specify IP if heuristics to get local host lan ip fails") parser.add_argument("-l", "--log-file", default=default_log_file, help="specify log file to write") parser.add_argument("-z", "--pid-file", default=default_log_file + ".pidfile", help="specify pidfile") opts = parser.parse_args() tld = TelemetryLogDaemon(opts.pid_file, opts.log_file, opts.server_ip, opts.server_port) if opts.action == "start": print "Info: Starting server" print """Example addition to megatest.config to enable telemetry: [telemetry] host %s port %s want-events ALL """ % (opts.server_ip, opts.server_port) tld.start() elif opts.action == "stop": tld.stop() elif opts.action == "restart": print "Info: Restarting server" print """Example addition to megatest.config to enable telemetry: [telemetry] host %s port %s want-events ALL """ % (opts.server_ip, opts.server_port) tld.restart() elif opts.action == "nodaemon": log_setup(opts.log_file) server = SocketServer.UDPServer((opts.server_ip,int(opts.server_port)), SyslogUDPHandler) server.serve_forever(poll_interval=0.5) if __name__ == '__main__': main() |