︙ | | | ︙ | |
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
directory-utils
format
;; http-client
;; intarweb
matchable
md5
message-digest
nng ;; nanomsg
(prefix base64 base64:)
(prefix sqlite3 sqlite3:)
regex
s11n
;; spiffy
;; spiffy-directory-listing
;; spiffy-request-vars
|
|
|
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
directory-utils
format
;; http-client
;; intarweb
matchable
md5
message-digest
;; nng ;; nanomsg
(prefix base64 base64:)
(prefix sqlite3 sqlite3:)
regex
s11n
;; spiffy
;; spiffy-directory-listing
;; spiffy-request-vars
|
︙ | | | ︙ | |
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
pgdb
pkts
portloggermod
(prefix mtargs args:)
servermod
stml2
tasksmod
)
;; alldat: record type with two fields, areapath and ulexdat.
;; Each field is declared with #f as its default value.
(defstruct alldat
;; presumably the path of the area this record describes - TODO confirm against callers
(areapath #f)
;; presumably the ulex data/handle associated with that area - TODO confirm against callers
(ulexdat #f)
)
|
>
>
|
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
pgdb
pkts
portloggermod
(prefix mtargs args:)
servermod
stml2
tasksmod
ulex
)
;; alldat: record type with two fields, areapath and ulexdat.
;; Each field is declared with #f as its default value.
(defstruct alldat
;; presumably the path of the area this record describes - TODO confirm against callers
(areapath #f)
;; presumably the ulex data/handle associated with that area - TODO confirm against callers
(ulexdat #f)
)
|
︙ | | | ︙ | |
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
|
;; connections for other servers happen by requesting from main
;;
;; TODO: This is unnecessarily re-creating the record in the hash table
;;
(define (rmt:open-main-connection remdat apath)
(let* ((fullpath (db:dbname->path apath "/.db/main.db"))
(conns (remotedat-conns remdat))
(conn (hash-table-ref/default conns fullpath #f))) ;; TODO - create call for this
(cond
((and conn ;; conn is NOT a socket, just saying ...
(< (current-seconds) (conndat-expires conn)))
#t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
((and conn
(>= (current-seconds)(conndat-expires conn)))
(debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
|
;; connections for other servers happen by requesting from main
;;
;; TODO: This is unnecessarily re-creating the record in the hash table
;;
(define (rmt:open-main-connection remdat apath)
(let* ((fullpath (db:dbname->path apath "/.db/main.db"))
(conns (remotedat-conns remdat))
(conn (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
(myconn (if *server-info*
(servdat-uconn *server-info*)
(let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
(thread-start! th1)
(let loop ((count 0))
(assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
(if (not *server-info*)
(begin
(thread-sleep! 1)
(loop))
(begin
(servdat-mode-set! *server-info* 'non-db)
(server-uconn *server-info*))))))))
;; What is next?
(cond
((and conn ;; conn is NOT a socket, just saying ...
(< (current-seconds) (conndat-expires conn)))
#t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
((and conn
(>= (current-seconds)(conndat-expires conn)))
(debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
|
︙ | | | ︙ | |
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
|
;; ======================================================================
;; Look up the handler registered under FNKEY in *http-functions*.
;; When no handler is registered, a zero-argument procedure that returns
;; the placeholder string is handed back instead.
(define (http-get-function fnkey)
  (let ((placeholder (lambda () "nothing here yet")))
    (hash-table-ref/default *http-functions* fnkey placeholder)))
;; Main entry point to start a server (formerly start-server).
;;
;; hostn - host name/address to record for this server. The string "-"
;;         means "ask server:get-best-guess-address for the local host";
;;         if that yields #f, hostn itself is used.
;;
;; Gets a port from portlogger, records host/port in the global
;; *server-info*, obtains a reply socket from rmt:try-start-server and
;; then answers requests until one reads as the symbol quit. Once the
;; loop has ended the port is marked "released" and query stats are printed.
(define (rmt:run hostn)
;; ;; Configurations for server
;; (tcp-buffer-size 2048)
;; (max-connections 2048)
(debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
(let* ((db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecessarily
(hostname (get-host-name))
;; ipaddrstr: best-guess address when hostn is "-", otherwise hostn as given
(ipaddrstr (let ((ipstr (if (string=? "-" hostn)
;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
(server:get-best-guess-address hostname)
#f)))
(if ipstr ipstr hostn))) ;; hostname)))
(port (portlogger:open-run-close portlogger:find-port))
;; (link-tree-path (common:get-linktree))
;; (tmp-area (common:get-db-tmp-area))
#;(start-file (conc tmp-area "/.server-start")))
(debug:print-info 0 *default-log-port* "portlogger recommended port: " port)
;; Record this attempt in *server-info*, creating the record on first use.
;; NOTE(review): rmt:try-start-server (called just below) performs the same
;; update, so trynum appears to be bumped twice per start - confirm intent.
(if *server-info*
(begin
(servdat-host-set! *server-info* ipaddrstr)
(servdat-port-set! *server-info* port)
(servdat-status-set! *server-info* 'trying-port)
(servdat-trynum-set! *server-info* (+ (servdat-trynum *server-info*) 1)))
(set! *server-info* (make-servdat host: ipaddrstr port: port)))
(let* ((rep (rmt:try-start-server ipaddrstr port)))
;; Request loop: each received string is parsed with string->sexpr. The
;; symbol quit ends the loop (no reply is sent for it); anything else is
;; passed to api:process-request and the serialized result is sent back.
(let loop ((instr (nng-recv rep)))
(let* ((data (string->sexpr instr))
(res (case data
((quit) 'quit)
(else (api:process-request *dbstruct-db* data))))
(resdat (sexpr->string res)))
(if (not (eq? res 'quit))
(begin
(set! *db-last-access* (current-seconds))
(nng-send rep resdat)
(loop (nng-recv rep)))))))
;; Despite the wording of the message, this line is reached once the
;; loop above ends on a quit request.
(debug:print-info 0 *default-log-port* "After server, should never see this")
;; server exit stuff here
(let* ((portnum (servdat-port *server-info*))
(host (servdat-host *server-info*)))
(portlogger:open-run-close portlogger:set-port portnum "released")
;; rmt:server-shutdown is only called when the recorded host equals
;; (get-host-name); otherwise the mismatch is just logged.
;; NOTE(review): host was stored from ipaddrstr above, which may be an
;; address rather than a host name - confirm this comparison can succeed.
(if (not (equal? (get-host-name) host))
(debug:print-info 0 *default-log-port* "Server shutdown called for host "host", but we are on "(get-host-name))
(rmt:server-shutdown host portnum))
;; (bdat-time-to-exit-set! *bdat* #t) ;; tell on-exit to be fast as we've already cleaned up
;; Second release, this time of the port originally chosen in the let*
;; above; it can differ from portnum if rmt:try-start-server moved on to
;; another port after a failed listen.
(portlogger:open-run-close portlogger:set-port port "released") ;; done in rmt:run
;; (debug:print-info 0 *default-log-port* "Max cached queries was " *max-cache-size*)
;; (debug:print-info 0 *default-log-port* "Number of cached writes " *number-of-writes*)
;; (debug:print-info 0 *default-log-port* "Average cached write time "
;; (if (eq? *number-of-writes* 0)
;; "n/a (no writes)"
;; (/ *writes-total-delay*
;; *number-of-writes*))
;; " ms")
;; (debug:print-info 0 *default-log-port* "Number non-cached queries " *number-non-write-queries*)
;; (debug:print-info 0 *default-log-port* "Average non-cached time "
;; (if (eq? *number-non-write-queries* 0)
;; "n/a (no queries)"
;; (/ *total-non-write-delay*
;; *number-non-write-queries*))
;; " ms")
(db:print-current-query-stats)
(debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting")
)))
;; Try to put the server's reply socket into listening state on PORTNUM.
;;
;; ipaddrstr - address recorded as the server host in *server-info*
;; portnum   - port to listen on
;;
;; Returns the reply socket once nng-listen succeeds. If listening raises
;; an exception and PORTNUM is below 64000, the port is flagged as failed
;; in portlogger and the procedure recurses with a freshly found port;
;; otherwise an error line is printed and the result of that print is
;; what the caller gets back.
(define (rmt:try-start-server ipaddrstr portnum)
  ;; Refresh the existing server record (we may be retrying on another
  ;; port) or create it on the very first attempt.
  (cond
   (*server-info*
    (servdat-host-set!   *server-info* ipaddrstr)
    (servdat-port-set!   *server-info* portnum)
    (servdat-status-set! *server-info* 'trying-port)
    (servdat-trynum-set! *server-info*
                         (+ (servdat-trynum *server-info*) 1)))
   (else
    (set! *server-info* (make-servdat host: ipaddrstr port: portnum))))
  (debug:print-info 0 *default-log-port* "rmt:try-start-server time="
                    (seconds->time-string (current-seconds))
                    " ipaddrsstr=" ipaddrstr
                    " portnum=" portnum)
  (assert (servdat? *server-info*) "FATAL: Must always have *server-info* properly set up by here.")
  (servdat-status-set! *server-info* 'starting)
  (servdat-port-set! *server-info* portnum)
  ;; The reply socket is created once and kept in *server-info* so that
  ;; retries on other ports reuse it.
  (unless (servdat-rep *server-info*)
    (let ((fresh-sock (make-rep-socket)))
      (servdat-rep-set! *server-info* fresh-sock)
      (socket-set! fresh-sock 'nng/recvtimeo 2000)))
  (let ((sock (servdat-rep *server-info*)))
    (debug:print 0 *default-log-port* "INFO: Trying to start server on " ipaddrstr ":" portnum)
    (handle-exceptions
        err
        (begin
          (print-error-message err)
          (cond
           ((< portnum 64000)
            (debug:print 0 *default-log-port* "WARNING: attempt to start server failed. Trying again ...")
            (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) err))
            (debug:print 5 *default-log-port* "exn=" (condition->list err))
            (portlogger:open-run-close portlogger:set-failed portnum)
            (debug:print 0 *default-log-port* "WARNING: failed to start on portnum: " portnum ", trying next port")
            ;; (thread-sleep! 0.1)
            (rmt:try-start-server ipaddrstr
                                  (portlogger:open-run-close portlogger:find-port)))
           (else
            (print "ERROR: Tried and tried but could not start the server, stopping at port "portnum))))
      (nng-listen sock (conc "tcp://*:" portnum))
      sock)))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;;======================================================================
;; C L I E N T S
|
>
<
<
|
<
|
|
<
|
>
>
|
<
<
<
|
<
>
>
|
|
|
<
<
>
|
|
|
<
<
<
|
<
<
<
<
>
|
<
|
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
|
;; ======================================================================
;; Look up the handler registered under FNKEY in *http-functions*.
;; When no handler is registered, a zero-argument procedure that returns
;; the placeholder string is handed back instead.
(define (http-get-function fnkey)
  (let ((placeholder (lambda () "nothing here yet")))
    (hash-table-ref/default *http-functions* fnkey placeholder)))
;; Main entry point to start a server (was start-server).
;;
;; hostn - host name recorded in *server-info* as this server's host.
;;
;; When *server-info* is already set, a listener has been created earlier
;; and its uconn is simply handed to wait-and-close. Otherwise a port is
;; requested from portlogger, a listener is started with a handler that
;; forwards (cmd params) to api:execute-requests, and host, actual port and
;; uconn are stored in *server-info* before calling wait-and-close.
;;
;; Once wait-and-close returns, rmt:server-shutdown is called, the port is
;; marked "released" in portlogger and a completion message is logged.
(define (rmt:run hostn)
  ;; (assert (not *server-info*) "FATAL: rmt:run called but *server-info* has already been initialized")
  ;; ;; Configurations for server
  ;; (tcp-buffer-size 2048)
  ;; (max-connections 2048)
  (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
  (if *server-info*
      (let* ((uconn (servdat-uconn *server-info*)))
        (wait-and-close uconn))
      (let* ((port         (portlogger:open-run-close portlogger:find-port))
             (handler-proc (lambda (rem-host-port qrykey cmd params)
                             (api:execute-requests *dbstruct-db* cmd params))))
        ;; hostn is the only host value in scope here; the previous code
        ;; referenced ipaddrstr, which this procedure does not bind.
        (set! *server-info* (make-servdat host: hostn port: port))
        (let* ((uconn (run-listener handler-proc suggested-port: port))
               (rport (udat-port uconn))) ;; the real port, port above is only a suggestion
          (servdat-host-set!  *server-info* hostn)
          (servdat-port-set!  *server-info* rport)
          (servdat-uconn-set! *server-info* uconn)
          (wait-and-close uconn)
          (db:print-current-query-stats))))
  ;; server exit stuff here - everything is read back from *server-info*
  ;; (the previous code named the non-existent *servdat-info* / *servdat-mode*)
  (let* ((host (servdat-host *server-info*))
         (port (servdat-port *server-info*))
         (mode (or (servdat-mode *server-info*)
                   "non-db")))
    (rmt:server-shutdown host port)
    (portlogger:open-run-close portlogger:set-port port "released") ;; done in rmt:run
    (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode" shutdown complete. Exiting")))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;;======================================================================
;; C L I E N T S
|
︙ | | | ︙ | |
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
|
(if (string-search (regexp (conc ":" port-num)) inl)
(begin
;(print "Output: " inl)
(set! ret #t))
(loop (read-line inp)))))))
ret))
;; Start a server on PORTNUM and return its reply socket.
;;
;; portnum - TCP port to accept connections on (all interfaces).
;;
;; The socket gets a 2000 receive timeout (nng/recvtimeo) before it is put
;; into listening state. If listening raises an exception, the message is
;; printed and the process exits with status 1.
(define (start-nn-server portnum)
  (let ((rep (make-rep-socket))) ;; (nn-socket 'rep)))
    (socket-set! rep 'nng/recvtimeo 2000)
    (handle-exceptions
        exn
        (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
          (print "ERROR: Failed to start server \"" emsg "\"")
          (exit 1))
      ;; A server has to listen (the old nn-bind); dialing is the client
      ;; side and the wildcard host "*" is only meaningful for a listener.
      ;; This matches what rmt:try-start-server does.
      (nng-listen rep (conc "tcp://*:" portnum)))
    rep))
;; Open a request socket to HOST-PORT (a "host:port" style string that is
;; appended to "tcp://"), set its receive timeout option to 2000 and
;; return the socket.
(define (open-nn-connection host-port)
  (let* ((target (conc "tcp://" host-port))
         (sock   (make-req-socket)))
    (nng-dial sock target)
    (socket-set! sock 'nng/recvtimeo 2000)
    sock))
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
|
(if (string-search (regexp (conc ":" port-num)) inl)
(begin
;(print "Output: " inl)
(set! ret #t))
(loop (read-line inp)))))))
ret))
;; Open a request socket to HOST-PORT (a "host:port" style string that is
;; appended to "tcp://"), set its receive timeout option to 2000 and
;; return the socket.
(define (open-nn-connection host-port)
  (let* ((target (conc "tcp://" host-port))
         (sock   (make-req-socket)))
    (nng-dial sock target)
    (socket-set! sock 'nng/recvtimeo 2000)
    sock))
|
︙ | | | ︙ | |