;;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
;; DAMAGE.
;;======================================================================
;; ABOUT:
;; See README in the distribution at https://www.kiatoa.com/fossils/ulex
;; NOTES:
;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
;;
;;======================================================================
;; (use rpc pkts mailbox sqlite3)
(module ulex
*
;; #;(
;; ;; areas
;; make-area
;; area->alist
;; ;; server
;; launch
;; update-known-servers
;; shutdown
;; get-best-server
;; ;; client side
;; call
;; ;; queries, procs and system commands (i.e. workers)
;; register
;; register-batch
;; ;; database - note: most database stuff need not be exposed, these calls may be removed from exports in future
;; open-db
;; ;; ports
;; pl-find-port
;; pl-get-prev-used-port
;; pl-open-db
;; pl-open-run-close
;; pl-release-port
;; pl-set-port
;; pl-take-port
;; pl-is-port-available
;; pl-get-port-state
;; ;; system
;; get-normalized-cpu-load
;; )
(import scheme posix chicken data-structures ports extras files mailbox)
(import rpc srfi-18 pkts matchable regex
typed-records srfi-69 srfi-1
srfi-4 regex-case
(prefix sqlite3 sqlite3:)
foreign
tcp) ;; ulex-netutil)
;;======================================================================
;; D E B U G H E L P E R S
;;======================================================================
;; Unconditional debug print: writes "dbg> " followed by ARGS (via `print`)
;; to the current error port.
(define (dbg> . args)
  (let ((emit (lambda () (apply print "dbg> " args))))
    (with-output-to-port (current-error-port) emit)))
;; Pretty-print to the current error port, but only when the ULEX_DEBUG
;; environment variable is set. ARGS are passed to `pp` unchanged.
(define (debug-pp . args)
  (cond
   ((get-environment-variable "ULEX_DEBUG")
    (with-output-to-port (current-error-port)
      (lambda () (apply pp args))))))
;; Port that sdbg> writes its timing lines to. The value is whatever
;; (current-error-port) returns at the moment this definition is evaluated.
(define *default-debug-port* (current-error-port))
;; Stage-timing debug line, emitted only when ULEX_DEBUG is set.
;;   fn          - name of the reporting routine
;;   stage-name  - label for the stage being timed
;;   stage-start - start of the stage (milliseconds)
;;   stage-end   - end of the stage, or #f to use "now"
;;   start-time  - optional overall start; when non-#f a "total time" note is added
;;   message     - any further items, appended to the printed line
(define (sdbg> fn stage-name stage-start stage-end start-time . message)
  (cond
   ((get-environment-variable "ULEX_DEBUG")
    (with-output-to-port *default-debug-port*
      (lambda ()
        (let* ((stage-stop (or stage-end (current-milliseconds)))
               (stage-ms   (- stage-stop stage-start))
               (total-note (if start-time
                               (conc "total time " (- (current-milliseconds) start-time) " ms.")
                               "")))
          (apply print "ulex:" fn " " stage-name " took " stage-ms " ms. " total-note message)))))))
;;======================================================================
;; M A C R O S
;;======================================================================
;; iup callbacks are not dumping the stack, this is a work-around
;;
;; Some of these routines use:
;;
;; http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html
;;
;; Syntax for defining macros in a simple style similar to function definition,
;; when there is a single pattern for the argument list and there are no keywords.
;;
;; (define-simple-syntax (name arg ...) body ...)
;;
;; (define-simple-syntax (name arg ...) body ...) expands into a `define-syntax`
;; for NAME whose single syntax-rules clause matches (name arg ...) and rewrites
;; to (begin body ...). No literals/keywords are supported (the literals list is empty).
(define-syntax define-simple-syntax
(syntax-rules ()
((_ (name arg ...) body ...)
(define-syntax name (syntax-rules () ((name arg ...) (begin body ...)))))))
;; (catch-and-dump proc procname)
;; Calls the thunk PROC. If it raises, the handler prints the call chain, the
;; exception message, "Callback error in <procname>" and the full condition
;; list to the current error port; the exception is not re-raised.
(define-simple-syntax (catch-and-dump proc procname)
(handle-exceptions
exn
(begin
;; call chain first, then the human-readable details
(print-call-chain (current-error-port))
(with-output-to-port (current-error-port)
(lambda ()
(print ((condition-property-accessor 'exn 'message) exn))
(print "Callback error in " procname)
(print "Full condition info:\n" (condition->list exn)))))
(proc)))
;;======================================================================
;; R E C O R D S
;;======================================================================
;; information about me as a server
;;
;; Record holding the state of one ulex node ("area"): its own listener
;; details, pkt/db directories, db handle pool, registration table, work
;; queues, and what it knows about other servers. Every routine in this file
;; that takes `acfg` receives one of these.
(defstruct area
;; about this area
(useportlogger #f)
(lowport 32768)
(server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all)
(conn #f)
(port #f)
;; NOTE: get-my-best-address is not defined in the visible part of this file
;; (only a #;-disabled copy appears further down) - presumably provided elsewhere.
(myaddr (get-my-best-address))
pktid ;; get pkt from hosts table if needed
pktfile
pktsdir
dbdir
(dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one?
(mutex (make-mutex))
(rtable (make-hash-table)) ;; registration table of available actions
(dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve
;; about other servers
(hosts (make-hash-table)) ;; key => hostdat
(hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime )
(reqs (make-hash-table)) ;; uri => queue
;; work queues
(wqueues (make-hash-table)) ;; fname => qdat
(stats (make-hash-table)) ;; fname => totalqueries
(last-srvup (current-seconds)) ;; last time we updated the known servers
(cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call
(ready #f)
(health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping
)
;; host stats
;;
;; Per-host record: the host's server pkt plus load figures.
;; NOTE(review): no code in the visible part of the file constructs or reads
;; a hostdat; update-known-servers stores raw pkt alists in area-hosts.
(defstruct hostdat
(pkt #f)
(dbload (make-hash-table)) ;; "dbfile.db" => queries/min
(hostload #f) ;; normalized load ( 5min load / numcpus )
)
;; dbdat
;;
;; One open database: the sqlite3 handle, the file name it was opened under
;; (relative to the area's dbdir, see open-db), whether write access was
;; detected at open time, and the cache of prepared statements for that handle.
(defstruct dbdat
(dbh #f)
(fname #f)
(write-access #f)
(sths (make-hash-table)) ;; hash mapping query strings to handles
)
;; qdat
;;
;; The set of work queues kept per database file (area-wqueues maps
;; fname => qdat). work-queue-add picks the queue from the registered call type.
(defstruct qdat
(writeq (make-queue))
(readq (make-queue))
(rwq (make-queue))
(logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging
(osshort (make-queue))
(oslong (make-queue))
(misc (make-queue)) ;; used for things like ping-full
)
;; calldat
;;
;; A registered action (the values stored in area-rtable):
;;   ctype - call type symbol; selects the queue in work-queue-add and the
;;           processing branch in doqueue
;;   obj   - the thing to run, normally an SQL statement string
;;   rtime - defaults to (current-milliseconds)
(defstruct calldat
(ctype 'dbwrite)
(obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc.
(rtime (current-milliseconds)))
;; make it a global? Well, it is local to area module
;; Packet specification passed as `pktspec:` to write-alist->pkt and
;; read-pkt->alist in this file. One entry per packet type (server, data);
;; each entry pairs a field name with a single-letter symbol - presumably the
;; key used for that field in the stored pkt (confirm against the pkts library).
(define *pktspec*
`((server (hostname . h)
(port . p)
(pid . i)
(ipaddr . a)
)
(data (hostname . h) ;; sender hostname
(port . p) ;; sender port
(ipaddr . a) ;; sender ip
(hostkey . k) ;; sending host key - store info at server under this key
(servkey . s) ;; server key - this needs to match at server end or reject the msg
(format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json
(data . d) ;; base64 encoded slln data
)))
;; work item
;;
;; A unit of queued work. `request` builds one per incoming call,
;; work-queue-add enqueues it, doqueue fills in `result`, and
;; process-db-queries sends `result` back to ripaddr:rport under `cookie`.
(defstruct witem
(rhost #f) ;; return host
(ripaddr #f) ;; return ipaddr
(rport #f) ;; return port
(servkey #f) ;; the packet representing the client of this workitem, used by final send-message
(rdat #f) ;; the request - usually an sql query, type is rdat
(action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort
(cookie #f) ;; cookie id for response
(data #f) ;; the data payload, i.e. parameters
(result #f) ;; the result from processing the data
(caller #f)) ;; the calling peer according to rpc itself
;; Short display form of a pkt id.
;;   pktid - a string (the pkt hash) or anything else
;; Returns the first four characters of PKTID (or all of it when it is
;; shorter than four characters), or "nopkt" when PKTID is not a string.
(define (trim-pktid pktid)
  (if (string? pktid)
      ;; clamp the end index: substring raises on an out-of-range end
      (substring pktid 0 (min 4 (string-length pktid)))
      "nopkt"))
;; Coerce to a number where possible: strings go through string->number
;; (which yields #f for non-numeric text); every other value, numbers
;; included, is returned untouched.
(define (any->number num)
  (if (string? num)
      (string->number num)
      num))
;; Debugging aid: loads the `trace` extension and sets its trace-call-sites
;; parameter to #t.
;; NOTE(review): nothing in the visible part of the file calls `trace`, so
;; this looks like a development leftover - confirm before shipping.
(use trace)
(trace-call-sites #t)
;;======================================================================
;; D A T A B A S E H A N D L I N G
;;======================================================================
;; look in dbhandles for a db, return it, else return #f
;;
;; Check out a dbdat for FNAME.
;; Takes the first pooled dbdat for FNAME out of (area-dbhandles acfg) when
;; one is available; otherwise opens a new one with open-db. Handles return
;; to the pool only through save-dbh.
(define (get-dbh acfg fname)
  (let* ((pool (area-dbhandles acfg))
         (idle (hash-table-ref/default pool fname '())))
    (cond
     ((null? idle)
      (open-db acfg fname))
     (else
      ;; pop the head of the idle list
      (hash-table-set! pool fname (cdr idle))
      (car idle)))))
;; Return DBDAT to the pool: pushes it onto the front of the list stored
;; under FNAME in (area-dbhandles acfg).
(define (save-dbh acfg fname dbdat)
  (let* ((pool (area-dbhandles acfg))
         (idle (hash-table-ref/default pool fname '())))
    (hash-table-set! pool fname (cons dbdat idle))))
;; open the database, if never before opened init it. put the handle in the
;; open db's hash table
;; returns: the dbdat
;;
;; Open <dbdir>/<fname> and wrap it in a dbdat.
;; Installs a busy handler (timeout value 136000) and sets
;; "PRAGMA synchronous = 0". When the file did not exist before the open and
;; write access was detected, runs each statement returned by
;; (get-rsql acfg 'dbinitsql); with no write access it prints an error instead.
;; Returns: a new dbdat (it is NOT added to the handle pool here).
(define (open-db acfg fname)
  (let* ((dbdir        (area-dbdir acfg))
         (fullname     (conc dbdir "/" fname))
         (exists       (file-exists? fullname))
         ;; existing file: test the file itself; new file: test the directory
         (write-access (file-write-access? (if exists fullname dbdir)))
         (db           (sqlite3:open-database fullname))
         (handler      (sqlite3:make-busy-timeout 136000)))
    (sqlite3:set-busy-handler! db handler)
    (sqlite3:execute db "PRAGMA synchronous = 0;")
    (cond
     (exists #f) ;; nothing to initialise
     (write-access
      (let ((isql (get-rsql acfg 'dbinitsql)))
        (if isql
            (for-each (lambda (sql) (sqlite3:execute db sql))
                      isql))))
     (else
      (print "ERROR: no write access to " (area-dbdir acfg))))
    (make-dbdat dbh: db fname: fname write-access: write-access)))
;; This is a low-level command to retrieve or to prepare, save and return a prepared statement
;; you must extract the db handle
;;
;; Prepared-statement lookup with caching.
;;   db    - sqlite3 database handle
;;   cache - hash table: statement text => prepared statement
;;   stmt  - statement text
;; Returns the cached statement for STMT when the key is present; otherwise
;; prepares it on DB, stores it in CACHE and returns it.
(define (get-sth db cache stmt)
  (hash-table-ref
   cache
   stmt
   ;; only invoked when STMT has no entry yet
   (lambda ()
     (let ((fresh (sqlite3:prepare db stmt)))
       (hash-table-set! cache stmt fresh)
       fresh))))
;; a little more expensive but does all the tedious dereferencing - only use if you don't already
;; have dbdat and db sitting around
;;
;; Convenience wrapper: check out a dbdat for FNAME with get-dbh and return
;; the (cached or newly prepared) statement handle for STMT on it.
;; Note: the dbdat obtained here is not handed back via save-dbh.
(define (full-get-sth acfg fname stmt)
  (let ((dbdat (get-dbh acfg fname)))
    (get-sth (dbdat-dbh dbdat) (dbdat-sths dbdat) stmt)))
;; write to a db
;; acfg: area data
;; rdat: request data
;; hdat: (host . port)
;;
;; (define (dbwrite acfg rdat hdat data-in)
;; (let* ((dbname (car data-in))
;; (dbdat (get-dbh acfg dbname))
;; (db (dbdat-dbh dbdat))
;; (sths (dbdat-sths dbdat))
;; (stmt (calldat-obj rdat))
;; (sth (get-sth db sths stmt))
;; (data (cdr data-in)))
;; (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data)
;; (print "dbdat: " (dbdat->alist dbdat))
;; (apply sqlite3:execute sth data)
;; (save-dbh acfg dbname dbdat)
;; #t
;; ))
;; Finalize every pooled database handle of ACFG.
;; For each file name in (area-dbhandles acfg) and each dbdat pooled under it:
;; finalize all cached statements, then the database handle itself. Prints a
;; line per file name and a final count (statements + db handles together).
;; The pool table itself is left as it was.
(define (finalize-all-db-handles acfg)
  (let ((dbhandles (area-dbhandles acfg)) ;; fname => list of dbdat
        (num 0))
    (define (count-one!)
      (set! num (+ num 1)))
    (define (close-dbdat! dbdat)
      ;; statements first ...
      (for-each
       (lambda (sth)
         (sqlite3:finalize! sth)
         (count-one!))
       (hash-table-values (dbdat-sths dbdat)))
      ;; ... then the database handle
      (count-one!)
      (sqlite3:finalize! (dbdat-dbh dbdat)))
    (for-each
     (lambda (fname)
       (print "Closing handles for " fname)
       (for-each close-dbdat!
                 (hash-table-ref/default dbhandles fname '())))
     (hash-table-keys dbhandles))
    (print "FINALIZED " num " dbhandles")))
;;======================================================================
;; W O R K Q U E U E H A N D L I N G
;;======================================================================
;; Mark DBNAME as served by this node: if (area-dbs acfg) has no true entry
;; for it yet, store a value from (random 10000) under DBNAME. An existing
;; entry is left alone.
(define (register-db-as-mine acfg dbname)
  (let ((owned (area-dbs acfg)))
    (unless (hash-table-ref/default owned dbname #f)
      (hash-table-set! owned dbname (random 10000)))))
;; Queue WITEM for database FNAME.
;; Creates the qdat for FNAME on first use. The action of WITEM is looked up
;; in (area-rtable acfg); when registered, its call type selects the queue
;; (db call types also register FNAME as served by this node; an unknown call
;; type is reported and falls back to the write queue).
;; Always returns #t.
;; NOTE(review): when the action is NOT registered nothing is enqueued - the
;; full-ping branch only evaluates (qdat-misc qdat). Confirm whether the witem
;; was meant to be added there.
(define (work-queue-add acfg fname witem)
  (let* ((work-queue-start (current-milliseconds))
         (action    (witem-action witem)) ;; NB the action is the index into the rdat actions
         (wqueues   (area-wqueues acfg))
         (qdat      (or (hash-table-ref/default wqueues fname #f)
                        (let ((newqdat (make-qdat)))
                          (hash-table-set! wqueues fname newqdat)
                          newqdat)))
         (rdat      (hash-table-ref/default (area-rtable acfg) action #f))
         (claim-db! (lambda () (register-db-as-mine acfg fname))))
    (cond
     (rdat
      (let ((target-queue
             (case (calldat-ctype rdat)
               ((dbwrite)   (claim-db!) (qdat-writeq qdat))
               ((dbread)    (claim-db!) (qdat-readq qdat))
               ((dbrw)      (claim-db!) (qdat-rwq qdat))
               ((oslong)    (qdat-oslong qdat))
               ((osshort)   (qdat-osshort qdat))
               ((full-ping) (qdat-misc qdat))
               (else
                (print "ERROR: no queue for " action ". Adding to dbwrite queue.")
                (qdat-writeq qdat)))))
        (queue-add! target-queue witem)))
     ((eq? action 'full-ping)
      (qdat-misc qdat))
     (else
      (print "ERROR: No action " action " was registered")))
    (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f)
    #t)) ;; for now, simply return #t to indicate request got to the queue
;; Drain queue Q for database FNAME, working for at most ~400 ms.
;;   acfg, fname - kept for interface compatibility
;;   q           - queue of witems
;;   dbdat / dbh - the checked-out dbdat and its sqlite3 handle (the caller
;;                 owns them and is expected to hold any transaction on DBH)
;; Each witem's result is stored with witem-result-set!.
;; Returns: (list count delta responses) where RESPONSES holds the processed
;; witems that carry a cookie, most recent first.
;;
;; Statements are prepared on DBH through the statement cache of DBDAT, so the
;; work runs on the caller's connection. Going through get-dbh per item would
;; check out (or open) another handle for every query and never return it.
;; The statement is only prepared for call types that execute one.
(define (doqueue acfg q fname dbdat dbh)
  (let* ((start-time (current-milliseconds))
         (qlen       (queue-length q)))
    (if (> qlen 1)
        (print "Processing queue of length " qlen))
    (let loop ((count 0)
               (responses '()))
      (let ((delta (- (current-milliseconds) start-time)))
        (if (or (queue-empty? q)
                (> delta 400)) ;; stop working on this queue after 400ms have passed
            (list count delta responses)
            (let* ((witem  (queue-remove! q))
                   (action (witem-action witem))
                   (rdat   (witem-rdat witem))
                   (ctype  (calldat-ctype rdat))
                   (data   (witem-data witem))
                   (cookie (witem-cookie witem))
                   ;; lazily fetch/prepare the statement on the caller's handle
                   (sth    (lambda ()
                             (get-sth dbh (dbdat-sths dbdat) (calldat-obj rdat)))))
              (witem-result-set!
               witem
               (case ctype
                 ((noblockwrite)
                  (apply sqlite3:execute (sth) data)
                  (sqlite3:last-insert-rowid dbh))
                 ((dbwrite) ;; blocking write
                  (apply sqlite3:execute (sth) data)
                  #t)
                 ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query
                  (apply sqlite3:map-row (lambda x x) (sth) data))
                 ((full-ping) 'full-ping)
                 (else (print "Not ready for action " action) #f)))
              (loop (add1 count)
                    (if cookie
                        (cons witem responses)
                        responses))))))))
;; do up to 400ms of processing on each queue
;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded
;;
;; Run one processing pass over the work queues of database FNAME.
;; Returns #f when FNAME has no queues, #t after a pass.
;;
;; A dbdat is checked out with get-dbh and returned with save-dbh at the end.
;; The write, read/write and read queues are each drained by doqueue inside a
;; transaction on that handle; once the transaction has completed, each
;; response witem is delivered with call-deliver-response. The misc queue is
;; then drained outside of any transaction.
;; NOTE(review): responses from the misc queue are discarded here - confirm
;; whether full-ping callers expect an answer.
;; NOTE(review): an exception during processing skips save-dbh, so the handle
;; is not returned to the pool in that case.
(define (process-db-queries acfg fname)
  (if (hash-table-exists? (area-wqueues acfg) fname)
      (let* ((process-db-queries-start-time (current-milliseconds))
             (qdat  (hash-table-ref/default (area-wqueues acfg) fname #f))
             (queue-sym->queue
              (lambda (queue-sym)
                (case queue-sym ;; lookup the queue from qdat given a name (symbol)
                  ((wqueue)  (qdat-writeq qdat))
                  ((rqueue)  (qdat-readq qdat))
                  ((rwqueue) (qdat-rwq qdat))
                  ((misc)    (qdat-misc qdat))
                  (else #f))))
             (dbdat (get-dbh acfg fname))
             (dbh   (if (dbdat? dbdat) (dbdat-dbh dbdat) #f)))
        ;; handle the queues that require a transaction
        (for-each
         (lambda (queue-sym)
           (let ((queue (queue-sym->queue queue-sym)))
             (if (not (queue-empty? queue))
                 (let ((responses
                        (sqlite3:with-transaction ;; todo - catch exceptions...
                         dbh
                         (lambda ()
                           (let ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work!
                             (match res
                               ((count delta responses)
                                (update-stats acfg fname queue-sym delta count)
                                (sdbg> "process-db-queries" "sqlite3-transaction"
                                       process-db-queries-start-time #f #f)
                                responses)
                               (else
                                (print "ERROR: bad return data from doqueue " res)
                                ;; nothing to deliver; keeps the delivery loop below safe
                                '())))))))
                   ;; having completed the transaction, send the responses.
                   (for-each
                    (lambda (witem)
                      (call-deliver-response acfg
                                             (witem-ripaddr witem) (witem-rport witem)
                                             (witem-cookie witem) (witem-result witem)))
                    responses)))))
         '(wqueue rwqueue rqueue))
        ;; handle misc queue
        (let ((queue (queue-sym->queue 'misc)))
          (doqueue acfg queue fname dbdat dbh))
        (save-dbh acfg fname dbdat)
        #t) ;; just to let the tests know we got here
      #f)) ;; nothing processed
;; run all queues in parallel per db but sequentially per queue for that db.
;; - process the queues every 500 or so ms
;; - allow for long running queries to continue but all other activities for that
;; db will be blocked.
;;
;; Scheduler loop; as written it never returns (the named loop always recurs).
;; Each cycle: for every fname that currently has work queues, start a thread
;; running process-db-queries for it - unless the thread started for that fname
;; in an earlier cycle is still in the `threads` table and not dead/terminated,
;; in which case a warning is printed instead. The cycle then sleeps until
;; target-time (set 50 ms after the cycle began) before starting over.
(define (work-queue-processor acfg)
(let* ((threads (make-hash-table))) ;; fname => thread
(let loop ((fnames (hash-table-keys (area-wqueues acfg)))
(target-time (+ (current-milliseconds) 50)))
;;(if (not (null? fnames))(print "Processing for these databases: " fnames))
(for-each
(lambda (fname)
;; (print "processing for " fname)
;;(process-db-queries acfg fname))
(let ((th (hash-table-ref/default threads fname #f)))
(if (and th (not (member (thread-state th) '(dead terminated))))
(begin
(print "WARNING: worker thread for " fname " is taking a long time.")
(print "Thread is in state " (thread-state th)))
(let ((th1 (make-thread (lambda ()
;; (catch-and-dump
;; (lambda ()
;; (print "Process queries for " fname)
(let ((start-time (current-milliseconds)))
(process-db-queries acfg fname)
;; (thread-sleep! 0.01) ;; need the thread to take at least some time
;; the worker removes its own entry when done; `threads` is touched from
;; both this worker and the scheduler loop without a lock
(hash-table-delete! threads fname)) ;; no mutexes?
fname)
"th1"))) ;; ))
;; register before starting so the worker's own delete cannot precede this set
(hash-table-set! threads fname th1)
(thread-start! th1)))))
fnames)
;; (thread-sleep! 0.1) ;; give the threads some time to process requests
;; sleep out whatever remains of this cycle's 50 ms window (target-time)
(let ((now-time (current-milliseconds)))
(if (< now-time target-time)
(let ((delta (- target-time now-time)))
(thread-sleep! (/ delta 1000))))) ;; ms -> seconds
(loop (hash-table-keys (area-wqueues acfg))
(+ (current-milliseconds) 50)))))
;;======================================================================
;; S T A T S G A T H E R I N G
;;======================================================================
;; Query statistics record; update-stats keeps one per database file name in
;; (area-stats acfg).
(defstruct stat
(qcount-avg 0) ;; coarse running average
(qtime-avg 0) ;; coarse running average
(qcount 0) ;; total
(qtime 0) ;; total
(last-qcount 0) ;; last
(last-qtime 0) ;; last
(dbs '()) ;; list of db files handled by this node
(when 0)) ;; when the last query happened - seconds
;; Fold one batch of work into the stat record for FNAME (created on first use).
;;   bucket     - accepted but currently not part of the key
;;   duration   - time spent on the batch
;;   numqueries - number of queries in the batch
;; Updates the timestamp, the "last" values, the lifetime totals and the coarse
;; averages (new = (old + sample) / 2), and makes sure FNAME is in the record's
;; dbs list.
(define (update-stats acfg fname bucket duration numqueries)
  (let* ((table    (area-stats acfg))
         (key      fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)
         (stats    (or (hash-table-ref/default table key #f)
                       (let ((fresh (make-stat)))
                         (hash-table-set! table key fresh)
                         fresh)))
         (blend    (lambda (old sample) (/ (+ old sample) 2))))
    ;; when the last query happened (used to remove the fname from the active list)
    (stat-when-set! stats (current-seconds))
    ;; last values
    (stat-last-qcount-set! stats numqueries)
    (stat-last-qtime-set!  stats duration)
    ;; totals over process lifetime
    (stat-qcount-set! stats (+ (stat-qcount stats) numqueries))
    (stat-qtime-set!  stats (+ (stat-qtime stats) duration))
    ;; coarse averages
    (stat-qcount-avg-set! stats (blend (stat-qcount-avg stats) numqueries))
    (stat-qtime-avg-set!  stats (blend (stat-qtime-avg stats) duration))
    ;; record the db file on this stat entry
    (if (not (member fname (stat-dbs stats)))
        (stat-dbs-set! stats (cons fname (stat-dbs stats))))))
;;======================================================================
;; S E R V E R S T U F F
;;======================================================================
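;; NOTE: as written this DOES return - it yields the listener from tcp-listen.
;; The "does NOT return" remark appears to date from when the server loop was
;; started here (see the commented-out rpc:make-server call in the body).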
;;
;; Find a port we can listen on and open it.
;; Starts from (area-port acfg), or 3200 when that is unset. On any exception
;; the next port number is stored in ACFG and the search recurses. On success
;; the chosen port is recorded as the rpc default server port and in ACFG, the
;; tcp read timeout is set to 120000, and the tcp listener is returned.
(define (find-free-port-and-open acfg)
  (let* ((port (or (area-port acfg) 3200))
         (try-next-port
          (lambda ()
            (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port")
            (area-port-set! acfg (+ port 1))
            (find-free-port-and-open acfg))))
    (handle-exceptions
        exn
        (try-next-port)
      (rpc:default-server-port port)
      (area-port-set! acfg port)
      (tcp-read-timeout 120000)
      ;; ((rpc:make-server (tcp-listen port)) #t)
      (tcp-listen (rpc:default-server-port)))))
;; register this node by putting a packet into the pkts dir.
;; look for other servers
;; contact other servers and compile list of servers
;; there are two types of server
;; main servers - dashboards, runners and dedicated servers - need pkt
;; passive servers - test executers, step calls, list-runs - no pkt
;;
;; Announce this node.
;;   hostip   - address to advertise; falls back to (get-my-best-address)
;;   port-num - port to advertise
;; Returns #f straight away when ACFG has no dbdir. Otherwise creates the pkts
;; directory if missing, and - for server types `auto` and `main` - writes a
;; server pkt (hostname, ipaddr, port, pid) and records its id and file name
;; in ACFG. Finally stores PORT-NUM as the area's port.
(define (register-node acfg hostip port-num)
  (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created)
         (best-ip     (or hostip (get-my-best-address)))
         (mtdir       (area-dbdir acfg))
         (pktdir      (area-pktsdir acfg)))
    (print "Registering node " best-ip ":" port-num)
    (and mtdir ;; require a home for this node to put or find databases
         (begin
           (unless (directory? pktdir)
             (create-directory pktdir))
           (print "Starting server in " server-type " mode with port " port-num)
           ;; TODO: if auto, count number of servers registered, if > 3 then don't put out a pkt
           (when (member server-type '(auto main))
             (let ((pkt-fields `((hostname . ,(get-host-name))
                                 (ipaddr   . ,best-ip)
                                 (port     . ,port-num)
                                 (pid      . ,(current-process-id)))))
               (area-pktid-set! acfg
                                (write-alist->pkt pktdir
                                                  pkt-fields
                                                  pktspec: *pktspec*
                                                  ptype: 'server))
               (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt"))))
           (area-port-set! acfg port-num)))))
;; Process-wide counter behind make-cookie.
(define *cookie-seqnum* 0)
;; Produce the next cookie for KEY: bumps the counter and returns
;; "<key>-<counter>" as a string.
(define (make-cookie key)
  (let ((next (add1 *cookie-seqnum*)))
    (set! *cookie-seqnum* next)
    (conc key "-" next)))
;; dispatch locally if possible
;;
;; Hand DATA for COOKIE to the node at IPADDR:PORT. When that address and port
;; equal this area's own, deliver-response is called directly; otherwise the
;; remote node's published `response` procedure is invoked over rpc.
(define (call-deliver-response acfg ipaddr port cookie data)
  (let ((to-self? (and (equal? (area-myaddr acfg) ipaddr)
                       (equal? (area-port acfg) port))))
    (if to-self?
        (deliver-response acfg cookie data)
        (let ((remote-response (rpc:procedure 'response ipaddr port)))
          (remote-response cookie data)))))
;; Deliver DATA to whoever is waiting on COOKIE. Returns #t immediately; the
;; work happens on a new thread.
;; The thread looks up the mailbox for COOKIE in (area-cookie2mbox acfg). If it
;; is not there yet it sleeps (0.1s, 0.2s, ... growing with each attempt) and
;; retries; once the retry budget reaches zero it prints a give-up message.
;; When a mailbox is found DATA is sent to it. A timing line is logged via
;; sdbg> either way.
(define (deliver-response acfg cookie data)
  (let* ((deliver-response-start (current-milliseconds))
         (deliver
          (lambda ()
            (let loop ((tries-left 5))
              (let ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f)))
                (cond
                 ((eq? 0 tries-left)
                  (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie=" cookie))
                 (mbox
                  (mailbox-send! mbox data))
                 (else
                  (thread-sleep! (/ (- 6 tries-left) 10))
                  (loop (sub1 tries-left))))))
            (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie))))
    (thread-start!
     (make-thread deliver
                  (conc "deliver-response thread for cookie=" cookie)))
    #t))
;; action:
;; immediate - quick actions, no need to put in queues
;; dbwrite - put in dbwrite queue
;; dbread - put in dbread queue
;; oslong - os actions, e.g. du, that could take a long time
;; osshort - os actions that should be quick, e.g. df
;;
;; Entry point for an incoming request (published over rpc as `request`).
;; Builds a witem for the call, then:
;;   - when SERVKEY differs from this area's pktid, returns (#f . <message>)
;;   - otherwise dispatches on the registered call type (or on ACTION itself
;;     when the action is not registered) and returns a list starting with
;;     #t/#f and a status string.
;; full-ping, dbwrite and dbread are queued through work-queue-add.
;; NOTE(review): the `response` branch passes FNAME in the position where
;; deliver-response takes a cookie - confirm that callers put the cookie there.
;; NOTE(review): dbrw / osshort / oslong answer "submitted" but nothing is
;; queued for them here - confirm whether that is intentional.
(define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler
  (let* ((std-peer-handler-start (current-milliseconds))
         ;; the sql query or other details indexed by the action
         (rdat     (hash-table-ref/default (area-rtable acfg) action #f))
         (witem    (make-witem ripaddr: from-ipaddr
                               rport:   from-port
                               action:  action
                               rdat:    rdat
                               cookie:  cookie
                               servkey: servkey
                               data:    params ;; TODO - rename data to params
                               caller:  (rpc:current-peer)))
         (enqueue! (lambda () (work-queue-add acfg fname witem))))
    (cond
     ((not (equal? servkey (area-pktid acfg)))
      ;; immediately return this
      (cons #f (conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))))
     (else
      (let ((ctype (if rdat (calldat-ctype rdat) action)))
        (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f)
        (case ctype
          ((full-ping) (list #t "ack to full ping" (enqueue!) cookie))
          ((response)  (list #t "ack from requestor" (deliver-response acfg fname params)))
          ((dbwrite)   (list #t "db write submitted" (enqueue!) cookie))
          ((dbread)    (list #t "db read submitted" (enqueue!) cookie))
          ((dbrw)      (list #t "db read/write submitted" cookie))
          ((osshort)   (list #t "os short submitted" cookie))
          ((oslong)    (list #t "os long submitted" cookie))
          (else        (list #f "unrecognised action" ctype))))))))
;; Call this to start the actual server
;;
;; start_server
;;
;; mode: '
;; handler: proc which takes pktreceived as argument
;;
;; Open a listening port, publish the rpc procedures (delist-db, calling-addr,
;; ping, request, response), mark the area as ready, store the listener in the
;; area and then run the rpc server on it.
(define (start-server acfg)
  (define (on-delist-db fname)
    (hash-table-delete! (area-dbs acfg) fname))
  (define (on-calling-addr)
    (rpc:current-peer))
  (define (on-ping)
    (real-ping acfg))
  (define (on-request from-addr from-port servkey action cookie dbname params)
    (request acfg from-addr from-port servkey action cookie dbname params))
  (define (on-response cookie res-dat)
    (deliver-response acfg cookie res-dat))
  (let ((conn (find-free-port-and-open acfg)))
    (rpc:publish-procedure! 'delist-db    on-delist-db)
    (rpc:publish-procedure! 'calling-addr on-calling-addr)
    (rpc:publish-procedure! 'ping         on-ping)
    (rpc:publish-procedure! 'request      on-request)
    (rpc:publish-procedure! 'response     on-response)
    (area-ready-set! acfg #t)
    (area-conn-set! acfg conn)
    ((rpc:make-server conn) #f)))
;; Bring this node up as a server.
;; Steps, in order: refresh the known-servers list; install an on-exit hook
;; that calls shutdown; start the rpc server thread and the work-queue thread;
;; poll every 25 ms until the server thread has set area-ready; work out which
;; of this machine's addresses actually reaches the server (exits the process
;; with status 1 if none does); finally register the node and print where it
;; is listening.
(define (launch acfg) ;; #!optional (proc std-peer-handler))
(print "starting launch")
(update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
#;(let ((original-handler (current-exception-handler))) ;; is th
(lambda (exception)
(server-exit-procedure)
(original-handler exception)))
(on-exit (lambda ()
(shutdown acfg))) ;; (finalize-all-db-handles acfg)))
;; set up the rpc handler
(let* ((th1 (make-thread
(lambda ()(start-server acfg))
"server thread"))
(th2 (make-thread
(lambda ()
(print "th2 starting")
;; work-queue-processor loops forever; if control comes back it is
;; reported as a crash and the processor is restarted
(let loop ()
(work-queue-processor acfg)
(print "work-queue-processor crashed!")
(loop)))
"work queue thread")))
(thread-start! th1)
(thread-start! th2)
;; wait for start-server to flag the area as ready (no timeout)
(let loop ()
(thread-sleep! 0.025)
(if (area-ready acfg)
#t
(loop)))
;; attempt to fix my address
;; each candidate address is tried by calling our own published
;; `calling-addr` procedure through it; the first call that returns a true
;; value supplies the address stored as area-myaddr
(let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)?
(let loop ((rem-addrs all-addr))
(if (null? rem-addrs)
(begin
(print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.")
(exit 1)) ;; BUG Changeme to raising an exception
(let* ((addr (car rem-addrs))
(good-addr (handle-exceptions
exn
#f
((rpc:procedure 'calling-addr addr (area-port acfg))))))
(if good-addr
(begin
(print "Got good-addr of " good-addr)
(area-myaddr-set! acfg good-addr))
(loop (cdr rem-addrs)))))))
(register-node acfg (area-myaddr acfg)(area-port acfg))
(print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg))
;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers)
))
;; Remove this node's pkt file, if the area has one recorded.
(define (clear-server-pkt acfg)
  (cond
   ((area-pktfile acfg) => delete-file*)))
;; Orderly shutdown: delete our pkt file (when one is recorded), notify peers
;; via send-all, then finalize every pooled database handle.
(define (shutdown acfg)
  (clear-server-pkt acfg)
  (send-all "imshuttingdown")
  ;; (rpc:close-all-connections!) ;; don't know if this is actually needed
  (finalize-all-db-handles acfg))
;; Placeholder: accepts MSG and returns #f without sending anything.
(define (send-all msg)
#f)
;; given a area record look up all the packets
;;
;; Read every *.pkt file in the area's pkts directory and return the list of
;; results from read-pkt->alist (one per file).
(define (get-all-server-pkts acfg)
  (let ((pattern  (conc (area-pktsdir acfg) "/*.pkt"))
        (load-pkt (lambda (pkt-file)
                    (read-pkt->alist pkt-file pktspec: *pktspec*))))
    (map load-pkt (glob pattern))))
#;((Z . "9a0212302295a19610d5796fce0370fa130758e9")
(port . "34827")
(pid . "28748")
(hostname . "zeus")
(T . "server")
(D . "1549427032.0"))
#;(define (get-my-best-address)
(let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))))
(cond
((null? all-my-addresses)
(get-host-name)) ;; no interfaces?
((eq? (length all-my-addresses) 1)
(ip->string (car all-my-addresses))) ;; only one to choose from, just go with it
(else
(ip->string (car (filter (lambda (x) ;; take any but 127.
(not (eq? (u8vector-ref x 0) 127)))
all-my-addresses)))))))
;; whoami? I am my pkt
;;
;; Look up this node's own entry in the known-hosts table (keyed by its
;; pktid); #f when there is none.
(define (whoami? acfg)
  (let ((known-hosts (area-hosts acfg))
        (my-id       (area-pktid acfg)))
    (hash-table-ref/default known-hosts my-id #f)))
;;======================================================================
;; "Client side" operations
;;======================================================================
;; Invoke the remote procedure CALL-KEY on HOST:PORT with PARAMS.
;; Any exception is reported with a "Call ... failed" line and turned into #f.
(define (safe-call call-key host port . params)
  (handle-exceptions
      exn
      (begin
        (print "Call " call-key " to " host ":" port " failed")
        #f)
    (let ((remote (rpc:procedure call-key host port)))
      (apply remote params))))
;; ;; convert to/from string / sexpr
;;
;; (define (string->sexpr str)
;; (if (string? str)
;; (with-input-from-string str read)
;; str))
;;
;; (define (sexpr->string s)
;; (with-output-to-string (lambda ()(write s))))
;; is the server alive?
;;
;; Ping HOST:PORT. When the target is this very node (same address and port as
;; recorded in ACFG) real-ping is called directly, otherwise the remote `ping`
;; procedure is used.
;; Returns (elapsed-ms . result). Exceptions from the remote call are not
;; caught here.
(define (ping acfg host port)
  (let* ((start-time (current-milliseconds))
         (self?      (and (equal? (area-myaddr acfg) host)
                          (equal? (area-port acfg) port)))
         (res        (if self?
                         (real-ping acfg)
                         ((rpc:procedure 'ping host port)))))
    (cons (- (current-milliseconds) start-time)
          res)))
;; returns ( ipaddr port alist-fname=>randnum )
;; The ping payload for this node: a three-element list of its address, its
;; port and whatever (get-host-stats acfg) returns.
(define (real-ping acfg)
  (list (area-myaddr acfg)
        (area-port acfg)
        (get-host-stats acfg)))
;; is the server alive AND the queues processing?
;;
#;(define (full-ping acfg servpkt)
(let* ((start-time (current-milliseconds))
(res (send-message acfg servpkt '(full-ping) 'full-ping)))
(cons (- (current-milliseconds) start-time)
res))) ;; (equal? res "got ping"))))
;; look up all pkts and get the server id (the hash), port, host/ip
;; store this info in acfg
;; return the number of responsive servers found
;;
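;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself
;; NOTE(review): the body nevertheless calls `ping` for every pkt it finds; confirm which behaviour is intended.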
;;
;; Refresh what this node knows about other servers.
;; Gathers the pkts on disk plus the pkts already in (area-hosts acfg), pings
;; each one and:
;;   - on a full reply (host port stats): records the pkt under its id (the
;;     Z field) in area-hosts, stores the stats in area-hoststats and clears
;;     the failed-ping count for that address
;;   - on a reply whose payload is #f: removes the pkt
;;   - on no usable reply: bumps the failed-ping count in area-health; once
;;     more than one failure has been recorded the pkt is removed
;; Also stamps area-last-srvup. Returns the number of servers that replied
;; with a (host port stats) payload.
(define (update-known-servers acfg)
  (let* ((start-time (current-milliseconds))
         (all-pkts   (delete-duplicates
                      (append (get-all-server-pkts acfg)
                              (hash-table-values (area-hosts acfg)))))
         (hostshash  (area-hosts acfg))
         (health     (area-health acfg))
         (pktsdir    (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers
         (numsrvs    0)
         (delpkt     (lambda (pktsdir sid)
                       (print "clearing out server " sid)
                       (delete-file* (conc pktsdir "/" sid ".pkt"))
                       (hash-table-delete! hostshash sid))))
    (area-last-srvup-set! acfg (current-seconds))
    (for-each
     (lambda (servpkt)
       (if (list? servpkt)
           (let* ((shost (alist-ref 'ipaddr servpkt))
                  (sport (any->number (alist-ref 'port servpkt)))
                  (res   (handle-exceptions
                             exn
                             #f ;; unreachable server: treated as a failed ping below
                           (ping acfg shost sport)))
                  (sid   (alist-ref 'Z servpkt)) ;; Z code is our name for the server
                  (url   (conc shost ":" sport)))
             (match res
               ((qduration . payload)
                (match payload
                  ((host port stats)
                   (if (and host port stats)
                       (begin
                         (hash-table-set! hostshash sid servpkt)
                         (hash-table-set! (area-hoststats acfg) sid stats)
                         ;; good ping: the health table counts failures since
                         ;; the last good ping, so start over for this address
                         (hash-table-delete! health url))
                       (print "missing data from the server, not sure what that means!"))
                   (set! numsrvs (+ numsrvs 1)))
                  (#f
                   (print "Removing pkt " sid " due to #f from server or failed ping")
                   (delpkt pktsdir sid))
                  (else
                   (print "Got ")
                   (pp res)
                   (print " from server ")
                   (pp servpkt)
                   (print " but response did not match (#f/#t . msg)"))))
               (else
                ;; can't reach the server. Don't be too quick to assume it is
                ;; wedged or dead - it may simply be too busy to reply - so the
                ;; pkt is only removed after repeated failures.
                (let ((bad-pings (hash-table-ref/default health url 0)))
                  (if (> bad-pings 1) ;; two bad pings - remove pkt
                      (begin
                        (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid)
                        (delpkt pktsdir sid))
                      (begin
                        (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet")
                        (hash-table-set! health url (+ bad-pings 1))))))))
           ;; servpkt is not actually a pkt?
           (print "Bad pkt " servpkt)))
     all-pkts)
    (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs
           " servers, pkts: " (map (lambda (p)
                                     ;; tolerate the non-list entries reported as "Bad pkt" above
                                     (and (list? p) (alist-ref 'Z p)))
                                   all-pkts))
    numsrvs))
;; Per-server scoring record built by get-best-server for one candidate
;; server and one database name.
(defstruct srvstat
(numfiles 0) ;; number of db files handled by this server - subtract 1 for the db being currently looked at
(randnum #f) ;; tie breaker number assigned to by the server itself - applies only to the db under consideration
(pkt #f)) ;; the server pkt
;;(define (srv->srvstat srvpkt)
;; Get the server best for given dbname and key
;;
;; NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries.
;;
;; Pick the server that should handle DBNAME.
;; Returns the pkt of the chosen server, or #f when none is known.
;; Selection: for every known server a srvstat is built from its reported
;; stats; candidates are ordered by the number of OTHER db files they handle
;; (fewer first), ties broken by the smaller random number the server reported
;; for DBNAME (0 when it reported none). The first entry wins.
;; When no servers are known, update-known-servers is run and the lookup is
;; retried; when the list is more than 10 seconds old it is refreshed first.
(define (get-best-server acfg dbname key)
(let* (;; (servers (hash-table-values (area-hosts acfg)))
(servers (area-hosts acfg))
;; NOTE(review): string>=? is not a strict ordering; harmless while the keys
;; are distinct strings, which hash-table keys are.
(skeys (sort (hash-table-keys servers) string>=?)) ;; a stable listing
(start-time (current-milliseconds))
(srvstats (make-hash-table)) ;; srvid => srvstat
;; url is only referenced by the disabled debug print near the end
(url (conc (area-myaddr acfg) ":" (area-port acfg))))
;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys))
(if (null? skeys)
(if (> (update-known-servers acfg) 0)
(get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter
(begin
(print "ERROR: no server found!") ;; since this process is also a server this should never happen
#f))
(begin
;; (print "in get-best-server with skeys=" skeys)
(if (> (- (current-seconds) (area-last-srvup acfg)) 10)
(begin
(update-known-servers acfg)
(sdbg> "get-best-server" "update-known-servers" start-time #f #f)))
;; for each server look at the list of dbfiles, total number of dbs being handled
;; and the rand number, save the best host
;; also do a delist-db for each server dbfile not used
;; NOTE(review): best-server and servers-to-delist are bound but never used
;; below, and no delist-db call is made in this body.
(let* ((best-server #f)
(servers-to-delist (make-hash-table)))
(for-each
(lambda (srvid)
(let* ((server (hash-table-ref/default servers srvid #f))
;; default '(()) makes (car stats) an empty weights list for unknown servers
(stats (hash-table-ref/default (area-hoststats acfg) srvid '(()))))
;; (print "stats: " stats)
(if server
(let* ((dbweights (car stats))
;; load = number of db files this server handles other than dbname
(srvload (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights)))
(dbrec (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore
(randnum (if dbrec
dbrec ;; (cdr dbrec)
0)))
(hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server))))))
skeys)
(let* ((sorted (sort (hash-table-values srvstats)
(lambda (a b)
(let ((numfiles-a (srvstat-numfiles a))
(numfiles-b (srvstat-numfiles b))
(randnum-a (srvstat-randnum a))
(randnum-b (srvstat-randnum b)))
(if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less
#t
(if (and (equal? numfiles-a numfiles-b)
(< randnum-a randnum-b))
#t
#f))))))
(best (if (null? sorted)
(begin
(print "ERROR: should never be null due to self as server.")
#f)
(srvstat-pkt (car sorted)))))
#;(print "SERVER(" url "): " dbname ": " (map (lambda (srv)
(let ((p (srvstat-pkt srv)))
(conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p)
"(" (srvstat-numfiles srv)","(srvstat-randnum srv)")")))
sorted))
best))))))
;; Send out an "I'm about to exit" notice to all known servers.
;;
;; Placeholder: nothing is sent, acfg is ignored and the empty list is returned.
;;
(define (death-imminent acfg)
  (list))
;;======================================================================
;; U L E X - T H E I N T E R E S T I N G S T U F F ! !
;;======================================================================
;; Register a handler under key in the area's registration table.
;;
;;   obj   - the thing to register; stored in a calldat record
;;   ctype - call type stored alongside obj, defaults to 'dbwrite
;;
;; A warning is printed when key is already registered; the old entry is
;; then replaced.
;;
;; NOTES:
;;   dbinitsql is reserved for a list of sql statements for initializing the db
;;   dbinitfn  is reserved for a db init function, if exists called after dbinitsql
;;
(define (register acfg key obj #!optional (ctype 'dbwrite))
  (let ((rtable (area-rtable acfg)))
    (when (hash-table-exists? rtable key)
      (print "WARNING: redefinition of entry " key))
    (hash-table-set! rtable key (make-calldat obj: obj ctype: ctype))))
;; Register several entries of the same call type at once.
;;
;; usage: (register-batch acfg ctype '((key1 . sql1) (key2 . sql2) ... ))
;; NB// obj (the cdr of each pair) is often an sql query
;;
;; Entries are stored in list order, so when a key appears more than once
;; the last occurrence wins. No redefinition warning is printed.
;;
(define (register-batch acfg ctype data)
  (let ((ht (area-rtable acfg)))
    ;; for-each: done for the side effect only, in a defined order
    (for-each (lambda (dat)
                (hash-table-set! ht (car dat) (make-calldat obj: (cdr dat) ctype: ctype)))
              data)))
;; Read one datum from specfile and register its contents with area.
;;
;; The datum is treated as a list of groups; the car of each group is passed
;; to register-batch as the ctype and the cdr as the list of entries.
;;
(define (initialize-area-calls-from-specfile area specfile)
  (for-each
   (lambda (group)
     (let ((ctype   (car group))
           (entries (cdr group)))
       (register-batch area ctype entries)))
   (with-input-from-file specfile read)))
;; get-rentry
;;
;; Look up the entry registered under key in the area's registration table.
;; Returns #f when nothing is registered under key.
;;
(define (get-rentry acfg key)
  (let ((rtable (area-rtable acfg)))
    (hash-table-ref/default rtable key #f)))
;; Return the obj (often an sql query) of the entry registered under key,
;; or #f when there is no such entry.
;;
(define (get-rsql acfg key)
  (cond
   ((get-rentry acfg key) => calldat-obj)
   (else #f)))
;; blocking call:
;;    client                         server
;;    ------                         ------
;;    call()
;;    send-message()
;;    nmsg-send()
;;                                   nmsg-receive()
;;                                   nmsg-respond(ack,cookie)
;;    ack, cookie
;;    mbox-thread-wait(cookie)
;;                                   nmsg-send(client,cookie,result)
;;    nmsg-respond(ack)
;;    return result
;;
;; reserved action:
;;    'immediate
;;    'dbinitsql
;;
;; Picks a server with get-best-server, sends the request (directly via
;; request when the chosen server is this process, via safe-call otherwise)
;; and, when the server answers "db read submitted" or "db write submitted",
;; waits up to 20 seconds on a mailbox for the result.
;;
;;   acfg   - the area
;;   dbname - db the action applies to
;;   action - key of a registered entry in the area's registration table
;;   params - parameters passed along with the request
;;   count  - retry counter, for internal use
;;
;; Returns the value received on the mailbox ('MBOX_TIMEOUT if nothing
;; arrived in time), or #f when the server reported failure or sent an
;; unhandled response. When the action is not registered an error message is
;; printed and no retry is attempted. Bad responses and "no server found"
;; are retried once per second; after 10 retries an error is raised.
;;
(define (call acfg dbname action params #!optional (count 0))
  (let* ((call-start-time     (current-milliseconds))
         (srv                 (get-best-server acfg dbname action))
         (post-get-start-time (current-milliseconds))
         (rdat                (hash-table-ref/default (area-rtable acfg) action #f))
         (myid                (trim-pktid (area-pktid acfg)))
         ;; get-best-server returns #f when no server is known, so srv must
         ;; not be handed to alist-ref before it has been checked
         (srvid               (and srv (trim-pktid (alist-ref 'Z srv))))
         (cookie              (make-cookie myid)))
    (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat)
    (if srv
        (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname))
    (if (and srv rdat) ;; need both to dispatch a request
        (let* ((ripaddr  (alist-ref 'ipaddr srv))
               (rsrvid   (alist-ref 'Z srv))
               (rport    (any->number (alist-ref 'port srv)))
               (res-full (if (and (equal? ripaddr (area-myaddr acfg))
                                  (equal? rport   (area-port acfg)))
                             ;; the best server is this process: skip the network
                             (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params)
                             (safe-call 'request ripaddr rport
                                        (area-myaddr acfg)
                                        (area-port acfg)
                                        rsrvid
                                        action cookie dbname params))))
          (match res-full
            ((response-ok response-msg rem ...)
             (sdbg> "call" "send-message" post-get-start-time #f call-start-time)
             (cond
              ((not response-ok) #f)
              ((member response-msg '("db read submitted" "db write submitted"))
               ;; the request was queued; the result is delivered later to the
               ;; mailbox registered under the cookie found in the response
               (let* ((cookie-id (cadddr res-full))
                      (mbox      (make-mailbox))
                      (mbox-time (current-milliseconds)))
                 (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox)
                 (let* ((mbox-timeout-secs   20)
                        (mbox-timeout-result 'MBOX_TIMEOUT)
                        (res                 (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)))
                   (hash-table-delete! (area-cookie2mbox acfg) cookie-id)
                   (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname)
                   res)))
              (else
               (print "Unhandled response \""response-msg"\"")
               #f)))
            ;; TODO (from the original notes): depending on the action/ctype,
            ;;   "can't help you" => try a different server
            ;;   "ack" (e.g. for one-shot requests) => return res
            (else
             (if (< count 10)
                 (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv))))
                   (thread-sleep! 1)
                   (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.")
                   (call acfg dbname action params (+ count 1)))
                 (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full))))))
        (if (not rdat)
            (print "ERROR: action " action " not registered.")
            (if (< count 10)
                (begin
                  (thread-sleep! 1)
                  (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts
                  (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds")
                  (call acfg dbname action params (+ count 1)))
                (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up.")))))))
;;======================================================================
;; U T I L I T I E S
;;======================================================================
;; get a signature for identifying this process
;;
;; Returns a pair: (hostname . process-id)
;;
(define (get-process-signature)
  (let ((host (get-host-name))
        (pid  (current-process-id)))
    (cons host pid)))
;;======================================================================
;; S Y S T E M S T U F F
;;======================================================================
;; get normalized cpu load by reading from /proc/loadavg and
;; /proc/cpuinfo (see get-normalized-cpu-load-raw).
;;
;; Returns an alist with the keys: adj-proc-load, adj-core-load, 1m-load,
;; 5m-load, 15m-load, proc, core, phys
;;
;; If the raw reader fails - it raises an exception or does not return a
;; list with more than two entries - a default alist is returned instead.
;; The default carries the extra entry (error . #t) so callers can tell the
;; numbers are not real.
;;
(define (get-normalized-cpu-load)
  (let ((default '((adj-proc-load . 2)  ;; there is no right answer
                   (adj-core-load . 2)
                   (1m-load       . 2)
                   (5m-load       . 0)  ;; causes a large delta - thus causing default of throttling if stuff goes wrong
                   (15m-load      . 0)
                   (proc          . 1)
                   (core          . 1)
                   (phys          . 1)
                   (error         . #t)))
        ;; an exception while reading or parsing is treated like a #f result
        ;; so that the default below is actually reachable
        (res     (handle-exceptions
                  exn
                  #f
                  (get-normalized-cpu-load-raw))))
    (if (and (list? res)
             (> (length res) 2))
        res
        default)))
;; Read /proc/loadavg and /proc/cpuinfo and compute load figures.
;;
;; Returns an alist with the keys adj-proc-load, adj-core-load, 1m-load,
;; 5m-load, 15m-load, proc, core and phys, or #f when no load average line
;; was recognised in the input.
;;
;; proc, core and phys are the highest "processor", "core id" and
;; "physical id" numbers seen, plus one. The adj-* loads are the one minute
;; load divided by proc and core respectively.
;;
(define (get-normalized-cpu-load-raw)
  (let ((data    (append
                  (with-input-from-file "/proc/loadavg" read-lines)
                  (with-input-from-file "/proc/cpuinfo" read-lines)
                  (list "end"))) ;; sentinel: the loop stops when it is the only line left
        (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$"))
        (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$"))
        (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$"))
        (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$"))
        (max-num (lambda (p n) (max (string->number p) n))))
    ;; data always holds at least the sentinel, so car/cdr are safe here
    (let loop ((hed      (car data))
               (tal      (cdr data))
               (loads    #f)
               (proc-num 0)  ;; processor includes threads
               (phys-num 0)  ;; physical chip on motherboard
               (core-num 0)) ;; core
      (if (null? tal) ;; have all our data, calculate normalized load and return result
          ;; loads is still #f when no line matched load-rx; report failure
          ;; with #f instead of taking the car of #f
          (and loads
               (let* ((act-proc      (+ proc-num 1))
                      (act-phys      (+ phys-num 1))
                      (act-core      (+ core-num 1))
                      (adj-proc-load (/ (car loads) act-proc))
                      (adj-core-load (/ (car loads) act-core)))
                 (list (cons 'adj-proc-load adj-proc-load)
                       (cons 'adj-core-load adj-core-load)
                       (cons '1m-load       (car loads))
                       (cons '5m-load       (cadr loads))
                       (cons '15m-load      (caddr loads))
                       (cons 'proc          act-proc)
                       (cons 'core          act-core)
                       (cons 'phys          act-phys))))
          (regex-case
           hed
           (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num))
           (proc-rx ( x p )         (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num))
           (phys-rx ( x p )         (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num))
           (core-rx ( x c )         (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num)))
           (else
            (loop (car tal)(cdr tal) loads proc-num phys-num core-num)))))))
;; Collect the stats this host reports about itself.
;;
;; Side effect: entries in the area's stats table whose last access time
;; (stat-when) is non-zero and more than ten seconds old are removed.
;;
;; Returns a three element list:
;;   1. alist of the area's dbs table (dbname => randnum)
;;   2. alist of dbname . last-access-time for the remaining stats entries
;;   3. (cpuload . <alist from get-normalized-cpu-load>)
;;
(define (get-host-stats acfg)
  (let ((stats-hash (area-stats acfg)))
    ;; use this opportunity to remove references to dbfiles which have not been accessed in a while
    (for-each
     (lambda (dbname)
       (let* ((stats       (hash-table-ref stats-hash dbname))
              (last-access (stat-when stats)))
         (if (and (> last-access 0)                          ;; if zero then there has been no access
                  (> (- (current-seconds) last-access) 10))  ;; not used in ten seconds
             (begin
               (print "Removing " dbname " from stats list")
               (hash-table-delete! stats-hash dbname) ;; remove from stats hash
               ;; The original passed the stat record itself to
               ;; hash-table-keys, which needs a hash table.
               ;; NOTE(review): using the names left in stats-hash is a
               ;; guess at the intent - confirm what the dbs field holds.
               (stat-dbs-set! stats (hash-table-keys stats-hash))))))
     (hash-table-keys stats-hash)) ;; snapshot of the keys, so deleting while looping is fine
    `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum
      ,(map (lambda (dbname) ;; dbname is the db name
              (cons dbname (stat-when (hash-table-ref stats-hash dbname))))
            (hash-table-keys stats-hash))
      (cpuload . ,(get-normalized-cpu-load)))))
#;(stats . ,(map (lambda (k) ;; create an alist from the stats data
(cons k (stat->alist (hash-table-ref (area-stats acfg) k))))
(hash-table-keys (area-stats acfg))))
#;(trace
;; assv
;; cdr
;; caar
;; ;; cdr
;; call
;; finalize-all-db-handles
;; get-all-server-pkts
;; get-normalized-cpu-load
;; get-normalized-cpu-load-raw
;; launch
;; nmsg-send
;; process-db-queries
;; receive-message
;; std-peer-handler
;; update-known-servers
;; work-queue-processor
)
;;======================================================================
;; netutil
;; move this back to ulex-netutil.scm someday?
;;======================================================================
;; #include <stdio.h>
;; #include <netinet/in.h>
;; #include <string.h>
;; #include <arpa/inet.h>
(foreign-declare "#include \"sys/types.h\"")
(foreign-declare "#include \"sys/socket.h\"")
(foreign-declare "#include \"ifaddrs.h\"")
(foreign-declare "#include \"arpa/inet.h\"")
;; get IP addresses from ALL interfaces
;;
;; Returns a list of IPv4 address strings, one for each interface address
;; of family AF_INET reported by getifaddrs (the loopback address is not
;; filtered out). Returns #f when getifaddrs itself fails.
;;
;; NOTE(review): the strings and pairs are allocated with C_alloc and then
;; returned from a foreign-safe-lambda* - confirm against the CHICKEN C
;; interface documentation that this is valid for this kind of wrapper.
;;
(define get-all-ips
  (foreign-safe-lambda* scheme-object ()
    "
// from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address :

    C_word lst = C_SCHEME_END_OF_LIST, len, str, *a;
    struct ifaddrs * ifAddrStruct = NULL;
    struct ifaddrs * ifa = NULL;
    void * tmpAddrPtr = NULL;

    if ( getifaddrs(&ifAddrStruct) != 0)
      C_return(C_SCHEME_FALSE);

    for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
        // an entry may carry no address at all, skip it instead of dereferencing NULL
        if (ifa->ifa_addr == NULL)
            continue;
        if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is
            // a valid IPv4 address
            char addressBuffer[INET_ADDRSTRLEN];
            tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
            // on failure the buffer contents are undefined, skip the entry
            if (inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN) == NULL)
                continue;
            // printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
            len = strlen(addressBuffer);
            a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
            str = C_string(&a, len, addressBuffer);
            lst = C_a_pair(&a, str, lst);
        }

        //       else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is
        //           // a valid IPv6 address
        //           tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
        //           char addressBuffer[INET6_ADDRSTRLEN];
        //           inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
        ////         printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer);
        //           len = strlen(addressBuffer);
        //           a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len));
        //           str = C_string(&a, len, addressBuffer);
        //           lst = C_a_pair(&a, str, lst);
        //       }

        //       else {
        //         printf(\" not an IPv4 address\\n\");
        //       }
    }

    // free the head of the list; the loop variable ifa is NULL at this point
    freeifaddrs(ifAddrStruct);
    C_return(lst);
"))
;; Ordering predicate for IP address strings: true when a gets a lower
;; rating than b. Ratings: first pattern => 0, second pattern => 1,
;; anything else => 2.
;;
;; Change this to bias for addresses with a reasonable broadcast value?
;;
;; NOTE(review): the second pattern asks for two consecutive dots after
;; "192.168", so it presumably never matches a 192.168.x.x address. Whether
;; the first pattern matches a complete address such as "127.0.0.1" depends
;; on whether regex-case matches the whole string - confirm before relying
;; on these ratings.
;;
(define (ip-pref-less? a b)
  (define (rating ipstr)
    (regex-case ipstr
                ( "^127\\." _ 0 )
                ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 )
                ( else 2 )))
  (< (rating a) (rating b)))
;; Pick the address this host should be known by.
;;
;;  - no addresses found (or get-all-ips failed and returned #f): the host name
;;  - exactly one address: that address
;;  - otherwise: the first address after sorting with ip-pref-less?
;;
(define (get-my-best-address)
  (let ((all-my-addresses (or (get-all-ips) '()))) ;; get-all-ips returns #f when getifaddrs fails
    (cond
     ((null? all-my-addresses)
      (get-host-name))                           ;; no interfaces?
     ((null? (cdr all-my-addresses))
      (car all-my-addresses))                    ;; only one to choose from, just go with it
     (else
      (car (sort all-my-addresses ip-pref-less?))))))
;; All IPv4 addresses of this host, ordered by ip-pref-less?.
;; Returns the empty list when get-all-ips fails (it returns #f in that case).
;;
(define (get-all-ips-sorted)
  (sort (or (get-all-ips) '()) ip-pref-less?))
)