92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
-
+
|
;; work handling
(work-queue (make-mailbox))
(work-proc #f) ;; set by user
(cnum 0) ;; cookie number
(mboxes (make-hash-table)) ;; for the replies
(avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use
;; threads
(numthreads 10)
(numthreads 50)
(cmd-thread #f)
(work-queue-thread #f)
)
;; ;; struct for keeping track of others we are talking to
;; ;;
;; (defstruct pdat
|
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
+
+
|
;;
(define (run-listener handler-proc #!optional (port-suggestion 4242))
(let* ((uconn (make-udat)))
(udat-work-proc-set! uconn handler-proc)
(if (setup-listener uconn port-suggestion)
(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
(th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor")))
(tcp-buffer-size 2048)
;; (max-connections 2048)
(thread-start! th1)
(thread-start! th2)
(udat-cmd-thread-set! uconn th1)
(udat-work-queue-thread-set! uconn th2)
(print "cmd loop and process workers started")
uconn)
(assert #f "ERROR: run-listener called without proper setup."))))
|
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
|
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
|
+
-
+
+
+
+
-
+
+
+
+
-
-
+
+
+
|
(mailbox-send! (udat-work-queue uconn) rdat))
(define (do-work uconn rdat)
(let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
;; put this following into a do-work procedure
(match rdat
((rem-host-port qrykey cmd params)
(let* ((start-time (current-milliseconds))
(let* ((result (proc rem-host-port qrykey cmd params)))
(result (proc rem-host-port qrykey cmd params))
(end-time (current-milliseconds))
(run-time (- end-time start-time)))
(print "ULEX: work "cmd", "params" done in "run-time" ms")
;; send 'response as cmd and result as params
(send uconn rem-host-port qrykey 'response result))) ;; could check for ack
(send uconn rem-host-port qrykey 'response result) ;; could check for ack
(print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))
(MBOX_TIMEOUT #f)
(else
(print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
(define (process-work-queue uconn)
(let ((wqueue (udat-work-queue uconn))
(proc (udat-work-proc uconn))
(numthr (udat-numthreads uconn)))
(let loop ((thnum 1)
(threads '()))
(let ((thlst (cons (make-thread (lambda ()
(let work-loop ()
(let ((rdat (mailbox-receive! wqueue #f 'MBOX_TIMEOUT)))
(do-work uconn rdat)))
(let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
(do-work uconn rdat))
(work-loop)))
(conc "work thread " thnum))
threads)))
(if (< thnum numthr)
(loop (+ thnum 1)
thlst)
(begin
(print "ULEX: Starting "(length thlst)" worker threads.")
|