︙ | | | ︙ | |
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
;; Call this to start the actual server
;;
;; (The "Call this to start the actual server" comment above appears to
;; describe http-transport:run, defined just below, not this mutex.)
;; Module-level mutex, created once when this file is loaded. Its lock/unlock
;; sites are not visible in this excerpt; the name suggests it guards a db
;; process queue -- TODO confirm.
(define *db:process-queue-mutex* (make-mutex))
(define (http-transport:run hostn run-id server-id)
(debug:print 2 #f "Attempting to start the server ...")
(let* ((db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily
(hostname (get-host-name))
(ipaddrstr (let ((ipstr (if (string=? "-" hostn)
;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
(server:get-best-guess-address hostname)
#f)))
(if ipstr ipstr hostn))) ;; hostname)))
|
|
|
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
;; Call this to start the actual server
;;
;; (The "Call this to start the actual server" comment above appears to
;; describe http-transport:run, defined just below, not this mutex.)
;; Module-level mutex, created once when this file is loaded. Its lock/unlock
;; sites are not visible in this excerpt; the name suggests it guards a db
;; process queue -- TODO confirm.
(define *db:process-queue-mutex* (make-mutex))
(define (http-transport:run hostn run-id server-id)
(debug:print 2 *default-log-port* "Attempting to start the server ...")
(let* ((db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily
(hostname (get-host-name))
(ipaddrstr (let ((ipstr (if (string=? "-" hostn)
;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
(server:get-best-guess-address hostname)
#f)))
(if ipstr ipstr hostn))) ;; hostname)))
|
︙ | | | ︙ | |
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
(debug:print-info 0 #f "http-transport:try-start-server run-id=" run-id " ipaddrsstr=" ipaddrstr " portnum=" portnum " server-id=" server-id " config-hostname=" config-hostname)
(handle-exceptions
exn
(begin
(print-error-message exn)
(if (< portnum 64000)
(begin
(debug:print 0 #f "WARNING: attempt to start server failed. Trying again ...")
(debug:print 0 #f " message: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 0 #f "exn=" (condition->list exn))
(portlogger:open-run-close portlogger:set-failed portnum)
(debug:print 0 #f "WARNING: failed to start on portnum: " portnum ", trying next port")
(thread-sleep! 0.1)
;; get_next_port goes here
(http-transport:try-start-server run-id
ipaddrstr
(portlogger:open-run-close portlogger:find-port)
server-id))
(begin
(tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id ipaddrstr portnum " http-transport:try-start-server")
(print "ERROR: Tried and tried but could not start the server"))))
;; any error in following steps will result in a retry
(set! *server-info* (list ipaddrstr portnum))
(tasks:server-set-interface-port
(db:delay-if-busy tdbdat)
server-id
ipaddrstr portnum)
(debug:print 0 #f "INFO: Trying to start server on " ipaddrstr ":" portnum)
;; This starts the spiffy server
;; NEED WAY TO SET IP TO #f TO BIND ALL
;; (start-server bind-address: ipaddrstr port: portnum)
(if config-hostname ;; this is a hint to bind directly
(start-server port: portnum bind-address: (if (equal? config-hostname "-")
ipaddrstr
config-hostname))
(start-server port: portnum))
;; (portlogger:open-run-close portlogger:set-port portnum "released")
(tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id ipaddrstr portnum " http-transport:try-start-server")
(debug:print 1 #f "INFO: server has been stopped"))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;;======================================================================
;; C L I E N T S
|
|
|
|
|
|
|
|
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
(debug:print-info 0 #f "http-transport:try-start-server run-id=" run-id " ipaddrsstr=" ipaddrstr " portnum=" portnum " server-id=" server-id " config-hostname=" config-hostname)
(handle-exceptions
exn
(begin
(print-error-message exn)
(if (< portnum 64000)
(begin
(debug:print 0 *default-log-port* "WARNING: attempt to start server failed. Trying again ...")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 0 *default-log-port* "exn=" (condition->list exn))
(portlogger:open-run-close portlogger:set-failed portnum)
(debug:print 0 *default-log-port* "WARNING: failed to start on portnum: " portnum ", trying next port")
(thread-sleep! 0.1)
;; get_next_port goes here
(http-transport:try-start-server run-id
ipaddrstr
(portlogger:open-run-close portlogger:find-port)
server-id))
(begin
(tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id ipaddrstr portnum " http-transport:try-start-server")
(print "ERROR: Tried and tried but could not start the server"))))
;; any error in following steps will result in a retry
(set! *server-info* (list ipaddrstr portnum))
(tasks:server-set-interface-port
(db:delay-if-busy tdbdat)
server-id
ipaddrstr portnum)
(debug:print 0 *default-log-port* "INFO: Trying to start server on " ipaddrstr ":" portnum)
;; This starts the spiffy server
;; NEED WAY TO SET IP TO #f TO BIND ALL
;; (start-server bind-address: ipaddrstr port: portnum)
(if config-hostname ;; this is a hint to bind directly
(start-server port: portnum bind-address: (if (equal? config-hostname "-")
ipaddrstr
config-hostname))
(start-server port: portnum))
;; (portlogger:open-run-close portlogger:set-port portnum "released")
(tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id ipaddrstr portnum " http-transport:try-start-server")
(debug:print 1 *default-log-port* "INFO: server has been stopped"))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;;======================================================================
;; C L I E N T S
|
︙ | | | ︙ | |
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
(set! *http-requests-in-progress* (- *http-requests-in-progress* 1))
(let loop ((etime (+ (current-seconds) 5))) ;; give up in five seconds
(if (> *http-requests-in-progress* 0)
(if (> etime (current-seconds))
(begin
(thread-sleep! 0.05)
(loop etime))
(debug:print 0 #f "ERROR: requests still in progress after 5 seconds of waiting. I'm going to pass on cleaning up http connections"))
(close-all-connections!)))
(set! *http-connections-next-cleanup* (+ (current-seconds) 10))
(mutex-unlock! *http-mutex*))
;; Acquire *http-mutex* and count one more in-flight http request.
;;
;; The mutex is left locked on return: nothing in this procedure releases
;; it, so the calling thread keeps holding it afterwards.
;; NOTE(review): presumably the matching decrement/cleanup routine performs
;; the unlock -- confirm against the callers.
(define (http-transport:inc-requests-and-prep-to-close-all-connections)
  (mutex-lock! *http-mutex*)
  (let ((in-flight (+ *http-requests-in-progress* 1)))
    (set! *http-requests-in-progress* in-flight)))
;; Send "cmd" with json payload "params" to serverdat and receive result
;;
(define (http-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
(let* ((fullurl (if (vector? serverdat)
(http-transport:server-dat-get-api-req serverdat)
(begin
(debug:print 0 #f "FATAL ERROR: http-transport:client-api-send-receive called with no server info")
(exit 1))))
(res #f)
(success #t)
(sparams (db:obj->string params transport: 'http)))
;; (condition-case
;; handle-exceptions
;; exn
;; (if (> numretries 0)
;; (begin
;; (mutex-unlock! *http-mutex*)
;; (thread-sleep! 1)
;; (handle-exceptions
;; exn
;; (debug:print 0 #f "WARNING: closing connections failed. Server at " fullurl " almost certainly dead")
;; (close-all-connections!))
;; (debug:print 0 #f "WARNING: Failed to communicate with server, trying again, numretries left: " numretries)
;; (http-transport:client-api-send-receive run-id serverdat cmd sparams numretries: (- numretries 1)))
;; (begin
;; (mutex-unlock! *http-mutex*)
;; (tasks:kill-server-run-id run-id)
;; #f))
;; (begin
(debug:print-info 11 #f "fullurl=" fullurl ", cmd=" cmd ", params=" params ", run-id=" run-id "\n")
|
|
|
|
|
|
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
(set! *http-requests-in-progress* (- *http-requests-in-progress* 1))
(let loop ((etime (+ (current-seconds) 5))) ;; give up in five seconds
(if (> *http-requests-in-progress* 0)
(if (> etime (current-seconds))
(begin
(thread-sleep! 0.05)
(loop etime))
(debug:print 0 *default-log-port* "ERROR: requests still in progress after 5 seconds of waiting. I'm going to pass on cleaning up http connections"))
(close-all-connections!)))
(set! *http-connections-next-cleanup* (+ (current-seconds) 10))
(mutex-unlock! *http-mutex*))
;; Acquire *http-mutex* and count one more in-flight http request.
;;
;; The mutex is left locked on return: nothing in this procedure releases
;; it, so the calling thread keeps holding it afterwards.
;; NOTE(review): presumably the matching decrement/cleanup routine performs
;; the unlock -- confirm against the callers.
(define (http-transport:inc-requests-and-prep-to-close-all-connections)
  (mutex-lock! *http-mutex*)
  (let ((in-flight (+ *http-requests-in-progress* 1)))
    (set! *http-requests-in-progress* in-flight)))
;; Send "cmd" with json payload "params" to serverdat and receive result
;;
(define (http-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
(let* ((fullurl (if (vector? serverdat)
(http-transport:server-dat-get-api-req serverdat)
(begin
(debug:print 0 *default-log-port* "FATAL ERROR: http-transport:client-api-send-receive called with no server info")
(exit 1))))
(res #f)
(success #t)
(sparams (db:obj->string params transport: 'http)))
;; (condition-case
;; handle-exceptions
;; exn
;; (if (> numretries 0)
;; (begin
;; (mutex-unlock! *http-mutex*)
;; (thread-sleep! 1)
;; (handle-exceptions
;; exn
;; (debug:print 0 *default-log-port* "WARNING: closing connections failed. Server at " fullurl " almost certainly dead")
;; (close-all-connections!))
;; (debug:print 0 *default-log-port* "WARNING: Failed to communicate with server, trying again, numretries left: " numretries)
;; (http-transport:client-api-send-receive run-id serverdat cmd sparams numretries: (- numretries 1)))
;; (begin
;; (mutex-unlock! *http-mutex*)
;; (tasks:kill-server-run-id run-id)
;; #f))
;; (begin
(debug:print-info 11 #f "fullurl=" fullurl ", cmd=" cmd ", params=" params ", run-id=" run-id "\n")
|
︙ | | | ︙ | |
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
|
(set! res (vector
success
(db:string->obj
(handle-exceptions
exn
(begin
(set! success #f)
(debug:print 0 #f "WARNING: failure in with-input-from-request to " fullurl ".")
(debug:print 0 #f " message: " ((condition-property-accessor 'exn 'message) exn))
(hash-table-delete! *runremote* run-id)
;; Killing associated server to allow clean retry.")
;; (tasks:kill-server-run-id run-id) ;; better to kill the server in the logic that called this routine?
(mutex-unlock! *http-mutex*)
;;; (signal (make-composite-condition
;;; (make-property-condition 'commfail 'message "failed to connect to server")))
;;; "communications failed"
|
|
|
|
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
|
(set! res (vector
success
(db:string->obj
(handle-exceptions
exn
(begin
(set! success #f)
(debug:print 0 *default-log-port* "WARNING: failure in with-input-from-request to " fullurl ".")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(hash-table-delete! *runremote* run-id)
;; Killing associated server to allow clean retry.")
;; (tasks:kill-server-run-id run-id) ;; better to kill the server in the logic that called this routine?
(mutex-unlock! *http-mutex*)
;;; (signal (make-composite-condition
;;; (make-property-condition 'commfail 'message "failed to connect to server")))
;;; "communications failed"
|
︙ | | | ︙ | |
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
|
(thread-join! th1)
(thread-terminate! th2)
(debug:print-info 11 #f "got res=" res)
(if (vector? res)
(if (vector-ref res 0)
res
(begin ;; note: this code also called in nmsg-transport - consider consolidating it
(debug:print 0 #f "ERROR: error occured at server, info=" (vector-ref res 2))
(debug:print 0 #f " client call chain:")
(print-call-chain (current-error-port))
(debug:print 0 #f " server call chain:")
(pp (vector-ref res 1) (current-error-port))
(signal (vector-ref result 0))))
(signal (make-composite-condition
(make-property-condition
'timeout
'message "nmsg-transport:client-api-send-receive-raw timed out talking to server")))))))
|
|
|
|
|
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
|
(thread-join! th1)
(thread-terminate! th2)
(debug:print-info 11 #f "got res=" res)
(if (vector? res)
(if (vector-ref res 0)
res
(begin ;; note: this code also called in nmsg-transport - consider consolidating it
(debug:print 0 *default-log-port* "ERROR: error occured at server, info=" (vector-ref res 2))
(debug:print 0 *default-log-port* " client call chain:")
(print-call-chain (current-error-port))
(debug:print 0 *default-log-port* " server call chain:")
(pp (vector-ref res 1) (current-error-port))
(signal (vector-ref result 0))))
(signal (make-composite-condition
(make-property-condition
'timeout
'message "nmsg-transport:client-api-send-receive-raw timed out talking to server")))))))
|
︙ | | | ︙ | |
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
|
#f))
;; Stamp slot 5 of VEC with the current time in seconds.
;;
;;   vec : expected to be a vector with at least six slots (index 5 is written)
;;
;; When VEC is not a vector nothing is modified; instead the call chain is
;; dumped to the current error port and an error is reported via debug:print.
(define (http-transport:server-dat-update-last-access vec)
  (cond
   ((vector? vec)
    (vector-set! vec 5 (current-seconds)))
   (else
    (print-call-chain (current-error-port))
    (debug:print 0 #f "ERROR: call to http-transport:server-dat-update-last-access with non-vector!!"))))
;;
;; connect
;;
(define (http-transport:client-connect iface port)
(let* ((api-url (conc "http://" iface ":" port "/api"))
(api-uri (uri-reference (conc "http://" iface ":" port "/api")))
|
|
|
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
|
#f))
;; Stamp slot 5 of VEC with the current time in seconds.
;;
;;   vec : expected to be a vector with at least six slots (index 5 is written)
;;
;; When VEC is not a vector nothing is modified; instead the call chain is
;; dumped to the current error port and an error is reported via debug:print.
(define (http-transport:server-dat-update-last-access vec)
  (cond
   ((vector? vec)
    (vector-set! vec 5 (current-seconds)))
   (else
    (print-call-chain (current-error-port))
    (debug:print 0 *default-log-port* "ERROR: call to http-transport:server-dat-update-last-access with non-vector!!"))))
;;
;; connect
;;
(define (http-transport:client-connect iface port)
(let* ((api-url (conc "http://" iface ":" port "/api"))
(api-uri (uri-reference (conc "http://" iface ":" port "/api")))
|
︙ | | | ︙ | |
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
|
(> (- (current-seconds) start-time) 2))
sdat
(begin
(debug:print-info 0 #f "Still waiting, last-sdat=" last-sdat)
(sleep 4)
(if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
(begin
(debug:print 0 #f "ERROR: transport appears to have died, exiting server " server-id " for run " run-id)
(tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
(exit))
(loop start-time
(equal? sdat last-sdat)
sdat)))))))
(iface (car server-info))
(port (cadr server-info))
|
|
|
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
|
(> (- (current-seconds) start-time) 2))
sdat
(begin
(debug:print-info 0 #f "Still waiting, last-sdat=" last-sdat)
(sleep 4)
(if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
(begin
(debug:print 0 *default-log-port* "ERROR: transport appears to have died, exiting server " server-id " for run " run-id)
(tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
(exit))
(loop start-time
(equal? sdat last-sdat)
sdat)))))))
(iface (car server-info))
(port (cadr server-info))
|
︙ | | | ︙ | |
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
|
((sync-failed)(cond
((> bad-sync-count 10) ;; time to give up
(http-transport:server-shutdown server-id port))
(else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop
(thread-sleep! 5)
(loop count server-state (+ bad-sync-count 1)))))
((exn)
(debug:print 0 #f "ERROR: error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server")
(tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running crashed")
(exit)))
(set! sync-time (- (current-milliseconds) start-time))
(set! rem-time (quotient (- 4000 sync-time) 1000))
(debug:print 4 #f "SYNC: time= " sync-time ", rem-time=" rem-time)
(if (and (<= rem-time 4)
(> rem-time 0))
(thread-sleep! rem-time)
(thread-sleep! 4))) ;; fallback for if the math is changed ...
;;
|
|
|
|
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
|
((sync-failed)(cond
((> bad-sync-count 10) ;; time to give up
(http-transport:server-shutdown server-id port))
(else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop
(thread-sleep! 5)
(loop count server-state (+ bad-sync-count 1)))))
((exn)
(debug:print 0 *default-log-port* "ERROR: error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server")
(tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running crashed")
(exit)))
(set! sync-time (- (current-milliseconds) start-time))
(set! rem-time (quotient (- 4000 sync-time) 1000))
(debug:print 4 *default-log-port* "SYNC: time= " sync-time ", rem-time=" rem-time)
(if (and (<= rem-time 4)
(> rem-time 0))
(thread-sleep! rem-time)
(thread-sleep! 4))) ;; fallback for if the math is changed ...
;;
|
︙ | | | ︙ | |
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
|
(set! port (cadr sdat))))
;; Transfer *last-db-access* to last-access to use in checking that we are still alive
(mutex-lock! *heartbeat-mutex*)
(set! last-access *last-db-access*)
(mutex-unlock! *heartbeat-mutex*)
;; (debug:print 11 #f "last-access=" last-access ", server-timeout=" server-timeout)
;;
;; no_traffic, no running tests, if server 0, no running servers
;;
;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out)
;;
(let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))
(adjusted-timeout (if (> hrs-since-start 1)
|
|
|
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
|
(set! port (cadr sdat))))
;; Transfer *last-db-access* to last-access to use in checking that we are still alive
(mutex-lock! *heartbeat-mutex*)
(set! last-access *last-db-access*)
(mutex-unlock! *heartbeat-mutex*)
;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout)
;;
;; no_traffic, no running tests, if server 0, no running servers
;;
;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out)
;;
(let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))
(adjusted-timeout (if (> hrs-since-start 1)
|
︙ | | | ︙ | |
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
|
(daemon:ize)
(if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
(begin
(current-error-port *alt-log-file*)
(current-output-port *alt-log-file*)))))
(if (server:check-if-running run-id)
(begin
(debug:print 0 #f "INFO: Server for run-id " run-id " already running")
(exit 0)))
(let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
(remtries 4))
(if (not server-id)
(if (> remtries 0)
(begin
(thread-sleep! 2)
|
|
|
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
|
(daemon:ize)
(if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
(begin
(current-error-port *alt-log-file*)
(current-output-port *alt-log-file*)))))
(if (server:check-if-running run-id)
(begin
(debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
(exit 0)))
(let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
(remtries 4))
(if (not server-id)
(if (> remtries 0)
(begin
(thread-sleep! 2)
|
︙ | | | ︙ | |
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
|
(print "LOGIN_FAILED")
(exit 1)))))
;; Signal handler for the server process.
;;
;;   signum : number of the signal being handled; it is masked first via
;;            signal-mask!.
;;
;; Two threads are created: "eat response", which only sleeps for one second,
;; and "exit on ^C timer", which logs a notice, sleeps three seconds, logs
;; " Done." and then calls (exit 4). The timer thread is started first and the
;; handler blocks joining it. Any exception raised while doing this is
;; reported as " ... exiting ...".
(define (http-transport:server-signal-handler signum)
  (signal-mask! signum)
  (handle-exceptions
   exn
   (debug:print 0 #f " ... exiting ...")
   (let* ((eat-response
           (lambda ()
             (thread-sleep! 1)))
          (timed-exit
           (lambda ()
             (debug:print 0 #f "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
             (thread-sleep! 3) ;; give the flush three seconds to do its work
             (debug:print 0 #f " Done.")
             (exit 4)))
          (th1 (make-thread eat-response "eat response"))
          (th2 (make-thread timed-exit "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))
;;======================================================================
|
|
|
|
|
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
|
(print "LOGIN_FAILED")
(exit 1)))))
;; Signal handler for the server process.
;;
;;   signum : number of the signal being handled; it is masked first via
;;            signal-mask!.
;;
;; Two threads are created: "eat response", which only sleeps for one second,
;; and "exit on ^C timer", which logs a notice, sleeps three seconds, logs
;; " Done." and then calls (exit 4). The timer thread is started first and the
;; handler blocks joining it. Any exception raised while doing this is
;; reported as " ... exiting ...".
(define (http-transport:server-signal-handler signum)
  (signal-mask! signum)
  (handle-exceptions
   exn
   (debug:print 0 *default-log-port* " ... exiting ...")
   (let* ((eat-response
           (lambda ()
             (thread-sleep! 1)))
          (timed-exit
           (lambda ()
             (debug:print 0 *default-log-port* "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
             (thread-sleep! 3) ;; give the flush three seconds to do its work
             (debug:print 0 *default-log-port* " Done.")
             (exit 4)))
          (th1 (make-thread eat-response "eat response"))
          (th2 (make-thread timed-exit "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))
;;======================================================================
|
︙ | | | ︙ | |