Megatest

Check-in [1e01693b9c]
Login
Overview
Comment:wip. still blocking issues
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v2.0001
Files: files | file ages | folders
SHA1: 1e01693b9c9ef0075952b733bb48c67d85c1a518
User & Date: matt on 2022-01-05 10:38:31
Other Links: branch diff | manifest | tags
Context
2022-01-05
11:48
Put megatest main call into thread so that mailboxes work check-in: a4d8d9166c user: matt tags: v2.0001
10:38
wip. still blocking issues check-in: 1e01693b9c user: matt tags: v2.0001
2022-01-04
21:11
wip check-in: 8e478a8774 user: matt tags: v2.0001
Changes

Modified dashboard.scm from [feba132a9c] to [cc72246a09].

96
97
98
99
100
101
102

103
104
105
106
107
108
109
	srfi-1
	regex regex-case srfi-69
	typed-records
	sparse-vectors
	format
	srfi-4
	srfi-14

	)

;; (include "common_records.scm")
;; (include "db_records.scm")
;; (include "run_records.scm")
;; (include "task_records.scm")
;; (include "megatest-version.scm")







>







96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
	srfi-1
	regex regex-case srfi-69
	typed-records
	sparse-vectors
	format
	srfi-4
	srfi-14
	srfi-18
	)

;; (include "common_records.scm")
;; (include "db_records.scm")
;; (include "run_records.scm")
;; (include "task_records.scm")
;; (include "megatest-version.scm")
3752
3753
3754
3755
3756
3757
3758

3759



3760

(import dashboard)

;; ease debugging by loading ~/.dashboardrc
;; (get-debugcontrolf) yields either #f or a file to load; when it yields
;; a file, that file is loaded before the dashboard starts.
(cond ((get-debugcontrolf) => load))

;; hand control to the dashboard entry point
(main)












>
|
>
>
>

>
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
3764
3765
3766
(import dashboard)

;; ease debugging by loading ~/.dashboardrc
;; (get-debugcontrolf) yields either #f or a file to load; when it yields
;; a file, that file is loaded before the dashboard starts.
(cond ((get-debugcontrolf) => load))

(import srfi-18)

;; Run the dashboard entry point in its own thread, named "main", and
;; block the primordial thread until that thread terminates.
(let ((main-thread (make-thread main "main")))
  (thread-start! main-thread)
  (thread-join! main-thread))


Modified rmtmod.scm from [a7fbdc800c] to [f907b596d1].

1640
1641
1642
1643
1644
1645
1646



1647
1648
1649








1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689

;;======================================================================
;; S E R V E R
;; ======================================================================

;; Look up the procedure registered under fnkey in *http-functions*.
;; When no entry exists, a thunk returning the placeholder string
;; "nothing here yet" is returned instead.
(define (http-get-function fnkey)
  (let ((placeholder (lambda () "nothing here yet")))
    (hash-table-ref/default *http-functions* fnkey placeholder)))




;; Main entry point to start a server. was start-server
;;
;; hostn - host name recorded in *db-serv-info* for this server.
;;
;; If *db-serv-info* already holds a listener connection (servdat-uconn),
;; this simply calls wait-and-close on it. Otherwise a port is requested
;; from portlogger, a listener is started with run-listener, the host,
;; actual port and connection are recorded in *db-serv-info*, and
;; wait-and-close is called on the new connection, followed by
;; db:print-current-query-stats.
;;
;; Once wait-and-close returns, a "shutdown complete" message is logged.
;;
(define (rmt:run hostn)
  ;;  ;; Configurations for server
  ;;  (tcp-buffer-size 2048)
  ;;  (max-connections 2048)
  (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
  (if (and *db-serv-info*
	   (servdat-uconn *db-serv-info*))
      ;; a listener connection is already recorded; wait on it
      (let* ((uconn (servdat-uconn *db-serv-info*)))
	(wait-and-close uconn))
      (let* ((port            (portlogger:open-run-close portlogger:find-port))
	     ;; invoked for every incoming request: stamp the time of last
	     ;; access, sanity check params, then dispatch to the api
	     (handler-proc    (lambda (rem-host-port qrykey cmd params)
				(set! *db-last-access* (current-seconds))
				(assert (list? params) "FATAL: handler called with non-list params")
				(api:execute-requests *dbstruct-db* cmd params))))
	;; (api:process-request *dbstuct-db*
	(if (not *db-serv-info*)
	    (set! *db-serv-info* (make-servdat host: hostn port: port)))
	(let* ((uconn (run-listener handler-proc port))
	       (rport (udat-port uconn))) ;; the real port
	  (servdat-host-set! *db-serv-info* hostn)
	  (servdat-port-set! *db-serv-info* rport)
	  (servdat-uconn-set! *db-serv-info* uconn)
	  (wait-and-close uconn)
	  (db:print-current-query-stats))))
  (let* ((host (servdat-host *db-serv-info*))
	 (port (servdat-port *db-serv-info*))
	 (mode (or (servdat-mode *db-serv-info*)
		   "non-db")))
    ;; server exit stuff here
    ;; (rmt:server-shutdown host port) - always do in on-exit
    ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
    ;; note the space before "shutdown": without it the mode and the word
    ;; "shutdown" ran together in the log line
    (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode" shutdown complete. Exiting")))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;;======================================================================
;; C L I E N T S







>
>
>



>
>
>
>
>
>
>
>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|







1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700

;;======================================================================
;; S E R V E R
;; ======================================================================

;; Look up the procedure registered under fnkey in *http-functions*.
;; When no entry exists, a thunk returning the placeholder string
;; "nothing here yet" is returned instead.
(define (http-get-function fnkey)
  (let ((placeholder (lambda () "nothing here yet")))
    (hash-table-ref/default *http-functions* fnkey placeholder)))

;; Guard state for rmt:run. *rmt:run-flag* becomes #t the first time
;; rmt:run proceeds past the check below; *rmt:run-mutex* protects the
;; test-and-set of that flag.
(define *rmt:run-mutex* (make-mutex))
(define *rmt:run-flag* #f)

;; Main entry point to start a server. was start-server
;;
;; hostn - host name recorded in *db-serv-info* for this server.
;;
;; Only the first caller does any work: a second call while
;; *rmt:run-flag* is set logs a warning and returns. The flag is not
;; cleared by this procedure.
;;
;; If *db-serv-info* already holds a listener connection (servdat-uconn),
;; this simply calls wait-and-close on it. Otherwise a port is requested
;; from portlogger, a listener is started with run-listener, the host,
;; actual port and connection are recorded in *db-serv-info*, and
;; wait-and-close is called on the new connection, followed by
;; db:print-current-query-stats.
;;
;; Once wait-and-close returns, a "shutdown complete" message is logged.
;;
(define (rmt:run hostn)
  (mutex-lock! *rmt:run-mutex*)
  (if *rmt:run-flag*
      (begin
	(debug:print-warn 0 *default-log-port* "rmt:run already running.")
	(mutex-unlock! *rmt:run-mutex*))
      (begin
	;; claim the flag while still holding the mutex, then release it
	;; before doing the long running server work
	(set! *rmt:run-flag* #t)
	(mutex-unlock! *rmt:run-mutex*)
	;;  ;; Configurations for server
	;;  (tcp-buffer-size 2048)
	;;  (max-connections 2048)
	(debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
	(if (and *db-serv-info*
		 (servdat-uconn *db-serv-info*))
	    ;; a listener connection is already recorded; wait on it
	    (let* ((uconn (servdat-uconn *db-serv-info*)))
	      (wait-and-close uconn))
	    (let* ((port            (portlogger:open-run-close portlogger:find-port))
		   ;; invoked for every incoming request: stamp the time of
		   ;; last access, sanity check params, then dispatch to the api
		   (handler-proc    (lambda (rem-host-port qrykey cmd params)
				      (set! *db-last-access* (current-seconds))
				      (assert (list? params) "FATAL: handler called with non-list params")
				      (api:execute-requests *dbstruct-db* cmd params))))
	      ;; (api:process-request *dbstuct-db*
	      (if (not *db-serv-info*)
		  (set! *db-serv-info* (make-servdat host: hostn port: port)))
	      (let* ((uconn (run-listener handler-proc port))
		     (rport (udat-port uconn))) ;; the real port
		(servdat-host-set! *db-serv-info* hostn)
		(servdat-port-set! *db-serv-info* rport)
		(servdat-uconn-set! *db-serv-info* uconn)
		(wait-and-close uconn)
		(db:print-current-query-stats))))
	(let* ((host (servdat-host *db-serv-info*))
	       (port (servdat-port *db-serv-info*))
	       (mode (or (servdat-mode *db-serv-info*)
			 "non-db")))
	  ;; server exit stuff here
	  ;; (rmt:server-shutdown host port) - always do in on-exit
	  ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
	  ;; note the space before "shutdown": without it the mode and the
	  ;; word "shutdown" ran together in the log line
	  (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode" shutdown complete. Exiting")))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;;======================================================================
;; C L I E N T S

Modified tests/simplerun/debug.scm from [e077b8643e] to [55a7eef5b7].

12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
;; Load-generating thread (created here, not started).
;; For each r it registers "test1" items through rmt:register-test until
;; the item counter reaches 1000, then queries the tests for r with
;; rmt:get-tests-for-run and moves on to r+1 with the item counter reset
;; to 0. The loop ends once r reaches 100.
(define th1
  (make-thread
   (lambda ()
     (let next ((run 1)
                (item 1))
       (print "register-test " run " test" item)
       (rmt:register-test run "test1" (conc "item_" item))
       (cond
        ;; more items to register for this run
        ((< item 1000)
         (next run (+ item 1)))
        ;; run finished: read everything back, then start the next run
        ((< run 100)
         (print "get-tests-for-run " run)
         (rmt:get-tests-for-run run "%" '() '() 0 #f #f #f #f #f 0 #f)
         (next (+ run 1) 0)))))))







|







12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
;; Load-generating thread (created here, not started).
;; For each r it registers "test1" items through rmt:register-test until
;; the item counter reaches 100000, then queries the tests for r with
;; rmt:get-tests-for-run and moves on to r+1 with the item counter reset
;; to 0. The loop ends once r reaches 100.
(define th1
  (make-thread
   (lambda ()
     (let next ((run 1)
                (item 1))
       (print "register-test " run " test" item)
       (rmt:register-test run "test1" (conc "item_" item))
       (cond
        ;; more items to register for this run
        ((< item 100000)
         (next run (+ item 1)))
        ;; run finished: read everything back, then start the next run
        ((< run 100)
         (print "get-tests-for-run " run)
         (rmt:get-tests-for-run run "%" '() '() 0 #f #f #f #f #f 0 #f)
         (next (+ run 1) 0)))))))

Modified ulex/ulex.scm from [f9d66ef133] to [db4036e779].

62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
	chicken.string
	chicken.sort
	chicken.pretty-print
	
	address-info
	mailbox
	matchable
	queues
	regex
	regex-case
	s11n
	srfi-1
	srfi-18
	srfi-4
	srfi-69







|







62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
	chicken.string
	chicken.sort
	chicken.pretty-print
	
	address-info
	mailbox
	matchable
	;; queues
	regex
	regex-case
	s11n
	srfi-1
	srfi-18
	srfi-4
	srfi-69
86
87
88
89
90
91
92
93
94
95
96
97
98

99
100
101
102
103
104
105
  ;; the listener side
  (port #f)
  (host-port #f)
  (socket #f)
  ;; the peers
  (peers  (make-hash-table)) ;; host:port->peer
  ;; work handling
  (work-queue (make-queue))
  (work-proc  #f)            ;; set by user
  (cnum       0)             ;; cookie number
  (mboxes     (make-hash-table))
  (avail-cmboxes '())  ;; list of (<cookie> . <mbox>) for re-use
  ;; threads

  (cmd-thread #f)
  (work-queue-thread #f)
  ) 

;; ;; struct for keeping track of others we are talking to
;; ;;
;; (defstruct pdat







|
|
|
|
|

>







86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
  ;; the listener side
  (port #f)
  (host-port #f)
  (socket #f)
  ;; the peers
  (peers  (make-hash-table)) ;; host:port->peer
  ;; work handling
  (work-queue (make-mailbox))
  (work-proc  #f)                ;; set by user
  (cnum       0)                 ;; cookie number
  (mboxes     (make-hash-table)) ;; for the replies
  (avail-cmboxes '())            ;; list of (<cookie> . <mbox>) for re-use
  ;; threads
  (numthreads 10)
  (cmd-thread #f)
  (work-queue-thread #f)
  ) 

;; ;; struct for keeping track of others we are talking to
;; ;;
;; (defstruct pdat
303
304
305
306
307
308
309
310

311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326

327
328
329

330
331



332






333
334
335
336
337
338
339
;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================

;; rdat is (rem-host-port qrykey cmd params)
					     
;; Append the request rdat to the work queue held by uconn.
(define (add-to-work-queue uconn rdat)
  (let ((pending (udat-work-queue uconn)))
    (queue-add! pending rdat)))


;; Handle one queued request and reply to its sender.
;;
;; rdat is expected to be a four element list
;; (rem-host-port qrykey cmd params). The work procedure is read from
;; uconn on every call (conceivably it could change between calls),
;; applied to the four fields, and its result is sent back to
;; rem-host-port under the same qrykey with the command 'response.
;; Any rdat of a different shape is reported with print and dropped.
(define (do-work uconn rdat)
  (let ((handler (udat-work-proc uconn)))
    (match rdat
      ((peer key request args)
       ;; send 'response as cmd and the handler result as params
       ;; (no check for an ack is made)
       (let ((answer (handler peer key request args)))
         (send uconn peer key 'response answer)))
      (_
       (print "ERROR: rdat " rdat ", did not match rem-host-port qrykey cmd params")))))
     
     
;; Service the work queue of uconn forever.
;; Each pass either sleeps for 0.1s when the queue is empty or removes
;; one request and hands it to do-work. This procedure never returns.
(define (process-work-queue uconn)
  (let ((pending (udat-work-queue uconn)))
    (let poll ()
      (cond
       ((queue-empty? pending)
        (thread-sleep! 0.1))
       (else
        (do-work uconn (queue-remove! pending))))
      (poll))))







;; The code below was meant to enable re-use of connections. That seems
;; non-trivial, so for now let's open a new connection on each call.
;;
;; ;; given host-port get or create peer struct
;; ;;
;; (define (udat-get-peer uconn host-port)







|
>












<


|
>
|
<
|
>
|
|
>
>
>
|
>
>
>
>
>
>







304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324

325
326
327
328
329

330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================

;; rdat is (rem-host-port qrykey cmd params)
					     
;; Post the request rdat to the work-queue mailbox held by uconn.
(define (add-to-work-queue uconn rdat)
  (let ((inbox (udat-work-queue uconn)))
    (mailbox-send! inbox rdat)))

;; Handle one queued request and reply to its sender.
;;
;; rdat is expected to be a four element list
;; (rem-host-port qrykey cmd params). The work procedure is read from
;; uconn on every call (conceivably it could change between calls),
;; applied to the four fields, and its result is sent back to
;; rem-host-port under the same qrykey with the command 'response.
;; Any rdat of a different shape is reported with print and dropped.
(define (do-work uconn rdat)
  (let ((handler (udat-work-proc uconn)))
    (match rdat
      ((peer key request args)
       ;; send 'response as cmd and the handler result as params
       ;; (no check for an ack is made)
       (let ((answer (handler peer key request args)))
         (send uconn peer key 'response answer)))
      (_
       (print "ERROR: rdat " rdat ", did not match rem-host-port qrykey cmd params")))))
     

;; Start the pool of worker threads that service the work queue of uconn.
;;
;; (udat-numthreads uconn) threads named "work thread N" are created
;; (always at least one). Each worker loops forever: it blocks in
;; mailbox-receive! (timeout #f, i.e. wait indefinitely) until a request
;; arrives on the work-queue mailbox, passes it to do-work, then goes
;; back to waiting for the next request.
;;
;; A worker must loop. If it handled a single request and then exited,
;; the pool would be exhausted after numthreads requests and nothing
;; would service the mailbox any more.
;;
;; This procedure starts all workers and then blocks joining them, so it
;; is expected to be run in a thread of its own.
;;
(define (process-work-queue uconn)
  (let* ((wqueue (udat-work-queue uconn))
	 (numthr (udat-numthreads uconn))
	 ;; body shared by all workers: receive, process, repeat
	 (worker (lambda ()
		   (let work-loop ()
		     (let ((rdat (mailbox-receive! wqueue #f 'MBOX_TIMEOUT)))
		       (do-work uconn rdat))
		     (work-loop))))
	 ;; build the list of (not yet started) worker threads
	 (thlst  (let loop ((thnum   1)
			    (threads '()))
		   (let ((newlst (cons (make-thread worker (conc "work thread " thnum))
				       threads)))
		     (if (< thnum numthr)
			 (loop (+ thnum 1) newlst)
			 newlst)))))
    (print "ULEX: Starting "(length thlst)" worker threads.")
    ;; for-each, not map: these calls are made for their side effects only
    (for-each thread-start! thlst)
    (print "ULEX: Threads started. Joining all.")
    (for-each thread-join! thlst)))

;; The code below was meant to enable re-use of connections. That seems
;; non-trivial, so for now let's open a new connection on each call.
;;
;; ;; given host-port get or create peer struct
;; ;;
;; (define (udat-get-peer uconn host-port)