Megatest

Diff
Login

Differences From Artifact [67b3f4e15b]:

To Artifact [a18aca67cf]:


134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151

152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
			     (loop))))))))

    ;; The heavy lifting
    ;;
    ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
    ;;
    (let loop ((queue-lst '()))
      ;; (print "GOT HERE EH?")
      (let* ((rawmsg (receive-message* pull-socket))
	     (packet (db:string->obj rawmsg)))
	(debug:print-info 12 "server=> received packet=" packet)
	(if (cdb:packet-get-immediate packet) ;; process immediately or put in queue
	    (begin
	      (db:process-queue pubsock (cons packet queue))
	      (loop '()))
	    (loop (cons packet queue)))))))

(define (server:reply pubsock target result)

  (send-message pubsock target send-more: #t)
  (send-message pubsock result))

;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
			(let ((sdat #f))
			  (mutex-lock! *heartbeat-mutex*)
			  (set! sdat *server-info*)
			  (mutex-unlock! *heartbeat-mutex*)
			  (if sdat sdat
			      (begin
				(sleep 4)
				(loop))))))
	 (iface       (cadr server-info))
	 (pullport    (caddr server-info))
	 (pubport     (cadddr server-info)) ;; id interface pullport pubport)
	 ;; (zmq-sockets (server:client-connect iface pullport pubport)))
	 )
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;;  (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1))))
      ;; (print "Server running, count is " count)
      (if (< count 1) ;; 3x3 = 9 secs aprox
	  (loop (+ count 1)))
      
      ;; NOTE: Get rid of this mechanism! It really is not needed...
      (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
      







|





|

|


>

|




















|



|







134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
			     (loop))))))))

    ;; The heavy lifting
    ;;
    ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
    ;;
    (let loop ((queue-lst '()))
      (print "GOT HERE EH?")
      (let* ((rawmsg (receive-message* pull-socket))
	     (packet (db:string->obj rawmsg)))
	(debug:print-info 12 "server=> received packet=" packet)
	(if (cdb:packet-get-immediate packet) ;; process immediately or put in queue
	    (begin
	      (db:process-queue pub-socket (cons packet queue-lst))
	      (loop '()))
	    (loop (cons packet queue-lst)))))))

(define (server:reply pubsock target result)
  (debug:print-info 11 "server:reply target=" target ", result=" result)
  (send-message pubsock target send-more: #t)
  (send-message pubsock (db:obj->string result)))

;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
			(let ((sdat #f))
			  (mutex-lock! *heartbeat-mutex*)
			  (set! sdat *server-info*)
			  (mutex-unlock! *heartbeat-mutex*)
			  (if sdat sdat
			      (begin
				(sleep 4)
				(loop))))))
	 (iface       (cadr server-info))
	 (pullport    (caddr server-info))
	 (pubport     (cadddr server-info)) ;; id interface pullport pubport)
	 ;; (zmq-sockets (server:client-connect iface pullport pubport))
	 )
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1))))
      ;; (print "Server running, count is " count)
      (if (< count 1) ;; 3x3 = 9 secs aprox
	  (loop (+ count 1)))
      
      ;; NOTE: Get rid of this mechanism! It really is not needed...
      (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
      
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
       (exit)) ;; To exit or not? That is the question.
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (debug:print 0 "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (list iface s port)))))

(define (server:setup-ports ipaddrstr startport)
  (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pub))
	 (p1 (caddr s1))
	 (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull))
	 (p2 (caddr s2)))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live))
    (mutex-unlock! *heartbeat-mutex*)
    (list s1 s2)))







|

|







222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
       (exit)) ;; To exit or not? That is the question.
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (debug:print 0 "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (list iface s port)))))

(define (server:setup-ports ipaddrstr startport)
  (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pull))
	 (p1 (caddr s1))
	 (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
	 (p2 (caddr s2)))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live))
    (mutex-unlock! *heartbeat-mutex*)
    (list s1 s2)))