Overview
Comment: | wip |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try |
Files: | files | file ages | folders |
SHA1: | 7f562787413cbe3bdafad21d2ab1f09e |
User & Date: | matt on 2020-01-06 22:12:14 |
Other Links: | branch diff | manifest | tags |
Context
2020-01-08
| ||
01:11 | Removed nanomsg from dcommonmon and megamod check-in: 95b4151a44 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
2020-01-06
| ||
22:12 | wip check-in: 7f56278741 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try | |
17:06 | wip check-in: 0390dc30b4 user: mrwellan tags: v1.70-captain-ulex, v1.70-defunct-try | |
Changes
Modified ulex/ulex.scm from [7b7114168d] to [43414b60e9].
︙ | ︙ | |||
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 | (my-port #f) (my-pid (current-process-id)) ;; server and handler thread (serv-listener #f) ;; this processes server info (handler-thread #f) (handlers (make-hash-table)) (outgoing-conns (make-hash-table)) ;; host:port -> conn ;; app info (appname #f) (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ] ;; cookies (cnum 0) ;; cookie num ) ;; struct for keeping track of others we are talking to (defstruct peer (addr-port #f) (hostname #f) (pid #f) (inp #f) ;; input port from the peer (oup #f) ;; output port to the peer (owns '()) ;; list of databases this peer is currently handling ) ;;====================================================================== ;; Captain pkt functions ;;====================================================================== ;; given a pkts dir read ;; (define (get-all-captain-pkts udata) | > > > > > > > > > > > | 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 | (my-port #f) (my-pid (current-process-id)) ;; server and handler thread (serv-listener #f) ;; this processes server info (handler-thread #f) (handlers (make-hash-table)) (outgoing-conns (make-hash-table)) ;; host:port -> conn (mboxes (make-hash-table)) ;; key => mbox (work-queue (make-queue)) ;; most stuff goes here (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping) (busy #f) ;; is either of the queues busy ;; app info (appname #f) (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. 
dbtype => [ initproc syncproc ] ;; cookies (cnum 0) ;; cookie num ) ;; struct for keeping track of others we are talking to (defstruct peer (addr-port #f) (hostname #f) (pid #f) (inp #f) ;; input port from the peer (oup #f) ;; output port to the peer (owns '()) ;; list of databases this peer is currently handling ) (defstruct work (peer-dat #f) (handlerkey #f) (qrykey #f) (data #f) (start (current-milliseconds))) ;;====================================================================== ;; Captain pkt functions ;;====================================================================== ;; given a pkts dir read ;; (define (get-all-captain-pkts udata) |
︙ | ︙ | |||
337 338 339 340 341 342 343 | (if (null? params) "" (conc " " (string-intersperse params " ")))) oup) (write-line data oup) ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE! ;; (there is a listener for handling that) )) | > > > > > > > > > > > > > > > > | > > > > > > > > > > > > > > > > | | | | | 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 | (if (null? params) "" (conc " " (string-intersperse params " ")))) oup) (write-line data oup) ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE! ;; (there is a listener for handling that) )) ;; send a request to the given host-port and register a mailbox in udata ;; wait for the mailbox data and return it ;; (define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())) (let ((mbox (make-mailbox)) (mbox-time (current-milliseconds)) (mboxes (udat-mboxes udata))) (hash-table-set! mboxes qrykey mbox) (send udata host-port handler qrykey data hostname: hostname pid: pid params: params) (let* ((mbox-timeout-secs 20) (mbox-timeout-result 'MBOX_TIMEOUT) (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) (mbox-receive-time (current-milliseconds))) (hash-table-delete! mboxes qrykey) res))) (define (add-to-work-queue udata peer-dat handlerkey qrykey data) (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data))) (if (udat-busy udata) (queue-add! 
(udat-work-queue udata) wdat) (process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat )) (define (do-work udata wdat) #f) (define (process-work udata #!optional wdat) (if wdat (do-work udata wdat)) ;; process wdat (let ((wqueue (udat-work-queue udata))) (if (not (queue-empty? wqueue)) (let loop ((wd (queue-remove! wqueue))) (do-work udata wd) (if (not (queue-empty? wqueue)) (loop (queue-remove! wqueue))))))) ;; send back ack - this is tcp we are talking about, do we really need an ack? ;; ;; NOTE: No need to send back host:port of self - that is locked in by qrykey ;; (define (send-ack udata host-port qrykey) ;; #!optional (hostname #f)(pid #f)) (send udata host-port "ack" qrykey qrykey)) ;; we must send a second line - for the ack let it be the qrykey ;; ;; (define (ulex-handler udata) (let* ((serv-listener (udat-serv-listener udata))) (let-values (((inp oup)(tcp-accept serv-listener))) ;; data comes as two lines ;; handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db] ;; data (let loop ((state 'start)) (let* ((controldat (read-line inp)) (data (read-line inp))) (match (string-split controldat) ((handlerkey host:port pid qrykey params ...) (case (string->symbol handlerkey) ((ack)(print "Got ack!")) ((ping) (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) (val (if proc (proc) "gotping"))) (send udata host:port "version" qrykey val))) ((rucaptain) (send udata host:port "iamcaptain" qrykey (if (udat-my-cpkt-key udata) "yes" "no"))) (else ;; (send-ack udata host:port qrykey) (add-to-work-queue udata (get-peer-dat udata host:port) handlerkey qrykey data))) (else (print "BAD DATA? handler=" handlerkey " data=" data))))) (loop state))))) ;; add a proc to the handler list (define (register-handler udata key proc) (hash-table-set! (udat-handlers udata) key proc)) |
︙ | ︙ |