59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
(define *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))
;;======================================================================
;; S E R V E R
;;======================================================================
;; Start the nmsg server on a port obtained from portlogger, verify that it
;; answers a ping from this process, register it in the tasks db and then block
;; until the server thread finishes.
;;
;;   dbstruct  - passed through to nmsg-transport:try-start-server
;;   hostn     - host used for the ping; "-" is registered as (get-host-name)
;;   run-id    - passed to the server thread and to nmsg-transport:keep-running
;;   server-id - record id used for all tasks:server-* calls
;;   retrynum  - number of start attempts still allowed (default 1000)
;;
;; If the ping fails the server record is deleted, the port is marked failed and
;; the start is retried with one fewer attempt remaining. When no attempts are
;; left the process exits with status 1.
;;
(define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000))
  (debug:print 2 "Attempting to start the server ...")
  (let* ((start-port    (portlogger:open-run-close portlogger:find-port))
         (server-thread (make-thread
                         (lambda ()
                           (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
                         "server thread"))
         (tdbdat        (tasks:open-db)))
    (thread-start! server-thread)
    (thread-sleep! 0.1)
    ;; the ping must be answered with our own pid, i.e. by the thread just started
    (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id))
        (let ((interface (if (equal? hostn "-") (get-host-name) hostn)))
          (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port)
          (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
          (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
          (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
          ;; (set! *inmemdb* dbstruct)
          (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
          (thread-start! (make-thread
                          (lambda () (nmsg-transport:keep-running server-id run-id))
                          "keep running"))
          (thread-join! server-thread))
        (if (> retrynum 0)
            (begin
              (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.")
              (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
              (portlogger:open-run-close portlogger:set-failed start-port)
              ;; Count the attempt down. Without passing retrynum: the keyword
              ;; falls back to its default on every retry and the give-up branch
              ;; below can never be reached.
              (nmsg-transport:run dbstruct hostn run-id server-id retrynum: (- retrynum 1)))
            (begin
              (debug:print 0 "ERROR: could not find an open port to start server on. Giving up")
              (exit 1))))))
;; Bind a nanomsg 'rep socket on tcp://*:<portnum> and serve requests forever:
;; each received message is decoded, handed to api:execute-requests together
;; with dbstruct, and the encoded result is sent back on the same socket.
;; run-id and server-id are accepted but not used in this procedure.
;; This procedure never returns normally.
;;
(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
  (define sock (nn-socket 'rep))
  ;; decode one raw request, execute it and send the reply
  (define (answer! raw)
    (let ((request (db:string->obj raw transport: 'nmsg)))
      (debug:print 0 "server, received: " request)
      (let ((reply (api:execute-requests dbstruct request)))
        (debug:print 0 "server, sending: " reply)
        (nn-send sock (db:obj->string reply transport: 'nmsg)))))
  (nn-bind sock (conc "tcp://*:" portnum))
  (let serve ()
    (answer! (nn-recv sock))
    (serve)))
;; all routes through here end in exit ...
;;
;; Entry point for launching a server for run-id.
;;
;; Exits 0 right away if a server for run-id is already running. Otherwise it
;; tries to lock a server slot, re-trying up to four more times with a two
;; second pause between tries, and exits 0 if another server shows up in the
;; meantime. With a slot locked it hands over to nmsg-transport:run; with no
;; slot and no tries left it removes this pid's server records. Whichever way
;; the attempt ends, *didsomething* is set and the process exits.
;;
(define (nmsg-transport:launch run-id)
  (let* ((tdbdat   (tasks:open-db))
         (dbstruct (db:setup run-id))
         (hostn    (or (args:get-arg "-server") "-")))
    (set! *run-id* run-id)
    (set! *inmemdb* dbstruct)
    ;; with nbfake daemonize isn't really needed
    ;;
    ;; (if (args:get-arg "-daemonize")
    ;;     (begin
    ;;       (daemon:ize)
    ;;       (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
    ;;           (begin
    ;;             (current-error-port *alt-log-file*)
    ;;             (current-output-port *alt-log-file*)))))
    (when (server:check-if-running run-id)
      (debug:print-info 0 "Server for run-id " run-id " already running")
      (exit 0))
    (let try-lock ((server-id  (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
                   (tries-left 4))
      (cond
       ;; locked in a server id, try to start up
       (server-id
        (nmsg-transport:run dbstruct hostn run-id server-id))
       ;; no slot yet: wait, then either give way to a competing server or try again
       ((> tries-left 0)
        (thread-sleep! 2)
        (if (server:check-if-running run-id)
            (begin
              (debug:print-info 0 "Another server took the slot, exiting")
              (exit 0))
            (try-lock (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)
                      (- tries-left 1))))
       ;; since we didn't get the server lock we are going to clean up and bail out
       (else
        (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
        ;; NOTE(review): the tag below names http-transport although this is the
        ;; nmsg transport - presumably a copy/paste leftover; confirm before changing
        (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")))
      (set! *didsomething* #t)
      (exit))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
(define *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))
;;======================================================================
;; S E R V E R
;;======================================================================
;; Start the nmsg server on a port obtained from portlogger, verify that it
;; answers a ping from this process, register it in the tasks db and then block
;; until the server thread finishes.
;;
;;   dbstruct  - passed through to nmsg-transport:try-start-server
;;   area-dat  - area handle forwarded to the portlogger, tasks and db helpers
;;   hostn     - host used for the ping; "-" is registered as (get-host-name)
;;   run-id    - passed to the server thread and to nmsg-transport:keep-running
;;   server-id - record id used for all tasks:server-* calls
;;   retrynum  - number of start attempts still allowed (default 1000)
;;
;; If the ping fails the server record is deleted, the port is marked failed and
;; the start is retried with one fewer attempt remaining. When no attempts are
;; left the process exits with status 1.
;;
(define (nmsg-transport:run dbstruct area-dat hostn run-id server-id #!key (retrynum 1000))
  (debug:print 2 "Attempting to start the server ...")
  (let* ((start-port    (portlogger:open-run-close portlogger:find-port area-dat))
         (server-thread (make-thread
                         (lambda ()
                           (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
                         "server thread"))
         (tdbdat        (tasks:open-db area-dat)))
    (thread-start! server-thread)
    (thread-sleep! 0.1)
    ;; the ping must be answered with our own pid, i.e. by the thread just started
    (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id))
        (let ((interface (if (equal? hostn "-") (get-host-name) hostn)))
          (tasks:server-set-interface-port (db:delay-if-busy tdbdat area-dat) server-id interface start-port)
          (tasks:server-set-state! (db:delay-if-busy tdbdat area-dat) server-id "dbprep")
          (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
          (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
          ;; (set! *inmemdb* dbstruct)
          (tasks:server-set-state! (db:delay-if-busy tdbdat area-dat) server-id "running")
          (thread-start! (make-thread
                          (lambda () (nmsg-transport:keep-running server-id run-id area-dat))
                          "keep running"))
          (thread-join! server-thread))
        (if (> retrynum 0)
            (begin
              (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.")
              (tasks:server-delete-record (db:delay-if-busy tdbdat area-dat) server-id "failed to start, never received server alive signature")
              (portlogger:open-run-close portlogger:set-failed area-dat start-port)
              ;; Count the attempt down. Without passing retrynum: the keyword
              ;; falls back to its default on every retry and the give-up branch
              ;; below can never be reached.
              (nmsg-transport:run dbstruct area-dat hostn run-id server-id retrynum: (- retrynum 1)))
            (begin
              (debug:print 0 "ERROR: could not find an open port to start server on. Giving up")
              (exit 1))))))
;; Bind a nanomsg 'rep socket on tcp://*:<portnum> and serve requests forever:
;; each received message is decoded, handed to api:execute-requests together
;; with dbstruct, and the encoded result is sent back on the same socket.
;; run-id and server-id are accepted but not used in this procedure.
;; This procedure never returns normally.
;;
(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
  (define sock (nn-socket 'rep))
  ;; decode one raw request, execute it and send the reply
  (define (answer! raw)
    (let ((request (db:string->obj raw transport: 'nmsg)))
      (debug:print 0 "server, received: " request)
      (let ((reply (api:execute-requests dbstruct request)))
        (debug:print 0 "server, sending: " reply)
        (nn-send sock (db:obj->string reply transport: 'nmsg)))))
  (nn-bind sock (conc "tcp://*:" portnum))
  (let serve ()
    (answer! (nn-recv sock))
    (serve)))
;; all routes through here end in exit ...
;;
;; Entry point for launching a server for run-id in the area given by area-dat.
;;
;; Exits 0 right away if a server for run-id is already running. Otherwise it
;; tries to lock a server slot, re-trying up to four more times with a two
;; second pause between tries, and exits 0 if another server shows up in the
;; meantime. With a slot locked it hands over to nmsg-transport:run; with no
;; slot and no tries left it removes this pid's server records. Whichever way
;; the attempt ends, *didsomething* is set and the process exits.
;;
(define (nmsg-transport:launch run-id area-dat)
  (let* ((tdbdat   (tasks:open-db area-dat))
         (dbstruct (db:setup run-id))
         (hostn    (or (args:get-arg "-server") "-")))
    (set! *run-id* run-id)
    (set! *inmemdb* dbstruct)
    ;; with nbfake daemonize isn't really needed
    ;;
    ;; (if (args:get-arg "-daemonize")
    ;;     (begin
    ;;       (daemon:ize)
    ;;       (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
    ;;           (begin
    ;;             (current-error-port *alt-log-file*)
    ;;             (current-output-port *alt-log-file*)))))
    (when (server:check-if-running run-id area-dat)
      (debug:print-info 0 "Server for run-id " run-id " already running")
      (exit 0))
    (let try-lock ((server-id  (tasks:server-lock-slot (db:delay-if-busy tdbdat area-dat) run-id))
                   (tries-left 4))
      (cond
       ;; locked in a server id, try to start up
       (server-id
        (nmsg-transport:run dbstruct area-dat hostn run-id server-id))
       ;; no slot yet: wait, then either give way to a competing server or try again
       ((> tries-left 0)
        (thread-sleep! 2)
        (if (server:check-if-running run-id area-dat)
            (begin
              (debug:print-info 0 "Another server took the slot, exiting")
              (exit 0))
            (try-lock (tasks:server-lock-slot (db:delay-if-busy tdbdat area-dat) run-id)
                      (- tries-left 1))))
       ;; since we didn't get the server lock we are going to clean up and bail out
       (else
        (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
        ;; NOTE(review): the tag below names http-transport although this is the
        ;; nmsg transport - presumably a copy/paste leftover; confirm before changing
        (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat area-dat) " http-transport:launch")))
      (set! *didsomething* #t)
      (exit))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
|
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
|
(signal (vector-ref result 0))))
(signal (make-composite-condition
(make-property-condition 'timeout 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server"))))))
;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shut down after some time if it is not.
;;
(define (nmsg-transport:keep-running server-id run-id)
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; This thread waits for the server to come alive
(let* ((server-info (let loop ()
(let ((sdat #f))
(mutex-lock! *heartbeat-mutex*)
(set! sdat *server-info*)
(mutex-unlock! *heartbeat-mutex*)
(if sdat
(begin
(debug:print-info 0 "keep-running got sdat=" sdat)
sdat)
(begin
(thread-sleep! 0.5)
(loop))))))
(iface (car server-info))
(port (cadr server-info))
(last-access 0)
(tdbdat (tasks:open-db))
(server-timeout (let ((tmo (configf:lookup *configdat* "server" "timeout")))
(if (and (string? tmo)
(string->number tmo))
(* 60 60 (string->number tmo))
;; (* 3 24 60 60) ;; default to three days
(* 60 1) ;; default to one minute
;; (* 60 60 25) ;; default to 25 hours
))))
|
|
|
|
|
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
|
(signal (vector-ref result 0))))
(signal (make-composite-condition
(make-property-condition 'timeout 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server"))))))
;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shut down after some time if it is not.
;;
(define (nmsg-transport:keep-running server-id run-id area-dat)
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; This thread waits for the server to come alive
(let* ((server-info (let loop ()
(let ((sdat #f))
(mutex-lock! *heartbeat-mutex*)
(set! sdat *server-info*)
(mutex-unlock! *heartbeat-mutex*)
(if sdat
(begin
(debug:print-info 0 "keep-running got sdat=" sdat)
sdat)
(begin
(thread-sleep! 0.5)
(loop))))))
(iface (car server-info))
(port (cadr server-info))
(last-access 0)
(tdbdat (tasks:open-db area-dat))
(server-timeout (let ((tmo (configf:lookup (megatest:area-configdat area-dat) "server" "timeout")))
(if (and (string? tmo)
(string->number tmo))
(* 60 60 (string->number tmo))
;; (* 3 24 60 60) ;; default to three days
(* 60 1) ;; default to one minute
;; (* 60 60 25) ;; default to 25 hours
))))
|
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
|
(begin
(debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
(loop 0))
(begin
(debug:print-info 0 "Starting to shutdown the server.")
(set! *time-to-exit* #t)
(db:sync-touched *inmemdb* run-id force-sync: #t)
(tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running")
(debug:print-info 0 "Server shutdown complete. Exiting")
(exit)
))))))
;;======================================================================
;; C L I E N T S
;;======================================================================
|
|
|
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
|
(begin
(debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
(loop 0))
(begin
(debug:print-info 0 "Starting to shutdown the server.")
(set! *time-to-exit* #t)
(db:sync-touched *inmemdb* run-id force-sync: #t)
(tasks:server-delete-record (db:delay-if-busy tdbdat area-dat) server-id " http-transport:keep-running")
(debug:print-info 0 "Server shutdown complete. Exiting")
(exit)
))))))
;;======================================================================
;; C L I E N T S
;;======================================================================
|
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
|
;;
;; Signal handler for clients (the messages refer to ^C). signum is accepted
;; but not used.
;;
;; Two threads are started: one drains a still outstanding response from
;; *runremote* (only when *received-response* is false), the other announces
;; the shutdown, waits three seconds and exits the process with status 4.
;; The handler blocks on the exit timer thread. Any exception raised while
;; setting this up is reported with a short message instead of propagating.
;;
(define (nmsg-transport:client-signal-handler signum)
  (handle-exceptions
   exn
   ;; debug:print takes the verbosity level first, as in every other call in
   ;; this file; it was missing here
   (debug:print 0 " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
                             (if (not *received-response*)
                                 (receive-message* *runremote*))) ;; flush out last call if applicable
                           "eat response"))
         (th2 (make-thread (lambda ()
                             (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
                             (thread-sleep! 3) ;; give the flush three seconds to do its stuff
                             (debug:print 0 " Done.")
                             (exit 4))
                           "exit on ^C timer")))
     ;; start the exit timer first so the process terminates even if the flush hangs
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))
|
|
|
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
|
;;
;; Signal handler for clients (the messages refer to ^C). signum is accepted
;; but not used.
;;
;; Two threads are started: one drains a still outstanding response from the
;; remote connection (only when *received-response* is false), the other
;; announces the shutdown, waits three seconds and exits the process with
;; status 4. The handler blocks on the exit timer thread. Any exception raised
;; while setting this up is reported with a short message instead of propagating.
;;
(define (nmsg-transport:client-signal-handler signum)
  (handle-exceptions
   exn
   ;; debug:print takes the verbosity level first, as in every other call in
   ;; this file; it was missing here
   (debug:print 0 " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
                             (if (not *received-response*)
                                 ;; NOTE(review): `remote` is not bound anywhere in this
                                 ;; procedure - presumably a global defined elsewhere; confirm
                                 (receive-message* (common:get-remote remote #f)))) ;; flush out last call if applicable
                           "eat response"))
         (th2 (make-thread (lambda ()
                             (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
                             (thread-sleep! 3) ;; give the flush three seconds to do its stuff
                             (debug:print 0 " Done.")
                             (exit 4))
                           "exit on ^C timer")))
     ;; start the exit timer first so the process terminates even if the flush hangs
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))
|