︙ | | |
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
+
+
+
+
+
+
+
-
+
-
-
+
+
+
+
-
-
+
+
-
-
+
-
-
-
-
-
-
+
+
-
-
+
+
+
+
+
|
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;;
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see <http://www.gnu.org/licenses/>.
;;======================================================================
(declare (unit common))
(declare (uses commonmod))
(declare (uses rmtmod))
(declare (uses debugprint))
(declare (uses mtargs))
(use srfi-1 data-structures posix regex-case (prefix base64 base64:)
format dot-locking csv-xml z3 udp ;; sql-de-lite
hostinfo md5 message-digest typed-records directory-utils stack
matchable regex posix (srfi 18) extras ;; tcp
(prefix nanomsg nmsg:)
(prefix sqlite3 sqlite3:)
pkts (prefix dbi dbi:)
)
(use posix-extras pathname-expand files)
(declare (unit common))
(declare (uses commonmod))
(import commonmod)
(import commonmod
debugprint
rmtmod
(prefix mtargs args:))
(include "common_records.scm")
;; (require-library margs)
(define (remove-files filespec)
(let ((files (glob filespec)))
;; (include "margs.scm")
(for-each delete-file files)))
;; (define old-exit exit)
;;
;; (define (exit . code)
;; (if (null? code)
;; (old-exit)
;; (old-exit code)))
(define (stop-the-train)
(thread-start! (make-thread (lambda ()
(let loop ()
(if (and *toppath*
(file-exists? (conc *toppath*"/stop-the-train")))
(let* ((msg (conc "ERROR: found file "*toppath*"/stop-the-train, exiting immediately")))
;; yes, print to current-output-port AND *default-log-port*, annoying but necessary I think
(begin
(debug:print 0 *default-log-port* "ERROR: found file "*toppath*"/stop-the-train, exiting immediately")
(print msg)
(debug:print 0 *default-log-port* msg)
(remove-files (conc *toppath* "/logs/server*"))
(remove-files (conc *toppath* "/.servinfo/*"))
(remove-files (conc *toppath* "/.mtdb/*lock"))
(exit 1)))
(thread-sleep! 5)
(loop))))))
;; execute thunk, return value. If exception thrown, trap exception, return #f, and emit nonfatal condition note to *default-log-port* .
;; arguments - thunk, message
(define (common:fail-safe thunk warning-message-on-exception)
|
︙ | | |
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
-
+
-
-
-
|
;; (define *db-last-sync* 0) ;; last time the sync to megatest.db happened
(define *db-sync-in-progress* #f) ;; if there is a sync in progress do not try to start another
;; (define *db-multi-sync-mutex* (make-mutex)) ;; protect access to *db-sync-in-progress*, *db-last-sync*
;; task db
(define *task-db* #f) ;; (vector db path-to-db)
(define *db-access-allowed* #t) ;; flag to allow access
;; (define *db-access-mutex* (make-mutex)) ;; moved to dbfile
(define *db-transaction-mutex* (make-mutex))
;; (define *db-transaction-mutex* (make-mutex))
(define *db-cache-path* #f)
;; (define *db-with-db-mutex* (make-mutex))
(define *db-api-call-time* (make-hash-table)) ;; hash of command => (list of times)
;; no sync db
;; (define *no-sync-db* #f) ;; moved to dbfile
;; SERVER
(define *my-client-signature* #f)
(define *transport-type* 'http) ;; override with [server] transport http|rpc|nmsg
(define *runremote* #f) ;; if set up for server communication this will hold <host port>
;; (define *max-cache-size* 0)
(define *logged-in-clients* (make-hash-table))
(define *server-id* #f)
(define *server-info* #f) ;; good candidate for easily convert to non-global
(define *time-to-exit* #f)
|
︙ | | |
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
|
212
213
214
215
216
217
218
219
220
221
222
223
224
225
|
-
-
|
(define *launch-setup-mutex* (make-mutex)) ;; need to be able to call launch:setup often so mutex it and re-call the real deal only if *toppath* not set
(define *homehost-mutex* (make-mutex))
;; Miscellaneous
(define *triggers-mutex* (make-mutex)) ;; block overlapping processing of triggers
(define *numcpus-cache* (make-hash-table))
(use posix-extras pathname-expand files)
;; this plugs a hole in posix-extras in recent chicken versions > 4.9)
(let-values (( (chicken-release-number chicken-major-version)
(apply values
(map string->number
(take
(string-split (chicken-version) ".")
2)))))
|
︙ | | |
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
|
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
|
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|
(define *common:this-exe-fullpath* (common:get-this-exe-fullpath))
(define *common:this-exe-dir* (pathname-directory *common:this-exe-fullpath*))
(define *common:this-exe-name* (pathname-strip-directory *common:this-exe-fullpath*))
(define (common:get-sync-lock-filepath)
(let* ((tmp-area (common:get-db-tmp-area))
(lockfile (conc tmp-area "/megatest.db.sync-lock")))
(lockfile (conc tmp-area "/megatest.db.lock")))
lockfile))
;;======================================================================
;; when called from a wrapper I need sometimes to find the calling
;; wrapper, this is for dashboard to find the correct megatest.
;;
(define (common:find-local-megatest #!optional (progname "megatest"))
(let ((res (filter file-exists?
(map (lambda (updir)
(let* ((lm (car (argv)))
(dir (pathname-directory lm))
(exe (pathname-strip-directory lm)))
(conc (if dir (conc dir "/") "")
(case (string->symbol exe)
((dboard) (conc updir progname))
((mtest) (conc updir progname))
((dashboard) progname)
(else exe)))))
'("../../" "../")))))
(if (null? res)
(begin
(debug:print 0 *default-log-port* "Failed to find this executable! Using what can be found on the path")
progname)
(car res))))
(define *common:logpro-exit-code->status-sym-alist*
'( ( 0 . pass )
( 1 . fail )
( 2 . warn )
( 3 . check )
( 4 . waived )
( 5 . abort )
|
︙ | | |
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
|
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
|
+
+
+
+
+
+
-
+
-
-
+
+
+
-
-
+
+
+
+
+
+
+
+
|
((abort) "ABORT")
((skip) "SKIP")
(else "FAIL")))
(define (common:logpro-exit-code->test-status exit-code)
(status-sym->string (common:logpro-exit-code->status-sym exit-code)))
;;
(defstruct remote
;; transport to be used
;; http - use http-transport
;; http-read-cached - use http-transport for writes but in-mem cached for reads
(rmode 'http)
(hh-dat (let ((res (or (server:choose-server *toppath* 'homehost)
(cons #f #f))))
(assert (pair? res)(conc "FATAL: hh-dat should be a pair, got "res))
res))
(server-url #f) ;; (server:check-if-running *toppath*) #f))
(server-id #f)
(server-info (if *toppath* (server:check-if-running *toppath*) #f))
(server-info #f) ;; (if *toppath* (server:check-if-running *toppath*) #f))
(last-server-check 0) ;; last time we checked to see if the server was alive
(connect-time (current-seconds))
(conndat #f)
(connect-time (current-seconds)) ;; when we first connected
(last-access (current-seconds)) ;; last time we talked to server
;; (conndat #f) ;; iface port api-uri api-url api-req seconds server-id
(transport *transport-type*)
(server-timeout (server:expiration-timeout))
(force-server #f)
(ro-mode #f)
(ro-mode-checked #f)) ;; flag that indicates we have checked for ro-mode
(ro-mode-checked #f) ;; flag that indicates we have checked for ro-mode
;; conndat stuff
(iface #f) ;; TODO: Consolidate this data with server-url and server-info above
(port #f)
(api-url #f)
(api-uri #f)
(api-req #f))
;; launching and hosts
(defstruct host
(reachable #f)
(last-update 0)
(last-used 0)
(last-cpuload 1))
|
︙ | | |
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
|
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
|
+
+
+
+
-
-
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
(define (common:version-db-delta)
(- megatest-version (common:get-last-run-version-number)))
(define (common:version-changed?)
(not (equal? (common:get-last-run-version)
(common:version-signature))))
;; From 1.70 to 1.80, db's are compatible.
(define (common:api-changed?)
(let* (
(not (equal? (substring (->string megatest-version) 0 4)
(substring (conc (common:get-last-run-version)) 0 4))))
(megatest-major-version (substring (->string megatest-version) 0 4))
(run-major-version (substring (conc (common:get-last-run-version)) 0 4))
)
(and (not (equal? megatest-major-version "1.80"))
(not (equal? megatest-major-version megatest-run-version)))
)
)
;;======================================================================
;; Move me elsewhere ...
;; RADT => Why do we meed the version check here, this is called only if version misma
;;
(define (common:cleanup-db dbstruct #!key (full #f))
(case (rmt:transport-mode)
((http)
(apply db:multi-db-sync
dbstruct
'schema
'killservers
'adj-target
'new2old
'(dejunk)
)
(apply db:multi-db-sync
dbstruct
'schema
'killservers
'adj-target
'new2old
'(dejunk)
))
((tcp nfs)
(debug:print 0 *default-log-port* "WARNING: cleanup-db NOT implemented yet for tcp and nfs.")
#;(apply db:multi-db-sync
dbstruct
'schema
'killservers
'adj-target
'new2old
'(dejunk)
)))
(if (common:api-changed?)
(common:set-last-run-version)))
(define (common:snapshot-file filepath #!key (subdir ".") )
(if (file-exists? filepath)
(let* ((age-sec (lambda (file)
(if (file-exists? file)
|
︙ | | |
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
|
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
|
-
+
|
;; logs directory you wish to log-rotate.
;;
(define (common:rotate-logs)
(let* ((all-files (make-hash-table))
(stats (make-hash-table))
(inc-stat (lambda (key)
(hash-table-set! stats key (+ (hash-table-ref/default stats key 0) 1))))
(max-allowed (string->number (or (configf:lookup *configdat* "setup" "max-logfiles") "300")))) ;; name -> age
(max-allowed (string->number (or (configf:lookup *configdat* "setup" "max-logfiles") "600")))) ;; name -> age
(if (not (directory-exists? "logs"))(create-directory "logs"))
(directory-fold
(lambda (file rem)
(handle-exceptions
exn
(begin
(debug:print-info 2 *default-log-port* "unable to rotate log " file ", probably handled by another process, this is safe to ignore. exn=" exn)
|
︙ | | |
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
|
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
|
+
-
+
-
+
|
(debug:print-info 0 *default-log-port* "Deleted " (length files) " files from logs, keeping " max-allowed " files."))))))
;;======================================================================
;; Force a megatest cleanup-db if version is changed and skip-version-check not specified
;; Do NOT check if not on homehost!
;;
(define (common:exit-on-version-changed)
(if (and *toppath* ;; do nothing if *toppath* not yet provided
(if (common:on-homehost?)
(common:on-homehost?))
(if (common:api-changed?)
(let* ((mtconf (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.config"))
(dbfile (conc (get-environment-variable "MT_RUN_AREA_HOME") "/megatest.db"))
(dbfile (conc (get-environment-variable "MT_RUN_AREA_HOME") ".mtdb/main.db"))
(read-only (not (file-write-access? dbfile)))
(dbstruct (db:setup #t))) ;; (db:setup-db *dbstruct-dbs* *toppath* #f))) ;; #t)))
(debug:print 0 *default-log-port*
"WARNING: Version mismatch!\n"
" expected: " (common:version-signature) "\n"
" got: " (common:get-last-run-version))
(cond
|
︙ | | |
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
|
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
|
-
+
-
+
|
(print-call-chain (current-error-port))
(exit 1))
(common:cleanup-db dbstruct)))
((not (common:file-exists? mtconf))
(debug:print 0 *default-log-port* " megatest.config does not exist in this area. Cannot proceed with megatest version migration.")
(exit 1))
((not (common:file-exists? dbfile))
(debug:print 0 *default-log-port* " megatest.db does not exist in this area. Cannot proceed with megatest version migration.")
(debug:print 0 *default-log-port* " .mtdb/main.db does not exist in this area. Cannot proceed with megatest version migration.")
(exit 1))
((not (eq? (current-user-id)(file-owner mtconf)))
(debug:print 0 *default-log-port* " You do not own megatest.db in this area. Cannot proceed with megatest version migration.")
(debug:print 0 *default-log-port* " You do not own .mtdb/main.db in this area. Cannot proceed with megatest version migration.")
(exit 1))
(read-only
(debug:print 0 *default-log-port* " You have read-only access to this area. Cannot proceed with megatest version migration.")
(exit 1))
(else
(debug:print 0 *default-log-port* " to switch versions you can run: \"megatest -cleanup-db\"")
(exit 1)))))))
|
︙ | | |
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
|
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
|
+
+
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
-
-
-
|
(if dat
dat
""))))
(define (common:alist-ref/default key alist default)
(or (alist-ref key alist) default))
;; moved into commonmod
;;
(define (common:low-noise-print waitval . keys)
(let* ((key (string-intersperse (map conc keys) "-" ))
(lasttime (hash-table-ref/default *common:denoise* key 0))
(currtime (current-seconds)))
(if (> (- currtime lasttime) waitval)
(begin
(hash-table-set! *common:denoise* key currtime)
#t)
#f)))
;; (define (common:low-noise-print waitval . keys)
;; (let* ((key (string-intersperse (map conc keys) "-" ))
;; (lasttime (hash-table-ref/default *common:denoise* key 0))
;; (currtime (current-seconds)))
;; (if (> (- currtime lasttime) waitval)
;; (begin
;; (hash-table-set! *common:denoise* key currtime)
;; #t)
;; #f)))
(define (common:get-megatest-exe)
(or (getenv "MT_MEGATEST") "megatest"))
(define (common:read-encoded-string instr)
(handle-exceptions
exn
(handle-exceptions
exn
(begin
|
︙ | | |
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
|
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
|
-
-
-
-
-
-
-
-
-
-
-
+
-
+
-
+
-
+
-
+
-
-
+
+
-
-
+
+
-
-
-
+
-
-
+
+
-
-
-
|
(if (equal? thepath "/")
(begin
(debug:print-error 0 *default-log-port* "Unable to find megatest home directory.")
#f)
(loop (pathname-directory thepath)))))
))
(define (common:db-tmp-area-path)
(conc "/tmp/"
(current-user-name)
"/megatest_localdb/"
(common:get-testsuite-name)
"/"
(string-translate *toppath* "/" ".")
)
)
;;======================================================================
;; redefine for future cleanup (converge on area-name, the more generic
;;
(define common:get-area-name common:get-testsuite-name)
(define (common:get-db-tmp-area . junk)
(if *db-cache-path*
*db-cache-path*
(if *toppath* ;; common:get-create-writeable-dir
(handle-exceptions
exn
(begin
(debug:print-error 0 *default-log-port* "Couldn't create path to " *db-cache-path* ", exn=" exn)
(exit 1))
(let* ((toppath (common:real-path *toppath*))
(let* ((tsname (common:get-testsuite-name))
(tsname (common:get-testsuite-name))
(dbpath (common:get-create-writeable-dir
(list (conc "/tmp/" (current-user-name)
"/megatest_localdb/"
tsname "/"
(string-translate *toppath* "/" "."))
(string-translate toppath "/" "."))
(conc "/tmp/" (current-process-id) ;; just in case we have an issue with the dir by own user name
"/megatest_localdb/"
"/"(current-user-name) "/megatest_localdb/"
tsname
(string-translate *toppath* "/" "."))
(string-translate toppath "/" "."))
))))
(set! *db-cache-path* dbpath)
;; ensure megatest area has .megatest
(let ((dbarea (conc *toppath* "/.megatest")))
;; ensure megatest area has .mtdb
(let ((dbarea (conc *toppath* "/.mtdb")))
(if (not (file-exists? dbarea))
(create-directory dbarea)))
;; ensure tmp area has .megatest
(let ((dbarea (conc dbpath "/.megatest")))
;; ensure tmp area has .mtdb
(let ((dbarea (conc dbpath "/.mtdb")))
(if (not (file-exists? dbarea))
(create-directory dbarea)))
dbpath))
#f)))
(define (common:get-area-path-signature)
(message-digest-string (md5-primitive) *toppath*))
(define (common:get-signature str)
(message-digest-string (md5-primitive) str))
;;======================================================================
;; E X I T H A N D L I N G
;;======================================================================
(define (common:run-sync?)
(and *toppath* ;; gate if called before *toppath* is set
(and (common:on-homehost?)
(args:get-arg "-server")))
(common:on-homehost?)
(args:get-arg "-server")))
(define (common:human-time)
(time->string (seconds->local-time (current-seconds)) "%Y-%m-%d %H:%M:%S"))
(define (std-signal-handler signum)
;; (signal-mask! signum)
(set! *time-to-exit* #t)
;;(debug:print-info 13 *default-log-port* "got signal "signum)
(debug:print-error 0 *default-log-port* "Received signal " signum " aaa exiting promptly")
|
︙ | | |
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
|
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
|
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|
;;======================================================================
;; '(print (string-intersperse (map cadr (hash-table-ref/default (read-config "megatest.config" \#f \#t) "disks" '"'"'("none" ""))) "\n"))'
(define (common:get-disks #!key (configf #f))
(hash-table-ref/default
(or configf (read-config "megatest.config" #f #t))
"disks" '("none" "")))
;;======================================================================
;; return first command that exists, else #f
;;
(define (common:which cmds)
(if (null? cmds)
#f
(let loop ((hed (car cmds))
(tal (cdr cmds)))
(let ((res (with-input-from-pipe (conc "which " hed) read-line)))
(if (and (string? res)
(common:file-exists? res))
res
(if (null? tal)
#f
(loop (car tal)(cdr tal))))))))
(define (common:get-install-area)
(let ((exe-path (car (argv))))
(if (common:file-exists? exe-path)
(handle-exceptions
exn
#f
(pathname-directory
|
︙ | | |
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
|
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
|
-
+
|
#t
#f))
(else
(debug:print 0 *default-log-port* "ERROR: Bad server force setting " force-setting ", forcing server.")
#t)))) ;; default to requiring server
(if force-result
(begin
(debug:print-info 0 *default-log-port* "forcing use of server, force setting is \"" force-setting "\".")
(debug:print-info 0 *default-log-port* "ATTENTION! Forcing use of server, force setting is \"" force-setting "\".")
#t)
#f)))
;;======================================================================
;; M I S C L I S T S
;;======================================================================
|
︙ | | |
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
|
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
|
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
(begin
(debug:print-error 0 *default-log-port* "command \"/bin/readlink -f " path "\" failed. exn=" exn)
path) ;; just give up
(with-input-from-pipe
(conc "/bin/readlink -f " path)
(lambda ()
(read-line)))))
;; for reasons I don't understand multiple calls to real-path in parallel threads
;; must be protected by mutexes
;;
(define (common:real-path inpath)
;; (process:cmd-run-with-stderr->list "readlink" "-f" inpath)) ;; cmd . params)
;; (let-values
;; (((inp oup pid) (process "readlink" (list "-f" inpath))))
;; (with-input-from-port inp
;; (let loop ((inl (read-line))
;; (res #f))
;; (print "inl=" inl)
;; (if (eof-object? inl)
;; (begin
;; (close-input-port inp)
;; (close-output-port oup)
;; ;; (process-wait pid)
;; res)
;; (loop (read-line) inl))))))
(with-input-from-pipe (conc "readlink -f " inpath) read-line))
;;======================================================================
;; returns *effective load* (not normalized)
;;
(define (common:get-intercept onemin fivemin)
(if (< onemin fivemin) ;; load is decreasing, just use the onemin load
onemin
|
︙ | | |
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
|
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
|
-
-
-
-
-
-
-
-
-
-
|
exn
(begin
(debug:print 0 *default-log-port* "failed to write file " fullpath ", exn=" exn)
#f)
(with-output-to-file fullpath (lambda ()(pp dat)))))
#f))
(define (common:raw-get-remote-host-load-orig remote-host)
(handle-exceptions
exn
(begin
(debug:print 0 *default-log-port* "failed to ssh to " remote-host " and get loadavg. exn=" exn)
#f) ;; more specific handling of errors needed
(with-input-from-pipe
(conc "ssh " remote-host " cat /proc/loadavg")
(lambda ()(list (read)(read)(read))))))
(define (common:raw-get-remote-host-load remote-host)
(let* ((inp #f))
(handle-exceptions
exn
(begin
(close-input-pipe inp)
(debug:print 0 *default-log-port* "failed to ssh to " remote-host " and get loadavg. exn=" exn)
|
︙ | | |
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
|
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
|
-
+
+
|
(handle-exceptions
exn
(begin
(debug:print 0 *default-log-port* "failed to ssh or read loadavg from host " remote-host ", exn=" exn)
'(-99 -99 -99))
(let* ((actual-hostname (or remote-host (get-host-name) "localhost")))
(or (common:get-cached-info actual-hostname "cpu-load")
(let ((result (if remote-host
(let ((result (if (and remote-host
(not (equal? remote-host (get-host-name))))
(map (lambda (res)
(if (eof-object? res) 9e99 res))
(common:raw-get-remote-host-load remote-host))
(with-input-from-file "/proc/loadavg"
(lambda ()(list (read)(read)(read)))))))
(match
result
|
︙ | | |
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
|
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
|
+
+
+
+
+
+
-
-
-
-
-
-
-
+
-
-
-
|
(cond
((and (list? res)
(> (length res) 2))
res)
((eq? res #f) default) ;; add messages?
((eq? res #f) default) ;; this would be the #eof
(else default))))
(define (common:ssh-get-loadavg remote-host)
(let ((inp (open-input-pipe (conc "ssh " remote-host " \"cat /proc/loadavg;cat /proc/cpuinfo;echo end\""))))
(let* ((res (read-lines inp)))
(close-input-pipe inp)
res)))
(define (common:get-normalized-cpu-load-raw remote-host)
(let* ((actual-host (or remote-host (get-host-name)))) ;; #f is localhost
(or (common:get-cached-info actual-host "normalized-load")
(let ((data (if remote-host
(let ((inp #f))
(handle-exceptions
exn
(begin
(close-input-port inp)
'())
(set! inp (open-input-port (conc "ssh " remote-host " \"cat /proc/loadavg;cat /proc/cpuinfo;echo end\"")))
(common:ssh-get-loadavg remote-host)
(let* ((res (read-lines inp)))
(close-input-port inp)
res)))
(append
(with-input-from-file "/proc/loadavg"
read-lines)
(with-input-from-file "/proc/cpuinfo"
read-lines)
(list "end"))))
(load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$"))
|
︙ | | |
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
|
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
|
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|
(begin ;; found a host, return it
(debug:print 0 *default-log-port* "INFO: Found host: " new-best " load: " load " last-used: " delta " seconds ago, with job-rate: " job-rate)
(host-last-used-set! rec curr-time)
new-best)
(if (null? tal) #f (loop (car tal)(cdr tal) best-host)))))))))
(define (common:wait-for-homehost-load maxnormload msg)
(let loop ((start-time (current-seconds))) ;; we saw some instances of this being called before *toppath* was set. This might be an early setup race. This delay should help but it is impossible to test...
(if (not *toppath*)
(begin
(debug:print 0 *default-log-port* "ERROR: common:wait-for-homehost-load called before *toppath* set.")
(thread-sleep! 30)
(if (< (- (current-seconds) start-time) 300)
(loop start-time)))))
(case (rmt:transport-mode)
((http)
(let* ((hh-dat (if (common:on-homehost?) ;; if we are on the homehost then pass in #f so the calls are local.
#f
(server:choose-server *toppath* 'homehost)))
(hh (if hh-dat (car hh-dat) #f)))
(common:wait-for-normalized-load maxnormload msg hh)))
(let* ((hh-dat (if (common:on-homehost?) ;; if we are on the homehost then pass in #f so the calls are local.
#f
(server:choose-server *toppath* 'homehost)))
(hh (if hh-dat (car hh-dat) #f)))
(common:wait-for-normalized-load maxnormload msg hh)))
(else
(common:wait-for-normalized-load maxnormload msg (get-host-name)))))
(define (common:get-num-cpus remote-host)
(let* ((actual-host (or remote-host (get-host-name))))
;; hosts had better not be changing the number of cpus too often!
(or (hash-table-ref/default *numcpus-cache* actual-host #f)
(let* ((numcpus (or (common:get-cached-info actual-host "num-cpus" age: (+ 2592000 (random 3600)))
(let* ((proc (lambda ()
(let loop ((numcpu 0)
(inl (read-line)))
(if (eof-object? inl)
(if (> numcpu 0)
numcpu
#f) ;; if zero return #f so caller knows that things are not working
(loop (if (string-match "^processor\\s+:\\s+\\d+$" inl)
(+ numcpu 1)
numcpu)
(read-line))))))
(result (if remote-host
(result (if (and remote-host
(not (equal? remote-host (get-host-name))))
(common:generic-ssh
(conc "ssh " remote-host " cat /proc/cpuinfo")
proc -1)
(with-input-from-file "/proc/cpuinfo" proc))))
(if (and (number? result)
(> result 0))
(common:write-cached-info actual-host "num-cpus" result))
result))))
(hash-table-set! *numcpus-cache* actual-host numcpus)
numcpus))))
(define (common:generic-ssh ssh-command proc default #!optional (msg-proc #f))
(let ((inp #f))
(handle-exceptions
exn
(begin
(close-input-port inp)
(if msg-proc
(msg-proc)
(debug:print 0 *default-log-port* "Command: \""ssh-command"\" failed. exn="exn))
default)
(set! inp (open-input-pipe ssh-command))
(with-input-from-port inp
(lambda ()
(let ((res (proc)))
(close-input-port inp)
res))))))
;;======================================================================
;; wait for normalized cpu load to drop below maxload
;;
(define (common:wait-for-normalized-load maxnormload msg remote-host #!optional (rem-tries 5))
(let ((num-cpus (common:get-num-cpus remote-host)))
(if num-cpus
(common:wait-for-cpuload maxnormload num-cpus 15 msg: msg remote-host: remote-host)
|
︙ | | |
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
|
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
|
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|
(define (get-uname . params)
(let* ((uname-res (process:cmd-run->list (conc "uname " (if (null? params) "-a" (car params)))))
(uname #f))
(if (null? (car uname-res))
"unknown"
(caar uname-res))))
;; for reasons I don't understand multiple calls to real-path in parallel threads
;; must be protected by mutexes
;;
(define (common:real-path inpath)
;; (process:cmd-run-with-stderr->list "readlink" "-f" inpath)) ;; cmd . params)
;; (let-values
;; (((inp oup pid) (process "readlink" (list "-f" inpath))))
;; (with-input-from-port inp
;; (let loop ((inl (read-line))
;; (res #f))
;; (print "inl=" inl)
;; (if (eof-object? inl)
;; (begin
;; (close-input-port inp)
;; (close-output-port oup)
;; ;; (process-wait pid)
;; res)
;; (loop (read-line) inl))))))
(with-input-from-pipe (conc "readlink -f " inpath) read-line))
;;======================================================================
;; D I S K S P A C E
;;======================================================================
(define (common:get-disk-space-used fpath)
(with-input-from-pipe (conc "/usr/bin/du -s " fpath) read))
|
︙ | | |
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
|
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
|
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|
(conc "viewscreen " cmd))))
(debug:print-info 02 *default-log-port* "Running command: " fullcmd)
(cond
(with-vars (common:without-vars fullcmd))
(with-orig-env (common:with-orig-env fullcmd))
(else (common:without-vars fullcmd "MT_.*")))))
;;======================================================================
;; T I M E A N D D A T E
;;======================================================================
;;======================================================================
;; Convert strings like "5s 2h 3m" => 60x60x2 + 3x60 + 5
(define (common:hms-string->seconds tstr)
(let ((parts (string-split-fields "\\w+" tstr))
(time-secs 0)
;; s=seconds, m=minutes, h=hours, d=days, M=months, y=years, w=weeks
(trx (regexp "(\\d+)([smhdMyw])")))
(for-each (lambda (part)
(let ((match (string-match trx part)))
(if match
(let ((val (string->number (cadr match)))
(unt (caddr match)))
(if val
(set! time-secs (+ time-secs (* val
(case (string->symbol unt)
((s) 1)
((m) 60) ;; minutes
((h) 3600)
((d) 86400)
((w) 604800)
((M) 2628000) ;; aproximately one month
((y) 31536000)
(else #f))))))))))
parts)
time-secs))
(define (seconds->hr-min-sec secs)
(let* ((hrs (quotient secs 3600))
(min (quotient (- secs (* hrs 3600)) 60))
(sec (- secs (* hrs 3600)(* min 60))))
(conc (if (> hrs 0)(conc hrs "hr ") "")
(if (> min 0)(conc min "m ") "")
sec "s")))
(define (seconds->time-string sec)
(time->string
(seconds->local-time sec) "%H:%M:%S"))
(define (seconds->work-week/day-time sec)
(time->string
(seconds->local-time sec) "ww%V.%u %H:%M"))
(define (seconds->work-week/day sec)
(time->string
(seconds->local-time sec) "ww%V.%u"))
(define (seconds->year-work-week/day sec)
(time->string
(seconds->local-time sec) "%yww%V.%w"))
(define (seconds->year-work-week/day-time sec)
(time->string
(seconds->local-time sec) "%Yww%V.%w %H:%M"))
(define (seconds->year-week/day-time sec)
(time->string
(seconds->local-time sec) "%Yw%V.%w %H:%M"))
(define (seconds->quarter sec)
(case (string->number
(time->string
(seconds->local-time sec)
"%m"))
((1 2 3) 1)
((4 5 6) 2)
((7 8 9) 3)
((10 11 12) 4)
(else #f)))
;;======================================================================
;; basic ISO8601 format (e.g. "2017-02-28 06:02:54") date time => Unix epoch
;;
(define (common:date-time->seconds datetime)
(local-time->seconds (string->time datetime "%Y-%m-%d %H:%M:%S")))
;;======================================================================
;; given span of seconds tstart to tend
;; find start time to mark and mark delta
;;
(define (common:find-start-mark-and-mark-delta tstart tend)
(let* ((deltat (- (max tend (+ tend 10)) tstart)) ;; can't handle runs of less than 4 seconds. Pad it to 10 seconds ...
(result #f)
(min 60)
(hr (* 60 60))
(day (* 24 hr))
(yr (* 365 day)) ;; year
(mo (/ yr 12))
(wk (* day 7)))
(for-each
(lambda (max-blks)
(for-each
(lambda (span) ;; 5 2 1
(if (not result)
(for-each
(lambda (timeunit timesym) ;; year month day hr min sec
(if (not result)
(let* ((time-blk (* span timeunit))
(num-blks (quotient deltat time-blk)))
(if (and (> num-blks 4)(< num-blks max-blks))
(let ((first (* (quotient tstart time-blk) time-blk)))
(set! result (list span timeunit time-blk first timesym))
)))))
(list yr mo wk day hr min 1)
'( y mo w d h m s))))
(list 8 6 5 2 1)))
'(5 10 15 20 30 40 50 500))
(if values
(apply values result)
(values 0 day 1 0 'd))))
;;======================================================================
;; given x y lim return the cron expansion
;;
(define (common:expand-cron-slash x y lim)
(let loop ((curr x)
(res `()))
(if (< curr lim)
(loop (+ curr y) (cons curr res))
(reverse res))))
;;======================================================================
;; expand a complex cron string to a list of cron strings
;;
;; x/y => x, x+y, x+2y, x+3y while x+Ny<max_for_field
;; a,b,c => a, b ,c
;;
;; NOTE: with flatten a lot of the crud below can be factored down.
;;
(define (common:cron-expand cron-str)
(if (list? cron-str)
(flatten
(fold (lambda (x res)
(if (list? x)
(let ((newres (map common:cron-expand x)))
(append x newres))
(cons x res)))
'()
cron-str)) ;; (map common:cron-expand cron-str))
(let ((cron-items (string-split cron-str))
(slash-rx (regexp "(\\d+)/(\\d+)"))
(comma-rx (regexp ".*,.*"))
(max-vals '((min . 60)
(hour . 24)
(dayofmonth . 28) ;;; BUG!!!! This will be a bug for some combinations
(month . 12)
(dayofweek . 7))))
(if (< (length cron-items) 5) ;; bad spec
cron-str ;; `(,cron-str) ;; just return the string, something downstream will fix it
(let loop ((hed (car cron-items))
(tal (cdr cron-items))
(type 'min)
(type-tal '(hour dayofmonth month dayofweek))
(res '()))
(regex-case
hed
(slash-rx ( _ base incr ) (let* ((basen (string->number base))
(incrn (string->number incr))
(expanded-vals (common:expand-cron-slash basen incrn (alist-ref type max-vals)))
(new-list-crons (fold (lambda (x myres)
(cons (conc (if (null? res)
""
(conc (string-intersperse res " ") " "))
x " " (string-intersperse tal " "))
myres))
'() expanded-vals)))
;; (print "new-list-crons: " new-list-crons)
;; (fold (lambda (x res)
;; (if (list? x)
;; (let ((newres (map common:cron-expand x)))
;; (append x newres))
;; (cons x res)))
;; '()
(flatten (map common:cron-expand new-list-crons))))
;; (map common:cron-expand (map common:cron-expand new-list-crons))))
(else (if (null? tal)
cron-str
(loop (car tal)(cdr tal)(car type-tal)(cdr type-tal)(append res (list hed)))))))))))
;;======================================================================
;; given a cron string and the last time event was processed return #t to run or #f to not run
;;
;; min hour dayofmonth month dayofweek
;; 0-59 0-23 1-31 1-12 0-6 ### NOTE: dayofweek does not include 7
;;
;; #t => yes, run the job
;; #f => no, do not run the job
;;
(define (common:cron-event cron-str now-seconds-in last-done) ;; ref-seconds = #f is NOW.
(let* ((cron-items (map string->number (string-split cron-str)))
(now-seconds (or now-seconds-in (current-seconds)))
(now-time (seconds->local-time now-seconds))
(last-done-time (seconds->local-time last-done))
(all-times (make-hash-table)))
;; (print "cron-items: " cron-items "(length cron-items): " (length cron-items))
(if (not (eq? (length cron-items) 5)) ;; don't even try to figure out junk strings
#f
(match-let ((( cmin chour cdayofmonth cmonth cdayofweek)
cron-items)
;; 0 1 2 3 4 5 6
((nsec nmin nhour ndayofmonth nmonth nyr ndayofweek n7 n8 n9)
(vector->list now-time))
((lsec lmin lhour ldayofmonth lmonth lyr ldayofweek l7 l8 l9)
(vector->list last-done-time)))
;; create all possible time slots
;; remove invalid slots due to (for example) day of week
;; get the start and end entries for the ref-seconds (current) time
;; if last-done > ref-seconds => this is an ERROR!
;; does the last-done time fall in the legit region?
;; yes => #f do not run again this command
;; no => #t ok to run the command
(for-each ;; month
(lambda (month)
(for-each ;; dayofmonth
(lambda (dom)
(for-each
(lambda (hr) ;; hour
(for-each
(lambda (minute) ;; minute
(let ((copy-now (apply vector (vector->list now-time))))
(vector-set! copy-now 0 0) ;; force seconds to zero
(vector-set! copy-now 1 minute)
(vector-set! copy-now 2 hr)
(vector-set! copy-now 3 dom) ;; dom is already corrected for zero referenced
(vector-set! copy-now 4 month)
(let* ((copy-now-secs (local-time->seconds copy-now))
(new-copy (seconds->local-time copy-now-secs))) ;; remake the time vector
(if (or (not cdayofweek)
(equal? (vector-ref new-copy 6)
cdayofweek)) ;; if the day is specified and a match OR if the day is NOT specified
(if (or (not cdayofmonth)
(equal? (vector-ref new-copy 3)
(+ 1 cdayofmonth))) ;; if the month is specified and a match OR if the month is NOT specified
(hash-table-set! all-times copy-now-secs new-copy))))))
(if cmin
`(,cmin) ;; if given cmin, have to use it
(list (- nmin 1) nmin (+ nmin 1))))) ;; minute
(if chour
`(,chour)
(list (- nhour 1) nhour (+ nhour 1))))) ;; hour
(if cdayofmonth
`(,cdayofmonth)
(list (- ndayofmonth 1) ndayofmonth (+ ndayofmonth 1)))))
(if cmonth
`(,cmonth)
(list (- nmonth 1) nmonth (+ nmonth 1))))
(let ((before #f)
(is-in #f))
(for-each
(lambda (moment)
(if (and before
(<= before now-seconds)
(>= moment now-seconds))
(begin
;; (print)
;; (print "Before: " (time->string (seconds->local-time before)))
;; (print "Now: " (time->string (seconds->local-time now-seconds)))
;; (print "After: " (time->string (seconds->local-time moment)))
;; (print "Last: " (time->string (seconds->local-time last-done)))
(if (< last-done before)
(set! is-in before))
))
(set! before moment))
(sort (hash-table-keys all-times) <))
is-in)))))
(define (common:extended-cron cron-str now-seconds-in last-done)
(let ((expanded-cron (common:cron-expand cron-str)))
(if (string? expanded-cron)
(common:cron-event expanded-cron now-seconds-in last-done)
(let loop ((hed (car expanded-cron))
(tal (cdr expanded-cron)))
(if (common:cron-event hed now-seconds-in last-done)
#t
(if (null? tal)
#f
(loop (car tal)(cdr tal))))))))
;;======================================================================
;; C O L O R S
;;======================================================================
(define (common:name->iup-color name)
(case (string->symbol (string-downcase name))
((red) "223 33 49")
|
︙ | | |
3137
3138
3139
3140
3141
3142
3143
3144
3145
3146
3147
3148
3149
3150
3151
3152
3153
3154
3155
3156
3157
3158
3159
3160
3161
3162
3163
3164
3165
3166
3167
3168
3169
3170
3171
3172
3173
3174
3175
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185
3186
3187
3188
3189
|
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
|
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|
(loop (car tal)(cdr tal)))))))
;; no match, try again
(if (null? tal)
fallback-launcher
(loop (car tal)(cdr tal))))))))
fallback-launcher)))
;;======================================================================
;; NMSG AND NEW API
;;======================================================================
;;
;; ;;======================================================================
;; ;; nm based server experiment, keep around for now.
;; ;;
;; (define (nm:start-server dbconn #!key (given-host-name #f))
;; (let* ((srvdat (start-raw-server given-host-name: given-host-name))
;; (host-name (srvdat-host srvdat))
;; (soc (srvdat-soc srvdat)))
;;
;; ;; start the queue processor (save for second round of development)
;; ;;
;; (thread-start! (make-thread! (lambda ()(queue-processor dbconn) "Queue processor")))
;; ;; msg is an alist
;; ;; 'r host:port <== where to return the data
;; ;; 'p params <== data to apply the command to
;; ;; 'e j|s|l <== encoding of the params. default is s (sexp), if not specified is assumed to be default
;; ;; 'c command <== look up the function to call using this key
;; ;;
;; (let loop ((msg-in (nn-recv soc)))
;; (if (not (equal? msg-in "quit"))
;; (let* ((dat (decode msg-in))
;; (host-port (alist-ref 'r dat)) ;; this is for the reverse req rep where the server is a client of the original client
;; (params (alist-ref 'p dat))
;; (command (let ((c (alist-ref 'c dat)))(if c (string->symbol c) #f)))
;; (all-good (and host-port params command (hash-table-exists? *commands* command))))
;; (if all-good
;; (let ((cmddat (make-qitem
;; command: command
;; host-port: host-port
;; params: params)))
;; (queue-push cmddat) ;; put request into the queue
;; (nn-send soc "queued")) ;; reply with "queued"
;; (print "ERROR: ["(common:human-time)"] BAD request " dat))
;; (loop (nn-recv soc)))))
;; (nn-close soc)))
;;======================================================================
;; D A S H B O A R D U S E R V I E W S
;;======================================================================
;;======================================================================
;; first read ~/views.config if it exists, then read $MTRAH/views.config if it exists
;;
|
︙ | | |
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
3360
3361
3362
3363
3364
3365
3366
3367
3368
3369
3370
3371
3372
3373
3374
|
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
|
-
+
+
+
+
+
+
+
-
+
|
(debug:print 0 *default-log-port* "ERROR: packets directory " pktsdir " does not exist."))
((not (directory? pktsdir))
(debug:print 0 *default-log-port* "ERROR: packets directory path " pktsdir " is not a directory."))
((not (file-read-access? pktsdir))
(debug:print 0 *default-log-port* "ERROR: packets directory path " pktsdir " is not readable."))
(else
(debug:print-info 0 *default-log-port* "Loading packets found in " pktsdir)
(let ((pkts (glob (conc pktsdir "/*.pkt"))))
(let ((pkts (glob (conc pktsdir "/*.pkt")))
(sqdb (dbi:db-conn pdb))
)
;; Put this in a transaction to avoid issues overloading the db
(sqlite3:with-transaction
sqdb
(lambda ()
(for-each
(lambda (pkt)
(let* ((uuid (cadr (string-match ".*/([0-9a-f]+).pkt" pkt)))
(exists (lookup-by-uuid pdb uuid #f)))
(if (not exists)
(let* ((pktdat (string-intersperse
(with-input-from-file pkt read-lines)
"\n"))
(apkt (pkt->alist pktdat))
(ptype (alist-ref 'T apkt)))
(add-to-queue pdb pktdat uuid (or ptype 'cmd) #f 0)
(debug:print 4 *default-log-port* "Added " uuid " of type " ptype " to queue"))
(debug:print 4 *default-log-port* "pkt: " uuid " exists, skipping...")
)))
pkts)))))
pkts)))))))
pktsdirs))
use-lt: use-lt))
(define (common:get-pkt-alists pkts)
(map (lambda (x)
(alist-ref 'apkt x)) ;; 'pkta pulls out the alist from the read pkt
pkts))
|
︙ | | |