Overview
Comment: | Start of conversion to zmq |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | switch-to-zmq |
Files: | files | file ages | folders |
SHA1: |
dc9fc1c7d4c6a767c47ac9e0b4af44b5 |
User & Date: | matt on 2012-10-23 00:13:55 |
Other Links: | branch diff | manifest | tags |
Context
2012-10-23
| ||
02:16 | More implemented on zmq conversion check-in: dcef80627c user: matt tags: switch-to-zmq | |
00:13 | Start of conversion to zmq check-in: dc9fc1c7d4 user: matt tags: switch-to-zmq | |
2012-10-22
| ||
17:30 | Switched back to util-linux 2.21, disabled libblkid check-in: f72f46f62c user: fdk71adm tags: trunk | |
Changes
Modified db.scm from [9e27b746af] to [b22def0785].
︙ | ︙ | |||
1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 | qrystr) res)) ;;====================================================================== ;; QUEUE UP META, TEST STATUS AND STEPS ;;====================================================================== (define (db:updater) (debug:print-info 4 "Starting cache processing") (let loop ((start-time (current-time))) (thread-sleep! 10) ;; move save time around to minimize regular collisions? (db:write-cached-data) (loop start-time))) (define (cdb:test-set-status-state test-id status state msg) (debug:print-info 4 "cdb:test-set-status-state test-id=" test-id ", status=" status ", state=" state ", msg=" msg) (mutex-lock! *incoming-mutex*) (set! *last-db-access* (current-seconds)) (if msg (set! *incoming-data* (cons (vector 'state-status-msg (current-milliseconds) (list state status msg test-id)) *incoming-data*)) (set! *incoming-data* (cons (vector 'state-status (current-milliseconds) (list state status test-id)) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) *incoming-data*))) (mutex-unlock! *incoming-mutex*) (if *cache-on* (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write") (db:write-cached-data))) | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | < < < | < < < < < < < | < < < < < | < < < < < | < < < < < | < < < < < | 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 | qrystr) res)) ;;====================================================================== ;; QUEUE UP META, TEST STATUS AND STEPS ;;====================================================================== ;; db:updater is run in a thread to write out the cached data periodically (define (db:updater) (debug:print-info 4 "Starting cache processing") (let loop ((start-time (current-time))) (thread-sleep! 10) ;; move save time around to minimize regular collisions? (db:write-cached-data) (loop start-time))) ;; cdb:cached-access is called by the server loop to dispatch commands or queue up ;; db accesses ;; ;; params := qry-name cached? val1 val2 val3 ... (define (cdb:cached-access params) (if (< (length params) 2) "ERROR" (let ((qry-name (car params)) (cached? (cadr params)) (remparam (list-tail params 2))) (debug:print-info 12 "cdb:cached-access qry-name=" qry-name " params=" params) ;; Any special calls are dispatched here. ;; Remainder are put in the db queue (case qry-name ((login) ;; login checks that the megatest path matches (if (null? remparam) #f ;; no path - fail! (let ((calling-path (car remparam))) (if (equal? calling-path *toppath*) #t ;; path matches - pass! Should vet the caller at this time ... #f)))) ;; else fail to login (else (mutex-lock! *incoming-mutex*) (set! *last-db-access* (current-seconds)) (set! *incoming-data* (cons (vector qry-name (current-milliseconds) params) *incoming-data*)) (mutex-unlock! *incoming-mutex*) ;; NOTE: if cached? is #f then this call must be run immediately ;; but first all calls in the queue are run first in the order ;; of their time stamp (if (and cached? *cache-on*) (begin (debug:print-info 12 "*cache-on* is " *cache-on* ", skipping cache write") "CACHED") (begin (db:write-cached-data) "WRITTEN"))))))) (define (cdb:client-call zmq-socket . params) (debug:print-info 11 "zmq-socket " params) (let ((zdat (with-output-to-string (lambda ()(serialize params)))) (res #f)) (send-message zmq-socket zdat) (set! res (receive-message zdat)) (debug:print-info 11 "zmq-socket " (car params) " res=" res) res)) (define (cdb:test-set-status-state test-id status state msg) (debug:print-info 4 "cdb:test-set-status-state test-id=" test-id ", status=" status ", state=" state ", msg=" msg) (mutex-lock! *incoming-mutex*) (set! *last-db-access* (current-seconds)) (if msg (set! *incoming-data* (cons (vector 'state-status-msg (current-milliseconds) (list state status msg test-id)) *incoming-data*)) (set! *incoming-data* (cons (vector 'state-status (current-milliseconds) (list state status test-id)) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) *incoming-data*))) (mutex-unlock! *incoming-mutex*) (if *cache-on* (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write") (db:write-cached-data))) (define (cdb:test-rollup-test_data-pass-fail zmqsocket test-id) (cdb:client-call zmqsocket 'test_data-pf-rollup #t test-id test-id test-id)) (define (cdb:pass-fail-counts zmqsocket test-id fail-count pass-count) (cdb:client-call zmqsocket 'pass-fail-counts fail-count pass-count test-id)) (define (cdb:tests-register-test zmqsocket db run-id test-name item-path) (let ((item-paths (if (equal? item-path "") (list item-path) (list item-path "")))) (cdb:client-call zmqsocket 'register-test run-id test-name item-path))) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; (define (db:write-cached-data) (open-run-close |
︙ | ︙ |
Modified server.scm from [878578b9e5] to [855d362c2a].
1 2 3 4 5 6 7 8 9 10 | ;; Copyright 2006-2012, Matthew Welland. ;; ;; This program is made available under the GNU GPL version 2.0 or ;; greater. See the accompanying file COPYING for details. ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. | | | | | < < < < < < < | | | < < | < < < < < < | | | | | < < < < < < | < < < < < < < < < | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | | < | | > > > > > | | | < | < > | > > > | | > > | | | | | < < > > > > > | | | | < < | < | < < < < | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | ;; Copyright 2006-2012, Matthew Welland. ;; ;; This program is made available under the GNU GPL version 2.0 or ;; greater. See the accompanying file COPYING for details. ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. (require-extension (srfi 18) extras tcp rpc s11n) (import (prefix rpc rpc:)) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo) (import (prefix sqlite3 sqlite3:)) (declare (unit server)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (include "common_records.scm") (include "db_records.scm") (define a (with-output-to-string (lambda ()(serialize '(1 2 3 "Hello and goodbye" #t))))) (define b (with-input-from-string a (lambda ()(deserialize)))) (define (server:run hostn) (debug:print 0 "Attempting to start the server ...") (let ((host:port (open-run-close db:get-var db "SERVER"))) ;; do whe already have a server running? (if host:port (set! *runremote* host:port) (let* ((zmq-socket #f) (hostname (if (string=? "-" hostn) (get-host-name) hostn)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) (if ipstr ipstr hostname)))) (set! zmq-socket (server:find-free-port-and-open ipaddrstr)) (set! *cache-on* #t) ;; what to do when we quit ;; (on-exit (lambda () (open-run-close (lambda (db . params) (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER';")) #f ;; for db #f) ;; for a param (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) (mutex-lock! *incoming-mutex*) (set! queue-len (length *incoming-data*)) (mutex-unlock! *incoming-mutex*) (if (> queue-len 0) (begin (debug:print-info 0 "Queue not flushed, waiting ...") (loop))))))) ;; The heavy lifting ;; (let loop () (let* ((rawmsg (receive-message zmq-socket)) (params (with-input-from-string rawmsg (lambda ()(deserialize)))) (res #f)) (debug:print-info 12 "server=> received msg=" msg) (set! res (cdb:cached-access params)) (debug:print-info 12 "server=> processed msg=" msg) (send-message zmq-socket res) (loop))))))) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running db host:port) ;; if none running or if > 20 seconds since ;; server last used then start shutdown (let loop ((count 0)) (thread-sleep! 20) ;; no need to do this very often (let ((numrunning (db:get-count-tests-running db))) (if (or (> numrunning 0) (> (+ *last-db-access* 60)(current-seconds))) (begin (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) (loop (+ 1 count))) (begin (debug:print-info 0 "Starting to shutdown the server side") ;; need to delete only *my* server entry (future use) (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER';") (thread-sleep! 10) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") ;; (exit))) ))))) (define (server:find-free-port-and-open host s port) (let ((s (if s s (make-socket 'rep))) (p (if (number? port) port 5555))) (handle-exceptions exn (begin (print "Failed to bind to port " p ", trying next port") (server:find-free-port-and-open host s (+ p 1))) (let ((zmq-url (conc "tcp://" host ":" p))) (bind-socket s zmq-url) (set! *runremote* zmq-url) (debug:print 0 "Server started on " zmq-url) (db:set-var db "SERVER" zmq-url) s)))) (define (server:client-setup) (if *runremote* (begin (debug:print 0 "ERROR: Attempt to connect to server but already connected") #f) (let* ((hostinfo (open-run-close db:get-var #f "SERVER")) (zmq-socket (make-socket 'req))) (if hostinfo (begin (debug:print-info 2 "Setting up to connect to " hostinfo) (handle-exceptions exn (begin (debug:print 0 "ERROR: Failed to open a connection to the server at host: " host " port: " port) (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) (set! *runremote* #f)) (if (and (not (args:get-arg "-server")) ;; no point in the server using the server using the server ((rpc:procedure 'server:login host portn) *toppath*)) (begin (debug:print-info 2 "Logged in and connected to " host ":" port) (set! *runremote* (vector host portn))) (begin (debug:print-info 2 "Failed to login or connect to " host ":" port) (set! *runremote* #f))))) (debug:print-info 2 "no server available"))))) |
Modified testzmq/hwclient.scm from [8c368de31e] to [e984c3fbac].
1 2 3 | (use zmq posix) (define s (make-socket 'req)) | | | 1 2 3 4 5 6 7 8 9 10 11 | (use zmq posix) (define s (make-socket 'req)) (connect-socket s "tcp://*:5563") (define myname (cadr (argv))) (print "Start client...") (do ((i 0 (+ i 1))) ((>= i 1000)) |
︙ | ︙ |
Modified testzmq/hwserver.scm from [118f034d51] to [038a7e66e1].
1 2 3 | (use zmq srfi-18 posix) (define s (make-socket 'rep)) | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | (use zmq srfi-18 posix) (define s (make-socket 'rep)) (bind-socket s "tcp://*:5563") (print "Start server...") (let loop () (let* ((msg (receive-message s)) (name (caddr (string-split msg " "))) (resp (conc "World " name))) (print "Received request: [" msg "]") (thread-sleep! 0.0001) (print "Sending response \"" resp "\"") (send-message s resp) (loop))) |