︙ | | | ︙ | |
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
(require-extension (srfi 18) extras tcp) ;; rpc)
;; (import (prefix rpc rpc:))
(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest)
(import (prefix sqlite3 sqlite3:))
(use zmq)
(declare (unit db))
(declare (uses common))
(declare (uses keys))
(declare (uses ods))
(include "common_records.scm")
(include "db_records.scm")
|
<
<
|
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
(require-extension (srfi 18) extras tcp) ;; rpc)
;; (import (prefix rpc rpc:))
(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest)
(import (prefix sqlite3 sqlite3:))
(declare (unit db))
(declare (uses common))
(declare (uses keys))
(declare (uses ods))
(include "common_records.scm")
(include "db_records.scm")
|
︙ | | | ︙ | |
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
|
(if currstate (conc "state='" currstate "' AND ") "")
(if currstatus (conc "status='" currstatus "' AND ") "")
" run_id=? AND testname=? AND NOT (item_path='' AND testname in (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));")))
;;(debug:print 0 "QRY: " qry)
(sqlite3:execute db qry run-id newstate newstatus testname testname)))
testnames))
;; Send a 'delete-tests-in-state request for run-id/state through
;; cdb:client-call (immediate flag #t) and return whatever it returns.
(define (cdb:delete-tests-in-state zmqsocket run-id state)
  (let ((qtype  'delete-tests-in-state)
        (params (list run-id state)))
    (apply cdb:client-call zmqsocket qtype #t *default-numtries* params)))
;; speed up for common cases with a little logic
(define (db:test-set-state-status-by-id db test-id newstate newstatus newcomment)
(cond
((and newstate newstatus newcomment)
(sqlite3:exectute db "UPDATE tests SET state=?,status=?,comment=? WHERE id=?;" newstate newstatus test-id))
((and newstate newstatus)
|
|
|
|
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
|
(if currstate (conc "state='" currstate "' AND ") "")
(if currstatus (conc "status='" currstatus "' AND ") "")
" run_id=? AND testname=? AND NOT (item_path='' AND testname in (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));")))
;;(debug:print 0 "QRY: " qry)
(sqlite3:execute db qry run-id newstate newstatus testname testname)))
testnames))
;; Send a 'delete-tests-in-state request for run-id/state through
;; cdb:client-call (immediate flag #t) and return whatever it returns.
(define (cdb:delete-tests-in-state serverdat run-id state)
  (let ((qtype  'delete-tests-in-state)
        (params (list run-id state)))
    (apply cdb:client-call serverdat qtype #t *default-numtries* params)))
;; speed up for common cases with a little logic
(define (db:test-set-state-status-by-id db test-id newstate newstatus newcomment)
(cond
((and newstate newstatus newcomment)
(sqlite3:exectute db "UPDATE tests SET state=?,status=?,comment=? WHERE id=?;" newstate newstatus test-id))
((and newstate newstatus)
|
︙ | | | ︙ | |
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
|
;; Overwrite the comment column of the tests row whose id is test-id.
(define (db:test-set-comment db test-id comment)
  (let ((sql "UPDATE tests SET comment=? WHERE id=?;"))
    (sqlite3:execute db sql comment test-id)))
;; Send a 'test-set-rundir request; note the wire order of the parameters is
;; rundir, run-id, test-name, item-path.
(define (cdb:test-set-rundir! zmqsocket run-id test-name item-path rundir)
  (let ((params (list rundir run-id test-name item-path)))
    (apply cdb:client-call zmqsocket 'test-set-rundir #t *default-numtries* params)))
;; Send a 'test-set-rundir-by-test-id request carrying rundir then test-id.
(define (cdb:test-set-rundir-by-test-id zmqsocket test-id rundir)
  (let ((params (list rundir test-id)))
    (apply cdb:client-call zmqsocket 'test-set-rundir-by-test-id #t *default-numtries* params)))
;; Look up the rundir column for the tests row with the given id.
;; Returns #f when no row matches; if several rows match, the value from the
;; last row visited is returned.
(define (db:test-get-rundir-from-test-id db test-id)
  (let ((rundir #f))
    (sqlite3:for-each-row
     (lambda (row-rundir)
       (set! rundir row-rundir))
     db
     "SELECT rundir FROM tests WHERE id=?;"
     test-id)
    rundir))
;; Send a 'test-set-log request (immediate flag #f) carrying logf then test-id.
;; Nothing is sent unless logf is a string.
(define (cdb:test-set-log! zmqsocket test-id logf)
  (cond
   ((string? logf)
    (cdb:client-call zmqsocket 'test-set-log #f *default-numtries* logf test-id))))
;;======================================================================
;; Misc. test related queries
;;======================================================================
(define (db:test-get-paths-matching db keynames target fnamepatt #!key (res '()))
(let* ((testpatt (if (args:get-arg "-testpatt")(args:get-arg "-testpatt") "%"))
|
|
|
|
|
|
|
|
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
|
;; Overwrite the comment column of the tests row whose id is test-id.
(define (db:test-set-comment db test-id comment)
  (let ((sql "UPDATE tests SET comment=? WHERE id=?;"))
    (sqlite3:execute db sql comment test-id)))
;; Send a 'test-set-rundir request; note the wire order of the parameters is
;; rundir, run-id, test-name, item-path.
(define (cdb:test-set-rundir! serverdat run-id test-name item-path rundir)
  (let ((params (list rundir run-id test-name item-path)))
    (apply cdb:client-call serverdat 'test-set-rundir #t *default-numtries* params)))
;; Send a 'test-set-rundir-by-test-id request carrying rundir then test-id.
(define (cdb:test-set-rundir-by-test-id serverdat test-id rundir)
  (let ((params (list rundir test-id)))
    (apply cdb:client-call serverdat 'test-set-rundir-by-test-id #t *default-numtries* params)))
;; Look up the rundir column for the tests row with the given id.
;; Returns #f when no row matches; if several rows match, the value from the
;; last row visited is returned.
(define (db:test-get-rundir-from-test-id db test-id)
  (let ((rundir #f))
    (sqlite3:for-each-row
     (lambda (row-rundir)
       (set! rundir row-rundir))
     db
     "SELECT rundir FROM tests WHERE id=?;"
     test-id)
    rundir))
;; Send a 'test-set-log request (immediate flag #f) carrying logf then test-id.
;; Nothing is sent unless logf is a string.
(define (cdb:test-set-log! serverdat test-id logf)
  (cond
   ((string? logf)
    (cdb:client-call serverdat 'test-set-log #f *default-numtries* logf test-id))))
;;======================================================================
;; Misc. test related queries
;;======================================================================
(define (db:test-get-paths-matching db keynames target fnamepatt #!key (res '()))
(let* ((testpatt (if (args:get-arg "-testpatt")(args:get-arg "-testpatt") "%"))
|
︙ | | | ︙ | |
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
|
(set! *client-non-blocking-mode* #f)
res))
;; params = 'target cached remparams
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
;; Send one query to the server and wait for the matching reply.
;;
;;   zmq-sockets : vector; slot 0 is the socket used to send, slot 1 the
;;                 socket replies are read from
;;   qtype       : query type, carried in the packet
;;   immediate   : flag carried in the packet
;;   numretries  : retry budget, used both by the exception handler and by
;;                 the timeout thread
;;   params      : remaining arguments, carried in the packet
;;
;; Packet layout: #(client-sig qtype immediate query-sig params qtime)
;;
;; Returns slot 2 of the first reply whose slot 1 equals this call's
;; query-sig. On an exception the call sleeps 5 seconds and retries with
;; numretries - 1 while numretries > 0.
(define (cdb:client-call zmq-sockets qtype immediate numretries . params)
  (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params)
  (handle-exceptions
   exn
   (begin
     (thread-sleep! 5)
     (if (> numretries 0)(apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)))
   (let* ((push-socket (vector-ref zmq-sockets 0))
          (sub-socket  (vector-ref zmq-sockets 1))
          (client-sig  (server:get-client-signature))
          (query-sig   (message-digest-string (md5-primitive) (conc qtype immediate params)))
          (zdat        (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds))))
          (res         #f)
          (send-receive (lambda ()
                          (debug:print-info 11 "sending message")
                          (send-message push-socket zdat)
                          (debug:print-info 11 "message sent")
                          (let loop ()
                            ;; first frame: the sender info
                            ;; this should match (server:get-client-signature)
                            ;; we will need to process "all" messages here some day
                            (receive-message* sub-socket)
                            ;; second frame: the actual message
                            (let ((myres (db:string->obj (receive-message* sub-socket))))
                              (if (equal? query-sig (vector-ref myres 1))
                                  (set! res (vector-ref myres 2))
                                  (loop))))))
          (timeout (lambda ()
                     (let loop ((n numretries))
                       (thread-sleep! 15)
                       (if (not res)
                           ;; Test the loop counter n, not numretries: numretries
                           ;; never changes inside this loop, so testing it made
                           ;; the timed-out branch below unreachable.
                           (if (> n 0)
                               (begin
                                 (debug:print 2 "WARNING: no reply to query " params ", trying resend")
                                 (debug:print-info 11 "re-sending message")
                                 (send-message push-socket zdat)
                                 (debug:print-info 11 "message re-sent")
                                 (loop (- n 1)))
                               (begin
                                 (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
                                 (exit 5))))))))
     (debug:print-info 11 "Starting threads")
     (let ((th1 (make-thread send-receive "send receive"))
           (th2 (make-thread timeout "timeout")))
       (thread-start! th1)
       (thread-start! th2)
       ;; only the send/receive thread is joined; the timeout thread stops
       ;; on its own once res is set
       (thread-join! th1)
       (debug:print-info 11 "cdb:client-call returning res=" res)
       res))))
;; Send a 'set-verbosity request (immediate flag #f) carrying val.
(define (cdb:set-verbosity zmq-socket val)
  (let ((qtype 'set-verbosity))
    (cdb:client-call zmq-socket qtype #f *default-numtries* val)))
;; Send a 'login request carrying keyval, megatest-version and signature.
(define (cdb:login zmq-sockets keyval signature)
  (let ((params (list keyval megatest-version signature)))
    (apply cdb:client-call zmq-sockets 'login #t *default-numtries* params)))
;; Send a 'logout request carrying keyval and signature.
(define (cdb:logout zmq-socket keyval signature)
  (let ((params (list keyval signature)))
    (apply cdb:client-call zmq-socket 'logout #t *default-numtries* params)))
;; Send a parameterless 'numclients request and return the reply.
(define (cdb:num-clients zmq-socket)
  (let ((qtype 'numclients))
    (cdb:client-call zmq-socket qtype #t *default-numtries*)))
;; Update state and status for test-id on the server.
;; With a msg, a 'state-status-msg request carries state, status, msg, test-id;
;; without one, a 'state-status request carries state, status, test-id.
(define (cdb:test-set-status-state zmqsocket test-id status state msg)
  (let ((qtype  (if msg 'state-status-msg 'state-status))
        (params (if msg
                    (list state status msg test-id)
                    (list state status test-id))))
    (apply cdb:client-call zmqsocket qtype #t *default-numtries* params)))
;; Send a 'test_data-pf-rollup request; test-id is passed four times as the
;; request parameters.
(define (cdb:test-rollup-test_data-pass-fail zmqsocket test-id)
  (let ((params (list test-id test-id test-id test-id)))
    (apply cdb:client-call zmqsocket 'test_data-pf-rollup #t *default-numtries* params)))
;; Send a 'pass-fail-counts request carrying fail-count, pass-count, test-id.
(define (cdb:pass-fail-counts zmqsocket test-id fail-count pass-count)
  (let ((params (list fail-count pass-count test-id)))
    (apply cdb:client-call zmqsocket 'pass-fail-counts #t *default-numtries* params)))
;; Send a 'register-test request carrying run-id, test-name and item-path.
;; (A local list of item paths used to be built here but was never used; it
;; has been removed.)
(define (cdb:tests-register-test zmqsocket run-id test-name item-path)
  (cdb:client-call zmqsocket 'register-test #t *default-numtries* run-id test-name item-path))
;; Send a parameterless 'flush request (immediate flag #f).
(define (cdb:flush-queue zmqsocket)
  (let ((qtype 'flush))
    (cdb:client-call zmqsocket qtype #f *default-numtries*)))
;; Send a parameterless 'killserver request (immediate flag #f).
(define (cdb:kill-server zmqsocket)
  (let ((qtype 'killserver))
    (cdb:client-call zmqsocket qtype #f *default-numtries*)))
;; Send an 'immediate request whose parameters are open-run-close,
;; db:roll-up-pass-fail-counts, #f, then run-id, test-name, item-path, status.
(define (cdb:roll-up-pass-fail-counts zmqsocket run-id test-name item-path status)
  (let ((params (list open-run-close db:roll-up-pass-fail-counts #f
                      run-id test-name item-path status)))
    (apply cdb:client-call zmqsocket 'immediate #f *default-numtries* params)))
;; Send an 'immediate request whose parameters are open-run-close,
;; db:get-test-info, #f, then run-id, test-name, item-path.
(define (cdb:get-test-info zmqsocket run-id test-name item-path)
  (let ((params (list open-run-close db:get-test-info #f
                      run-id test-name item-path)))
    (apply cdb:client-call zmqsocket 'immediate #f *default-numtries* params)))
;; Send an 'immediate request whose parameters are open-run-close,
;; db:get-test-info-by-id, #f, then test-id.
(define (cdb:get-test-info-by-id zmqsocket test-id)
  (let ((params (list open-run-close db:get-test-info-by-id #f test-id)))
    (apply cdb:client-call zmqsocket 'immediate #f *default-numtries* params)))
;; db should be db open proc or #f
;;
;; Send an 'immediate request over *runremote* whose parameters are
;; open-run-close, proc, #f, followed by params. The db argument is accepted
;; but not forwarded.
(define (cdb:remote-run proc db . params)
  (let ((all-params (append (list open-run-close proc #f) params)))
    (apply cdb:client-call *runremote* 'immediate #f *default-numtries* all-params)))
(define (db:test-get-logfile-info db run-id test-name)
(let ((res #f))
|
|
|
|
<
<
|
<
<
<
|
<
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
|
(set! *client-non-blocking-mode* #f)
res))
;; params = 'target cached remparams
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
;; Send one query to the server and wait for the matching reply.
;;
;;   serverdat  : server handle passed straight to server:client-send-receive
;;   qtype      : query type, carried in the packet
;;   immediate  : flag carried in the packet
;;   numretries : retry budget, used both by the exception handler and by
;;                the timeout thread
;;   params     : remaining arguments, carried in the packet
;;
;; Packet layout: #(client-sig qtype immediate query-sig params qtime)
;;
;; Returns slot 2 of the first reply whose slot 1 equals this call's
;; query-sig. On an exception the call sleeps 5 seconds and retries with
;; numretries - 1 while numretries > 0.
(define (cdb:client-call serverdat qtype immediate numretries . params)
  (debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params)
  (handle-exceptions
   exn
   (begin
     (thread-sleep! 5)
     (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params)))
   (let* ((client-sig (server:get-client-signature))
          (query-sig  (message-digest-string (md5-primitive) (conc qtype immediate params)))
          (zdat       (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds))))
          (res        #f)
          (send-receive (lambda ()
                          ;; The loop variable must NOT be called res: it would
                          ;; shadow the outer res, and the set! below would
                          ;; never reach the value this function returns.
                          (let loop ((raw (server:client-send-receive serverdat zdat)))
                            (let ((myres (db:string->obj raw)))
                              (if (equal? query-sig (vector-ref myres 1))
                                  (set! res (vector-ref myres 2))
                                  ;; reply was for another query: ask again
                                  (loop (server:client-send-receive serverdat zdat)))))))
          (timeout (lambda ()
                     (let loop ((n numretries))
                       (thread-sleep! 15)
                       (if (not res)
                           ;; Test the loop counter n, not numretries, so the
                           ;; timed-out branch is reachable.
                           (if (> n 0)
                               (begin
                                 ;; The send/receive thread owns (re)sending; the
                                 ;; old socket-level resend referenced a
                                 ;; push-socket that is no longer bound here.
                                 (debug:print 2 "WARNING: no reply to query " params ", still waiting")
                                 (loop (- n 1)))
                               (begin
                                 (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
                                 (exit 5))))))))
     (debug:print-info 11 "Starting threads")
     (let ((th1 (make-thread send-receive "send receive"))
           (th2 (make-thread timeout "timeout")))
       (thread-start! th1)
       (thread-start! th2)
       ;; only the send/receive thread is joined; the timeout thread stops
       ;; on its own once res is set
       (thread-join! th1)
       (debug:print-info 11 "cdb:client-call returning res=" res)
       res))))
;; Send a 'set-verbosity request (immediate flag #f) carrying val.
(define (cdb:set-verbosity serverdat val)
  (let ((qtype 'set-verbosity))
    (cdb:client-call serverdat qtype #f *default-numtries* val)))
;; Send a 'login request carrying keyval, megatest-version and signature.
(define (cdb:login serverdat keyval signature)
  (let ((params (list keyval megatest-version signature)))
    (apply cdb:client-call serverdat 'login #t *default-numtries* params)))
;; Send a 'logout request carrying keyval and signature.
(define (cdb:logout serverdat keyval signature)
  (let ((params (list keyval signature)))
    (apply cdb:client-call serverdat 'logout #t *default-numtries* params)))
;; Send a parameterless 'numclients request and return the reply.
(define (cdb:num-clients serverdat)
  (let ((qtype 'numclients))
    (cdb:client-call serverdat qtype #t *default-numtries*)))
;; Update state and status for test-id on the server.
;; With a msg, a 'state-status-msg request carries state, status, msg, test-id;
;; without one, a 'state-status request carries state, status, test-id.
(define (cdb:test-set-status-state serverdat test-id status state msg)
  (let ((qtype  (if msg 'state-status-msg 'state-status))
        (params (if msg
                    (list state status msg test-id)
                    (list state status test-id))))
    (apply cdb:client-call serverdat qtype #t *default-numtries* params)))
;; Send a 'test_data-pf-rollup request; test-id is passed four times as the
;; request parameters.
(define (cdb:test-rollup-test_data-pass-fail serverdat test-id)
  (let ((params (list test-id test-id test-id test-id)))
    (apply cdb:client-call serverdat 'test_data-pf-rollup #t *default-numtries* params)))
;; Send a 'pass-fail-counts request carrying fail-count, pass-count, test-id.
(define (cdb:pass-fail-counts serverdat test-id fail-count pass-count)
  (let ((params (list fail-count pass-count test-id)))
    (apply cdb:client-call serverdat 'pass-fail-counts #t *default-numtries* params)))
;; Send a 'register-test request carrying run-id, test-name and item-path.
;; (A local list of item paths used to be built here but was never used; it
;; has been removed.)
(define (cdb:tests-register-test serverdat run-id test-name item-path)
  (cdb:client-call serverdat 'register-test #t *default-numtries* run-id test-name item-path))
;; Send a parameterless 'flush request (immediate flag #f).
(define (cdb:flush-queue serverdat)
  (let ((qtype 'flush))
    (cdb:client-call serverdat qtype #f *default-numtries*)))
;; Send a parameterless 'killserver request (immediate flag #f).
(define (cdb:kill-server serverdat)
  (let ((qtype 'killserver))
    (cdb:client-call serverdat qtype #f *default-numtries*)))
;; Send an 'immediate request whose parameters are open-run-close,
;; db:roll-up-pass-fail-counts, #f, then run-id, test-name, item-path, status.
(define (cdb:roll-up-pass-fail-counts serverdat run-id test-name item-path status)
  (let ((params (list open-run-close db:roll-up-pass-fail-counts #f
                      run-id test-name item-path status)))
    (apply cdb:client-call serverdat 'immediate #f *default-numtries* params)))
;; Send an 'immediate request whose parameters are open-run-close,
;; db:get-test-info, #f, then run-id, test-name, item-path.
(define (cdb:get-test-info serverdat run-id test-name item-path)
  (let ((params (list open-run-close db:get-test-info #f
                      run-id test-name item-path)))
    (apply cdb:client-call serverdat 'immediate #f *default-numtries* params)))
;; Send an 'immediate request whose parameters are open-run-close,
;; db:get-test-info-by-id, #f, then test-id.
(define (cdb:get-test-info-by-id serverdat test-id)
  (let ((params (list open-run-close db:get-test-info-by-id #f test-id)))
    (apply cdb:client-call serverdat 'immediate #f *default-numtries* params)))
;; db should be db open proc or #f
;;
;; Send an 'immediate request over *runremote* whose parameters are
;; open-run-close, proc, #f, followed by params. The db argument is accepted
;; but not forwarded.
(define (cdb:remote-run proc db . params)
  (let ((all-params (append (list open-run-close proc #f) params)))
    (apply cdb:client-call *runremote* 'immediate #f *default-numtries* all-params)))
(define (db:test-get-logfile-info db run-id test-name)
(let ((res #f))
|
︙ | | | ︙ | |
︙ | | | ︙ | |
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
|
;; PURPOSE.
(require-extension (srfi 18) extras tcp s11n)
(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))
(use zmq)
(declare (unit server))
(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(include "common_records.scm")
(include "db_records.scm")
;; Transition to pub --> sub with pull <-- push
;;
;; 1. client sends request to server via push to the pull port
;; 2. server puts request in queue or processes immediately as appropriate
;; 3. server puts responses from completed requests into pub port
;;
;; TODO
;;
;; Done Tested
;; [x] [ ] 1. Add columns pullport pubport to servers table
;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012
;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports
;; [x] [ ] 4. Add client compose of request
;; [x] [ ] - name of client: testname/itempath-test_id-hostname
;; [x] [ ] - name of request: callname, params
;; [x] [ ] - request key: f(clientname, callname, params)
;; [x] [ ] 5. Add processing of subscription hits
;; [x] [ ] - done when get key
;; [x] [ ] - return results
;; [x] [ ] 6. Add timeout processing
;; [x] [ ] - after 60 seconds
;; [ ] [ ] i. check server alive, connect to new if necessary
;; [ ] [ ] ii. resend request
;; [ ] [ ] 7. Turn self ping back on
;; Build "tcp://HOST:PORT" from hostport, a list whose first element is the
;; host and whose second is the port. Returns #f when hostport is #f.
(define (server:make-server-url hostport)
  (and hostport
       (let ((host (car hostport))
             (port (cadr hostport)))
         (conc "tcp://" host ":" port))))
;; Seconds-since-epoch value captured when this definition is evaluated.
(define *server-loop-heart-beat* (current-seconds))
;; Mutex locked around reads and writes of *last-db-access* and *server-info*
;; in server:run, server:keep-running and server:setup-ports.
(define *heartbeat-mutex* (make-mutex))
;; Accessors for a two-slot socket vector: slot 0 = pub socket, slot 1 = pull
;; socket. vector-set! takes (vector index value); the setters previously
;; passed the value where the index belongs, and set-pull! targeted slot 0.
(define-inline (zmqsock:get-pub dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat 0 s))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat 1 s))
;; Start the server and process incoming packets forever.
;;
;;   hostn : host/interface string; "-" means "use this host's IP address"
;;
;; Steps: make sure *toppath* is set (exits if setup-for-run fails), open the
;; pull and pub ports via server:setup-ports, then loop reading packets from
;; the pull socket and handing each one to db:process-queue.
(define (server:run hostn)
(debug:print 2 "Attempting to start the server ...")
(if (not *toppath*)
(if (not (setup-for-run))
(begin
(debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
(exit))))
;; NOTE(review): iface, p1, p2 and last-run are bound below but never read
;; in this function.
(let* ((zmq-sdat1 #f)
(zmq-sdat2 #f)
(pull-socket #f)
(pub-socket #f)
(p1 #f)
(p2 #f)
(zmq-sockets-dat #f)
(iface (if (string=? "-" hostn)
"*" ;; (get-host-name)
hostn))
(hostname (get-host-name))
;; dotted-quad IP of this host when hostn is "-", otherwise the host name
(ipaddrstr (let ((ipstr (if (string=? "-" hostn)
(string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
#f)))
(if ipstr ipstr hostname)))
(last-run 0))
;; start port: the -port argument if given, else a random port in 5000..6000
(set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port")
(string->number (args:get-arg "-port"))
(+ 5000 (random 1001)))))
;; zmq-sockets-dat is a two-element list; each element is (iface socket port)
(set! zmq-sdat1 (car zmq-sockets-dat))
(set! pull-socket (cadr zmq-sdat1)) ;; (iface s port)
(set! p1 (caddr zmq-sdat1))
(set! zmq-sdat2 (cadr zmq-sockets-dat))
(set! pub-socket (cadr zmq-sdat2))
(set! p2 (caddr zmq-sdat2))
(set! *cache-on* #t)
;; what to do when we quit
;;
;; (on-exit (lambda ()
;; (if (and *toppath* *server-info*)
;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
;; (let loop ()
;; (let ((queue-len 0))
;; (thread-sleep! (random 5))
;; (mutex-lock! *incoming-mutex*)
;; (set! queue-len (length *incoming-data*))
;; (mutex-unlock! *incoming-mutex*)
;; (if (> queue-len 0)
;; (begin
;; (debug:print-info 0 "Queue not flushed, waiting ...")
;; (loop))))))))
;; The heavy lifting
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
(let loop ((queue-lst '()))
(let* ((rawmsg (receive-message* pull-socket))
(packet (db:string->obj rawmsg))
(qtype (cdb:packet-get-qtype packet)))
(debug:print-info 12 "server=> received packet=" packet)
;; any packet other than sync/ping counts as db activity: record the time
;; under the heartbeat mutex
(if (not (member qtype '(sync ping)))
(begin
(mutex-lock! *heartbeat-mutex*)
(set! *last-db-access* (current-seconds))
(mutex-unlock! *heartbeat-mutex*)))
;; the test is hard-wired to #t, so every packet is processed immediately
;; and the queueing branch below is never taken
(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
(begin
(open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
(loop '()))
(loop (cons packet queue-lst)))))))
;; run server:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
;;
;; Monitor loop: waits for *server-info* to be set, connects to the server as
;; a client, then periodically sends a 'sync, updates the heartbeat record and
;; shuts the process down once the last db access is older than the limit
;; coded below (currently 45 minutes).
(define (server:keep-running)
;; if none running or if > 20 seconds since
;; server last used then start shutdown
;; NOTE(review): the "20 seconds" above does not match the 45 minute limit
;; used in the code below.
;; This thread waits for the server to come alive
(let* ((server-info (let loop ()
(let ((sdat #f))
(mutex-lock! *heartbeat-mutex*)
(set! sdat *server-info*)
(mutex-unlock! *heartbeat-mutex*)
(if sdat sdat
(begin
;; poll again in 4 seconds
(sleep 4)
(loop))))))
(iface (cadr server-info))
(pullport (caddr server-info))
(pubport (cadddr server-info)) ;; id interface pullport pubport)
(zmq-sockets (server:client-connect iface pullport pubport))
(last-access 0))
(let loop ((count 0))
(thread-sleep! 4) ;; no need to do this very often
;; NB// sync currently does NOT return queue-length
(let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1)))
;; (print "Server running, count is " count)
;; NOTE(review): this recursive call is not in tail position; the code
;; after it runs if the nested call ever returns -- confirm intended.
(if (< count 1) ;; 3x3 = 9 secs approx
(loop (+ count 1)))
;; NOTE: Get rid of this mechanism! It really is not needed...
(open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
;; snapshot the last db access time under the heartbeat mutex
(mutex-lock! *heartbeat-mutex*)
(set! last-access *last-db-access*)
(mutex-unlock! *heartbeat-mutex*)
;; keep running while last-access + 45 minutes is still in the future
(if (> (+ last-access
;; (* 50 60 60) ;; 48 hrs
;; 60 ;; one minute
;; (* 60 60) ;; one hour
(* 45 60) ;; 45 minutes, until the db deletion bug is fixed.
)
(current-seconds))
(begin
(debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
(loop 0))
(begin
(debug:print-info 0 "Starting to shutdown the server.")
;; need to delete only *my* server entry (future use)
(set! *time-to-exit* #t)
(open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
(thread-sleep! 1)
(debug:print-info 0 "Max cached queries was " *max-cache-size*)
(debug:print-info 0 "Server shutdown complete. Exiting")
(exit)))))))
;; Bind a socket to the first usable port at or above `port`.
;;
;;   iface  : interface/host part of the tcp:// URL
;;   s      : an existing socket to bind, or #f to create one of type stype
;;   port   : first port to try; 5555 is used when this is not a number
;;   stype  : socket type handed to make-socket when s is #f
;;   trynum : (keyword) how many further ports may be tried, default 50
;;
;; Returns (list iface socket bound-port). When binding raises, the next port
;; is tried with trynum - 1; once trynum reaches 0 a message is printed and
;; the process exits.
;;
;; Fixes relative to the previous revision:
;;   * the retry forwards stype; before, the trynum: keyword landed in the
;;     stype position, so the retry budget never decreased
;;   * (exit) is only reached when the retries are exhausted, not after a
;;     retry has succeeded
;;   * the returned port is the one actually bound, even when `port` was not
;;     a number
(define (server:find-free-port-and-open iface s port stype #!key (trynum 50))
  (let ((s (if s s (make-socket stype)))
        (p (if (number? port) port 5555)))
    (handle-exceptions
     exn
     (begin
       (debug:print 0 "Failed to bind to port " p ", trying next port")
       (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
       (if (> trynum 0)
           (server:find-free-port-and-open iface s (+ p 1) stype trynum: (- trynum 1))
           (begin
             (debug:print-info 0 "Tried ports up to " p
                               " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")
             (exit))))
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (debug:print 2 "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (list iface s p)))))
;; Open the pull socket starting at startport and the pub socket on the next
;; port after it, clear *runremote*, record the server via
;; tasks:server-register in *server-info* (under *heartbeat-mutex*), and
;; return a list of the two (iface socket port) results, pull first.
(define (server:setup-ports ipaddrstr startport)
  (let* ((pull-dat  (server:find-free-port-and-open ipaddrstr #f startport 'pull))
         (pullport  (caddr pull-dat))
         (pub-start (+ 1 (or pullport (+ startport 1))))
         (pub-dat   (server:find-free-port-and-open ipaddrstr #f pub-start 'pub))
         (pubport   (caddr pub-dat)))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " pullport " and " pubport)
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info*
          (open-run-close tasks:server-register tasks:open-db
                          (current-process-id) ipaddrstr pullport pubport 0 'live))
    (mutex-unlock! *heartbeat-mutex*)
    (list pull-dat pub-dat)))
;; MD5 digest string of the written form of (current-directory argv).
(define (server:mk-signature)
  (let* ((identity (list (current-directory) (argv)))
         (text     (with-output-to-string
                     (lambda () (write identity)))))
    (message-digest-string (md5-primitive) text)))
|
>
|
>
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
>
>
>
<
<
<
|
|
<
<
<
<
<
<
<
|
<
|
|
|
<
<
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
<
<
<
|
|
|
<
<
<
|
|
>
>
>
|
|
<
<
<
|
<
<
|
<
<
<
<
<
<
<
<
<
<
<
|
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
;; PURPOSE.
(require-extension (srfi 18) extras tcp s11n)
(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))
(use spiffy awful http-client)
(tcp-buffer-size 2048)
(declare (unit server))
(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(include "common_records.scm")
(include "db_records.scm")
;; Build "http://HOST:PORT" from hostport, a list whose first element is the
;; host and whose second is the port. Returns #f when hostport is #f.
(define (server:make-server-url hostport)
  (and hostport
       (let ((host (car hostport))
             (port (cadr hostport)))
         (conc "http://" host ":" port))))
;; Seconds-since-epoch value captured when this definition is evaluated.
(define *server-loop-heart-beat* (current-seconds))
;; NOTE(review): presumably guards the shared heartbeat/server-info state --
;; confirm against the code that locks it.
(define *heartbeat-mutex* (make-mutex))
;;======================================================================
;; S E R V E R
;;======================================================================
;; Call this to start the actual server
;;
;; Start the server.
;;
;;   hostn : host/interface string; "-" means "use this host's IP address"
;;
;; Makes sure *toppath* is set (exits if setup-for-run fails), picks the
;; address and the first port to try (the -port argument, else a random port
;; in 5000..6000), turns the cache on and hands over to
;; server:try-start-server.
;;
;; Fix: the start port was bound as start-port but the call passed the unbound
;; name portnum. The unused iface binding has been dropped.
(define (server:run hostn)
  (debug:print 2 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
          (begin
            (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
            (exit))))
  (let* ((hostname   (get-host-name))
         ;; dotted-quad IP of this host when hostn is "-", otherwise the host name
         (ipaddrstr  (let ((ipstr (if (string=? "-" hostn)
                                      (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
                                      #f)))
                       (if ipstr ipstr hostname)))
         (start-port (if (args:get-arg "-port")
                         (string->number (args:get-arg "-port"))
                         (+ 5000 (random 1001)))))
    (set! *cache-on* #t)
    (server:try-start-server ipaddrstr start-port)))
;; Called by server:run; calls itself with the next port number until the
;; server starts successfully or portnum reaches 9000.
;;
(define (server:try-start-server ipaddrstr portnum)
  (handle-exceptions
   exn
   (begin
     (print-error-message exn)
     (cond
      ((< portnum 9000)
       (print "WARNING: failed to start on portnum: " portnum ", trying next port")
       (sleep 1)
       (server:try-start-server ipaddrstr (+ portnum 1)))
      (else
       (print "ERROR: Tried and tried but could not start the server"))))
   (print "INFO: Trying to start server on portnum: " portnum)
   (awful-start hello-world ip-address: ipaddrstr port: portnum)))
;; MD5 digest string of the written form of (current-directory argv).
(define (server:mk-signature)
  (let* ((identity (list (current-directory) (argv)))
         (text     (with-output-to-string
                     (lambda () (write identity)))))
    (message-digest-string (md5-primitive) text)))
|
︙ | | | ︙ | |
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
|
;;======================================================================
;; Return the cached client signature, computing it with server:mk-signature
;; and storing it in *my-client-signature* on first use.
(define (server:get-client-signature)
  (or *my-client-signature*
      (begin
        (set! *my-client-signature* (server:mk-signature))
        *my-client-signature*)))
;;
;; Create a socket of the given type (in `context` when one is supplied),
;; apply each subscription to it, and connect it to the URL built from
;; iface and port. Returns the socket, or #f (after logging an error) when
;; make-socket did not yield a socket.
(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
  (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions)
  (let ((sock (if context
                  (make-socket type context)
                  (make-socket type)))
        (url  (server:make-server-url (list iface port))))
    (cond
     ((socket? sock)
      ;; subscriptions are applied before connecting
      (for-each (lambda (topic)
                  (debug:print 2 "Subscribing to " topic)
                  (socket-option-set! sock 'subscribe topic))
                subscriptions)
      (connect-socket sock url)
      sock)
     (else
      (debug:print 0 "ERROR: Failed to open socket to " url)
      #f))))
;; Log in to the server with *toppath* as the key and this client's signature.
(define (server:client-login zmq-sockets)
  (let ((signature (server:get-client-signature)))
    (cdb:login zmq-sockets *toppath* signature)))
;; Not currently used, though it probably should be.
;;
;; Log out of the server when zmq-socket is a socket. Returns the result of
;; cdb:logout, or #f when zmq-socket is not a socket. The socket is not closed.
(define (server:client-logout zmq-socket)
  (if (socket? zmq-socket)
      (cdb:logout zmq-socket *toppath* (server:get-client-signature))
      #f))
;; Connect to a server and log in.
;;
;;   iface    : server interface/host
;;   pullport : server port the client pushes requests to
;;   pubport  : server port the client subscribes to for replies
;;
;; On a successful login *runremote* is set to, and the function returns, a
;; vector #(push-socket sub-socket). Otherwise *runremote* is set to #f and
;; #f is returned.
;;
;; Fixes: the failure message referenced the unbound name conurl, and a
;; login result of #f caused (car #f) to raise instead of taking the
;; failure branch.
(define (server:client-connect iface pullport pubport)
  (let* ((push-socket (server:client-socket-connect iface pullport type: 'push))
         (sub-socket  (server:client-socket-connect iface pubport
                                                    type: 'sub
                                                    subscriptions: (list (server:get-client-signature) "all")))
         (zmq-sockets (vector push-socket sub-socket))
         (login-res   (server:client-login zmq-sockets)))
    (if (and (pair? login-res)
             (car login-res))
        (begin
          (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
          (set! *runremote* zmq-sockets)
          zmq-sockets)
        (begin
          (debug:print-info 2 "Failed to login or connect to " iface ":" pullport "/" pubport)
          (set! *runremote* #f)
          #f))))
;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
(if (not *toppath*)
(if (not (setup-for-run))
(begin
(debug:print 0 "ERROR: failed to find megatest.config, exiting")
(exit))))
(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
(if hostinfo
(let ((host (list-ref hostinfo 0))
(iface (list-ref hostinfo 1))
(pullport (list-ref hostinfo 2))
(pubport (list-ref hostinfo 3)))
(debug:print-info 2 "Setting up to connect to " hostinfo)
;; (handle-exceptions
;; exn
;; (begin
;; ;; something went wrong in connecting to the server. In this scenario it is ok
;; ;; to try again
;; (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
;; (debug:print 0 " perhaps jobs killed with -9? Removing server records")
;; (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
;; (server:client-setup (- numtries 1))
;; #f)
(server:client-connect iface pullport pubport)) ;; )
(if (> numtries 0)
(let ((exe (car (argv)))
(pid #f))
(debug:print-info 0 "No server available, attempting to start one...")
;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
;; (string-intersperse *verbosity* ",")
;; (conc *verbosity*)))))
|
|
|
>
>
|
<
<
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
|
|
|
<
<
<
<
<
|
|
|
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
;;======================================================================
;; Return the cached client signature, computing it with server:mk-signature
;; and storing it in *my-client-signature* on first use.
(define (server:get-client-signature)
  (or *my-client-signature*
      (begin
        (set! *my-client-signature* (server:mk-signature))
        *my-client-signature*)))
;; <html>
;; <head></head>
;; <body>1 Hello, world! Goodbye Dolly</body></html>
;; Send msg to serverdat and receive the result.
;;
;; Requests serverdat + "/?dat=" + msg, takes the third newline-separated
;; piece of the response and returns the text found between <body> and the
;; closing body tag on it.
(define (server:client-send-receive serverdat msg)
  (let* ((url        (conc serverdat "/?dat=" msg))
         (response   (with-input-from-request url #f read-string))
         (third-line (caddr (string-split response "\n")))
         (found      (string-search (regexp "<body>(.*)<.body>") third-line)))
    (cadr found)))
;; Log in to the server with *toppath* as the key and this client's signature.
(define (server:client-login serverdat)
  (let ((signature (server:get-client-signature)))
    (cdb:login serverdat *toppath* signature)))
;; Not currently used, though it probably should be.
;;
;; Log out of the server. Returns the result of cdb:logout, or #f when no
;; server handle was supplied.
;;
;; NOTE(review): the guard used to be (socket? zmq-socket). socket? is
;; provided by the zmq egg, which this file no longer loads, so the guard now
;; only checks that a handle was given -- confirm against callers.
(define (server:client-logout zmq-socket)
  (and zmq-socket
       (cdb:logout zmq-socket *toppath* (server:get-client-signature))))
;; Log in to the server at iface:port.
;;
;; The server handle is the URL built by server:make-server-url from iface
;; and port. On a successful login *runremote* is set to, and the function
;; returns, that handle. Otherwise *runremote* is set to #f and #f is
;; returned.
;;
;; Fixes: the body referenced serverdat, pullport, pubport and conurl, none
;; of which were bound here, and a login result of #f caused (car #f) to
;; raise instead of taking the failure branch.
(define (server:client-connect iface port)
  (let* ((serverdat (server:make-server-url (list iface port)))
         (login-res (server:client-login serverdat)))
    (if (and (pair? login-res)
             (car login-res))
        (begin
          (debug:print-info 2 "Logged in and connected to " iface ":" port ".")
          (set! *runremote* serverdat)
          serverdat)
        (begin
          (debug:print-info 2 "Failed to login or connect to " serverdat)
          (set! *runremote* #f)
          #f))))
;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
(if (not *toppath*)
(if (not (setup-for-run))
(begin
(debug:print 0 "ERROR: failed to find megatest.config, exiting")
(exit))))
(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
(if hostinfo
(let ((host (list-ref hostinfo 0))
(iface (list-ref hostinfo 1)))
(debug:print-info 2 "Setting up to connect to " hostinfo)
(server:client-connect iface pullport pubport)) ;; )
(if (> numtries 0)
(let ((exe (car (argv)))
(pid #f))
(debug:print-info 0 "No server available, attempting to start one...")
;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
;; (string-intersperse *verbosity* ",")
;; (conc *verbosity*)))))
|
︙ | | | ︙ | |
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
|
(debug:print 0 "ERROR: cannot find megatest.config, exiting")
(exit))))
(debug:print-info 2 "Starting the standalone server")
(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
(if hostinfo
(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
(if *toppath*
(let* (;; (th1 (make-thread (lambda ()
;; (let ((server-info #f))
;; ;; wait for the server to be online and available
;; (let loop ()
;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
;; (thread-sleep! 2)
;; (mutex-lock! *heartbeat-mutex*)
;; (set! server-info *server-info* )
;; (mutex-unlock! *heartbeat-mutex*)
;; (if (not server-info)(loop)))
;; (debug:print 2 "Server alive, starting self-ping")
;; (server:self-ping server-info)
;; ))
;; "Self ping"))
(th2 (make-thread (lambda ()
(server:run
(if (args:get-arg "-server")
(args:get-arg "-server")
"-"))) "Server run"))
(th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
)
(set! *client-non-blocking-mode* #t)
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
|
(debug:print 0 "ERROR: cannot find megatest.config, exiting")
(exit))))
(debug:print-info 2 "Starting the standalone server")
(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
(if hostinfo
(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
(if *toppath*
(let* ((th2 (make-thread (lambda ()
(server:run
(if (args:get-arg "-server")
(args:get-arg "-server")
"-"))) "Server run"))
(th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
)
(set! *client-non-blocking-mode* #t)
|
︙ | | | ︙ | |
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
|
(set-signal-handler! signal/int server:client-signal-handler)
(if (server:client-setup)
(debug:print-info 2 "connected as client")
(begin
(debug:print 0 "ERROR: Failed to connect as client")
(exit))))
;;======================================================================
;; Defunct functions
;;======================================================================
;; ping a server and return number of clients or #f (if no response)
;; NOT IN USE!
;; Try to connect and log in to the server at host:port, giving up after
;; roughly secs seconds.
;;
;;   host, port     - server address handed to server:client-connect
;;   secs           - timeout in seconds for the join on the worker thread (default 10)
;;   return-socket  - when true, the connected socket is kept open and placed
;;                    in the result instead of being logged out and closed
;;
;; The thunk evaluates to a three element list:
;;   (#t numclients socket-or-#f)   login succeeded
;;   (#f "CAN'T LOGIN" #f)          connected but login failed
;;   (#f "CAN'T CONNECT" #f)        server:client-connect returned #f
;;   (#f "TIMED OUT" #f)            the join on th1 raised, or th2 ran out first
;; presumably cdb:use-non-blocking-mode returns the thunk's value -- verify.
;;
;; NOTE(review): this calls server:client-connect with a context: keyword and
;; uses make-context/close-socket; the server:client-connect definition visible
;; in this file takes only (iface port), so this code looks stale.
(define (server:ping host port #!key (secs 10)(return-socket #f))
(cdb:use-non-blocking-mode
(lambda ()
;; res is shared by both threads and the body below; whichever writes last wins
(let* ((res #f)
;; th1: worker that does the actual connect / login / client count
(th1 (make-thread
(lambda ()
(let* ((zmq-context (make-context 1))
(zmq-socket (server:client-connect host port context: zmq-context)))
(if zmq-socket
(if (server:client-login zmq-socket)
(let ((numclients (cdb:num-clients zmq-socket)))
;; only tear the connection down when the caller does not want the socket back
(if (not return-socket)
(begin
(server:client-logout zmq-socket)
(close-socket zmq-socket)))
(set! res (list #t numclients (if return-socket zmq-socket #f))))
(begin
;; (close-socket zmq-socket)
(set! res (list #f "CAN'T LOGIN" #f))))
(set! res (list #f "CAN'T CONNECT" #f)))))
"Ping: th1"))
;; th2: progress logger; prints every 2 seconds for about secs seconds,
;; then overwrites res with the timeout result (th1 is not terminated)
(th2 (make-thread
(lambda ()
(let loop ((count 1))
(debug:print-info 1 "Ping " count " server on " host " at port " port)
(thread-sleep! 2)
(if (< count (/ secs 2))
(loop (+ count 1))))
;; (thread-terminate! th1)
(set! res (list #f "TIMED OUT" #f)))
"Ping: th2")))
(thread-start! th2)
(thread-start! th1)
;; wait for th1 for at most secs seconds; any exception raised by the join
;; (timeout, or presumably an error propagated from th1) is reported as "TIMED OUT"
(handle-exceptions
exn
(set! res (list #f "TIMED OUT" #f))
(thread-join! th1 secs))
res))))
;; (define (server:self-ping server-info)
;; ;; server-info: server-id interface pullport pubport
;; (let ((iface (list-ref server-info 1))
;; (pullport (list-ref server-info 2))
;; (pubport (list-ref server-info 3)))
;; (server:client-connect iface pullport pubport)
;; (let loop ()
;; (thread-sleep! 2)
;; (cdb:client-call *runremote* 'ping #t)
;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
;; (mutex-lock! *heartbeat-mutex*)
;; (set! *server-loop-heart-beat* (current-seconds))
;; (mutex-unlock! *heartbeat-mutex*)
;; (loop))))
;; Publish the result of a query back to a client as a two-part message.
;;
;;   pubsock      - socket the reply is sent on
;;   target       - sent as the first message part (with send-more: #t)
;;   query-sig    - signature of the query being answered
;;   success/fail - status flag for the query
;;   result       - value produced by the query
;;
;; The second part is the vector #(success/fail query-sig result) encoded
;; with db:obj->string. The encoding is done after the first part has been
;; sent, matching the original ordering.
(define (server:reply pubsock target query-sig success/fail result)
  (debug:print-info 11 "server:reply target=" target ", result=" result)
  (send-message pubsock target send-more: #t)
  (let* ((payload (vector success/fail query-sig result))
         (encoded (db:obj->string payload)))
    (send-message pubsock encoded)))
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
222
223
224
225
226
227
228
|
(set-signal-handler! signal/int server:client-signal-handler)
(if (server:client-setup)
(debug:print-info 2 "connected as client")
(begin
(debug:print 0 "ERROR: Failed to connect as client")
(exit))))
|