Overview
Comment: | second pass http, queue processing remaining |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | http-transport |
Files: | files | file ages | folders |
SHA1: |
e6b987028dcb11af5e20c4da991b8001 |
User & Date: | matt on 2013-01-14 23:03:28 |
Other Links: | branch diff | manifest | tags |
Context
2013-01-15
| ||
17:40 | incrementl changes for http transport check-in: d3db3e5ba1 user: mrwellan tags: http-transport | |
2013-01-14
| ||
23:03 | second pass http, queue processing remaining check-in: e6b987028d user: matt tags: http-transport | |
22:17 | first pass implemenation using http check-in: 0d7a6d510a user: matt tags: http-transport | |
Changes
Modified db.scm from [887b32b0da] to [644346c603].
︙ | ︙ | |||
1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 | flush sync set-verbosity killserver)) ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; (define (db:process-queue db pubsock indata) (let* ((data (sort indata (lambda (a b) | > > > > > > > > > > > > > > > > > > > > > > > > > | 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 | flush sync set-verbosity killserver)) ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) UPDATE DB:PROCESS_QUEUE@@@@ ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; (define (db:process-queue db pubsock indata) (let* ((data (sort indata (lambda (a b) |
︙ | ︙ |
Modified server.scm from [f532ca81a3] to [a4326a1d4d].
︙ | ︙ | |||
56 57 58 59 60 61 62 | (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) (if ipstr ipstr hostname))) (start-port (if (args:get-arg "-port") (string->number (args:get-arg "-port")) (+ 5000 (random 1001))))) (set! *cache-on* #t) | | > > > > > > > > > > > > > > > > > > > > > > | | 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) (if ipstr ipstr hostname))) (start-port (if (args:get-arg "-port") (string->number (args:get-arg "-port")) (+ 5000 (random 1001))))) (set! *cache-on* #t) (server:try-start-server ipaddrstr start-port))) (define (server:main-loop) (define-page (main-page-path) (lambda () (with-request-variables (dat) (let* ((packet (db:string->obj dat)) (qtype (cdb:packet-get-qtype packet))) (debug:print-info 12 "server=> received packet=" packet) (if (not (member qtype '(sync ping))) (begin (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*))) (open-run-close db:process-queue #f pub-socket (cons packet queue-lst))))))) ;; This is recursively run by server:run until sucessful ;; (define (server:try-start-server ipaddrstr portnum) (handle-exceptions exn (begin (print-error-message exn) (if (< portnum 9000) (begin (print "WARNING: failed to start on portnum: " portnum ", trying next port") (sleep 1) (server:try-start-server ipaddrstr (+ portnum 1))) (print "ERROR: Tried and tried but could not start the server"))) (print "INFO: Trying to start server on portnum: " portnum) (set! *runremote* (list ipaddrstr portnum)) (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr portnum 0 'live) (awful-start server:main-loop ip-address: ipaddrstr port: portnum))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) |
︙ | ︙ | |||
95 96 97 98 99 100 101 102 103 104 105 106 | ;;====================================================================== (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) ;; <html> ;; <head></head> ;; <body>1 Hello, world! Goodbye Dolly</body></html> ;; Send msg to serverdat and receive result (define (server:client-send-receive serverdat msg) | > | | | | | | > | | | | 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 | ;;====================================================================== (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) ;; <html> ;; <head></head> ;; <body>1 Hello, world! Goodbye Dolly</body></html> ;; Send msg to serverdat and receive result (define (server:client-send-receive serverdat msg) (let* ((res (with-input-from-request (conc (server:make-server-url serverdat) "/?dat=" msg) #f read-string)) (match (string-search (regexp "<body>(.*)<.body>") (caddr (string-split res "\n"))))) (cadr match))) (define (server:client-login serverdat) (cdb:login serverdat *toppath* (server:get-client-signature))) ;; Not currently used! But, I think it *should* be used!!! (define (server:client-logout serverdat) (let ((ok (and (socket? serverdat) (cdb:logout serverdat *toppath* (server:get-client-signature))))) ;; (close-socket serverdat) ok)) (define (server:client-connect iface port) (let* ((login-res #f) (serverdat (list iface port))) (set! login-res (server:client-login serverdat)) (if (and (not (null? login-res)) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" port) (set! *runremote* serverdat) serverdat) (begin (debug:print-info 2 "Failed to login or connect to " iface ":" port) (set! *runremote* #f) #f)))) ;; Do all the connection work, start a server if not already running (define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (list-ref hostinfo 0)) (iface (list-ref hostinfo 1))) (debug:print-info 2 "Setting up to connect to " hostinfo) (server:client-connect iface port)) ;; ) (if (> numtries 0) (let ((exe (car (argv))) (pid #f)) (debug:print-info 0 "No server available, attempting to start one...") ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) ;; (string-intersperse *verbosity* ",") ;; (conc *verbosity*))))) |
︙ | ︙ |
Modified tasks.scm from [bc19277591] to [2c690c7219].
︙ | ︙ | |||
58 59 60 61 62 63 64 | hostname TEXT, username TEXT, CONSTRAINT monitors_constraint UNIQUE (pid,hostname));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, | | < | | | | | | | | | | | | | | | | | | | 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | hostname TEXT, username TEXT, CONSTRAINT monitors_constraint UNIQUE (pid,hostname));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, port INTEGER, start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY, server_id INTEGER, pid INTEGER, hostname TEXT, cmdline TEXT, login_time TIMESTAMP, logout_time TIMESTAMP DEFAULT -1, CONSTRAINT clients_constraint UNIQUE (pid,hostname));") )) mdb)) ;;====================================================================== ;; Server and client management ;;====================================================================== ;; state: 'live, 'shutting-down, 'dead (define (tasks:server-register mdb pid interface port priority state) (sqlite3:execute mdb "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" pid (get-host-name) port priority (conc state) megatest-version interface) (list (tasks:server-get-server-id mdb (get-host-name) port pid) interface port )) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! (define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead)) (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) (if pid (case action ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)) (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid))) (if port (case action ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port)) (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port))) (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) (define (tasks:server-get-server-id mdb hostname port pid) (let ((res #f)) (sqlite3:for-each-row (lambda (id) (set! res id)) mdb (if (and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;" "SELECT id FROM servers WHERE hostname=? AND port=?;") hostname (if pid pid port)) res)) (define (tasks:server-update-heartbeat mdb server-id) (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)) ;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds (define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f)) (let* ((server-id (if server-id server-id (tasks:server-get-server-id mdb hostname port pid))) (heartbeat-delta 99e9)) (sqlite3:for-each-row (lambda (delta) (set! heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) (< heartbeat-delta 10))) |
︙ | ︙ | |||
169 170 171 172 173 174 175 | ;; ping each server in the db and return first found that responds. ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb) (let ((res '()) (best #f)) (sqlite3:for-each-row | | | | | | < | | | | | | | < | 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 | ;; ping each server in the db and return first found that responds. ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb) (let ((res '()) (best #f)) (sqlite3:for-each-row (lambda (id hostname interface port pid) (set! res (cons (list hostname interface port pid) res)) (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb "SELECT id,hostname,interface,port,pid FROM servers WHERE strftime('%s','now')-heartbeat < 10 AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) (if (null? res) #f (let loop ((hed (car res)) (tal (cdr res))) ;; (print "hed=" hed ", tal=" tal) (let* ((host (list-ref hed 0)) (iface (list-ref hed 1)) (port (list-ref hed 2)) (pid (list-ref hed 4)) (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port))) (if alive (begin (debug:print-info 2 "Found an existing, alive, server " host ", " port ".") (list host iface port)) (begin (debug:print-info 1 "Marking " host ":" port " as dead in server registry.") (if port (open-run-close tasks:server-deregister tasks:open-db host port: port) (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) (if (null? tal) #f (loop (car tal)(cdr tal)))))))))) (define (tasks:mark-server hostname port pid state) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))) (define (tasks:kill-server status hostname port pid) (debug:print-info 1 "Removing defunct server record for " hostname ":" port) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)) (if status ;; #t means alive (begin |
︙ | ︙ | |||
240 241 242 243 244 245 246 | (debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname)))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row | | | | | 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 | (debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname)))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row (lambda (id pid hostname interface port start-time priority state mt-version last-update) (set! res (cons (vector id pid hostname interface port start-time priority state mt-version last-update) res))) mdb "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors ;;====================================================================== |
︙ | ︙ |