Megatest

Changes On Branch 27b30355f5fa8b0a
Login

Changes In Branch v1.62-rpc Through [27b30355f5] Excluding Merge-Ins

This is equivalent to a diff from bff9d56983 to 27b30355f5

2018-04-18
00:21
No idea what this was. Commiting just in case it is interesting ... Leaf check-in: 33160354bc user: matt tags: dunno
2016-12-08
09:55
Merged in fix for wrong num params check-in: 3b6ab0c796 user: mrwellan tags: v1.62-rpc
2016-12-07
21:19
fixed stackdump at end of megatest (was doing http close all connections even if using rpc transport) check-in: 27b30355f5 user: bjbarcla tags: v1.62-rpc
20:53
merged v1.63 check-in: e25c16ede1 user: bjbarcla tags: v1.62-rpc
2016-12-05
13:11
Bumped version to v1.6301 check-in: fbf0a07a1d user: mrwellan tags: v1.63, v1.6301
12:58
Protected calls to expensive ping with calls to cheap server:read-dotserver. This appears to 100% the run-away pings problem Closed-Leaf check-in: bff9d56983 user: mrwellan tags: v1.62-no-rpc
11:05
Correct expiration of server connections check-in: b168adb943 user: mrwellan tags: v1.62-no-rpc

Modified client.scm from [50265f350f] to [e8cb4a6c94].

44
45
46
47
48
49
50
51


52
53
54
55


56


57
58
59
60
61
62
63
64
    ok))

(define (client:connect iface port)
  (case (server:get-transport)
    ((rpc)  (rpc:client-connect  iface port))
    ((http) (http:client-connect iface port))
    ((zmq)  (zmq:client-connect  iface port))
    (else   (rpc:client-connect  iface port))))



(define (client:setup  run-id #!key (remaining-tries 10) (failed-connects 0))
  (case (server:get-transport)
    ((rpc) (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)) ;;(client:setup-rpc run-id))


    ((http)(client:setup-http run-id remaining-tries: remaining-tries failed-connects: failed-connects))


    (else  (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)))) ;; (client:setup-rpc run-id))))

;; (define (client:login-no-auto-setup server-info run-id)
;;   (case (server:get-transport)
;;     ((rpc)  (rpc:login-no-auto-client-setup server-info run-id))
;;     ((http) (rmt:login-no-auto-client-setup server-info run-id))
;;     (else   (rpc:login-no-auto-client-setup server-info run-id))))
;; 







|
>
>



|
>
>

>
>
|







44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
    ok))

(define (client:connect iface port)
  (case (server:get-transport)
    ((rpc)  (rpc:client-connect  iface port))
    ((http) (http:client-connect iface port))
    ((zmq)  (zmq:client-connect  iface port))
    (else
     (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (5)")
     (exit))))

(define (client:setup  run-id #!key (remaining-tries 10) (failed-connects 0))
  (case (server:get-transport)
    ((rpc) (let ((res (client:setup-rpc run-id remaining-tries: remaining-tries)))
             (remote-conndat-set! *runremote* res)
             res))
    ((http)(client:setup-http run-id remaining-tries: remaining-tries failed-connects: failed-connects))
    (else
     (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (6)")
     (exit)))) ;; (client:setup-rpc run-id))))

;; (define (client:login-no-auto-setup server-info run-id)
;;   (case (server:get-transport)
;;     ((rpc)  (rpc:login-no-auto-client-setup server-info run-id))
;;     ((http) (rmt:login-no-auto-client-setup server-info run-id))
;;     (else   (rpc:login-no-auto-client-setup server-info run-id))))
;; 
150
151
152
153
154
155
156




















157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
;;   2. We are a run tests, list runs or other interactive process and we must figure out
;;      *transport-type* and *runremote* from the monitor.db
;;
;; client:setup
;;
;; lookup_server, need to remove *runremote* stuff
;;




















(define (client:setup-http run-id #!key (remaining-tries 10) (failed-connects 0))
  (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries)
  (let* ((tdbdat (tasks:open-db)))
    (if (<= remaining-tries 0)
	(begin
	  (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id)
	  (exit 1))
	(let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id)))
	  (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
	  (if server-dat
	      (let* ((iface     (tasks:hostinfo-get-interface server-dat))
		     (hostname  (tasks:hostinfo-get-hostname  server-dat))
		     (port      (tasks:hostinfo-get-port      server-dat))
		     (start-res (case *transport-type*
				  ((http)(http-transport:client-connect iface port))
				  ;;((nmsg)(nmsg-transport:client-connect hostname port))
                                  ))
		     (ping-res  (case *transport-type* 
				  ((http)(rmt:login-no-auto-client-setup start-res))
				  ;; ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id)))
 				  ;;          (if logininfo
 				  ;;              (car (vector-ref logininfo 1))
 				  ;;              #f)))
                                  
                                  )))
		(if (and start-res
			 ping-res)
		    (begin
		      (remote-conndat-set! *runremote* start-res) ;; (hash-table-set! *runremote* run-id start-res)
		      (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res))
		      start-res)
		    (begin    ;; login failed but have a server record, clean out the record and try again







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>













<
|
<
<
<
|
<
<
<
<
<
<







156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195

196



197






198
199
200
201
202
203
204
;;   2. We are a run tests, list runs or other interactive process and we must figure out
;;      *transport-type* and *runremote* from the monitor.db
;;
;; client:setup
;;
;; lookup_server, need to remove *runremote* stuff
;;

(define (client:setup-rpc run-id #!key (remaining-tries 10) (failed-connects 0))
  (debug:print-info 2 *default-log-port* "client:setup-rpc remaining-tries=" remaining-tries)
  (let* ((server-dat (tasks:get-server (db:delay-if-busy (tasks:open-db)) run-id))
         (num-available (tasks:num-in-available-state (db:delay-if-busy (tasks:open-db)) run-id)))
    (cond
     ((<= remaining-tries 0)
      (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id)
      (exit 1))
     (server-dat
      (debug:print-info 4 *default-log-port* "client:setup-rpc server-dat=" server-dat ", remaining-tries=" remaining-tries)

      (rpc-transport:client-setup run-id server-dat remaining-tries: remaining-tries))
     (else
      (if (< num-available 2)
          (server:try-running run-id))
      (thread-sleep! (+ 2 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms.
      (client:setup run-id remaining-tries: (- remaining-tries 1))))))


(define (client:setup-http run-id #!key (remaining-tries 10) (failed-connects 0))
  (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries)
  (let* ((tdbdat (tasks:open-db)))
    (if (<= remaining-tries 0)
	(begin
	  (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id)
	  (exit 1))
	(let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id)))
	  (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
	  (if server-dat
	      (let* ((iface     (tasks:hostinfo-get-interface server-dat))
		     (hostname  (tasks:hostinfo-get-hostname  server-dat))
		     (port      (tasks:hostinfo-get-port      server-dat))

		     (start-res (http-transport:client-connect iface port))



		     (ping-res  (rmt:login-no-auto-client-setup start-res)))






		(if (and start-res
			 ping-res)
		    (begin
		      (remote-conndat-set! *runremote* start-res) ;; (hash-table-set! *runremote* run-id start-res)
		      (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res))
		      start-res)
		    (begin    ;; login failed but have a server record, clean out the record and try again
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
		(let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id)))
		  (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available)
		  (if (< num-available 2)
		      (server:try-running run-id))
		  (thread-sleep! (+ 5 (random (- 20 remaining-tries))))  ;; give server a little time to start up, randomize a little to avoid start storms.
		  (client:setup run-id remaining-tries: (- remaining-tries 1)))))))))

;; keep this as a function to ease future 
(define (client:start run-id server-info)
  (http-transport:client-connect (tasks:hostinfo-get-interface server-info)
				 (tasks:hostinfo-get-port server-info)))

;; ;; client:signal-handler
;; (define (client:signal-handler signum)
;;   (signal-mask! signum)







|







223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
		(let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id)))
		  (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available)
		  (if (< num-available 2)
		      (server:try-running run-id))
		  (thread-sleep! (+ 5 (random (- 20 remaining-tries))))  ;; give server a little time to start up, randomize a little to avoid start storms.
		  (client:setup run-id remaining-tries: (- remaining-tries 1)))))))))

;; keep this as a function to ease future ;; this is unused, not porting for rpc -BB
(define (client:start run-id server-info)
  (http-transport:client-connect (tasks:hostinfo-get-interface server-info)
				 (tasks:hostinfo-get-port server-info)))

;; ;; client:signal-handler
;; (define (client:signal-handler signum)
;;   (signal-mask! signum)

Modified common.scm from [a3fcacf886] to [435cf851b0].

16
17
18
19
20
21
22


23
24
25
26
27
28
29

(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))

(declare (unit common))

(include "common_records.scm")



;; (require-library margs)
;; (include "margs.scm")

;; (define old-exit exit)
;; 
;; (define (exit . code)







>
>







16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))

(declare (unit common))

(include "common_records.scm")
(include "thunk-utils.scm")


;; (require-library margs)
;; (include "margs.scm")

;; (define old-exit exit)
;; 
;; (define (exit . code)
58
59
60
61
62
63
64






65


66


67

68
69
70
71
72
73
74
  (mutex-lock! *context-mutex*)
  (let ((cxt (hash-table-ref/default *contexts* toppath #f)))
    (if (not cxt)
        (set! cxt (let ((x (make-cxt)))(hash-table-set! *contexts* toppath x) x)))
    (let ((cxt-mutex (cxt-mutex cxt)))
      (mutex-unlock! *context-mutex*)
      (mutex-lock! cxt-mutex)






      (let ((res (proc cxt)))


        (mutex-unlock! cxt-mutex)


        res))))

        
(define *db-keys* #f)

(define *configinfo*   #f)   ;; raw results from setup, includes toppath and table from megatest.config
(define *runconfigdat* #f)   ;; run configs data
(define *configdat*    #f)   ;; megatest.config data
(define *configstatus* #f)   ;; status of data; 'fulldata : all processing done, #f : no data yet, 'partialdata : partial read done







>
>
>
>
>
>
|
>
>
|
>
>
|
>







60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
  (mutex-lock! *context-mutex*)
  (let ((cxt (hash-table-ref/default *contexts* toppath #f)))
    (if (not cxt)
        (set! cxt (let ((x (make-cxt)))(hash-table-set! *contexts* toppath x) x)))
    (let ((cxt-mutex (cxt-mutex cxt)))
      (mutex-unlock! *context-mutex*)
      (mutex-lock! cxt-mutex)
      ;; here we guard proc with exception handler so
      ;; no matter how proc succeeds or fails,
      ;; the cxt-mutex will be unlocked afterward.
      (let* ((EXCEPTION-SYMBOL (gensym)) ;; use a generated symbol
             (guarded-proc               ;; to avoid collision
              (lambda args
                (let* ((res (condition-case
                             (apply proc args)
                             [x () (cons EXCEPTION-SYMBOL x)])))
                 (mutex-unlock! cxt-mutex)
                 (if (and (pair? res) (eq? (car res) EXCEPTION))
                     (abort (cdr res))
                     res)))))
        (guarded-proc cxt)))))
        
(define *db-keys* #f)

(define *configinfo*   #f)   ;; raw results from setup, includes toppath and table from megatest.config
(define *runconfigdat* #f)   ;; run configs data
(define *configdat*    #f)   ;; megatest.config data
(define *configstatus* #f)   ;; status of data; 'fulldata : all processing done, #f : no data yet, 'partialdata : partial read done
100
101
102
103
104
105
106
107










108
109
110
111
112
113
114
(define *task-db*             #f) ;; (vector db path-to-db)
(define *db-access-allowed*   #t) ;; flag to allow access
(define *db-access-mutex*     (make-mutex))
(define *db-cache-path*       #f)

;; SERVER
(define *my-client-signature* #f)
(define *transport-type*    'http)             ;; override with [server] transport http|rpc|nmsg










(define *runremote*         #f)                ;; if set up for server communication this will hold <host port>
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *server-id*         #f)
(define *server-info*       #f)
(define *time-to-exit*      #f)
(define *server-run*        #t)







|
>
>
>
>
>
>
>
>
>
>







113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
(define *task-db*             #f) ;; (vector db path-to-db)
(define *db-access-allowed*   #t) ;; flag to allow access
(define *db-access-mutex*     (make-mutex))
(define *db-cache-path*       #f)

;; SERVER
(define *my-client-signature* #f)
(define *transport-type*  #f)             ;; override with [server] transport http|rpc|nmsg

(define (common:set-transport-type)
  (set! *transport-type*
        (string->symbol
         (or
          (args:get-arg "-transport")
          (configf:lookup *configdat* "server" "transport")
          "http")))
  *transport-type*)
  
(define *runremote*         #f)                ;; if set up for server communication this will hold <host port>
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *server-id*         #f)
(define *server-info*       #f)
(define *time-to-exit*      #f)
(define *server-run*        #t)
1071
1072
1073
1074
1075
1076
1077






















































1078
1079
1080
1081
1082
1083
1084
      (map (lambda (res)
	     (if (eof-object? res) 9e99 res))
	   (with-input-from-pipe 
	    (conc "ssh " remote-host " cat /proc/loadavg")
	    (lambda ()(list (read)(read)(read)))))
      (with-input-from-file "/proc/loadavg" 
	(lambda ()(list (read)(read)(read))))))























































(define (common:wait-for-cpuload maxload numcpus waitdelay #!key (count 1000) (msg #f)(remote-host #f))
  (let* ((loadavg (common:get-cpu-load remote-host))
	 (first   (car loadavg))
	 (next    (cadr loadavg))
	 (adjload (* maxload numcpus))
	 (loadjmp (- first next)))







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
      (map (lambda (res)
	     (if (eof-object? res) 9e99 res))
	   (with-input-from-pipe 
	    (conc "ssh " remote-host " cat /proc/loadavg")
	    (lambda ()(list (read)(read)(read)))))
      (with-input-from-file "/proc/loadavg" 
	(lambda ()(list (read)(read)(read))))))

;; get normalized cpu load by reading from /proc/loadavg and /proc/cpuinfo return all three values and the number of real cpus and the number of threads
;; returns list (normalized-proc-load normalized-core-load 1m 5m 15m ncores nthreads)
;;
(define (common:get-normalized-cpu-load remote-host)
  (let ((data (if remote-host
                  (with-input-from-pipe 
                   (conc "ssh " remote-host " cat /proc/loadavg;cat /proc/cpuinfo;echo end")
                   read-lines)
                  (append 
                   (with-input-from-file "/proc/loadavg" 
                     read-lines)
                   (with-input-from-file "/proc/cpuinfo"
                     read-lines)
                   (list "end"))))
        (load-rx  (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$"))
        (proc-rx  (regexp "^processor\\s+:\\s+(\\d+)\\s*$"))
        (core-rx  (regexp "^core id\\s+:\\s+(\\d+)\\s*$"))
        (phys-rx  (regexp "^physical id\\s+:\\s+(\\d+)\\s*$"))
        (max-num  (lambda (p n)(max (string->number p) n))))
    ;; (print "data=" data)
    (if (null? data) ;; something went wrong
        #f
        (let loop ((hed      (car data))
                   (tal      (cdr data))
                   (loads    #f)
                   (proc-num 0)  ;; processor includes threads
                   (phys-num 0)  ;; physical chip on motherboard
                   (core-num 0)) ;; core
          ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num)
          (if (null? tal) ;; have all our data, calculate normalized load and return result
              (let* ((act-proc (+ proc-num 1))
                     (act-phys (+ phys-num 1))
                     (act-core (+ core-num 1))
                     (adj-proc-load (/ (car loads) act-proc))
                     (adj-core-load (/ (car loads) act-core)))
                (append (list (cons 'adj-proc-load adj-proc-load)
                              (cons 'adj-core-load adj-core-load))
                        (list (cons '1m-load (car loads))
                              (cons '5m-load (cadr loads))
                              (cons '15m-load (caddr loads)))
                        (list (cons 'proc act-proc)
                              (cons 'core act-core)
                              (cons 'phys act-phys))))
              (regex-case
               hed
               (load-rx  ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num))
               (proc-rx  ( x p         ) (loop (car tal)(cdr tal) loads           (max-num p proc-num) phys-num core-num))
               (phys-rx  ( x p         ) (loop (car tal)(cdr tal) loads           proc-num (max-num p phys-num) core-num))
               (core-rx  ( x c         ) (loop (car tal)(cdr tal) loads           proc-num phys-num (max-num c core-num)))
               (else 
                (begin
                  ;; (print "NO MATCH: " hed)
                  (loop (car tal)(cdr tal) loads proc-num phys-num core-num)))))))))

(define (common:wait-for-cpuload maxload numcpus waitdelay #!key (count 1000) (msg #f)(remote-host #f))
  (let* ((loadavg (common:get-cpu-load remote-host))
	 (first   (car loadavg))
	 (next    (cadr loadavg))
	 (adjload (* maxload numcpus))
	 (loadjmp (- first next)))

Modified db.scm from [b74c06eb1c] to [c23c904f85].

1497
1498
1499
1500
1501
1502
1503
1504
1505
1506

1507
1508
1509
1510
1511
1512
1513
	   (min-incompleted-ids (map car incompleted)) ;; do 'em all
	   (all-ids             (append min-incompleted-ids (map car oldlaunched))))
      (if (> (length all-ids) 0)
	  (begin
	    (debug:print 0 *default-log-port* "WARNING: Marking test(s); " (string-intersperse (map conc all-ids) ", ") " as INCOMPLETE")
	    (sqlite3:execute 
	     db
	     (conc "UPDATE tests SET state='INCOMPLETE' WHERE id IN (" 
		   (string-intersperse (map conc all-ids) ",")
		   ");")))))


    ;; Now do rollups for the toplevel tests
    ;;
    ;; (db:delay-if-busy dbdat)
    (for-each
     (lambda (toptest)
       (let ((test-name (list-ref toptest 3)))







|

|
>







1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
	   (min-incompleted-ids (map car incompleted)) ;; do 'em all
	   (all-ids             (append min-incompleted-ids (map car oldlaunched))))
      (if (> (length all-ids) 0)
	  (begin
	    (debug:print 0 *default-log-port* "WARNING: Marking test(s); " (string-intersperse (map conc all-ids) ", ") " as INCOMPLETE")
	    (sqlite3:execute 
	     db
	     (conc "UPDATE tests SET state='INCOMPLETE' WHERE run_id=? AND id IN (" 
		   (string-intersperse (map conc all-ids) ",")
		   ");")
             run-id))))

    ;; Now do rollups for the toplevel tests
    ;;
    ;; (db:delay-if-busy dbdat)
    (for-each
     (lambda (toptest)
       (let ((test-name (list-ref toptest 3)))
3308
3309
3310
3311
3312
3313
3314

3315
3316
3317
3318
3319
3320
3321
	;; '(test-set-rundir         "UPDATE tests SET rundir=? AND testname=? AND item_path=?;") ;; DONE
	'(test-set-rundir-shortdir "UPDATE tests SET rundir=?,shortdir=? WHERE testname=? AND item_path=? AND run_id=?;")    ;; BROKEN!!! NEEDS run-id
	'(delete-tests-in-state   ;; "DELETE FROM tests WHERE state=?;")                  ;; DONE
	  "UPDATE tests SET state='DELETED' WHERE state=?")
	'(tests:test-set-toplog   "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';")
	'(update-cpuload-diskfree "UPDATE tests SET cpuload=?,diskfree=? WHERE id=?;") ;; DONE
	'(update-uname-host       "UPDATE tests SET uname=?,host=? WHERE id=?;")       ;; DONE

	'(update-test-state       "UPDATE tests SET state=? WHERE state=? AND run_id=? AND testname=? AND NOT (item_path='' AND testname IN (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));")
	'(update-test-status      "UPDATE tests SET status=? WHERE status like ? AND run_id=? AND testname=? AND NOT (item_path='' AND testname IN (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));")
	;; stuff for roll-up-pass-fail-counts
	'(update-pass-fail-counts "UPDATE tests 
             SET fail_count=(SELECT count(id) FROM tests WHERE testname=? AND item_path != '' AND status IN ('FAIL','CHECK','INCOMPLETE','ABORT')),
                 pass_count=(SELECT count(id) FROM tests WHERE testname=? AND item_path != '' AND status IN ('PASS','WARN','WAIVED'))
             WHERE testname=? AND item_path='' AND run_id=?;") ;; DONE  ;; BROKEN!!! NEEDS run-id







>







3309
3310
3311
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
	;; '(test-set-rundir         "UPDATE tests SET rundir=? AND testname=? AND item_path=?;") ;; DONE
	'(test-set-rundir-shortdir "UPDATE tests SET rundir=?,shortdir=? WHERE testname=? AND item_path=? AND run_id=?;")    ;; BROKEN!!! NEEDS run-id
	'(delete-tests-in-state   ;; "DELETE FROM tests WHERE state=?;")                  ;; DONE
	  "UPDATE tests SET state='DELETED' WHERE state=?")
	'(tests:test-set-toplog   "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';")
	'(update-cpuload-diskfree "UPDATE tests SET cpuload=?,diskfree=? WHERE id=?;") ;; DONE
	'(update-uname-host       "UPDATE tests SET uname=?,host=? WHERE id=?;")       ;; DONE
        '(update-test-rundat      "INSERT INTO test_rundat (test_id,update_time,cpuload,diskfree,diskusage,run_duration) VALUES (?,?,?,?,?,?);")
	'(update-test-state       "UPDATE tests SET state=? WHERE state=? AND run_id=? AND testname=? AND NOT (item_path='' AND testname IN (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));")
	'(update-test-status      "UPDATE tests SET status=? WHERE status like ? AND run_id=? AND testname=? AND NOT (item_path='' AND testname IN (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));")
	;; stuff for roll-up-pass-fail-counts
	'(update-pass-fail-counts "UPDATE tests 
             SET fail_count=(SELECT count(id) FROM tests WHERE testname=? AND item_path != '' AND status IN ('FAIL','CHECK','INCOMPLETE','ABORT')),
                 pass_count=(SELECT count(id) FROM tests WHERE testname=? AND item_path != '' AND status IN ('PASS','WARN','WAIVED'))
             WHERE testname=? AND item_path='' AND run_id=?;") ;; DONE  ;; BROKEN!!! NEEDS run-id

Modified http-transport.scm from [4d8eecbf3a] to [67f16e95fd].

82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
				      (res #f))
				 (cond
				  ((equal? (uri-path (request-uri (current-request)))
					   '(/ "api"))
				   (send-response body:    (api:process-request *dbstruct-db* $) ;; the $ is the request vars proc
						  headers: '((content-type text/plain)))
				   (mutex-lock! *heartbeat-mutex*)
				   (set! *db-last-access* (current-seconds))
				   (mutex-unlock! *heartbeat-mutex*))
				  ((equal? (uri-path (request-uri (current-request))) 
					   '(/ ""))
				   (send-response body: (http-transport:main-page)))
				  ((equal? (uri-path (request-uri (current-request))) 
					   '(/ "json_api"))
				   (send-response body: (http-transport:main-page)))







|







82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
				      (res #f))
				 (cond
				  ((equal? (uri-path (request-uri (current-request)))
					   '(/ "api"))
				   (send-response body:    (api:process-request *dbstruct-db* $) ;; the $ is the request vars proc
						  headers: '((content-type text/plain)))
				   (mutex-lock! *heartbeat-mutex*)
				   (set! *last-db-access* (current-seconds))
				   (mutex-unlock! *heartbeat-mutex*))
				  ((equal? (uri-path (request-uri (current-request))) 
					   '(/ ""))
				   (send-response body: (http-transport:main-page)))
				  ((equal? (uri-path (request-uri (current-request))) 
					   '(/ "json_api"))
				   (send-response body: (http-transport:main-page)))
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
;;
;; connect
;;
(define (http-transport:client-connect iface port)
  (let* ((api-url      (conc "http://" iface ":" port "/api"))
	 (api-uri      (uri-reference (conc "http://" iface ":" port "/api")))
	 (api-req      (make-request method: 'POST uri: api-uri))
	 (server-dat   (vector iface port api-uri api-url api-req (current-seconds))))
    server-dat))

;; run http-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (http-transport:keep-running server-id run-id)
  ;; if none running or if > 20 seconds since 







|







333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
;;
;; connect
;;
(define (http-transport:client-connect iface port)
  (let* ((api-url      (conc "http://" iface ":" port "/api"))
	 (api-uri      (uri-reference (conc "http://" iface ":" port "/api")))
	 (api-req      (make-request method: 'POST uri: api-uri))
	 (server-dat   (vector iface port api-uri api-url api-req (current-seconds) 'http)))
    server-dat))

;; run http-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (http-transport:keep-running server-id run-id)
  ;; if none running or if > 20 seconds since 
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
		      (begin
			(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
			(thread-sleep! 0.5) ;; give some margin for queries to complete before switching from file based access to server based access
			(set! *dbstruct-db*  (db:setup)) ;;  run-id))
			(set! server-going #t)
			(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
			(server:write-dotserver *toppath* (conc iface ":" port))
			(delete-file* (conc *toppath* "/.starting-server")))
		      (begin ;; gotta exit nicely
			(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "collision")
			(http-transport:server-shutdown server-id port))))))

      ;; when things go wrong we don't want to be doing the various queries too often
      ;; so we strive to run this stuff only every four seconds or so.
      (let* ((sync-time (- (current-milliseconds) start-time))
	    (rem-time  (quotient (- 4000 sync-time) 1000)))







|
|







396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
		      (begin
			(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
			(thread-sleep! 0.5) ;; give some margin for queries to complete before switching from file based access to server based access
			(set! *dbstruct-db*  (db:setup)) ;;  run-id))
			(set! server-going #t)
			(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
			(server:write-dotserver *toppath* (conc iface ":" port))
                        (server:dotserver-starting-remove))
                      (begin ;; gotta exit nicely
			(tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "collision")
			(http-transport:server-shutdown server-id port))))))

      ;; when things go wrong we don't want to be doing the various queries too often
      ;; so we strive to run this stuff only every four seconds or so.
      (let* ((sync-time (- (current-milliseconds) start-time))
	    (rem-time  (quotient (- 4000 sync-time) 1000)))
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
      (if (or (not (equal? sdat (list iface port)))
	      (not server-id))
	  (begin 
	    (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info")
	    (set! iface (car sdat))
	    (set! port  (cadr sdat))))
      
      ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
      (mutex-lock! *heartbeat-mutex*)
      (set! last-access *db-last-access*)
      (mutex-unlock! *heartbeat-mutex*)

      ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout)
      ;;
      ;; no_traffic, no running tests, if server 0, no running servers
      ;;
      ;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out)







|

|







424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
      (if (or (not (equal? sdat (list iface port)))
	      (not server-id))
	  (begin 
	    (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info")
	    (set! iface (car sdat))
	    (set! port  (cadr sdat))))
      
      ;; Transfer *last-db-access* to last-access to use in checking that we are still alive
      (mutex-lock! *heartbeat-mutex*)
      (set! last-access *last-db-access*)
      (mutex-unlock! *heartbeat-mutex*)

      ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout)
      ;;
      ;; no_traffic, no running tests, if server 0, no running servers
      ;;
      ;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out)
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557

558
559
560
561
562
563
564
    (exit)))

;; all routes though here end in exit ...
;;
;; start_server? 
;;
(define (http-transport:launch run-id)
  (with-output-to-file
      (conc *toppath* "/.starting-server")
    (lambda ()
      (print (current-process-id) " on " (get-host-name))))
  (let* ((tdbdat (tasks:open-db)))
    (set! *run-id*   run-id)
    (if (args:get-arg "-daemonize")
	(begin
	  (daemon:ize)
	  (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
	      (begin
		(current-error-port *alt-log-file*)
		(current-output-port *alt-log-file*)))))
    (if (and (server:read-dotserver *toppath*)
             (server:check-if-running run-id))
	(begin
	  (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
	  (exit 0))
	(begin ;; ok, no server detected, clean out any lingering records
	   (tasks:server-force-clean-running-records-for-run-id  (db:delay-if-busy tdbdat) run-id "notresponding")))
    (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
	       (remtries  4))
      (if (not server-id)
	  (if (> remtries 0)
	      (begin
		(thread-sleep! 2)
		(loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)
		      (- remtries 1)))
	      (begin
		;; since we didn't get the server lock we are going to clean up and bail out
		(debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
		(tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
		(delete-file* (conc *toppath* "/.starting-server"))

		))
	  (let* ((th2 (make-thread (lambda ()
				     (debug:print-info 0 *default-log-port* "Server run thread started")
				     (http-transport:run 
				      (if (args:get-arg "-server")
					  (args:get-arg "-server")
					  "-")







<
|
<
<
















|





|





|
>







518
519
520
521
522
523
524

525


526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
    (exit)))

;; all routes though here end in exit ...
;;
;; start_server? 
;;
(define (http-transport:launch run-id)

  (server:dotserver-starting)


  (let* ((tdbdat (tasks:open-db)))
    (set! *run-id*   run-id)
    (if (args:get-arg "-daemonize")
	(begin
	  (daemon:ize)
	  (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
	      (begin
		(current-error-port *alt-log-file*)
		(current-output-port *alt-log-file*)))))
    (if (and (server:read-dotserver *toppath*)
             (server:check-if-running run-id))
	(begin
	  (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
	  (exit 0))
	(begin ;; ok, no server detected, clean out any lingering records
	   (tasks:server-force-clean-running-records-for-run-id  (db:delay-if-busy tdbdat) run-id "notresponding")))
    (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http))
	       (remtries  4))
      (if (not server-id)
	  (if (> remtries 0)
	      (begin
		(thread-sleep! 2)
		(loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http)
		      (- remtries 1)))
	      (begin
		;; since we didn't get the server lock we are going to clean up and bail out
		(debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
		(tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
                
                (server:dotserver-starting-remove)
		))
	  (let* ((th2 (make-thread (lambda ()
				     (debug:print-info 0 *default-log-port* "Server run thread started")
				     (http-transport:run 
				      (if (args:get-arg "-server")
					  (args:get-arg "-server")
					  "-")
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
	       " ms</td></tr>"
	       "<tr><td>Number non-cached queries</td> <td>"  *number-non-write-queries* "</td></tr>"
	       "<tr><td>Average non-cached time</td>   <td>" (if (eq? *number-non-write-queries* 0)
								 "n/a (no queries)"
								 (/ *total-non-write-delay* 
								    *number-non-write-queries*))
	       " ms</td></tr>"
	       "<tr><td>Last access</td><td>"              (seconds->time-string *db-last-access*) "</td></tr>"
	       "</table>")))
    (mutex-unlock! *heartbeat-mutex*)
    res))

(define (http-transport:runs linkpath)
  (conc "<h3>Runs</h3>"
	(string-intersperse







|







634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
	       " ms</td></tr>"
	       "<tr><td>Number non-cached queries</td> <td>"  *number-non-write-queries* "</td></tr>"
	       "<tr><td>Average non-cached time</td>   <td>" (if (eq? *number-non-write-queries* 0)
								 "n/a (no queries)"
								 (/ *total-non-write-delay* 
								    *number-non-write-queries*))
	       " ms</td></tr>"
	       "<tr><td>Last access</td><td>"              (seconds->time-string *last-db-access*) "</td></tr>"
	       "</table>")))
    (mutex-unlock! *heartbeat-mutex*)
    res))

(define (http-transport:runs linkpath)
  (conc "<h3>Runs</h3>"
	(string-intersperse

Modified launch.scm from [4e784cfd15] to [92d3f0b9e5].

814
815
816
817
818
819
820

821
822
823
824
825
826
827
	      (set! *runconfigdat* rdat)
	      (set! *toppath*      toppath)
	      (set! *configstatus* 'partial))
	    (begin
	      (debug:print-error 0 *default-log-port* "No " mtconfig " file found. Giving up.")
	      (exit 2))))))
    ;; additional house keeping

    (let* ((linktree (or (getenv "MT_LINKTREE")
			 (if *configdat* (configf:lookup *configdat* "setup" "linktree") #f))))
      (if linktree
	  (begin
	    (if (not (file-exists? linktree))
		(begin
		  (handle-exceptions







>







814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
	      (set! *runconfigdat* rdat)
	      (set! *toppath*      toppath)
	      (set! *configstatus* 'partial))
	    (begin
	      (debug:print-error 0 *default-log-port* "No " mtconfig " file found. Giving up.")
	      (exit 2))))))
    ;; additional house keeping
    (common:set-transport-type)
    (let* ((linktree (or (getenv "MT_LINKTREE")
			 (if *configdat* (configf:lookup *configdat* "setup" "linktree") #f))))
      (if linktree
	  (begin
	    (if (not (file-exists? linktree))
		(begin
		  (handle-exceptions

Modified megatest-version.scm from [a6ad525294] to [b4a5a2c435].

1
2
3
4
5
6
7
;; Always use two or four digit decimal
;; 1.01, 1.02...1.10,1.11,1.1101 ... 1.99,2.00..

(declare (unit megatest-version))

(define megatest-version 1.6208)






|

1
2
3
4
5
6
7
;; Always use two or four digit decimal
;; 1.01, 1.02...1.10,1.11,1.1101 ... 1.99,2.00..

(declare (unit megatest-version))

(define megatest-version 1.6301)

Modified megatest.scm from [46d15d3c2a] to [3af01ae3a7].

325
326
327
328
329
330
331

332
333
334
335
336
337
338
			"-q" ;; quiet 0, errors/warnings only
		       )
		 args:arg-hash
		 0))

;; Add args that use remargs here
;;

(if (and (not (null? remargs))
	 (not (or
	       (args:get-arg "-runstep")
	       (args:get-arg "-envcap")
	       (args:get-arg "-envdelta")
	       )
	      ))







>







325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
			"-q" ;; quiet 0, errors/warnings only
		       )
		 args:arg-hash
		 0))

;; Add args that use remargs here
;;

(if (and (not (null? remargs))
	 (not (or
	       (args:get-arg "-runstep")
	       (args:get-arg "-envcap")
	       (args:get-arg "-envdelta")
	       )
	      ))
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
(if (args:get-arg "-server")

    ;; Server? Start up here.
    ;;
    (let ((tl        (launch:setup))
	;; (run-id    (and (args:get-arg "-run-id")
	;; 		  (string->number (args:get-arg "-run-id"))))
          (transport-type (string->symbol (or (args:get-arg "-transport") "http"))))
      ;; (if run-id
      ;;   (begin
      (server:launch 0 transport-type)
      (set! *didsomething* #t)))
;;     ;; (debug:print-error 0 *default-log-port* "server requires run-id be specified with -run-id")))
;; 
;;     ;; Not a server? This section will decide how to communicate
;;     ;;
;;     ;;  Setup client for all expect listed here







|
<
<







698
699
700
701
702
703
704
705


706
707
708
709
710
711
712
(if (args:get-arg "-server")

    ;; Server? Start up here.
    ;;
    (let ((tl        (launch:setup))
	;; (run-id    (and (args:get-arg "-run-id")
	;; 		  (string->number (args:get-arg "-run-id"))))
          (transport-type *transport-type*  ))


      (server:launch 0 transport-type)
      (set! *didsomething* #t)))
;;     ;; (debug:print-error 0 *default-log-port* "server requires run-id be specified with -run-id")))
;; 
;;     ;; Not a server? This section will decide how to communicate
;;     ;;
;;     ;;  Setup client for all expect listed here
1823
1824
1825
1826
1827
1828
1829

1830
1831
1832
1833
1834
1835
1836
1837

(if (args:get-arg "-cleanup-db")
    (begin
      (if (not (launch:setup))
	  (begin
	    (debug:print 0 *default-log-port* "Failed to setup, exiting") 
	    (exit 1)))

      (common:cleanup-db)
      (set! *didsomething* #t)))

(if (args:get-arg "-mark-incompletes")
    (begin
      (if (not (launch:setup))
	  (begin
	    (debug:print 0 *default-log-port* "Failed to setup, exiting")







>
|







1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837

(if (args:get-arg "-cleanup-db")
    (begin
      (if (not (launch:setup))
	  (begin
	    (debug:print 0 *default-log-port* "Failed to setup, exiting") 
	    (exit 1)))
      (let ((dbstruct (db:setup *toppath*)))
        (common:cleanup-db dbstruct))
      (set! *didsomething* #t)))

(if (args:get-arg "-mark-incompletes")
    (begin
      (if (not (launch:setup))
	  (begin
	    (debug:print 0 *default-log-port* "Failed to setup, exiting")
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
          (debug:print 0 *default-log-port* "Failed to create HTML output in " toppath "/lt/runs-index.html"))
      (set! *didsomething* #t)))

;;======================================================================
;; Exit and clean up
;;======================================================================

(if *runremote* (close-all-connections!)) ;; for http-client

(if (not *didsomething*)
    (debug:print 0 *default-log-port* help))

(set! *time-to-exit* #t)
(thread-join! *watchdog*)








|







1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
          (debug:print 0 *default-log-port* "Failed to create HTML output in " toppath "/lt/runs-index.html"))
      (set! *didsomething* #t)))

;;======================================================================
;; Exit and clean up
;;======================================================================

(if (and *runremote* (eq? 'http (remote-transport *runremote*))) (close-all-connections!)) ;; for http-client

(if (not *didsomething*)
    (debug:print 0 *default-log-port* help))

(set! *time-to-exit* #t)
(thread-join! *watchdog*)

Modified rmt.scm from [5e992d9837] to [a4b1443352].

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

18
19
20
21
22
23
24
;;======================================================================
;; Copyright 2006-2013, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.
;;======================================================================

(use format typed-records) ;; RADT => purpose of json format??

(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))

;;(declare (uses nmsg-transport))
(include "common_records.scm")

;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;











|






>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
;;======================================================================
;; Copyright 2006-2013, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.
;;======================================================================
;;
(use format typed-records) ;; RADT => purpose of json format??

(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
(declare (uses rpc-transport))
;;(declare (uses nmsg-transport))
(include "common_records.scm")

;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;

131
132
133
134
135
136
137


138

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154




155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
     ;; if not on homehost ensure we have a connection to a live server
     ;; NOTE: we *have* a homehost record by now
     ((and (not (cdr (remote-hh-dat *runremote*)))        ;; are we on a homehost?
           (not (remote-conndat *runremote*)))            ;; and no connection
      (debug:print-info 12 *default-log-port* "rmt:send-receive, case  6  hh-dat: " (remote-hh-dat *runremote*) " conndat: " (remote-conndat *runremote*))
      (mutex-unlock! *rmt-mutex*)
      (tasks:start-and-wait-for-server (tasks:open-db) 0 15)


      (remote-conndat-set! *runremote* (rmt:get-connection-info 0)) ;; calls client:setup which calls client:setup-http

      (rmt:send-receive cmd rid params attemptnum: attemptnum))
     ;; all set up if get this far, dispatch the query
     ((cdr (remote-hh-dat *runremote*)) ;; we are on homehost
      (mutex-unlock! *rmt-mutex*)
      (debug:print-info 12 *default-log-port* "rmt:send-receive, case  7")
      (rmt:open-qry-close-locally cmd (if rid rid 0) params))
     ;; not on homehost, do server query
     (else
      (mutex-unlock! *rmt-mutex*)
      (debug:print-info 12 *default-log-port* "rmt:send-receive, case  9")
      (let* ((conninfo (remote-conndat *runremote*))
	     (dat      (case (remote-transport *runremote*)
			 ((http) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away
                                  (http-transport:client-api-send-receive 0 conninfo cmd params)
                                  ((commfail)(vector #f "communications fail"))
                                  ((exn)(vector #f "other fail" (print-call-chain)))))




			 (else
			  (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported")
			  (exit))))
	     (success  (if (vector? dat) (vector-ref dat 0) #f))
	     (res      (if (vector? dat) (vector-ref dat 1) #f)))
	(if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time
        (debug:print-info 12 *default-log-port* "rmt:send-receive, case  9. conninfo=" conninfo " dat=" dat)
	(if success
	    (case (remote-transport *runremote*)
	      ((http) res)
	      (else
	       (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown")
	       (exit 1)))
	    (begin
	      (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum)
	      (remote-conndat-set!    *runremote* #f)
	      (remote-server-url-set! *runremote* #f)







>
>
|
>
















>
>
>
>

|







|







132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
     ;; if not on homehost ensure we have a connection to a live server
     ;; NOTE: we *have* a homehost record by now
     ((and (not (cdr (remote-hh-dat *runremote*)))        ;; are we on a homehost?
           (not (remote-conndat *runremote*)))            ;; and no connection
      (debug:print-info 12 *default-log-port* "rmt:send-receive, case  6  hh-dat: " (remote-hh-dat *runremote*) " conndat: " (remote-conndat *runremote*))
      (mutex-unlock! *rmt-mutex*)
      (tasks:start-and-wait-for-server (tasks:open-db) 0 15)
      (let* ((cinfo (rmt:get-connection-info 0))
            (transport (vector-ref cinfo 6))) ;; TODO: replace with tasks:server-dat-accessor-?? for transport
        (remote-conndat-set! *runremote* cinfo) ;; calls client:setup which calls client:setup-http
        (remote-transport-set! *runremote* transport))
      (rmt:send-receive cmd rid params attemptnum: attemptnum))
     ;; all set up if get this far, dispatch the query
     ((cdr (remote-hh-dat *runremote*)) ;; we are on homehost
      (mutex-unlock! *rmt-mutex*)
      (debug:print-info 12 *default-log-port* "rmt:send-receive, case  7")
      (rmt:open-qry-close-locally cmd (if rid rid 0) params))
     ;; not on homehost, do server query
     (else
      (mutex-unlock! *rmt-mutex*)
      (debug:print-info 12 *default-log-port* "rmt:send-receive, case  9")
      (let* ((conninfo (remote-conndat *runremote*))
	     (dat      (case (remote-transport *runremote*)
			 ((http) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away
                                  (http-transport:client-api-send-receive 0 conninfo cmd params)
                                  ((commfail)(vector #f "communications fail"))
                                  ((exn)(vector #f "other fail" (print-call-chain)))))
                         ((rpc) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away
                                  (rpc-transport:client-api-send-receive 0 conninfo cmd params)
                                  ((commfail)(vector #f "communications fail"))
                                  ((exn)(vector #f "other fail" (print-call-chain)))))
			 (else
			  (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (1)")
			  (exit))))
	     (success  (if (vector? dat) (vector-ref dat 0) #f))
	     (res      (if (vector? dat) (vector-ref dat 1) #f)))
	(if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time
        (debug:print-info 12 *default-log-port* "rmt:send-receive, case  9. conninfo=" conninfo " dat=" dat)
	(if success
	    (case (remote-transport *runremote*)
	      ((http rpc) res)
	      (else
	       (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown")
	       (exit 1)))
	    (begin
	      (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum)
	      (remote-conndat-set!    *runremote* #f)
	      (remote-server-url-set! *runremote* #f)
264
265
266
267
268
269
270

271






272
273
274
275
276
277
278
    res))

(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
  (let* ((run-id   (if run-id run-id 0))
	 (res  	   (handle-exceptions
		    exn
		    #f

		    (http-transport:client-api-send-receive run-id connection-info cmd params))))






    (if (and res (vector-ref res 0))
	(vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!!
	#f)))

;; ;; Wrap json library for strings (why the ports crap in the first place?)
;; (define (rmt:dat->json-str dat)
;;   (with-output-to-string 







>
|
>
>
>
>
>
>







272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
    res))

(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
  (let* ((run-id   (if run-id run-id 0))
	 (res  	   (handle-exceptions
		    exn
		    #f
                    (case (remote-transport *runremote*)
                      ((http) (http-transport:client-api-send-receive run-id connection-info cmd params))
                      ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params))
                      (else
			  (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (2)")
			  (exit))

                    ))))
    (if (and res (vector-ref res 0))
	(vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!!
	#f)))

;; ;; Wrap json library for strings (why the ports crap in the first place?)
;; (define (rmt:dat->json-str dat)
;;   (with-output-to-string 
308
309
310
311
312
313
314
315





316
317
318
319
320
321
322
  (rmt:send-receive 'login run-id (list *toppath* megatest-version *my-client-signature*)))

;; This login does no retries under the hood - it acts a bit like a ping.
;; Deprecated for nmsg-transport.
;;
(define (rmt:login-no-auto-client-setup connection-info)
  (case *transport-type* ;; run-id of 0 is just a placeholder
    ((http)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*)))





    ;;((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*)))
    ))

;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible 
;;
(define (rmt:general-call stmtname run-id . params)







|
>
>
>
>
>







323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
  (rmt:send-receive 'login run-id (list *toppath* megatest-version *my-client-signature*)))

;; This login does no retries under the hood - it acts a bit like a ping.
;; Deprecated for nmsg-transport.
;;
(define (rmt:login-no-auto-client-setup connection-info)
  (case *transport-type* ;; run-id of 0 is just a placeholder
    ((http rpc)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*)))
    (else
     (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (3)")
     (exit))

    
    ;;((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*)))
    ))

;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible 
;;
(define (rmt:general-call stmtname run-id . params)
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
(define (rmt:update-run-event_time run-id)
  (rmt:send-receive 'update-run-event_time #f (list run-id)))

(define (rmt:get-runs-by-patt  keys runnamepatt targpatt offset limit fields last-runs-update) ;; fields of #f uses default
  (rmt:send-receive 'get-runs-by-patt #f (list keys runnamepatt targpatt offset limit fields last-runs-update)))

(define (rmt:find-and-mark-incomplete run-id ovr-deadtime)
  (if (rmt:send-receive 'have-incompletes? run-id (list run-id ovr-deadtime))
      (rmt:send-receive 'mark-incomplete run-id (list run-id ovr-deadtime))))

(define (rmt:get-main-run-stats run-id)
  (rmt:send-receive 'get-main-run-stats #f (list run-id)))

(define (rmt:get-var varname)
  (rmt:send-receive 'get-var #f (list varname)))








|
|







606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
(define (rmt:update-run-event_time run-id)
  (rmt:send-receive 'update-run-event_time #f (list run-id)))

(define (rmt:get-runs-by-patt  keys runnamepatt targpatt offset limit fields last-runs-update) ;; fields of #f uses default
  (rmt:send-receive 'get-runs-by-patt #f (list keys runnamepatt targpatt offset limit fields last-runs-update)))

(define (rmt:find-and-mark-incomplete run-id ovr-deadtime)
  ;; (if (rmt:send-receive 'have-incompletes? run-id (list run-id ovr-deadtime))
  (rmt:send-receive 'mark-incomplete run-id (list run-id ovr-deadtime))) ;; )

(define (rmt:get-main-run-stats run-id)
  (rmt:send-receive 'get-main-run-stats #f (list run-id)))

(define (rmt:get-var varname)
  (rmt:send-receive 'get-var #f (list varname)))

Modified rpc-transport.scm from [7aa56cfddc] to [1a12c353bd].

1
2
3
4
5
6
7
8
9

;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

|







1
2
3
4
5
6
7
8
9

;; Copyright 2006-2016, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.
19
20
21
22
23
24
25




26
27
28



















































































29







30


31





32








33



































34

35


36
37
38
39
40
41
42
43
44
45


46
47






48


















































49


50














51
52
53





54
55
56





57
















58


59






60

61






62
63









64










65
66
67
68
69
70
71



72
73



74











75
76


77
78
79
80
81

82

83





84
85
86

87


88
89
90
91
92
93
94
95




96
97
98
99












100
101
102
103
104
105
106




107








108
109
110
111


112






113




114














115
116
117
























118
119
120
121

122
123

124
125

126


127





128


129
130


131




132











133
134

135


136










137


138
139

140
141
142
143
144
145
146


147

148

149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174


175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193

194
195

196


197
198
199
200
201
202
203
204
205
206
207
208
209
210

211
212
213
214

215
216
217
218
219
220




221
222
223
224
225


226



227
228

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.

(include "common_records.scm")
(include "db_records.scm")





;; procstr is the name of the procedure to be called as a string
(define (rpc-transport:autoremote procstr params)



















































































  (handle-exceptions







   exn


   (begin





     (debug:print 1 *default-log-port* "Remote failed for " proc " " params)








     (apply (eval (string->symbol procstr)) params))



































   ;; (if *runremote*

   ;;    (apply (eval (string->symbol (conc "remote:" procstr))) params)


   (apply (eval (string->symbol procstr)) params)))

;; all routes though here end in exit ...
;;
;; start_server? 
;;
(define (rpc-transport:launch run-id)
  (let* ((tdbdat (tasks:open-db)))
    (BB> "rpc-transport:launch fired for run-id="run-id)
    (set! *run-id*   run-id)


    (if (args:get-arg "-daemonize")
        (daemon:ize))






    (if (server:check-if-running run-id)


















































        (begin


          (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")














          (exit 0)))
    (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
               (remtries  4))





      (if (not server-id)
          (if (> remtries 0)
              (begin





                (thread-sleep! 2)
















                (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)


                      (- remtries 1)))






              (begin

                ;; since we didn't get the server lock we are going to clean up and bail out






                (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
                (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " rpc-transport:launch")))









          (begin










            (rpc-transport:run (if (args:get-arg "-server")(args:get-arg "-server") "-") run-id server-id)
            (exit))))))

(define (rpc-transport:run hostn run-id server-id)
  (debug:print 2 *default-log-port* "Attempting to start the rpc server ...")
   ;; (trace rpc:publish-procedure!)




  (rpc:publish-procedure! 'server:login server:login)
  (rpc:publish-procedure! 'testing (lambda () "Just testing"))















  (let* ((db              #f)
	 (hostname        (get-host-name))


	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   (server:get-best-guess-address hostname)
					   #f)))
			    (if ipstr ipstr hostn))) ;; hostname))) 

	 (start-port      (open-run-close tasks:server-get-next-port tasks:open-db))

	 (link-tree-path  (configf:lookup *configdat* "setup" "linktree"))





	 (rpc:listener    (rpc-transport:find-free-port-and-open (rpc:default-server-port)))
	 (th1             (make-thread
			   (lambda ()

			     ((rpc:make-server rpc:listener) #t))


			   "rpc:server"))
			   ;; (cute (rpc:make-server rpc:listener) "rpc:server")
			   ;; 'rpc:server))
	 (hostname        (if (string=? "-" hostn)
			      (get-host-name) 
			      hostn))
	 (ipaddrstr       (if (string=? "-" hostn)
			      (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")




			      #f))
	 (portnum         (rpc:default-server-port))
	 (host:port       (conc (if ipaddrstr ipaddrstr hostname) ":" portnum))
	 (tdb             (tasks:open-db)))












    (thread-start! th1)
    (set! db *dbstruct-db*)
    (open-run-close tasks:server-set-interface-port 
		    tasks:open-db 
		    server-id 
		    ipaddrstr portnum)
    (debug:print 0 *default-log-port* "Server started on " host:port)




    








    ;; (trace rpc:publish-procedure!)
    ;; (rpc:publish-procedure! 'server:login server:login)
    ;; (rpc:publish-procedure! 'testing (lambda () "Just testing"))



    ;;======================================================================






    ;;	  ;; end of publish-procedure section




    ;;======================================================================














    ;;
    (on-exit (lambda ()
	       (open-run-close tasks:server-set-state! tasks:open-db server-id "stopped")))

























    (set! *rpc:listener* rpc:listener)
    (tasks:server-set-state! tdb server-id "running")
    (set! *dbstruct-db*  (db:setup run-id))

    ;; if none running or if > 20 seconds since 
    ;; server last used then start shutdown

    (let loop ((count 0))
      (thread-sleep! 5) ;; no need to do this very often

      (let ((numrunning -1)) ;; (db:get-count-tests-running db)))


	(if (or (> numrunning 0)





		(> (+ *db-last-access* 60)(current-seconds)))


	    (begin
	      (debug:print-info 0 *default-log-port* "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *db-last-access*))


	      (loop (+ 1 count)))




	    (begin











	      (debug:print-info 0 *default-log-port* "Starting to shutdown the server side")
	      (open-run-close tasks:server-delete-record tasks:open-db server-id " rpc-transport:try-start-server stop")

	      (thread-sleep! 10)


	      (debug:print-info 0 *default-log-port* "Max cached queries was " *max-cache-size*)










	      (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting")


	      ))))))


(define (rpc-transport:find-free-port-and-open port)
  (handle-exceptions
   exn
	  (begin
     (print "Failed to bind to port " (rpc:default-server-port) ", trying next port")
     (rpc-transport:find-free-port-and-open (+ port 1)))
   (rpc:default-server-port port)


   (tcp-read-timeout 240000)

   (tcp-listen (rpc:default-server-port) 10000)))


(define (rpc-transport:ping run-id host port)
  (handle-exceptions
   exn
   (begin
     (print "SERVER_NOT_FOUND")
     (exit 1))
   (let ((login-res ((rpc:procedure 'server:login host port) *toppath*)))
     (if (and (list? login-res)
	      (car login-res))
	 (begin
	   (print "LOGIN_OK")
	   (exit 0))
	 (begin
	   (print "LOGIN_FAILED")
	   (exit 1))))))

(define (rpc-transport:client-setup run-id #!key (remtries 10))
  (if *runremote*
      (begin
	(debug:print-error 0 *default-log-port* "Attempt to connect to server but already connected")
	#f)
      (let* ((host-info (hash-table-ref/default *runremote* run-id #f))) ;; (open-run-close db:get-var #f "SERVER"))
	(if host-info
	    (let ((iface    (car host-info))
		  (port     (cadr host-info))


		  (ping-res ((rpc:procedure 'server:login host port) *toppath*)))
	      (if ping-res
		  (let ((server-dat (list iface port #f #f #f)))
		    (hash-table-set! *runremote* run-id server-dat)
		    server-dat)
		  (begin
		    (server:try-running run-id)
		    (thread-sleep! 2)
		    (rpc-transport:client-setup run-id (- remtries 1)))))
 	    (let* ((server-db-info (open-run-close tasks:get-server tasks:open-db run-id)))
 	      (debug:print-info 0 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
	      (if server-db-info
 		  (let* ((iface     (tasks:hostinfo-get-interface server-db-info))
 			 (port      (tasks:hostinfo-get-port      server-db-info))
			 (server-dat (list iface port #f #f #f))
 			 (ping-res  ((rpc:procedure 'server:login host port) *toppath*)))
 		    (if start-res
 			(begin
 			  (hash-table-set! *runremote* run-id server-dat)

			  server-dat)
			(begin

			  (server:try-running run-id)


			  (thread-sleep! 2)
			  (rpc-transport:client-setup run-id (- remtries 1)))))
		  (begin
		    (server:try-running run-id)
		    (thread-sleep! 2)
		    (rpc-transport:client-setup run-id (- remtries 1)))))))))
;; 
;; 	     (port     (if (and hostinfo (> (length hostdat) 1))(cadr hostdat) #f)))
;; 	(if (and port
;; 		 (string->number port))
;; 	    (let ((portn (string->number port)))
;; 	      (debug:print-info 2 *default-log-port* "Setting up to connect to host " host ":" port)
;; 	      (handle-exceptions
;; 	       exn

;; 	       (begin
;; 		 (debug:print-error 0 *default-log-port* "Failed to open a connection to the server at host: " host " port: " port)
;; 		 (debug:print 0 *default-log-port* "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
;; 		 ;; (open-run-close 

;; 		 ;;  (lambda (db . param) 
;; 		 ;;    (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER'"))
;; 		 ;;  #f)
;; 		 (set! *runremote* #f))
;; 	       (if (and (not (args:get-arg "-server")) ;; no point in the server using the server using the server
;; 			((rpc:procedure 'server:login host portn) *toppath*))




;; 		   (begin
;; 		     (debug:print-info 2 *default-log-port* "Logged in and connected to " host ":" port)
;; 		     (set! *runremote* (vector host portn)))
;; 		   (begin
;; 		     (debug:print-info 2 *default-log-port* "Failed to login or connect to " host ":" port)


;; 		     (set! *runremote* #f)))))



;; 	    (debug:print-info 2 *default-log-port* "no server available")))))









>
>
>
>


|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
|
>
>
|
>
>
>
>
>
|
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
|
>
>
|






<
<
|
>
>
|
|
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
<
>
>
>
>
>
|
<
|
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
|
>
>
>
>
>
>
|
>
|
>
>
>
>
>
>
|
<
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
|
|





>
>
>
|
|
>
>
>

>
>
>
>
>
>
>
>
>
>
>

|
>
>
|


|
|
>
|
>

>
>
>
>
>
|


>
|
>
>

|
|
|




>
>
>
>
|
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>


|
<
<
<

>
>
>
>
|
>
>
>
>
>
>
>
>
|
|
|

>
>
|
>
>
>
>
>
>
|
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
<
|
>
|
<
>
|
|
>
|
>
>
|
>
>
>
>
>
|
>
>
|
|
>
>
|
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
|
|
>
|
>
>
|
>
>
>
>
>
>
>
>
>
>
|
>
>
|

>
|


|

|

>
>

>
|
>
|




|


<
|







|
|
|
<
<
<
|
<
|
>
>
|
|
|
<
<
|
<
<
<
<
<
<
<
<
<
<
<
|
<
>
|
|
>
|
>
>
|
|
|
<
|
|
<
<
<
<
<
<
<
|
>
|
<
<
<
>
|
<
<
<
<
|
>
>
>
>
|
|
|
|
|
>
>
|
>
>
>
|
|
>
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189


190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271

272
273
274
275
276
277

278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320

321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423



424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499

500
501
502

503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583

584
585
586
587
588
589
590
591
592
593
594



595

596
597
598
599
600
601


602











603

604
605
606
607
608
609
610
611
612
613

614
615







616
617
618



619
620




621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.

(include "common_records.scm")
(include "db_records.scm")

(define *heartbeat-mutex* (make-mutex))
(define *server-loop-heart-beat* (current-seconds))


;; procstr is the name of the procedure to be called as a string
(define (rpc-transport:autoremote procstr params)  ;; may be unused, I think api-exec deprecates this one.
  (let* ((procsym (if (symbol? procstr)
                     procstr
                     (string->symbol (->string procstr))))
        (res
         (begin
           (apply (eval procsym) params))))
    res))


;; rpc receiver
(define (rpc-transport:api-exec cmd params)
  (let* ( (resdat  (api:execute-requests *dbstruct-db* (vector cmd params))) ;; #( flag result )
          (flag    (vector-ref resdat 0))
          (res     (vector-ref resdat 1)))

    (mutex-lock! *heartbeat-mutex*)

    (set! *last-db-access* (current-seconds)) ;; bump *last-db-access*; this will renew keep-running thread's lease on life for another (server:get-timeout) seconds
    ;;(BB> "in api-exec; last-db-access updated to "*last-db-access*)
    (mutex-unlock! *heartbeat-mutex*)

    res))


;; retry an operation (depends on srfi-18)
;; ==================
;; idea here is to avoid spending time on coding retrying something.  Trying to be generic here.
;;
;; Exception handling:
;; -------------------
;; if evaluating the thunk results in exception, it will be retried.
;; on last try, if final-failure-returns-actual is true, the exception will be re-thrown to caller.
;;
;; look at options below #!key to see how to configure behavior
;;
;;
(define (retry-thunk
         the-thunk
         #!key ;;;; options below
         (accept-result?   (lambda (x) x)) ;; retry if predicate applied to thunk's result is false 
         (retries                       4) ;; how many tries
         (failure-value                #f) ;; return this on final failure, unless following option is enabled:
         (final-failure-returns-actual #f) ;; on failure, on the last try, just return the result, not failure-value

         (retry-delay                 0.1) ;; delay between tries
         (back-off-factor               1) ;; multiply retry-delay by this factor on retry
         (random-delay                0.1) ;; add a random portion of this value to wait

         (chatty                       #f) ;; print status as we go, for debugging.
         )
  
  (when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-"))
  (let* ((guarded-thunk ;; we are guarding the thunk against exceptions.  We will record whether result of evaluation is an exception or a regular result.
          (lambda ()
           (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision
                  (res
                   (condition-case
                    (the-thunk) ;; this is what we are guarding the execution of
                    [x () (cons EXCEPTION x)]
                    )))
             (cond
              ((and (pair? res) (eq? (car res) EXCEPTION))
               (if chatty
                   (print " - the-thunk threw exception >"(cdr res)"<"))
               (cons 'exception (cdr res)))
               (else
                (if chatty
                    (print " - the-thunk returned result >"res"<"))
                (cons 'regular-result res)))))))
    
    (let loop ((guarded-res (guarded-thunk))
               (retries-left retries)
               (fail-wait retry-delay))
      (if chatty (print "   =========="))
      (let* ((wait-time (+ fail-wait (+ (* fail-wait back-off-factor)
                                        (* random-delay
                                           (/ (random 1024) 1024) ))))
             (res-type (car guarded-res))
             (res-value (cdr guarded-res)))
        (cond
         ((and (eq? res-type 'regular-result) (accept-result? res-value))
                   (if chatty (print " + return result that satisfied accept-result? >"res-value"<"))
                   res-value)

         ((> retries-left 0)
          (if chatty (print " - sleep "wait-time))
          (thread-sleep! wait-time)
          (if chatty (print " + retry ["retries-left" tries left]"))
          (loop (guarded-thunk)
                (sub1 retries-left)
                wait-time))
         
         ((eq? res-type 'regular-result)
          (if final-failure-returns-actual
              (begin
                (if chatty (print " + last try failed- return the result >"res-value"<"))
                res-value)
              (begin
                (if chatty (print " + last try failed- return canned failure value >"failure-value"<"))
              failure-value)))
         
         (else ;; no retries left; result was not accepted and res-type can only be 'exception
          (if final-failure-returns-actual 
              (begin
                (if chatty (print " + last try failed with exception- re-throw it >"res-value"<"))
                (abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function
              (begin
                (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
                failure-value))))))))


(define (rpc-transport:server-shutdown server-id rpc:listener ) ;;#!key (from-on-exit #f))
  ;;(on-exit (lambda () #t)) ;; turn off on-exit stuff
  ;;(tcp-close rpc:listener) ;; gotta exit nicely
  ;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped")


  ;; TODO: (low) the following is extraordinaritly slow.  Maybe we don't even need portlogger for rpc anyway??  the exception-based failover when ports are taken is fast!
  ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released")
  
  (set! *time-to-exit* #t)
  ;;(if *dbstruct-db* (db:sync-touched *dbstruct-db* *run-id* force-sync: #t))
  
  (server:remove-dotserver-file *toppath* "anyhost:anyport" force: #t)
  (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete")
  

  ;;(BB> "Before (exit) (from-on-exit="from-on-exit")")
  ;;(unless from-on-exit (exit))  ;; sometimes we hang (around) here with 100% cpu.
  ;;(BB> "After")
  ;; strace reveals endless:
  ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 105880}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 109880}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 201886}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 205886}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 297892}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 301892}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 393898}, ru_stime={0, 60003}, ...}) = 0
  ;; getrusage(RUSAGE_SELF, {ru_utime={414, 397898}, ru_stime={0, 60003}, ...}) = 0
  ;; make a post to chicken-users w/ http://paste.call-cc.org/paste?id=60a4b66a29ccf7d11359ea866db642c970735978


  ;; (if from-on-exit
  ;;     ;; avoid above condition!  End current process externally  since 1 in 20 (exit)'s result in hung, 100% cpu zombies. (see above)
  
  (system (conc  "kill -9  "(current-process-id)))
  )


;; all routes though here end in exit ...
;;
;; start_server? 
;;
(define (rpc-transport:launch run-id)


  (set! *run-id*   run-id)

  ;; ;; send to background if requested
  ;; (when (args:get-arg "-daemonize")
  ;;     (daemon:ize)
  ;;     (when *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
  ;;       (current-error-port *alt-log-file*)
  ;;       (current-output-port *alt-log-file*)))

  ;; double check we dont alrady have a running server for this run-id
  (when (and (server:read-dotserver *toppath*)
             (server:check-if-running run-id))
    (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running")
    (exit 0))

  ;; did not find server running, let's clean up the table of dead servers
  (tasks:server-force-clean-running-records-for-run-id  (db:delay-if-busy (tasks:open-db)) run-id "notresponding")  

  (server:dotserver-starting)

  


  
  ;; let's get a server-id for this server
  ;;   if at first we do not suceed, try 3 more times.
  (let ((server-id (retry-thunk
                    (lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc))
                    chatty: #f
                    final-failure-returns-actual: #t
                    retries: 4)))
    (when (not server-id) ;; dang we couldn't get a server-id.
      ;; since we didn't get the server lock we are going to clean up and bail out


      (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
      (tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch")
      (server:dotserver-starting-remove)
      (exit 1))

    ;; we got a server-id (and a corresponding entry in servers table in globally shared mdb)
    ;; all systems go.  Proceed to setup rpc server.  
    (rpc-transport:run
     (if (args:get-arg "-server")
         (args:get-arg "-server")
         "-")
     run-id
     server-id)
    (exit)))

(define *rpc-listener-port* #f)
(define *rpc-listener-port-bind-timestamp* #f)

(define *on-exit-flag #f)

(define (rpc-transport:server-dat-get-iface         vec)    (vector-ref  vec 0))
(define (rpc-transport:server-dat-get-port          vec)    (vector-ref  vec 1))
(define (rpc-transport:server-dat-get-last-access   vec)    (vector-ref  vec 5))
(define (rpc-transport:server-dat-get-transport     vec)    (vector-ref  vec 6))
(define (rpc-transport:server-dat-update-last-access vec)
  (if (vector? vec)
      (vector-set! vec 5 (current-seconds))
      (begin
	(print-call-chain (current-error-port))
	(debug:print-error 0 *default-log-port* "call to rpc-transport:server-dat-update-last-access with non-vector!!"))))


(define *api-exec-ht* (make-hash-table))
(define *api-exec-mutex* (make-mutex))
;; let's see if caching the rpc stub curbs thread-profusion on server side
(define (rpc-transport:get-api-exec iface port)
  (mutex-lock! *api-exec-mutex*)
  (let* ((lu (hash-table-ref/default *api-exec-ht* (cons iface  port) #f)))
    (if lu
        (begin
          (mutex-unlock! *api-exec-mutex*)
          lu)
        (let ((res (rpc:procedure 'api-exec iface port)))
          (hash-table-set! *api-exec-ht* (cons iface port) res)
          (mutex-unlock! *api-exec-mutex*)
          res))))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; this client-side procedure makes rpc call to server and returns result
;;
(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
  ;;(BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries)
  (if (not (vector? serverdat))

      (begin
        (BB> "WHAT?? for run-id="run-id", serverdat="serverdat)
        (print-call-chain)
        (exit 1)))
  (let* ((iface (rpc-transport:server-dat-get-iface serverdat))
         (port  (rpc-transport:server-dat-get-port serverdat))
         (res #f)
         (api-exec (rpc-transport:get-api-exec iface port))  ;; chached by host/port. may need to clear...
         (send-receive (lambda ()
                         (tcp-buffer-size 0)
                         (set! res (retry-thunk
                                    (lambda ()
                                      (condition-case
                                       ;;(vector #t (run-remote cmd params))
                                       (vector 'success (api-exec cmd params))
                                       [x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)]
                                       [x () (vector 'other-fail "other fail ["(->string x)"]" x)]))
                                    chatty: #f
                                    accept-result?: (lambda(x)
                                                      (and (vector? x) (vector-ref x 0)))
                                    retries: 8
                                    back-off-factor: 1.5
                                    random-wait: 0.2
                                    retry-delay: 0.1
                                    final-failure-returns-actual: #t))
                         ;;(BB> "HEY res="res)
                         res
                         ))
         (th1 (make-thread send-receive "send-receive"))
         (time-out-reached #f)
         (time-out     (lambda ()
			      (thread-sleep! 45)
                              (set! time-out-reached #t)
                              (thread-terminate! th1)
			      #f))

         (th2 (make-thread time-out     "time out")))
	 (thread-start! th1)
	 (thread-start! th2)
	 (thread-join! th1)
	 (thread-terminate! th2)
         ;;(BB> "alt got res="res)
	 (debug:print-info 11 *default-log-port* "got res=" res)

	 (if (vector? res)
             (case (vector-ref res 0)
               ((success) (vector #t (vector-ref res 1)))
               (
                (comms-fail other-fail)
                ;;(comms-fail) 
                (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request >>"res"<<")
                ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
                (vector #f (vector-ref res 1)))
               (else
                (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1))
                (debug:print 0 *default-log-port* " client call chain:")
                (print-call-chain (current-error-port))
                (debug:print 0 *default-log-port* " server call chain:")
                (pp (vector-ref res 1) (current-error-port))
                (signal (vector-ref res 2))))
             (signal (make-composite-condition
                      (make-property-condition 
             	       'timeout
             	       'message "nmsg-transport:client-api-send-receive-raw timed out talking to server"))))))
        


(define (rpc-transport:run hostn run-id server-id)
  (debug:print 2 *default-log-port* "Attempting to start the rpc server ...")
   ;; (trace rpc:publish-procedure!)

  ;;======================================================================
  ;;	  start of publish-procedure section
  ;;======================================================================
  (rpc:publish-procedure! 'server:login server:login) ;; this allows client to validate it is the same megatest instance as the server.  No security here, just making sure we're in the right room.
  (rpc:publish-procedure!
   'testing
   (lambda ()
     "Just testing"))

  ;; procedure to receive arbitrary API request from client's rpc:send-receive/rpc-transport:client-api-send-receive 
  (rpc:publish-procedure! 'rpc-transport:autoremote rpc-transport:autoremote)
  ;; can use this to run most anything at the remote
  (rpc:publish-procedure! 'api-exec rpc-transport:api-exec)
  
  
  ;;======================================================================
  ;;	  end of publish-procedure section
  ;;======================================================================


  (let* ((db              #f)
	 (hostname        (let ((res (get-host-name)))  res))
         (server-start-time (current-seconds))
         (server-timeout (server:get-timeout))
	 (ipaddrstr       (let* ((ipstr (if (string=? "-" hostn)
					   ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   (server:get-best-guess-address hostname)
					   #f))
                                 (res (if ipstr ipstr hostn)))
                            res)) ;; hostname))) 
	 (start-port      (let ((res (portlogger:open-run-close portlogger:find-port)))      ;; BB> TODO: remove portlogger!
                            res))
	 (link-tree-path  (configf:lookup *configdat* "setup" "linktree"))

         ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
         ;; rpc:listener is the tcp-listen result from inside the find-free-port-and-open complex.
         ;;   It is our handle on the listening tcp port
         ;;   We will attach this to our rpc server with rpc:make-server in thread th1 .
	 (rpc:listener    (rpc-transport:find-free-port-and-open start-port)) 
	 (th1             (make-thread
			   (lambda ()
                             ;;(BB> "BEFORE rpc:make-server")
			     ((rpc:make-server rpc:listener) #t)
                             ;;(BB> "BEFORE rpc:make-server")
                             )
			   "rpc:server"))


         (hostname        (if (string=? "-" hostn)
			      (get-host-name) 
			      hostn))
	 (ipaddrstr       (if (string=? "-" hostn)
			      (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
                              (string-intersperse 
                               (map number->string
                                    (u8vector->list
                                      (hostname->ip hostn))) ".")
                              ))
	 (portnum         (let ((res (rpc:default-server-port)))  res))
	 (host:port       (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)))

    (when (not (equal? ipaddrstr (server:get-best-guess-address (get-host-name))))
        
      (debug:print 0 *default-log-port* "Error: This host "(ip->string (hostname->ip (get-host-name)))" ("(get-host-name)") is not the homehost "ipaddrstr" ("(ip->hostname (string->ip ipaddrstr))"; Cannot proceed.")
      (server:dotserver-starting-remove)
      (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
      (exit))
    
    (tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum)

    ;;============================================================
    ;;  activate thread th1 to attach opened tcp port to rpc server
    ;;=============================================================
    (thread-start! th1)
    (set! db *dbstruct-db*)




    (debug:print 0 *default-log-port* "Server started on " host:port)
    ;;(BB> "before SELF-TEST")
    (if (retry-thunk (lambda ()
                       (rpc-transport:self-test run-id ipaddrstr portnum))
                     final-failure-returns-actual: #t ;; TODO: remove this line
                     )
        (debug:print 0 *default-log-port* "INFO: rpc self test passed!")
        (begin
          (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test.  Shutting down.  On: " host:port)
          (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "dead")
          (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
          (rpc-transport:server-shutdown server-id rpc:listener)
          (server:dotserver-starting-remove)
          (exit)))




    ;;(on-exit (lambda ()
    ;;           (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t)))
    
    ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch
    (if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers
        (begin ;; i am not the server, another server snuck in and beat this one to the punch
          (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port
          (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision")
          (server:dotserver-starting-remove))

        (begin ;; i am the server
          ;; setup the in-memory db
          (set! *dbstruct-db*  (db:setup run-id))
          (db:get-db *dbstruct-db* run-id)

          ;; at this point, satisfied server has started
          ;; let's make it official
          (server:write-dotserver *toppath* (conc ipaddrstr ":" portnum))
          (mutex-lock! *heartbeat-mutex*)
          (set! *last-db-access* (current-seconds))
          (mutex-unlock! *heartbeat-mutex*)
          (set! *rpc:listener* rpc:listener) 
          (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry

          
          
          ;; this let loop will hold open this thread until we want the server to shut down.
          ;;   if no requests received within the last 20 seconds :
          ;;   database hasnt changed in ??
          ;;


          
          ;; keep-running loop: polls last-db-access to see if we have timed out.  keep running if not. 
          (let loop ((count          0)
                     (bad-sync-count 0))
            (BB> "keep running: count = "count)
            ;; Use this opportunity to sync the inmemdb to db

            (let ((start-time (current-milliseconds))
                  (sync-time  #f)
                  (rem-time   #f))

              ;; following is now done in common:watchdog, commenting out.  sync-time will now be 0; can live with that.
              ;; ;; inmemddb is a dbstruct
              ;; (condition-case
              ;;  (db:sync-touched *dbstruct-db* *run-id* force-sync: #t)
              ;;  ((sync-failed)(cond
              ;;                 ((> bad-sync-count 10) ;; time to give up
              ;;                  (rpc-transport:server-shutdown server-id rpc:listener))
              ;;                 (else ;; (> bad-sync-count 0)  ;; we've had a fail or two, delay and loop
              ;;                  (thread-sleep! 5)
              ;;                  (loop count (+ bad-sync-count 1)))))
              ;;  ((exn)
              ;;   (debug:print-error 0 *default-log-port* "error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server ")
              ;;   (rpc-transport:server-shutdown server-id rpc:listener)))
              
              (set! sync-time  (- (current-milliseconds) start-time))

              (set! rem-time (quotient (- 4000 sync-time) 1000))
              (debug:print 4 *default-log-port* "SYNC: time= " sync-time ", rem-time=" rem-time)
              

              (if (and (<= rem-time 4)
                       (> rem-time 0))
                  (thread-sleep! rem-time)
                  (thread-sleep! 4))) ;; fallback for if the math is changed ...
            
            (if (< count 1) ;; 3x3 = 9 secs aprox
                (loop (+ count 1) bad-sync-count))

            ;; BB: don't see how this is possible with RPC
            ;; ;; Check that iface and port have not changed (can happen if server port collides)
            ;; (mutex-lock! *heartbeat-mutex*)
            ;; (set! sdat *server-info*)
            ;; (mutex-unlock! *heartbeat-mutex*)
            
            ;; (if (or (not (equal? sdat (list iface port)))
            ;;         (not server-id))
            ;;     (begin 
            ;;       (debug:print-info 0 *default-log-port* "interface changed, refreshing iface and port info")
            ;;       (set! iface (car sdat))
            ;;       (set! port  (cadr sdat))))
            
            ;; Transfer *last-db-access* to last-access to use in checking that we are still alive
            (mutex-lock! *heartbeat-mutex*)
            (set! last-access *last-db-access*)
            (mutex-unlock! *heartbeat-mutex*)
            
            ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout)
            ;;
            ;; no_traffic, no running tests, if server 0, no running servers
            ;;
            ;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out)
            ;;
            (let* ((hrs-since-start  (/ (- (current-seconds) server-start-time) 3600))
                   (adjusted-timeout (if (> hrs-since-start 1)
                                         (- server-timeout (inexact->exact (round (* hrs-since-start 60))))  ;; subtract 60 seconds per hour
                                         server-timeout)))
              (if (common:low-noise-print 120 "server timeout")
                  (debug:print-info 0 *default-log-port* "Adjusted server timeout: " adjusted-timeout))
              (if (and *server-run*
                       (> (+ last-access server-timeout)
                          (current-seconds)))
                  (begin
                    (if (common:low-noise-print 120 "server continuing")
                        (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
                    ;;
                    ;; Consider implementing some smarts here to re-insert the record or kill self is
                    ;; the db indicates so
                    ;;
                    (if (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)
                        (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running"))
                    ;;
                    (loop 0 bad-sync-count))
                  (begin
                    ;;(BB> "SERVER SHUTDOWN CALLED!  last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout)
                    
                    (rpc-transport:server-shutdown server-id rpc:listener)))))
          ;; end new loop
          ))))


(define (rpc-transport:find-free-port-and-open port #!key )
  (handle-exceptions
   exn
   (begin
     (print "Failed to bind to port " (rpc:default-server-port) ", trying next port")
     (rpc-transport:find-free-port-and-open (add1 port)))
   (rpc:default-server-port port)
   (set! *rpc-listener-port* port) ;; a bit paranoid about rpc:default-server-port parameter not changing across threads (as params are wont to do).  keeping this global in my back pocket in case this causes problems
   (set! *rpc-listener-port-bind-timestamp* (current-milliseconds)) ;; may want to test how long it has been since the last bind attempt happened...
   (tcp-read-timeout 240000)
   (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it.
   (tcp-listen (rpc:default-server-port) 10000)
   ))
  
(define (rpc-transport:ping run-id host port)
  (handle-exceptions
   exn
   (begin
     (print "SERVER_NOT_FOUND exn="exn)
     (exit 1))
   (let ((login-res ((rpc:procedure 'server:login host port) *toppath*)))

     (if login-res
	 (begin
	   (print "LOGIN_OK")
	   (exit 0))
	 (begin
	   (print "LOGIN_FAILED")
	   (exit 1))))))

(define (rpc-transport:self-test run-id host port)
  (if (not host)
      (abort "host not set."))



  (if (not port)

      (abort "port not set."))
  (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it.
  (let* ((testing-res ((rpc:procedure 'testing host port)))
         (login-res ((rpc:procedure 'server:login host port) *toppath*))
         (res (and login-res (equal? testing-res "Just testing"))))
    


    (if login-res











        (begin

          ;;(BB> "Self test PASS.  login-res="login-res" testing-res="testing-res" *toppath*="*toppath*)
          #t)
        (begin
          (BB> "Self test fail.  login-res="login-res" testing-res="testing-res" *toppath*="*toppath*)
           
          #f))
    res))

(define (rpc-transport:client-setup run-id server-dat #!key (remaining-tries 10))
  ;;(BB> "entered rpc-transport:client-setup with run-id="run-id" and server-dat="server-dat" and retries="remaining-tries)

  (tcp-buffer-size 0)
  (debug:print-info 0 *default-log-port* "rpc-transport:client-setup run-id="run-id" server-dat=" server-dat ", remaining-tries=" remaining-tries)







  (let* ((iface     (tasks:hostinfo-get-interface server-dat))
         (hostname  (tasks:hostinfo-get-hostname  server-dat))
         (port      (tasks:hostinfo-get-port      server-dat))



         (runremote-server-dat (vector iface port #f #f #f (current-seconds) 'rpc)) ;; http version := (vector iface port api-uri api-url api-req (current-seconds) 'http  )
         (ping-res (retry-thunk (lambda ()  ;; make 3 attempts to ping.




                                  ((rpc:procedure 'server:login iface port) *toppath*))
                                chatty: #f
                                retries: 3)))
    ;; we got here from rmt:get-connection-info on the condition that *runremote* has no entry for run-id...
    (if ping-res
        (begin
          (debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat)
          runremote-server-dat)
        (begin ;; login failed but have a server record, clean out the record and try again
          (debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat)
          (tasks:kill-server-run-id run-id)
          (tasks:server-force-clean-run-record  (db:delay-if-busy (tasks:open-db)) run-id iface port
                                                   " rpc-transport:client-setup (server-dat = #t)")
          (if (> remaining-tries 2)
              (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little
              (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time
          (server:try-running run-id)
          (thread-sleep! 5)   ;; give server a little time to start up
          (client:setup run-id remaining-tries: (sub1 remaining-tries))))))

Modified server.scm from [c0f30a061c] to [3d61545655].

44
45
46
47
48
49
50
51





52

53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
;; Call this to start the actual server
;;

;; all routes though here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id transport-type)





  (BB> "server:launch fired for run-id="run-id" transport-type="transport-type)

  (case transport-type
    ((http)(http-transport:launch run-id))
    ;;((nmsg)(nmsg-transport:launch run-id))
    ((rpc)  (rpc-transport:launch run-id))
    (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type))))
;;       (else   (debug:print-error 0 *default-log-port* "No known transport set, transport=" transport ", using rpc")
;; 	      (rpc-transport:launch run-id)))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; Get the transport
(define (server:get-transport)
  (if *transport-type*







|
>
>
>
>
>
|
>
|
|
|
|
|
<
<
|







44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63


64
65
66
67
68
69
70
71
;; Call this to start the actual server
;;

;; all routes though here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id transport-type-raw)
  (let ((transport-type
         (cond
          ((string? transport-type-raw) (string->symbol transport-type-raw))
          (else transport-type-raw))))
         
    ;;(BB> "server:launch fired for run-id="run-id" transport-type="transport-type)

    (case transport-type
      ((http)(http-transport:launch run-id))
      ;;((nmsg)(nmsg-transport:launch run-id))
      ((rpc)  (rpc-transport:launch run-id))
      (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type)))))


  
;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; Get the transport
(define (server:get-transport)
  (if *transport-type*
110
111
112
113
114
115
116
117

118
119

120
121
122
123
124
125
126
	 (curr-ip     (server:get-best-guess-address curr-host))
	 (curr-pid    (current-process-id))
	 (homehost    (common:get-homehost)) ;; configf:lookup *configdat* "server" "homehost" ))
	 (target-host (car homehost))
	 (testsuite   (common:get-testsuite-name))
	 (logfile     (conc *toppath* "/logs/server.log"))
	 (cmdln (conc (common:get-megatest-exe)
		      " -server " (or target-host "-") " -run-id " 0 (if (equal? (configf:lookup *configdat* "server" "daemonize") "yes")

									      (conc " -daemonize -log " logfile)
									      "")

		      " -m testsuite:" testsuite)) ;; (conc " >> " logfile " 2>&1 &")))))
	 (log-rotate  (make-thread common:rotate-logs  "server run, rotate logs thread")))
    ;; we want the remote server to start in *toppath* so push there
    (push-directory *toppath*)
    (debug:print 0 *default-log-port* "INFO: Trying to start server (" cmdln ") ...")
    (thread-start! log-rotate)








|
>
|
|
>







114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
	 (curr-ip     (server:get-best-guess-address curr-host))
	 (curr-pid    (current-process-id))
	 (homehost    (common:get-homehost)) ;; configf:lookup *configdat* "server" "homehost" ))
	 (target-host (car homehost))
	 (testsuite   (common:get-testsuite-name))
	 (logfile     (conc *toppath* "/logs/server.log"))
	 (cmdln (conc (common:get-megatest-exe)
		      " -server " (or target-host "-") " -run-id " 0
                      (if (equal? (configf:lookup *configdat* "server" "daemonize") "yes")
                          (conc " -daemonize -log " logfile)
                          "")
                      " -transport " (server:get-transport)
		      " -m testsuite:" testsuite)) ;; (conc " >> " logfile " 2>&1 &")))))
	 (log-rotate  (make-thread common:rotate-logs  "server run, rotate logs thread")))
    ;; we want the remote server to start in *toppath* so push there
    (push-directory *toppath*)
    (debug:print 0 *default-log-port* "INFO: Trying to start server (" cmdln ") ...")
    (thread-start! log-rotate)

185
186
187
188
189
190
191












192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
	      (file-read-access? dotfile))
	 (with-input-from-file
	     dotfile
	   (lambda ()
	     (read-line)))
	 #f))))













;; write a .server file in *toppath* with hostport
;; return #t on success, #f otherwise
;;
(define (server:write-dotserver areapath hostport)
  (let ((lock-file   (conc areapath "/.server.lock"))
	(server-file (conc areapath "/.server")))
    (if (common:simple-file-lock lock-file)
	(let ((res (handle-exceptions
		    exn
		    #f ;; failed for some reason, for the moment simply return #f
		    (with-output-to-file server-file
		      (lambda ()
			(print hostport)))
		    #t)))
	  (debug:print-info 0 *default-log-port* "server file " server-file " for " hostport " created")
	  (common:simple-file-release-lock lock-file)
	  res)
	#f)))

(define (server:remove-dotserver-file areapath hostport)
  (let ((dotserver   (server:read-dotserver areapath))
	(server-file (conc areapath "/.server"))
	(lock-file   (conc areapath "/.server.lock")))
    (if (and dotserver (string-match (conc ".*:" hostport "$") dotserver)) ;; port matches, good enough info to decide to remove the file
	(if (common:simple-file-lock lock-file)
	    (begin
	      (handle-exceptions
	       exn
	       #f
	       (delete-file* server-file))
	      (debug:print-info 0 *default-log-port* "server file " server-file " for " hostport " removed")
	      (common:simple-file-release-lock lock-file))))))

;; no longer care if multiple servers are started by accident. older servers will drop off in time.
;;
(define (server:check-if-running areapath)
  (let* ((dotserver (server:read-dotserver areapath))) ;; tdbdat (tasks:open-db)))
    (if dotserver
	(let* ((res (case *transport-type*
		      ((http)(server:ping-server dotserver))
		      ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server)
		      )))
	  (if res
	      dotserver
	      #f))
	#f)))








>
>
>
>
>
>
>
>
>
>
>
>



















|



|















|







191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
	      (file-read-access? dotfile))
	 (with-input-from-file
	     dotfile
	   (lambda ()
	     (read-line)))
	 #f))))


(define (server:dotserver-starting)
  (with-output-to-file
      (conc *toppath* "/.starting-server")
    (lambda ()
      (print (current-process-id) " on " (get-host-name)))))

(define (server:dotserver-starting-remove)
  (delete-file* (conc *toppath* "/.starting-server")))
  

  
;; write a .server file in *toppath* with hostport
;; return #t on success, #f otherwise
;;
(define (server:write-dotserver areapath hostport)
  (let ((lock-file   (conc areapath "/.server.lock"))
	(server-file (conc areapath "/.server")))
    (if (common:simple-file-lock lock-file)
	(let ((res (handle-exceptions
		    exn
		    #f ;; failed for some reason, for the moment simply return #f
		    (with-output-to-file server-file
		      (lambda ()
			(print hostport)))
		    #t)))
	  (debug:print-info 0 *default-log-port* "server file " server-file " for " hostport " created")
	  (common:simple-file-release-lock lock-file)
	  res)
	#f)))

(define (server:remove-dotserver-file areapath hostport #!key (force #f))
  (let ((dotserver   (server:read-dotserver areapath))
	(server-file (conc areapath "/.server"))
	(lock-file   (conc areapath "/.server.lock")))
    (if (or force (and dotserver (string-match (conc ".*:" hostport "$") dotserver))) ;; port matches, good enough info to decide to remove the file
	(if (common:simple-file-lock lock-file)
	    (begin
	      (handle-exceptions
	       exn
	       #f
	       (delete-file* server-file))
	      (debug:print-info 0 *default-log-port* "server file " server-file " for " hostport " removed")
	      (common:simple-file-release-lock lock-file))))))

;; no longer care if multiple servers are started by accident. older servers will drop off in time.
;;
(define (server:check-if-running areapath)
  (let* ((dotserver (server:read-dotserver areapath))) ;; tdbdat (tasks:open-db)))
    (if dotserver
	(let* ((res (case *transport-type*
		      ((http rpc)(server:ping-server dotserver))
		      ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server)
		      )))
	  (if res
	      dotserver
	      #f))
	#f)))

263
264
265
266
267
268
269


270




271
272
273
274
275
276
277
	  (begin
	    (if host-port-in
		(debug:print 0 *default-log-port*  "ERROR: bad host:port"))
	    (if do-exit (exit 1))
	    #f)
	  (let* ((iface      (car host-port))
		 (port       (cadr host-port))


		 (server-dat (http-transport:client-connect iface port))




		 (login-res  (rmt:login-no-auto-client-setup server-dat)))
	    (if (and (list? login-res)
		     (car login-res))
		(begin
		  (print "LOGIN_OK")
		  (if do-exit (exit 0)))
		(begin







>
>
|
>
>
>
>







281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
	  (begin
	    (if host-port-in
		(debug:print 0 *default-log-port*  "ERROR: bad host:port"))
	    (if do-exit (exit 1))
	    #f)
	  (let* ((iface      (car host-port))
		 (port       (cadr host-port))
		 (server-dat
                  (case (remote-transport *runremote*)
                    ((http) (http-transport:client-connect iface port))
                    ((rpc) (rpc-transport:client-connect iface port))
                    (else
                     (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (4)")
                     (exit))))
		 (login-res  (rmt:login-no-auto-client-setup server-dat)))
	    (if (and (list? login-res)
		     (car login-res))
		(begin
		  (print "LOGIN_OK")
		  (if do-exit (exit 0)))
		(begin

Modified tasks.scm from [b8a3c2af2e] to [ae98efc45e].

168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
(define (tasks:hostinfo-get-interface   vec)    (vector-ref  vec 1))
(define (tasks:hostinfo-get-port        vec)    (vector-ref  vec 2))
(define (tasks:hostinfo-get-pubport     vec)    (vector-ref  vec 3))
(define (tasks:hostinfo-get-transport   vec)    (vector-ref  vec 4))
(define (tasks:hostinfo-get-pid         vec)    (vector-ref  vec 5))
(define (tasks:hostinfo-get-hostname    vec)    (vector-ref  vec 6))

(define (tasks:server-lock-slot mdb run-id)
  (tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot")
  (if (< (tasks:num-in-available-state mdb run-id) 4)
      (begin 
	(tasks:server-set-available mdb run-id)
	(thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed.
	(tasks:server-am-i-the-server? mdb run-id))
      #f))
	
;; register that this server may come online (first to register goes though with the process)
(define (tasks:server-set-available mdb run-id)
  (sqlite3:execute 
   mdb 
   "INSERT INTO servers (pid,hostname,port,pubport,start_time,      priority,state,mt_version,heartbeat,   interface,transport,run_id)
                   VALUES(?, ?,       ?,   ?, strftime('%s','now'), ?,       ?,    ?,-1,?,        ?,        ?);"
   (current-process-id)          ;; pid
   (get-host-name)               ;; hostname
   -1                            ;; port
   -1                            ;; pubport
   (random 1000)                 ;; priority (used a tiebreaker on get-available)
   "available"                   ;; state
   (common:version-signature)    ;; mt_version
   -1                            ;; interface
   ;; (conc (server:get-transport)) ;; transport
   (conc *transport-type*)    ;; transport
   run-id
   ))

(define (tasks:num-in-available-state mdb run-id)
  (let ((res 0))
    (sqlite3:for-each-row
     (lambda (num-in-queue)







|



|





|













|







168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
(define (tasks:hostinfo-get-interface   vec)    (vector-ref  vec 1))
(define (tasks:hostinfo-get-port        vec)    (vector-ref  vec 2))
(define (tasks:hostinfo-get-pubport     vec)    (vector-ref  vec 3))
(define (tasks:hostinfo-get-transport   vec)    (vector-ref  vec 4))
(define (tasks:hostinfo-get-pid         vec)    (vector-ref  vec 5))
(define (tasks:hostinfo-get-hostname    vec)    (vector-ref  vec 6))

(define (tasks:server-lock-slot mdb run-id transport-type)
  (tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot")
  (if (< (tasks:num-in-available-state mdb run-id) 4)
      (begin 
	(tasks:server-set-available mdb run-id transport-type)
	(thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed.
	(tasks:server-am-i-the-server? mdb run-id))
      #f))
	
;; register that this server may come online (first to register goes though with the process)
(define (tasks:server-set-available mdb run-id transport-type)
  (sqlite3:execute 
   mdb 
   "INSERT INTO servers (pid,hostname,port,pubport,start_time,      priority,state,mt_version,heartbeat,   interface,transport,run_id)
                   VALUES(?, ?,       ?,   ?, strftime('%s','now'), ?,       ?,    ?,-1,?,        ?,        ?);"
   (current-process-id)          ;; pid
   (get-host-name)               ;; hostname
   -1                            ;; port
   -1                            ;; pubport
   (random 1000)                 ;; priority (used a tiebreaker on get-available)
   "available"                   ;; state
   (common:version-signature)    ;; mt_version
   -1                            ;; interface
   ;; (conc (server:get-transport)) ;; transport
   (symbol->string transport-type)    ;; transport
   run-id
   ))

(define (tasks:num-in-available-state mdb run-id)
  (let ((res 0))
    (sqlite3:for-each-row
     (lambda (num-in-queue)

Modified tests.scm from [5514a2a23d] to [c9883a48b0].

1330
1331
1332
1333
1334
1335
1336

1337
1338
1339
1340
1341
1342
1343
	   (set! res count))
	 tdb
	 "SELECT count(id) FROM test_rundat;")
	res))
  0)

(define (tests:update-central-meta-info run-id test-id cpuload diskfree minutes uname hostname)

  (if (and cpuload diskfree)
      (rmt:general-call 'update-cpuload-diskfree run-id cpuload diskfree test-id))
  (if minutes 
      (rmt:general-call 'update-run-duration run-id minutes test-id))
  (if (and uname hostname)
      (rmt:general-call 'update-uname-host run-id uname hostname test-id)))
  







>







1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
	   (set! res count))
	 tdb
	 "SELECT count(id) FROM test_rundat;")
	res))
  0)

(define (tests:update-central-meta-info run-id test-id cpuload diskfree minutes uname hostname)
  (rmt:general-call 'update-test-rundat run-id test-id (or cpuload -1)(or diskfree -1)(or minutes -1))
  (if (and cpuload diskfree)
      (rmt:general-call 'update-cpuload-diskfree run-id cpuload diskfree test-id))
  (if minutes 
      (rmt:general-call 'update-run-duration run-id minutes test-id))
  (if (and uname hostname)
      (rmt:general-call 'update-uname-host run-id uname hostname test-id)))
  

Modified tests/fullrun/megatest.config from [353b25ebc0] to [eec8c46fb2].

155
156
157
158
159
160
161

162
163
164
165
166
167
168
169

[server]

# force use of server always
# required yes

# Use http instead of direct filesystem access

transport http
# transport fs
# transport nmsg

synchronous 0

# If the server can't be started on this port it will try the next port until
# it succeeds







>
|







155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170

[server]

# force use of server always
# required yes

# Use http instead of direct filesystem access
transport rpc
# transport http
# transport fs
# transport nmsg

synchronous 0

# If the server can't be started on this port it will try the next port until
# it succeeds

Added thunk-utils.scm version [e6dc11200a].



















































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
(use srfi-18)


;; wrap a proc with a mutex so that two threads may not call proc simultaneously.
;; will catch exceptions to ensure mutex is unlocked even if exception is thrown.
;; will generate a unique mutex for proc unless one is specified with canned-mutex: option
;;
;; example 1: (define thread-safe-+ (make-synchronized-proc +))
;; example 2: (define thread-safe-plus
;;               (make-synchronized-proc
;;                  (lambda (x y)
;;                      (+ x y))))

(define (make-synchronized-proc proc
                                #!key (canned-mutex #f))
  (let* ((guard-mutex (if canned-mutex canned-mutex (make-mutex)))
         (guarded-proc ;; we are guarding the thunk against exceptions.  We will record whether result of evaluation is an exception or a regular result.
          (lambda args
            (mutex-lock! guard-mutex)
            (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision with a proc that returns a pair having the first element be our flag.  gensym guarantees the symbol is unique.
                   (res
                    (condition-case
                     (apply proc args) ;; this is what we are guarding the execution of
                     [x () (cons EXCEPTION x)]
                     )))
              (mutex-unlock! guard-mutex)
              (cond
               ((and (pair? res) (eq? (car res) EXCEPTION))
                (raise (cdr res)))
               (else
                res))))))
    guarded-proc))


;; retry an operation (depends on srfi-18)
;; ==================
;; idea here is to avoid spending time on coding retrying something.  Trying to be generic here.
;;
;; Exception handling:
;; -------------------
;; if evaluating the thunk results in exception, it will be retried.
;; on last try, if final-failure-returns-actual is true, the exception will be re-thrown to caller.
;;
;; look at options below #!key to see how to configure behavior
;;
;;

(define (retry-thunk
         the-thunk
         #!key ;;;; options below
         (accept-result?   (lambda (x) x)) ;; retry if predicate applied to thunk's result is false 
         (retries                       4) ;; how many tries
         (failure-value                #f) ;; return this on final failure, unless following option is enabled:
         (final-failure-returns-actual #f) ;; on failure, on the last try, just return the result, not failure-value

         (retry-delay                 0.1) ;; delay between tries
         (back-off-factor               1) ;; multiply retry-delay by this factor on retry
         (random-delay                0.1) ;; add a random portion of this value to wait

         (chatty                       #f) ;; print status as we go, for debugging.
         )
  
  (when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-"))
  (let* ((guarded-thunk ;; we are guarding the thunk against exceptions.  We will record whether result of evaluation is an exception or a regular result.
          (lambda ()
           (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision
                  (res
                   (condition-case
                    (the-thunk) ;; this is what we are guarding the execution of
                    [x () (cons EXCEPTION x)]
                    )))
             (cond
              ((and (pair? res) (eq? (car res) EXCEPTION))
               (if chatty
                   (print " - the-thunk threw exception >"(cdr res)"<"))
               (cons 'exception (cdr res)))
               (else
                (if chatty
                    (print " - the-thunk returned result >"res"<"))
                (cons 'regular-result res)))))))
    
    (let loop ((guarded-res (guarded-thunk))
               (retries-left retries)
               (fail-wait retry-delay))
      (if chatty (print "   =========="))
      (let* ((wait-time (+ fail-wait (+ (* fail-wait back-off-factor)
                                        (* random-delay
                                           (/ (random 1024) 1024) ))))
             (res-type (car guarded-res))
             (res-value (cdr guarded-res)))
        (cond
         ((and (eq? res-type 'regular-result) (accept-result? res-value))
                   (if chatty (print " + return result that satisfied accept-result? >"res-value"<"))
                   res-value)

         ((> retries-left 0)
          (if chatty (print " - sleep "wait-time))
          (thread-sleep! wait-time)
          (if chatty (print " + retry ["retries-left" tries left]"))
          (loop (guarded-thunk)
                (sub1 retries-left)
                wait-time))
         
         ((eq? res-type 'regular-result)
          (if final-failure-returns-actual
              (begin
                (if chatty (print " + last try failed- return the result >"res-value"<"))
                res-value)
              (begin
                (if chatty (print " + last try failed- return canned failure value >"failure-value"<"))
              failure-value)))
         
         (else ;; no retries left; result was not accepted and res-type can only be 'exception
          (if final-failure-returns-actual 
              (begin
                (if chatty (print " + last try failed with exception- re-throw it >"res-value"<"))
                (abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function
              (begin
                (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
                failure-value))))))))