;; Copyright 2006-2017, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;
;;======================================================================
;;
;; This is the Megatest specific stuff for starting and maintaining a
;; server. Anything that talks to the server should go in client.scm (maybe - might get rid of client.scm)
;; General nanomsg stuff (not Megatest specific) should go in the
;; nmsg-transport.scm file.
;;
;;======================================================================
(require-extension (srfi 18) extras tcp s11n)
(use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest
directory-utils posix-extras matchable typed-records
pkts)
(use spiffy uri-common intarweb http-client spiffy-request-vars)
(declare (unit server))
(declare (uses common))
(declare (uses db))
(import db)
;; Basic stuff for safely kicking off a server
(declare (uses portlogger))
(import portlogger)
(declare (uses nmsg-transport))
(import nmsg-transport)
;; Might want to bring the daemonizing back
;; (declare (uses daemon))
(include "common_records.scm")
(include "db_records.scm")
;;======================================================================
;; P K T S S T U F F
;;======================================================================
;;======================================================================
;; N A N O M S G B A S E D S E R V E R
;;======================================================================
;; information about me as a server
;;
;; Per-process record describing this process's own server and the other
;; servers it has learned about; the single instance is *area-info*.
(defstruct area
(conn #f) ;; nmsg connection from nmsg:start-server; #f when no server is running
(port #f) ;; port number obtained from the portlogger for this server
(myaddr #f) ;; sent as the hostname field of outgoing requests by server:send
(hosts (make-hash-table)) ;; known servers: server id (pkt Z field) -> server pkt alist
pktid ;; our own pkt id from write-alist->pkt (main servers only), used as our servkey; get pkt from hosts table if needed
pktfile ;; full path of our own pkt file; deleted by server:shutdown
pktsdir ;; directory holding all server pkts (<mtrah>/.server-pkts)
mtrah ;; run area home directory (see server:get-mtrah)
(mutex (make-mutex)) ;; locked by server:start-nmsg while it fills in this record
)
;; make it a global? Well, it is local to area module
;; The single area record for this process; the server:* procedures read and update it.
(define *area-info* (make-area))
;; Pkt field spec handed to write-alist->pkt / read-pkt->alist, one entry per
;; pkt type. Each pair presumably maps an alist key to the one-letter code used
;; inside the pkt file (see the pkts egg) - confirm before adding fields.
(define *pktspec*
`((server (hostname . h)
(port . p)
(pid . i)
(ipaddr . a)
)
(data (hostname . h) ;; sender hostname
(port . p) ;; sender port
(ipaddr . a) ;; sender ip
(hostkey . k) ;; sending host key - store info at server under this key
(servkey . s) ;; server key - this needs to match at server end or reject the msg
(format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json
(data . d) ;; base64 encoded s11n data
)))
;; Locate the Megatest run area home.
;; Prefers $MT_RUN_AREA_HOME; otherwise uses the current directory when it
;; contains megatest.config. Returns #f when neither applies.
(define (server:get-mtrah)
  (let ((env-home (get-environment-variable "MT_RUN_AREA_HOME")))
    (cond (env-home env-home)
          ((file-exists? "megatest.config") (current-directory))
          (else #f))))
;; get a port
;; start the nmsg server
;; look for other servers
;; contact other servers and compile list of servers
;; there are two types of server
;; main servers - dashboards, runners and dedicated servers - need pkt
;; passive servers - test executors, step calls, list-runs - no pkt
;;
;; Start this process's nmsg server and record its details in *area-info*.
;;
;; force-server-type: 'main or 'passive to override the default, which is
;;   'main when -run or -server is on the command line and 'passive otherwise.
;;   Main servers write a pkt into <mtrah>/.server-pkts; passive ones do not.
;;
;; Returns the nmsg connection, or #f (after printing an error) when no run
;; area can be found, i.e. neither MT_RUN_AREA_HOME is set nor ./megatest.config
;; exists.
;;
;; The area mutex is held for the whole setup. dynamic-wind releases it on
;; every exit path, including the error return and non-local exits.
(define (server:start-nmsg #!optional (force-server-type #f))
  (let ((mutex (area-mutex *area-info*)))
    (dynamic-wind
        (lambda () (mutex-lock! mutex))
        (lambda ()
          (let ((server-type (or force-server-type
                                 (if (args:any? "-run" "-server")
                                     'main
                                     'passive)))
                (mtdir       (server:get-mtrah)))
            (if (not mtdir)
                (begin
                  (print "ERROR: megatest.config not found and MT_RUN_AREA_HOME is not set.")
                  #f)
                ;; Only claim a port and start listening once we know where the
                ;; pkts live, so the error path above leaks neither.
                (let* ((pktdir    (conc mtdir "/.server-pkts"))
                       (port-num  (portlogger:open-run-close portlogger:find-port))
                       (best-ip   (server:get-my-best-address))
                       (area-conn (nmsg:start-server port-num)))
                  (if (not (directory? pktdir)) (create-directory pktdir))
                  ;; server is started, now create pkt if needed
                  (print "Starting server in " server-type " mode")
                  (if (eq? server-type 'main)
                      (begin
                        (area-pktid-set! *area-info*
                                         (write-alist->pkt
                                          pktdir
                                          `((hostname . ,(get-host-name))
                                            (ipaddr . ,best-ip)
                                            (port . ,port-num)
                                            (pid . ,(current-process-id)))
                                          pktspec: *pktspec*
                                          ptype: 'server))
                        (area-pktfile-set! *area-info*
                                           (conc pktdir "/" (area-pktid *area-info*) ".pkt"))))
                  ;; set all the area info in the global record
                  (area-pktsdir-set! *area-info* pktdir)
                  (area-mtrah-set!   *area-info* mtdir)
                  (area-conn-set!    *area-info* area-conn)
                  (area-port-set!    *area-info* port-num)
                  ;; server:send reports this as our hostname to the servers we contact
                  (area-myaddr-set!  *area-info* best-ip)
                  area-conn))))
        (lambda () (mutex-unlock! mutex)))))
;; action:
;; immediate - quick actions, no need to put in queues
;; dbwrite - put in dbwrite queue
;; dbread - put in dbread queue
;; oslong - os actions, e.g. du, that could take a long time
;; osshort - os actions that should be quick, e.g. df
;;
;; Default request handler used by server:launch.
;;
;; dat: the decoded request alist built by server:send (keys hostname, port,
;;      servkey, hostkey, action, data). Only servkey, action and data are used here.
;;
;; Requests whose servkey is not this server's pkt id are rejected.
;; Every reply is a proper list whose first element is #t or #f, followed by a
;; message and, for unrecognised requests, the offending value. That lets
;; callers match on (code message ...).
(define (server:std-handler dat)
  (let ((servkey (alist-ref 'servkey dat))
        (data    (alist-ref 'data dat))
        (action  (alist-ref 'action dat)))
    ;; first, if you don't know who I am then I'm ignoring you
    (if (not (equal? servkey (area-pktid *area-info*)))
        '(#f "I don't know you") ;; a list like every other reply, not a dotted pair
        (case action ;; else carry on
          ((immediate)
           (case data
             ((ping) '(#t "success"))
             (else `(#t "I didn't recognise " ,data))))
          ((dbwrite) '(#t "db write submitted"))
          ((dbread)  '(#t "db read submitted"))
          ((oslong)  '(#t "os long submitted"))
          ((osshort) '(#t "os short submitted")) ;; was a duplicate (unreachable) dbwrite clause
          (else `(#f "unrecognised action" ,action))))))
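;; Call this to start the actual server
;;
;; start_server
;;
;; mode: server type passed to server:start-nmsg ('main or 'passive; #f lets it decide)
;; handler: proc which takes the received message as its argument and returns the reply
;;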
;; Start the server and serve requests until the connection yields an eof
;; object, or until no message has arrived for idle-timeout seconds.
;;
;; mode:         forced server type for server:start-nmsg ('main, 'passive or #f)
;; proc:         handler called with each decoded request; its return value is
;;               serialised with sexpr->string and sent back as the reply
;; idle-timeout: seconds without any message before the process shuts the
;;               server down (removing its pkt and releasing its port) and exits
;;
;; Returns #f immediately if the server could not be started. Otherwise it
;; returns the result of joining the message-handler thread.
(define (server:launch mode #!optional (proc server:std-handler) (idle-timeout 60))
  (let ((rep (server:start-nmsg mode)))
    (if (not rep)
        #f ;; server:start-nmsg has already printed why it could not start
        (let* ((last-msg-time (current-seconds))
               (handler-thread
                (make-thread
                 (lambda ()
                   (let loop ()
                     (let ((dat (server:receive rep)))
                       (set! last-msg-time (current-seconds))
                       (if (not (eof-object? dat))
                           (let ((resdat (proc dat)))
                             (print "Got " dat)
                             (print "Responding with " resdat)
                             (nmsg:send rep (sexpr->string resdat))
                             (loop))))))
                 "message handler"))
               (watchdog-thread
                (make-thread
                 (lambda ()
                   (let loop ()
                     (thread-sleep! 10)
                     (if (> (- (current-seconds) last-msg-time) idle-timeout)
                         (begin
                           (print "Waited for " idle-timeout " seconds and no messages, exiting now.")
                           ;; clean up our pkt and port so other processes do not
                           ;; keep trying to reach a server that is gone
                           (server:shutdown)
                           (exit))
                         (loop)))))))
          (thread-start! handler-thread)
          (thread-start! watchdog-thread)
          (thread-join! handler-thread)))))
;; receive the next message on rep and decode it from its string form
;;
;; Wait for the next message on rep and decode it.
;; string->sexpr already passes non-string values (such as an eof object)
;; through unchanged, so no separate type check is needed here.
(define (server:receive rep)
  (string->sexpr (nmsg:recv rep)))
;; Shut down this process's server, if one is running.
;; The steps are: delete our pkt file (so others stop finding us), notify known
;; servers, close the nmsg connection, and release the port in the portlogger.
;; Afterwards the connection and pkt file are cleared from *area-info*, so a
;; second call is a no-op instead of closing and releasing twice.
;; Returns the portlogger release result, or an unspecified value when no
;; server was running.
(define (server:shutdown)
  (let ((conn (area-conn *area-info*))
        (pktf (area-pktfile *area-info*))
        (port (area-port *area-info*)))
    (if conn
        (begin
          (if pktf (delete-file* pktf))
          (server:send-all "imshuttingdown")
          (nmsg:close conn)
          (let ((res (portlogger:open-run-close portlogger:release-port port)))
            (area-conn-set! *area-info* #f)
            (area-pktfile-set! *area-info* #f)
            res)))))
;; Broadcast msg to every known server.
;; Placeholder: msg is currently ignored and #f is always returned.
(define server:send-all
  (lambda (msg)
    #f))
;; given an area record, read all the server pkts in its pkts directory
;;
;; Read every *.pkt file in rec's pkts directory and return the decoded
;; alists, one per file, in the order glob lists them.
(define (server:get-all-server-pkts rec)
  (let* ((pattern   (conc (area-pktsdir rec) "/*.pkt"))
         (pkt-files (glob pattern)))
    (map (lambda (path)
           (read-pkt->alist path pktspec: *pktspec*))
         pkt-files)))
#;((Z . "9a0212302295a19610d5796fce0370fa130758e9")
(port . "34827")
(pid . "28748")
(hostname . "zeus")
(T . "server")
(D . "1549427032.0"))
;; servpkt is the info for the server we wish to send the message to
;;
;; Send a request to another server and return its decoded reply.
;;
;; servpkt: a server pkt alist as read by server:get-all-server-pkts; uses the
;;          keys port, hostname, ipaddr and Z (the server id, sent as servkey)
;; data:    request payload, e.g. 'ping
;; action:  how the receiving server should handle the request, e.g. 'immediate
;;          (see server:std-handler)
;;
;; The target address is ipaddr when present, otherwise hostname.
;; Returns the reply decoded with string->sexpr, or #f when servpkt has no
;; port or no usable address.
(define (server:send servpkt data action)
  (let* ((port   (alist-ref 'port servpkt))
         (host   (alist-ref 'hostname servpkt))
         (ip     (alist-ref 'ipaddr servpkt))
         (hkey   (alist-ref 'Z servpkt))
         (target (or ip host))) ;; fall back to host if ip not provided
    (if (and port target)
        (let ((addr (conc target ":" port))
              (msg  (sexpr->string
                     `((hostname . ,(area-myaddr *area-info*))
                       (port     . ,(area-port *area-info*))
                       (servkey  . ,hkey)   ;; server looks at this to ensure message is for them
                       (hostkey  . ,(area-pktid *area-info*))
                       (action   . ,action) ;; how the receiving server should handle the request
                       (data     . ,data)))))
          (string->sexpr (nmsg:open-send-receive addr msg)))
        #f)))
;; Choose the address other servers should use to reach this host.
;;
;; hostname: name to resolve; defaults to this machine's host name.
;;
;; Returns, in order of preference:
;;   - the first non-loopback IPv4 address, as a string;
;;   - otherwise the first resolved address of any kind;
;;   - otherwise (the name did not resolve) the hostname itself.
;; The last fallback works because server:send uses this value as a host to connect to.
(define (server:get-my-best-address #!optional (hostname (get-host-name)))
  (let* ((info   (hostname->hostinfo hostname)) ;; guard against #f for an unresolvable name
         (addrs  (if info (vector->list (hostinfo-addresses info)) '()))
         (usable (filter (lambda (addr)
                           ;; 4 bytes = IPv4; a first octet of 127 is loopback
                           (and (= (u8vector-length addr) 4)
                                (not (= (u8vector-ref addr 0) 127))))
                         addrs)))
    (cond ((pair? usable) (ip->string (car usable)))
          ((pair? addrs)  (ip->string (car addrs)))
          (else hostname))))
;; whoami? I am my pkt
;;
;; Return this process's own server pkt from the area's hosts table,
;; looked up by its pkt id, or #f if it is not recorded there.
(define (server:whoami? area)
  (let ((hosts (area-hosts area))
        (my-id (area-pktid area)))
    (hash-table-ref/default hosts my-id #f)))
;;======================================================================
;; "Client side" operations
;;======================================================================
;; convert to/from string / sexpr
;; Decode a string holding a written s-expression back into the datum.
;; Non-string values (e.g. #f or an eof object) are returned unchanged.
(define (string->sexpr str)
  (if (not (string? str))
      str
      (read (open-input-string str))))
;; Serialise s with write so that string->sexpr can read it back.
(define (sexpr->string s)
  (let ((out (open-output-string)))
    (write s out)
    (get-output-string out)))
;; is the server alive?
;;
;; Ping the server described by servpkt.
;; Returns a pair (elapsed-milliseconds . reply), where reply is whatever
;; server:send returned for an immediate 'ping request.
(define (server:ping servpkt)
  (let* ((t0      (current-milliseconds))
         (reply   (server:send servpkt 'ping 'immediate))
         (elapsed (- (current-milliseconds) t0)))
    (cons elapsed reply)))
;; look up all pkts and get the server id (the hash), port, host/ip
;; store this info in the global struct *area-info*
;;
;; pass in *area-info* as rec
;;
;; Refresh rec's table of known servers (rec is normally *area-info*).
;;
;; Collects every server pkt from the pkts directory plus those already in the
;; hosts table, then handles each one:
;;   - our own pkt (Z equals our pkt id): recorded without pinging ourselves;
;;   - no reply to a ping (payload #f): pkt file removed and entry forgotten;
;;   - reply whose first element is true: recorded in the hosts table;
;;   - any other reply: reported, table left unchanged.
;; NOTE(review): this assumes an unreachable server shows up as a #f reply
;; from server:send; confirm what nmsg:open-send-receive does on timeout.
(define (server:update-known-servers rec)
  (let ((all-pkts  (delete-duplicates
                    (append (server:get-all-server-pkts rec)
                            (hash-table-values (area-hosts rec)))))
        (hostshash (area-hosts rec))
        (my-id     (area-pktid rec))
        (pktsdir   (area-pktsdir rec))) ;; needed to remove pkts from non-responsive servers
    (for-each
     (lambda (servpkt)
       (let ((sid (alist-ref 'Z servpkt))) ;; Z code is our name for the server
         (if (and my-id (equal? sid my-id))
             ;; it's me: keep my own pkt in the table (server:whoami? reads it)
             (hash-table-set! hostshash sid servpkt)
             (let ((res (server:ping servpkt)))
               (match res
                 ((_ . #f)
                  ;; here we delete the pkt - can't reach the server, remove it
                  ;; however this logic is inadequate. we should mark the server as checked
                  ;; and not good, if it happens a second time - then remove the pkt
                  ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead
                  ;; could be it is simply too busy to reply
                  (print "clearing out server " sid)
                  (delete-file* (conc pktsdir "/" sid ".pkt"))
                  (hash-table-delete! hostshash sid))
                 ((_ . payload)
                  (print "Server pkt:")(pp servpkt)
                  (print "res: ")(pp res)
                  (match payload
                    ((code message)
                     (print "code: " code " message: " message)
                     (if code
                         (hash-table-set! hostshash sid servpkt)
                         (print "got #f from the server, not sure what that means!")))
                    (_
                     (print "Got ")(pp res)(print " from server ")(pp servpkt)
                     (print " but response did not match (#f/#t . msg)")))))))))
     all-pkts)))
;; send out an "I'm about to exit" notice to all known servers
;;
;; Placeholder for announcing our imminent exit; currently does nothing and
;; returns the empty list.
(define server:imminent-death
  (lambda ()
    (list)))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; get a signature for identifying this process
;; Return (hostname . pid), which identifies this process across hosts.
(define (server:get-process-signature)
  (let ((host (get-host-name))
        (pid  (current-process-id)))
    (cons host pid)))