12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
-
+
+
|
;;======================================================================
;; Database access
;;======================================================================
(require-extension (srfi 18) extras tcp) ;; rpc)
;; (import (prefix rpc rpc:))
(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest)
(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64)
(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))
(declare (unit db))
(declare (uses common))
(declare (uses keys))
(declare (uses ods))
(include "common_records.scm")
|
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
|
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
|
-
-
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
+
+
+
+
+
+
+
+
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
;; (define (db:updater)
;; (debug:print-info 4 "Starting cache processing")
;; (let loop ()
;; (thread-sleep! 10) ;; move save time around to minimize regular collisions?
;; (db:write-cached-data)
;; (loop)))
(define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj))))
(define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize))))
(define (db:obj->string obj)
(string-substitute
(regexp "=") "_"
(base64:base64-encode (with-output-to-string (lambda ()(serialize obj))))
#t))
(define (db:string->obj msg)
(with-input-from-string
(base64:base64-decode
(string-substitute
(regexp "_") "=" msg #t))
(lambda ()(deserialize))))
(define (cdb:use-non-blocking-mode proc)
(set! *client-non-blocking-mode* #t)
(let ((res (proc)))
(set! *client-non-blocking-mode* #f)
res))
;; params = 'target cached remparams
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
(define (cdb:client-call serverdat qtype immediate numretries . params)
(debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params)
(handle-exceptions
exn
(begin
(thread-sleep! 5)
(if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params)))
;; (handle-exceptions
;; exn
;; (begin
;; (thread-sleep! 5)
;; (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params)))
(let* ((client-sig (server:get-client-signature))
(query-sig (message-digest-string (md5-primitive) (conc qtype immediate params)))
(zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
)
;; (print "zdat=" zdat)
(let* (
(res #f)
(send-receive (lambda ()
(let loop ((res (server:client-send-receive serverdat zdat)))
;; get the sender info
(rawdat (server:client-send-receive serverdat zdat))
(tmp #f))
(print "Sent " zdat ", received " rawdat)
;; this should match (server:get-client-signature)
;; we will need to process "all" messages here some day
;; now get the actual message
(let ((myres (db:string->obj res)))
(if (equal? query-sig (vector-ref myres 1))
(set! res (vector-ref myres 2))
(loop (server:client-send-receive serverdat zdat)))))))
(timeout (lambda ()
(let loop ((n numretries))
(thread-sleep! 15)
(if (not res)
(if (> numretries 0)
(begin
(debug:print 2 "WARNING: no reply to query " params ", trying resend")
(debug:print-info 11 "re-sending message")
(set! tmp (db:string->obj newres))
;; (if (equal? query-sig (vector-ref myres 1))
;; (set! res
(vector-ref myres 2)
;; (loop (server:client-send-receive serverdat zdat)))))))
;; (timeout (lambda ()
;; (let loop ((n numretries))
;; (thread-sleep! 15)
;; (if (not res)
;; (if (> numretries 0)
;; (begin
;; (debug:print 2 "WARNING: no reply to query " params ", trying resend")
;; (debug:print-info 11 "re-sending message")
(send-message push-socket zdat)
(debug:print-info 11 "message re-sent")
(loop (- n 1)))
;; (apply cdb:client-call serverdats qtype immediate (- numretries 1) params))
(begin
(debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
(exit 5))))))))
(debug:print-info 11 "Starting threads")
(let ((th1 (make-thread send-receive "send receive"))
(th2 (make-thread timeout "timeout")))
(thread-start! th1)
(thread-start! th2)
(thread-join! th1)
(debug:print-info 11 "cdb:client-call returning res=" res)
res))))
;; (apply cdb:client-call serverdat qtype immediate numretries params)
;; (debug:print-info 11 "message re-sent")
;; (loop (- n 1)))
;; ;; (apply cdb:client-call serverdats qtype immediate (- numretries 1) params))
;; (begin
;; (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
;; (exit 5))))))))
;; (send-receive)
)))
;; (debug:print-info 11 "Starting threads")
;; (let ((th1 (make-thread send-receive "send receive"))
;; (th2 (make-thread timeout "timeout")))
;; (thread-start! th1)
;; (thread-start! th2)
;; (thread-join! th1)
;; (debug:print-info 11 "cdb:client-call returning res=" res)
;; res))))
(define (cdb:set-verbosity serverdat val)
(cdb:client-call serverdat 'set-verbosity #f *default-numtries* val))
(define (cdb:login serverdat keyval signature)
(cdb:client-call serverdat 'login #t *default-numtries* keyval megatest-version signature))
|
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
|
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
|
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|
flush
sync
set-verbosity
killserver))
;; not used, intended to indicate to run in calling process
(define db:run-local-queries '()) ;; rollup-tests-pass-fail))
UPDATE DB:PROCESS_QUEUE@@@@
;; The queue is a list of vectors where the zeroth slot indicates the type of query to
;; apply and the second slot is the time of the query and the third entry is a list of
;; values to be applied
;;
(define (db:process-queue db pubsock indata)
(let* ((data (sort indata (lambda (a b)
|