︙ | | | ︙ | |
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))
(declare (unit common))
(include "common_records.scm")
;; (require-library margs)
;; (include "margs.scm")
;; (define old-exit exit)
;;
;; (define (exit . code)
|
>
>
|
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))
(declare (unit common))
(include "common_records.scm")
(include "thunk-utils.scm")
;; (require-library margs)
;; (include "margs.scm")
;; (define old-exit exit)
;;
;; (define (exit . code)
|
︙ | | | ︙ | |
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
(mutex-lock! *context-mutex*)
(let ((cxt (hash-table-ref/default *contexts* toppath #f)))
(if (not cxt)
(set! cxt (let ((x (make-cxt)))(hash-table-set! *contexts* toppath x) x)))
(let ((cxt-mutex (cxt-mutex cxt)))
(mutex-unlock! *context-mutex*)
(mutex-lock! cxt-mutex)
(let ((res (proc cxt)))
(mutex-unlock! cxt-mutex)
res))))
(define *db-keys* #f)
(define *configinfo* #f) ;; raw results from setup, includes toppath and table from megatest.config
(define *runconfigdat* #f) ;; run configs data
(define *configdat* #f) ;; megatest.config data
(define *configstatus* #f) ;; status of data; 'fulldata : all processing done, #f : no data yet, 'partialdata : partial read done
|
>
>
>
>
>
>
|
>
>
|
>
>
|
>
|
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
(mutex-lock! *context-mutex*)
(let ((cxt (hash-table-ref/default *contexts* toppath #f)))
(if (not cxt)
(set! cxt (let ((x (make-cxt)))(hash-table-set! *contexts* toppath x) x)))
(let ((cxt-mutex (cxt-mutex cxt)))
(mutex-unlock! *context-mutex*)
(mutex-lock! cxt-mutex)
;; here we guard proc with exception handler so
;; no matter how proc succeeds or fails,
;; the cxt-mutex will be unlocked afterward.
(let* ((EXCEPTION-SYMBOL (gensym)) ;; fresh uninterned tag, cannot collide with any value proc returns
       (guarded-proc
        (lambda args
          ;; Run proc. If it signals any condition, capture that condition
          ;; tagged with EXCEPTION-SYMBOL rather than letting it escape
          ;; before the mutex has been released.
          (let* ((res (condition-case
                       (apply proc args)
                       [x () (cons EXCEPTION-SYMBOL x)])))
            (mutex-unlock! cxt-mutex)
            ;; The tag must be compared against EXCEPTION-SYMBOL. The earlier
            ;; code referenced an unbound variable here, which raised an
            ;; unbound-variable error whenever proc failed or returned a pair.
            (if (and (pair? res) (eq? (car res) EXCEPTION-SYMBOL))
                (abort (cdr res)) ;; re-raise the original condition after unlocking
                res)))))
  (guarded-proc cxt)))))
(define *db-keys* #f)
(define *configinfo* #f) ;; raw results from setup, includes toppath and table from megatest.config
(define *runconfigdat* #f) ;; run configs data
(define *configdat* #f) ;; megatest.config data
(define *configstatus* #f) ;; status of data; 'fulldata : all processing done, #f : no data yet, 'partialdata : partial read done
|
︙ | | | ︙ | |
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
(define *task-db* #f) ;; (vector db path-to-db)
(define *db-access-allowed* #t) ;; flag to allow access
(define *db-access-mutex* (make-mutex))
(define *db-cache-path* #f)
;; SERVER
(define *my-client-signature* #f)
(define *transport-type* 'http) ;; override with [server] transport http|rpc|nmsg
(define *runremote* #f) ;; if set up for server communication this will hold <host port>
(define *max-cache-size* 0)
(define *logged-in-clients* (make-hash-table))
(define *server-id* #f)
(define *server-info* #f)
(define *time-to-exit* #f)
(define *server-run* #t)
|
|
>
>
>
>
>
>
>
>
>
>
|
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
(define *task-db* #f) ;; (vector db path-to-db)
(define *db-access-allowed* #t) ;; flag to allow access
(define *db-access-mutex* (make-mutex))
(define *db-cache-path* #f)
;; SERVER
(define *my-client-signature* #f)
(define *transport-type* #f) ;; #f until common:set-transport-type runs; then a symbol taken from the -transport arg, the [server] transport config setting, or the default http
;; Choose the transport and store it, as a symbol, in *transport-type*.
;; Order of precedence: the -transport command line argument, then the
;; "transport" setting in the "server" section of *configdat*, then "http".
;; Returns the new value of *transport-type*.
(define (common:set-transport-type)
  (let* ((from-cmdline   (args:get-arg "-transport"))
         (transport-name (if from-cmdline
                             from-cmdline
                             ;; the config is only consulted when no argument was given
                             (or (configf:lookup *configdat* "server" "transport")
                                 "http"))))
    (set! *transport-type* (string->symbol transport-name))
    *transport-type*))
(define *runremote* #f) ;; if set up for server communication this will hold <host port>
(define *max-cache-size* 0)
(define *logged-in-clients* (make-hash-table))
(define *server-id* #f)
(define *server-info* #f)
(define *time-to-exit* #f)
(define *server-run* #t)
|
︙ | | | ︙ | |
598
599
600
601
602
603
604
605
606
607
608
609
610
611
|
(thread-sleep! 1)
(delay-loop (+ count 1))))
(loop)))
(if (common:low-noise-print 30)
(debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*)))))))
(define (std-exit-procedure)
(let ((no-hurry (if *time-to-exit* ;; hurry up
#f
(begin
(set! *time-to-exit* #t)
#t))))
(debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
(if (and no-hurry (debug:debug-mode 18))
|
>
|
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
|
(thread-sleep! 1)
(delay-loop (+ count 1))))
(loop)))
(if (common:low-noise-print 30)
(debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*)))))))
(define (std-exit-procedure)
(let ((no-hurry (if *time-to-exit* ;; hurry up
#f
(begin
(set! *time-to-exit* #t)
#t))))
(debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
(if (and no-hurry (debug:debug-mode 18))
|
︙ | | | ︙ | |
626
627
628
629
630
631
632
633
634
635
636
637
638
639
|
(debug:print 4 *default-log-port* "Attempting clean exit. Please be patient and wait a few seconds...")
(if no-hurry
(thread-sleep! 5) ;; give the clean up few seconds to do it's stuff
(thread-sleep! 2))
(debug:print 4 *default-log-port* " ... done")
)
"clean exit")))
(thread-start! th1)
(thread-start! th2)
(thread-join! th1))))
(define (std-signal-handler signum)
;; (signal-mask! signum)
(set! *time-to-exit* #t)
|
>
>
>
>
>
>
>
>
>
|
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
|
(debug:print 4 *default-log-port* "Attempting clean exit. Please be patient and wait a few seconds...")
(if no-hurry
(thread-sleep! 5) ;; give the clean up few seconds to do it's stuff
(thread-sleep! 2))
(debug:print 4 *default-log-port* " ... done")
)
"clean exit")))
;; let's try to clean up open sockets
(if *runremote*
(case (remote-transport *runremote*)
((http) (close-all-connections!))
((rpc) (rpc:close-all-connections!))
(else
(debug:print-info 0 *default-log-port* "Transport "(remote-transport *runremote*)" not supported"))))
(thread-start! th1)
(thread-start! th2)
(thread-join! th1))))
(define (std-signal-handler signum)
;; (signal-mask! signum)
(set! *time-to-exit* #t)
|
︙ | | | ︙ | |
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
|
(map (lambda (res)
(if (eof-object? res) 9e99 res))
(with-input-from-pipe
(conc "ssh " remote-host " cat /proc/loadavg")
(lambda ()(list (read)(read)(read)))))
(with-input-from-file "/proc/loadavg"
(lambda ()(list (read)(read)(read))))))
(define (common:wait-for-cpuload maxload numcpus waitdelay #!key (count 1000) (msg #f)(remote-host #f))
(let* ((loadavg (common:get-cpu-load remote-host))
(first (car loadavg))
(next (cadr loadavg))
(adjload (* maxload numcpus))
(loadjmp (- first next)))
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
|
(map (lambda (res)
(if (eof-object? res) 9e99 res))
(with-input-from-pipe
(conc "ssh " remote-host " cat /proc/loadavg")
(lambda ()(list (read)(read)(read)))))
(with-input-from-file "/proc/loadavg"
(lambda ()(list (read)(read)(read))))))
;; Get the cpu load normalized by cpu count. Reads /proc/loadavg and
;; /proc/cpuinfo from the local host, or from remote-host over ssh when
;; remote-host is not #f.
;;
;; Returns #f when nothing could be read or no usable load-average line was
;; found, otherwise an alist:
;;
;;   ((adj-proc-load . n) (adj-core-load . n)
;;    (1m-load . n) (5m-load . n) (15m-load . n)
;;    (proc . n) (core . n) (phys . n))
;;
;; proc, core and phys are the highest "processor", "core id" and
;; "physical id" numbers seen in cpuinfo, plus one. adj-proc-load and
;; adj-core-load are the 1 minute load divided by proc and core respectively.
;;
;; NOTE(review): core is derived from the highest "core id", which on a
;; multi-socket host looks like cores per package rather than total cores -
;; confirm that adj-core-load is meant to be computed from it.
;;
(define (common:get-normalized-cpu-load remote-host)
  (let ((data (if remote-host
                  (with-input-from-pipe
                   ;; The remote commands must be quoted. Unquoted, the local
                   ;; shell splits the line at each ";" so only loadavg came
                   ;; from remote-host and cpuinfo was read from this host.
                   ;; "echo end" stays local so the sentinel is always present.
                   (conc "ssh " remote-host " 'cat /proc/loadavg;cat /proc/cpuinfo';echo end")
                   read-lines)
                  (append
                   (with-input-from-file "/proc/loadavg"
                     read-lines)
                   (with-input-from-file "/proc/cpuinfo"
                     read-lines)
                   (list "end"))))
        (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$"))
        (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$"))
        (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$"))
        (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$"))
        (max-num (lambda (p n)(max (string->number p) n))))
    (if (null? data) ;; something went wrong, nothing was read
        #f
        (let loop ((hed      (car data))
                   (tal      (cdr data))
                   (loads    #f)
                   (proc-num 0)  ;; processor includes threads
                   (phys-num 0)  ;; physical chip on motherboard
                   (core-num 0)) ;; core
          (if (null? tal)
              ;; hed is the last line (the "end" sentinel); all data has been seen
              (if (or (not loads)      ;; no load-average line matched (e.g. ssh failed)
                      (memq #f loads)) ;; a field matched the pattern but is not a number
                  #f
                  (let* ((act-proc (+ proc-num 1))
                         (act-phys (+ phys-num 1))
                         (act-core (+ core-num 1)))
                    (list (cons 'adj-proc-load (/ (car loads) act-proc))
                          (cons 'adj-core-load (/ (car loads) act-core))
                          (cons '1m-load       (car loads))
                          (cons '5m-load       (cadr loads))
                          (cons '15m-load      (caddr loads))
                          (cons 'proc          act-proc)
                          (cons 'core          act-core)
                          (cons 'phys          act-phys))))
              (regex-case
               hed
               (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num))
               (proc-rx ( x p )         (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num))
               (phys-rx ( x p )         (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num))
               (core-rx ( x c )         (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num)))
               (else
                ;; line carries nothing we need, move on
                (loop (car tal)(cdr tal) loads proc-num phys-num core-num))))))))
(define (common:wait-for-cpuload maxload numcpus waitdelay #!key (count 1000) (msg #f)(remote-host #f))
(let* ((loadavg (common:get-cpu-load remote-host))
(first (car loadavg))
(next (cadr loadavg))
(adjload (* maxload numcpus))
(loadjmp (- first next)))
|
︙ | | | ︙ | |