︙ | | |
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
-
-
-
-
+
+
+
+
+
-
+
-
+
+
-
+
-
-
+
+
-
+
-
+
|
(loop (file-exists? fullpath)
(- count 1)))
(begin
(if remove (system (conc "rm -rf " fullpath)))
#f)))
#t))))))
(define (tasks:get-task-db-path)
(let ((dbdir (or (configf:lookup *configdat* "setup" "monitordir")
(configf:lookup *configdat* "setup" "dbdir")
(conc (configf:lookup *configdat* "setup" "linktree") "/.db"))))
(define (tasks:get-task-db-path area-dat)
(let* ((configdat (megatest:area-configdat area-dat))
(dbdir (or (configf:lookup configdat "setup" "monitordir")
(configf:lookup configdat "setup" "dbdir")
(conc (configf:lookup configdat "setup" "linktree") "/.db"))))
(handle-exceptions
exn
(begin
(debug:print 0 "ERROR: Couldn't create path to " dbdir)
(exit 1))
(if (not (directory? dbdir))(create-directory dbdir #t)))
dbdir))
;; If file exists AND
;; file readable
;; ==> open it
;; If file exists AND
;; file NOT readable
;; ==> open in-mem version
;; If file NOT exists
;; ==> open in-mem version
;;
(define (tasks:open-db #!key (numretries 4))
(define (tasks:open-db area-dat #!key (numretries 4))
(if *task-db*
*task-db*
(handle-exceptions
exn
(if (> numretries 0)
(begin
(print-call-chain (current-error-port))
(debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 0 " exn=" (condition->list exn))
(thread-sleep! 1)
(tasks:open-db numretries (- numretries 1)))
(tasks:open-db area-dat numretries: (- numretries 1)))
(begin
(print-call-chain (current-error-port))
(debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 0 " exn=" (condition->list exn))))
(let* ((toppath (megatest:area-path area-dat))
(let* ((dbpath (tasks:get-task-db-path))
(dbpath (tasks:get-task-db-path area-dat))
(dbfile (conc dbpath "/monitor.db"))
(avail (tasks:wait-on-journal dbpath 10)) ;; wait up to about 10 seconds for the journal to go away
(exists (file-exists? dbpath))
(write-access (file-write-access? dbpath))
(mdb (cond ;; what the hek is *toppath* doing here?
((and (string? *toppath*)(file-write-access? *toppath*))
(mdb (cond ;; what the hek is toppath doing here?
((and (string? toppath)(file-write-access? toppath))
(sqlite3:open-database dbfile))
((file-read-access? dbpath) (sqlite3:open-database dbfile))
(else (sqlite3:open-database ":memory:")))) ;; (never-give-up-open-db dbpath))
(handler (make-busy-timeout 36000)))
(if (and exists
(not write-access))
(set! *db-write-access* write-access)) ;; only unset so other db's also can use this control
(sqlite3:set-busy-handler! mdb handler)
(db:set-sync mdb) ;; (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;"))
(db:set-sync mdb area-dat) ;; (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;"))
;; (if (or (and (not exists)
;; (file-write-access? *toppath*))
;; (file-write-access? toppath))
;; (not (file-read-access? dbpath)))
;; (begin
;;
;; TASKS QUEUE MOVED TO main.db
;;
;; (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
;; action TEXT DEFAULT '',
|
︙ | | |
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
|
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
-
+
-
+
-
+
-
+
-
+
|
hostname TEXT,
cmdline TEXT,
login_time TIMESTAMP,
logout_time TIMESTAMP DEFAULT -1,
CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
;))
(set! *task-db* (cons mdb dbpath))
(set! *task-db* (cons mdb dbpath)) ;; Move into area-dat !!!!
*task-db*))))
;;======================================================================
;; Server and client management
;;======================================================================
;; make-vector-record tasks hostinfo id interface port pubport transport pid hostname
(define (tasks:hostinfo-get-id vec) (vector-ref vec 0))
(define (tasks:hostinfo-get-interface vec) (vector-ref vec 1))
(define (tasks:hostinfo-get-port vec) (vector-ref vec 2))
(define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3))
(define (tasks:hostinfo-get-transport vec) (vector-ref vec 4))
(define (tasks:hostinfo-get-pid vec) (vector-ref vec 5))
(define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6))
(define (tasks:server-lock-slot mdb run-id)
(define (tasks:server-lock-slot mdb run-id area-dat)
(tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot")
(if (< (tasks:num-in-available-state mdb run-id) 4)
(begin
(tasks:server-set-available mdb run-id)
(tasks:server-set-available mdb run-id area-dat)
(thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed.
(tasks:server-am-i-the-server? mdb run-id))
#f))
;; register that this server may come online (first to register goes though with the process)
(define (tasks:server-set-available mdb run-id)
(define (tasks:server-set-available mdb run-id area-dat)
(sqlite3:execute
mdb
"INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id)
VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);"
(current-process-id) ;; pid
(get-host-name) ;; hostname
-1 ;; port
-1 ;; pubport
(random 1000) ;; priority (used a tiebreaker on get-available)
"available" ;; state
(common:version-signature) ;; mt_version
-1 ;; interface
;; (conc (server:get-transport)) ;; transport
(conc *transport-type*) ;; transport
(conc (megatest:area-transport area-dat)) ;; transport
run-id
))
(define (tasks:num-in-available-state mdb run-id)
(let ((res 0))
(sqlite3:for-each-row
(lambda (num-in-queue)
|
︙ | | |
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
|
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
|
-
+
-
-
-
-
-
|
(highnum 64000)
(used-ports '())
(get-rand-port (lambda ()
(+ lownum (random (- highnum lownum)))))
(port-param (if (and (args:get-arg "-port")
(string->number (args:get-arg "-port")))
(string->number (args:get-arg "-port"))
#f))
#f)))
;; (config-port (if (and (config-lookup *configdat* "server" "port")
;; (string->number (config-lookup *configdat* "server" "port")))
;; (string->number (config-lookup *configdat* "server" "port"))
;; #f))
)
(sqlite3:for-each-row
(lambda (port)
(set! used-ports (cons port used-ports)))
mdb
"SELECT port FROM servers;")
(cond
((and port-param res) (if (> res port-param) res port-param))
|
︙ | | |
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
|
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
|
-
-
+
+
+
+
+
+
-
+
-
+
-
+
-
+
|
(sqlite3:for-each-row
(lambda (id)
(set! res id))
mdb ;; NEEDS dbprep ADDED
"SELECT id FROM servers WHERE run_id=? AND state = 'running';" run-id)
res))
(define (tasks:need-server run-id)
(configf:lookup *configdat* "server" "required"))
(define (tasks:need-server run-id area-dat)
(let ((req (configf:lookup (megatest:area-configdat area-dat) "server" "required")))
(if (and req
(equal? req "yes"))
#t
#f)))
;; (maxqry (cdr (rmt:get-max-query-average run-id)))
;; (threshold (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") "10"))))
;; (threshold (string->number (or (configf:lookup configdat "server" "server-query-threshold") "10"))))
;; (cond
;; (forced
;; (if (common:low-noise-print 60 run-id "server required is set")
;; (debug:print-info 0 "Server required is set, starting server for run-id " run-id "."))
;; #t)
;; ((> maxqry threshold)
;; (if (common:low-noise-print 60 run-id "Max query time execeeded")
;; (debug:print-info 0 "Max avg query time of " maxqry "ms exceeds limit of " threshold "ms, server needed for run-id " run-id "."))
;; #t)
;; (else
;; #f))))
;; try to start a server and wait for it to be available
;;
(define (tasks:start-and-wait-for-server tdbdat run-id delay-max-tries)
(define (tasks:start-and-wait-for-server tdbdat run-id delay-max-tries area-dat)
;; ensure a server is running for this run
(let loop ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))
(let loop ((server-dat (tasks:get-server (db:delay-if-busy tdbdat area-dat) run-id))
(delay-time 0))
(if (and (not server-dat)
(< delay-time delay-max-tries))
(begin
(if (common:low-noise-print 60 "tasks:start-and-wait-for-server" run-id)
(debug:print 0 "Try starting server for run-id " run-id))
(thread-sleep! (/ (random 2000) 1000))
(server:kind-run run-id)
(thread-sleep! (min delay-time 1))
(loop (tasks:get-server (db:delay-if-busy tdbdat) run-id)(+ delay-time 1))))))
(loop (tasks:get-server (db:delay-if-busy tdbdat area-dat) run-id)(+ delay-time 1))))))
(define (tasks:get-all-servers mdb)
(let ((res '()))
(sqlite3:for-each-row
(lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id)
;; 0 1 2 3 4 5 6 7 8 9 10 11 12
(set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) res)))
|
︙ | | |
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
|
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
|
-
-
-
+
+
+
-
+
-
+
|
(setenv "TARGETHOST_LOGF" "server-kills.log")
(system (conc "nbfake kill " pid))
(unsetenv "TARGETHOST_LOGF")
(unsetenv "TARGETHOST"))
;; look up a server by run-id and send it a kill, also delete the record for that server
;;
(define (tasks:kill-server-run-id run-id #!key (tag "default"))
(let* ((tdbdat (tasks:open-db))
(sdat (tasks:get-server (db:delay-if-busy tdbdat) run-id)))
(define (tasks:kill-server-run-id run-id area-dat #!key (tag "default"))
(let* ((tdbdat (tasks:open-db area-dat))
(sdat (tasks:get-server (db:delay-if-busy tdbdat area-dat) run-id)))
(if sdat
(let ((hostname (vector-ref sdat 6))
(pid (vector-ref sdat 5))
(server-id (vector-ref sdat 0)))
(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "killed")
(tasks:server-set-state! (db:delay-if-busy tdbdat area-dat) server-id "killed")
(debug:print-info 0 "Killing server " server-id " for run-id " run-id " on host " hostname " with pid " pid)
(tasks:kill-server hostname pid)
(tasks:server-delete-record (db:delay-if-busy tdbdat) server-id tag) )
(tasks:server-delete-record (db:delay-if-busy tdbdat area-dat) server-id tag) )
(debug:print-info 0 "No server found for run-id " run-id ", nothing to kill"))
;; (sqlite3:finalize! tdb)
))
;;======================================================================
;; M O N I T O R S
;;======================================================================
|
︙ | | |
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
|
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
|
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
+
|
(lambda (count)
(set! res count))
mdb
"SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;"
(car (user-information (current-user-id))))
res))
;;
(define (tasks:start-monitor db mdb)
(if (> (tasks:get-num-alive-monitors mdb) 2) ;; have two running, no need for more
(debug:print-info 1 "Not starting monitor, already have more than two running")
(let* ((megatestdb (conc *toppath* "/megatest.db"))
(monitordbf (conc (db:dbfile-path #f) "/monitor.db"))
(last-db-update 0)) ;; (file-modification-time megatestdb)))
(task:register-monitor mdb)
(let loop ((count 0)
(next-touch 0)) ;; next-touch is the time where we need to update last_update
;; if the db has been modified we'd best look at the task queue
(let ((modtime (file-modification-time megatestdbpath )))
(if (> modtime last-db-update)
(tasks:process-queue db mdb last-db-update megatestdb next-touch))
;; WARNING: Possible race conditon here!!
;; should this update be immediately after the task-get-action call above?
(if (> (current-seconds) next-touch)
(begin
(tasks:monitors-update mdb)
(loop (+ count 1)(+ (current-seconds) 240)))
(loop (+ count 1) next-touch)))))))
;;======================================================================
;; T A S K S Q U E U E
;;
;; NOTE:: These operate on task_queue which is in main.db
;;
;;======================================================================
;; NOTE: It might be good to add one more layer of checking to ensure
;; that no task gets run in parallel.
;; register a task
(define (tasks:add dbstruct action owner target runname testpatt params)
(define (tasks:add dbstruct area-dat action owner target runname testpatt params)
(db:with-db
dbstruct #f #t
dbstruct area-dat #f #t
(lambda (db)
(sqlite3:execute db "INSERT INTO tasks_queue (action,owner,state,target,name,testpatt,params,creation_time,execution_time)
VALUES (?,?,'new',?,?,?,?,strftime('%s','now'),0);"
action
owner
target
runname
|
︙ | | |
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
|
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
|
-
+
-
+
-
+
-
+
-
+
-
+
-
-
+
+
|
(lambda (db)
(handle-exceptions
exn
#f
(sqlite3:first-result db "SELECT id FROM tasks_queue WHERE params LIKE ?;"
task-params)))))
(define (tasks:set-state-given-param-key dbstruct param-key new-state)
(define (tasks:set-state-given-param-key dbstruct area-dat param-key new-state)
(db:with-db
dbstruct #f #t
dbstruct area-dat #f #t
(lambda (db)
(sqlite3:execute db "UPDATE tasks_queue SET state=? WHERE params LIKE ?;" new-state param-key))))
(define (tasks:get-records-given-param-key dbstruct param-key state-patt action-patt test-patt)
(define (tasks:get-records-given-param-key dbstruct area-dat param-key state-patt action-patt test-patt)
(db:with-db
dbstruct #f #f
dbstruct area-dat #f #f
(lambda (db)
(handle-exceptions
exn
'()
(sqlite3:first-row db "SELECT id,action,owner,state,target,name,testpatt,keylock,params WHERE
params LIKE ? AND state LIKE ? AND action LIKE ? AND testpatt LIKE ?;"
param-key state-patt action-patt test-patt)))))
(define (tasks:find-task-queue-records dbstruct target run-name test-patt state-patt action-patt)
(define (tasks:find-task-queue-records dbstruct area-dat target run-name test-patt state-patt action-patt)
;; (handle-exceptions
;; exn
;; '()
;; (sqlite3:first-row
(let ((db (db:delay-if-busy (db:get-db dbstruct #f)))
(let ((db (db:delay-if-busy (db:get-db dbstruct #f) area-dat))
(res '()))
(sqlite3:for-each-row
(lambda (a . b)
(set! res (cons (cons a b) res)))
db "SELECT id,action,owner,state,target,name,testpatt,keylock,params FROM tasks_queue
WHERE
target = ? AND name = ? AND state LIKE ? AND action LIKE ? AND testpatt LIKE ?;"
target run-name state-patt action-patt test-patt)
res)) ;; )
;; kill any runner processes (i.e. processes handling -runtests) that match target/runname
;;
;; do a remote call to get the task queue info but do the killing as self here.
;;
(define (tasks:kill-runner target run-name)
(let ((records (rmt:tasks-find-task-queue-records target run-name "%" "running" "run-tests"))
(define (tasks:kill-runner target run-name area-dat)
(let ((records (rmt:tasks-find-task-queue-records target run-name "%" "running" "run-tests" area-dat))
(hostpid-rx (regexp "\\s+(\\w+)\\s+(\\d+)$"))) ;; host pid is at end of param string
(if (null? records)
(debug:print 0 "No run launching processes found for " target " / " run-name)
(debug:print 0 "Found " (length records) " run(s) to kill."))
(for-each
(lambda (record)
(let* ((param-key (list-ref record 8))
|
︙ | | |