Megatest

Diff
Login

Differences From Artifact [1f587f4eee]:

To Artifact [904aac257a]:


295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317

318
319
320
321
322
323
324
325
326
327
328

329
330
331
332
333
334
335
336
337
338
339
340
341
342



343
344
345
346
347

348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
  (set! *http-connections-next-cleanup* (+ (current-seconds) 10))
  (mutex-unlock! *http-mutex*))

(define (http-transport:inc-requests-and-prep-to-close-all-connections)
  (mutex-lock! *http-mutex*)
  (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*)))

;; Turn off proxy handling
(define (http-transport:client-turn-off-proxy)
  (determine-proxy (constantly #f))) ;; From (chicken base)

;; serverdat contains uuid to be used for connection validation
;;
;; NOTE: serverdat must be initialized once by servdat-init
;;
(define (http-transport:send-receive serverdat cmd params #!key (numretries 3))
  (let* ((fullurl    (servdat-api-uri serverdat)) ;; gets uri for /api
	 (res        #f)
	 (success    #t)
         (server-id  (servdat-uuid serverdat)))
       ;; set up the http-client here
    (max-retry-attempts 1)
    ;; consider all requests indempotent

    (retry-request? (lambda (request)
		      #f))
    ;; send the data and get the response extract the needed info from
    ;; the http data and process and return it.
    (let* ((send-recieve (lambda ()
			   (set! res
				 (vector
				  #t ;; success
				  (with-input-from-request
				   fullurl 
				   (list (cons 'key server-id)

					 (cons 'cmd cmd)
					 (cons 'params params))
				   read)))))
	      (time-out     (lambda ()
			      (thread-sleep! 45)
			      (debug:print 0 *default-log-port* "WARNING: send-receive took more than 45 seconds!!")
			      #f))
	      (th1 (make-thread send-recieve "with-input-from-request"))
	      (th2 (make-thread time-out     "time out")))
	 (thread-start! th1)
	 (thread-start! th2)
	 (thread-join! th1)
	 (close-idle-connections!)
	 (thread-terminate! th2)



      res)))

;; careful closing of connections stored in *runremote*
;;
(define (http-transport:close-connections #!key (area-dat #f))

  (let* ((runremote  (or area-dat *runremote*))
	 (server-dat (if runremote
                         (remote-conndat runremote)
                         #f))) ;; (hash-table-ref/default *runremote* run-id #f)))
    (if (vector? server-dat)
	(let ((api-dat (http-transport:server-dat-get-api-uri server-dat)))
	  (handle-exceptions
	    exn
	    (begin
	      (print-call-chain *default-log-port*)
	      (debug:print-error 0 *default-log-port* " closing connection failed with error: " ((condition-property-accessor 'exn 'message) exn) ", exn=" exn))
	    (close-connection! api-dat)
            ;;(close-idle-connections!)
	    #t))
	#f)))


(define (make-http-transport:server-dat)(make-vector 6))
(define (http-transport:server-dat-get-iface         vec)    (vector-ref  vec 0))
(define (http-transport:server-dat-get-port          vec)    (vector-ref  vec 1))
(define (http-transport:server-dat-get-api-uri       vec)    (vector-ref  vec 2))
(define (http-transport:server-dat-get-api-url       vec)    (vector-ref  vec 3))







<
<
<
<


|

|
<
|

<
<
<
<
>
|
<







|
|
>

|












>
>
>
|




>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|







295
296
297
298
299
300
301




302
303
304
305
306

307
308




309
310

311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
  (set! *http-connections-next-cleanup* (+ (current-seconds) 10))
  (mutex-unlock! *http-mutex*))

(define (http-transport:inc-requests-and-prep-to-close-all-connections)
  (mutex-lock! *http-mutex*)
  (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*)))





;; serverdat contains uuid to be used for connection validation
;;
;; NOTE: serverdat must be initialized or created by servdat-init
;;
(define (http-transport:send-receive sdat qry-key cmd params #!key (numretries 3))

  (let* ((res        #f)
	 (success    #t)




	 (sparams    (with-output-to-string
		       (lambda ()(write params)))))

    ;; send the data and get the response extract the needed info from
    ;; the http data and process and return it.
    (let* ((send-recieve (lambda ()
			   (set! res
				 (vector
				  #t ;; success
				  (with-input-from-request
				   (servdat-api-uri sdat)
				   (list (cons 'key qry-key)
					 (cons 'srvid (servdat-uuid sdat))
					 (cons 'cmd cmd)
					 (cons 'params sparams))
				   read)))))
	      (time-out     (lambda ()
			      (thread-sleep! 45)
			      (debug:print 0 *default-log-port* "WARNING: send-receive took more than 45 seconds!!")
			      #f))
	      (th1 (make-thread send-recieve "with-input-from-request"))
	      (th2 (make-thread time-out     "time out")))
	 (thread-start! th1)
	 (thread-start! th2)
	 (thread-join! th1)
	 (close-idle-connections!)
	 (thread-terminate! th2)
	 (if (string? res)
	     (with-input-from-string res
	       (lambda () read))
	     res))))

;; careful closing of connections stored in *runremote*
;;
(define (http-transport:close-connections #!key (area-dat #f))
  (debug:print-info 0 *default-log-port* "http-transport:close-connections doesn't do anything now!"))
;;   (let* ((runremote  (or area-dat *runremote*))
;; 	 (server-dat (if runremote
;;                          (remote-conndat runremote)
;;                          #f))) ;; (hash-table-ref/default *runremote* run-id #f)))
;;     (if (vector? server-dat)
;; 	(let ((api-dat (http-transport:server-dat-get-api-uri server-dat)))
;; 	  (handle-exceptions
;; 	    exn
;; 	    (begin
;; 	      (print-call-chain *default-log-port*)
;; 	      (debug:print-error 0 *default-log-port* " closing connection failed with error: " ((condition-property-accessor 'exn 'message) exn) ", exn=" exn))
;; 	    (close-connection! api-dat)
;;             ;;(close-idle-connections!)
;; 	    #t))
;; 	#f)))


(define (make-http-transport:server-dat)(make-vector 6))
(define (http-transport:server-dat-get-iface         vec)    (vector-ref  vec 0))
(define (http-transport:server-dat-get-port          vec)    (vector-ref  vec 1))
(define (http-transport:server-dat-get-api-uri       vec)    (vector-ref  vec 2))
(define (http-transport:server-dat-get-api-url       vec)    (vector-ref  vec 3))
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410






411
412
413
414
415
416
417
418
(define (http-transport:server-dat-update-last-access vec)
  (if (vector? vec)
      (vector-set! vec 5 (current-seconds))
      (begin
	(print-call-chain (current-error-port))
	(debug:print-error 0 *default-log-port* "call to http-transport:server-dat-update-last-access with non-vector!!"))))

;;
;; connect
;;
(define (http-transport:client-connect iface port server-id) 
  (let* ((api-url      (conc "http://" iface ":" port "/api"))
	 (api-uri      (uri-reference (conc "http://" iface ":" port "/api")))
	 (api-req      (make-request method: 'POST uri: api-uri))
	 (server-dat   (vector iface port api-uri api-url api-req (current-seconds) server-id)))
    server-dat))

;; initialize servdat for client side
(define (servdat-init sdat-in iface port uuid)
  (let* ((sdat (or sdat-in (make-servdat))))
    (if uuid (servdat-uuid-set! sdat uuid))
    (servdat-host-set! sdat iface)
    (servdat-port-set! sdat port)
    (servdat-api-url-set! sdat (conc "http://" iface ":" port "/api"))
    (servdat-api-uri-set! sdat (uri-reference (servdat->url sdat)))
    (servdat-api-req-set! sdat (make-request method: 'POST
					     uri: (servdat-api-uri sdat)))






    sdat))

;;======================================================================
;; NEW SERVER METHOD
;;======================================================================

;; only use for main.db - need to re-write some of this :(
;;







|
|

<
<
<
<
<
<
<
<









>
>
>
>
>
>
|







380
381
382
383
384
385
386
387
388
389








390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
(define (http-transport:server-dat-update-last-access vec)
  (if (vector? vec)
      (vector-set! vec 5 (current-seconds))
      (begin
	(print-call-chain (current-error-port))
	(debug:print-error 0 *default-log-port* "call to http-transport:server-dat-update-last-access with non-vector!!"))))

;; initialize servdat for client side, setup needed parameters
;; pass in #f as sdat-in to create sdat
;;








(define (servdat-init sdat-in iface port uuid)
  (let* ((sdat (or sdat-in (make-servdat))))
    (if uuid (servdat-uuid-set! sdat uuid))
    (servdat-host-set! sdat iface)
    (servdat-port-set! sdat port)
    (servdat-api-url-set! sdat (conc "http://" iface ":" port "/api"))
    (servdat-api-uri-set! sdat (uri-reference (servdat->url sdat)))
    (servdat-api-req-set! sdat (make-request method: 'POST
					     uri: (servdat-api-uri sdat)))
    ;; set up the http-client parameters
    (max-retry-attempts 1)
    ;; consider all requests indempotent
    (retry-request? (lambda (request)
		      #f))
    (determine-proxy (constantly #f))
   sdat))

;;======================================================================
;; NEW SERVER METHOD
;;======================================================================

;; only use for main.db - need to re-write some of this :(
;;
470
471
472
473
474
475
476
477
478
479
480

481
482
483
484
485
486
487
	   (read-pkt->alist pkt-file pktspec: pktspec))
	 all-pkt-files)))

(define (server-address srv-pkt)
  (conc (alist-ref 'host srv-pkt) ":"
	(alist-ref 'port srv-pkt)))
	
(define (server-ready? server-address)
  ;; ping the server and ask it
  ;; if it ready
  #f)


;; from the pkts return servers associated with dbpath
;; NOTE: Only one can be alive - have to check on each
;;       in the list of pkts returned
;;
(define (get-viable-servers serv-pkts dbpath)
  (let loop ((tail serv-pkts)







|


|
>







464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
	   (read-pkt->alist pkt-file pktspec: pktspec))
	 all-pkt-files)))

(define (server-address srv-pkt)
  (conc (alist-ref 'host srv-pkt) ":"
	(alist-ref 'port srv-pkt)))
	
(define (server-ready? host port) ;; server-address is host:port
  ;; ping the server and ask it
  ;; if it ready
  (let* ((sdat (servdat-init #f host port #f)))
    (http-transport:send-receive sdat 'ping '())))

;; from the pkts return servers associated with dbpath
;; NOTE: Only one can be alive - have to check on each
;;       in the list of pkts returned
;;
(define (get-viable-servers serv-pkts dbpath)
  (let loop ((tail serv-pkts)
497
498
499
500
501
502
503


504
505
506
507
508
509
510
511
512
;; from viable servers get one that is alive and ready
;;
(define (get-the-server serv-pkts dbpath)
  (let loop ((tail serv-pkts))
    (if (null? tail)
	#f
	(let* ((spkt (car tail))


	       (addr (server-address spkt)))
	  (if (server-ready? addr)
	      spkt
	      (loop (cdr tail)))))))

;; am I the "first" in line server? I.e. my D card is smallest
;; use Z card as tie breaker
;;
(define (get-best-candidate serv-pkts dbpath)







>
>

|







492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
;; from viable servers get one that is alive and ready
;;
(define (get-the-server serv-pkts dbpath)
  (let loop ((tail serv-pkts))
    (if (null? tail)
	#f
	(let* ((spkt (car tail))
	       (host (alist-ref 'host spkt))
	       (port (alist-ref 'port spkt))
	       (addr (server-address spkt)))
	  (if (server-ready? host port)
	      spkt
	      (loop (cdr tail)))))))

;; am I the "first" in line server? I.e. my D card is smallest
;; use Z card as tie breaker
;;
(define (get-best-candidate serv-pkts dbpath)
576
577
578
579
580
581
582

583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
		     (viables      (get-viable-servers all-pkts db-file))
		     (best-srv     (get-best-candidate viables db-file))
		     (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f)))
		(debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key)
		;; am I the best-srv, compare server-keys to know
		(if (equal? best-srv-key server-key)
		    (if (get-lock-db sdat db-file) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)

			(debug:print 0 *default-log-port* "I'm the server!")
			(servdat-dbfile-set! sdat db-file))
		    (begin
		      (debug:print 0 *default-log-port* "I'm not the server, exiting.")
		      (bdat-time-to-exit-set! *bdat* #t)
		      (thread-sleep! 0.2)
		      (exit)))
		(begin
		  (debug:print 0 *default-log-port*
			       "Keys do not match "best-srv-key", "server-key", exiting.")
		  (bdat-time-to-exit-set! *bdat* #t)
		  (thread-sleep! 0.2)
		  (exit)))
	      sdat))
	(begin
	  (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
	  (sleep 4)
	  (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
	      (begin
		(debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
		(exit))
	      (loop start-time
		    (equal? sdat last-sdat)
		    sdat)))))))

;; run http-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (http-transport:keep-running dbname) 
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown







>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|







573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
		     (viables      (get-viable-servers all-pkts db-file))
		     (best-srv     (get-best-candidate viables db-file))
		     (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f)))
		(debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key)
		;; am I the best-srv, compare server-keys to know
		(if (equal? best-srv-key server-key)
		    (if (get-lock-db sdat db-file) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)
			(begin
			  (debug:print 0 *default-log-port* "I'm the server!")
			  (servdat-dbfile-set! sdat db-file))
			(begin
			  (debug:print 0 *default-log-port* "I'm not the server, exiting.")
			  (bdat-time-to-exit-set! *bdat* #t)
			  (thread-sleep! 0.2)
			  (exit)))
		    (begin
		      (debug:print 0 *default-log-port*
				   "Keys do not match "best-srv-key", "server-key", exiting.")
		      (bdat-time-to-exit-set! *bdat* #t)
		      (thread-sleep! 0.2)
		      (exit)))
		sdat))
	    (begin ;; sdat not yet contains server info
	      (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
	      (sleep 4)
	      (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
		  (begin
		    (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
		    (exit))
		  (loop start-time
			(equal? sdat last-sdat)
			sdat))))))))

;; run http-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (http-transport:keep-running dbname) 
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown