Overview
Comment: | Basic server/client working |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | http-transport |
Files: | files | file ages | folders |
SHA1: |
ed470f76ce2683928d86ad31a00fcdbe |
User & Date: | matt on 2013-01-15 22:47:39 |
Other Links: | branch diff | manifest | tags |
Context
2013-01-15
| ||
23:10 | Added a few more working calls check-in: 7a5200221d user: matt tags: http-transport | |
22:47 | Basic server/client working check-in: ed470f76ce user: matt tags: http-transport | |
17:40 | incremental changes for http transport check-in: d3db3e5ba1 user: mrwellan tags: http-transport | |
Changes
Modified db.scm from [a287f6a261] to [7976f66b1a].
︙ | ︙ | |||
1134 1135 1136 1137 1138 1139 1140 | (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) ) ;; (print "zdat=" zdat) (let* ( (res #f) (rawdat (server:client-send-receive serverdat zdat)) (tmp #f)) | | | | | 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 | (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) ) ;; (print "zdat=" zdat) (let* ( (res #f) (rawdat (server:client-send-receive serverdat zdat)) (tmp #f)) (debug:print-info 11 "Sent " zdat ", received " rawdat) (set! tmp (db:string->obj rawdat)) ;; (if (equal? query-sig (vector-ref myres 1)) ;; (set! res (vector-ref tmp 2) ;; (loop (server:client-send-receive serverdat zdat))))))) ;; (timeout (lambda () ;; (let loop ((n numretries)) ;; (thread-sleep! 15) ;; (if (not res) ;; (if (> numretries 0) ;; (begin |
︙ | ︙ | |||
1274 1275 1276 1277 1278 1279 1280 | (let* ((data (sort indata (lambda (a b) (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b)))))) (for-each (lambda (item) (db:process-queue-item db pubsock item)) data))) | | | | | | | | | | | 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 | (let* ((data (sort indata (lambda (a b) (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b)))))) (for-each (lambda (item) (db:process-queue-item db pubsock item)) data))) (define (db:process-queue-item db item) (let* ((stmt-key (cdb:packet-get-qtype item)) (qry-sig (cdb:packet-get-query-sig item)) (return-address (cdb:packet-get-client-sig item)) (params (cdb:packet-get-params item)) (query (let ((q (alist-ref stmt-key db:queries))) (if q (car q) #f)))) (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", qrery=" query ", params=" params) (cond (query (apply sqlite3:execute db query params) (server:reply pubsock return-address qry-sig #t #t)) ((member stmt-key db:special-queries) (debug:print-info 11 "Handling special statement " stmt-key) (case stmt-key ((immediate) (let ((proc (car params)) (remparams (cdr params))) ;; we are being handed a procedure so call it (debug:print-info 11 "Running (apply " proc " " remparams ")") (server:reply return-address qry-sig #t (apply proc remparams)))) ((login) (if (< (length params) 3) ;; should get toppath, version and signature (server:reply return-address qry-sig '(#f "login failed due to missing params")) ;; missing params (let ((calling-path (car params)) (calling-vers (cadr params)) (client-key (caddr params))) (if (and (equal? calling-path *toppath*) (equal? megatest-version calling-vers)) (begin (hash-table-set! 
*logged-in-clients* client-key (current-seconds)) (server:reply return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... (server:reply return-address qry-sig #f (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*))))))) ((flush sync) (server:reply return-address qry-sig #t 1)) ;; (length data))) ((set-verbosity) (set! *verbosity* (car params)) (server:reply return-address qry-sig #t '(#t *verbosity*))) ((killserver) (debug:print 0 "WARNING: Server going down in 15 seconds by user request!") (open-run-close tasks:server-deregister tasks:open-db (cadr *server-info*) pullport: (caddr *server-info*)) (thread-start! (make-thread (lambda ()(thread-sleep! 15)(exit)))) (server:reply return-address qry-sig #t '(#t "exit process started"))) (else ;; not a command, i.e. is a query (debug:print 0 "ERROR: Unrecognised query/command " stmt-key) (server:reply pubsock return-address qry-sig #f 'failed)))) (else (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) (server:reply return-address qry-sig #t #t))))) (define (db:test-get-records-for-index-file db run-id test-name) (let ((res '())) (sqlite3:for-each-row (lambda (id itempath state status run_duration logf comment) (set! res (cons (vector id itempath state status run_duration logf comment) res))) db |
︙ | ︙ |
Modified server.scm from [ccebef113b] to [17b16fc90f].
︙ | ︙ | |||
75 76 77 78 79 80 81 | (qtype (cdb:packet-get-qtype packet))) (debug:print-info 12 "server=> received packet=" packet) (if (not (member qtype '(sync ping))) (begin (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*))) | | | 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 | (qtype (cdb:packet-get-qtype packet))) (debug:print-info 12 "server=> received packet=" packet) (if (not (member qtype '(sync ping))) (begin (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*))) (open-run-close db:process-queue-item open-db packet)))))) ;; This is recursively run by server:run until sucessful ;; (define (server:try-start-server ipaddrstr portnum) (handle-exceptions exn |
︙ | ︙ | |||
113 114 115 116 117 118 119 | (write (list (current-directory) (argv))))))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== | | | | | | | > | > > | 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | (write (list (current-directory) (argv))))))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== (define (server:reply return-addr query-sig success/fail result) (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (db:obj->string (vector success/fail query-sig result))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) ;; <html> ;; <head></head> ;; <body>1 Hello, world! 
Goodbye Dolly</body></html> ;; Send msg to serverdat and receive result (define (server:client-send-receive serverdat msg) (let* ((url (server:make-server-url serverdat)) (fullurl (conc url "/?dat=" msg))) (debug:print-info 11 "fullurl=" fullurl) (let* ((res (with-input-from-request fullurl #f read-string))) (debug:print-info 11 "got res=" res) (let ((match (string-search (regexp "<body>(.*)<.body>") res))) (debug:print-info 11 "match=" match) (let ((final (cadr match))) (debug:print-info 11 "final=" final) final))))) (define (server:client-login serverdat) (cdb:login serverdat *toppath* (server:get-client-signature))) ;; Not currently used! But, I think it *should* be used!!! (define (server:client-logout serverdat) (let ((ok (and (socket? serverdat) |
︙ | ︙ | |||
177 178 179 180 181 182 183 | (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (list-ref hostinfo 0)) | | > > | | | 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 | (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (list-ref hostinfo 0)) (iface (list-ref hostinfo 1)) (port (list-ref hostinfo 2)) (pid (list-ref hostinfo 3))) (debug:print-info 2 "Setting up to connect to " hostinfo) (server:client-connect iface port)) ;; ) (if (> numtries 0) (let (;; (exe (car (argv))) (pid #f)) (debug:print-info 0 "No server available, attempting to start one...") ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) ;; (string-intersperse *verbosity* ",") ;; (conc *verbosity*))))) (set! pid (process-fork (lambda () ;; (current-input-port (open-input-file "/dev/null")) ;; (current-output-port (open-output-file "/dev/null")) ;; (current-error-port (open-output-file "/dev/null")) (server:launch)))) (let loop ((count 0)) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if (not hostinfo) (begin (debug:print-info 0 "Waiting for server pid=" pid " to start") (sleep 2) ;; give server time to start (if (< count 5) |
︙ | ︙ | |||
223 224 225 226 227 228 229 | (mutex-unlock! *heartbeat-mutex*) (if sdat sdat (begin (sleep 4) (loop)))))) (iface (car server-info)) (port (cadr server-info)) | | > | | | | 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 | (mutex-unlock! *heartbeat-mutex*) (if sdat sdat (begin (sleep 4) (loop)))))) (iface (car server-info)) (port (cadr server-info)) (last-access 0) (spid (open-run-close tasks:server-get-server-id tasks:open-db #f iface port #f))) (print "Keep-running got server pid " spid ", using iface " iface " and port " port) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) ;; NOTE: Get rid of this mechanism! It really is not needed... (open-run-close tasks:server-update-heartbeat tasks:open-db spid) ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access (mutex-lock! *heartbeat-mutex*) (set! last-access *last-db-access*) (mutex-unlock! *heartbeat-mutex*) (if (> (+ last-access ;; (* 50 60 60) ;; 48 hrs |
︙ | ︙ | |||
279 280 281 282 283 284 285 | (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* (let* ((th2 (make-thread (lambda () (server:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) | | < < | < < | | | 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 | (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* (let* ((th2 (make-thread (lambda () (server:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) ) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) (thread-join! th2) ) (debug:print 0 "ERROR: Failed to setup for megatest"))) (exit))) (define (server:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") (let ((th1 (make-thread (lambda () "") ;; do nothing for now (was flush out last call if applicable) "eat response")) (th2 (make-thread (lambda () (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") (thread-sleep! 1) ;; give the flush one second to do it's stuff (debug:print 0 " Done.") (exit 4)) "exit on ^C timer"))) (thread-start! th2) (thread-start! th1) (thread-join! th2)))) |
︙ | ︙ |
Modified tasks.scm from [cbe1edb880] to [38db4ca0e0].
︙ | ︙ | |||
89 90 91 92 93 94 95 | (define (tasks:server-register mdb pid interface port priority state) (sqlite3:execute mdb "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" pid (get-host-name) port priority (conc state) megatest-version interface) (list | | | > | > | > > > | | | | | | 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | (define (tasks:server-register mdb pid interface port priority state) (sqlite3:execute mdb "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" pid (get-host-name) port priority (conc state) megatest-version interface) (list (tasks:server-get-server-id mdb (get-host-name) interface port pid) interface port )) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! (define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead)) (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) (if pid (case action ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)) (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid))) (if port (case action ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port)) (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? 
AND port=?;" hostname port))) (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) (define (tasks:server-get-server-id mdb hostname iface port pid) (let ((res #f)) (sqlite3:for-each-row (lambda (id) (set! res id)) mdb (cond ((and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;") ((and iface port) "SELECT id FROM servers WHERE interface=? AND port=?;") ((and hostname port) "SELECT id FROM servers WHERE hostname=? AND port=?;") (else (begin (debug:print 0 "ERROR: tasks:server-get-server-id needs (hostname and pid) OR (iface and port) OR (hostname and port)") "SELECT id FROM servers WHERE pid=-999;"))) (if hostname hostname iface)(if pid pid port)) res)) (define (tasks:server-update-heartbeat mdb server-id) (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)) ;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds (define (tasks:server-alive? mdb server-id #!key (iface #f)(hostname #f)(port #f)(pid #f)) (let* ((server-id (if server-id server-id (tasks:server-get-server-id mdb hostname iface port pid))) (heartbeat-delta 99e9)) (sqlite3:for-each-row (lambda (delta) (set! heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) (< heartbeat-delta 10))) (define (tasks:client-register mdb pid hostname cmdline) (sqlite3:execute mdb "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));") (tasks:server-get-server-id mdb hostname #f #f pid) pid hostname cmdline) (define (tasks:client-logout mdb pid hostname cmdline) (sqlite3:execute mdb "UPDATE clients SET logout_time=strftime('%s','now') WHERE pid=? AND hostname=? AND cmdline=?;" pid hostname cmdline)) |
︙ | ︙ | |||
175 176 177 178 179 180 181 | (lambda (id hostname interface port pid) (set! res (cons (list hostname interface port pid) res)) (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb "SELECT id,hostname,interface,port,pid FROM servers WHERE strftime('%s','now')-heartbeat < 10 AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) | > | > > > > | | | | | | | | | | | | | | | | | | | | | | 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 | (lambda (id hostname interface port pid) (set! res (cons (list hostname interface port pid) res)) (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb "SELECT id,hostname,interface,port,pid FROM servers WHERE strftime('%s','now')-heartbeat < 10 AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) ;; for now we are keeping only one server registered in the db, return #f or first server found (if (null? res) #f (car res)))) ;; BUG: This logic is probably needed unless methodology changes completely... ;; ;; (if (null? res) #f ;; (let loop ((hed (car res)) ;; (tal (cdr res))) ;; ;; (print "hed=" hed ", tal=" tal) ;; (let* ((host (list-ref hed 0)) ;; (iface (list-ref hed 1)) ;; (port (list-ref hed 2)) ;; (pid (list-ref hed 4)) ;; (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port))) ;; (if alive ;; (begin ;; (debug:print-info 2 "Found an existing, alive, server " host ", " port ".") ;; (list host iface port)) ;; (begin ;; (debug:print-info 1 "Marking " host ":" port " as dead in server registry.") ;; (if port ;; (open-run-close tasks:server-deregister tasks:open-db host port: port) ;; (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) ;; (if (null? 
tal) ;; #f ;; (loop (car tal)(cdr tal)))))))))) (define (tasks:remove-server-records mdb) (sqlite3:execute mdb "DELETE FROM servers;")) (define (tasks:mark-server hostname port pid state) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))) |
︙ | ︙ |