Megatest

Changes On Branch b85732a36a126233
Login

Changes In Branch interleaved-queries Through [b85732a36a] Excluding Merge-Ins

This is equivalent to a diff from 8c476e8627 to b85732a36a

2012-11-19
01:55
server, list-runs and repl now working check-in: 0cb9ad87a9 user: matt tags: interleaved-queries
2012-11-18
23:30
Initial coding for interleaved queries done and compiles check-in: b85732a36a user: matt tags: interleaved-queries
2012-11-17
17:07
Increased rigour of mocktest, added timeout and retry on dbaccess and added count of all accesses check-in: 79ae44c0b4 user: matt tags: interleaved-queries
2012-11-12
19:50
Cherrypicked the fix to building for deploy check-in: e4ac93792c user: matt tags: trunk
00:04
Interleaving query enablement, server side coded check-in: 4980ca9f98 user: matt tags: interleaved-queries
2012-11-06
09:44
Testing ordering of loading zmq, fixes to deploy script check-in: 8c476e8627 user: mrwellan tags: trunk
06:21
Added deploy script check-in: af55ffc7d7 user: matt tags: trunk

Modified common.scm from [48dba0a8c5] to [cc3866e918].

45
46
47
48
49
50
51

52

53

54
55
56
57
58
59
60
45
46
47
48
49
50
51
52

53
54
55
56
57
58
59
60
61
62







+
-
+

+







(define *rpc:listener*      #f) ;; if set up for server communication this will hold the tcp port
(define *runremote*         #f) ;; if set up for server communication this will hold <host port>
(define *last-db-access*    (current-seconds))  ;; update when db is accessed via server
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)
(define *server-id*         #f)
(define *server-info*       #f)
(define *time-to-exit* #f)
(define *time-to-exit*      #f)
(define *received-response* #f)
(define *default-numtries*  2)

(define *target*            (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN
(define *keys*              (make-hash-table)) ;; cache the keys here
(define *keyvals*           (make-hash-table))
(define *toptest-paths*     (make-hash-table)) ;; cache toptest path settings here
(define *test-paths*        (make-hash-table)) ;; cache test-id to test run paths here
(define *test-ids*          (make-hash-table)) ;; cache run-id, testname, and item-path => test-id

Modified db.scm from [108dfe0472] to [c306724289].

9
10
11
12
13
14
15
16
17


18
19

20
21
22
23
24
25
26
9
10
11
12
13
14
15


16
17
18

19
20
21
22
23
24
25
26







-
-
+
+

-
+







;;  PURPOSE.
;;======================================================================

;;======================================================================
;; Database access
;;======================================================================

(require-extension (srfi 18) extras tcp rpc)
(import (prefix rpc rpc:))
(require-extension (srfi 18) extras tcp) ;;  rpc)
;; (import (prefix rpc rpc:))

(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n)
(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use zmq)

(declare (unit db))
(declare (uses common))
(declare (uses keys))
779
780
781
782
783
784
785
786

787
788
789
790
791
792
793
779
780
781
782
783
784
785

786
787
788
789
790
791
792
793







-
+







			       (if currstatus (conc "status='" currstatus "' AND ") "")
			       " run_id=? AND testname=? AND NOT (item_path='' AND testname in (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));")))
		;;(debug:print 0 "QRY: " qry)
		(sqlite3:execute db qry run-id newstate newstatus testname testname)))
	    testnames))

(define (cdb:delete-tests-in-state zmqsocket run-id state)
  (cdb:client-call zmqsocket 'delete-tests-in-state #t run-id state))
  (cdb:client-call zmqsocket 'delete-tests-in-state #t *default-numtries* run-id state))

;; speed up for common cases with a little logic
(define (db:test-set-state-status-by-id db test-id newstate newstatus newcomment)
  (cond
   ((and newstate newstatus newcomment)
    (sqlite3:exectute db "UPDATE tests SET state=?,status=?,comment=? WHERE id=?;" newstate newstatus test-id))
   ((and newstate newstatus)
958
959
960
961
962
963
964
965

966
967
968

969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985

986
987
988
989
990
991
992
958
959
960
961
962
963
964

965
966
967

968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984

985
986
987
988
989
990
991
992







-
+


-
+
















-
+







(define (db:test-set-comment db test-id comment)
  (sqlite3:execute 
   db 
   "UPDATE tests SET comment=? WHERE id=?;"
   comment test-id))

(define (cdb:test-set-rundir! zmqsocket run-id test-name item-path rundir)
  (cdb:client-call zmqsocket 'test-set-rundir #t rundir run-id test-name item-path))
  (cdb:client-call zmqsocket 'test-set-rundir #t *default-numtries* rundir run-id test-name item-path))

(define (cdb:test-set-rundir-by-test-id zmqsocket test-id rundir)
  (cdb:client-call zmqsocket 'test-set-rundir-by-test-id #t rundir test-id))
  (cdb:client-call zmqsocket 'test-set-rundir-by-test-id #t *default-numtries* rundir test-id))

(define (db:test-get-rundir-from-test-id db test-id)
  (let ((res #f)) ;; (hash-table-ref/default *test-paths* test-id #f)))
    ;; (if res
    ;;     res
    ;;     (begin
    (sqlite3:for-each-row
     (lambda (tpath)
       (set! res tpath))
     db 
     "SELECT rundir FROM tests WHERE id=?;"
     test-id)
    ;; (hash-table-set! *test-paths* test-id res)
    res)) ;; ))

(define (cdb:test-set-log! zmqsocket test-id logf)
  (if (string? logf)(cdb:client-call zmqsocket 'test-set-log #f logf test-id)))
  (if (string? logf)(cdb:client-call zmqsocket 'test-set-log #f *default-numtries* logf test-id)))

;;======================================================================
;; Misc. test related queries
;;======================================================================

(define (db:test-get-paths-matching db keynames target fnamepatt #!key (res '()))
  (let* ((testpatt   (if (args:get-arg "-testpatt")(args:get-arg "-testpatt") "%"))
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185



1186
1187
1188
1189








1190
1191
1192


1193
1194
1195
1196
1197
1198
1199
























1200
1201
1202

1203
1204
1205


1206
1207
1208

1209
1210
1211

1212
1213
1214
1215
1216


1217
1218
1219

1220
1221
1222

1223
1224
1225
1226
1227
1228

1229
1230
1231

1232
1233
1234

1235
1236
1237

1238
1239
1240

1241
1242
1243

1244
1245
1246
1247

1248
1249
1250
1251
1252
1253
1254
1093
1094
1095
1096
1097
1098
1099












































































1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112




1113
1114
1115
1116
1117
1118
1119
1120



1121
1122







1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148

1149
1150


1151
1152
1153
1154

1155
1156
1157

1158
1159
1160
1161


1162
1163
1164
1165

1166
1167
1168

1169
1170
1171
1172
1173
1174

1175
1176
1177

1178
1179
1180

1181
1182
1183

1184
1185
1186

1187
1188
1189

1190
1191
1192
1193

1194
1195
1196
1197
1198
1199
1200
1201







-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-










+
+
+
-
-
-
-
+
+
+
+
+
+
+
+
-
-
-
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+


-
+

-
-
+
+


-
+


-
+



-
-
+
+


-
+


-
+





-
+


-
+


-
+


-
+


-
+


-
+



-
+







;; (define (db:updater)
;;   (debug:print-info 4 "Starting cache processing")
;;   (let loop ()
;;     (thread-sleep! 10) ;; move save time around to minimize regular collisions?
;;     (db:write-cached-data)
;;     (loop)))

;; cdb:cached-access is called by the server loop to dispatch commands or queue up
;; db accesses
;;
;; params := qry-name cached? val1 val2 val3 ...
(define (cdb:cached-access params)
  (debug:print-info 12 "cdb:cached-access params=" params)
  (if (< (length params) 2)
      "ERROR"
      (let ((qry-name (car params))
	    (cached?  (cadr params))
	    (remparam (list-tail params 2))) 
	(debug:print-info 12 "cdb:cached-access qry-name=" qry-name " params=" params)
	(if (not cached?)(db:write-cached-data))
	;; Any special calls are dispatched here. 
	;; Remainder are put in the db queue
	(case qry-name
	  ((login) ;; login checks that the megatest path and version matches
	   (if (< (length remparam) 3) ;; should get toppath, version and signature
	       '(#f "login failed due to missing params") ;; missing params
	       (let ((calling-path (car   remparam))
		     (calling-vers (cadr  remparam))
		     (client-key   (caddr remparam)))
		 (if (and (equal? calling-path *toppath*)
			  (equal? megatest-version calling-vers))
		     (begin
		       (hash-table-set! *logged-in-clients* client-key (current-seconds))
		       '(#t "successful login"))      ;; path matches - pass! Should vet the caller at this time ...
		     (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*))))))
	  ((logout)
	   (if (and (> (length remparam) 1)
		    (eq? *toppath* (car remparam))
		    (hash-table-ref/default *logged-in-clients* (cadr remparam) #f))
	       #t
	       #f))
	  ((numclients)
	   (length (hash-table-keys *logged-in-clients*)))
	  ((flush)
	   (db:write-cached-data)
	   #t)
	  ((immediate)
	   (db:write-cached-data)
	   (if (not (null? remparam))
	       (apply (car remparam) (cdr remparam))
	       "ERROR"))
	  ((killserver)
	   ;; (db:write-cached-data)
	   (debug:print-info 0 "Remotely killed server on host " (get-host-name) " pid " (current-process-id))
	   (set! *time-to-exit* #t)
	   #t)
	  ((set-verbosity)
	   (set! *verbosity* (caddr params))
	   *verbosity*)
	  ((get-verbosity)
	   *verbosity*)
	  ((ping)
	   'hi)
	  (else
	   (mutex-lock! *incoming-mutex*)
	   (set! *last-db-access* (current-seconds))
	   (set! *incoming-data* (cons 
				  (vector qry-name
					  (current-milliseconds)
					  remparam)
				  *incoming-data*))
	   (mutex-unlock! *incoming-mutex*)
	   ;; NOTE: if cached? is #f then this call must be run immediately
	   ;;       but first all calls in the queue are run first in the order
	   ;;       of their time stamp
	   (if (and cached? *cache-on*)
	       (begin
		 (debug:print-info 12 "*cache-on* is " *cache-on* ", skipping cache write")
		 "CACHED")
	       (begin
		 (db:write-cached-data)
		 "WRITTEN")))))))

(define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj))))
(define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize))))

(define (cdb:use-non-blocking-mode proc)
  (set! *client-non-blocking-mode* #t)
  (let ((res (proc)))
    (set! *client-non-blocking-mode* #f)
    res))
  
;; params = 'target cached remparams
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
(define (cdb:client-call zmq-socket . params)
  (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params)
  (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params))))
	(res  #f))
(define (cdb:client-call zmq-sockets qtype immediate numretries . params)
  (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets " params=" params)
  (let* ((push-socket (vector-ref zmq-sockets 0))
	 (sub-socket  (vector-ref zmq-sockets 1))
	 (client-sig   (server:get-client-signature))
	 (query-sig    (message-digest-string (md5-primitive) (conc qtype immediate params)))
	 (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
	 (res  #f)
    ;; (signal-mask! signal/int)
    (set! *received-response* #f)
    (send-message zmq-socket zdat)
	 (send-receive (lambda ()
			 (send-message push-socket zdat)
    ;; (signal-unmask! signal/int)
    (set! res (db:string->obj (if *client-non-blocking-mode* 
				  (receive-message* zmq-socket)
				  (receive-message  zmq-socket))))
    (set! *received-response* #t)
    (debug:print-info 11 "zmq-socket " (car params) " res=" res)
    res))
			 (db:string->obj
			  (let ((rmsg (if *client-non-blocking-mode* receive-message* receive-message)))
			    ;; get the sender info
			    ;; this should match (server:get-client-signature)
			    ;; we will need to process "all" messages here some day
			    (rmsg sub-socket)
			    ;; now get the actual message
			    (rmsg  sub-socket)))))
	 (timeout (lambda ()
		    (thread-sleep! 5)
		    (if (not res)
			(if (> numretries 0)
			    (begin
			      (debug:print 0 "WARNING: no reply to query " params ", trying again")
			      (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params))
			    (begin
			      (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
			      (exit 5)))))))
    (let ((th1 (make-thread send-receive "send receive"))
	  (th2 (make-thread timeout      "timeout")))
      (thread-start! th1)
      (thread-start! th2)
      (thread-join! th1)
      res)))
  
(define (cdb:set-verbosity zmq-socket val)
  (cdb:client-call zmq-socket 'set-verbosity #f val))
  (cdb:client-call zmq-socket 'set-verbosity #f *default-numtries* val))

(define (cdb:login zmq-socket keyval signature)
  (cdb:client-call zmq-socket 'login #t keyval megatest-version signature))
(define (cdb:login zmq-sockets keyval signature)
  (cdb:client-call zmq-sockets 'login #t *default-numtries* keyval megatest-version signature))

(define (cdb:logout zmq-socket keyval signature)
  (cdb:client-call zmq-socket 'logout #t keyval signature))
  (cdb:client-call zmq-socket 'logout #t *default-numtries* keyval signature))

(define (cdb:num-clients zmq-socket)
  (cdb:client-call zmq-socket 'numclients #t))
  (cdb:client-call zmq-socket 'numclients #t *default-numtries*))

(define (cdb:test-set-status-state zmqsocket test-id status state msg)
  (if msg
      (cdb:client-call zmqsocket 'state-status-msg #t state status msg test-id)
      (cdb:client-call zmqsocket 'state-status #t state status test-id))) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) 
      (cdb:client-call zmqsocket 'state-status-msg #t *default-numtries* state status msg test-id)
      (cdb:client-call zmqsocket 'state-status #t *default-numtries* state status test-id))) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) 

(define (cdb:test-rollup-test_data-pass-fail zmqsocket test-id)
  (cdb:client-call zmqsocket 'test_data-pf-rollup #t test-id test-id test-id test-id))
  (cdb:client-call zmqsocket 'test_data-pf-rollup #t *default-numtries* test-id test-id test-id test-id))

(define (cdb:pass-fail-counts zmqsocket test-id fail-count pass-count)
  (cdb:client-call zmqsocket 'pass-fail-counts #t fail-count pass-count test-id))
  (cdb:client-call zmqsocket 'pass-fail-counts #t *default-numtries* fail-count pass-count test-id))

(define (cdb:tests-register-test zmqsocket run-id test-name item-path)
  (let ((item-paths (if (equal? item-path "")
			(list item-path)
			(list item-path ""))))
    (cdb:client-call zmqsocket 'register-test #t run-id test-name item-path)))
    (cdb:client-call zmqsocket 'register-test #t *default-numtries* run-id test-name item-path)))

(define (cdb:flush-queue zmqsocket)
  (cdb:client-call zmqsocket 'flush #f))
  (cdb:client-call zmqsocket 'flush #f *default-numtries*))

(define (cdb:kill-server zmqsocket)
  (cdb:client-call zmqsocket 'killserver #f))
  (cdb:client-call zmqsocket 'killserver #f *default-numtries*))

(define (cdb:roll-up-pass-fail-counts zmqsocket run-id test-name item-path status)
  (cdb:client-call zmqsocket 'immediate #f open-run-close db:roll-up-pass-fail-counts #f run-id test-name item-path status))
  (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:roll-up-pass-fail-counts #f run-id test-name item-path status))

(define (cdb:get-test-info zmqsocket run-id test-name item-path)
  (cdb:client-call zmqsocket 'immediate #f open-run-close db:get-test-info #f run-id test-name item-path))
  (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:get-test-info #f run-id test-name item-path))

(define (cdb:get-test-info-by-id zmqsocket test-id)
  (cdb:client-call zmqsocket 'immediate #f open-run-close db:get-test-info-by-id #f test-id))
  (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:get-test-info-by-id #f test-id))

;; db should be db open proc or #f
(define (cdb:remote-run proc db . params)
  (apply cdb:client-call *runremote* 'immediate #f open-run-close proc #f params))
  (apply cdb:client-call *runremote* 'immediate #f *default-numtries* open-run-close proc #f params))

(define (db:test-get-logfile-info db run-id test-name)
  (let ((res #f))
    (sqlite3:for-each-row 
     (lambda (path final_logf)
       (set! logf final_logf)
       (set! res (list path final_logf))
1288
1289
1290
1291
1292
1293
1294
1295

1296
1297
1298
1299
1300


1301
1302
1303
1304
1305
1306
1307
1308
1309

1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328




1329

1330


1331
1332
1333
1334
1335

1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350



1351
1352
1353
1354
1355
1356
1357
1358

1359
1360
1361
1362
1363
1364
1365
1235
1236
1237
1238
1239
1240
1241

1242
1243
1244
1245


1246
1247



1248
1249
1250
1251
1252

1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269



1270
1271
1272
1273
1274
1275

1276
1277
1278
1279
1280
1281

1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295


1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314







-
+



-
-
+
+
-
-
-





-
+
















-
-
-
+
+
+
+

+
-
+
+




-
+













-
-
+
+
+








+







;; not used, intended to indicate to run in calling process
(define db:run-local-queries '()) ;; rollup-tests-pass-fail))

;; The queue is a list of vectors where the zeroth slot indicates the type of query to
;; apply and the second slot is the time of the query and the third entry is a list of 
;; values to be applied
;;
(define (db:write-cached-data)
(define (db:process-queue pubsock indata)
  (open-run-close
   (lambda (db . junkparams)
     (let ((queries    (make-hash-table))
	   (data       #f))
       (mutex-lock! *incoming-mutex*)
	   (data       (sort indata (lambda (a b)
				      (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b))))))
       (set! data (sort *incoming-data* (lambda (a b)(< (vector-ref a 1)(vector-ref b 1)))))
       (set! *incoming-data* '())
       (mutex-unlock! *incoming-mutex*)
       (if (> (length data) 0)
	   (debug:print-info 4 "Writing cached data " data))

       ;; prepare the needed statements, do each only once
       (for-each (lambda (request-item)
		   (let ((stmt-key (vector-ref request-item 0)))
		   (let ((stmt-key (cdb:get-qtype request-item)))
		     (if (not (hash-table-ref/default queries stmt-key #f))
			 (let ((stmt (alist-ref stmt-key db:queries)))
			   (if stmt
			       (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt)))
			       (if (procedure? stmt-key)
				   (hash-table-set! queries stmt-key #f)
				   (debug:print 0 "ERROR: Missing query spec for " stmt-key "!")))))))
		 data)
       
       ;; outer loop to handle special queries that cannot be handled in the
       ;; transaction.
       (let outerloop ((special-qry #f)
		       (stmts       data))
	 (if special-qry

	     ;; handle a query that cannot be part of the grouped queries
	     (let* ((stmt-key (vector-ref special-qry 0))
		    (qry      (hash-table-ref queries stmt-key))
		    (params   (vector-ref special-qry 2)))
	     (let* ((stmt-key       (cdb:get-qtype special-qry))
		    (return-address (cdb:get-client-sig special-qry))
		    (qry            (hash-table-ref queries stmt-key))
		    (params         (cdb:get-params special-qry)))
	       (if (string? qry)
		   (begin
		   (apply sqlite3:execute db qry params)
		     (apply sqlite3:execute db qry params)
		     (server:reply return-address #t))
		   (if (procedure? stmt-key)
		       (begin
			 ;; we are being handed a procedure so call it
			 (debug:print-info 11 "Running (apply " stmt-key " " db " " params ")")
			 (apply stmt-key db params))
			 (server:reply return-address (apply stmt-key db params)))
		       (debug:print 0 "ERROR: Unrecognised queued call " qry " " params)))
	       (if (not (null? stmts))
		   (outerloop #f stmts)))

	     ;; handle normal queries
	     (let ((rem (sqlite3:with-transaction 
			 db
			 (lambda ()
			   (debug:print-info 11 "flushing " stmts " to db")
			   (if (null? stmts)
			       stmts
			       (let innerloop ((hed (car stmts))
					       (tal (cdr stmts)))
				 (let ((params   (vector-ref hed 2))
				       (stmt-key (vector-ref hed 0)))
				 (let ((params         (cdb:packet-get-params hed))
				       (return-address (cdb:packet-get-client-sig hed))
				       (stmt-key       (cdb:packet-get-qtype hed)))
				   (if (or (procedure? stmt-key)
					   (member stmt-key db:special-queries))
				       (begin
					 (debug:print-info 11 "Handling special statement " stmt-key)
					 (cons hed tal))
				       (begin
					 (debug:print-info 11 "Executing " stmt-key " for " params)
					 (apply sqlite3:execute (hash-table-ref queries stmt-key) params)
					 (server:reply return-address #t)
					 (if (not (null? tal))
					     (innerloop (car tal)(cdr tal))
					     '()))
				       ))))))))
	       (if (not (null? rem))
		   (outerloop (car rem)(cdr rem))))))
       (for-each (lambda (stmt-key)

Modified db_records.scm from [685480077c] to [b53402355f].

116
117
118
119
120
121
122














116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136







+
+
+
+
+
+
+
+
+
+
+
+
+
+
(define-inline (db:step-stable-set-end!       vec val)(vector-set! vec 2 val))
(define-inline (db:step-stable-set-status!    vec val)(vector-set! vec 3 val))
(define-inline (db:step-stable-set-runtime!   vec val)(vector-set! vec 4 val))

;; use this one for db-get-run-info
(define-inline (db:get-row    vec)(vector-ref vec 1))

;; The data structure for handing off requests via wire
(define (make-cdb:packet)(make-vector 6))
(define-inline (cdb:packet-get-client-sig   vec)    (vector-ref  vec 0))
(define-inline (cdb:packet-get-qtype        vec)    (vector-ref  vec 1))
(define-inline (cdb:packet-get-immediate    vec)    (vector-ref  vec 2))
(define-inline (cdb:packet-get-query-sig    vec)    (vector-ref  vec 3))
(define-inline (cdb:packet-get-params       vec)    (vector-ref  vec 4))
(define-inline (cdb:packet-get-qtime        vec)    (vector-ref  vec 5))
(define-inline (cdb:packet-set-client-sig!  vec val)(vector-set! vec 0 val))
(define-inline (cdb:packet-set-qtype!       vec val)(vector-set! vec 1 val))
(define-inline (cdb:packet-set-immediate!   vec val)(vector-set! vec 2 val))
(define-inline (cdb:packet-set-query-sig!   vec val)(vector-set! vec 3 val))
(define-inline (cdb:packet-set-params!      vec val)(vector-set! vec 4 val))
(define-inline (cdb:packet-set-qtime!       vec val)(vector-set! vec 5 val))

Modified megatest.scm from [8b945e5bb2] to [b74cc84dd9].

95
96
97
98
99
100
101
102

103
104
105
106
107
108
109
110
95
96
97
98
99
100
101

102

103
104
105
106
107
108
109







-
+
-







  -rebuild-db             : bring the database schema up to date
  -update-meta            : update the tests metadata for all tests
  -env2file fname         : write the environment to fname.csh and fname.sh
  -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are
                                 overwritten by values set in config files.
  -server -|hostname      : start the server (reduces contention on megatest.db), use
                            - to automatically figure out hostname
  -listservers            : list the servers 
  -list-servers            : list the servers 
  -killserver host:port|pid : kill server specified by host:port or pid
  -repl                   : start a repl (useful for extending megatest)

Spreadsheet generation
  -extract-ods fname.ods  : extract an open document spreadsheet from the database
  -pathmod path           : insert path, i.e. path/runame/itempath/logfile.html
                            will clear the field if no rundir/testname/itempath/logfile
                            if it contains forward slashes the path will be converted
119
120
121
122
123
124
125

126
127
128
129
130
131
132
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132







+







megatest -test-files 'logs/*.log' -target ubuntu/n%/no% :runname w49% -testpatt test_mt%

Called as " (string-intersperse (argv) " ") "
Built from " megatest-fossil-hash ))

;;  -gui                    : start a gui interface
;;  -config fname           : override the runconfig file with fname
;;  -kill-server host:port|pid : kill server specified by host:port or pid

;; process args
(define remargs (args:get-args 
		 (argv)
		 (list  "-runtests"  ;; run a specific test
			"-config"    ;; override the config file name
			"-execute"   ;; run the command encoded in the base64 parameter
155
156
157
158
159
160
161
162

163
164
165
166
167
168
169
155
156
157
158
159
160
161

162
163
164
165
166
167
168
169







-
+







			":variable"
			":value"
			":expected"
			":tol"
			":units"
			;; misc
			"-server"
			"-killserver"
			"-kill-server"
			"-port"
			"-extract-ods"
			"-pathmod"
			"-env2file"
			"-setvars"
			"-set-state-status"
			"-debug" ;; for *verbosity* > 2
182
183
184
185
186
187
188
189

190
191
192
193
194
195
196
182
183
184
185
186
187
188

189
190
191
192
193
194
195
196







-
+







			"-summarize-items"
		        "-gui"
			;; misc
			"-archive"
			"-repl"
			"-lock"
			"-unlock"
			"-listservers"
			"-list-servers"
			;; queries
			"-test-paths" ;; get path(s) to a test, ordered by youngest first

			"-runall"    ;; run all tests
			"-remove-runs"
			"-usequeue"
			"-rebuild-db"
268
269
270
271
272
273
274
275
276


277
278
279
280

281
282
283


284
285
286
287
288



289
290
291
292
293
294
295
296
297
298







299

300

301
302
303
304
305




306
307
308
309
310

311
312
313
314
315
316

317
318
319
320
321
322
323
324
325
326
327
328

329
330
331
332
333
334
335
268
269
270
271
272
273
274


275
276
277
278
279

280
281


282
283
284
285



286
287
288
289
290
291
292






293
294
295
296
297
298
299
300
301

302
303
304



305
306
307
308





309




310

311
312
313
314
315
316
317
318
319
320
321
322

323
324
325
326
327
328
329
330







-
-
+
+



-
+

-
-
+
+


-
-
-
+
+
+




-
-
-
-
-
-
+
+
+
+
+
+
+

+
-
+


-
-
-
+
+
+
+
-
-
-
-
-
+
-
-
-
-

-
+











-
+







;;======================================================================

(if (args:get-arg "-server")
    (begin
      (debug:print 1 "Launching server...")
      (server:launch)))

(if (or (args:get-arg "-listservers")
	(args:get-arg "-killserver"))
(if (args:get-arg "-list-servers")
	;; (args:get-arg "-kill-server"))
    (let ((tl (setup-for-run)))
      (if tl 
	  (let ((servers (open-run-close tasks:get-all-servers tasks:open-db))
		(fmtstr  "~5a~8a~8a~20a~20a~10a~20a~10a~10a\n")
		(fmtstr  "~5a~8a~8a~20a~20a~10a~10a~10a~10a\n")
		(servers-to-kill '()))
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "Port" "Time" "Priority" "State")
	    (format #t fmtstr "==" "=====" "===" "====" "=========" "====" "====" "========" "=====")
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State")
	    (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====")
	    (for-each 
	     (lambda (server)
	       (let* ((killinfo   (args:get-arg "-killserver"))
		      (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		      (kpid       (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f))
	       (let* (;; (killinfo   (args:get-arg "-kill-server"))
		      ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		      ;; (kpid       (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f))
		      (id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
		      (hostname   (vector-ref server 2))
		      (interface  (vector-ref server 3))
		      (port       (vector-ref server 4))
		      (start-time (vector-ref server 5))
		      (priority   (vector-ref server 6))
		      (state      (vector-ref server 7))
		      (mt-ver     (vector-ref server 8))
		      (status     (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port))
		      (pullport   (vector-ref server 4))
		      (pubport    (vector-ref server 5))
		      (start-time (vector-ref server 6))
		      (priority   (vector-ref server 7))
		      (state      (vector-ref server 8))
		      (mt-ver     (vector-ref server 9))
		      (last-update (vector-ref server 10)) ;;   (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port))
		      (killed     #f)
		      (status     (< last-update 20)))
		      (zmq-socket (if status (server:client-connect hostname port) #f)))
		 ;;   (zmq-sockets (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (not status)    ;; no point in keeping dead records in the db
		      (open-run-close tasks:server-deregister tasks:open-db hostname port: port pid: pid))

		 (if (equal? state "dead")
		     (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day.
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete))
		     (if (> last-update 20)        ;; Mark as dead if not updated in last 20 seconds
		 (if (and khost-port ;; kill by host/port
			  (equal? hostname (car khost-port))
			  (equal? port (string->number (cadr khost-port))))
		     (tasks:kill-server status hostname port pid))

			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid)))
		 (if (and kpid
			  (equal? hostname (get-host-name))
			  (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!!
		     (tasks:kill-server status hostname #f pid))

		 (format #t fmtstr id mt-ver pid hostname interface port start-time priority 
		 (format #t fmtstr id mt-ver pid hostname interface pullport pubport last-update
			 (if status "alive" "dead"))))
	     servers)
	    (debug:print-info 1 "Done with listservers")
	    (set! *didsomething* #t)
	    (exit) ;; must do, would have to add checks to many/all calls below
	    )
	  (exit)))
    ;; if not list or kill then start a client (if appropriate)
    (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test")
	    (eq? (length (hash-table-keys args:arg-hash)) 0))
	(debug:print-info 1 "Server connection not needed")

	
	(server:client-launch)))

;;======================================================================
;; Remove old run(s)
;;======================================================================

;; since several actions can be specified on the command line the removal

Modified server.scm from [c2beddecc4] to [67b3f4e15b].

1
2
3
4
5
6
7
8
9
10
11
12


13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

























28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46














47





48
49
50
51
52
53
54


55
56
57
58
59
60
61
62
63
64
65














66
67
68
69
70
71
72







73
74
75
76
77
78







79
80
81
82
83
84

85
86
87
88
89
90
91
92
93
94
95
96
97


98
99


100
101
102
103
104


105
106
107
108
109
110
111


112
113
114
115
116
117
118

119
120
121


122



123
124
125
126
127
128
129















130
131


132
133
134
135
136





137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152

153
154
155
156
157
158
159
160
161
162
163










164
165
166
167
168
169
170
171
172
173
174
175











176
177
178


179
180
181
182
183
184
185
186
187
188
189
190
191


192
193

194







195
196
197
198
199
200






201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217


218
219
220
221


222
223
224





225
226


227

228
229
230
231


232
233
234
235
236
237



















238
239
240
241
242
243
244
245
246
247
248
249
250




251
252
253
254
255
256
257
258
259
260
261
262
263












264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
1
2
3
4
5
6
7
8
9
10


11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61










62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90











91
92
93
94
95
96
97
98
99
100
101
102
103
104







105
106
107
108
109
110
111






112
113
114
115
116
117
118
119
120
121
122
123

124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139


140
141





142
143







144
145




146


147



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175


176
177





178
179
180
181
182
















183











184
185
186
187
188
189
190
191
192
193












194
195
196
197
198
199
200
201
202
203
204
205


206
207
208
209
210
211
212
213
214
215
216
217
218
219

220
221
222

223
224
225
226
227
228
229
230
231






232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252


253
254
255
256


257
258
259
260
261
262
263
264
265
266
267
268
269
270

271
272



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309



310
311
312
313
314












315
316
317
318
319
320
321
322
323
324
325
326












327
328
329
330
331
332
333










-
-
+
+















+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+









-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+

+
+
+
+
+







+
+
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+





-
+













+
+
-
-
+
+
-
-
-
-
-
+
+
-
-
-
-
-
-
-
+
+
-
-
-
-

-
-
+
-
-
-
+
+

+
+
+







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
-
-
-
-
-
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+

-
-
+
+












-
+
+

-
+

+
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+















-
-
+
+


-
-
+
+



+
+
+
+
+


+
+
-
+

-
-
-
+
+






+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+










-
-
-
+
+
+
+

-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-








;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp rpc s11n)
(import (prefix rpc rpc:))
(require-extension (srfi 18) extras tcp s11n)
;; (import (prefix rpc rpc:))

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use zmq)

(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.

(include "common_records.scm")
(include "db_records.scm")

;; Transition to pub --> sub with pull <-- push
;;
;;   1. client sends request to server via push to the pull port
;;   2. server puts request in queue or processes immediately as appropriate
;;   3. server puts responses from completed requests into pub port 
;;
;; TODO
;;
;; Done Tested
;; [x]  [ ]    1. Add columns pullport pubport to servers table
;; [x]  [ ]    2. Add rm of monitor.db if older than 11/12/2012 
;; [x]  [ ]    3. Add create of pullport and pubport with finding of available ports
;; [x]  [ ]    4. Add client compose of request
;; [x]  [ ]        - name of client: testname/itempath-test_id-hostname 
;; [x]  [ ]        - name of request: callname, params
;; [x]  [ ]        - request key: f(clientname, callname, params)
;; [ ]  [ ]    5. Add processing of subscription hits
;; [ ]  [ ]        - done when get key 
;; [ ]  [ ]        - return results
;; [ ]  [ ]    6. Add timeout processing
;; [ ]  [ ]        - after 60 seconds
;; [ ]  [ ]            i. check server alive, connect to new if necessary
;; [ ]  [ ]           ii. resend request
;; [ ]  [ ]    7. Turn self ping back on

(define (server:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "tcp://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

(define (server:self-ping iface port)
  (let ((zsocket (server:client-connect iface port)))
    (let loop ()
      (thread-sleep! 2)
      (cdb:client-call zsocket 'ping #t)
      (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!")
      (mutex-lock! *heartbeat-mutex*)
      (set! *server-loop-heart-beat* (current-seconds))
      (mutex-unlock! *heartbeat-mutex*)
      (loop))))
;; (define (server:self-ping server-info)
;;   ;; server-info: server-id interface pullport pubport
;;   (let ((iface    (list-ref server-info 1))
;; 	(pullport (list-ref server-info 2))
;; 	(pubport  (list-ref server-info 3)))
;;     (server:client-connect iface pullport pubport)
;;     (let loop ()
;;       (thread-sleep! 2)
;;       (cdb:client-call *runremote* 'ping #t)
;;       (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
;;       (mutex-lock! *heartbeat-mutex*)
;;       (set! *server-loop-heart-beat* (current-seconds))
;;       (mutex-unlock! *heartbeat-mutex*)
;;       (loop))))
    
(define-inline (zmqsock:get-pub  dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0))

(define (server:run hostn)
  (debug:print 0 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-sdat1       #f)
	 (zmq-sdat2       #f)
  (let* ((zmq-socket     #f)
	 (zmq-socket-dat #f)
	 (iface          (if (string=? "-" hostn)
			     "*" ;; (get-host-name) 
			     hostn))
	 (hostname       (get-host-name))
	 (ipaddrstr      (let ((ipstr (if (string=? "-" hostn)
					  (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					  #f)))
			   (if ipstr ipstr hostname)))
	 (actual-port    #f))
	 (pull-socket     #f)
	 (pub-socket      #f)
	 (p1              #f)
	 (p2              #f)
	 (zmq-sockets-dat #f)
	 (iface           (if (string=? "-" hostn)
			      "*" ;; (get-host-name) 
			      hostn))
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   #f)))
			    (if ipstr ipstr hostname)))
	 (last-run       0))
    ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0))
    (set! zmq-socket-dat (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port")
									      (string->number (args:get-arg "-port"))
									      (+ 5000 (random 1001)))
						     0))
    (set! zmq-socket  (cadr  zmq-socket-dat))
    (set! actual-port (caddr zmq-socket-dat))
    (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port")
							    (string->number (args:get-arg "-port"))
							    (+ 5000 (random 1001)))))

    (set! zmq-sdat1    (car   zmq-sockets-dat))
    (set! pull-socket  (cadr  zmq-sdat1)) ;; (iface s  port)
    (set! p1           (caddr zmq-sdat1))
    (set! *cache-on* #t)

    ;; (set! th1 (make-thread (lambda ()
    ;;     		     (server:self-ping ipaddrstr actual-port))))
    ;; (thread-start! th1)
    
    
    (set! zmq-sdat2    (cadr  zmq-sockets-dat))
    (set! pub-socket   (cadr  zmq-sdat2))
    (set! p2           (caddr zmq-sdat2))

    (set! *cache-on* #t)

    ;; what to do when we quit
    ;;
    (on-exit (lambda ()
	       (if (and *toppath* *server-info*)
		   (begin
		     (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr))
		     (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr p1 p2))
		   (let loop () 
		     (let ((queue-len 0))
		       (thread-sleep! (random 5))
		       (mutex-lock! *incoming-mutex*)
		       (set! queue-len (length *incoming-data*))
		       (mutex-unlock! *incoming-mutex*)
		       (if (> queue-len 0)
			   (begin
			     (debug:print-info 0 "Queue not flushed, waiting ...")
			     (loop))))))))

    ;; The heavy lifting
    ;;
    ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
    ;;
    (let loop ()
      ;; ;; Ugly yuk. 
    (let loop ((queue-lst '()))
      ;; (print "GOT HERE EH?")
      ;; (mutex-lock! *incoming-mutex*)
      ;; (set! *server-loop-heart-beat* (list 'waiting (current-seconds)))
      ;; (mutex-unlock! *incoming-mutex*)
      (let* ((rawmsg (receive-message* zmq-socket))
	     (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize))))
      (let* ((rawmsg (receive-message* pull-socket))
	     (packet (db:string->obj rawmsg)))
	     (res    #f))
	;;; Ugly yuk. 
	;; (mutex-lock! *incoming-mutex*)
	;; (set! *server-loop-heart-beat* (list 'working (current-seconds)))
	;; (mutex-unlock! *incoming-mutex*)
	(debug:print-info 12 "server=> received params=" params)
	(set! res (cdb:cached-access params))
	(debug:print-info 12 "server=> received packet=" packet)
	(if (cdb:packet-get-immediate packet) ;; process immediately or put in queue
	(debug:print-info 12 "server=> processed res=" res)
	(send-message zmq-socket (db:obj->string res))
	(if (not *time-to-exit*)
	    (loop)
	    (begin
	      (open-run-close tasks:server-deregister-self tasks:open-db #f)
	      (db:write-cached-data)
	      (db:process-queue pubsock (cons packet queue))
	      (exit)
	      ))))
    (thread-join! th1)))
	      (loop '()))
	    (loop (cons packet queue)))))))

(define (server:reply pubsock target result)
  (send-message pubsock target send-more: #t)
  (send-message pubsock result))

;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
			(let ((sdat #f))
			  (mutex-lock! *heartbeat-mutex*)
			  (set! sdat *server-info*)
			  (mutex-unlock! *heartbeat-mutex*)
			  (if sdat sdat
			      (begin
				(sleep 4)
				(loop))))))
	 (iface       (cadr server-info))
	 (pullport    (caddr server-info))
	 (pubport     (cadddr server-info)) ;; id interface pullport pubport)
	 ;; (zmq-sockets (server:client-connect iface pullport pubport)))
	 )
  (let loop ((count 0))
    (thread-sleep! 4) ;; no need to do this very often
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
    (db:write-cached-data)
    ;; (print "Server running, count is " count)
    (if (< count 1) ;; 3x3 = 9 secs aprox
	(loop (+ count 1))
	(let (;; (numrunning            (open-run-close db:get-count-tests-running #f))
      ;;  (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1))))
      ;; (print "Server running, count is " count)
      (if (< count 1) ;; 3x3 = 9 secs aprox
	  (loop (+ count 1)))
      
	      (server-loop-heartbeat #f)
	      (server-info           #f)
	      (pulse                 0))
	  ;; BUG add a wait on server alive here!!
	  ;; ;; Ugly yuk. 
	  (mutex-lock! *heartbeat-mutex*)
	  (set! server-loop-heartbeat *server-loop-heart-beat*)
	  (set! server-info *server-info*)
	  (mutex-unlock! *heartbeat-mutex*)
	  ;; The logic here is that if the server loop gets stuck blocked in working
	  ;; we don't want to update our heartbeat
	  (set! pulse (- (current-seconds) server-loop-heartbeat))
	  (debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago")
	  (if (> pulse 15) ;; must stay less than 10 seconds 
	      (begin
		(open-run-close tasks:server-deregister tasks:open-db (cadr server-info) port: (caddr server-info))
      ;; NOTE: Get rid of this mechanism! It really is not needed...
		(debug:print 0 "ERROR: Heartbeat failed, committing servercide")
		(exit))
	      (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)))
	  ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
	  (if (> (+ *last-db-access* 
		    ;; (* 48 60 60)    ;; 48 hrs
		    ;; 60              ;; one minute
		    (* 60 60)       ;; one hour
		    )
		 (current-seconds))
	      (begin
      (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
      
      ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
      (if (> (+ *last-db-access* 
		;; (* 48 60 60)    ;; 48 hrs
		;; 60              ;; one minute
		(* 60 60)       ;; one hour
		)
	     (current-seconds))
	  (begin
		;; (debug:print-info 2 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*))
		(debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*))
		(loop 0))
	      (begin
		(debug:print-info 0 "Starting to shutdown the server.")
		;; need to delete only *my* server entry (future use)
		(set! *time-to-exit* #t)
		(open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
		(thread-sleep! 1)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")
		(exit)))))))
	    (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*))
	    (loop 0))
	  (begin
	    (debug:print-info 0 "Starting to shutdown the server.")
	    ;; need to delete only *my* server entry (future use)
	    (set! *time-to-exit* #t)
	    (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
	    (thread-sleep! 1)
	    (debug:print-info 0 "Max cached queries was " *max-cache-size*)
	    (debug:print-info 0 "Server shutdown complete. Exiting")
	    (exit))))))

(define (server:find-free-port-and-open iface s port #!key (trynum 50))
  (let ((s (if s s (make-socket 'rep)))
(define (server:find-free-port-and-open iface s port stype #!key (trynum 50))
  (let ((s (if s s (make-socket stype)))
	(p (if (number? port) port 5555))
 	(old-handler (current-exception-handler)))
    (handle-exceptions
     exn
     (begin
       (debug:print 0 "Failed to bind to port " p ", trying next port")
       (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
       ;; (old-handler)
       ;; (print-call-chain)
       (if (> trynum 0)
	   (server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1))
	   (debug:print-info 0 "Tried ports up to " p 
			     " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")))
			     " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))
       (exit)) ;; To exit or not? That is the question.
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (print "Trying to start server on " zmq-url)
       (debug:print 0 "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (list iface s port)))))

(define (server:setup-ports ipaddrstr startport)
  (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pub))
	 (p1 (caddr s1))
	 (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull))
	 (p2 (caddr s2)))
       (set! *runremote* #f)
       (debug:print 0 "Server started on " zmq-url)
       (mutex-lock! *heartbeat-mutex*)
       (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live))
       (mutex-unlock! *heartbeat-mutex*)
       (list iface s port)))))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live))
    (mutex-unlock! *heartbeat-mutex*)
    (list s1 s2)))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; 
(define (server:client-connect iface port #!key (context #f))
  (debug:print-info 3 "client-connect " iface ":" port)
(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
  (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions)
  (let ((connect-ok #f)
	(zmq-socket (if context 
			(make-socket 'req context)
			(make-socket 'req)))
			(make-socket type context)
			(make-socket type)))
	(conurl     (server:make-server-url (list iface port))))
    (if (socket? zmq-socket)
	(begin
	  ;; first apply subscriptions
	  (for-each (lambda (subscription)
		      (debug:print 2 "Subscribing to " subscription)
		      (socket-option-set! zmq-socket 'subscribe subscription))
		    subscriptions)
	  (connect-socket zmq-socket conurl)
	  zmq-socket)
	(begin
	  (debug:print 0 "ERROR: Failed to open socket to " conurl)
	#f)))
	  #f))))
  

(define (server:client-login zmq-socket)
  (cdb:login zmq-socket *toppath* (server:get-client-signature)))
(define (server:client-login zmq-sockets)
  (cdb:login zmq-sockets *toppath* (server:get-client-signature)))

(define (server:client-logout zmq-socket)
  (let ((ok (and (socket? zmq-socket)
		 (cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
    ;; (close-socket zmq-socket)
    ok))

(define (server:client-connect iface pullport pubport)
  (let* ((push-socket (server:client-socket-connect iface pullport type: 'push))
	 (sub-socket  (server:client-socket-connect iface pubport
						    type: 'sub
						    subscriptions: (list (server:get-client-signature) "all")))
	 (zmq-sockets (vector push-socket sub-socket))
	 (login-res   #f))
    (set! login-res (server:client-login zmq-sockets))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
	  (set! *runremote* zmq-sockets)
	  zmq-sockets)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " conurl)
	  (set! *runremote* #f)
	  #f))))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host    (car   hostinfo))
	      (iface   (cadr  hostinfo))
	      (port    (caddr hostinfo)))
	(let ((host     (list-ref hostinfo 0))
	      (iface    (list-ref hostinfo 1))
	      (pullport (list-ref hostinfo 2))
	      (pubport  (list-ref hostinfo 3)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (handle-exceptions
	   exn
	   (begin
	     ;; something went wrong in connecting to the server. In this scenario it is ok
	     ;; to try again
	     (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	     (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	     (open-run-close tasks:server-deregister tasks:open-db host port: port)
	     (server:client-setup (- numtries 1))
	     #f)
	   (let* ((zmq-socket (server:client-connect iface port))
	  ;; (handle-exceptions
	  ;;   exn
	  ;;  (begin
	  ;;    ;; something went wrong in connecting to the server. In this scenario it is ok
	  ;;    ;; to try again
	  ;;    (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	  ;;    (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	  ;;    (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	  ;;    (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
	  ;;    (server:client-setup (- numtries 1))
	  ;;    #f)
	   (server:client-connect iface pullport pubport)) ;; )
		  (login-res  (server:client-login zmq-socket))
		  (connect-ok (if (null? login-res) #f (car login-res)))
		  (conurl     (server:make-server-url (list iface port))))
	     (if connect-ok
		 (begin
		   (debug:print-info 2 "Logged in and connected to " conurl)
		   (set! *runremote* zmq-socket)
		   #t)
		 (begin
		   (debug:print-info 2 "Failed to login or connect to " conurl)
		   (set! *runremote* #f)
		   #f)))))
	(if (> numtries 0)
	    (let ((exe (car (argv))))
	      (debug:print-info 1 "No server available, attempting to start one...")
	      (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
	      (sleep 5) ;; give server time to start
	      ;; we are starting a server, do not try again! That can lead to 
	      ;; recursively starting many processes!!!
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309














310
311
312

313

314
315

316
317
318
319



320
321
322
323
324
325
326
327
328
329
330
331
332

333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349

350
351
352
353
354
355
356
342
343
344
345
346
347
348












349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364

365

366
367

368
369
370
371

372
373
374
375
376
377
378
379
380
381
382
383
384
385
386

387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412







-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+


-
+
-
+

-
+



-
+
+
+












-
+

















+







	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 1 "Starting the standalone server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 
	    (let* ((th1 (make-thread (lambda ()
				       (let ((server-info #f))
					 ;; wait for the server to be online and available
					 (let loop ()
					   (debug:print-info 1 "Waiting for the server to come online before starting heartbeat")
					   (thread-sleep! 2)
					   (mutex-lock! *heartbeat-mutex*)
					   (set! server-info *server-info* )
					   (mutex-unlock! *heartbeat-mutex*)
					   (if (not server-info)(loop)))
					 (debug:print 1 "Server alive, starting self-ping")
					 (server:self-ping (cadr server-info)(caddr server-info)))) "Self ping"))
	    (let* (;; (th1 (make-thread (lambda ()
		   ;;      	       (let ((server-info #f))
		   ;;      		 ;; wait for the server to be online and available
		   ;;      		 (let loop ()
		   ;;      		   (debug:print-info 1 "Waiting for the server to come online before starting heartbeat")
		   ;;      		   (thread-sleep! 2)
		   ;;      		   (mutex-lock! *heartbeat-mutex*)
		   ;;      		   (set! server-info *server-info* )
		   ;;      		   (mutex-unlock! *heartbeat-mutex*)
		   ;;      		   (if (not server-info)(loop)))
		   ;;      		 (debug:print 1 "Server alive, starting self-ping")
		   ;;      		 (server:self-ping server-info)
		   ;;      		 ))
		   ;;      	     "Self ping"))
		   (th2 (make-thread (lambda ()
				       (server:run (args:get-arg "-server"))) "Server run"))
		   (th3 (make-thread (lambda ()
		   (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
				       (server:keep-running)) "Keep running")))
		   )
	      (set! *client-non-blocking-mode* #t)
	      (thread-start! th1)
	      ;; (thread-start! th1)
	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      (thread-join! th3))
	      ;; (thread-join! th3)
	      (thread-join! th2)
	      )
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

(define (server:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     (if (not *received-response*)
				 (receive-message* *runremote*))) ;; flush out last call if applicable
			   "eat response"))
	 (th2 (make-thread (lambda ()
			     (debug:print 0 "ERROR: Received ^C, attempting clean exit.")
			     (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
			     (thread-sleep! 3) ;; give the flush three seconds to do it's stuff
			     (debug:print 0 "       Done.")
			     (exit 4))
			   "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))

(define (server:client-launch)
  (set-signal-handler! signal/int server:client-signal-handler)
   (if (server:client-setup)
       (debug:print-info 2 "connected as client")
       (begin
	 (debug:print 0 "ERROR: Failed to connect as client")
	 (exit))))

;; ping a server and return number of clients or #f (if no response)
;; NOT IN USE!
(define (server:ping host port #!key (secs 10)(return-socket #f))
  (cdb:use-non-blocking-mode
   (lambda ()
     (let* ((res #f)
	    (th1 (make-thread
		  (lambda ()
		    (let* ((zmq-context (make-context 1))

Modified tasks.scm from [53e3d6e8bd] to [acea04adee].

20
21
22
23
24
25
26
27









28
29
30
31
32
33
34
20
21
22
23
24
25
26

27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42







-
+
+
+
+
+
+
+
+
+








;;======================================================================
;; Tasks db
;;======================================================================

(define (tasks:open-db)
  (let* ((dbpath  (conc *toppath* "/monitor.db"))
	 (exists  (file-exists? dbpath))
	 (exists  (if (file-exists? dbpath)
		      ;; BUGGISHNESS: Remove this code in six months. Today is 11/13/2012
		      (if (< (file-change-time dbpath) 1352851396.0)
			  (begin
			    (debug:print 0 "NOTE: removing old db file " dbpath)
			    (delete-file dbpath)
			    #f)
			  #t)
		      #f))
	 (mdb     (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
	 (handler (make-busy-timeout 36000)))
    (sqlite3:set-busy-handler! mdb handler)
    (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;"))
    (if (not exists)
	(begin
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
50
51
52
53
54
55
56
57


58
59
60
61
62
63

64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

82
83
84
85



86
87

88

89

90
91
92
93


94

95
96
97
98
99






100
101
102
103
104
105

106
107
108
109
110
111
112
113
114


115
116
117
118
119
120
121

122
123
124

125
126
127
128
129
130
131
58
59
60
61
62
63
64

65
66
67
68
69
70
71

72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

90
91
92


93
94
95
96

97
98
99

100
101
102


103
104
105
106





107
108
109
110
111
112
113
114
115
116
117

118
119
120
121
122
123
124
125


126
127
128
129
130
131
132
133

134
135
136

137
138
139
140
141
142
143
144







-
+
+





-
+

















-
+


-
-
+
+
+

-
+

+
-
+


-
-
+
+

+
-
-
-
-
-
+
+
+
+
+
+





-
+







-
-
+
+






-
+


-
+







                                hostname TEXT,
                                username TEXT,
                               CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
                                  pid INTEGER,
                                  interface TEXT,
                                  hostname TEXT,
                                  port INTEGER,
                                  pullport INTEGER,
                                  pubport  INTEGER,
                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,
                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,pullport,pubport));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
                                  server_id INTEGER,
                                  pid INTEGER,
                                  hostname TEXT,
                                  cmdline TEXT,
                                  login_time TIMESTAMP,
                                  logout_time TIMESTAMP DEFAULT -1,
                                CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
                                  
	  ))
    mdb))
    
;;======================================================================
;; Server and client management
;;======================================================================

;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state)
(define (tasks:server-register mdb pid interface pullport pubport priority state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);"
   pid (get-host-name) port priority (conc state) megatest-version interface)
   "INSERT OR REPLACE INTO servers (pid,hostname,pullport,pubport,start_time,priority,state,mt_version,heartbeat,interface)
                             VALUES(?,  ?,       ?,       ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);"
   pid (get-host-name) pullport pubport priority (conc state) megatest-version interface)
  (list 
   (tasks:server-get-server-id mdb (get-host-name) port pid)
   (tasks:server-get-server-id mdb (get-host-name) pullport pid)
   interface
   pullport
   port))
   pubport))

;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
(define (tasks:server-deregister mdb hostname #!key (pullport #f)(pid #f)(action 'markdead))
  (debug:print-info 11 "server-deregister " hostname ", pullport " pullport ", pid " pid)
  (if pid
      (case action
      ;; (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)
      (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)
      (if port
	  ;; (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port)
	  (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port)
	((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid))
	(else    (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)))
      (if pullport
	  (case action
	    ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND pullport=?;" hostname port))
	    (else    (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport)))
	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

(define (tasks:server-get-server-id mdb hostname port pid)
(define (tasks:server-get-server-id mdb hostname pullport pid)
  (let ((res #f))
    (sqlite3:for-each-row
     (lambda (id)
       (set! res id))
     mdb
     (if (and hostname  pid)
	 "SELECT id FROM servers WHERE hostname=? AND pid=?;"
	 "SELECT id FROM servers WHERE hostname=? AND port=?;")
     hostname (if pid pid port))
	 "SELECT id FROM servers WHERE hostname=? AND pullport=?;")
     hostname (if pid pid pullport))
    res))

(define (tasks:server-update-heartbeat mdb server-id)
  (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id))

;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds
(define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f))
(define (tasks:server-alive? mdb server-id #!key (hostname #f)(pullport #f)(pid #f))
  (let* ((server-id  (if server-id 
			 server-id
			 (tasks:server-get-server-id mdb hostname port pid)))
			 (tasks:server-get-server-id mdb hostname pullport pid)))
	 (heartbeat-delta 99e9))
    (sqlite3:for-each-row
     (lambda (delta)
       (set! heartbeat-delta delta))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
    (< heartbeat-delta 10)))

156
157
158
159
160
161
162
163
164
165



166


167

168
169
170
171
172
173
174
175
176
177






178
179
180
181


182
183
184




185
186
187
188







189
190
191
192
193
194
195
169
170
171
172
173
174
175



176
177
178
179
180
181

182

183
184
185
186





187
188
189
190
191
192
193
194


195
196
197


198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219







-
-
-
+
+
+

+
+
-
+
-




-
-
-
-
-
+
+
+
+
+
+


-
-
+
+

-
-
+
+
+
+




+
+
+
+
+
+
+








;; ping each server in the db and return first found that responds. 
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb)
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface port pid)
       (set! res (cons (list hostname interface port pid) res))
       (debug:print-info 2 "Found existing server " hostname ":" port " registered in db"))
     (lambda (id hostname interface pullport pubport pid)
       (set! res (cons (list hostname interface pullport pubport pid) res))
       (debug:print-info 2 "Found existing server " hostname ":" pullport " registered in db"))
     mdb
     "SELECT id,hostname,interface,pullport,pubport,pid FROM servers
         WHERE strftime('%s','now')-heartbeat < 10
     "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version)
               AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version)
    ;; (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (car    hed))
		 (iface    (cadr   hed))
		 (port     (caddr  hed))
		 (pid      (cadddr hed))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port)))
	  (let* ((host     (list-ref hed 0))
		 (iface    (list-ref hed 1))
		 (pullport (list-ref hed 2))
		 (pubport  (list-ref hed 3))
		 (pid      (list-ref hed 4))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db #f hostname: host pullport: pullport)))
	    (if alive
		(begin
		  (debug:print-info 2 "Found an existing, alive, server " host ":" port ".")
		  (list host iface port))
		  (debug:print-info 2 "Found an existing, alive, server " host ", " pullport " and " pubport ".")
		  (list host iface pullport pubport))
		(begin
		  (debug:print-info 1 "Removing " host ":" port " from server registry as it appears to be dead")
		  (tasks:kill-server #f host port pid)
		  (debug:print-info 1 "Marking " host ":" pullport " as dead in server registry.")
		  (if pullport
		      (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
		      (open-run-close tasks:server-deregister tasks:open-db host pid:  pid))
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))

(define (tasks:mark-server hostname pullport pid state)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid:  pid)))


;; NOTE: NOT PORTED TO WORK WITH pullport/pubport
(define (tasks:kill-server status hostname port pid)
  (debug:print-info 1 "Removing defunct server record for " hostname ":" port)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid:  pid))
  (if status ;; #t means alive
      (begin
210
211
212
213
214
215
216


217
218
219
220
221
222


223
224

225
226
227
228
229
230
231
234
235
236
237
238
239
240
241
242
243
244
245
246


247
248
249

250
251
252
253
254
255
256
257







+
+




-
-
+
+

-
+







	    (if (equal? hostname (get-host-name))
		(begin
		  (debug:print-info 1 "Sending signal/term to " pid " on " hostname)
		  (process-signal pid signal/term)  ;; local machine, send sig term
		  (thread-sleep! 5)                 ;; give it five seconds to die peacefully then do a brutal kill
		  (process-signal pid signal/kill)) 
		(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))



(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname interface port start-time priority state mt-version)
       (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res)))
     (lambda (id pid hostname interface pullport pubport start-time priority state mt-version last-update)
       (set! res (cons (vector id pid hostname interface pullport pubport start-time priority state mt-version last-update) res)))
     mdb
     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")
     "SELECT id,pid,hostname,interface,pullport,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;")
    res))
       

;;======================================================================
;; Tasks and Task monitors
;;======================================================================

Added testzmq/mockupclient.scm version [3f321e6f75].




































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
(use zmq posix)

(define cname "Bob")
(define runtime 10)
(let ((args (argv)))
  (if (< (length args) 3)
      (begin
	(print "Usage: mockupclient clientname runtime")
	(exit))
      (begin
	(set! cname (cadr args))
	(set! runtime (string->number (caddr args))))))
      
;; (define start-delay (/ (random 100) 9))
;; (define runtime     (+ 1 (/ (random 200) 2)))

(print "Starting client " cname " with runtime " runtime)

(include "mockupclientlib.scm")

(set! endtime (+ (current-seconds) runtime))

(let loop ()
  (let ((x (random 15))
	(varname (list-ref (list "hello" "goodbye" "saluton" "kiaorana")(random 4))))
    (case x
      ;; ((1)(dbaccess cname 'sync "nodat"    #f))
      ((2 3 4 5)(dbaccess cname 'set varname (random 999)))
      ((6 7 8 9 10)(print cname ": Get \"" varname "\" " (dbaccess cname 'get varname #f)))
      (else
       (thread-sleep! 0.011)))
    (if (< (current-seconds) endtime)
	(loop))))

(print "Client " cname " all done!!")

Added testzmq/mockupserver.scm version [5ff56c6e40].







































































































































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects.  Also a remaining client may receive tons of
;; messages afterward.

(use zmq srfi-18 sqlite3)

(define pub (make-socket 'pub))
(define pull (make-socket 'pull))
(define cname "server")
(define total-db-accesses 0)
(define start-time (current-seconds))

(bind-socket pub "tcp://*:5563")
(bind-socket pull "tcp://*:5564")

(define (open-db)
  (let* ((dbpath    "mockup.db")
	 (dbexists  (file-exists? dbpath))
	 (db        (open-database dbpath)) ;; (never-give-up-open-db dbpath))
	 (handler   (make-busy-timeout 10)))
    (set-busy-handler! db handler)
    (if (not dbexists)
	(for-each
	 (lambda (stmt)
	   (execute db stmt))
	 (list
	  "PRAGMA SYNCHRONOUS=0;"
	  "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);"
	  "CREATE TABLE vars    (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));")))
    db))

(define cid-cache (make-hash-table))

(define (get-client-id db cname)
  (let ((cid (hash-table-ref/default cid-cache cname #f)))
    (if cid 
	cid
	(begin
	  (execute db "INSERT OR REPLACE INTO clients (name) VALUES(?);" cname)
	  (for-each-row 
	   (lambda (id)
	     (set! cid id))
	   db
	   "SELECT id FROM clients WHERE name=?;" cname)
	  (hash-table-set! cid-cache cname cid)
	  (set! total-db-accesses (+ total-db-accesses 2))
	  cid))))

(define (count-client db cname)
  (let ((cid (get-client-id db cname)))
    (execute db "UPDATE clients SET num_accesses=num_accesses+1 WHERE id=?;" cid)
    (set! total-db-accesses (+ total-db-accesses 1))
    ))

(define db (open-db))
;; (define queuelst '())
;; (define mx1 (make-mutex))

(define (process-queue queuelst)
  (let ((queuelen (length queuelst)))
    (for-each
     (lambda (item)
       (let ((cname (vector-ref item 1))
	     (clcmd (vector-ref item 2))
	     (cdata (vector-ref item 3)))
	 (send-message pub cname send-more: #t)
	 (send-message pub (case clcmd
			     ((sync)
			      (conc queuelen))
			     ((set)
			      (set! total-db-accesses (+ total-db-accesses 1))
			      (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata))
			      "ok")
			     ((get)
			      (set! total-db-accesses (+ total-db-accesses 1))
			      (let ((res "noval"))
				(for-each-row
				 (lambda (val)
				   (set! res val))
				 db 
				 "SELECT val FROM vars WHERE var=?;" cdata)
				res))
			     (else (conc "unk cmd: " clcmd))))))
     queuelst)))

(define th1 (make-thread 
	     (lambda ()
	       (let ((last-run 0)) ;; current-seconds when run last
		 (let loop ((queuelst '()))
		   (let* ((indat (receive-message* pull))
			  (parts (string-split indat ":"))
			  (cname (car parts))                   ;; client name
			  (clcmd (string->symbol (cadr parts))) ;; client cmd
			  (cdata (caddr parts))                 ;; client data
			  (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue
		     (count-client db cname)
		     (case clcmd
		       ((sync) ;; just process the queue
			(print "Got sync from " cname)
			(process-queue (cons svect queuelst))
			(loop '()))
		       ((get)
			(process-queue (cons svect queuelst))
			(loop '()))
		       (else
			(loop (cons svect queuelst))))))))
	     "server thread"))

(include "mockupclientlib.scm")

;; send a sync to the pull port
(define th2 (make-thread
	     (lambda ()
	       (let ((last-action-time (current-seconds)))
		 (let loop ()
		   (thread-sleep! 5)
		   (let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f)))
			 (last-action-delta #f))
		     (if (> queuelen 1)(set! last-action-time (current-seconds)))
		     (set! last-action-delta (- (current-seconds) last-action-time))
		     (print "Server: Got queuelen=" queuelen ", last-action-delta=" last-action-delta)
		     (if (< last-action-delta 60)
			 (loop)
			 (print "Server exiting, 25 seconds since last access"))))))
	     "sync thread"))

(thread-start! th1)
(thread-start! th2)
(thread-join! th2)

(let* ((run-time       (- (current-seconds) start-time))
       (queries/second (/  total-db-accesses run-time)))
  (print "Server exited! Total db accesses=" total-db-accesses " in " run-time " seconds for " queries/second " queries/second"))

Added testzmq/testmockup.sh version [15deaa0e30].


































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
#!/bin/bash

rm -f mockup.db

echo Compiling mockupserver.scm and mockupclient.scm
csc mockupserver.scm
csc mockupclient.scm

echo Starting server
./mockupserver &

sleep 1

echo Starting clients
for i in a b c d e f g h i j k l m n o p q s t u v w x y z; 
  do
  for k in a b;
    do
    for j in 0 1 2 3 4 5 6 7 8 9;
      do
      waittime=`random 0 60`
      runtime=`random 5 120`
      echo "Starting client $i$k$j with waittime $waittime and runtime $runtime" 
      (sleep $waittime;./mockupclient $i$k$j $runtime) &
    done
  done
done

wait
echo testmockup.sh script done
# echo "Waiting for 5 seconds then killing all mockupserver and mockupclient processes"
# sleep 30
# killall -v mockupserver mockupclient

Modified utils/installall.sh from [e461189985] to [6bb115f66a].

1

2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

17
18
19
20
21
22
23

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

16
17
18
19
20
21
22
23
-
+














-
+







#!/bin/bash
#! /bin/env bash

set -x

# Copyright 2007-2010, Matthew Welland.
# 
#  This program is made available under the GNU GPL version 2.0 or
#  greater. See the accompanying file COPYING for details.
# 
#  This program is distributed WITHOUT ANY WARRANTY; without even the
#  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
#  PURPOSE.

echo You may need to do the following first:
echo sudo apt-get install libreadline-dev
echo sudo apt-get install libwebkitgtk-dev
echo sudo apt-get install libwebkitgtk-dev 
echo sudo apt-get install libmotif3 -OR- set KTYPE=26g4
echo KTYPE can be 26, 26g4, or 32
echo KTYPE=$KTYPE
echo You are using PREFIX=$PREFIX
echo You are using proxy="$proxy"
echo 
echo "Set additional_libpath to help find gtk or other libraries, don't forget a leading :"
266
267
268
269
270
271
272
273


274
275
276
277
278
279
280
266
267
268
269
270
271
272

273
274
275
276
277
278
279
280
281







-
+
+








if [[ -e ${ZEROMQ}${zpatchlev}.tar.gz ]] ; then
    tar xfz ${ZEROMQ}.tar.gz
    cd ${ZEROMQ}
    ln -s $PREFIX/include/uuid src
    # LDFLAGS=-L$PREFIX/lib ./configure --prefix=$PREFIX 
    
    ./configure --enable-static --disable-shared --prefix=$PREFIX --with-uuid=$PREFIX LDFLAGS="-L$PREFIX/lib" CPPFLAGS="-fPIC -I$PREFIX/include" LIBS="-lgcc"
    ./configure --enable-static --prefix=$PREFIX --with-uuid=$PREFIX LDFLAGS="-L$PREFIX/lib" CPPFLAGS="-fPIC -I$PREFIX/include" LIBS="-lgcc"
    # --disable-shared CPPFLAGS="-fPIC 
    # LDFLAGS="-L/usr/lib64 -L$PREFIX/lib" ./configure --enable-static --prefix=$PREFIX 
    make
    make install
    CSC_OPTIONS="-I$PREFIX/include -L$CSCLIBS" chicken-install $PROX zmq
    # CSC_OPTIONS="-I$PREFIX/include -L$CSCLIBS" chicken-install $PROX -deploy -prefix $DEPLOYTARG zmq
fi