Megatest

Diff
Login

Differences From Artifact [7404179285]:

To Artifact [4b29489636]:


88
89
90
91
92
93
94
95
96
97
98

99
100
101
102
103
104
105
88
89
90
91
92
93
94

95
96

97
98
99
100
101
102
103
104







-


-
+







;; db stats
(define *db-stats*            (make-hash-table)) ;; hash of vectors < count duration-total >
(define *db-stats-mutex*      (make-mutex))
;; db access
(define *db-last-access*      (current-seconds)) ;; last db access, used in server
(define *db-write-access*     #t)
;; db sync
(define *db-last-write*       0)                 ;; used to record last touch of db
(define *db-last-sync*        0)                 ;; last time the sync to megatest.db happened
(define *db-sync-in-progress* #f)                ;; if there is a sync in progress do not try to start another
(define *db-multi-sync-mutex* (make-mutex))      ;; protect access to *db-sync-in-progress*, *db-last-sync* and *db-last-write*
(define *db-multi-sync-mutex* (make-mutex))      ;; protect access to *db-sync-in-progress*, *db-last-sync*
;; task db
(define *task-db*             #f) ;; (vector db path-to-db)
(define *db-access-allowed*   #t) ;; flag to allow access
(define *db-access-mutex*     (make-mutex))
(define *db-cache-path*       #f)

;; SERVER
133
134
135
136
137
138
139








140
141
142
143
144
145
146
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153







+
+
+
+
+
+
+
+







(define *test-ids*          (make-hash-table)) ;; cache run-id, testname, and item-path => test-id
(define *test-info*         (make-hash-table)) ;; cache the test info records, update the state, status, run_duration etc. from testdat.db

(define *run-info-cache*     (make-hash-table)) ;; run info is stable, no need to reget
(define *launch-setup-mutex* (make-mutex))     ;; need to be able to call launch:setup often so mutex it and re-call the real deal only if *toppath* not set
(define *homehost-mutex*     (make-mutex))

(defstruct remote
  (hh-dat            (common:get-homehost)) ;; homehost record ( addr . hhflag )
  (server-url        (if *toppath* (server:read-dotserver *toppath*))) ;; (server:check-if-running *toppath*) #f))
  (last-server-check 0)  ;; last time we checked to see if the server was alive
  (conndat           #f)
  (transport         *transport-type*)
  (server-timeout    (or (server:get-timeout) 100))) ;; default to 100 seconds

;; launching and hosts
(defstruct host
  (reachable    #f)
  (last-update  0)
  (last-used    0)
  (last-cpuload 1))

529
530
531
532
533
534
535
536
537
538
539
540







541
542
543
544
545
546
547
548
549

550
551

552
553





554
555
556
557

558
559
560
561
562
563





564
565
566

567
568
569
570

571

572

573
574
575
576
577
578
579
536
537
538
539
540
541
542





543
544
545
546
547
548
549
550
551
552
553
554
555
556
557

558
559

560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575



576
577
578
579
580
581
582
583
584
585
586
587

588
589
590

591
592
593
594
595
596
597
598







-
-
-
-
-
+
+
+
+
+
+
+








-
+

-
+


+
+
+
+
+




+



-
-
-
+
+
+
+
+



+



-
+

+
-
+







  (message-digest-string (md5-primitive) *toppath*))

;;======================================================================
;; E X I T   H A N D L I N G
;;======================================================================

(define (common:run-sync?)
  (let ((ohh (common:on-homehost?))
	(srv (args:get-arg "-server")))
    ;; (debug:print-info 0 *default-log-port* "common:run-sync? ohh=" ohh ", srv=" srv)
    (and (common:on-homehost?)
	 (args:get-arg "-server"))))
    (and (common:on-homehost?)
	 (args:get-arg "-server")))

;;   (let ((ohh (common:on-homehost?))
;; 	(srv (args:get-arg "-server")))
;;     (and ohh srv)))
    ;; (debug:print-info 0 *default-log-port* "common:run-sync? ohh=" ohh ", srv=" srv)

;;;; run-ids
;;    if #f use *db-local-sync* : or 'local-sync-flags
;;    if #t use timestamps      : or 'timestamps
(define (common:sync-to-megatest.db dbstruct) 
  (let ((start-time         (current-seconds))
	(res                (db:multi-db-sync dbstruct 'new2old)))
    (let ((sync-time (- (current-seconds) start-time)))
      (debug:print-info 3 *default-log-port* "Sync of newdb to olddb completed in " sync-time " seconds")
      (debug:print-info 3 *default-log-port* "Sync of newdb to olddb completed in " sync-time " seconds pid="(current-process-id))
      (if (common:low-noise-print 30 "sync new to old")
	  (debug:print-info 0 *default-log-port* "Sync of newdb to olddb completed in " sync-time " seconds")))
	  (debug:print-info 0 *default-log-port* "Sync of newdb to olddb completed in " sync-time " seconds pid="(current-process-id))))
    res))




(define *wdnum* 0)
(define *wdnum*mutex (make-mutex))
;; currently the primary job of the watchdog is to run the sync back to megatest.db from the db in /tmp
;; if we are on the homehost and we are a server (by definition we are on the homehost if we are a server)
;;
(define (common:watchdog)
  
  (thread-sleep! 0.05) ;; delay for startup
  (let ((legacy-sync (common:run-sync?))
	(debug-mode  (debug:debug-mode 1))
	(last-time   (current-seconds)))
    (debug:print-info 0 *default-log-port* "watchdog starting. legacy-sync is " legacy-sync)
    (if legacy-sync
	(last-time   (current-seconds))
        (this-wd-num     (begin (mutex-lock! *wdnum*mutex) (let ((x *wdnum*)) (set! *wdnum* (add1 *wdnum*)) (mutex-unlock! *wdnum*mutex) x)))
        )
    (debug:print-info 0 *default-log-port* "watchdog starting. legacy-sync is " legacy-sync" pid="(current-process-id)" this-wd-num="this-wd-num)
    (if (and legacy-sync (not *time-to-exit*))
	(let ((dbstruct (db:setup)))
	  (debug:print-info 0 *default-log-port* "Server running, periodic sync started.")
	  (let loop ()
            ;;(BB> "watchdog loop.  pid="(current-process-id)" this-wd-num="this-wd-num" *time-to-exit*="*time-to-exit*)
	    ;; sync for filesystem local db writes
	    ;;
	    (mutex-lock! *db-multi-sync-mutex*)
	    (let* ((need-sync        (>= *db-last-write* *db-last-sync*)) ;; no sync since last write
	    (let* ((need-sync        (>= *db-last-access* *db-last-sync*)) ;; no sync since last write
		   (sync-in-progress *db-sync-in-progress*)
		   (should-sync      (and (not *time-to-exit*)
		   (should-sync      (> (- (current-seconds) *db-last-sync*) 5)) ;; sync every five seconds minimum
                                          (> (- (current-seconds) *db-last-sync*) 5))) ;; sync every five seconds minimum
		   (will-sync        (and (or need-sync should-sync)
					  (not sync-in-progress)))
		   (start-time       (current-seconds)))
	      ;; (debug:print-info 0 *default-log-port* "need-sync: " need-sync " sync-in-progress: " sync-in-progress " should-sync: " should-sync " will-sync: " will-sync)
	      (if will-sync (set! *db-sync-in-progress* #t))
	      (mutex-unlock! *db-multi-sync-mutex*)
	      (if will-sync
597
598
599
600
601
602
603


604
605
606
607
608
609

610
611

612
613


614
615
616
617
618
619
620
621
622
623

624
625
626
627
628
629
630
631





632

633
634
635
636

637
638
639
640





641
642
643
644





645
646
647
648

649
650
651
652
653
654
655
616
617
618
619
620
621
622
623
624
625
626
627
628
629

630
631

632
633
634
635
636
637
638
639
640
641
642
643
644
645

646
647
648
649
650
651
652
653
654
655
656
657
658
659

660
661
662
663
664
665




666
667
668
669
670
671
672
673

674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690







+
+





-
+

-
+


+
+









-
+








+
+
+
+
+
-
+




+
-
-
-
-
+
+
+
+
+



-
+
+
+
+
+




+







		    (set! last-time start-time)
		    (debug:print-info 4 *default-log-port* "timestamp -> " (seconds->time-string (current-seconds)) ", time since start -> " (seconds->hr-min-sec (- (current-seconds) *time-zero*))))))
	    
	    ;; keep going unless time to exit
	    ;;
	    (if (not *time-to-exit*)
		(let delay-loop ((count 0))
                  ;;(BB> "delay-loop top; count="count" pid="(current-process-id)" this-wd-num="this-wd-num" *time-to-exit*="*time-to-exit*)
                                                            
		  (if (and (not *time-to-exit*)
			   (< count 4)) ;; was 11, changing to 4. 
		      (begin
			(thread-sleep! 1)
			(delay-loop (+ count 1))))
		  (loop)))
		  (if (not *time-to-exit*) (loop))))
	    (if (common:low-noise-print 30)
		(debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*)))))))
		(debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id)" this-wd-num="this-wd-num)))))))

(define (std-exit-procedure)
  (on-exit (lambda () 0))
  ;;(BB> "std-exit-procedure called; *time-to-exit*="*time-to-exit*)
  (let ((no-hurry  (if *time-to-exit* ;; hurry up
		       #f
		       (begin
			 (set! *time-to-exit* #t)
			 #t))))
    (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
    (if (and no-hurry (debug:debug-mode 18))
	(rmt:print-db-stats))
    (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds
			      (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated
                              (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated
			      (if *task-db*    
				  (let ((db (cdr *task-db*)))
				    (if (sqlite3:database? db)
					(begin
					  (sqlite3:interrupt! db)
					  (sqlite3:finalize! db #t)
					  ;; (vector-set! *task-db* 0 #f)
					  (set! *task-db* #f)))))
                              (if (and *runremote*
                                       (remote-conndat *runremote*))
                                  (begin
                                    (http-client#close-all-connections!))) ;; for http-client
                              (if (not (eq? *default-log-port* (current-error-port)))
			      (close-output-port *default-log-port*)
                                  (close-output-port *default-log-port*))
			      (set! *default-log-port* (current-error-port))) "Cleanup db exit thread"))
	  (th2 (make-thread (lambda ()
			      (debug:print 4 *default-log-port* "Attempting clean exit. Please be patient and wait a few seconds...")
			      (if no-hurry
                                  (begin
				  (thread-sleep! 5) ;; give the clean up few seconds to do it's stuff
				  (thread-sleep! 2))
			      (debug:print 4 *default-log-port* " ... done")
			      )
                                    (thread-sleep! 5)) ;; give the clean up few seconds to do it's stuff
                                  (begin
      				  (thread-sleep! 2)))
      			      (debug:print 4 *default-log-port* " ... done")
      			      )
			    "clean exit")))
      (thread-start! th1)
      (thread-start! th2)
      (thread-join! th1))))
      (thread-join! th1)
      )
    )

  0)

(define (std-signal-handler signum)
  ;; (signal-mask! signum)
  (set! *time-to-exit* #t)
  ;;(BB> "got signal "signum)
  (debug:print-error 0 *default-log-port* "Received signal " signum " exiting promptly")
  ;; (std-exit-procedure) ;; shouldn't need this since we are exiting and it will be called anyway
  (exit))

(set-signal-handler! signal/int  std-signal-handler)  ;; ^C
(set-signal-handler! signal/term std-signal-handler)
;; (set-signal-handler! signal/stop std-signal-handler)  ;; ^Z NO, do NOT handle ^Z!
777
778
779
780
781
782
783
784
785




786
787
788
789
790
791
792
793










794
795
796
797
798
799
800
812
813
814
815
816
817
818


819
820
821
822








823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839







-
-
+
+
+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+







(define (common:args-get-state)
  (or (args:get-arg "-state")(args:get-arg ":state")))

(define (common:args-get-status)
  (or (args:get-arg "-status")(args:get-arg ":status")))

(define (common:args-get-testpatt rconf)
  (let* ((rtestpatt     (if rconf (runconfigs-get rconf "TESTPATT") #f))
	 (args-testpatt (or (args:get-arg "-testpatt")
  (let* ((tagexpr (args:get-arg "-tagexpr"))
         (tags-testpatt (if tagexpr (string-join (runs:get-tests-matching-tags tagexpr) ",") #f))
         (testpatt-key  (if (args:get-arg "-mode") (args:get-arg "-mode") "TESTPATT"))
         (args-testpatt (or (args:get-arg "-testpatt") (args:get-arg "-runtests") "%"))
			    (args:get-arg "-runtests")
			    "%"))
	 (testpatt    (or (and (equal? args-testpatt "%")
			       rtestpatt)
			  args-testpatt)))
    (if rtestpatt (debug:print-info 0 *default-log-port* "TESTPATT from runconfigs: " rtestpatt))
    testpatt))

         (rtestpatt     (if rconf (runconfigs-get rconf testpatt-key) #f)))
    (cond
     (tags-testpatt
      (debug:print-info 0 *default-log-port* "-tagexpr "tagexpr" selects testpatt "tags-testpatt)
      tags-testpatt)
     ((and (equal? args-testpatt "%") rtestpatt)
      (debug:print-info 0 *default-log-port* "testpatt defined in "testpatt-key" from runconfigs: " rtestpatt)
      rtestpatt)
     (else args-testpatt))))
     
(define (common:get-linktree)
  (or (getenv "MT_LINKTREE")
      (if *configdat*
	  (configf:lookup *configdat* "setup" "linktree"))))

(define (common:args-get-runname)
  (let ((res (or (args:get-arg "-runname")
1082
1083
1084
1085
1086
1087
1088
1089


1090
1091
1092
1093
1094
1095
1096
1121
1122
1123
1124
1125
1126
1127

1128
1129
1130
1131
1132
1133
1134
1135
1136







-
+
+







	   (with-input-from-pipe 
	    (conc "ssh " remote-host " cat /proc/loadavg")
	    (lambda ()(list (read)(read)(read)))))
      (with-input-from-file "/proc/loadavg" 
	(lambda ()(list (read)(read)(read))))))

;; get normalized cpu load by reading from /proc/loadavg and /proc/cpuinfo return all three values and the number of real cpus and the number of threads
;; returns list (normalized-proc-load normalized-core-load 1m 5m 15m ncores nthreads)
;; returns alist '((adj-cpu-load . normalized-proc-load) ... etc.
;;  keys: adj-proc-load, adj-core-load, 1m-load, 5m-load, 15m-load
;;
(define (common:get-normalized-cpu-load remote-host)
  (let ((data (if remote-host
                  (with-input-from-pipe 
                   (conc "ssh " remote-host " cat /proc/loadavg;cat /proc/cpuinfo;echo end")
                   read-lines)
                  (append 
1141
1142
1143
1144
1145
1146
1147


1148



























1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169








1170
1171
1172
1173



1174
1175
1176
1177
1178
1179
1180
1181






























1182
1183
1184
1185
1186
1187
1188






1189
1190
1191
1192
1193
1194
1195
1181
1182
1183
1184
1185
1186
1187
1188
1189

1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219


















1220
1221
1222
1223
1224
1225
1226
1227




1228
1229
1230








1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260







1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273







+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+



-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
-
-
-
-
+
+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+








(define (common:unix-ping hostname)
  (let ((res (system (conc "ping -c 1 " hostname " > /dev/null"))))
    (eq? res 0)))

;; ideally put all this info into the db, no need to preserve it across moving homehost
;;
;; return list of
;;  ( reachable? cpuload update-time )
(define (common:get-least-loaded-host hosts-raw)
(define (common:get-host-info hostname)
  (let* ((loadinfo (rmt:get-latest-host-load hostname))
         (load (car loadinfo))
         (load-sample-time (cdr loadinfo))
         (load-sample-age (- (current-seconds) load-sample-time))
         (loadinfo-timeout-seconds 20)
         (host-last-update-timeout-seconds 10)
         (host-rec (hash-table-ref/default *host-loads* hostname #f))
         )
    (cond
     ((< load-sample-age loadinfo-timeout-seconds)
      (list #t
            load-sample-time
            load))
     ((and host-rec
           (< (current-seconds) (+ (host-last-update host-rec) host-last-update-timeout-seconds)))
      (list #t
            (host-last-update host-rec)
            (host-last-cpuload host-rec )))
     ((common:unix-ping hostname)
      (list #t
            (current-seconds)
            (alist-ref 'adj-core-load (common:get-normalized-cpu-load hostname))))
     (else
      (list #f 0 -1)))))
    
(define (common:update-host-loads-table hosts-raw)
  (let* ((hosts (filter (lambda (x)
                          (string-match (regexp "^\\S+$") x))
                        hosts-raw)))
    (if (null? hosts)
        #f
        ;;
        ;; stategy:
        ;;    sort by last-used and normalized-load
        ;;    if last-updated > 15 seconds then re-update
        ;;    take the host with the lowest load with the lowest last-used (i.e. not used for longest time)
        ;;
        (let ((best-host #f)
              (curr-time (current-seconds)))
          (for-each
           (lambda (hostname)
             (let* ((rec       (let ((h (hash-table-ref/default *host-loads* hostname #f)))
                                 (if h
                                     h
                                     (let ((h (make-host)))
                                       (hash-table-set! *host-loads* hostname h)
                                       h))))
    (for-each
     (lambda (hostname)
       (let* ((rec       (let ((h (hash-table-ref/default *host-loads* hostname #f)))
                          (if h
                              h
                              (let ((h (make-host)))
                                (hash-table-set! *host-loads* hostname h)
                                h))))
                    ;; if host hasn't been pinged in 15 sec update it's data
                    (ping-good (if (< (- curr-time (host-last-update rec)) 15)
                                   (host-reachable rec)
                                   (or (host-reachable rec)
              (host-info         (common:get-host-info hostname))
              (is-reachable      (car host-info))
              (last-reached-time (cadr host-info))
                                       (begin
                                         (host-reachable-set! rec (common:unix-ping hostname))
                                         (host-last-update-set! rec curr-time)
                                         (host-last-cpuload-set! rec (common:get-normalized-cpu-load hostname))
                                         (host-reachable rec))))))
               (cond
                ((not best-host)
                 (set! best-host hostname))
              (load              (caddr host-info)))
         (host-reachable-set!    rec is-reachable)
         (host-last-update-set!  rec last-reached-time)
         (host-last-cpuload-set! rec load)))
     hosts)))

(define (common:get-least-loaded-host hosts-raw)
  (let* ((hosts (filter (lambda (x)
                          (string-match (regexp "^\\S+$") x))
                        hosts-raw))
         (best-host #f)
         (best-load 99999)
         (curr-time (current-seconds)))
    (common:update-host-loads-table hosts)
    (for-each
     (lambda (hostname)
       (let* ((rec
               (let ((h (hash-table-ref/default *host-loads* hostname #f)))
                 (if h
                     h
                     (let ((h (make-host)))
                       (hash-table-set! *host-loads* hostname h)
                       h))))
              (reachable (host-reachable rec))
              (load      (host-last-cpuload   rec)))
         (cond
          ((not reachable) #f)
          ((< (+ load (/ (random 250) 1000))         ;; add a random factor to keep from getting in a rut
              (+ best-load (/ (random 250) 1000))  )
           (set! best-load load)
                ((and ping-good
                      (< (alist-ref 'adj-core-load (host-last-cpuload rec))
                         (alist-ref 'adj-core-load
                                    (host-last-cpuload (hash-table-ref *host-loads* best-host)))))
                 (set! best-host hostname)))))
           hosts)
          best-host))))
           (set! best-host hostname)))))
     hosts)
    best-host))




(define (common:wait-for-cpuload maxload numcpus waitdelay #!key (count 1000) (msg #f)(remote-host #f))
  (let* ((loadavg (common:get-cpu-load remote-host))
	 (first   (car loadavg))
	 (next    (cadr loadavg))
	 (adjload (* maxload numcpus))
	 (loadjmp (- first next)))