Skip to content

Commit b01d777

Browse files
authored
Merge pull request #70 from scality/improvement/LOGC-21
LOGC-21: Use distributed ClickHouse setup for tests
2 parents e668898 + 1a0da58 commit b01d777

14 files changed

Lines changed: 332 additions & 128 deletions

File tree

.github/workflows/tests.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ jobs:
4343

4444
- name: Wait for ClickHouse to be ready
4545
run: |
46-
timeout 30 bash -c 'until docker compose exec -T clickhouse clickhouse-client --query "SELECT 1" > /dev/null 2>&1; do sleep 1; done'
46+
timeout 30 bash -c 'until docker compose exec -T clickhouse-shard-1 clickhouse-client --query "SELECT 1" > /dev/null 2>&1; do sleep 1; done'
47+
timeout 30 bash -c 'until docker compose exec -T clickhouse-shard-2 clickhouse-client --query "SELECT 1" > /dev/null 2>&1; do sleep 1; done'
4748
4849
- name: Run GolangCI linter
4950
uses: golangci/golangci-lint-action@v9

deployment/clickhouse-config.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?xml version="1.0"?>
2+
<clickhouse>
3+
<remote_servers>
4+
<test_cluster>
5+
<shard>
6+
<replica>
7+
<host>clickhouse-shard-1</host>
8+
<port>9000</port>
9+
<user>default</user>
10+
<password></password>
11+
</replica>
12+
</shard>
13+
<shard>
14+
<replica>
15+
<host>clickhouse-shard-2</host>
16+
<port>9000</port>
17+
<user>default</user>
18+
<password></password>
19+
</replica>
20+
</shard>
21+
</test_cluster>
22+
</remote_servers>
23+
</clickhouse>

deployment/conf/env

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# ClickHouse configuration
2-
export LOG_COURIER_CLICKHOUSE_URL="localhost:9002"
2+
export LOG_COURIER_CLICKHOUSE_URL="localhost:9002,localhost:9003"
33
export LOG_COURIER_CLICKHOUSE_USERNAME="default"
44
export LOG_COURIER_CLICKHOUSE_PASSWORD=""
55

docker-compose.yml

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
services:
2-
clickhouse:
2+
clickhouse-shard-1:
33
image: docker.io/clickhouse/clickhouse-server:24.3.2.23-alpine
4-
container_name: log-courier-clickhouse-1
4+
container_name: log-courier-clickhouse-shard-1
5+
hostname: clickhouse-shard-1
56
ports:
67
- "9002:9000" # Native protocol
78
- "8123:8123" # HTTP interface
@@ -10,7 +11,28 @@ services:
1011
CLICKHOUSE_USER: default
1112
CLICKHOUSE_PASSWORD: ""
1213
volumes:
13-
- clickhouse-data-1:/var/lib/clickhouse
14+
- clickhouse-shard-1-data:/var/lib/clickhouse
15+
- ./deployment/clickhouse-config.xml:/etc/clickhouse-server/config.d/cluster.xml:ro
16+
healthcheck:
17+
test: ["CMD", "clickhouse-client", "--query", "SELECT 1"]
18+
interval: 10s
19+
timeout: 5s
20+
retries: 5
21+
22+
clickhouse-shard-2:
23+
image: docker.io/clickhouse/clickhouse-server:24.3.2.23-alpine
24+
container_name: log-courier-clickhouse-shard-2
25+
hostname: clickhouse-shard-2
26+
ports:
27+
- "9003:9000" # Native protocol
28+
- "8124:8123" # HTTP interface
29+
environment:
30+
CLICKHOUSE_DB: logs
31+
CLICKHOUSE_USER: default
32+
CLICKHOUSE_PASSWORD: ""
33+
volumes:
34+
- clickhouse-shard-2-data:/var/lib/clickhouse
35+
- ./deployment/clickhouse-config.xml:/etc/clickhouse-server/config.d/cluster.xml:ro
1436
healthcheck:
1537
test: ["CMD", "clickhouse-client", "--query", "SELECT 1"]
1638
interval: 10s
@@ -27,7 +49,9 @@ services:
2749
profiles:
2850
- full
2951
depends_on:
30-
clickhouse:
52+
clickhouse-shard-1:
53+
condition: service_healthy
54+
clickhouse-shard-2:
3155
condition: service_healthy
3256
environment:
3357
SUPERVISORD_CONF: supervisord.conf
@@ -36,5 +60,7 @@ services:
3660
- ./deployment/conf/env:/conf/env:ro
3761

3862
volumes:
39-
clickhouse-data-1:
63+
clickhouse-shard-1-data:
64+
driver: local
65+
clickhouse-shard-2-data:
4066
driver: local

pkg/clickhouse/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Config struct {
2828
MaxBackoff time.Duration
2929
NumWorkers int // Number of parallel workers, used to size connection pool
3030
Logger *slog.Logger
31+
Settings map[string]interface{} // Optional ClickHouse settings (e.g., for tests)
3132
}
3233

3334
// NewClient creates a new ClickHouse client
@@ -55,6 +56,11 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
5556
ConnMaxLifetime: 5 * time.Minute,
5657
}
5758

59+
// Apply optional settings if provided (e.g., for test configuration)
60+
if len(cfg.Settings) > 0 {
61+
options.Settings = clickhouse.Settings(cfg.Settings)
62+
}
63+
5864
var conn driver.Conn
5965
var err error
6066
backoff := cfg.InitialBackoff

pkg/clickhouse/client_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,16 @@ var _ = Describe("ClickHouse Client", func() {
3535

3636
Describe("Connection", func() {
3737
It("should connect successfully", func() {
38-
Expect(helper.Client).NotTo(BeNil())
38+
Expect(helper.Client()).NotTo(BeNil())
3939
})
4040

4141
It("should execute simple queries", func() {
42-
err := helper.Client.Exec(ctx, "SELECT 1")
42+
err := helper.Client().Exec(ctx, "SELECT 1")
4343
Expect(err).NotTo(HaveOccurred())
4444
})
4545

4646
It("should query and return results", func() {
47-
rows, err := helper.Client.Query(ctx, "SELECT 1 AS value")
47+
rows, err := helper.Client().Query(ctx, "SELECT 1 AS value")
4848
Expect(err).NotTo(HaveOccurred())
4949
defer func() { _ = rows.Close() }()
5050

@@ -82,14 +82,14 @@ var _ = Describe("ClickHouse Client", func() {
8282
// Verify database exists
8383
var dbCount uint64
8484
query := fmt.Sprintf("SELECT count() FROM system.databases WHERE name = '%s'", helper.DatabaseName)
85-
err = helper.Client.QueryRow(ctx, query).Scan(&dbCount)
85+
err = helper.Client().QueryRow(ctx, query).Scan(&dbCount)
8686
Expect(err).NotTo(HaveOccurred())
8787
Expect(dbCount).To(Equal(uint64(1)))
8888

8989
// Verify tables exist
9090
var tableCount uint64
9191
query = fmt.Sprintf("SELECT count() FROM system.tables WHERE database = '%s' AND name IN ('access_logs_federated', 'offsets_federated')", helper.DatabaseName)
92-
err = helper.Client.QueryRow(ctx, query).Scan(&tableCount)
92+
err = helper.Client().QueryRow(ctx, query).Scan(&tableCount)
9393
Expect(err).NotTo(HaveOccurred())
9494
Expect(tableCount).To(Equal(uint64(2)))
9595
})
@@ -104,7 +104,7 @@ var _ = Describe("ClickHouse Client", func() {
104104
// Verify database is gone
105105
var dbCount uint64
106106
query := fmt.Sprintf("SELECT count() FROM system.databases WHERE name = '%s'", helper.DatabaseName)
107-
err = helper.Client.QueryRow(ctx, query).Scan(&dbCount)
107+
err = helper.Client().QueryRow(ctx, query).Scan(&dbCount)
108108
Expect(err).NotTo(HaveOccurred())
109109
Expect(dbCount).To(Equal(uint64(0)))
110110
})
@@ -138,7 +138,7 @@ var _ = Describe("ClickHouse Client", func() {
138138
// Verify the log was inserted
139139
var count uint64
140140
query := fmt.Sprintf("SELECT COUNT(*) FROM %s.%s WHERE bucketName = 'test-bucket'", helper.DatabaseName, clickhouse.TableAccessLogsFederated)
141-
err = helper.Client.QueryRow(ctx, query).Scan(&count)
141+
err = helper.Client().QueryRow(ctx, query).Scan(&count)
142142
Expect(err).NotTo(HaveOccurred())
143143
Expect(count).To(Equal(uint64(1)))
144144
})
@@ -165,7 +165,7 @@ var _ = Describe("ClickHouse Client", func() {
165165
// Verify all 5 records are present
166166
var count uint64
167167
query := fmt.Sprintf("SELECT COUNT(*) FROM %s.%s WHERE bucketName = 'test-bucket-multi'", helper.DatabaseName, clickhouse.TableAccessLogsFederated)
168-
err := helper.Client.QueryRow(ctx, query).Scan(&count)
168+
err := helper.Client().QueryRow(ctx, query).Scan(&count)
169169
Expect(err).NotTo(HaveOccurred())
170170
Expect(count).To(Equal(uint64(5)))
171171
})

pkg/logcourier/batchfinder_test.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ var _ = Describe("BatchFinder", func() {
3030
err = helper.SetupSchema(ctx)
3131
Expect(err).NotTo(HaveOccurred())
3232

33-
finder = logcourier.NewBatchFinder(helper.Client, helper.DatabaseName, 5, 60, 3)
33+
finder = logcourier.NewBatchFinder(helper.Client(), helper.DatabaseName, 5, 60, 3)
3434
})
3535

3636
AfterEach(func() {
@@ -80,7 +80,7 @@ var _ = Describe("BatchFinder", func() {
8080
(insertedAt, bucketName, timestamp, req_id, operation, loggingEnabled, raftSessionID, requestURI)
8181
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
8282
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
83-
err := helper.Client.Exec(ctx, query,
83+
err := helper.Client().Exec(ctx, query,
8484
oldTime.Add(time.Duration(i)*time.Second), // insertedAt
8585
"test-bucket", // bucketName
8686
oldTime.Add(time.Duration(i)*time.Second), // timestamp
@@ -98,7 +98,7 @@ var _ = Describe("BatchFinder", func() {
9898
offsetQuery := fmt.Sprintf(
9999
"INSERT INTO %s.%s (bucketName, raftSessionID, lastProcessedInsertedAt, lastProcessedTimestamp, lastProcessedReqId) VALUES (?, ?, ?, ?, ?)",
100100
helper.DatabaseName, clickhouse.TableOffsetsFederated)
101-
err := helper.Client.Exec(ctx, offsetQuery,
101+
err := helper.Client().Exec(ctx, offsetQuery,
102102
"test-bucket", uint16(0), lastLogTime, lastLogTime, "req-007")
103103
Expect(err).NotTo(HaveOccurred())
104104

@@ -122,7 +122,7 @@ var _ = Describe("BatchFinder", func() {
122122
loggingTargetBucket, loggingTargetPrefix, raftSessionID)
123123
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
124124
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
125-
err := helper.Client.Exec(ctx, query,
125+
err := helper.Client().Exec(ctx, query,
126126
"", // bucketOwner
127127
"test-bucket", // bucketName
128128
time.Now(), // startTime
@@ -170,7 +170,7 @@ var _ = Describe("BatchFinder", func() {
170170
(insertedAt, bucketName, timestamp, req_id, operation, loggingEnabled, raftSessionID, requestURI)
171171
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
172172
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
173-
err := helper.Client.Exec(ctx, query,
173+
err := helper.Client().Exec(ctx, query,
174174
oldTime, // insertedAt (old)
175175
"test-bucket", // bucketName
176176
oldTime, // timestamp
@@ -221,7 +221,7 @@ var _ = Describe("BatchFinder", func() {
221221
(insertedAt, bucketName, timestamp, req_id, operation, loggingEnabled, raftSessionID, requestURI)
222222
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
223223
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
224-
err := helper.Client.Exec(ctx, query,
224+
err := helper.Client().Exec(ctx, query,
225225
baseTime.Add(time.Duration(i)*time.Second), // insertedAt varies
226226
"test-bucket", // bucketName
227227
timestamp, // timestamp (same for all)
@@ -239,7 +239,7 @@ var _ = Describe("BatchFinder", func() {
239239
offsetQuery := fmt.Sprintf(
240240
"INSERT INTO %s.%s (bucketName, raftSessionID, lastProcessedInsertedAt, lastProcessedTimestamp, lastProcessedReqId) VALUES (?, ?, ?, ?, ?)",
241241
helper.DatabaseName, clickhouse.TableOffsetsFederated)
242-
err := helper.Client.Exec(ctx, offsetQuery,
242+
err := helper.Client().Exec(ctx, offsetQuery,
243243
"test-bucket", uint16(0), offsetTime, timestamp, reqID)
244244
Expect(err).NotTo(HaveOccurred())
245245

@@ -262,7 +262,7 @@ var _ = Describe("BatchFinder", func() {
262262
(insertedAt, bucketName, timestamp, req_id, operation, loggingEnabled, raftSessionID, requestURI)
263263
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
264264
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
265-
err := helper.Client.Exec(ctx, query,
265+
err := helper.Client().Exec(ctx, query,
266266
insertedAt, // insertedAt (same for all)
267267
"test-bucket", // bucketName
268268
baseTime.Add(time.Duration(i)*time.Second), // timestamp varies
@@ -280,7 +280,7 @@ var _ = Describe("BatchFinder", func() {
280280
offsetQuery := fmt.Sprintf(
281281
"INSERT INTO %s.%s (bucketName, raftSessionID, lastProcessedInsertedAt, lastProcessedTimestamp, lastProcessedReqId) VALUES (?, ?, ?, ?, ?)",
282282
helper.DatabaseName, clickhouse.TableOffsetsFederated)
283-
err := helper.Client.Exec(ctx, offsetQuery,
283+
err := helper.Client().Exec(ctx, offsetQuery,
284284
"test-bucket", uint16(0), insertedAt, offsetTimestamp, "req-003")
285285
Expect(err).NotTo(HaveOccurred())
286286

@@ -303,7 +303,7 @@ var _ = Describe("BatchFinder", func() {
303303
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
304304
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
305305
insertedAt := baseTime.Add(time.Duration(i) * time.Second)
306-
err := helper.Client.Exec(ctx, query,
306+
err := helper.Client().Exec(ctx, query,
307307
insertedAt, // insertedAt
308308
"test-bucket", // bucketName
309309
insertedAt, // timestamp
@@ -322,17 +322,17 @@ var _ = Describe("BatchFinder", func() {
322322
helper.DatabaseName, clickhouse.TableOffsetsFederated)
323323

324324
t0 := baseTime
325-
err := helper.Client.Exec(ctx, offsetQuery,
325+
err := helper.Client().Exec(ctx, offsetQuery,
326326
"test-bucket", uint16(0), t0, t0, "req-000")
327327
Expect(err).NotTo(HaveOccurred())
328328

329329
t2 := baseTime.Add(2 * time.Second)
330-
err = helper.Client.Exec(ctx, offsetQuery,
330+
err = helper.Client().Exec(ctx, offsetQuery,
331331
"test-bucket", uint16(0), t2, t2, "req-002")
332332
Expect(err).NotTo(HaveOccurred())
333333

334334
t1 := baseTime.Add(1 * time.Second)
335-
err = helper.Client.Exec(ctx, offsetQuery,
335+
err = helper.Client().Exec(ctx, offsetQuery,
336336
"test-bucket", uint16(0), t1, t1, "req-001")
337337
Expect(err).NotTo(HaveOccurred())
338338

@@ -390,7 +390,7 @@ var _ = Describe("BatchFinder", func() {
390390
(insertedAt, bucketName, timestamp, req_id, operation, loggingEnabled, raftSessionID, requestURI)
391391
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
392392
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
393-
err := helper.Client.Exec(ctx, query,
393+
err := helper.Client().Exec(ctx, query,
394394
oldTime, // insertedAt (older)
395395
"bucket-A", // bucketName
396396
oldTime, // timestamp
@@ -410,7 +410,7 @@ var _ = Describe("BatchFinder", func() {
410410
(insertedAt, bucketName, timestamp, req_id, operation, loggingEnabled, raftSessionID, requestURI)
411411
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
412412
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
413-
err := helper.Client.Exec(ctx, query,
413+
err := helper.Client().Exec(ctx, query,
414414
newerTime, // insertedAt (newer)
415415
"bucket-B", // bucketName
416416
newerTime, // timestamp
@@ -489,15 +489,15 @@ var _ = Describe("BatchFinder", func() {
489489
(insertedAt, bucketName, timestamp, req_id, operation, loggingEnabled, raftSessionID, requestURI)
490490
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
491491
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
492-
err := helper.Client.Exec(ctx, query,
492+
err := helper.Client().Exec(ctx, query,
493493
logAInsertedAt, "test-bucket", logATimestamp, "req-A",
494494
"GetObject", true, uint16(0), "/test-bucket/key-a")
495495
Expect(err).NotTo(HaveOccurred())
496496

497497
// Insert Log B: operation at :01, arrives at :05
498498
logBTimestamp := baseTime.Add(1 * time.Second)
499499
logBInsertedAt := baseTime.Add(5 * time.Second)
500-
err = helper.Client.Exec(ctx, query,
500+
err = helper.Client().Exec(ctx, query,
501501
logBInsertedAt, "test-bucket", logBTimestamp, "req-B",
502502
"GetObject", true, uint16(0), "/test-bucket/key-b")
503503
Expect(err).NotTo(HaveOccurred())
@@ -509,15 +509,15 @@ var _ = Describe("BatchFinder", func() {
509509
Expect(batches[0].LogCount).To(BeNumerically(">=", 2))
510510

511511
// Cycle 1: Fetch logs and simulate processing
512-
fetcher := logcourier.NewLogFetcher(helper.Client, helper.DatabaseName, 10000)
512+
fetcher := logcourier.NewLogFetcher(helper.Client(), helper.DatabaseName, 10000)
513513
records, err := fetcher.FetchLogs(ctx, batches[0])
514514
Expect(err).NotTo(HaveOccurred())
515515
Expect(records).To(HaveLen(2))
516516

517517
// Cycle 1: Commit offset from last fetched log
518518
// (simulating what Processor does)
519519
lastLog := records[len(records)-1]
520-
offsetMgr := logcourier.NewOffsetManager(helper.Client, helper.DatabaseName)
520+
offsetMgr := logcourier.NewOffsetManager(helper.Client(), helper.DatabaseName)
521521
err = offsetMgr.CommitOffset(ctx, "test-bucket", uint16(0), logcourier.Offset{
522522
InsertedAt: lastLog.InsertedAt,
523523
Timestamp: lastLog.Timestamp,
@@ -554,7 +554,7 @@ var _ = Describe("BatchFinder", func() {
554554
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
555555

556556
for i := 0; i < 5; i++ {
557-
err := helper.Client.Exec(ctx, query,
557+
err := helper.Client().Exec(ctx, query,
558558
oldTime,
559559
"bucket-old",
560560
oldTime,
@@ -588,7 +588,7 @@ var _ = Describe("BatchFinder", func() {
588588
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
589589

590590
for i := 0; i < 5; i++ {
591-
err := helper.Client.Exec(ctx, query,
591+
err := helper.Client().Exec(ctx, query,
592592
insertTime,
593593
bucketName,
594594
insertTime,

pkg/logcourier/configspec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ var ConfigSpec = util.ConfigSpec{
1313
// ClickHouse connection
1414
"clickhouse.url": util.ConfigVarSpec{
1515
Help: "ClickHouse connection URL(s) - single host or comma-separated list (e.g., 'host1:9000,host2:9000')",
16-
DefaultValue: "localhost:9002",
16+
DefaultValue: []string{"localhost:9002", "localhost:9003"},
1717
EnvVar: "LOG_COURIER_CLICKHOUSE_URL",
1818
ParseFunc: func(rawValue any) (any, error) {
1919
if str, ok := rawValue.(string); ok {

0 commit comments

Comments
 (0)