59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
;; (begin
;; (debug:print 0 *default-log-port* "Remote failed for " proc " " params " exn="exn)
;; (apply (eval (string->symbol procstr)) params))
;; ;; (if *runremote*
;; ;; (apply (eval (string->symbol (conc "remote:" procstr))) params)
;; (apply (eval (string->symbol procstr)) params)))
(define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f))
(on-exit (lambda () #t)) ;; turn off on-exit stuff
;;(tcp-close rpc:listener) ;; gotta exit nicely
;;(tasks:bb-server-set-state! server-id "stopped")
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
;; (begin
;; (debug:print 0 *default-log-port* "Remote failed for " proc " " params " exn="exn)
;; (apply (eval (string->symbol procstr)) params))
;; ;; (if *runremote*
;; ;; (apply (eval (string->symbol (conc "remote:" procstr))) params)
;; (apply (eval (string->symbol procstr)) params)))
;; retry an operation (depends on srfi-18)
;; ==================
;; idea here is to avoid spending time on coding retrying something. Trying to be generic here.
;;
;; Exception handling:
;; -------------------
;; if evaluating the thunk results in exception, it will be retried.
;; on last try, if final-failure-returns-actual is true, the exception will be re-thrown to caller.
;;
;; look at options below #!key to see how to configure behavior
;;
;;
(define (retry-thunk
the-thunk
#!key ;;;; options below
(accept-result? (lambda (x) x)) ;; retry if predicate applied to thunk's result is false
(retries 4) ;; how many tries
(failure-value #f) ;; return this on final failure, unless following option is enabled:
(final-failure-returns-actual #f) ;; on failure, on the last try, just return the result, not failure-value
(retry-delay 0.1) ;; delay between tries
(back-off-factor 1) ;; multiply retry-delay by this factor on retry
(random-delay 0.1) ;; add a random portion of this value to wait
(chatty #f) ;; print status as we go, for debugging.
)
(when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-"))
(let* ((guarded-thunk ;; we are guarding the thunk against exceptions. We will record whether result of evaluation is an exception or a regular result.
(lambda ()
(let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision
(res
(condition-case
(the-thunk) ;; this is what we are guarding the execution of
[x () (cons EXCEPTION x)]
)))
(cond
((and (pair? res) (eq? (car res) EXCEPTION))
(if chatty
(print " - the-thunk threw exception >"(cdr res)"<"))
(cons 'exception (cdr res)))
(else
(if chatty
(print " - the-thunk returned result >"res"<"))
(cons 'regular-result res)))))))
(let loop ((guarded-res (guarded-thunk))
(retries-left retries)
(fail-wait retry-delay))
(if chatty (print " =========="))
(let* ((wait-time (+ fail-wait (+ (* fail-wait back-off-factor)
(* random-delay
(/ (random 1024) 1024) ))))
(res-type (car guarded-res))
(res-value (cdr guarded-res)))
(cond
((and (eq? res-type 'regular-result) (accept-result? res-value))
(if chatty (print " + return result that satisfied accept-result? >"res-value"<"))
res-value)
((> retries-left 0)
(if chatty (print " - sleep "wait-time))
(thread-sleep! wait-time)
(if chatty (print " + retry ["retries-left" tries left]"))
(loop (guarded-thunk)
(sub1 retries-left)
wait-time))
((eq? res-type 'regular-result)
(if final-failure-returns-actual
(begin
(if chatty (print " + last try failed- return the result >"res-value"<"))
res-value)
(begin
(if chatty (print " + last try failed- return canned failure value >"failure-value"<"))
failure-value)))
(else ;; no retries left; result was not accepted and res-type can only be 'exception
(if final-failure-returns-actual
(begin
(if chatty (print " + last try failed with exception- re-throw it >"res-value"<"))
(abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function
(begin
(if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
failure-value))))))))
(define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f))
(on-exit (lambda () #t)) ;; turn off on-exit stuff
;;(tcp-close rpc:listener) ;; gotta exit nicely
;;(tasks:bb-server-set-state! server-id "stopped")
|
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
(vector-set! vec 5 (current-seconds))
(begin
(print-call-chain (current-error-port))
(debug:print-error 0 *default-log-port* "call to rpc-transport:server-dat-update-last-access with non-vector!!"))))
(define *api-exec-ht* (make-hash-table))
;; let's see if caching the rpc stub curbs thread-profusion on server side
(define (rpc-transport:get-api-exec iface port)
(let* ((lu (hash-table-ref/default *api-exec-ht* (cons iface port) #f)))
(if lu
lu
(let ((res (rpc:procedure 'api-exec iface port)))
(hash-table-set! *api-exec-ht* (cons iface port) res)
res))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; this client-side procedure makes rpc call to server and returns result
;;
(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
;;(BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries)
|
|
>
>
>
|
>
|
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
|
(vector-set! vec 5 (current-seconds))
(begin
(print-call-chain (current-error-port))
(debug:print-error 0 *default-log-port* "call to rpc-transport:server-dat-update-last-access with non-vector!!"))))
(define *api-exec-ht* (make-hash-table))
(define *api-exec-mutex* (make-mutex))
;; let's see if caching the rpc stub curbs thread-profusion on server side
(define (rpc-transport:get-api-exec iface port)
(mutex-lock! *api-exec-mutex*)
(let* ((lu (hash-table-ref/default *api-exec-ht* (cons iface port) #f)))
(if lu
(begin
(mutex-unlock! *api-exec-mutex*)
lu)
(let ((res (rpc:procedure 'api-exec iface port)))
(hash-table-set! *api-exec-ht* (cons iface port) res)
(mutex-unlock! *api-exec-mutex*)
res))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; this client-side procedure makes rpc call to server and returns result
;;
(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
;;(BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries)
|
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
;;(vector #t (run-remote cmd params))
(vector 'success (api-exec cmd params))
[x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)]
[x () (vector 'other-fail "other fail ["(->string x)"]" x)]))
chatty: #f
accept-result?: (lambda(x)
(and (vector? x) (vector-ref x 0)))
retries: 4
back-off-factor: 1.5
random-wait: 0.2
retry-delay: 0.1
final-failure-returns-actual: #t))
;;(BB> "HEY res="res)
res
))
|
|
|
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
|
;;(vector #t (run-remote cmd params))
(vector 'success (api-exec cmd params))
[x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)]
[x () (vector 'other-fail "other fail ["(->string x)"]" x)]))
chatty: #f
accept-result?: (lambda(x)
(and (vector? x) (vector-ref x 0)))
retries: 8
back-off-factor: 1.5
random-wait: 0.2
retry-delay: 0.1
final-failure-returns-actual: #t))
;;(BB> "HEY res="res)
res
))
|
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
|
(thread-join! th1)
(thread-terminate! th2)
;;(BB> "alt got res="res)
(debug:print-info 11 *default-log-port* "got res=" res)
(if (vector? res)
(case (vector-ref res 0)
((success) (vector #t (vector-ref res 1)))
((comms-fail)
(debug:print 0 *default-log-port* "WARNING: comms failure for rpc request >>"res"<<")
;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(vector #f (vector-ref res 1)))
(else
(debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1))
(debug:print 0 *default-log-port* " client call chain:")
(print-call-chain (current-error-port))
|
>
>
|
|
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
|
(thread-join! th1)
(thread-terminate! th2)
;;(BB> "alt got res="res)
(debug:print-info 11 *default-log-port* "got res=" res)
(if (vector? res)
(case (vector-ref res 0)
((success) (vector #t (vector-ref res 1)))
(
(comms-fail other-fail)
;;(comms-fail)
(debug:print 0 *default-log-port* "WARNING: comms failure for rpc request >>"res"<<")
;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(vector #f (vector-ref res 1)))
(else
(debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1))
(debug:print 0 *default-log-port* " client call chain:")
(print-call-chain (current-error-port))
|