Megatest

telemetry.scm at [852e127507]
Login

File ulex/telemetry/telemetry.scm artifact 7663509699 part of check-in 852e127507



(module telemetry
    (telemetry-open telemetry-send telemetry-close telemetry-server
                    telemetry-show-debugs telemetry-hide-debugs )
  
  (import chicken scheme data-structures
          base64 srfi-18
          z3 udp posix extras ports mailbox mailbox-threads)

  (use udp)
  (use base64)
  (use z3)
  (use mailbox-threads)
  
  ;; Client-side connection state, stored as a symbol.  Starts as 'startup;
  ;; telemetry-open stores 'open, 'broken or 'not-needed here, and
  ;; telemetry-send only transmits while the value is 'open.
  (define *telemetry:telemetry-log-state* 'startup)
  ;; UDP socket used by telemetry-send, or #f when no socket is available.
  ;; Set by telemetry-open after a successful connect.
  (define *telemetry:telemetry-log-socket* #f)

  ;; When true, debug-print writes its arguments; toggled by
  ;; telemetry-show-debugs / telemetry-hide-debugs.
  (define *debug-print-flag* #f)

  ;; Turn on diagnostic output from debug-print.
  (define telemetry-show-debugs
    (lambda ()
      (set! *debug-print-flag* #t)))

  ;; Turn off diagnostic output from debug-print.
  (define telemetry-hide-debugs
    (lambda ()
      (set! *debug-print-flag* #f)))

  ;; Print the given items, prefixed with "telemetry> ", but only while
  ;; *debug-print-flag* is true.  Does nothing otherwise.
  (define (debug-print . items)
    (when *debug-print-flag*
      (apply print (cons "telemetry> " items))))

  ;; Create and start a thread that listens for telemetry datagrams.
  ;;
  ;;   port     - UDP port to bind on.
  ;;   callback - procedure of one argument; called once per valid datagram
  ;;              with an alist holding from-host, from-port and data-len
  ;;              followed by the entries of the decoded payload.
  ;;
  ;; Returns the started thread.  The receive loop runs until an exception
  ;; escapes it (for example from the socket or from callback); in that case
  ;; the socket is closed and the exception is re-raised so that the thread
  ;; still terminates with that exception.
  (define (make-telemetry-server-thread port callback)
    ;; Decode one datagram body (base64 -> z3 -> read).  Returns the decoded
    ;; list, or #f when the data cannot be decoded or is not a list, so a
    ;; single malformed datagram cannot take the whole server down.
    (define (decode-datagram data)
      (handle-exceptions
       exn
       #f
       (let ((decoded (with-input-from-string
                          (z3:decode-buffer (base64-decode data))
                        read)))
         (and (list? decoded) decoded))))

    ;; Thread body: bind the socket and process datagrams forever.
    (define (serve)
      (let ((s (udp-open-socket)))
        (handle-exceptions
         exn
         (begin
           ;; Best-effort cleanup; a failure to close must not mask the
           ;; exception that actually ended the loop.
           (handle-exceptions close-exn #f (udp-close-socket s))
           (abort exn))
         (udp-bind! s #f port)
         ;;(udp-connect! s "localhost" port)
         (let loop ((seq 0))
           (debug-print "loop seq=" seq)
           (receive (n data from-host from-port) (udp-recvfrom s 640000)
             (let ((encapsulated-payload (decode-datagram data)))
               (if encapsulated-payload
                   (callback `((from-host . ,from-host)
                               (from-port . ,from-port)
                               (data-len  . ,n)
                               ,@encapsulated-payload))
                   (debug-print "dropping undecodable datagram from "
                                from-host ":" from-port))))
           (loop (add1 seq))))))

    (let ((thr (make-thread serve)))
      (thread-start! thr)
      thr))

  ;; Run a telemetry server on PORT in the calling thread's stead: start the
  ;; listener thread, print the thread object, then block until that thread
  ;; finishes.  HANDLER-CALLBACK is passed through to the listener thread.
  (define (telemetry-server port handler-callback)
    (let ((worker (make-telemetry-server-thread port handler-callback)))
      (print worker)
      (thread-join! worker)))

  
  ;; Open the client-side telemetry socket.
  ;;
  ;;   serverhost - host to send telemetry to, or #f to disable telemetry.
  ;;   serverport - UDP port on that host, or #f to disable telemetry.
  ;;
  ;; Records the outcome in *telemetry:telemetry-log-state*:
  ;;   'open       - socket created and connected; also stored in
  ;;                 *telemetry:telemetry-log-socket*.
  ;;   'broken     - creating or connecting the socket raised an exception.
  ;;   'not-needed - no host or port was given.
  (define (telemetry-open serverhost serverport)
    (set! *telemetry:telemetry-log-state*
          (if (and serverhost serverport)
              (let ((s #f))
                (handle-exceptions
                 exn
                 (begin
                   (debug-print "telemetry-open udp port failure")
                   ;; Do not leak a socket that was opened but could not be
                   ;; connected; ignore errors from the close itself.
                   (when s
                     (handle-exceptions close-exn #f (udp-close-socket s)))
                   'broken)
                 (set! s (udp-open-socket))
                 ;;(udp-bind! s #f 0)
                 (udp-connect! s serverhost serverport)
                 (set! *telemetry:telemetry-log-socket* s)
                 'open))
              'not-needed)))


  ;; Close the client-side telemetry socket, if there is anything to close.
  ;;
  ;; Acts when the state is 'open or 'broken-or-no-server-preclose, or when a
  ;; socket is still recorded.  On success the socket variable is cleared and
  ;; the state becomes 'closed; if closing raises, the state becomes
  ;; 'closed-fail.
  (define (telemetry-close)
    (when (or (member *telemetry:telemetry-log-state*
                      '(broken-or-no-server-preclose open))
              *telemetry:telemetry-log-socket*)
      (handle-exceptions
       exn
       (begin
         ;; set!, not define: define here would only create a local binding
         ;; and leave the module-level state untouched.
         (set! *telemetry:telemetry-log-state* 'closed-fail)
         (debug-print "telemetry-telemetry-log closure failure"))
       (begin
         (udp-close-socket *telemetry:telemetry-log-socket*)
         (set! *telemetry:telemetry-log-socket* #f)
         (set! *telemetry:telemetry-log-state* 'closed)))))

    ;; Send PAYLOAD to the telemetry server as one UDP datagram.
    ;;
    ;; Nothing is sent unless the state is 'open, a socket is recorded and
    ;; PAYLOAD is not #f.  The payload is wrapped in an alist with user, host
    ;; and pid entries, pretty-printed, z3-encoded and base64-encoded.
    ;; Returns the result of udp-send when a datagram was sent.
    ;;
    ;; If sending raises, telemetry is disabled: the socket is closed via
    ;; telemetry-close and the state ends up as 'broken-or-no-server.
    (define (telemetry-send payload)
      (if (eq? 'open *telemetry:telemetry-log-state*)
          (handle-exceptions
           exn
           (begin
             (debug-print "telemetry-telemetry-log comms failure ; disabled (no server?)")
             ;; set!, not define: the module-level state must change so that
             ;; later calls see telemetry as disabled.
             (set! *telemetry:telemetry-log-state* 'broken-or-no-server-preclose)
             (telemetry-close)
             (set! *telemetry:telemetry-log-state* 'broken-or-no-server)
             (set! *telemetry:telemetry-log-socket* #f))
           (if (and *telemetry:telemetry-log-socket* payload)
               (let* ((user (or (get-environment-variable "USER") "unknown"))
                      (host (or (get-environment-variable "HOST") "unknown"))
                      (encapsulated-payload
                       `((user . ,user)
                         (host . ,host)
                         (pid . ,(current-process-id))
                         (payload . ,payload)))
                      (msg
                       (base64-encode
                        (z3:encode-buffer
                         (with-output-to-string
                           (lambda () (pp encapsulated-payload)))))))
                 (udp-send *telemetry:telemetry-log-socket* msg))))))


  )