;; Copyright 2006-2017, Matthew Welland.
;;
;; This file is part of Megatest.
;;
;; Megatest is free software: you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;;
;; Megatest is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;
(declare (unit servermod))
(declare (uses commonmod))
(declare (uses configfmod))
(declare (uses mtmod))
(declare (uses debugprint))
(declare (uses mtargs))
(module servermod
(
remote-hh-dat
server:mk-signature
common:wait-for-normalized-load
server:expiration-timeout
)
(import scheme
chicken)
(use (srfi 18) extras s11n)
(use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(use directory-utils posix-extras matchable utils)
(use spiffy uri-common intarweb http-client spiffy-request-vars)
(import ports
data-structures
files
srfi-4
typed-records
commonmod
configfmod
debugprint
(prefix mtargs args:)
mtmod
)
;; (include "common_records.scm")
;; (include "db_records.scm")
(define (server:make-server-url hostport)
(if (not hostport)
#f
(conc "http://" (car hostport) ":" (cadr hostport))))
(define *server-loop-heart-beat* (current-seconds))
;;======================================================================
;; P K T S S T U F F
;;======================================================================
;; ???
;;======================================================================
;; P K T S S T U F F
;;======================================================================
;; ???
;;======================================================================
;; S E R V E R
;;======================================================================
;; Call this to start the actual server
;;
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; Get the transport
(define (server:get-transport)
(if *transport-type*
*transport-type*
(let ((ttype (string->symbol
(or (args:get-arg "-transport")
(configf:lookup *configdat* "server" "transport")
"rpc"))))
(set! *transport-type* ttype)
ttype)))
;; Generate a unique signature for this server
(define (server:mk-signature)
(message-digest-string (md5-primitive)
(with-output-to-string
(lambda ()
(write (list (current-directory)
(current-process-id)
(argv)))))))
(define (server:get-client-signature)
(if *my-client-signature* *my-client-signature*
(let ((sig (server:mk-signature))) ;; clients re-use the server:mk-signature logic
(set! *my-client-signature* sig)
*my-client-signature*)))
(define (server:get-server-id)
(if *server-id* *server-id*
(let ((sig (server:mk-signature))) ;; clients re-use the server:mk-signature logic
(set! *server-id* sig)
*server-id*)))
;; ;; When using zmq this would send the message back (two step process)
;; ;; with spiffy or rpc this simply returns the return data to be returned
;; ;;
;; (define (server:reply return-addr query-sig success/fail result)
;; (debug:print-info 11 *default-log-port* "server:reply return-addr=" return-addr ", result=" result)
;; ;; (send-message pubsock target send-more: #t)
;; ;; (send-message pubsock
;; (case (server:get-transport)
;; ((rpc) (db:obj->string (vector success/fail query-sig result)))
;; ((http) (db:obj->string (vector success/fail query-sig result)))
;; ((fs) result)
;; (else
;; (debug:print-error 0 *default-log-port* "unrecognised transport type: " *transport-type*)
;; result)))
;; Given an area path, start a server process ### NOTE ### > file 2>&1
;; if the target-host is set
;; try running on that host
;; incidental: rotate logs in logs/ dir.
;;
(define (server:run areapath) ;; areapath is *toppath* for a given testsuite area
(let* ((testsuite (common:get-testsuite-name))
(logfile (conc areapath "/logs/server.log")) ;; -" curr-pid "-" target-host ".log"))
(profile-mode (or (configf:lookup *configdat* "misc" "profilesw")
""))
(cmdln (conc (common:get-megatest-exe)
" -server - ";; (or target-host "-")
(if (equal? (configf:lookup *configdat* "server" "daemonize") "yes")
" -daemonize "
"")
;; " -log " logfile
" -m testsuite:" testsuite
" " profile-mode
)) ;; (conc " >> " logfile " 2>&1 &")))))
(log-rotate (make-thread common:rotate-logs "server run, rotate logs thread")) ;; why are we rotating logs here? This is a sensitive location with a lot going on!?
(load-limit (configf:lookup-number *configdat* "jobtools" "max-server-start-load" default: 3.0)))
;; we want the remote server to start in *toppath* so push there
(push-directory areapath)
(debug:print 0 *default-log-port* "INFO: Trying to start server (" cmdln ") ...")
(thread-start! log-rotate)
;; host.domain.tld match host?
;; (if (and target-host
;; ;; look at target host, is it host.domain.tld or ip address and does it
;; ;; match current ip or hostname
;; (not (string-match (conc "("curr-host "|" curr-host"\\..*)") target-host))
;; (not (equal? curr-ip target-host)))
;; (begin
;; (debug:print-info 0 *default-log-port* "Starting server on " target-host ", logfile is " logfile)
;; (setenv "TARGETHOST" target-host)))
;;
(setenv "TARGETHOST_LOGF" logfile)
(thread-sleep! (/ (random 3000) 1000)) ;; add a random initial delay. It seems pretty common that many running tests request a server at the same time
(debug:print 0 *default-log-port* "INFO: starting server at " (common:human-time))
(system (conc "nbfake " cmdln))
(unsetenv "TARGETHOST_LOGF")
;; (if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST"))
(thread-join! log-rotate)
(pop-directory)))
(define (server:logf-get-start-info logf)
(let ((server-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+)")) ;; SERVER STARTED: host:port AT timesecs server id
(dbprep-rx (regexp "^SERVER: dbprep"))
(dbprep-found 0)
(bad-dat (list #f #f #f #f #f)))
(handle-exceptions
exn
(begin
;; WARNING: this is potentially dangerous to blanket ignore the errors
(if (file-exists? logf)
(debug:print-info 2 *default-log-port* "Unable to get server info from "logf", exn=" exn))
bad-dat) ;; no idea what went wrong, call it a bad server
(with-input-from-file
logf
(lambda ()
(let loop ((inl (read-line))
(lnum 0))
(if (not (eof-object? inl))
(let ((mlst (string-match server-rx inl))
(dbprep (string-match dbprep-rx inl)))
(if dbprep (set! dbprep-found 1))
(if (not mlst)
(if (< lnum 500) ;; give up if more than 500 lines of server log read
(loop (read-line)(+ lnum 1))
(begin
(debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf )
bad-dat))
(match mlst
((_ host port start server-id pid)
(list host
(string->number port)
(string->number start)
server-id
(string->number pid)))
(else
(debug:print 0 *default-log-port* "ERROR: did not recognise SERVER line info "mlst)
bad-dat))))
(begin
(if dbprep-found
(begin
(debug:print-info 2 *default-log-port* "Server is in dbprep at " (common:human-time))
(thread-sleep! 0.5)) ;; was 25 sec but that blocked things from starting?
(debug:print-info 0 *default-log-port* "Unable to get server info from " logf " at " (seconds->time-string (current-seconds))))
bad-dat))))))))
(define (server:record->id servr)
(handle-exceptions
exn
(begin
(debug:print-info 0 *default-log-port* "Unable to get server id from " servr ", exn=" exn)
#f)
(match-let (((host port start-time server-id pid)
servr))
(if server-id
server-id
#f))))
(define (server:record->url servr)
(handle-exceptions
exn
(begin
(debug:print-info 0 *default-log-port* "Unable to get server url from " servr ", exn=" exn)
#f)
(match-let (((host port start-time server-id pid)
servr))
(if (and host port)
(conc host ":" port)
#f))))
;; oldest server alive determines host then choose random of youngest
;; five servers on that host
;;
(define (server:get-servers-info areapath)
;; (assert *toppath* "FATAL: server:get-servers-info called before *toppath* has been set.")
(let* ((servinfodir (server:get-servinfo-dir areapath))) ;; (conc *toppath*"/.servinfo")))
(if (not (file-exists? servinfodir))
(create-directory servinfodir))
(let* ((allfiles (glob (conc servinfodir"/*")))
(res (make-hash-table)))
(for-each
(lambda (f)
(let* ((hostport (pathname-strip-directory f))
(serverdat (server:logf-get-start-info f)))
(match serverdat
((host port start server-id pid)
(if (and host port start server-id pid)
(hash-table-set! res hostport serverdat)
(debug:print-info 2 *default-log-port* "bad server info for "f": "serverdat)))
(else
(debug:print-info 2 *default-log-port* "bad server info for "f": "serverdat)))))
allfiles)
res)))
;; check the .servinfo directory, are there other servers running on this
;; or another host?
;;
;; returns #t => ok to start another server
;; #f => not ok to start another server
;;
;; (define (server:minimal-check areapath)
;; (server:clean-up-old areapath)
;; (let* ((srvdir (server:get-servinfo-dir areapath)) ;; (conc areapath"/.servinfo"))
;; (servrs (glob (conc srvdir"/*")))
;; (thishostip (server:get-best-guess-address (get-host-name)))
;; (thisservrs (glob (conc srvdir"/"thishostip":*")))
;; (homehostinf (server:choose-server areapath 'homehost))
;; (havehome (car homehostinf))
;; (wearehome (cdr homehostinf)))
;; (debug:print-info 0 *default-log-port* thishostip", have homehost: "havehome", we are homehost: "wearehome
;; ", numservers: "(length thisservrs))
;; (cond
;; ((not havehome) #t) ;; no homehost yet, go for it
;; ((and havehome wearehome (< (length thisservrs) 20)) #t) ;; we are home and less than 20 servers, ok to start another
;; ((and havehome (not wearehome)) #f) ;; we are not the home host
;; ((and havehome wearehome (>= (length thisservrs) 20)) #f) ;; have enough running
;; (else
;; (debug:print 0 *default-log-port* "WARNING: Unrecognised scenario, servrs="servrs", thishostip="thishostip", thisservrs="thisservrs)
;; #t))))
(define server-last-start 0)
;; oldest server alive determines host then choose random of youngest
;; five servers on that host
;;
;; mode:
;; best - get best server (random of newest five)
;; home - get home host based on oldest server
;; info - print info
;; (define (server:choose-server areapath #!optional (mode 'best))
;; ;; age is current-starttime
;; ;; find oldest alive
;; ;; 1. sort by age ascending and ping until good
;; ;; find alive rand from youngest
;; ;; 1. sort by age descending
;; ;; 2. take five
;; ;; 3. check alive, discard if not and repeat
;; ;; first we clean up old server files
;; (assert (eq? (rmt:transport-mode) 'http) "FATAL: server:run called with rmt:transport-mode="(rmt:transport-mode))
;; (server:clean-up-old areapath)
;; (let* ((since-last (- (current-seconds) server-last-start))
;; (server-start-delay 10))
;; (if ( < (- (current-seconds) server-last-start) 10 )
;; (begin
;; (debug:print 2 *default-log-port* "server:choose-server: seconds since last server start: " (- (current-seconds) server-last-start))
;; (debug:print 2 *default-log-port* "server:choose-server: last server start less than " server-start-delay " seconds ago. Sleeping " server-start-delay " seconds")
;; (thread-sleep! server-start-delay)
;; )
;; (debug:print 2 *default-log-port* "server:choose-server: seconds since last server start: " (- (current-seconds) server-last-start))
;; )
;; )
;; (let* ((serversdat (server:get-servers-info areapath))
;; (servkeys (hash-table-keys serversdat))
;; (by-time-asc (if (not (null? servkeys)) ;; NOTE: Oldest is last
;; (sort servkeys ;; list of "host:port"
;; (lambda (a b)
;; (>= (list-ref (hash-table-ref serversdat a) 2)
;; (list-ref (hash-table-ref serversdat b) 2))))
;; '())))
;; (debug:print 2 *default-log-port* "server:choose-server: serversdat: " serversdat)
;; (debug:print 2 *default-log-port* "server:choose-server: servkeys: " servkeys)
;; (if (not (null? by-time-asc))
;; (let* ((oldest (last by-time-asc))
;; (oldest-dat (hash-table-ref serversdat oldest))
;; (host (list-ref oldest-dat 0))
;; (all-valid (filter (lambda (x)
;; (equal? host (list-ref (hash-table-ref serversdat x) 0)))
;; by-time-asc))
;; (best-ten (lambda ()
;; (if (> (length all-valid) 11)
;; (take (drop-right all-valid 1) 10) ;; remove the oldest from consideration so it can age out
;; (if (> (length all-valid) 8)
;; (drop-right all-valid 1)
;; all-valid))))
;; (names->dats (lambda (names)
;; (map (lambda (x)
;; (hash-table-ref serversdat x))
;; names)))
;; (am-home? (lambda ()
;; (let* ((currhost (get-host-name))
;; (bestadrs (server:get-best-guess-address currhost)))
;; (or (equal? host currhost)
;; (equal? host bestadrs))))))
;; (case mode
;; ((info)
;; (debug:print 0 *default-log-port* "oldest: "oldest-dat", selected host: "host", all-valid: "all-valid)
;; (debug:print 0 *default-log-port* "youngest: "(hash-table-ref serversdat (car all-valid))))
;; ((home) host)
;; ((homehost) (cons host (am-home?))) ;; shut up old code
;; ((home?) (am-home?))
;; ((best-ten)(names->dats (best-ten)))
;; ((all-valid)(names->dats all-valid))
;; ((best) (let* ((best-ten (best-ten))
;; (len (length best-ten)))
;; (hash-table-ref serversdat (list-ref best-ten (random len)))))
;; ((count)(length all-valid))
;; (else
;; (debug:print 0 *default-log-port* "ERROR: invalid command "mode)
;; #f)))
;; (begin
;; (server:run areapath)
;; (set! server-last-start (current-seconds))
;; ;; (thread-sleep! 3)
;; (case mode
;; ((homehost) (cons #f #f))
;; (else #f))))))
(define (server:get-servinfo-dir areapath)
(let* ((spath (conc areapath"/.servinfo")))
(if (not (file-exists? spath))
(create-directory spath #t))
spath))
(define (server:clean-up-old areapath)
;; any server file that has not been touched in ten minutes is effectively dead
(let* ((sfiles (glob (conc (server:get-servinfo-dir areapath)"/*"))))
(for-each
(lambda (sfile)
(let* ((modtime (handle-exceptions
exn
(begin
(debug:print 0 *default-log-port* "WARNING: failed to get modification file for "sfile)
(current-seconds))
(file-modification-time sfile))))
(if (and (number? modtime)
(> (- (current-seconds) modtime)
600))
(begin
(debug:print 0 *default-log-port* "WARNING: found old server info file "sfile", removing it.")
(handle-exceptions
exn
(debug:print 0 *default-log-port* "WARNING: failed to delete old server info file "sfile)
(delete-file sfile))))))
sfiles)))
;; timeout is hms string: 1h 5m 3s, default is 1 minute
;; This is currently broken. Just use the number of hours with no unit.
;; Default is 600 seconds.
;;
(define (server:expiration-timeout)
(let* ((tmo (configf:lookup *configdat* "server" "timeout")))
(if (string? tmo)
(let* ((num (string->number tmo)))
(if num
(* 3600 num)
(common:hms-string->seconds tmo)))
600 ;; this is the default
)))
(define (server:get-best-guess-address hostname)
(let ((res #f))
(for-each
(lambda (adr)
(if (not (eq? (u8vector-ref adr 0) 127))
(set! res adr)))
;; NOTE: This can fail when there is no mention of the host in /etc/hosts. FIXME
(vector->list (hostinfo-addresses (hostname->hostinfo hostname))))
(string-intersperse
(map number->string
(u8vector->list
(if res res (hostname->ip hostname)))) ".")))
;; moving this here as it needs access to db and cannot be in common.
;;
(define (server:get-bruteforce-syncer dbstruct #!key (fork-to-background #f) (persist-until-sync #f))
(debug:print "WARNING: bruteforce-syncer is called but has been disabled!")
(lambda ()
(debug:print "WARNING: bruteforce-syncer is called but has been disabled!")))
;;
(defstruct remote
;; transport to be used
;; http - use http-transport
;; http-read-cached - use http-transport for writes but in-mem cached for reads
(rmode 'http)
(hh-dat (let ((res (or ;; (server:choose-server *toppath* 'homehost)
(cons #f #f))))
(assert (pair? res)(conc "FATAL: hh-dat should be a pair, got "res))
res))
(server-url #f) ;; (server:check-if-running *toppath*) #f))
(server-id #f)
(server-info #f) ;; (if *toppath* (server:check-if-running *toppath*) #f))
(last-server-check 0) ;; last time we checked to see if the server was alive
(connect-time (current-seconds)) ;; when we first connected
(last-access (current-seconds)) ;; last time we talked to server
;; (conndat #f) ;; iface port api-uri api-url api-req seconds server-id
(server-timeout (server:expiration-timeout))
(force-server #f)
(ro-mode #f)
(ro-mode-checked #f) ;; flag that indicates we have checked for ro-mode
;; conndat stuff
(iface #f) ;; TODO: Consolidate this data with server-url and server-info above
(port #f)
(api-url #f)
(api-uri #f)
(api-req #f))
;;======================================================================
;; Rotate logs, logic:
;; if > 500k and older than 1 week:
;; remove previous compressed log and compress this log
;; WARNING: This proc operates assuming that it is in the directory above the
;; logs directory you wish to log-rotate.
;;
(define (common:rotate-logs)
(let* ((all-files (make-hash-table))
(stats (make-hash-table))
(inc-stat (lambda (key)
(hash-table-set! stats key (+ (hash-table-ref/default stats key 0) 1))))
(max-allowed (string->number (or (configf:lookup *configdat* "setup" "max-logfiles") "600")))) ;; name -> age
(if (not (directory-exists? "logs"))(create-directory "logs"))
(directory-fold
(lambda (file rem)
(handle-exceptions
exn
(begin
(debug:print-info 2 *default-log-port* "unable to rotate log " file ", probably handled by another process, this is safe to ignore. exn=" exn)
(debug:print 2 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
;; (print-call-chain (current-error-port)) ;;
)
(let* ((fullname (conc "logs/" file))
(mod-time (file-modification-time fullname))
(file-age (- (current-seconds) mod-time))
(file-old (> file-age (* 48 60 60)))
(file-big (> (file-size fullname) 200000)))
(hash-table-set! all-files file mod-time)
(if (or (and (string-match "^.*.log" file)
file-old
file-big)
(and (string-match "^server-.*.log" file)
file-old))
(let ((gzfile (conc fullname ".gz")))
(if (common:file-exists? gzfile)
(begin
(debug:print-info 0 *default-log-port* "removing " gzfile)
(delete-file* gzfile)
(hash-table-delete! all-files gzfile) ;; needed?
))
(debug:print-info 0 *default-log-port* "compressing " file)
(system (conc "gzip " fullname))
(inc-stat "gzipped")
(hash-table-set! all-files (conc file ".gz") file-age) ;; add the .gz file and remove the base file
(hash-table-delete! all-files file)
)
(if (and (> file-age (* (string->number (or (configf:lookup *configdat* "setup" "log-expire-days") "30")) 24 3600))
(file-exists? fullname)) ;; just in case it was gzipped - will get it next time
(handle-exceptions
exn
#f
(if (directory? fullname)
(begin
(debug:print-info 0 *default-log-port* fullname " in logs directory is a directory! Cannot rotate it, it is best to not put subdirectories in the logs dir.")
(inc-stat "directories"))
(begin
(delete-file* fullname)
(inc-stat "deleted")))
(hash-table-delete! all-files file)))))))
'()
"logs")
(for-each
(lambda (category)
(let ((quant (hash-table-ref/default stats category 0)))
(if (> quant 0)
(debug:print-info 0 *default-log-port* category " log files: " quant))))
`("deleted" "gzipped" "directories"))
(let ((num-logs (hash-table-size all-files)))
(if (> num-logs max-allowed) ;; because NFS => don't let number of logs exceed 300
(let ((files (take (sort (hash-table-keys all-files)
(lambda (a b)
(< (hash-table-ref all-files a)(hash-table-ref all-files b))))
(- num-logs max-allowed))))
(for-each
(lambda (file)
(let* ((fullname (conc "logs/" file)))
(if (directory? fullname)
(debug:print-info 0 *default-log-port* fullname " in logs directory is a directory! Cannot rotate it, it is best to not put subdirectories in the logs dir.")
(handle-exceptions
exn
(debug:print-error 0 *default-log-port* "failed to remove " fullname ", exn=" exn)
(delete-file* fullname)))))
files)
(debug:print-info 0 *default-log-port* "Deleted " (length files) " files from logs, keeping " max-allowed " files."))))))
;;======================================================================
;; E X I T H A N D L I N G
;;======================================================================
(define (std-signal-handler signum)
;; (signal-mask! signum)
(set! *time-to-exit* #t)
;;(debug:print-info 13 *default-log-port* "got signal "signum)
(debug:print-error 0 *default-log-port* "Received signal " signum " aaa exiting promptly")
;; (std-exit-procedure) ;; shouldn't need this since we are exiting and it will be called anyway
(exit))
(define (special-signal-handler signum)
;; (signal-mask! signum)
(set! *time-to-exit* #t)
;;(debug:print-info 13 *default-log-port* "got signal "signum)
(debug:print-error 0 *default-log-port* "Received signal " signum " sending email befor exiting!!")
;;TODO send email to notify admin contact listed in the config that the lisner got killed
;; (std-exit-procedure) ;; shouldn't need this since we are exiting and it will be called anyway
(exit))
(set-signal-handler! signal/int std-signal-handler) ;; ^C
(set-signal-handler! signal/term std-signal-handler)
;;======================================================================
;; calculate a delay number based on a droop curve
;; inputs are:
;; - load-in, load as from uptime, NOT normalized
;; - numcpus, number of cpus, ideally use the real cpus, not threads
;;
(define (common:get-delay load-in numcpus)
(let* ((ratio (/ load-in numcpus))
(new-option (configf:lookup *configdat* "load" "new-load-method"))
(paramstr (or (configf:lookup *configdat* "load" "exp-params")
"15 12 1281453987.9543 0.75")) ;; 5 4 10 1"))
(paramlst (map string->number (string-split paramstr))))
(if new-option
(begin
(cond ((and (>= ratio 0) (< ratio .5))
0)
((and (>= ratio 0.5) (<= ratio .9))
(* ratio (/ 5 .9)))
((and (> ratio .9) (<= ratio 1.1))
(+ 5 (* (- ratio .9) (/ 55 .2))))
((> ratio 1.1)
60)))
(match paramlst
((r1 r2 s1 s2)
(debug:print 3 *default-log-port* "Using params r1=" r1 " r2=" r2 " s1=" s1 " s2=" s2)
(min (max (/ (expt r1 (* r2 s2 ratio)) s1) 0) 30))
(else
(debug:print 0 *default-log-port* "BAD exp-params, should be \"r1 r2 s1 s2\" but got " paramstr)
30)))))
;;======================================================================
;; DO NOT CALL THIS DIRECTLY. It is called from common:wait-for-normalized-load
;; count - count down to zero, at some point we'd give up if the load never drops
;; num-tries - count down to zero number tries to get numcpus
;;
(define (common:wait-for-cpuload maxnormload numcpus-in
#!key (count 1000)
(msg #f)(remote-host #f)(num-tries 5))
(let* ((loadavg (common:get-cpu-load remote-host))
;; not possible to have zero. If we get 1, it's possible that we got the previous default, and we should check again
(numcpus (if (<= 1 numcpus-in)
(common:get-num-cpus remote-host) numcpus-in))
(first (car loadavg))
(next (cadr loadavg))
(adjmaxload (* maxnormload (max 1 numcpus))) ;; possible bug where numcpus (or could be maxload) is zero, crude
;; fallback is to at least use 1
;; effective load accounts for load jumps, this should elminate all the first-next-avg, adjwait, load-jump-limit
;; etc.
(effective-load (common:get-intercept first next))
(recommended-delay (common:get-delay effective-load numcpus))
(effective-host (or remote-host "localhost"))
(normalized-effective-load (/ effective-load numcpus))
(will-wait (> normalized-effective-load maxnormload)))
(if (and will-wait (> recommended-delay 1))
(let* ((actual-delay (min recommended-delay 30)))
(if (common:low-noise-print 30 (conc (round actual-delay) "-safe-load"))
(debug:print-info 0 *default-log-port* "Load control, delaying "
actual-delay " seconds to maintain safe load. current normalized effective load is "
normalized-effective-load". maxnormload = " maxnormload " numcpus = " numcpus " loadavg = " loadavg " effective-load = " effective-load))
(thread-sleep! actual-delay)))
(cond
;; bad data, try again to get the data
((not will-wait)
(if (common:low-noise-print 3600 (conc (round normalized-effective-load) "-load-acceptable-" effective-host))
(debug:print 0 *default-log-port* "Effective load on " effective-host " is acceptable at " effective-load " continuing.")))
((and (< first 0) ;; this indicates the loadavg data is bad - machine may not be reachable
(> num-tries 0))
(debug:print 0 *default-log-port* "WARNING: received bad data from get-cpu-load "
first ", we'll sleep 10s and try " num-tries " more times.")
(thread-sleep! 10)
(common:wait-for-cpuload maxnormload numcpus-in
count: count remote-host: remote-host num-tries: (- num-tries 1)))
;; need to wait for load to drop
((and will-wait ;; (> first adjmaxload)
(> count 0))
(debug:print-info 0 *default-log-port*
"Delaying 15" ;; adjwait
" seconds due to normalized effective load " normalized-effective-load ;; first
" exceeding max of " adjmaxload
" on server " (or remote-host (get-host-name))
" (normalized load-limit: " maxnormload ") " (if msg msg ""))
(thread-sleep! 15) ;; adjwait)
(common:wait-for-cpuload maxnormload numcpus count: (- count 1) msg: msg remote-host: remote-host)
;; put the message here to indicate came out of waiting
(debug:print-info 1 *default-log-port*
"On host: " effective-host
", effective load: " effective-load
", numcpus: " numcpus
", normalized effective load: " normalized-effective-load
))
;; overloaded and count expired (i.e. went to zero)
(else
(if (> num-tries 0) ;; should be "num-tries-left".
(if (common:low-noise-print 30 (conc (round effective-load) "-load-acceptable-" effective-host))
(debug:print 0 *default-log-port* "Load on " effective-host " is acceptable at effective normalized load of "
normalized-effective-load " continuing."))
(debug:print 0 *default-log-port* "Load on " effective-host ", "
first" could not be retrieved. Giving up and continuing."))))))
;;======================================================================
;; DO NOT CALL THIS DIRECTLY. It is called from common:wait-for-normalized-load
;;
;; (define (common:wait-for-cpuload maxload-in numcpus-in waitdelay #!key (count 1000) (msg #f)(remote-host #f)(force-maxload #f)(num-tries 5))
;; (let* ((loadavg (common:get-cpu-load remote-host))
;; (numcpus (if (<= 1 numcpus-in) ;; not possible to have zero. If we get 1, it's possible that we got the previous default, and we should check again
;; (common:get-num-cpus remote-host)
;; numcpus-in))
;; (maxload (if force-maxload
;; maxload-in
;; (if (number? maxload-in)
;; (max maxload-in 0.5)
;; 0.5))) ;; so maxload must be greater than 0.5 for now BUG - FIXME?
;; (first (car loadavg))
;; (next (cadr loadavg))
;; (adjmaxload (* maxload (max 1 numcpus))) ;; possible bug where
;; ;; numcpus (or could be
;; ;; maxload) is zero,
;; ;; crude fallback is to
;; ;; at least use 1
;; (loadjmp (- first (if (> next (* numcpus 0.7)) ;; could do something with average of first and next?
;; 0
;; next))) ;; we will force a conservative calculation any time next is large.
;; (first-next-avg (/ (+ first next) 2))
;; ;; add some randomness to the time to break any alignment
;; ;; where netbatch dumps many jobs to machines simultaneously
;; (adjwait (min (+ 300 (random 10)) (abs (* (+ (random 10)
;; (/ (- 1000 count) 10)
;; waitdelay)
;; (- first adjmaxload) ))))
;; (load-jump-limit (configf:lookup-number *configdat* "setup" "load-jump-limit"))
;; ;; effective load accounts for load jumps, this should elminate all the first-next-avg, adjwait, load-jump-limit
;; ;; etc.
;; (effective-load (common:get-intercept first next))
;; (effective-host (or remote-host "localhost"))
;; (normalized-effective-load (/ effective-load numcpus))
;; (will-wait (> normalized-effective-load maxload)))
;;
;; ;; let's let the user know once in a long while that load checking
;; ;; is happening but not constantly report it
;; #;(if (common:low-noise-print 30 (conc "cpuload" (or remote-host "localhost"))) ;; (> (random 100) 75) ;; about 25% of the time
;; (debug:print-info 1 *default-log-port* "Checking cpuload on " (or remote-host "localhost") ", maxload: " maxload
;; ", load: " first ", adjmaxload: " adjmaxload ", loadjmp: " loadjmp))
;;
;; (debug:print-info 1 *default-log-port*
;; "On host: " effective-host
;; ", effective load: " effective-load
;; ", numcpus: " numcpus
;; ", normalized effective load: " normalized-effective-load
;; )
;;
;; (cond
;; ;; bad data, try again to get the data
;; ((and (< first 0) ;; this indicates the loadavg data is bad - machine may not be reachable
;; (> num-tries 0))
;; (debug:print 0 *default-log-port* "WARNING: received bad data from get-cpu-load " first ", we'll sleep 10s and try " num-tries " more times.")
;; (thread-sleep! 10)
;; (common:wait-for-cpuload maxload-in numcpus-in waitdelay
;; count: count remote-host: remote-host force-maxload: force-maxload num-tries: (- num-tries 1)))
;; ;; need to wait for load to drop
;; ((and will-wait ;; (> first adjmaxload)
;; (> count 0))
;; (debug:print-info 0 *default-log-port*
;; "Delaying " 15 ;; adjwait
;; " seconds due to normalized effective load " normalized-effective-load ;; first
;; " exceeding max of " adjmaxload
;; " on server " (or remote-host (get-host-name))
;; " (normalized load-limit: " maxload ") " (if msg msg ""))
;; (thread-sleep! 15) ;; adjwait)
;; (common:wait-for-cpuload maxload numcpus waitdelay count: (- count 1) msg: msg remote-host: remote-host))
;; ((and (> loadjmp (cond
;; (load-jump-limit load-jump-limit)
;; ((> numcpus 8)(/ numcpus 2))
;; ((> numcpus 4)(/ numcpus 1.2))
;; (else 0.5)))
;; (> count 0))
;; (debug:print-info 0 *default-log-port* "waiting " adjwait " seconds due to possible load jump " loadjmp ". "
;; (if msg msg ""))
;; (thread-sleep! adjwait)
;; (common:wait-for-cpuload maxload numcpus waitdelay count: (- count 1) msg: msg remote-host: remote-host))
;; (else
;; (if (> num-tries 0)
;; (if (common:low-noise-print 30 (conc (round first) "-load-acceptable-" (or remote-host "localhost")))
;; (debug:print 0 *default-log-port* "Load on " (or remote-host "localhost") " is acceptable at " first " continuing."))
;; (debug:print 0 *default-log-port* "Load on " (or remote-host "localhost") ", "first" could not be retrieved. Giving up and continuing."))))))
;;
;;======================================================================
;; wait for normalized cpu load to drop below maxload
;;
(define (common:wait-for-normalized-load maxnormload msg remote-host #!optional (rem-tries 5))
(let ((num-cpus (common:get-num-cpus remote-host)))
(if num-cpus
(common:wait-for-cpuload maxnormload num-cpus 15 msg: msg remote-host: remote-host)
(begin
(thread-sleep! (random 60)) ;; we failed to get num cpus. wait a bit and try again
(if (> rem-tries 0)
(common:wait-for-normalized-load maxnormload msg remote-host (- rem-tries 1))
#f)))))
)