Overview
Comment: | Added interface to the monitor db and appropriate handling thereof. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk | v1.506 |
Files: | files | file ages | folders |
SHA1: | 5f757480e6441e210f1f71d1a87ddbd9 |
User & Date: | mrwellan on 2012-11-02 17:36:08 |
Other Links: | manifest | tags |
Context
2012-11-02
| ||
18:33 | borked server heartbeat logic check-in: ece909ab1c user: mrwellan tags: trunk | |
17:36 | Added interface to the monitor db and appropriate handling thereof. check-in: 5f757480e6 user: mrwellan tags: trunk, v1.506 | |
13:19 | Made repl use non-blocking client mode check-in: 50f33a00a7 user: mrwellan tags: trunk, v1.5105 | |
Changes
Modified db.scm from [2a2b5ea15a] to [0f75487810].
︙ | ︙ | |||
231 232 233 234 235 236 237 | ;;====================================================================== ;; T E S T S P E C I F I C D B ;;====================================================================== ;; Create the sqlite db for the individual test(s) (define (open-test-db testpath) (debug:print-info 11 "open-test-db " testpath) | > | | 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 | ;;====================================================================== ;; T E S T S P E C I F I C D B ;;====================================================================== ;; Create the sqlite db for the individual test(s) (define (open-test-db testpath) (debug:print-info 11 "open-test-db " testpath) (if (and testpath (directory? testpath) (file-read-access? testpath)) (let* ((dbpath (conc testpath "/testdat.db")) (dbexists (file-exists? dbpath)) (db (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath)) (handler (make-busy-timeout (if (args:get-arg "-override-timeout") (string->number (args:get-arg "-override-timeout")) 136000)))) |
︙ | ︙ |
Modified megatest-version.scm from [0ac622c7df] to [4bf7ad110a].
1 2 3 4 5 | ;; Always use two digit decimal ;; 1.01, 1.02...1.10,1.11 ... 1.99,2.00.. (declare (unit megatest-version)) | | | 1 2 3 4 5 6 7 | ;; Always use two digit decimal ;; 1.01, 1.02...1.10,1.11 ... 1.99,2.00.. (declare (unit megatest-version)) (define megatest-version 1.5106) |
Modified megatest.scm from [9c15c3f066] to [1433ed4462].
︙ | ︙ | |||
270 271 272 273 274 275 276 | (server:launch))) (if (or (args:get-arg "-listservers") (args:get-arg "-killserver")) (let ((tl (setup-for-run))) (if tl (let ((servers (open-run-close tasks:get-all-servers tasks:open-db)) | | | | > | | | | | > > | | > > > | | > < > | 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 | (server:launch))) (if (or (args:get-arg "-listservers") (args:get-arg "-killserver")) (let ((tl (setup-for-run))) (if tl (let ((servers (open-run-close tasks:get-all-servers tasks:open-db)) (fmtstr "~5a~8a~8a~20a~20a~10a~20a~10a~10a\n") (servers-to-kill '())) (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "Port" "Time" "Priority" "State") (format #t fmtstr "==" "=====" "===" "====" "=========" "====" "====" "========" "=====") (for-each (lambda (server) (let* ((killinfo (args:get-arg "-killserver")) (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f)) (kpid (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f)) (id (vector-ref server 0)) (pid (vector-ref server 1)) (hostname (vector-ref server 2)) (interface (vector-ref server 3)) (port (vector-ref server 4)) (start-time (vector-ref server 5)) (priority (vector-ref server 6)) (state (vector-ref server 7)) (mt-ver (vector-ref server 8)) (status (open-run-close tasks:server-alive? tasks:open-db hostname port: port)) (killed #f) (zmq-socket (if status (server:client-connect hostname port) #f))) ;; no need to login as status of #t indicates we are connecting to correct ;; server (if (or (not status) ;; no point in keeping dead records in the db (and khost-port ;; kill by host/port (equal? hostname (car khost-port)) (equal? 
port (string->number (cadr khost-port))))) (begin (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (if status ;; #t means alive (begin (if (equal? hostname (get-host-name)) (process-signal pid signal/term) (cdb:kill-server zmq-socket)) (debug:print-info 1 "Killed server by host:port at " hostname ":" port)) (debug:print-info 1 "Removing defunct server record for " hostname ":" port)) (set! killed #t))) (if (and kpid ;; (equal? hostname (car khost-port)) (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!! (begin (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid) (set! killed #t) (if status (if (equal? hostname (get-host-name)) (process-signal pid signal/term) (debug:print 0 "WARNING: Can't kill a dead server on host " hostname))) (debug:print-info 1 "Killed server by pid at " hostname ":" port))) ;; (if zmq-socket (close-socket zmq-socket)) (format #t fmtstr id mt-ver pid hostname interface port start-time priority (if status "alive" "dead")))) servers) (debug:print-info 1 "Done with listservers") (set! *didsomething* #t) (exit) ;; must do, would have to add checks to many/all calls below ) (exit))) ;; if not list or kill then start a client (if appropriate) (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test") (eq? (length (hash-table-keys args:arg-hash)) 0)) (debug:print-info 1 "Server connection not needed") (server:client-launch do-ping: #t))) |
︙ | ︙ |
Modified server.scm from [59c1e6d986] to [000964c6b9].
︙ | ︙ | |||
33 34 35 36 37 38 39 | (debug:print 0 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) (let* ((zmq-socket #f) | | | > > | | 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | (debug:print 0 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) (let* ((zmq-socket #f) (iface (if (string=? "-" hostn) "*" ;; (get-host-name) hostn)) (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) (if ipstr ipstr hostname)))) ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0)) (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket 5555 0)) (set! *cache-on* #t) ;; what to do when we quit ;; (on-exit (lambda () (if (and *toppath* *server-id*) (begin (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr)) (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) (mutex-lock! *incoming-mutex*) (set! queue-len (length *incoming-data*)) (mutex-unlock! *incoming-mutex*) (if (> queue-len 0) |
︙ | ︙ | |||
107 108 109 110 111 112 113 | (set! *time-to-exit* #t) (open-run-close tasks:server-deregister-self tasks:open-db) (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) | | | > > > | | | | | | | | 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | (set! *time-to-exit* #t) (open-run-close tasks:server-deregister-self tasks:open-db) (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) (define (server:find-free-port-and-open iface s port #!key (trynum 50)) (let ((s (if s s (make-socket 'rep))) (p (if (number? port) port 5555)) (old-handler (current-exception-handler))) (handle-exceptions exn (begin (debug:print 0 "Failed to bind to port " p ", trying next port") (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) ;; (old-handler) ;; (print-call-chain) (if (> trynum 0) (server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1)) (debug:print-info 0 "Tried ports up to " p " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))) (let ((zmq-url (conc "tcp://" iface ":" p))) (print "Trying to start server on " zmq-url) (bind-socket s zmq-url) (set! *runremote* #f) (debug:print 0 "Server started on " zmq-url) (set! 
*server-id* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live)) s)))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) ;; (define (server:client-connect iface port #!key (context #f)) (debug:print 3 "client-connect " iface ":" port) (let ((connect-ok #f) (zmq-socket (if context (make-socket 'req context) (make-socket 'req))) (conurl (server:make-server-url (list iface port)))) (if (socket? zmq-socket) (begin (connect-socket zmq-socket conurl) zmq-socket) #f))) |
︙ | ︙ | |||
173 174 175 176 177 178 179 | (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db do-ping: do-ping))) (if hostinfo | | | | < < < | | | | | | | | | | | | | | | | | | | | | | | 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 | (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db do-ping: do-ping))) (if hostinfo (let ((host (car hostinfo)) (iface (cadr hostinfo)) (port (caddr hostinfo))) (debug:print-info 2 "Setting up to connect to " hostinfo) (handle-exceptions exn (begin (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) (debug:print 0 " perhaps jobs killed with -9? Removing server records") (open-run-close tasks:server-deregister tasks:open-db host port: port) #f) (let* ((zmq-socket (server:client-connect iface port)) (login-res (server:client-login zmq-socket)) (connect-ok (if (null? login-res) #f (car login-res))) (conurl (server:make-server-url (list iface port)))) (if connect-ok (begin (debug:print-info 2 "Logged in and connected to " conurl) (set! *runremote* zmq-socket) #t) (begin (debug:print-info 2 "Failed to login or connect to " conurl) (set! *runremote* #f) #f))))) (if (> numtries 0) (let ((exe (car (argv)))) (debug:print-info 1 "No server available, attempting to start one...") (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) (sleep 2) ;; not doing ping, assume the server started and registered itself (server:client-setup numtries: (- numtries 1) do-ping: #f)) |
︙ | ︙ |
Modified tasks.scm from [5ae2b507c5] to [3a1458e323].
︙ | ︙ | |||
47 48 49 50 51 52 53 54 55 56 57 58 59 60 | start_time TIMESTAMP, last_update TIMESTAMP, hostname TEXT, username TEXT, CONSTRAINT monitors_constraint UNIQUE (pid,hostname));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, hostname TEXT, port INTEGER, start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, | > | 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | start_time TIMESTAMP, last_update TIMESTAMP, hostname TEXT, username TEXT, CONSTRAINT monitors_constraint UNIQUE (pid,hostname));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, port INTEGER, start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, |
︙ | ︙ | |||
72 73 74 75 76 77 78 | mdb)) ;;====================================================================== ;; Server and client management ;;====================================================================== ;; state: 'live, 'shutting-down, 'dead | | | | | | | | | | 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | mdb)) ;;====================================================================== ;; Server and client management ;;====================================================================== ;; state: 'live, 'shutting-down, 'dead (define (tasks:server-register mdb pid interface port priority state) (sqlite3:execute mdb "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);" pid (get-host-name) port priority (conc state) megatest-version interface) (tasks:server-get-server-id mdb (get-host-name) port pid)) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! (define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)) (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) (if pid (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid) (if port (sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port) (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) (define (tasks:server-get-server-id mdb hostname port pid) (let ((res #f)) (sqlite3:for-each-row (lambda (id) (set! res id)) mdb (if (and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;" "SELECT id FROM servers WHERE hostname=? 
AND port=?;") hostname (if pid pid port)) res)) (define (tasks:server-update-heartbeat mdb server-id) (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)) ;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds (define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f)) (let* ((server-id (if server-id server-id (tasks:server-get-server-id mdb hostname port pid))) (heartbeat-delta 99e9)) (sqlite3:for-each-row (lambda (delta) (set! heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) (> heartbeat-delta 10))) (define (tasks:client-register mdb pid hostname cmdline) (sqlite3:execute mdb "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));") (tasks:server-get-server-id mdb) pid hostname cmdline) |
︙ | ︙ | |||
149 150 151 152 153 154 155 | ;; ping each server in the db and return first found that responds. ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb #!key (do-ping #f)) (let ((res '()) (best #f)) (sqlite3:for-each-row | | | | | > | > | > > > > > > > > > > > | | | | 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 | ;; ping each server in the db and return first found that responds. ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb #!key (do-ping #f)) (let ((res '()) (best #f)) (sqlite3:for-each-row (lambda (id hostname interface port pid) (set! res (cons (list hostname interface port pid) res)) (debug:print-info 1 "Found " hostname ":" port)) mdb "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version) ;; (print "res=" res) (if (null? res) #f (let loop ((hed (car res)) (tal (cdr res))) ;; (print "hed=" hed ", tal=" tal) (let* ((host (car hed)) (iface (cadr hed)) (port (caddr hed)) (pid (cadddr hed)) ;; (ping-res (if do-ping (server:ping host port return-socket: #f) '(#t "NO PING" #f))) (alive (open-run-close tasks:server-alive? tasks:open-db host port: port)) ;; (car ping-res)) ;; (reason (cadr ping-res)) ;; (zsocket (caddr ping-res)) ) (if alive ;; (if (server:ping iface port) (list host iface port) ;; ;; not actually alive, destroy! ;; (begin ;; (if (equal? 
host (get-host-name)) ;; (begin ;; (debug:print-info 0 "Killing process " pid " on host " host " with signal/term") ;; (send-signal pid signal/term)) ;; (debug:print 0 "WARNING: Can't kill process " pid " on host " host)) ;; (open-run-close tasks:server-deregister tasks:open-db host port: port) ;; #f)) ;; remove defunct server from table (begin (open-run-close tasks:server-deregister tasks:open-db host port: port) (if (null? tal) #f (loop (car tal)(cdr tal)))))))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row (lambda (id pid hostname interface port start-time priority state mt-version) (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res))) mdb "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors ;;====================================================================== |
︙ | ︙ |