1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
;;======================================================================
;; Copyright 2006-2012, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
;;======================================================================
(use srfi-1 posix regex-case base64 format dot-locking csv-xml z3 sql-de-lite hostinfo md5 message-digest typed-records directory-utils stack
matchable)
(require-extension regex posix)
(require-extension (srfi 18) extras tcp rpc)
(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))
|
|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
;;======================================================================
;; Copyright 2006-2012, Matthew Welland.
;;
;; This program is made available under the GNU GPL version 2.0 or
;; greater. See the accompanying file COPYING for details.
;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.
;;======================================================================
(use srfi-1 data-structures posix regex-case base64 format dot-locking csv-xml z3 sql-de-lite hostinfo md5 message-digest typed-records directory-utils stack
matchable)
(require-extension regex posix)
(require-extension (srfi 18) extras tcp rpc)
(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))
|
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
;; (define old-exit exit)
;;
;; (define (exit . code)
;; (if (null? code)
;; (old-exit)
;; (old-exit code)))
;; Short alias: getenv is bound to the same procedure as get-environment-variable.
(define getenv get-environment-variable)
;; Set environment variable KEY to VAL, logging a message instead of raising
;; when the pair cannot be used.
;;
;;   key - variable name; expected to be a string that does not contain ":"
;;   val - variable value; expected to be a string
;;
;; A key containing ":" is skipped (logged at level 4). A non-string key or
;; value, or an exception raised by setenv itself, is logged at level 0.
(define (safe-setenv key val)
  (let ((report-bad-value
         (lambda ()
           (debug:print-error 0 *default-log-port* "bad value for setenv, key=" key ", value=" val))))
    (cond
     ;; Check the key type before probing it for ":": substring-index needs
     ;; string arguments, so a non-string key would raise here and never
     ;; reach the "bad value" report.
     ((not (string? key))
      (report-bad-value))
     ;; variables containing : are for internal use and cannot be environment variables.
     ((substring-index ":" key)
      (debug:print-error 4 *default-log-port* "skip setting internal use only variables containing \":\""))
     ((not (string? val))
      (report-bad-value))
     (else
      (handle-exceptions
       exn
       (report-bad-value)
       (setenv key val))))))
;; Values of the HOME and USER environment variables, looked up through the
;; getenv alias when these definitions are evaluated.
(define home (getenv "HOME"))
(define user (getenv "USER"))
;; GLOBALS
;; CONTEXTS
;; Record type cxt, declared with defstruct. Fields and their defaults:
;;   taskdb - defaults to #f (presumably a task database handle - TODO confirm against users)
;;   cmutex - defaults to the result of (make-mutex)
(defstruct cxt
(taskdb #f)
(cmutex (make-mutex)))
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
;; (define old-exit exit)
;;
;; (define (exit . code)
;; (if (null? code)
;; (old-exit)
;; (old-exit code)))
;; Run THUNK and return its value. When THUNK raises, the exception is trapped,
;; a nonfatal-condition note is written to *default-log-port* and #f is
;; returned instead.
;;
;;   thunk                        - zero-argument procedure to run
;;   warning-message-on-exception - text appended to the first log line
(define (common:fail-safe thunk warning-message-on-exception)
  (let ((describe-exn
         ;; Capture what print-error-message prints for the exception and
         ;; rewrite its "Error:" marker as "nonfatal condition:".
         (lambda (exn)
           (string-substitute
            "\n?Error:" "nonfatal condition:"
            (with-output-to-string
              (lambda ()
                (print-error-message exn)))))))
    (handle-exceptions
     exn
     (begin
       (debug:print-info 0 *default-log-port* "notable but nonfatal condition - " warning-message-on-exception)
       (debug:print-info 0 *default-log-port* (describe-exn exn))
       (debug:print-info 0 *default-log-port* " -- continuing after nonfatal condition...")
       #f)
     (thunk))))
;; Short alias: getenv is bound to the same procedure as get-environment-variable.
(define getenv get-environment-variable)
;; Set environment variable KEY to VAL, logging a message instead of raising
;; when the pair cannot be used.
;;
;;   key - variable name; expected to be a string that does not contain ":"
;;   val - variable value; expected to be a string
;;
;; A key containing ":" is skipped (logged at level 4). A non-string key or
;; value, or an exception raised by setenv itself, is logged at level 0.
(define (safe-setenv key val)
  (let ((report-bad-value
         (lambda ()
           (debug:print-error 0 *default-log-port* "bad value for setenv, key=" key ", value=" val))))
    (cond
     ;; Check the key type before probing it for ":": substring-index needs
     ;; string arguments, so a non-string key would raise here and never
     ;; reach the "bad value" report.
     ((not (string? key))
      (report-bad-value))
     ;; variables containing : are for internal use and cannot be environment variables.
     ((substring-index ":" key)
      (debug:print-error 4 *default-log-port* "skip setting internal use only variables containing \":\""))
     ((not (string? val))
      (report-bad-value))
     (else
      (handle-exceptions
       exn
       (report-bad-value)
       (setenv key val))))))
;; Values of the HOME and USER environment variables, looked up through the
;; getenv alias when these definitions are evaluated.
(define home (getenv "HOME"))
(define user (getenv "USER"))
;; returns list of fd count, socket count
;;
;;   pid - process whose descriptors are counted (keyword argument; defaults
;;         to the current process id)
;;
;; Both numbers are derived from the entries matching /proc/<pid>/fd/*.
(define (get-file-descriptor-count #!key (pid (current-process-id)))
  ;; Glob the fd directory once so that both counts describe the same
  ;; snapshot instead of two separate directory scans.
  (let ((fd-paths (glob (conc "/proc/" pid "/fd/*"))))
    (list
     (length fd-paths)
     (length (filter socket? fd-paths)))))
;; GLOBALS
;; CONTEXTS
;; Record type cxt, declared with defstruct. Fields and their defaults:
;;   taskdb - defaults to #f (presumably a task database handle - TODO confirm against users)
;;   cmutex - defaults to the result of (make-mutex)
(defstruct cxt
(taskdb #f)
(cmutex (make-mutex)))
|
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
;; Record type remote, declared with defstruct. Each field is listed with the
;; default expression used when no value is supplied to the constructor.
(defstruct remote
  (hh-dat            (common:get-homehost)) ;; homehost record ( addr . hhflag )
  ;; `and` instead of a one-armed `if`: when *toppath* is false the field must
  ;; be #f, not the unspecified (non-#f) value a one-armed `if` produces.
  (server-url        (and *toppath* (server:check-if-running *toppath*)))
  (last-server-check 0)  ;; last time we checked to see if the server was alive
  (conndat           #f)
  (transport         *transport-type*)
  (server-timeout    (server:get-timeout)) ;; default from server:get-timeout
  (force-server      #f)
  (ro-mode           #f)
  (ro-mode-checked   #f))  ;; flag that indicates we have checked for ro-mode
;; launching and hosts
(defstruct host
(reachable #f)
|
|
|
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
|
;; Record type remote, declared with defstruct. Each field is listed with the
;; default expression used when no value is supplied to the constructor.
(defstruct remote
  (hh-dat            (common:get-homehost)) ;; homehost record ( addr . hhflag )
  ;; `and` instead of a one-armed `if`: when *toppath* is false the field must
  ;; be #f, not the unspecified (non-#f) value a one-armed `if` produces.
  (server-url        (and *toppath* (server:check-if-running *toppath*)))
  (last-server-check 0)  ;; last time we checked to see if the server was alive
  (conndat           #f)
  (transport         *transport-type*)
  (server-timeout    (server:expiration-timeout)) ;; default from server:expiration-timeout
  (force-server      #f)
  (ro-mode           #f)
  (ro-mode-checked   #f))  ;; flag that indicates we have checked for ro-mode
;; launching and hosts
(defstruct host
(reachable #f)
|
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
|
(first (car loadavg))
(next (cadr loadavg))
(adjload (* maxload numcpus))
(loadjmp (- first next)))
(cond
((and (> first adjload)
(> count 0))
(debug:print-info 0 *default-log-port* "server start delayed " waitdelay " seconds due to load " first " exceeding max of " adjload " (normalized load-limit: " maxload ") " (if msg msg ""))
(thread-sleep! waitdelay)
(common:wait-for-cpuload maxload numcpus waitdelay count: (- count 1)))
((and (> loadjmp numcpus)
(> count 0))
(debug:print-info 0 *default-log-port* "waiting " waitdelay " seconds due to load jump " loadjmp " > numcpus " numcpus (if msg msg ""))
(thread-sleep! waitdelay)
(common:wait-for-cpuload maxload numcpus waitdelay count: (- count 1))))))
;; Wait on the load of the homehost by delegating to
;; common:wait-for-normalized-load.
;;
;;   maxload - load limit, passed through unchanged
;;   msg     - message, passed through as the msg: keyword argument
(define (common:wait-for-homehost-load maxload msg)
  (let* ((hh-dat (if (common:on-homehost?) ;; if we are on the homehost then pass in #f so the calls are local.
                     #f
                     (common:get-homehost)))
         (hh     (if hh-dat (car hh-dat) #f)))
    ;; The cpu count is deliberately not looked up here:
    ;; common:wait-for-normalized-load calls common:get-num-cpus itself, so a
    ;; lookup at this point (an ssh call when hh is set) would be discarded.
    (common:wait-for-normalized-load maxload msg: msg remote-host: hh)))
;; Return the number of lines in /proc/cpuinfo that match "processor : <digits>".
;;
;;   remote-host - when not #f, the file is read through
;;                 "ssh <remote-host> cat /proc/cpuinfo"; otherwise the local
;;                 /proc/cpuinfo is read.
(define (common:get-num-cpus remote-host)
  (define (processor-line? line)
    (string-match "^processor\\s+:\\s+\\d+$" line))
  ;; Reads lines from the current input port until eof, tallying the matches.
  (define (tally-processors)
    (let scan ((line (read-line))
               (seen 0))
      (cond
       ((eof-object? line) seen)
       ((processor-line? line) (scan (read-line) (+ seen 1)))
       (else (scan (read-line) seen)))))
  (if remote-host
      (with-input-from-pipe
       (conc "ssh " remote-host " cat /proc/cpuinfo")
       tally-processors)
      (with-input-from-file "/proc/cpuinfo" tally-processors)))
;; wait for normalized cpu load to drop below maxload
;;
;;   maxload     - load limit, handed to common:wait-for-cpuload
;;   msg         - keyword argument (default #f), passed through
;;   remote-host - keyword argument (default #f); used for the cpu-count
;;                 lookup and passed through
;;
;; The cpu count comes from common:get-num-cpus; the wait delay handed to
;; common:wait-for-cpuload is fixed at 15.
(define (common:wait-for-normalized-load maxload #!key (msg #f)(remote-host #f))
  (common:wait-for-cpuload
   maxload
   (common:get-num-cpus remote-host)
   15
   msg: msg
   remote-host: remote-host))
(define (get-uname . params)
(let* ((uname-res (process:cmd-run->list (conc "uname " (if (null? params) "-a" (car params)))))
(uname #f))
(if (null? (car uname-res))
|
|
|
|
|
|
|
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
|
(first (car loadavg))
(next (cadr loadavg))
(adjload (* maxload numcpus))
(loadjmp (- first next)))
(cond
((and (> first adjload)
(> count 0))
(debug:print-info 0 *default-log-port* "server start delayed " waitdelay " seconds due to load " first " exceeding max of " adjload " on server " (or remote-host (get-host-name)) " (normalized load-limit: " maxload ") " (if msg msg ""))
(thread-sleep! waitdelay)
(common:wait-for-cpuload maxload numcpus waitdelay count: (- count 1) msg: msg remote-host: remote-host))
((and (> loadjmp numcpus)
(> count 0))
(debug:print-info 0 *default-log-port* "waiting " waitdelay " seconds due to load jump " loadjmp " > numcpus " numcpus (if msg msg ""))
(thread-sleep! waitdelay)
(common:wait-for-cpuload maxload numcpus waitdelay count: (- count 1) msg: msg remote-host: remote-host)))))
;; Wait on the load of the homehost by delegating to
;; common:wait-for-normalized-load.
;;
;;   maxload - load limit, passed through unchanged
;;   msg     - message, passed through unchanged
(define (common:wait-for-homehost-load maxload msg)
  (let* ((hh-dat (if (common:on-homehost?) ;; if we are on the homehost then pass in #f so the calls are local.
                     #f
                     (common:get-homehost)))
         (hh     (if hh-dat (car hh-dat) #f)))
    ;; The cpu count is deliberately not looked up here:
    ;; common:wait-for-normalized-load calls common:get-num-cpus itself, so a
    ;; lookup at this point (an ssh call when hh is set) would be discarded.
    (common:wait-for-normalized-load maxload msg hh)))
;; Return the number of lines in /proc/cpuinfo that match "processor : <digits>".
;;
;;   remote-host - when not #f, the file is read through
;;                 "ssh <remote-host> cat /proc/cpuinfo"; otherwise the local
;;                 /proc/cpuinfo is read.
(define (common:get-num-cpus remote-host)
  (define (processor-line? line)
    (string-match "^processor\\s+:\\s+\\d+$" line))
  ;; Reads lines from the current input port until eof, tallying the matches.
  (define (tally-processors)
    (let scan ((line (read-line))
               (seen 0))
      (cond
       ((eof-object? line) seen)
       ((processor-line? line) (scan (read-line) (+ seen 1)))
       (else (scan (read-line) seen)))))
  (if remote-host
      (with-input-from-pipe
       (conc "ssh " remote-host " cat /proc/cpuinfo")
       tally-processors)
      (with-input-from-file "/proc/cpuinfo" tally-processors)))
;; wait for normalized cpu load to drop below maxload
;;
;;   maxload     - load limit, handed to common:wait-for-cpuload
;;   msg         - passed through as the msg: keyword argument
;;   remote-host - used for the cpu-count lookup and passed through as the
;;                 remote-host: keyword argument
;;
;; The cpu count comes from common:get-num-cpus; the wait delay handed to
;; common:wait-for-cpuload is fixed at 15.
(define (common:wait-for-normalized-load maxload msg remote-host)
  (common:wait-for-cpuload
   maxload
   (common:get-num-cpus remote-host)
   15
   msg: msg
   remote-host: remote-host))
(define (get-uname . params)
(let* ((uname-res (process:cmd-run->list (conc "uname " (if (null? params) "-a" (car params)))))
(uname #f))
(if (null? (car uname-res))
|