testmeta-get-record))
;; These are called by the server on recipt of /api calls
;; - keep it simple, only return the actual result of the call, i.e. no meta info here
;;
(define (api:execute-requests dbstruct cmd params)
(let ((res
(case (if (symbol? cmd)
cmd
(case (string->symbol cmd)
(string->symbol cmd))
;; SERVERS
((start-server) (apply server:kind-run params))
((kill-server) (set! *server-run* #f))
;; KEYS
((get-key-val-pairs) (apply db:get-key-val-pairs dbstruct params))
((get-keys) (db:get-keys dbstruct))
(define *inmemdb* #f)
(define *task-db* #f) ;; (vector db path-to-db)
(define *db-access-allowed* #t) ;; flag to allow access
(define *db-access-mutex* (make-mutex))
;; SERVER
(define *my-client-signature* #f)
(define *transport-type* 'nm)
(define *transport-type* 'nmsg)
(define *runremote* (make-hash-table)) ;; if set up for server communication this will hold <host port>
(define *max-cache-size* 0)
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)
(define *server-id* #f)
(define *server-info* #f)
(define *time-to-exit* #f)
(begin
(debug:print-info 0 "Another server took the slot, exiting")
(exit 0))))
(begin
;; since we didn't get the server lock we are going to clean up and bail out
(debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
(tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
))) (nmsg-transport:run dbstruct hostn run-id server-id)
))
;; locked in a server id, try to start up
(nmsg-transport:run dbstruct hostn run-id server-id))
(set! *didsomething* #t)
(exit))))
;;======================================================================
;; S E R V E R U T I L I T I E S
;;======================================================================
;;======================================================================
;; ping the server at host:port
;; return the open socket if successful (return-socket == #t)
;; expect the key expected-key returned in payload
;; send our-key or #f as payload
;;
(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f))
(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f))
;; send a random number along with pid and check that we get it back
(let* ((req (nn-socket 'req))
(let* ((req (or socket (nn-socket 'req)))
(host (if (or (not hostn)
(equal? hostn "-")) ;; use localhost
(get-host-name)
hostn))
(success #f)
(keepwaiting #t)
(dat (db:obj->string (vector "ping" our-key) transport: 'nmsg))
(if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate
(loop (+ count 1))))
(if keepwaiting
(begin
(print "timeout waiting for ping")
(thread-terminate! ping))))
"timeout")))
(nn-connect req (conc "tcp://" host ":" port))
(if (not socket)(nn-connect req (conc "tcp://" host ":" port)))
(handle-exceptions
exn
(begin
;; (print-call-chain)
;; (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
;; (print "exn=" (condition->list exn))
(debug:print-info 1 "ping failed to connect to " host ":" port))
(thread-start! timeout)
(thread-start! ping)
(thread-join! ping)
(if success (thread-terminate! timeout)))
(if return-socket
(if success req #f)
(begin
(nn-close req)
(nn-close req) ;; should it be closed if we were handed a socket?
success))))
(define (nmsg-transport:client-connect iface portnum) (let* ((reqsoc (nmsg-transport:ping iface portnum)) (login-res #f)) (nn-connect reqsoc (conc "tcp://" iface ":" portnum)) (debug:print-info 11 "nmsg-transport:client-connect started. Next is login") (set! login-res (client:login serverdat nmsg-sockets)) (if (and (not (null? login-res)) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") (set! *nm-port* nmsg-sockets) nmsg-sockets) (begin (debug:print-info 2 "Failed to login or connect to " conurl) (set! *runremote* #f) #f))));; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
;;
(define (nmsg-transport:keep-running server-id)
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; This thread waits for the server to come alive
;; SHOULD CLOSE THE CONNECTION HERE
(hash-table-delete! *runremote* run-id)))))
(hash-table-keys *runremote*)))
(mutex-unlock! *db-multi-sync-mutex*)
(let* ((run-id (if rid rid 0))
(connection-info (rmt:get-connection-info run-id))
(jparams (db:obj->string params)))
;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
(if connection-info
;; use the server if have connection info
(let* ((dat (case *transport-type*
((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams))
((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info cmd jparams))
((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info cmd params))
(else (exit))))
(res (if (vector? dat) (vector-ref dat 1) #f))
(success (if (vector? dat) (vector-ref dat 0) #f)))
(http-transport:server-dat-update-last-access connection-info)
(if success
(case *transport-type*
(db:string->obj res)
((http)(db:string->obj res))
((nmsg) res))
(begin ;; let ((new-connection-info (client:setup run-id)))
(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
;; no longer killing the server in http-transport:client-api-send-receive
;; may kill it here but what are the criteria?
;; start with three calls then kill server
;; all routes though here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id)
(case *transport-type*
((http)(http-transport:launch run-id))
((nm) (nmsg-transport:launch run-id))
((nmsg)(nmsg-transport:launch run-id))
(else (debug:print 0 "ERROR: unknown server type " *transport-type*))))
;;======================================================================
;; Q U E U E M A N A G E M E N T
;;======================================================================
;; We don't want to flush the queue if it was just flushed