︙ | | | ︙ | |
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
(let* ((start-port (portlogger:open-run-close portlogger:find-port))
(server-thread (make-thread (lambda ()
(nmsg-transport:try-start-server dbstruct run-id start-port server-id))
"server thread"))
(tdbdat (tasks:open-db)))
(thread-start! server-thread)
(if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id))
(begin
(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
(set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
(thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
(set! *inmemdb* dbstruct)
(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
(thread-start! (make-thread
(lambda ()(nmsg-transport:keep-running server-id))
|
|
>
|
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
(let* ((start-port (portlogger:open-run-close portlogger:find-port))
(server-thread (make-thread (lambda ()
(nmsg-transport:try-start-server dbstruct run-id start-port server-id))
"server thread"))
(tdbdat (tasks:open-db)))
(thread-start! server-thread)
(if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id))
(let ((interface (if (equal? hostn "-")(get-host-name) hostn)))
(tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port)
(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
(set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
(thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
(set! *inmemdb* dbstruct)
(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
(thread-start! (make-thread
(lambda ()(nmsg-transport:keep-running server-id))
|
︙ | | | ︙ | |
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
(begin
(debug:print-info 0 "Another server took the slot, exiting")
(exit 0))))
(begin
;; since we didn't get the server lock we are going to clean up and bail out
(debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
(tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
)))
(nmsg-transport:run dbstruct hostn run-id server-id)
(set! *didsomething* #t)
(exit))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
|
|
>
|
|
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
(begin
(debug:print-info 0 "Another server took the slot, exiting")
(exit 0))))
(begin
;; since we didn't get the server lock we are going to clean up and bail out
(debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
(tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
))
;; locked in a server id, try to start up
(nmsg-transport:run dbstruct hostn run-id server-id))
(set! *didsomething* #t)
(exit))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
|
︙ | | | ︙ | |
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
;;======================================================================
;; ping the server at host:port
;; return the open socket if successful (return-socket == #t)
;; expect the key expected-key returned in payload
;; send our-key or #f as payload
;;
(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f))
;; send a random number along with pid and check that we get it back
(let* ((req (nn-socket 'req))
(host (if (or (not hostn)
(equal? hostn "-")) ;; use localhost
(get-host-name)
hostn))
(success #f)
(keepwaiting #t)
(dat (db:obj->string (vector "ping" our-key) transport: 'nmsg))
|
|
|
|
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
;;======================================================================
;; ping the server at host:port
;; return the open socket if successful (return-socket == #t)
;; expect the key expected-key returned in payload
;; send our-key or #f as payload
;;
(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f))
;; send a random number along with pid and check that we get it back
(let* ((req (or socket (nn-socket 'req)))
(host (if (or (not hostn)
(equal? hostn "-")) ;; use localhost
(get-host-name)
hostn))
(success #f)
(keepwaiting #t)
(dat (db:obj->string (vector "ping" our-key) transport: 'nmsg))
|
︙ | | | ︙ | |
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
|
(if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate
(loop (+ count 1))))
(if keepwaiting
(begin
(print "timeout waiting for ping")
(thread-terminate! ping))))
"timeout")))
(nn-connect req (conc "tcp://" host ":" port))
(handle-exceptions
exn
(begin
;; (print-call-chain)
;; (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
;; (print "exn=" (condition->list exn))
(debug:print-info 1 "ping failed to connect to " host ":" port))
(thread-start! timeout)
(thread-start! ping)
(thread-join! ping)
(if success (thread-terminate! timeout)))
(if return-socket
(if success req #f)
(begin
(nn-close req)
success))))
(define (nmsg-transport:client-connect iface portnum)
(let* ((reqsoc (nmsg-transport:ping iface portnum))
(login-res #f))
(nn-connect reqsoc (conc "tcp://" iface ":" portnum))
(debug:print-info 11 "nmsg-transport:client-connect started. Next is login")
(set! login-res (client:login serverdat nmsg-sockets))
(if (and (not (null? login-res))
(car login-res))
(begin
(debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
(set! *nm-port* nmsg-sockets)
nmsg-sockets)
(begin
(debug:print-info 2 "Failed to login or connect to " conurl)
(set! *runremote* #f)
#f))))
;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
;;
(define (nmsg-transport:keep-running server-id)
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; This thread waits for the server to come alive
|
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
|
(if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate
(loop (+ count 1))))
(if keepwaiting
(begin
(print "timeout waiting for ping")
(thread-terminate! ping))))
"timeout")))
(if (not socket)(nn-connect req (conc "tcp://" host ":" port)))
(handle-exceptions
exn
(begin
;; (print-call-chain)
;; (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
;; (print "exn=" (condition->list exn))
(debug:print-info 1 "ping failed to connect to " host ":" port))
(thread-start! timeout)
(thread-start! ping)
(thread-join! ping)
(if success (thread-terminate! timeout)))
(if return-socket
(if success req #f)
(begin
(nn-close req) ;; should it be closed if we were handed a socket?
success))))
;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
;;
(define (nmsg-transport:keep-running server-id)
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; This thread waits for the server to come alive
|
︙ | | | ︙ | |
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
|
(debug:print-info 0 "Starting to shutdown the server.")
(set! *time-to-exit* #t)
(tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running")
(debug:print-info 0 "Server shutdown complete. Exiting")
;; (exit)
))))))
(define (nmsg-transport:client-signal-handler signum)
(handle-exceptions
exn
(debug:print " ... exiting ...")
(let ((th1 (make-thread (lambda ()
(if (not *received-response*)
(receive-message* *runremote*))) ;; flush out last call if applicable
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
|
(debug:print-info 0 "Starting to shutdown the server.")
(set! *time-to-exit* #t)
(tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running")
(debug:print-info 0 "Server shutdown complete. Exiting")
;; (exit)
))))))
;;======================================================================
;; C L I E N T S
;;======================================================================
(define (nmsg-transport:client-connect iface portnum)
(let* ((reqsoc (nmsg-transport:ping iface portnum return-socket: #t)))
(vector iface portnum #f #f #f (current-seconds) reqsoc)))
(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param)
(let ((packet (vector cmd param))
(reqsoc (http-transport:server-dat-get-socket connection-info)))
(nn-send reqsoc (db:obj->string packet transport: 'nmsg))
(db:string->obj (nn-recv reqsoc) transport: 'nmsg)))
;;======================================================================
;; J U N K
;;======================================================================
;; DO NOT USE
;;
(define (nmsg-transport:client-signal-handler signum)
(handle-exceptions
exn
(debug:print " ... exiting ...")
(let ((th1 (make-thread (lambda ()
(if (not *received-response*)
(receive-message* *runremote*))) ;; flush out last call if applicable
|
︙ | | | ︙ | |