︙ | | | ︙ | |
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
directory-utils
format
;; http-client
;; intarweb
matchable
md5
message-digest
nng ;; nanomsg
(prefix base64 base64:)
(prefix sqlite3 sqlite3:)
regex
s11n
;; spiffy
;; spiffy-directory-listing
;; spiffy-request-vars
|
|
|
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
directory-utils
format
;; http-client
;; intarweb
matchable
md5
message-digest
;; nng ;; nanomsg
(prefix base64 base64:)
(prefix sqlite3 sqlite3:)
regex
s11n
;; spiffy
;; spiffy-directory-listing
;; spiffy-request-vars
|
︙ | | | ︙ | |
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
pgdb
pkts
portloggermod
(prefix mtargs args:)
servermod
stml2
tasksmod
)
;; alldat: record type declared with defstruct. It has two fields and both
;; are initialised to #f when no value is supplied to the constructor.
(defstruct alldat
;; areapath starts as #f - presumably the path of the area; TODO confirm
(areapath #f)
;; ulexdat starts as #f - presumably ulex related state; TODO confirm
(ulexdat #f)
)
|
>
>
|
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
pgdb
pkts
portloggermod
(prefix mtargs args:)
servermod
stml2
tasksmod
ulex
)
;; alldat: record type declared with defstruct. It has two fields and both
;; are initialised to #f when no value is supplied to the constructor.
(defstruct alldat
;; areapath starts as #f - presumably the path of the area; TODO confirm
(areapath #f)
;; ulexdat starts as #f - presumably ulex related state; TODO confirm
(ulexdat #f)
)
|
︙ | | | ︙ | |
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
(port #f)
(uuid #f)
(rep #f)
(dbfile #f)
(api-url #f)
(api-uri #f)
(api-req #f)
(status 'starting)
(trynum 0) ;; count the number of ports we've tried
)
;; Build the "host:port" string for a servdat record.
;;
;;   sdat    - servdat record; its host and port fields are read
;;   returns - host and port joined by ":" using conc
;;
(define (servdat->url sdat)
  (let ((host (servdat-host sdat))
        (port (servdat-port sdat)))
    (conc host ":" port)))
|
>
>
|
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
(port #f)
(uuid #f)
(rep #f)
(dbfile #f)
(api-url #f)
(api-uri #f)
(api-req #f)
(uconn #f)
(mode #f)
(status 'starting)
(trynum 0) ;; count the number of ports we've tried
)
;; Build the "host:port" string for a servdat record.
;;
;;   sdat    - servdat record; its host and port fields are read
;;   returns - host and port joined by ":" using conc
;;
(define (servdat->url sdat)
  (let ((host (servdat-host sdat))
        (port (servdat-port sdat)))
    (conc host ":" port)))
|
︙ | | | ︙ | |
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
;; -> http://abc.com:900/<entrypoint>
;;
;; Build "http://<ipaddr>:<port>/<entrypoint>" for a connection.
;;
;;   conn       - conndat record; its ipaddr and port fields are read
;;   entrypoint - appended after the final "/"
;;
(define (conndat->uri conn entrypoint)
  (let ((ipaddr (conndat-ipaddr conn))
        (port   (conndat-port conn)))
    (conc "http://" ipaddr ":" port "/" entrypoint)))
;; set up the api proc, seems like there should be a better place for this?
;; api-proc is created as a parameter object; conc is only the initial value.
(define api-proc (make-parameter conc))
;; The parameter is then called with one argument, api:process-request.
;; NOTE(review): this presumably relies on CHICKEN allowing a parameter to be
;; assigned by calling it with a value - confirm against the CHICKEN manual.
(api-proc api:process-request)
;; do we have a connection to apath dbname and
;; is it not expired? then return it
;;
;; else setup a connection
;;
;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
|
>
>
>
|
|
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
|
;; -> http://abc.com:900/<entrypoint>
;;
;; Build "http://<ipaddr>:<port>/<entrypoint>" for a connection.
;;
;;   conn       - conndat record; its ipaddr and port fields are read
;;   entrypoint - appended after the final "/"
;;
(define (conndat->uri conn entrypoint)
  (let ((ipaddr (conndat-ipaddr conn))
        (port   (conndat-port conn)))
    (conc "http://" ipaddr ":" port "/" entrypoint)))
;; set up the api proc, seems like there should be a better place for this?
;;
;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE
;;
;; api-proc is created as a parameter object; conc is only the initial value.
(define api-proc (make-parameter conc))
;; The parameter is then called with one argument, api:execute-requests.
;; NOTE(review): this presumably relies on CHICKEN allowing a parameter to be
;; assigned by calling it with a value - confirm against the CHICKEN manual.
(api-proc api:execute-requests)
;; do we have a connection to apath dbname and
;; is it not expired? then return it
;;
;; else setup a connection
;;
;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
|
︙ | | | ︙ | |
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
|
;; connections for other servers happens by requesting from main
;;
;; TODO: This is unnecessarily re-creating the record in the hash table
;;
(define (rmt:open-main-connection remdat apath)
(let* ((fullpath (db:dbname->path apath "/.db/main.db"))
(conns (remotedat-conns remdat))
(conn (hash-table-ref/default conns fullpath #f))) ;; TODO - create call for this
(cond
((and conn ;; conn is NOT a socket, just saying ...
(< (current-seconds) (conndat-expires conn)))
#t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
((and conn
(>= (current-seconds)(conndat-expires conn)))
(debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
(if (conndat-socket conn)
(nng-close! (conndat-socket conn)))
(hash-table-set! conns fullpath #f) ;; clean up
(rmt:open-main-connection remdat apath))
(else
;; Below we will find or create and connect to main
(let* ((dbname (db:run-id->dbname #f))
(the-srv (rmt:find-main-server apath dbname))
(start-main-srv (lambda () ;; call IF there is no the-srv found
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
|
;; connections for other servers happens by requesting from main
;;
;; TODO: This is unnecessarily re-creating the record in the hash table
;;
(define (rmt:open-main-connection remdat apath)
(let* ((fullpath (db:dbname->path apath "/.db/main.db"))
(conns (remotedat-conns remdat))
(conn (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
(myconn (if *server-info*
(servdat-uconn *server-info*)
(let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
(thread-start! th1)
(let loop ((count 0))
(assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
(if (not *server-info*)
(begin
(thread-sleep! 1)
(loop))
(begin
(servdat-mode-set! *server-info* 'non-db)
(servdat-uconn *server-info*))))))))
(cond
((and conn ;; conn is NOT a socket, just saying ...
(< (current-seconds) (conndat-expires conn)))
#t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
((and conn
(>= (current-seconds)(conndat-expires conn)))
(debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
#;(if (conndat-socket conn)
(nng-close! (conndat-socket conn))) ;; TODO - close the ulex server here?
(hash-table-set! conns fullpath #f) ;; clean up
(rmt:open-main-connection remdat apath))
(else
;; Below we will find or create and connect to main
(let* ((dbname (db:run-id->dbname #f))
(the-srv (rmt:find-main-server apath dbname))
(start-main-srv (lambda () ;; call IF there is no the-srv found
|
︙ | | | ︙ | |
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
|
(fullpath (db:dbname->path apath dbname))
(new-the-srv (make-conndat
apath: apath
dbname: dbname
fullname: fullpath
hostport: srv-addr
socket: (open-nn-connection srv-addr)
ipaddr: ipaddr
port: port
srvpkt: the-srv
srvkey: srvkey ;; generated by rmt:get-signature on the server side
lastmsg: (current-seconds)
expires: (+ (current-seconds) 60) ;; this needs to be gathered during the ping
)))
|
|
|
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
|
(fullpath (db:dbname->path apath dbname))
(new-the-srv (make-conndat
apath: apath
dbname: dbname
fullname: fullpath
hostport: srv-addr
;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection?
ipaddr: ipaddr
port: port
srvpkt: the-srv
srvkey: srvkey ;; generated by rmt:get-signature on the server side
lastmsg: (current-seconds)
expires: (+ (current-seconds) 60) ;; this needs to be gathered during the ping
)))
|
︙ | | | ︙ | |
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
|
(debug:print-logger rmt:log-to-main)))
(cond
((or (not mconn) ;; no channel open to main?
(< (conndat-expires mconn)(+ (current-seconds) 2))) ;; restablish connection if less than 2 seconds on the lease
(if mconn ;; previously opened - clean up NB// consolidate this with the similar code in open main above
(begin
(debug:print-info 0 *default-log-port* "Clearing out connection to main that has expired.")
(nng-close! (conndat-socket mconn))
(hash-table-set! conns fullname #f)))
(rmt:open-main-connection remdat apath)
(rmt:general-open-connection remdat apath mdbname))
((not (rmt:get-conn remdat apath dbname)) ;; no channel open to dbname?
(let* ((res (rmt:send-receive-real remdat apath mdbname 'get-server `(,apath ,dbname))))
(case res
((server-started)
|
|
|
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
|
(debug:print-logger rmt:log-to-main)))
(cond
((or (not mconn) ;; no channel open to main?
(< (conndat-expires mconn)(+ (current-seconds) 2))) ;; restablish connection if less than 2 seconds on the lease
(if mconn ;; previously opened - clean up NB// consolidate this with the similar code in open main above
(begin
(debug:print-info 0 *default-log-port* "Clearing out connection to main that has expired.")
;; (nng-close! (conndat-socket mconn)) ;; TODO - close the ulex server/listener here?
(hash-table-set! conns fullname #f)))
(rmt:open-main-connection remdat apath)
(rmt:general-open-connection remdat apath mdbname))
((not (rmt:get-conn remdat apath dbname)) ;; no channel open to dbname?
(let* ((res (rmt:send-receive-real remdat apath mdbname 'get-server `(,apath ,dbname))))
(case res
((server-started)
|
︙ | | | ︙ | |
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
|
(debug:print-info 0 *default-log-port* "got "res)
(hash-table-set! conns
fullname
(make-conndat
apath: apath
dbname: dbname
hostport: (conc host":"port)
socket: (open-nn-connection (conc host":"port))
ipaddr: ipaddr
port: port
srvkey: servkey
lastmsg: (current-seconds)
expires: (+ (current-seconds) 60))))
(else
(debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
|
|
|
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
|
(debug:print-info 0 *default-log-port* "got "res)
(hash-table-set! conns
fullname
(make-conndat
apath: apath
dbname: dbname
hostport: (conc host":"port)
;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
ipaddr: ipaddr
port: port
srvkey: servkey
lastmsg: (current-seconds)
expires: (+ (current-seconds) 60))))
(else
(debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
|
︙ | | | ︙ | |
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
|
(let* ((apath *toppath*)
(remdat *remotedat*)
(conns (remotedat-conns remdat)) ;; just checking that remdat is a remotedat
(dbname (db:run-id->dbname rid)))
(if *localmode*
(let* ((dbdat (dbr:dbstruct-get-dbdat *dbstruct* dbname))
(indat `((cmd . ,cmd)(params . ,params))))
(api:process-request *dbstruct* indat)
;; (api:process-request dbdat indat)
)
(begin
(rmt:open-main-connection remdat apath)
(if rid (rmt:general-open-connection remdat apath dbname))
(rmt:send-receive-real remdat apath dbname cmd params)))))
|
>
|
|
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
|
(let* ((apath *toppath*)
(remdat *remotedat*)
(conns (remotedat-conns remdat)) ;; just checking that remdat is a remotedat
(dbname (db:run-id->dbname rid)))
(if *localmode*
(let* ((dbdat (dbr:dbstruct-get-dbdat *dbstruct* dbname))
(indat `((cmd . ,cmd)(params . ,params))))
(api:execute-requests *dbstruct* cmd params)
;; (api:process-request *dbstruct* indat)
;; (api:process-request dbdat indat)
)
(begin
(rmt:open-main-connection remdat apath)
(if rid (rmt:general-open-connection remdat apath dbname))
(rmt:send-receive-real remdat apath dbname cmd params)))))
|
︙ | | | ︙ | |
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
|
(let* ((soc (conndat-socket conn))
(key #f)
(host (conndat-ipaddr conn))
(port (conndat-port conn))
(payload `((cmd . ,cmd)
(key . ,(conndat-srvkey conn))
(params . ,params)))
(res (send-receive-nn soc ;; (open-send-receive-nn (conc host":"port)
(sexpr->string payload))))
(if (member res '("#<unspecified>")) ;; TODO - fix this in string->sexpr
#f
(string->sexpr res)))))
;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
;; sometime in the future.
;;
|
|
<
|
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
|
(let* ((soc (conndat-socket conn))
(key #f)
(host (conndat-ipaddr conn))
(port (conndat-port conn))
(payload `((cmd . ,cmd)
(key . ,(conndat-srvkey conn))
(params . ,params)))
(res (send-receive soc payload)))
(if (member res '("#<unspecified>")) ;; TODO - fix this in string->sexpr
#f
(string->sexpr res)))))
;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
;; sometime in the future.
;;
|
︙ | | | ︙ | |
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
|
(let* ((am-server (args:get-arg "-server"))
(dbfile (args:get-arg "-db"))
(apath *toppath*)
(remdat *remotedat*)) ;; foundation for future fix
(if *dbstruct-db*
(let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
(db (dbr:dbdat-db dbdat))
(inmem (dbr:dbdat-db dbdat))
)
;; do a final sync here
(debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
(db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
;; let's finalize here
(debug:print-info 0 *default-log-port* "Finalizing db and inmem")
(if (sqlite3:database? db)
|
|
|
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
|
(let* ((am-server (args:get-arg "-server"))
(dbfile (args:get-arg "-db"))
(apath *toppath*)
(remdat *remotedat*)) ;; foundation for future fix
(if *dbstruct-db*
(let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
(db (dbr:dbdat-db dbdat))
(inmem (dbr:dbdat-db dbdat)) ;; WRONG
)
;; do a final sync here
(debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
(db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
;; let's finalize here
(debug:print-info 0 *default-log-port* "Finalizing db and inmem")
(if (sqlite3:database? db)
|
︙ | | | ︙ | |
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
|
#t))))
(debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
(if (and no-hurry (debug:debug-mode 18))
(rmt:print-db-stats))
(let ((th1 (make-thread
(lambda () ;; thread for cleaning up, give it five seconds
(let* ((start-time (current-seconds)))
(if (and *server-info*
*unclean-shutdown*)
(begin
(debug:print-info 0 *default-log-port* "Unclean server exit, calling server-shtudown")
(rmt:server-shutdown (servdat-host *server-info*)
(servdat-port *server-info*))))
(debug:print-info 0 *default-log-port* "Shutdown activities completed in "(- (current-seconds) start-time)" seconds"))
;; (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated
#;(if (bdat-task-db *bdat*) ;; TODO: Check that this is correct for task db
(let ((db (cdr (bdat-task-db *bdat*))))
(if (sqlite3:database? db)
(begin
(debug:print-info 0 *default-log-port* "Closing down task db "db)
|
|
|
|
|
>
>
>
|
>
|
|
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
|
#t))))
(debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
(if (and no-hurry (debug:debug-mode 18))
(rmt:print-db-stats))
(let ((th1 (make-thread
(lambda () ;; thread for cleaning up, give it five seconds
(let* ((start-time (current-seconds)))
(if *server-info*
(let* ((host (servdat-host *server-info*))
(port (servdat-port *server-info*)))
(debug:print-info 0 *default-log-port* "Shutting down server/responder.")
;;
;; TODO - add flushing/waiting on the work queue
;;
(rmt:server-shutdown host port)
(portlogger:open-run-close portlogger:set-port port "released")))
(debug:print-info 0 *default-log-port* "Shutdown activities completed in "(- (current-seconds) start-time)" seconds"))
;; (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated
#;(if (bdat-task-db *bdat*) ;; TODO: Check that this is correct for task db
(let ((db (cdr (bdat-task-db *bdat*))))
(if (sqlite3:database? db)
(begin
(debug:print-info 0 *default-log-port* "Closing down task db "db)
|
︙ | | | ︙ | |
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
|
;;
;; conn is a conndat record
;;
;; Ping the server behind a connection.
;;
;;   conn    - conndat record; its socket and srvkey fields are read
;;   do-exit - keyword argument, accepted but not used by this procedure
;;
;; Sends the serialized list (ping <srvkey>) over the connection socket and
;; returns whatever send-receive-nn returns.
;;
(define (server:ping conn #!key (do-exit #f))
  (let* ((req    (conndat-socket conn))
         (srvkey (conndat-srvkey conn))
         ;; quasiquote is required here: with a plain quote the unquote was
         ;; never evaluated and the literal (ping (unquote srvkey)) was sent
         ;; instead of the actual key value.
         (msg    (sexpr->string `(ping ,srvkey))))
    (send-receive-nn req msg))) ;; (server-ready? host port server-id))
;;======================================================================
;; http-transportmod.scm contents moved here
;;======================================================================
;; Turn a (host port ...) list into "http://host:port".
;;
;;   hostport - #f, or a list whose first two elements are host and port
;;   returns  - #f when hostport is #f, otherwise the url string
;;
(define (http-transport:make-server-url hostport)
  (and hostport
       (let ((host (car hostport))
             (port (cadr hostport)))
         (conc "http://" host ":" port))))
;;======================================================================
;; S E R V E R
;; ======================================================================
;; Look up fnkey in the *http-functions* hash table.
;;
;;   fnkey   - key to look up
;;   returns - the stored value, or a zero-argument procedure returning
;;             "nothing here yet" when the key is absent
;;
(define (http-get-function fnkey)
  (let ((fallback (lambda () "nothing here yet")))
    (hash-table-ref/default *http-functions* fnkey fallback)))
;; Main entry point to start a server. was start-server
;; rmt:run - start a server and service requests until a quit message arrives.
;;
;;   hostn - host name to record for this server; when it is "-" the address
;;           is obtained from server:get-best-guess-address instead
;;
;; Steps: pick a port via portlogger, record host/port in *server-info*
;; (updating an existing record or creating one), obtain the rep socket from
;; rmt:try-start-server, then loop receiving, processing and replying.
;; After the loop the port is released and shutdown is attempted.
;;
(define (rmt:run hostn)
;; ;; Configurations for server
;; (tcp-buffer-size 2048)
;; (max-connections 2048)
(debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
(let* ((db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecessarily
(hostname (get-host-name))
(ipaddrstr (let ((ipstr (if (string=? "-" hostn)
;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
(server:get-best-guess-address hostname)
#f)))
(if ipstr ipstr hostn))) ;; hostname)))
(port (portlogger:open-run-close portlogger:find-port))
;; (link-tree-path (common:get-linktree))
;; (tmp-area (common:get-db-tmp-area))
#;(start-file (conc tmp-area "/.server-start")))
(debug:print-info 0 *default-log-port* "portlogger recommended port: " port)
;; update the existing *server-info* record in place, or create it on first use
(if *server-info*
(begin
(servdat-host-set! *server-info* ipaddrstr)
(servdat-port-set! *server-info* port)
(servdat-status-set! *server-info* 'trying-port)
(servdat-trynum-set! *server-info* (+ (servdat-trynum *server-info*) 1)))
(set! *server-info* (make-servdat host: ipaddrstr port: port)))
(let* ((rep (rmt:try-start-server ipaddrstr port)))
;; request loop: each received string is parsed with string->sexpr; the
;; symbol quit ends the loop, anything else goes to api:process-request
(let loop ((instr (nng-recv rep)))
(let* ((data (string->sexpr instr))
(res (case data
((quit) 'quit)
(else (api:process-request *dbstruct-db* data))))
(resdat (sexpr->string res)))
;; no reply is sent for quit; the loop simply ends
(if (not (eq? res 'quit))
(begin
(set! *db-last-access* (current-seconds))
(nng-send rep resdat)
(loop (nng-recv rep)))))))
(debug:print-info 0 *default-log-port* "After server, should never see this")
;; server exit stuff here
(let* ((portnum (servdat-port *server-info*))
(host (servdat-host *server-info*)))
(portlogger:open-run-close portlogger:set-port portnum "released")
;; rmt:server-shutdown is only called when the recorded host equals (get-host-name)
(if (not (equal? (get-host-name) host))
(debug:print-info 0 *default-log-port* "Server shutdown called for host "host", but we are on "(get-host-name))
(rmt:server-shutdown host portnum))
;; (bdat-time-to-exit-set! *bdat* #t) ;; tell on-exit to be fast as we've already cleaned up
;; NOTE(review): this releases the port found at startup; portnum from
;; *server-info* was already released a few lines above - confirm both are wanted
(portlogger:open-run-close portlogger:set-port port "released") ;; done in rmt:run
;; (debug:print-info 0 *default-log-port* "Max cached queries was " *max-cache-size*)
;; (debug:print-info 0 *default-log-port* "Number of cached writes " *number-of-writes*)
;; (debug:print-info 0 *default-log-port* "Average cached write time "
;; (if (eq? *number-of-writes* 0)
;; "n/a (no writes)"
;; (/ *writes-total-delay*
;; *number-of-writes*))
;; " ms")
;; (debug:print-info 0 *default-log-port* "Number non-cached queries " *number-non-write-queries*)
;; (debug:print-info 0 *default-log-port* "Average non-cached time "
;; (if (eq? *number-non-write-queries* 0)
;; "n/a (no queries)"
;; (/ *total-non-write-delay*
;; *number-non-write-queries*))
;; " ms")
(db:print-current-query-stats)
(debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting")
)))
;; rmt:try-start-server - make the rep socket listen on portnum, retrying on
;; another port when listening fails.
;;
;;   ipaddrstr - address recorded in *server-info* and used in log messages
;;   portnum   - port to listen on
;;
;; Returns the rep socket when nng-listen succeeds. When it raises and
;; portnum is below 64000 the port is marked failed and the procedure calls
;; itself with a new port from portlogger. Otherwise an error is printed and
;; the result of print is returned (i.e. not a socket).
;;
(define (rmt:try-start-server ipaddrstr portnum)
(if *server-info* ;; update the server info as we might be trying next port
(begin
(servdat-host-set! *server-info* ipaddrstr)
(servdat-port-set! *server-info* portnum)
(servdat-status-set! *server-info* 'trying-port)
(servdat-trynum-set! *server-info*
(+ (servdat-trynum *server-info*) 1)))
(set! *server-info* (make-servdat host: ipaddrstr port: portnum)))
(debug:print-info 0 *default-log-port* "rmt:try-start-server time="
(seconds->time-string (current-seconds))
" ipaddrsstr=" ipaddrstr
" portnum=" portnum)
(assert (servdat? *server-info*) "FATAL: Must always have *server-info* properly set up by here.")
(servdat-status-set! *server-info* 'starting)
(servdat-port-set! *server-info* portnum)
;; the rep socket is created once and kept in *server-info*, so retries on
;; other ports reuse the same socket
(if (not (servdat-rep *server-info*))
(let ((rep (make-rep-socket)))
(servdat-rep-set! *server-info* rep)
;; receive timeout of 2000 - presumably milliseconds; TODO confirm
(socket-set! rep 'nng/recvtimeo 2000)))
(let* ((rep (servdat-rep *server-info*)))
(debug:print 0 *default-log-port* "INFO: Trying to start server on " ipaddrstr ":" portnum)
(handle-exceptions
exn
(begin
(print-error-message exn)
(if (< portnum 64000)
(begin
(debug:print 0 *default-log-port* "WARNING: attempt to start server failed. Trying again ...")
(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
(debug:print 5 *default-log-port* "exn=" (condition->list exn))
(portlogger:open-run-close portlogger:set-failed portnum)
(debug:print 0 *default-log-port* "WARNING: failed to start on portnum: " portnum ", trying next port")
;; (thread-sleep! 0.1)
;; retry: recurse with the next port recommended by portlogger
(rmt:try-start-server ipaddrstr
(portlogger:open-run-close portlogger:find-port)))
(begin
(print "ERROR: Tried and tried but could not start the server, stopping at port "portnum))))
;; listen on all interfaces ("*") at portnum; any exception lands in the handler above
(nng-listen rep (conc "tcp://*:" portnum))
rep)))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;;======================================================================
;; C L I E N T S
|
|
>
<
<
|
<
|
|
<
|
>
>
|
<
<
<
|
<
>
>
|
|
|
<
<
>
|
|
|
<
<
<
|
<
<
<
<
>
|
<
|
<
<
<
<
<
|
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
|
;;
;; conn is a conndat record
;;
;; Ping the server behind a connection.
;;
;;   conn    - conndat record; its socket and srvkey fields are read
;;   do-exit - keyword argument, accepted but not used by this procedure
;;
;; Sends the serialized list (ping <srvkey>) via send-receive and returns
;; whatever send-receive returns.
;;
(define (server:ping conn #!key (do-exit #f))
  (let* ((req    (conndat-socket conn))
         (srvkey (conndat-srvkey conn))
         ;; quasiquote is required here: with a plain quote the unquote was
         ;; never evaluated and the literal (ping (unquote srvkey)) was sent
         ;; instead of the actual key value.
         (msg    (sexpr->string `(ping ,srvkey))))
    (send-receive req msg))) ;; (server-ready? host port server-id))
;;======================================================================
;; http-transportmod.scm contents moved here
;;======================================================================
;; Turn a (host port ...) list into "http://host:port".
;;
;;   hostport - #f, or a list whose first two elements are host and port
;;   returns  - #f when hostport is #f, otherwise the url string
;;
(define (http-transport:make-server-url hostport)
  (and hostport
       (let ((host (car hostport))
             (port (cadr hostport)))
         (conc "http://" host ":" port))))
;;======================================================================
;; S E R V E R
;; ======================================================================
;; Look up fnkey in the *http-functions* hash table.
;;
;;   fnkey   - key to look up
;;   returns - the stored value, or a zero-argument procedure returning
;;             "nothing here yet" when the key is absent
;;
(define (http-get-function fnkey)
  (let ((fallback (lambda () "nothing here yet")))
    (hash-table-ref/default *http-functions* fnkey fallback)))
;; Main entry point to start a server. was start-server
;; rmt:run - start the listener (if not already started) and wait on it.
;;
;;   hostn - host name stored in *server-info* for this server
;;
;; When *server-info* already exists, its stored uconn is passed to
;; wait-and-close. Otherwise a port is requested from portlogger,
;; *server-info* is created, run-listener is started with a handler that
;; forwards cmd/params to api:execute-requests, the port reported by the
;; listener (udat-port) is recorded, and wait-and-close is called.
;; Afterwards a "shutdown complete" message is logged.
;;
;; NOTE(review): *server-info* is set before its uconn field is filled in;
;; code that polls *server-info* from another thread could observe a record
;; with uconn still #f - confirm whether that matters for callers.
;;
(define (rmt:run hostn)
  ;; (assert (not *server-info*) "FATAL: rmt:run called but *server-info* has already been initialized")
  ;; ;; Configurations for server
  ;; (tcp-buffer-size 2048)
  ;; (max-connections 2048)
  (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
  (if *server-info*
      (let* ((uconn (servdat-uconn *server-info*)))
        (wait-and-close uconn))
      (let* ((port         (portlogger:open-run-close portlogger:find-port))
             ;; handler invoked by the listener; rem-host-port and qrykey are not used here
             (handler-proc (lambda (rem-host-port qrykey cmd params)
                             (api:execute-requests *dbstruct-db* cmd params))))
        ;; (api:process-request *dbstuct-db*
        (set! *server-info* (make-servdat host: hostn port: port))
        (let* ((uconn (run-listener handler-proc suggested-port: port))
               (rport (udat-port uconn))) ;; the real port
          (servdat-host-set! *server-info* hostn)
          (servdat-port-set! *server-info* rport)
          (servdat-uconn-set! *server-info* uconn)
          (wait-and-close uconn)
          (db:print-current-query-stats))))
  (let* ((host (servdat-host *server-info*))
         (port (servdat-port *server-info*))
         (mode (or (servdat-mode *server-info*)
                   "non-db")))
    ;; server exit stuff here
    ;; (rmt:server-shutdown host port) - always do in on-exit
    ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
    ;; a space was missing before "shutdown", which glued it to the mode value
    (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode" shutdown complete. Exiting")))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;;======================================================================
;; C L I E N T S
|
︙ | | | ︙ | |
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
|
(conc (alist-ref 'host srv-pkt) ":"
(alist-ref 'port srv-pkt)))
;; Send a ping request to host:port and return the parsed reply.
;;
;;   host, port - joined as "host:port" to form the server address
;;   key        - placed in the key entry of the request
;;   returns    - the reply passed through string->sexpr, or the false
;;                value when open-send-receive-nn returned false
;;
(define (server-ready? host port key) ;; server-address is host:port
  (let* ((payload `((cmd . ping)
                    (key . ,key)
                    (params . ())))
         (address (conc host ":" port))
         (reply   (open-send-receive-nn address (sexpr->string payload))))
    (and reply
         (string->sexpr reply))))
; from the pkts return servers associated with dbpath
;; NOTE: Only one can be alive - have to check on each
;; in the list of pkts returned
|
|
|
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
|
(conc (alist-ref 'host srv-pkt) ":"
(alist-ref 'port srv-pkt)))
;; Send a ping request to host:port and return the parsed reply.
;;
;;   host, port - joined as "host:port" to form the server address
;;   key        - placed in the key entry of the request
;;   returns    - the reply passed through string->sexpr, or the false
;;                value when send-receive returned false
;;
(define (server-ready? host port key) ;; server-address is host:port
  (let* ((payload `((cmd . ping)
                    (key . ,key)
                    (params . ())))
         (address (conc host ":" port))
         (reply   (send-receive address (sexpr->string payload))))
    (and reply
         (string->sexpr reply))))
; from the pkts return servers associated with dbpath
;; NOTE: Only one can be alive - have to check on each
;; in the list of pkts returned
|
︙ | | | ︙ | |
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
|
(server-start-time (current-seconds))
(pkts-dir (get-pkts-dir))
(server-key (rmt:get-signature)) ;; This servers key
(is-main (equal? (args:get-arg "-db") ".db/main.db"))
(last-access 0)
(server-timeout (server:expiration-timeout))
(shutdown-server-sequence (lambda (host port)
(set! *unclean-shutdown* #f)
(debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
(rmt:server-shutdown host port)
(portlogger:open-run-close portlogger:set-port port "released")
(exit)))
(timed-out? (lambda ()
(<= (+ last-access server-timeout)
(current-seconds)))))
(servdat-dbfile-set! *server-info* (args:get-arg "-db"))
;; main and run db servers have both got wait logic (could/should merge it)
(if is-main
|
|
|
|
|
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
|
(server-start-time (current-seconds))
(pkts-dir (get-pkts-dir))
(server-key (rmt:get-signature)) ;; This servers key
(is-main (equal? (args:get-arg "-db") ".db/main.db"))
(last-access 0)
(server-timeout (server:expiration-timeout))
(shutdown-server-sequence (lambda (host port)
(set! *unclean-shutdown* #f) ;; Should not be needed anymore
(debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
;; (rmt:server-shutdown host port) -- called in on-exit
;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit
(exit)))
(timed-out? (lambda ()
(<= (+ last-access server-timeout)
(current-seconds)))))
(servdat-dbfile-set! *server-info* (args:get-arg "-db"))
;; main and run db servers have both got wait logic (could/should merge it)
(if is-main
|
︙ | | | ︙ | |
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
|
(if (string-search (regexp (conc ":" port-num)) inl)
(begin
;(print "Output: " inl)
(set! ret #t))
(loop (read-line inp)))))))
ret))
;;start a server, returns the connection
;;
;; Create a rep socket listening on portnum and return it.
;;
;;   portnum - port to listen on; the socket is bound to "tcp://*:<portnum>"
;;   returns - the rep socket
;;
;; If listening raises an exception the message is printed and the process
;; exits with status 1.
;;
(define (start-nn-server portnum )
  (let ((rep (make-rep-socket))) ;; (nn-socket 'rep)))
    (socket-set! rep 'nng/recvtimeo 2000)
    (handle-exceptions ;; why have exception handler here?
     exn
     (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
       (print "ERROR: Failed to start server \"" emsg "\"")
       (exit 1))
     ;; A server socket has to listen (the nng counterpart of the former
     ;; nn-bind); dialing the wildcard address "tcp://*:port" cannot work.
     ;; This matches rmt:try-start-server, which listens on the same URI form.
     (nng-listen rep (conc "tcp://*:" portnum)))
    rep))
;; Open a req socket dialed to "tcp://<host-port>".
;;
;;   host-port - appended to "tcp://" to form the uri
;;   returns   - the req socket, with 'nng/recvtimeo set to 2000 after dialing
;;
(define (open-nn-connection host-port)
  (let* ((uri (conc "tcp://" host-port))
         (req (make-req-socket)))
    (nng-dial req uri)
    (socket-set! req 'nng/recvtimeo 2000)
    req))
;; Send msg on req, then return the next message received on req.
;;
;;   req - socket passed to nng-send and nng-recv
;;   msg - message handed to nng-send
;;
(define (send-receive-nn req msg)
  (begin
    (nng-send req msg)
    (let ((reply (nng-recv req)))
      reply)))
;; Close a connection: passes req to nng-close! and returns its result.
(define (close-nn-connection req)
(nng-close! req))
;; ;; open connection to server, send message, close connection
;; ;;
;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
;; (let ((req (make-req-socket 'req))
;; (uri (conc "tcp://" host-port))
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
|
|
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
|
(if (string-search (regexp (conc ":" port-num)) inl)
(begin
;(print "Output: " inl)
(set! ret #t))
(loop (read-line inp)))))))
ret))
#;(define (open-nn-connection host-port)
(let ((req (make-req-socket))
(uri (conc "tcp://" host-port)))
(nng-dial req uri)
(socket-set! req 'nng/recvtimeo 2000)
req))
#;(define (send-receive-nn req msg)
(nng-send req msg)
(nng-recv req))
#;(define (close-nn-connection req)
(nng-close! req))
;; ;; open connection to server, send message, close connection
;; ;;
;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
;; (let ((req (make-req-socket 'req))
;; (uri (conc "tcp://" host-port))
|
︙ | | | ︙ | |
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
|
;; (thread-terminate! th1))
;; "timer thread")))
;; (thread-start! th1)
;; (thread-start! th2)
;; (thread-join! th1)
;; res))))
;;
(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
(let ((req (make-req-socket))
(uri (conc "tcp://" host-port))
(res #f))
(handle-exceptions
exn
(let ((emsg ((condition-property-accessor 'exn 'message) exn)))
;; Send notification
|
|
|
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
|
;; (thread-terminate! th1))
;; "timer thread")))
;; (thread-start! th1)
;; (thread-start! th2)
;; (thread-join! th1)
;; res))))
;;
#;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
(let ((req (make-req-socket))
(uri (conc "tcp://" host-port))
(res #f))
(handle-exceptions
exn
(let ((emsg ((condition-property-accessor 'exn 'message) exn)))
;; Send notification
|
︙ | | | ︙ | |