Megatest

Check-in [a4d8d9166c]
Login
Overview
Comment: Put megatest main call into thread so that mailboxes work
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v2.0001
Files: files | file ages | folders
SHA1: a4d8d9166ce606b5934964d3a19494c387068bcd
User & Date: matt on 2022-01-05 11:48:22
Other Links: branch diff | manifest | tags
Context
2022-01-05
20:59
wip check-in: 94e8e9f0b5 user: matt tags: v2.0001
17:36
Experiments in bypassing parts of the ulex flow. Leaf check-in: 51f97e7aaf user: matt tags: v2.0001-ulex-experiment
11:48
Put megatest main call into thread so that mailboxes work check-in: a4d8d9166c user: matt tags: v2.0001
10:38
wip. still blocking issues check-in: 1e01693b9c user: matt tags: v2.0001
Changes

Modified megatest.scm from [7c052417f9] to [a1cea3fd33].

2660
2661
2662
2663
2664
2665
2666
2667
2668
2669




2670
	       ((1)(exit 1))
	       ((2)(exit 2))
	       (else (exit 3)))))
     )

)

(import megatest-main)
(import commonmod)
(main)












|
|
|
>
>
>
>

2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
	       ((1)(exit 1))
	       ((2)(exit 2))
	       (else (exit 3)))))
     )

)

(import megatest-main commonmod)
(import srfi-18)

;; Run `main` in its own thread instead of calling it directly, then block
;; until that thread finishes. Per the check-in comment, the call was moved
;; into a thread so that mailboxes work.
(thread-join!
 (thread-start!
  (make-thread main)))


Modified ulex/ulex.scm from [db4036e779] to [c5a87871a1].

92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
  ;; work handling
  (work-queue (make-mailbox))
  (work-proc  #f)                ;; set by user
  (cnum       0)                 ;; cookie number
  (mboxes     (make-hash-table)) ;; for the replies
  (avail-cmboxes '())            ;; list of (<cookie> . <mbox>) for re-use
  ;; threads
  (numthreads 10)
  (cmd-thread #f)
  (work-queue-thread #f)
  ) 

;; ;; struct for keeping track of others we are talking to
;; ;;
;; (defstruct pdat







|







92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
  ;; work handling
  (work-queue (make-mailbox))
  (work-proc  #f)                ;; set by user
  (cnum       0)                 ;; cookie number
  (mboxes     (make-hash-table)) ;; for the replies
  (avail-cmboxes '())            ;; list of (<cookie> . <mbox>) for re-use
  ;; threads
  (numthreads 50)
  (cmd-thread #f)
  (work-queue-thread #f)
  ) 

;; ;; struct for keeping track of others we are talking to
;; ;;
;; (defstruct pdat
156
157
158
159
160
161
162


163
164
165
166
167
168
169
;;
;; Create a udat connection record, install handler-proc as its work
;; procedure and, if setup-listener succeeds, start the command-loop and
;; work-queue threads.
;;
;;   handler-proc    - stored in the record as the work procedure
;;   port-suggestion - optional, defaults to 4242; passed to setup-listener
;;
;; Returns the udat record when the listener was set up. If setup-listener
;; returns #f the assert below fails with the given error message.
(define (run-listener handler-proc #!optional (port-suggestion 4242))
  (let* ((uconn (make-udat)))
    (udat-work-proc-set! uconn handler-proc)
    (if (setup-listener uconn port-suggestion)
	;; th1 runs ulex-cmd-loop, th2 runs process-work-queue, both on uconn
	(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
	       (th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor")))


	  (thread-start! th1)
	  (thread-start! th2)
	  ;; keep the thread handles in the record
	  (udat-cmd-thread-set! uconn th1)
	  (udat-work-queue-thread-set! uconn th2)
	  (print "cmd loop and process workers started")
	  uconn)
	(assert #f "ERROR: run-listener called without proper setup."))))







>
>







156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
;;
;; Build a fresh udat record, register handler-proc as its work procedure
;; and bring up the listener.
;;
;;   handler-proc    - stored in the record via udat-work-proc-set!
;;   port-suggestion - optional (default 4242), handed to setup-listener
;;
;; When setup-listener succeeds: the tcp-buffer-size parameter is set to
;; 2048, one thread running ulex-cmd-loop and one running
;; process-work-queue are started and recorded in the udat, and the udat
;; is returned. When it fails, the assert at the bottom fires.
(define (run-listener handler-proc #!optional (port-suggestion 4242))
  (let ((conn (make-udat)))
    (udat-work-proc-set! conn handler-proc)
    (cond
     ((setup-listener conn port-suggestion)
      (let* ((cmd-thread  (make-thread (lambda () (ulex-cmd-loop conn))
                                       "Ulex command loop"))
             (work-thread (make-thread (lambda () (process-work-queue conn))
                                       "Ulex work queue processor")))
        (tcp-buffer-size 2048)
        ;; (max-connections 2048) -- left disabled, as in the original
        (thread-start! cmd-thread)
        (thread-start! work-thread)
        ;; remember both thread handles in the connection record
        (udat-cmd-thread-set! conn cmd-thread)
        (udat-work-queue-thread-set! conn work-thread)
        (print "cmd loop and process workers started")
        conn))
     (else
      (assert #f "ERROR: run-listener called without proper setup.")))))
312
313
314
315
316
317
318

319



320
321


322
323
324
325
326
327
328
329
330
331

332
333

334
335
336
337
338
339
340
  (mailbox-send! (udat-work-queue uconn) rdat))

;; Handle one request record.
;;
;;   uconn - udat record; its work procedure is fetched on every call
;;   rdat  - expected to be a four-element list
;;           (rem-host-port qrykey cmd params)
;;
;; The work procedure is applied to the four fields and its result is
;; sent back to rem-host-port with `send`, using 'response as the command.
;; Any rdat of another shape is reported with an ERROR print.
(define (do-work uconn rdat)
  (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivably it could change
    ;; put this following into a do-work procedure
    (match rdat
      ((rem-host-port qrykey cmd params)

       (let* ((result (proc rem-host-port qrykey cmd params)))



	 ;; send 'response as cmd and result as params
	 (send uconn rem-host-port qrykey 'response result))) ;; could check for ack


      (else
       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
     
(define (process-work-queue uconn) 
  (let ((wqueue (udat-work-queue uconn))
	(proc   (udat-work-proc  uconn))
	(numthr (udat-numthreads uconn)))
    (let loop ((thnum    1)
	       (threads '()))
      (let ((thlst (cons (make-thread (lambda ()

					(let ((rdat (mailbox-receive! wqueue #f 'MBOX_TIMEOUT)))
					  (do-work uconn rdat)))

				      (conc "work thread " thnum))
			 threads)))
	(if (< thnum numthr)
	    (loop (+ thnum 1)
		  thlst)
	    (begin
	      (print "ULEX: Starting "(length thlst)" worker threads.")







>
|
>
>
>

|
>
>










>
|
|
>







314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
  (mailbox-send! (udat-work-queue uconn) rdat))

;; Handle one item taken from the work queue.
;;
;;   uconn - udat record; its work procedure is fetched on every call
;;   rdat  - either a four-element list (rem-host-port qrykey cmd params)
;;           or the symbol MBOX_TIMEOUT, which the worker loop passes to
;;           mailbox-receive! as the value to return on timeout
;;
;; For a request, the work procedure is applied to the four fields, the
;; elapsed time is printed, the result is sent back to rem-host-port with
;; `send` using 'response as the command, and the send time is printed.
;; A timeout sentinel is ignored and yields #f. Anything else is reported
;; with an ERROR print.
(define (do-work uconn rdat)
  (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivably it could change
    (match rdat
      ((rem-host-port qrykey cmd params)
       (let* ((start-time (current-milliseconds))
	      (result (proc rem-host-port qrykey cmd params))
	      (end-time (current-milliseconds))
	      (run-time (- end-time start-time)))
	 (print "ULEX: work "cmd", "params" done in "run-time" ms")
	 ;; send 'response as cmd and result as params
	 (send uconn rem-host-port qrykey 'response result) ;; could check for ack
	 (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))
      ;; The symbol must be quoted: a bare identifier in a match pattern is
      ;; a pattern variable that matches ANY value, which would swallow
      ;; malformed rdat and make the error clause below unreachable.
      ('MBOX_TIMEOUT #f)
      (else
       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
     
(define (process-work-queue uconn) 
  (let ((wqueue (udat-work-queue uconn))
	(proc   (udat-work-proc  uconn))
	(numthr (udat-numthreads uconn)))
    (let loop ((thnum    1)
	       (threads '()))
      (let ((thlst (cons (make-thread (lambda ()
					(let work-loop ()
					  (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
					    (do-work uconn rdat))
					  (work-loop)))
				      (conc "work thread " thnum))
			 threads)))
	(if (< thnum numthr)
	    (loop (+ thnum 1)
		  thlst)
	    (begin
	      (print "ULEX: Starting "(length thlst)" worker threads.")