Megatest

Diff
Login

Differences From Artifact [f2b9d5f3d9]:

To Artifact [d9e8792ebb]:


18
19
20
21
22
23
24
25

26
27
28
29
30
31
32
33
(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses synchash))
(declare (uses http-transport))
(declare (uses launch))

;; (declare (uses zmq-transport))
(declare (uses daemon))

(include "common_records.scm")
(include "db_records.scm")

(define (server:make-server-url hostport)
  (if (not hostport)







|
>
|







18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses synchash))
(declare (uses http-transport))
(declare (uses rpc-transport))
(declare (uses nmsg-transport))
(declare (uses launch))
(declare (uses daemon))

(include "common_records.scm")
(include "db_records.scm")

(define (server:make-server-url hostport)
  (if (not hostport)
45
46
47
48
49
50
51

52
53
54
55
56
57

58
59
60
61
62
63
64











65
66
67
68
69
70
71
72
73
74
75
76
77




78









79
80
81
82
83
84
85
;;

;; all routes though here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id)

  (http-transport:launch run-id))

;;======================================================================
;; Q U E U E   M A N A G E M E N T
;;======================================================================


;; We don't want to flush the queue if it was just flushed
(define *server:last-write-flush* (current-milliseconds))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================












;; Generate a unique signature for this server
(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))


;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;; 
(define (server:reply return-addr query-sig success/fail result)




  (db:obj->string (vector success/fail query-sig result)))










;; Given a run id start a server process    ### NOTE ### > file 2>&1 
;; if the run-id is zero and the target-host is set 
;; try running on that host
;;
(define  (server:run run-id)
  (let* ((curr-host   (get-host-name))







>
|
|
<
|
<
|
>
|
<





>
>
>
>
>
>
>
>
>
>
>








<




>
>
>
>
|
>
>
>
>
>
>
>
>
>







46
47
48
49
50
51
52
53
54
55

56

57
58
59

60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83

84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
;;

;; all routes though here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id)
  (case *transport-type*
    ((http)(http-transport:launch run-id))
    ((nmsg)(nmsg-transport:launch run-id))

    ((rpc)  (rpc-transport:launch run-id))

    (else (debug:print 0 "ERROR: unknown server type " *transport-type*))))
;;       (else   (debug:print 0 "ERROR: No known transport set, transport=" transport ", using rpc")
;; 	      (rpc-transport:launch run-id)))))


;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; Get the transport
(define (server:get-transport)
  (if *transport-type*
      *transport-type*
      (let ((ttype (string->symbol
		    (or (args:get-arg "-transport")
			(configf:lookup *configdat* "server" "transport")
			"rpc"))))
	(set! *transport-type* ttype)
	ttype)))
	    
;; Generate a unique signature for this server
(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))


;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;; 
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
  ;; (send-message pubsock target send-more: #t)
  ;; (send-message pubsock 
  (case (server:get-transport)
    ((rpc)  (db:obj->string (vector success/fail query-sig result)))
    ((http) (db:obj->string (vector success/fail query-sig result)))
    ((zmq)
     (let ((pub-socket (vector-ref *runremote* 1)))
       (send-message pub-socket return-addr send-more: #t)
       (send-message pub-socket (db:obj->string (vector success/fail query-sig result)))))
    ((fs)   result)
    (else 
     (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*)
     result)))

;; Given a run id start a server process    ### NOTE ### > file 2>&1 
;; if the run-id is zero and the target-host is set 
;; try running on that host
;;
(define  (server:run run-id)
  (let* ((curr-host   (get-host-name))
106
107
108
109
110
111
112






113
114
115
116
117
118
119
	  (setenv "TARGETHOST" target-host)))
    (setenv "TARGETHOST_LOGF" logfile)
    (system (conc "nbfake " cmdln))
    (unsetenv "TARGETHOST_LOGF")
    (if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST"))
    ;; (system cmdln)
    (pop-directory)))







;; kind start up of servers, wait 40 seconds before allowing another server for a given
;; run-id to be launched
(define (server:kind-run run-id)
  (let ((last-run-time (hash-table-ref/default *server-kind-run* run-id #f)))
    (if (or (not last-run-time)
	    (> (- (current-seconds) last-run-time) 30))







>
>
>
>
>
>







129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
	  (setenv "TARGETHOST" target-host)))
    (setenv "TARGETHOST_LOGF" logfile)
    (system (conc "nbfake " cmdln))
    (unsetenv "TARGETHOST_LOGF")
    (if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST"))
    ;; (system cmdln)
    (pop-directory)))

(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; kind start up of servers, wait 40 seconds before allowing another server for a given
;; run-id to be launched
(define (server:kind-run run-id)
  (let ((last-run-time (hash-table-ref/default *server-kind-run* run-id #f)))
    (if (or (not last-run-time)
	    (> (- (current-seconds) last-run-time) 30))
134
135
136
137
138
139
140

141
142
143



144
145
146
147
148
149
150
	       (trycount 0))
    (if server
	;; note: client:start will set *runremote*. this needs to be changed
	;;       also, client:start will login to the server, also need to change that.
	;;
	;; client:start returns #t if login was successful.
	;;

	(let ((res (server:ping-server run-id 
				       (tasks:hostinfo-get-interface server)
				       (tasks:hostinfo-get-port      server))))



	  ;; if the server didn't respond we must remove the record
	  (if res
	      #t
	      (begin
		(debug:print-info 0 "server at " server " not responding, removing record")
		(tasks:server-force-clean-running-records-for-run-id (db:delay-if-busy tdbdat) run-id 
				" server:check-if-running")







>
|
|
|
>
>
>







163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
	       (trycount 0))
    (if server
	;; note: client:start will set *runremote*. this needs to be changed
	;;       also, client:start will login to the server, also need to change that.
	;;
	;; client:start returns #t if login was successful.
	;;
	(let ((res (case *transport-type*
		     ((http)(server:ping-server run-id 
						(tasks:hostinfo-get-interface server)
						(tasks:hostinfo-get-port      server)))
		     ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server)
						 (tasks:hostinfo-get-port      server)
						 timeout: 2)))))
	  ;; if the server didn't respond we must remove the record
	  (if res
	      #t
	      (begin
		(debug:print-info 0 "server at " server " not responding, removing record")
		(tasks:server-force-clean-running-records-for-run-id (db:delay-if-busy tdbdat) run-id 
				" server:check-if-running")
194
195
196
197
198
199
200






















		(res "NOREPLY"))
       (if (eof-object? inl)
	   (case (string->symbol res)
	     ((NOREPLY)  #f)
	     ((LOGIN_OK) #t)
	     (else       #f))
	   (loop (read-line) inl))))))





























>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
		(res "NOREPLY"))
       (if (eof-object? inl)
	   (case (string->symbol res)
	     ((NOREPLY)  #f)
	     ((LOGIN_OK) #t)
	     (else       #f))
	   (loop (read-line) inl))))))

(define (server:login toppath)
  (lambda (toppath)
    (set! *last-db-access* (current-seconds))
    (if (equal? *toppath* toppath)
	(begin
	  ;; (debug:print-info 2 "login successful")
	  #t)
	(begin
	  ;; (debug:print-info 2 "login failed")
	  #f))))

(define (server:get-timeout)
  (let ((tmo (configf:lookup  *configdat* "server" "timeout")))
    (if (and (string? tmo)
	     (string->number tmo))
	(* 60 60 (string->number tmo))
	;; (* 3 24 60 60) ;; default to three days
	(* 60 1)         ;; default to one minute
	;; (* 60 60 25)      ;; default to 25 hours
	)))