︙ | | |
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
|
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
|
+
+
+
+
-
-
+
+
+
+
+
+
+
|
(define (common:version-db-delta)
(- megatest-version (common:get-last-run-version-number)))
(define (common:version-changed?)
(not (equal? (common:get-last-run-version)
(common:version-signature))))
;; From 1.70 to 1.80, db's are compatible.
(define (common:api-changed?)
(let* (
(not (equal? (substring (->string megatest-version) 0 4)
(substring (conc (common:get-last-run-version)) 0 4))))
(megatest-major-version (substring (->string megatest-version) 0 4))
(run-major-version (substring (conc (common:get-last-run-version)) 0 4))
)
(and (not (equal? megatest-major-version "1.80"))
(not (equal? megatest-major-version megatest-run-version)))
)
)
;;======================================================================
;; Move me elsewhere ...
;; RADT => Why do we meed the version check here, this is called only if version misma
;;
(define (common:cleanup-db dbstruct #!key (full #f))
(apply db:multi-db-sync
|
︙ | | |
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
|
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
|
-
+
|
;; logs directory you wish to log-rotate.
;;
(define (common:rotate-logs)
(let* ((all-files (make-hash-table))
(stats (make-hash-table))
(inc-stat (lambda (key)
(hash-table-set! stats key (+ (hash-table-ref/default stats key 0) 1))))
(max-allowed (string->number (or (configf:lookup *configdat* "setup" "max-logfiles") "300")))) ;; name -> age
(max-allowed (string->number (or (configf:lookup *configdat* "setup" "max-logfiles") "600")))) ;; name -> age
(if (not (directory-exists? "logs"))(create-directory "logs"))
(directory-fold
(lambda (file rem)
(handle-exceptions
exn
(begin
(debug:print-info 2 *default-log-port* "unable to rotate log " file ", probably handled by another process, this is safe to ignore. exn=" exn)
|
︙ | | |
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
|
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
|
+
-
+
-
+
|
(debug:print-info 0 *default-log-port* "Deleted " (length files) " files from logs, keeping " max-allowed " files."))))))
;;======================================================================
;; Force a megatest cleanup-db if version is changed and skip-version-check not specified
;; Do NOT check if not on homehost!
;;
(define (common:exit-on-version-changed)
(if (and *toppath* ;; do nothing if *toppath* not yet provided
(if (common:on-homehost?)
(common:on-homehost?))
(if (common:api-changed?)
(let* ((mtconf (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.config"))
(dbfile (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.db"))
(dbfile (conc (get-environment-variable "MT_RUN_AREA_HOME") ".megatest/main.db"))
(read-only (not (file-write-access? dbfile)))
(dbstruct (db:setup #t))) ;; (db:setup-db *dbstruct-dbs* *toppath* #f))) ;; #t)))
(debug:print 0 *default-log-port*
"WARNING: Version mismatch!\n"
" expected: " (common:version-signature) "\n"
" got: " (common:get-last-run-version))
(cond
|
︙ | | |
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
|
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
|
-
+
-
+
|
(print-call-chain (current-error-port))
(exit 1))
(common:cleanup-db dbstruct)))
((not (common:file-exists? mtconf))
(debug:print 0 *default-log-port* " megatest.config does not exist in this area. Cannot proceed with megatest version migration.")
(exit 1))
((not (common:file-exists? dbfile))
(debug:print 0 *default-log-port* " megatest.db does not exist in this area. Cannot proceed with megatest version migration.")
(debug:print 0 *default-log-port* " .megatest/main.db does not exist in this area. Cannot proceed with megatest version migration.")
(exit 1))
((not (eq? (current-user-id)(file-owner mtconf)))
(debug:print 0 *default-log-port* " You do not own megatest.db in this area. Cannot proceed with megatest version migration.")
(debug:print 0 *default-log-port* " You do not own .megatest/main.db in this area. Cannot proceed with megatest version migration.")
(exit 1))
(read-only
(debug:print 0 *default-log-port* " You have read-only access to this area. Cannot proceed with megatest version migration.")
(exit 1))
(else
(debug:print 0 *default-log-port* " to switch versions you can run: \"megatest -cleanup-db\"")
(exit 1)))))))
|
︙ | | |
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
|
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
|
+
+
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
|
(if dat
dat
""))))
(define (common:alist-ref/default key alist default)
(or (alist-ref key alist) default))
;; moved into commonmod
;;
(define (common:low-noise-print waitval . keys)
(let* ((key (string-intersperse (map conc keys) "-" ))
(lasttime (hash-table-ref/default *common:denoise* key 0))
(currtime (current-seconds)))
(if (> (- currtime lasttime) waitval)
(begin
(hash-table-set! *common:denoise* key currtime)
#t)
#f)))
;; (define (common:low-noise-print waitval . keys)
;; (let* ((key (string-intersperse (map conc keys) "-" ))
;; (lasttime (hash-table-ref/default *common:denoise* key 0))
;; (currtime (current-seconds)))
;; (if (> (- currtime lasttime) waitval)
;; (begin
;; (hash-table-set! *common:denoise* key currtime)
;; #t)
;; #f)))
(define (common:get-megatest-exe)
(or (getenv "MT_MEGATEST") "megatest"))
(define (common:read-encoded-string instr)
(handle-exceptions
exn
|
︙ | | |
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
|
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
|
+
-
-
+
+
|
(message-digest-string (md5-primitive) str))
;;======================================================================
;; E X I T H A N D L I N G
;;======================================================================
(define (common:run-sync?)
(and *toppath* ;; gate if called before *toppath* is set
(and (common:on-homehost?)
(args:get-arg "-server")))
(common:on-homehost?)
(args:get-arg "-server")))
(define (common:human-time)
(time->string (seconds->local-time (current-seconds)) "%Y-%m-%d %H:%M:%S"))
(define (std-signal-handler signum)
;; (signal-mask! signum)
|
︙ | | |
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
|
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
|
+
+
+
+
+
+
+
|
(begin ;; found a host, return it
(debug:print 0 *default-log-port* "INFO: Found host: " new-best " load: " load " last-used: " delta " seconds ago, with job-rate: " job-rate)
(host-last-used-set! rec curr-time)
new-best)
(if (null? tal) #f (loop (car tal)(cdr tal) best-host)))))))))
(define (common:wait-for-homehost-load maxnormload msg)
(let loop ((start-time (current-seconds))) ;; we saw some instances of this being called before *toppath* was set. This might be an early setup race. This delay should help but it is impossible to test...
(if (not *toppath*)
(begin
(debug:print 0 *default-log-port* "ERROR: common:wait-for-homehost-load called before *toppath* set.")
(thread-sleep! 30)
(if (< (- (current-seconds) start-time) 300)
(loop start-time)))))
(let* ((hh-dat (if (common:on-homehost?) ;; if we are on the homehost then pass in #f so the calls are local.
#f
(server:choose-server *toppath* 'homehost)))
(hh (if hh-dat (car hh-dat) #f)))
(common:wait-for-normalized-load maxnormload msg hh)))
(define (common:get-num-cpus remote-host)
|
︙ | | |
︙ | | |
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
|
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
|
+
-
-
-
+
+
+
+
|
;; return the target db handle so it can be used
;;
(define (db:cache-for-read-only source target #!key (use-last-update #f))
(if (and (hash-table-ref/default *global-db-store* target #f)
(>= (file-modification-time target)(file-modification-time source)))
(hash-table-ref *global-db-store* target)
(let* ((toppath (launch:setup))
(targ-db-last-mod (db:get-sqlite3-mod-time target))
(targ-db-last-mod (if (common:file-exists? target)
(file-modification-time target)
0))
;; (if (common:file-exists? target)
;; BUG: This needs to include wal mode stuff .shm etc.
;; (file-modification-time target)
;; 0))
(cache-db (or (hash-table-ref/default *global-db-store* target #f)
(db:open-megatest-db path: target)))
(source-db (db:open-megatest-db path: source))
(curr-time (current-seconds))
(res '())
(last-update (if use-last-update (cons "last_update" targ-db-last-mod) #f)))
(db:sync-tables (db:sync-main-list source-db) last-update source-db cache-db)
|
︙ | | |
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
|
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
|
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
;; megatest-db
;; (conc cache-dir "/" fname)
;; use-last-update: #t)))
;; (thread-start! th1)
;; (apply proc cache-db params)
;; ))))
(define (db:get-sqlite3-mod-time fname)
(let* ((wal-file (conc fname "-wal"))
(shm-file (conc fname "-shm"))
(get-mtime (lambda (f)
(if (and (file-exists? f)
(file-read-access? f))
(file-modification-time f)
0))))
(max (get-mtime fname)
(get-mtime wal-file)
(get-mtime shm-file))))
(define (db:all-db-sync dbstruct)
(let* ((dbdat (db:open-db dbstruct #f db:initialize-main-db))
(data-synced 0) ;; count of changed records
(tmp-area (common:get-db-tmp-area))
(dbfiles (glob (conc tmp-area"/.megatest/*.db")))
(sync-durations (make-hash-table))
(no-sync-db (db:open-no-sync-db)))
(for-each
(lambda (file)
(lambda (file) ;; tmp db file
(debug:print-info 3 *default-log-port* "file: " file)
(let* ((fname (conc (pathname-file file) ".db"))
(fulln (conc *toppath*"/.megatest/"fname))
(time1 (if (file-exists? file)
(file-modification-time file)
(begin
(debug:print-info 2 *default-log-port* "Sync - I do not see file "file)
1)))
(time2 (if (file-exists? fulln)
(file-modification-time fulln)
(begin
(debug:print-info 2 *default-log-port* "Sync - I do not see file "fulln)
0)))
(let* ((fname (conc (pathname-file file) ".db")) ;; fname is tmp db file
(wal-file (conc fname "-wal"))
(shm-file (conc fname "-shm"))
(fulln (conc *toppath*"/.megatest/"fname)) ;; fulln is nfs db name
(wal-time (if (file-exists? wal-file)
(file-modification-time wal-file)
0))
(shm-time (if (file-exists? shm-file)
(file-modification-time shm-file)
0))
(time1 (db:get-sqlite3-mod-time file))
;; (if (file-exists? file) ;; time1 is the max itime of the tmp db, -wal and -shm files.
;; (max (file-modification-time file) wal-time shm-time)
;; (begin
;; (debug:print-info 2 *default-log-port* "Sync - I do not see file "file)
;; 1)))
(time2 (db:get-sqlite3-mod-time fulln))
;; (if (file-exists? fulln) ;; time2 is nfs file time
;; (file-modification-time fulln)
;; (begin
;; (debug:print-info 2 *default-log-port* "Sync - I do not see file "fulln)
;; 0)))
(changed (> (- time1 time2) (+ (random 5) 1))) ;; it has been at some few seconds since last synced
(changed10 (> (- time1 time2) 10)) ;; it has been at least ten seconds since sync'd
(jfile-exists (file-exists? (conc file"-journal"))) ;; i.e. are we busy?
(do-cp (cond
((not (file-exists? fulln)) ;; shouldn't happen, but this might recover
(cons #t (conc "File "fulln" not found! Copying "fname" to "fulln)))
((and (not jfile-exists) changed)
|
︙ | | |
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
|
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
|
-
-
+
+
+
+
|
(servers (server:choose-server *toppath* 'all-valid)) ;; (server:get-list *toppath*))
(src-area (if old2new *toppath* tmp-area))
(dest-area (if old2new tmp-area *toppath*))
(dbfiles (if old2new (glob (conc *toppath* "/.megatest/*.db")) (glob (conc tmp-area "/.megatest/*.db"))))
(keys (db:get-keys dbstruct))
(sync-durations (make-hash-table)))
(if killservers
(if (and killservers servers)
(begin
(for-each
(lambda (server)
(handle-exceptions
exn
(begin
(debug:print-info 0 *default-log-port* "Unable to get host and/or port from " server ", exn=" exn)
#f)
(match-let (((mod-time host port start-time server-id pid) server))
(if (and host pid)
(tasks:kill-server host pid)))))
servers)
(delete-file* (common:get-sync-lock-filepath))
)
)
(if (not dbfiles)
(debug:print-error 0 *default-log-port* "no dbfiles found in " (conc *toppath* "/.megatest"))
(for-each
(lambda (srcfile)
(debug:print-info 3 *default-log-port* "file: " srcfile)
(let* ((fname (conc (pathname-file srcfile) ".db"))
(basename (pathname-file srcfile))
(run-id (if (string= basename "main") #f (string->number basename)))
(destfile (conc dest-area "/.megatest/" fname))
|
︙ | | |
523
524
525
526
527
528
529
530
531
532
533
534
535
536
|
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
|
+
|
(debug:print-info 0 *default-log-port* "Sync - I do not see file " destfile)
0)))
(changed ( < (- time2 time1) 6.0)) ;; dest db not updated within last 6 seconds
(do-cp (cond
((not (file-exists? destfile)) ;; shouldn't happen, but this might recover
(debug:print-info 2 *default-log-port* "File " destfile " not found. Copying "srcfile" to "destfile)
;; TODO: Need to fix this for WAL mod. Can't just copy.
(system (conc "/bin/mkdir -p " dest-directory))
(system (conc "/bin/cp " srcfile " " destfile))
#t)
(changed ;; (and changed
;; (> (- (current-seconds) time1) 3)) ;; if file is changed and three seconds have passed.
#t)
((and changed *time-to-exit*) ;; last sync
|
︙ | | |
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
|
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
|
+
+
+
|
)
(hash-table-set! sync-durations (conc srcfile ".db") (- (current-milliseconds) start-time)))
(debug:print-info 2 *default-log-port* "skipping delta sync. " srcfile " is up to date")
)
)
)
dbfiles
)
)
data-synced
)
)
;; Sync all changed db's
;;
(define (db:tmp->megatest.db-sync dbstruct run-id last-update)
(let* ((subdbs (hash-table-values (dbr:dbstruct-subdbs dbstruct)))
(res '()))
(for-each
|
︙ | | |
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
|
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
|
+
+
+
-
-
-
+
+
-
|
(db:with-db
dbstruct #f #f
(lambda (dbdat db)
(if msg
(sqlite3:execute db "UPDATE runs SET status=?,comment=? WHERE id=?;" status msg run-id)
(sqlite3:execute db "UPDATE runs SET status=? WHERE id=?;" status run-id)))))
(define (db:set-run-state-status-db db run-id state status )
(sqlite3:execute db "UPDATE runs SET status=?,state=? WHERE id=?;" status state run-id))
(define (db:set-run-state-status dbstruct run-id state status )
(db:with-db
dbstruct #f #f
(lambda (dbdat db)
(sqlite3:execute db "UPDATE runs SET status=?,state=? WHERE id=?;" status state run-id))))
(db:set-run-state-status-db db run-id state status))))
(define (db:get-run-status dbstruct run-id)
(let ((res "n/a"))
(db:with-db
dbstruct #f #f
(lambda (dbdat db)
(sqlite3:for-each-row
(lambda (status)
|
︙ | | |
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
|
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
|
-
-
+
+
|
(db:with-db dbstruct run-id #f
(lambda (dbdat db)
(sqlite3:for-each-row
(lambda (run-id testname item-path state status)
;; id,run_id,testname,state,status,event_time,host,cpuload,diskfree,uname,rundir,item_path,run_duration,final_logf,comment
(set! res (vector test-id run-id testname state status -1 "" -1 -1 "" "-" item-path -1 "-" "-")))
db
"SELECT run_id,testname,item_path,state,status FROM tests WHERE id=?;"
test-id)))
"SELECT run_id,testname,item_path,state,status FROM tests WHERE id=? and run_id=?;"
test-id run-id)))
res))
;; get a useful subset of the tests data (used in dashboard
;; use db:mintest-get-{id ,run_id,testname ...}
;;
(define (db:get-tests-for-run-mindata dbstruct run-id testpatt states statuses not-in)
(db:get-tests-for-run dbstruct run-id testpatt states statuses #f #f not-in #f #f "id,run_id,testname,state,status,event_time,item_path" 0 #f))
|
︙ | | |
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
|
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
|
-
+
-
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
|
;; ;; NB// Ultimately this will be deprecated in deference to mt:test-set-state-status-by-id
;;
;; NOTE: run-id is not used
;; ;;
(define (db:test-set-state-status dbstruct run-id test-id newstate newstatus newcomment)
(db:with-db
dbstruct
run-id
run-id #f
#t
(lambda (dbdat db)
(db:test-set-state-status-db db run-id test-id newstate newstatus newcomment))))
(define (db:test-set-state-status-db db run-id test-id newstate newstatus newcomment)
(cond
((and newstate newstatus newcomment)
(sqlite3:execute db "UPDATE tests SET state=?,status=?,comment=? WHERE id=?;" newstate newstatus newcomment ;; (sdb:qry 'getid newcomment)
test-id))
((and newstate newstatus)
(sqlite3:execute db "UPDATE tests SET state=?,status=? WHERE id=?;" newstate newstatus test-id))
(else
(if newstate (sqlite3:execute db "UPDATE tests SET state=? WHERE id=?;" newstate test-id))
(if newstatus (sqlite3:execute db "UPDATE tests SET status=? WHERE id=?;" newstatus test-id))
(if newcomment (sqlite3:execute db "UPDATE tests SET comment=? WHERE id=?;" newcomment ;; (sdb:qry 'getid newcomment)
test-id))))))
(mt:process-triggers dbstruct run-id test-id newstate newstatus))
(cond
((and newstate newstatus newcomment)
(sqlite3:execute db "UPDATE tests SET state=?,status=?,comment=? WHERE id=?;" newstate newstatus newcomment ;; (sdb:qry 'getid newcomment)
test-id))
((and newstate newstatus)
(sqlite3:execute db "UPDATE tests SET state=?,status=? WHERE id=?;" newstate newstatus test-id))
(else
(if newstate (sqlite3:execute db "UPDATE tests SET state=? WHERE id=?;" newstate test-id))
(if newstatus (sqlite3:execute db "UPDATE tests SET status=? WHERE id=?;" newstatus test-id))
(if newcomment (sqlite3:execute db "UPDATE tests SET comment=? WHERE id=?;" newcomment ;; (sdb:qry 'getid newcomment)
test-id))))
;; (mt:process-triggers dbstruct run-id test-id newstate newstatus)) ;; NOTE: Moved into calling function
)
;; NEW BEHAVIOR: Count tests running in all runs!
;;
(define (db:get-count-tests-running dbstruct run-id) ;; fastmode)
(let* ((qry ;; (if fastmode
;; "SELECT count(id) FROM tests WHERE state in ('RUNNING','LAUNCHED','REMOTEHOSTSTART') AND NOT (uname = 'n/a' AND item_path = '') LIMIT 1;"
"SELECT count(id) FROM tests WHERE state in ('RUNNING','LAUNCHED','REMOTEHOSTSTART') AND NOT (uname = 'n/a' AND item_path = '');")) ;; )
|
︙ | | |
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
|
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
|
+
+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
|
(define (db:get-test-info dbstruct run-id test-name item-path)
(db:with-db
dbstruct
run-id
#f
(lambda (dbdat db)
(db:get-test-info-db db run-id test-name item-path))))
(define (db:get-test-info-db db run-id test-name item-path)
(let ((res #f))
(sqlite3:for-each-row
(lambda (a . b)
(set! res (apply vector a b)))
db
(conc "SELECT " db:test-record-qry-selector " FROM tests WHERE testname=? AND item_path=? AND run_id=?;")
test-name item-path run-id)
res))))
(let ((res #f))
(sqlite3:for-each-row
(lambda (a . b)
(set! res (apply vector a b)))
db
(conc "SELECT " db:test-record-qry-selector " FROM tests WHERE testname=? AND item_path=? AND run_id=?;")
test-name item-path run-id)
res))
(define (db:test-get-rundir-from-test-id dbstruct run-id test-id)
(db:with-db
dbstruct
run-id
#f
(lambda (dbdat db)
|
︙ | | |
3175
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185
3186
3187
3188
3189
3190
3191
3192
3193
3194
3195
3196
3197
3198
3199
3200
3201
3202
3203
3204
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
|
3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
3238
3239
3240
3241
3242
3243
3244
3245
3246
3247
3248
3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
|
-
+
+
+
-
+
-
-
-
-
+
+
+
+
+
+
-
-
-
-
-
+
+
+
+
+
-
-
-
+
+
+
+
|
(test-name (if (number? test-name)
(db:test-get-testname testdat)
test-name))
(item-path (db:test-get-item-path testdat))
(tl-testdat (db:get-test-info dbstruct run-id test-name ""))
(tl-test-id (if tl-testdat
(db:test-get-id tl-testdat)
#f)))
#f))
(new-state-eh #f)
(new-status-eh #f))
(if (member state '("LAUNCHED" "REMOTEHOSTSTART"))
(db:general-call dbstruct run-id 'set-test-start-time (list test-id)))
(mutex-lock! *db-transaction-mutex*)
(db:with-db
dbstruct run-id #f
(lambda (dbdat db)
(let ((tr-res
(sqlite3:with-transaction
db
(lambda ()
;; NB// Pass the db so it is part fo the transaction
(db:test-set-state-status db run-id test-id state status comment) ;; this call sets the item state/status
(db:test-set-state-status-db db run-id test-id state status comment) ;; this call sets the item state/status
(if (not (equal? item-path "")) ;; only roll up IF incoming test is an item
(let* ((state-status-counts (db:get-all-state-status-counts-for-test db run-id test-name item-path state status)) ;; item-path is used to exclude current state/status of THIS test
(state-statuses (db:roll-up-rules state-status-counts state status))
(newstate (car state-statuses))
(newstatus (cadr state-statuses)))
(let* ((state-status-counts (db:get-all-state-status-counts-for-test db run-id test-name item-path state status)) ;; item-path is used to exclude current state/status of THIS test
(state-statuses (db:roll-up-rules state-status-counts state status))
(newstate (car state-statuses))
(newstatus (cadr state-statuses)))
(set! new-state-eh newstate)
(set! new-status-eh newstatus)
(debug:print 4 *default-log-port* "BB> tl-test-id="tl-test-id" ; "test-name":"item-path" newstate="newstate" newstatus="newstatus" len(sscs)="(length state-status-counts) " state-status-counts: "
(apply conc
(map (lambda (x)
(conc
(with-output-to-string (lambda () (pp (dbr:counts->alist x)))) " | "))
state-status-counts))); end debug:print
(apply conc
(map (lambda (x)
(conc
(with-output-to-string (lambda () (pp (dbr:counts->alist x)))) " | "))
state-status-counts))); end debug:print
(if tl-test-id
(db:test-set-state-status db run-id tl-test-id newstate newstatus #f)) ;; we are still in the transaction - must access the db and not the dbstruct
(if tl-test-id
(db:test-set-state-status-db db run-id tl-test-id newstate newstatus #f)) ;; we are still in the transaction - must access the db and not the dbstruct
))))))
(mutex-unlock! *db-transaction-mutex*)
(if (and test-id state status (equal? status "AUTO"))
(db:test-data-rollup dbstruct run-id test-id status))
(if new-state-eh ;; moved from db:test-set-state-status
(mt:process-triggers dbstruct run-id test-id new-state-eh new-status-eh))
tr-res)))))
(define (db:roll-up-rules state-status-counts state status)
(let* ((running (length (filter (lambda (x)
(member (dbr:counts-state x) *common:running-states*))
state-status-counts)))
(bad-not-started (length (filter (lambda (x)
|
︙ | | |
3273
3274
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
3285
3286
3287
3288
3289
3290
3291
3292
3293
3294
3295
3296
3297
3298
3299
3300
3301
3302
3303
3304
3305
3306
3307
3308
3309
3310
3311
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334
3335
3336
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
|
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334
3335
3336
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
3360
3361
3362
3363
3364
3365
3366
3367
3368
3369
3370
3371
3372
3373
3374
3375
3376
3377
3378
3379
3380
3381
|
-
+
-
+
-
+
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
-
+
+
+
+
+
-
-
+
+
-
+
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
-
-
-
+
+
-
+
-
|
(db:with-db
dbstruct run-id #f
(lambda (dbdat db)
(let ((tr-res
(sqlite3:with-transaction
db
(lambda ()
(let* ((state-status-counts (db:get-all-state-status-counts-for-run db run-id))
(let* ((state-status-counts (db:get-all-state-status-counts-for-run-db db run-id))
(state-statuses (db:roll-up-rules state-status-counts #f #f ))
(newstate (car state-statuses))
(newstatus (cadr state-statuses)))
(if (or (not (eq? newstate curr-state)) (not (eq? newstatus curr-status)))
(db:set-run-state-status db run-id newstate newstatus )))))))
(db:set-run-state-status-db db run-id newstate newstatus )))))))
(mutex-unlock! *db-transaction-mutex*)
tr-res))))
(define (db:get-all-state-status-counts-for-run dbstruct run-id)
(define (db:get-all-state-status-counts-for-run-db db run-id)
(let* ((test-count-recs (db:with-db
dbstruct #f #f
(lambda (dbdat db)
(sqlite3:map-row
(lambda (state status count)
(make-dbr:counts state: state status: status count: count))
db
"SELECT state,status,count(id) FROM tests WHERE run_id=? GROUP BY state,status;"
run-id )))))
(sqlite3:map-row
(lambda (state status count)
(make-dbr:counts state: state status: status count: count))
db
"SELECT state,status,count(id) FROM tests WHERE run_id=? GROUP BY state,status;"
run-id ))
test-count-recs))
(define (db:get-all-state-status-counts-for-run dbstruct run-id)
(db:with-db
dbstruct #f #f
(lambda (dbdat db)
(db:get-all-state-status-counts-for-run-db dbstruct run-id))))
;; BBnote: db:get-all-state-status-counts-for-test returns dbr:counts object aggregating state and status of items of a given test, *not including rollup state/status*
;;
;; NOTE: This is called within a transaction
;;
(define (db:get-all-state-status-counts-for-test dbstruct run-id test-name item-path item-state-in item-status-in)
(let* ((test-info (db:get-test-info dbstruct run-id test-name item-path))
(define (db:get-all-state-status-counts-for-test db run-id test-name item-path item-state-in item-status-in)
(let* ((test-info (db:get-test-info-db db run-id test-name item-path))
(item-state (or item-state-in (db:test-get-state test-info)))
(item-status (or item-status-in (db:test-get-status test-info)))
(other-items-count-recs (db:with-db
(other-items-count-recs (sqlite3:map-row
dbstruct run-id #f
(lambda (dbdat db)
(sqlite3:map-row
(lambda (state status count)
(make-dbr:counts state: state status: status count: count))
db
;; ignore current item because we have changed its value in the current transation so this select will see the old value.
"SELECT state,status,count(id) FROM tests WHERE run_id=? AND testname=? AND item_path != '' AND item_path !=? GROUP BY state,status;"
run-id test-name item-path))))
(lambda (state status count)
(make-dbr:counts state: state status: status count: count))
db
;; ignore current item because we have changed its value in the current transation so this select will see the old value.
"SELECT state,status,count(id) FROM tests WHERE run_id=? AND testname=? AND item_path != '' AND item_path !=? GROUP BY state,status;"
run-id test-name item-path))
;; add current item to tally outside of sql query
(match-countrec-lambda (lambda (countrec)
(and (equal? (dbr:counts-state countrec) item-state)
(match-countrec-lambda (lambda (countrec)
(and (equal? (dbr:counts-state countrec) item-state)
(equal? (dbr:counts-status countrec) item-status))))
(already-have-count-rec-list
(filter match-countrec-lambda other-items-count-recs)) ;; will have either 0 or 1 count recs depending if another item shares this item's state/status
(updated-count-rec (if (null? already-have-count-rec-list)
(make-dbr:counts state: item-state status: item-status count: 1)
(let* ((our-count-rec (car already-have-count-rec-list))
(new-count (add1 (dbr:counts-count our-count-rec))))
(make-dbr:counts state: item-state status: item-status count: new-count))))
(nonmatch-countrec-lambda (lambda (countrec) (not (match-countrec-lambda countrec))))
(unrelated-rec-list
(filter nonmatch-countrec-lambda other-items-count-recs)))
(cons updated-count-rec unrelated-rec-list)))
;; (define (db:get-all-item-states db run-id test-name)
;; (sqlite3:map-row
;; (lambda (a) a)
;; db
;; "SELECT DISTINCT state FROM tests WHERE item_path != '' AND state != 'DELETED' AND run_id=? AND testname=?"
|
︙ | | |
4614
4615
4616
4617
4618
4619
4620
4621
4622
4623
4624
4625
4626
4627
4628
|
4649
4650
4651
4652
4653
4654
4655
4656
4657
4658
4659
4660
4661
4662
4663
4664
|
+
-
+
|
(sqlite3:finalize! db #t)
;; (vector-set! *task-db* 0 #f)
(set! *task-db* #f)))))
(if (and (not (args:get-arg "-server"))
*runremote*)
(begin
(debug:print-info 0 *default-log-port* "Closing all client connections...")
(http-transport:close-connections *runremote*)
(http-client#close-all-connections!)))
#;(http-client#close-all-connections!)))
;; (if (and *runremote*
;; (remote-conndat *runremote*))
;; (begin
;; (http-client#close-all-connections!))) ;; for http-client
(if (not (eq? *default-log-port* (current-error-port)))
(close-output-port *default-log-port*))
(set! *default-log-port* (current-error-port))) "Cleanup db exit thread"))
|
︙ | | |
︙ | | |
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
|
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
|
-
+
-
+
-
+
|
(define (dbfile:print-err . params)
(with-output-to-port
(current-error-port)
(lambda ()
(apply print params))))
(define (dbfile:cautious-open-database fname init-proc sync-mode journal-mode #!optional (tries-left 500))
(let* ((busy-file (conc fname"-journal"))
(let* ((busy-file (conc fname "-journal"))
(delay-time (* (- 51 tries-left) 1.1))
(write-access (file-write-access? fname))
(dir-access (file-write-access? (pathname-directory fname)))
(retry (lambda ()
(thread-sleep! delay-time)
(if (> tries-left 0)
(dbfile:cautious-open-database fname init-proc
sync-mode: sync-mode journal-mode: journal-mode
sync-mode: sync-mode journal-mode
(- tries-left 1))))))
(assert (>= tries-left 0) (conc "FATAL: too many attempts in dbfile:cautious-open-database of "fname", giving up."))
(if (and (file-write-access? fname)
(file-exists? busy-file))
(begin
(if (common:low-noise-print 120 busy-file)
(dbfile:print-err "INFO: dbfile:cautious-open-database: journal file "
busy-file" exists, trying again in few seconds."))
(thread-sleep! 1)
(if (eq? tries-left 2)
(begin
(dbfile:print-err "INFO: forcing journal rollup "busy-file)
(dbfile:brute-force-salvage-db fname)))
(dbfile:cautious-open-database fname init-proc sync-mode: sync-mode journal-mode: journal-mode (- tries-left 1)))
(dbfile:cautious-open-database fname init-proc sync-mode journal-mode (- tries-left 1)))
(let* ((result (condition-case
(if dir-access
(dbfile:with-simple-file-lock
(conc fname ".lock")
(lambda ()
(let* ((db-exists (file-exists? fname))
|
︙ | | |
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
|
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
|
+
-
+
|
(dbfile:print-err "INFO: db:lock-and-delta-sync copying db "runid" at "(current-seconds))
(set! *db-sync-in-progress* #t)
(db:sync-touched dbstruct runid keys dbinit)
(set! *db-sync-in-progress* #f)
(delete-file* lock-file)
#t)
(begin
(if (common:low-noise-print 120 (conc "no lock "from-db-file))
(dbfile:print-err "INFO: could not get lock for " from-db-file ", sync likely in progress.")
(dbfile:print-err "INFO: could not get lock for " from-db-file ", sync likely in progress."))
#f
))))
;; ;; Get a lock from the no-sync-db for the from-db, then delta sync the from-db to the to-db, otherwise return #f
;; ;;
;; (define (db:lock-and-delta-sync-orig no-sync-db dbstruct from-db-file runid keys dbinit)
;; (assert (not *db-sync-in-progress*) "FATAL: db:lock-and-sync called while a sync is in progress.")
|
︙ | | |
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
|
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
|
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
'("owner" #f)
'("description" #f)
'("reviewed" #f)
'("iterated" #f)
'("avg_runtime" #f)
'("avg_disk" #f)
'("tags" #f)
'("jobgroup" #f)))))
'("jobgroup" #f))
(list "tasks_queue"
'("id" #f)
'("action" #f)
'("owner" #f)
'("state" #f)
'("target" #f)
'("name" #f)
'("testpatt" #f)
'("keylock" #f)
'("params" #f)
'("creation_time" #f)
'("execution_time" #f))
)))
(define (db:sync-all-tables-list dbstruct keys)
(append (db:sync-main-list dbstruct keys)
db:sync-tests-only))
;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
;; db's are dbdat's
|
︙ | | |
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
|
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
|
+
+
+
|
(define dbfile:db-init-proc (make-parameter #f))
;; (db:with-db dbstruct run-id sqlite3:exec "select blah fgrom blaz;")
;; r/w is a flag to indicate if the db is modified by this query #t = yes, #f = no
;;
(define (db:with-db dbstruct run-id r/w proc . params)
(assert dbstruct "FATAL: db:with-db called with dbstruct "#f)
(assert (dbr:dbstruct? dbstruct) "FATAL: dbstruct is "dbstruct)
(let* ((use-mutex (> *api-process-request-count* 25)) ;; risk of db corruption
(have-struct (dbr:dbstruct? dbstruct))
(dbdat (if have-struct ;; this stuff just allows us to call with a db handle directly
(db:open-db dbstruct run-id (dbfile:db-init-proc)) ;; (dbfile:get-subdb dbstruct run-id)
#f))
(db (if have-struct ;; this stuff just allows us to call with a db handle directly
(dbr:dbdat-dbh dbdat)
dbstruct))
(fname (if dbdat
(dbr:dbdat-dbfile dbdat)
"nofilenameavailable"))
(jfile (conc fname"-journal"))
#;(subdb (if have-struct
(dbfile:get-subdb dbstruct run-id)
#f))
) ;; was 25
(assert (sqlite3:database? db) "FATAL: db:with-db, db is not a database, db="db", fname="fname)
(if (file-exists? jfile)
(begin
(dbfile:print-err "INFO: "jfile" exists, delaying to reduce database load")
(thread-sleep! 0.2)))
(if (and use-mutex
(common:low-noise-print 120 "over-50-parallel-api-requests"))
(dbfile:print-err *api-process-request-count* " parallel api requests being processed in process "
|
︙ | | |
︙ | | |
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
-
+
|
(config-use-proxy (equal? (configf:lookup *configdat* "client" "use-http_proxy") "yes")))
(if (not config-use-proxy)
(determine-proxy (constantly #f)))
(debug:print-info 0 *default-log-port* "http-transport:try-start-server time=" (seconds->time-string (current-seconds)) " ipaddrsstr=" ipaddrstr " portnum=" portnum " config-hostname=" config-hostname)
(handle-exceptions
exn
(begin
(print-error-message exn)
;; (print-error-message exn)
(if (< portnum 64000)
(begin
(debug:print 0 *default-log-port* "WARNING: attempt to start server failed. Trying again ...")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 5 *default-log-port* "exn=" (condition->list exn))
(portlogger:open-run-close portlogger:set-failed portnum)
(debug:print 0 *default-log-port* "WARNING: failed to start on portnum: " portnum ", trying next port")
|
︙ | | |
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
|
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
|
-
+
+
|
;; (remote-conndat-set! runremote #f))
;; Killing associated server to allow clean retry.")
;; (tasks:kill-server-run-id run-id) ;; better to kill the server in the logic that called this routine?
(mutex-unlock! *http-mutex*)
;; (signal (make-composite-condition
;; (make-property-condition 'commfail 'message "failed to connect to server")))
;; "communications failed"
(close-all-connections!)
;; (close-all-connections!)
(close-connection! fullurl)
(db:obj->string #f))
(with-input-from-request ;; was dat
fullurl
(list (cons 'key (or server-id "thekey"))
(cons 'cmd cmd)
(cons 'params sparams))
read-string))
|
︙ | | |
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
|
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
|
-
-
+
+
+
+
-
+
+
|
(signal (make-composite-condition
(make-property-condition
'timeout
'message "nmsg-transport:client-api-send-receive-raw timed out talking to server")))))))
;; careful closing of connections stored in *runremote*
;;
(define (http-transport:close-connections #!key (area-dat #f))
(let* ((runremote (or area-dat *runremote*))
(define (http-transport:close-connections runremote)
(let* (;; (runremote (or area-dat *runremote*))
(server-dat (if runremote
(remote-conndat runremote)
#f))) ;; (hash-table-ref/default *runremote* run-id #f)))
(if (vector? server-dat)
(let ((api-dat (http-transport:server-dat-get-api-uri server-dat)))
(handle-exceptions
exn
(begin
(print-call-chain *default-log-port*)
(debug:print-error 0 *default-log-port* " closing connection failed with error: " ((condition-property-accessor 'exn 'message) exn) ", exn=" exn))
(if (args:any-defined? "-server" "-execute" "-run")
(debug:print-info 0 *default-log-port* "Closing connections to "api-dat))
(close-connection! api-dat)
(close-idle-connections!)
(close-connection! (http-transport:server-dat-make-url server-dat))
(remote-conndat-set! runremote #f)
#t))
#f)))
(define (make-http-transport:server-dat)(make-vector 6))
(define (http-transport:server-dat-get-iface vec) (vector-ref vec 0))
(define (http-transport:server-dat-get-port vec) (vector-ref vec 1))
|
︙ | | |
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
|
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
|
-
+
|
(print-call-chain (current-error-port))
(debug:print-error 0 *default-log-port* "call to http-transport:server-dat-update-last-access with non-vector!!"))))
;;
;; connect
;;
(define (http-transport:client-connect iface port server-id)
(debug:print-info 0 *default-log-port* "Connecting to client at "iface":"port", with server-id "server-id)
(debug:print-info 2 *default-log-port* "Connecting to server at "iface":"port", id "server-id)
(let* ((api-url (conc "http://" iface ":" port "/api"))
(api-uri (uri-reference (conc "http://" iface ":" port "/api")))
(api-req (make-request method: 'POST uri: api-uri))
(server-dat (vector iface port api-uri api-url api-req (current-seconds) server-id)))
server-dat))
|
︙ | | |
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
|
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
|
-
+
-
-
-
-
-
-
+
-
+
-
-
+
+
-
-
-
-
-
+
-
-
-
-
+
+
|
(debug:print-info 0 *default-log-port* "Waiting for server alive signature")
(mutex-lock! *heartbeat-mutex*)
(set! sdat *server-info*)
(mutex-unlock! *heartbeat-mutex*)
(if (and sdat
(not changed)
(> (- (current-seconds) start-time) 2))
(let* ((servinfodir (conc *toppath*"/.servinfo"))
(let* ((servinfodir (server:get-servinfo-dir *toppath*)) ;; (conc *toppath*"/.servinfo"))
(ipaddr (car sdat))
(port (cadr sdat))
(servinf (conc servinfodir"/"ipaddr":"port)))
(set! servinfofile servinf)
(if (not (file-exists? servinfodir))
(create-directory servinfodir #t))
(with-output-to-file servinf
(lambda ()
(let* ((serv-id (server:mk-signature)))
(set! *server-id* serv-id)
(print "SERVER STARTED: "ipaddr":"port" AT "(current-seconds)" server-id: "serv-id" pid: "(current-process-id))
(print "started: "(seconds->year-week/day-time (current-seconds))))))
(set! *on-exit-procs* (cons
(lambda ()
(delete-file* servinf))
*on-exit-procs*))
;; put data about this server into a simple flat file host.port
(debug:print-info 0 *default-log-port* "Received server alive signature")
#;(common:save-pkt `((action . alive)
(T . server)
(pid . ,(current-process-id))
(ipaddr . ,(car sdat))
(port . ,(cadr sdat)))
*configdat* #t)
sdat)
(begin
(debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
(sleep 4)
(if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
(if sdat
(let* ((ipaddr (car sdat))
(let* ((ipaddr (car sdat))
(port (cadr sdat))
(servinf (conc *toppath*"/.servinfo/"ipaddr":"port)))
(debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
(servinf (conc (server:get-servinfo-dir *toppath*)"/"ipaddr":"port)))
(debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
;; (delete-file* servinf) ;; handled by on-exit, can be removed
#;(common:save-pkt `((action . died)
(T . server)
(pid . ,(current-process-id))
(ipaddr . ,(car sdat))
(exit))
(port . ,(cadr sdat))
(msg . "Transport died?"))
*configdat* #t)
(exit))
(exit)
)
(loop start-time
(equal? sdat last-sdat)
sdat)))))))
(iface (car server-info))
(port (cadr server-info))
(last-access 0)
(server-timeout (server:expiration-timeout))
|
︙ | | |
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
|
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
|
-
+
+
|
(set! server-going #t)
(debug:print 0 *default-log-port* "SERVER: running, megatest version: " (common:get-full-version))) ;; NOTE: the server is NOT yet marked as running in the log. We do that in the keep-running routine.
(if (and no-sync-db
(common:low-noise-print 10 "sync-all")) ;; cheesy way to reduce frequency of running sync :)
(begin
(if (common:low-noise-print 120 "sync-all-print")
(debug:print 0 *default-log-port* "keep-running calling db:all-db-sync at " (time->string (seconds->local-time) "%H:%M:%S")))
(db:all-db-sync *dbstruct-dbs*))))
(db:all-db-sync *dbstruct-dbs*)
)))
;; when things go wrong we don't want to be doing the various queries too often
;; so we strive to run this stuff only every four seconds or so.
(let* ((sync-time (- (current-milliseconds) start-time))
(rem-time (quotient (- 4000 sync-time) 1000)))
(if (and (<= rem-time 4)
(> rem-time 0))
|
︙ | | |
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
|
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
|
-
+
-
+
-
+
-
+
|
(current-seconds)))
(if (common:low-noise-print 120 "server continuing")
(debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
(let ((curr-time (current-seconds)))
(handle-exceptions
exn
(debug:print 0 *default-log-port* "ERROR: Failed to change timestamp on info file " servinfofile ". Are you out of space on that disk? exn=" exn)
(if (and (< (- (current-seconds) server-start-time) 600) ;; run for ten minutes for experiment, 3600 thereafter
(if (and ;; (< (- (current-seconds) server-start-time) 600) ;; run for ten minutes for experiment, 3600 thereafter
(not *server-overloaded*)
(file-exists? servinfofile))
(change-file-times servinfofile curr-time curr-time)))
(if (or (common:low-noise-print 120 "start new server")
(if (and (common:low-noise-print 120 "start new server")
(> *api-process-request-count* 50)) ;; if this server is kind of busy start up another
(begin
(debug:print-info 0 *default-log-port* "Server is busy, parallel-api-count "*api-process-request-count*", start another if possible...")
(debug:print-info 0 *default-log-port* "Server is busy, api-count "*api-process-request-count*", start another if possible...")
(server:kind-run *toppath*)
(if (> *api-process-request-count* 100)
(begin
(debug:print-info 0 *default-log-port* "Server is overloaded at parallel-api-count="*api-process-request-count*", removing "servinfofile)
(debug:print-info 0 *default-log-port* "Server is overloaded at api-count=" *api-process-request-count*", removing "servinfofile)
(delete-file* servinfofile)))))))
(loop 0 server-state bad-sync-count (current-milliseconds)))
(else
(debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
(http-transport:server-shutdown port)))))))
(define (http-transport:server-shutdown port)
|
︙ | | |
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
|
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
|
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
(exit)))
;; all routes though here end in exit ...
;;
;; start_server?
;;
(define (http-transport:launch)
;; check that a server start is in progress, pause or exit if so
(let* ((tmp-area (common:get-db-tmp-area))
(server-start (conc tmp-area "/.server-start"))
;; check the .servinfo directory, are there other servers running on this
;; or another host?
(let* ((server-start-is-ok (server:minimal-check *toppath*)))
(server-started (conc tmp-area "/.server-started"))
(start-time (common:lazy-modification-time server-start))
(started-time (common:lazy-modification-time server-started))
(server-starting (< start-time started-time)) ;; if start-time is less than started-time then a server is still starting
(start-time-old (> (- (current-seconds) start-time) 5))
(cleanup-proc (lambda (msg)
(let* ((serv-fname (conc "server-" (current-process-id) "-" (get-host-name) ".log"))
(full-serv-fname (conc *toppath* "/logs/" serv-fname))
(new-serv-fname (conc *toppath* "/logs/" "defunct-" serv-fname)))
(debug:print 0 *default-log-port* msg)
(if (common:file-exists? full-serv-fname)
(system (conc "sleep 1;mv -f " full-serv-fname " " new-serv-fname))
(debug:print 0 *default-log-port* "INFO: cannot move " full-serv-fname " to " new-serv-fname))
(exit)))))
#;(if (and (not start-time-old) ;; last server start try was less than five seconds ago
(not server-starting))
(if (not server-start-is-ok)
(begin
(cleanup-proc "NOT starting server, there is either a recently started server or a server in process of starting")
(exit)))
;; lets not even bother to start if there are already three or more server files ready to go
#;(let* ((num-alive (server:get-num-alive (server:get-list *toppath*))))
(if (> num-alive 3)
(begin
(cleanup-proc (conc "ERROR: Aborting server start because there are already " num-alive " possible servers either running or starting up"))
(exit))))
#;(common:save-pkt `((action . start)
(T . server)
(debug:print 0 *default-log-port* "ERROR: server start not ok, exiting now.")
(exit 1))))
;; check that a server start is in progress, pause or exit if so
(pid . ,(current-process-id)))
*configdat* #t)
(let* ((th2 (make-thread (lambda ()
(debug:print-info 0 *default-log-port* "Server run thread started")
(http-transport:run
(if (args:get-arg "-server")
(args:get-arg "-server")
"-")
)) "Server run"))
(th3 (make-thread (lambda ()
(debug:print-info 0 *default-log-port* "Server monitor thread started")
(http-transport:keep-running)
"Keep running"))))
(thread-start! th2)
(thread-sleep! 0.25) ;; give the server time to settle before starting the keep-running monitor.
(thread-start! th3)
(set! *didsomething* #t)
(thread-join! th2)
(exit))))
(let* ((th2 (make-thread (lambda ()
(debug:print-info 0 *default-log-port* "Server run thread started")
(http-transport:run
(if (args:get-arg "-server")
(args:get-arg "-server")
"-")
)) "Server run"))
(th3 (make-thread (lambda ()
(debug:print-info 0 *default-log-port* "Server monitor thread started")
(http-transport:keep-running)
"Keep running"))))
(thread-start! th2)
(thread-sleep! 0.25) ;; give the server time to settle before starting the keep-running monitor.
(thread-start! th3)
(set! *didsomething* #t)
(thread-join! th2)
(exit)))
;; (define (http-transport:server-signal-handler signum)
;; (signal-mask! signum)
;; (handle-exceptions
;; exn
;; (debug:print 0 *default-log-port* " ... exiting ...")
;; (let ((th1 (make-thread (lambda ()
|
︙ | | |
︙ | | |
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
-
+
-
-
+
|
;;======================================================================
;; S U P P O R T F U N C T I O N S
;;======================================================================
;; if a server is either running or in the process of starting call client:setup
;; else return #f to let the calling proc know that there is no server available
;;
(define (rmt:get-connection-info areapath #!key (area-dat #f)) ;; TODO: push areapath down.
(define (rmt:get-connection-info areapath runremote) ;; TODO: push areapath down.
(let* ((runremote (or area-dat *runremote*))
(cinfo (if (remote? runremote)
(let* ((cinfo (if (remote? runremote)
(remote-conndat runremote)
#f)))
(if cinfo
cinfo
(if (server:check-if-running areapath)
(client:setup areapath)
#f))))
|
︙ | | |
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
+
+
-
+
-
|
;; reset the connection if it has been unused too long
((and runremote
(remote-conndat runremote)
(> (current-seconds) ;; if it has been more than server-timeout seconds since last contact, close this connection and start a new on
(+ (http-transport:server-dat-get-last-access (remote-conndat runremote))
(remote-server-timeout runremote))))
(debug:print-info 0 *default-log-port* "Connection to " (remote-server-url runremote) " expired due to no accesses, forcing new connection.")
(http-transport:close-connections runremote)
;; moving this setting of runremote conndat to #f to inside the http-transport:close-connections
(remote-conndat-set! runremote #f) ;; invalidate the connection, thus forcing a new connection.
;; (remote-conndat-set! runremote #f) ;; invalidate the connection, thus forcing a new connection.
(http-transport:close-connections area-dat: runremote)
(mutex-unlock! *rmt-mutex*)
(rmt:send-receive cmd rid params attemptnum: attemptnum))
;;DOT CASE5 [label="local\nread"];
;;DOT MUTEXLOCK -> CASE5 [label="server not required,\non homehost,\nread-only query"]; {rank=same "case 5" CASE5};
;;DOT CASE5 -> "rmt:open-qry-close-locally";
|
︙ | | |
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
-
+
|
(not (remote-conndat runremote)))
(and (not (cdr (remote-hh-dat runremote))) ;; not on a homehost
(not (remote-conndat runremote)))) ;; and no connection
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 9, hh-dat: " (remote-hh-dat runremote) " conndat: " (remote-conndat runremote))
(mutex-unlock! *rmt-mutex*)
(if (not (server:check-if-running *toppath*)) ;; who knows, maybe one has started up?
(server:start-and-wait *toppath*))
(remote-conndat-set! runremote (rmt:get-connection-info *toppath*)) ;; calls client:setup which calls client:setup-http
(remote-conndat-set! runremote (rmt:get-connection-info *toppath* runremote)) ;; calls client:setup which calls client:setup-http
(rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; TODO: add back-off timeout as
;;DOT CASE10 [label="on homehost"];
;;DOT MUTEXLOCK -> CASE10 [label="server not required,\non homehost"]; {rank=same "case 10" CASE10};
;;DOT CASE10 -> "rmt:open-qry-close-locally";
;; all set up if get this far, dispatch the query
((and (not (remote-force-server runremote))
|
︙ | | |
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
|
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
|
-
-
+
|
(success (if (vector? dat) (vector-ref dat 0) #f))
(res (if (vector? dat) (vector-ref dat 1) #f)))
(if (and (vector? conninfo) (< 5 (vector-length conninfo)))
(http-transport:server-dat-update-last-access conninfo) ;; refresh access time
(begin
(debug:print 0 *default-log-port* "INFO: Should not get here! conninfo=" conninfo)
(set! conninfo #f)
(remote-conndat-set! *runremote* #f) ;; NOTE: *runremote* is global copy of runremote. Purpose: factor out global.
(http-transport:close-connections area-dat: runremote)))
(http-transport:close-connections runremote)))
(debug:print-info 13 *default-log-port* "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat " runremote = " runremote)
(mutex-unlock! *rmt-mutex*)
(if success ;; success only tells us that the transport was
;; successful, have to examine the data to see if
;; there was a detected issue at the other end
(extras-transport-succeded *default-log-port* *rmt-mutex* attemptnum runremote res params rid cmd)
(begin
|
︙ | | |
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
|
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
|
-
-
+
|
(debug:print-info 12 log-port "rmt:send-receive, case 3")
(debug:print 0 log-port "WARNING: write transaction requested on a readonly area. cmd="cmd" params="params)
#f)
(define (extras-transport-failed *default-log-port* *rmt-mutex* attemptnum runremote cmd rid params)
(debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum)
(mutex-lock! *rmt-mutex*)
(remote-conndat-set! runremote #f)
(http-transport:close-connections area-dat: runremote)
(http-transport:close-connections runremote)
(remote-server-url-set! runremote #f)
(mutex-unlock! *rmt-mutex*)
(debug:print-info 12 *default-log-port* "rmt:send-receive, case 9.1")
(rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
(define (extras-transport-succeded *default-log-port* *rmt-mutex* attemptnum runremote res params rid cmd)
(if (and (vector? res)
|
︙ | | |
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
|
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
|
-
+
|
;; server is
;; overloaded and we
;; want to ease off
;; the queries
(let ((wait-delay (+ attemptnum (* attemptnum 10))))
(debug:print 0 *default-log-port* "WARNING: server is overloaded. Delaying " wait-delay " seconds and trying call again.")
(mutex-lock! *rmt-mutex*)
(http-transport:close-connections area-dat: runremote)
(http-transport:close-connections runremote)
(set! *runremote* #f) ;; force starting over
(mutex-unlock! *rmt-mutex*)
(thread-sleep! wait-delay)
(rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
res)) ;; All good, return res
#;(set-functions rmt:send-receive remote-server-url-set!
http-transport:close-connections remote-conndat-set!
debug:print debug:print-info
remote-ro-mode remote-ro-mode-set!
remote-ro-mode-checked-set! remote-ro-mode-checked)
|
︙ | | |
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
-
-
-
-
-
-
-
-
-
-
-
|
;;======================================================================
;; S E R V E R
;;======================================================================
;; Call this to start the actual server
;;
;; all routes though here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id transport-type)
(case transport-type
((http)(http-transport:launch))
;;((nmsg)(nmsg-transport:launch run-id))
;;((rpc) (rpc-transport:launch run-id))
(else (debug:print-error 0 *default-log-port* "unknown server type " transport-type))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;; Get the transport
(define (server:get-transport)
(if *transport-type*
|
︙ | | |
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
-
-
-
-
-
-
-
-
+
|
;; Given an area path, start a server process ### NOTE ### > file 2>&1
;; if the target-host is set
;; try running on that host
;; incidental: rotate logs in logs/ dir.
;;
(define (server:run areapath) ;; areapath is *toppath* for a given testsuite area
(let* (;; (curr-host (get-host-name))
;; (attempt-in-progress (server:start-attempted? areapath))
;; (dot-server-url (server:check-if-running areapath))
;; (curr-ip (server:get-best-guess-address curr-host))
;; (curr-pid (current-process-id))
;; (homehost (server:get-homehost)) ;; configf:lookup *configdat* "server" "homehost" ))
;; (target-host (car homehost))
(testsuite (common:get-testsuite-name))
(let* ((testsuite (common:get-testsuite-name))
(logfile (conc areapath "/logs/server.log")) ;; -" curr-pid "-" target-host ".log"))
(profile-mode (or (configf:lookup *configdat* "misc" "profilesw")
""))
(cmdln (conc (common:get-megatest-exe)
" -server - ";; (or target-host "-")
(if (equal? (configf:lookup *configdat* "server" "daemonize") "yes")
" -daemonize "
|
︙ | | |
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
|
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
|
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
(define (server:logf-get-start-info logf)
(let ((server-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+)")) ;; SERVER STARTED: host:port AT timesecs server id
(dbprep-rx (regexp "^SERVER: dbprep"))
(dbprep-found 0)
(bad-dat (list #f #f #f #f #f)))
(handle-exceptions
exn
(begin
(debug:print-info 0 *default-log-port* "Unable to get server info from " logf ", exn=" exn)
bad-dat) ;; no idea what went wrong, call it a bad server
(with-input-from-file
logf
(lambda ()
(let loop ((inl (read-line))
(lnum 0))
(if (not (eof-object? inl))
(let ((mlst (string-match server-rx inl))
(dbprep (string-match dbprep-rx inl)))
(if dbprep (set! dbprep-found 1))
(if (not mlst)
(if (< lnum 500) ;; give up if more than 500 lines of server log read
(loop (read-line)(+ lnum 1))
(begin
exn
(begin
;; WARNING: this is potentially dangerous to blanket ignore the errors
(if (file-exists? logf)
(debug:print-info 2 *default-log-port* "Unable to get server info from "logf", exn=" exn))
bad-dat) ;; no idea what went wrong, call it a bad server
(with-input-from-file
logf
(lambda ()
(let loop ((inl (read-line))
(lnum 0))
(if (not (eof-object? inl))
(let ((mlst (string-match server-rx inl))
(dbprep (string-match dbprep-rx inl)))
(if dbprep (set! dbprep-found 1))
(if (not mlst)
(if (< lnum 500) ;; give up if more than 500 lines of server log read
(loop (read-line)(+ lnum 1))
(begin
(debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf )
bad-dat))
(match mlst
((_ host port start server-id pid)
(list host
(string->number port)
(string->number start)
server-id
(string->number pid)))
(else
(debug:print 0 *current-log-port* "ERROR: did not recognise SERVER line info "mlst)
bad-dat))))
(begin
(if dbprep-found
(begin
(debug:print-info 2 *default-log-port* "Server is in dbprep at " (common:human-time))
(thread-sleep! 0.5)) ;; was 25 sec but that blocked things from starting?
(debug:print-info 0 *default-log-port* "Unable to get server info from " logf " at " (seconds->time-string (current-seconds))))
bad-dat))))))))
(match mlst
((_ host port start server-id pid)
(list host
(string->number port)
(string->number start)
server-id
(string->number pid)))
(else
(debug:print 0 *current-log-port* "ERROR: did not recognise SERVER line info "mlst)
bad-dat))))
(begin
(if dbprep-found
(begin
(debug:print-info 2 *default-log-port* "Server is in dbprep at " (common:human-time))
(thread-sleep! 0.5)) ;; was 25 sec but that blocked things from starting?
(debug:print-info 0 *default-log-port* "Unable to get server info from " logf " at " (seconds->time-string (current-seconds))))
bad-dat))))))))
;; ;; get a list of servers from the log files, with all relevant data
;; ;; ( mod-time host port start-time pid )
;; ;;
;; (define (server:get-list areapath #!key (limit #f))
;; (let ((fname-rx (regexp "^(|.*/)server-(\\d+)-(\\S+).log$"))
;; (day-seconds (* 24 60 60)))
|
︙ | | |
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
|
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
|
+
-
+
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
-
-
-
-
+
+
+
+
+
+
-
+
-
-
-
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
|
(thread-sleep! ( + 1 idletime))
(server:wait-for-server-start-last-flag areapath)))))))
;; oldest server alive determines host then choose random of youngest
;; five servers on that host
;;
(define (server:get-servers-info areapath)
;; (assert *toppath* "FATAL: server:get-servers-info called before *toppath* has been set.")
(let* ((servinfodir (conc *toppath*"/.servinfo")))
(let* ((servinfodir (server:get-servinfo-dir areapath))) ;; (conc *toppath*"/.servinfo")))
(if (not (file-exists? servinfodir))
(create-directory servinfodir))
(let* ((allfiles (glob (conc servinfodir"/*")))
(res (make-hash-table)))
(for-each
(lambda (f)
(let* ((hostport (pathname-strip-directory f))
(serverdat (server:logf-get-start-info f)))
(match serverdat
((host port start server-id pid)
(if (and host port start server-id pid)
(hash-table-set! res hostport serverdat)
(debug:print-info 0 *default-log-port* "bad server info for "f": "serverdat)))
(debug:print-info 2 *default-log-port* "bad server info for "f": "serverdat)))
(else
(debug:print-info 0 *default-log-port* "bad server info for "f": "serverdat)))))
(debug:print-info 2 *default-log-port* "bad server info for "f": "serverdat)))))
allfiles)
res)))
;; check the .servinfo directory, are there other servers running on this
;; or another host?
;;
;; returns #t => ok to start another server
;; #f => not ok to start another server
;;
(define (server:minimal-check areapath)
(server:clean-up-old areapath)
(let* ((srvdir (server:get-servinfo-dir areapath)) ;; (conc areapath"/.servinfo"))
(servrs (glob (conc srvdir"/*")))
(thishostip (server:get-best-guess-address (get-host-name)))
(thisservrs (glob (conc srvdir"/"thishostip":*")))
(homehostinf (server:choose-server areapath 'homehost))
(havehome (car homehostinf))
(wearehome (cdr homehostinf)))
(debug:print-info 0 *default-log-port* thishostip", have homehost: "havehome", we are homehost: "wearehome
", numservers: "(length thisservrs))
(cond
((not havehome) #t) ;; no homehost yet, go for it
((and havehome wearehome (< (length thisservrs) 20)) #t) ;; we are home and less than 20 servers, ok to start another
((and havehome (not wearehome)) #f) ;; we are not the home host
((and havehome wearehome (>= (length thisservrs) 20)) #f) ;; have enough running
(else
(debug:print 0 *default-log-port* "WARNING: Unrecognised scenario, servrs="servrs", thishostip="thishostip", thisservrs="thisservrs)
#t))))
(define server-last-start 0)
;; oldest server alive determines host then choose random of youngest
;; five servers on that host
;;
;; mode:
;; best - get best server (random of newest five)
;; home - get home host based on oldest server
;; info - print info
(define (server:choose-server areapath #!optional (mode 'best))
;; age is current-starttime
;; find oldest alive
;; 1. sort by age ascending and ping until good
;; find alive rand from youngest
;; 1. sort by age descending
;; 2. take five
;; 3. check alive, discard if not and repeat
;; first we clean up old server files
(server:clean-up-old areapath)
(let* ((since-last (- (current-seconds) server-last-start))
(server-start-delay 10))
(if ( < (- (current-seconds) server-last-start) 10 )
(begin
(debug:print 2 *default-log-port* "server:choose-server: seconds since last server start: " (- (current-seconds) server-last-start))
(debug:print 2 *default-log-port* "server:choose-server: last server start less than " server-start-delay " seconds ago. Sleeping " server-start-delay " seconds")
(thread-sleep! server-start-delay)
)
(debug:print 2 *default-log-port* "server:choose-server: seconds since last server start: " (- (current-seconds) server-last-start))
)
)
(let* ((serversdat (server:get-servers-info areapath))
(servkeys (hash-table-keys serversdat))
(by-time-asc (if (not (null? servkeys))
(by-time-asc (if (not (null? servkeys)) ;; NOTE: Oldest is last
(sort servkeys ;; list of "host:port"
(lambda (a b)
(>= (list-ref (hash-table-ref serversdat a) 2)
(list-ref (hash-table-ref serversdat b) 2))))
'())))
(debug:print 2 *default-log-port* "server:choose-server: serversdat: " serversdat)
(debug:print 2 *default-log-port* "server:choose-server: servkeys: " servkeys)
(if (not (null? by-time-asc))
(let* ((oldest (last by-time-asc))
(oldest-dat (hash-table-ref serversdat oldest))
(host (list-ref oldest-dat 0))
(all-valid (filter (lambda (x)
(equal? host (list-ref (hash-table-ref serversdat x) 0)))
by-time-asc))
(best-five (lambda ()
(if (> (length all-valid) 5)
(take all-valid 5)
all-valid)))
(best-ten (lambda ()
(if (> (length all-valid) 11)
(take (drop-right all-valid 1) 10) ;; remove the oldest from consideration so it can age out
(if (> (length all-valid) 8)
(drop-right all-valid 1)
all-valid))))
(names->dats (lambda (names)
(map (lambda (x)
(hash-table-ref serversdat x))
names)))
(am-home? (lambda ()
(let* ((currhost (get-host-name))
(bestadrs (server:get-best-guess-address currhost)))
(or (equal? host currhost)
(equal? host bestadrs))))))
(case mode
((info)
(print "oldest: "oldest-dat", selected host: "host", all-valid: "all-valid)
(print "youngest: "(hash-table-ref serversdat (car all-valid))))
((home) host)
((homehost) (cons host (am-home?))) ;; shut up old code
((home?) (am-home?))
((best-five)(names->dats (best-five)))
((best-ten)(names->dats (best-ten)))
((all-valid)(names->dats all-valid))
((best) (let* ((best-five (best-five))
(len (length best-five)))
(hash-table-ref serversdat (list-ref best-five (random len)))))
((best) (let* ((best-ten (best-ten))
(len (length best-ten)))
(hash-table-ref serversdat (list-ref best-ten (random len)))))
((count)(length all-valid))
(else
(debug:print 0 *default-log-port* "ERROR: invalid command "mode)
#f)))
(begin
(server:run areapath)
(set! server-last-start (current-seconds))
(thread-sleep! 3)
;; (thread-sleep! 3)
(case mode
((homehost) (cons #f #f))
(else #f))))))
(define (server:get-servinfo-dir areapath)
(let* ((spath (conc areapath"/.servinfo")))
(if (not (file-exists? spath))
(create-directory spath #t))
spath))
(define (server:clean-up-old areapath)
;; any server file that has not been touched in ten minutes is effectively dead
(let* ((sfiles (glob (conc (server:get-servinfo-dir areapath)"/*"))))
(for-each
(lambda (sfile)
(let* ((modtime (handle-exceptions
exn
(begin
(debug:print 0 *default-log-port* "WARNING: failed to get modification file for "sfile)
(current-seconds))
(file-modification-time sfile))))
(if (and (number? modtime)
(> (- (current-seconds) modtime)
600))
(begin
(debug:print 0 *default-log-port* "WARNING: found old server info file "sfile", removing it.")
(handle-exceptions
exn
(debug:print 0 *default-log-port* "WARNING: failed to delete old server info file "sfile)
(delete-file sfile))))))
sfiles)))
;; would like to eventually get rid of this
;;
(define (common:on-homehost?)
(server:choose-server *toppath* 'home?))
;; kind start up of server, wait before allowing another server for a given
;; area to be launched
;;
(define (server:kind-run areapath)
;; look for $MT_RUN_AREA_HOME/logs/server-start-last
;; and wait for it to be at least <server idletime> seconds old
;; (server:wait-for-server-start-last-flag areapath)
(let loop ()
(if (> (alist-ref 'adj-proc-load (common:get-normalized-cpu-load #f)) 2)
(begin
(if (common:low-noise-print 30 "our-host-load")
(debug:print 0 *default-log-port* "WARNING: system load is high, waiting to start server."))
(loop))))
(if (< (server:choose-server areapath 'count) 10)
(if (< (server:choose-server areapath 'count) 20)
(server:run areapath))
#;(if (not (server:check-if-running areapath)) ;; why try if there is already a server running?
(let* ((lock-file (conc areapath "/logs/server-start.lock")))
(let* ((start-flag (conc areapath "/logs/server-start-last")))
(common:simple-file-lock-and-wait lock-file expire-time: 25)
(debug:print-info 2 *default-log-port* "server:kind-run: touching " start-flag)
(system (conc "touch " start-flag)) ;; lazy but safe
|
︙ | | |
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
|
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
|
+
-
+
-
+
|
(define (server:start-and-wait areapath #!key (timeout 60))
(let ((give-up-time (+ (current-seconds) timeout)))
(let loop ((server-info (server:check-if-running areapath))
(try-num 0))
(if (or server-info
(> (current-seconds) give-up-time)) ;; server-url will be #f if no server available.
(server:record->url server-info)
(let* ( (servers (server:choose-server areapath 'all-valid))
(let ((num-ok (length (server:choose-server areapath 'all-valid))))
(num-ok (if servers (length (server:choose-server areapath 'all-valid)) 0)))
(if (and (> try-num 0) ;; first time through simply wait a little while then try again
(< num-ok 1)) ;; if there are no decent candidates for servers then try starting a new one
(server:run areapath))
(thread-sleep! 5)
(loop (server:check-if-running areapath)
(+ try-num 1)))))))
(define (server:get-num-servers #!key (numservers 2))
(let ((ns (string->number
(or (configf:lookup *configdat* "server" "numservers") "notanumber"))))
(or ns numservers)))
;; no longer care if multiple servers are started by accident. older servers will drop off in time.
;;
(define (server:check-if-running areapath) ;; #!key (numservers "2"))
(let* ((ns (server:get-num-servers)) ;; get the setting the for maximum number of servers allowed
(servers (server:choose-server areapath 'best-five))) ;; (server:get-best (server:get-list areapath))))
(servers (server:choose-server areapath 'best-ten))) ;; (server:get-best (server:get-list areapath))))
(if (or (and servers
(null? servers))
(not servers))
;; (and (list? servers)
;; (< (length servers) (+ 1 (random ns))))) ;; somewhere between 1 and numservers
#f
(let loop ((hed (car servers))
|
︙ | | |
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
|
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
|
-
+
|
;; Default is 60 seconds.
;;
(define (server:expiration-timeout)
(let ((tmo (configf:lookup *configdat* "server" "timeout")))
(if (and (string? tmo)
(common:hms-string->seconds tmo)) ;; BUG: hms-string->seconds is broken, if given "10" returns 0. Also, it doesn't belong in this logic unless the string->number is changed below
(* 3600 (string->number tmo))
60)))
600)))
(define (server:get-best-guess-address hostname)
(let ((res #f))
(for-each
(lambda (adr)
(if (not (eq? (u8vector-ref adr 0) 127))
(set! res adr)))
|
︙ | | |