Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions .github/workflows/build_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ jobs:
ports:
- 6379

valkey:
image: valkey/valkey-bundle:unstable
ports:
- 6380

rustfs:
image: rustfs/rustfs:alpha
ports:
Expand Down Expand Up @@ -133,6 +138,8 @@ jobs:
CLICKHOUSE_URI: clickhouse://localhost:${{ job.services.clickhouse.ports[8123] }}/
# Redis
REDIS_URI: redis://localhost:${{ job.services.redis.ports[6379] }}/
# Valkey
VALKEY_URI: valkey://localhost:${{ job.services.valkey.ports[6380] }}/
# S3
S3_ENDPOINT: localhost:${{ job.services.rustfs.ports[9000] }}
S3_ACCESS_KEY_ID: rustfsadmin
Expand Down Expand Up @@ -181,7 +188,7 @@ jobs:
go-version-file: ./go.mod

- name: Test
run: go test -timeout 20m -v ./... -skip "TestPostgres|TestMySQL|TestMongo|TestRedis|TestClickHouse|TestMilvus|TestQdrant|TestWeaviate"
run: go test -timeout 20m -v ./... -skip "TestPostgres|TestMySQL|TestMongo|TestRedis|TestClickHouse|TestMilvus|TestQdrant|TestWeaviate|TestValkey"

unit_test_windows:
strategy:
Expand Down Expand Up @@ -215,7 +222,7 @@ jobs:
go-version-file: ./go.mod

- name: Test
run: go test -timeout 20m -v ./... -skip "TestPostgres|TestMySQL|TestMongo|TestRedis|TestClickHouse|TestMilvus|TestQdrant|TestWeaviate"
run: go test -timeout 20m -v ./... -skip "TestPostgres|TestMySQL|TestMongo|TestRedis|TestClickHouse|TestMilvus|TestQdrant|TestWeaviate|TestValkey"

integrate_test:
name: integrate tests
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,10 @@ func (config *Config) Validate() error {
storage.RedissPrefix,
storage.RedisClusterPrefix,
storage.RedissClusterPrefix,
storage.ValkeyPrefix,
storage.ValkeysPrefix,
storage.ValkeyClusterPrefix,
storage.ValkeysClusterPrefix,
storage.MongoPrefix,
storage.MongoSrvPrefix,
storage.MySQLPrefix,
Expand Down
6 changes: 5 additions & 1 deletion config/config.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
[database]

# The database for caching, support Redis, MySQL, Postgres and MongoDB:
# The database for caching, support Redis, Valkey, MySQL, Postgres and MongoDB:
# redis://<user>:<password>@<host>:<port>/<db_number>
# rediss://<user>:<password>@<host>:<port>/<db_number>
# redis+cluster://<user>:<password>@<host>:<port>[?addr=<host2>:<port2>&addr=<host3>:<port3>]
# rediss+cluster://<user>:<password>@<host>:<port>[?addr=<host2>:<port2>&addr=<host3>:<port3>]
# valkey://<user>:<password>@<host>:<port>/<db_number>
# valkeys://<user>:<password>@<host>:<port>/<db_number>
# valkey+cluster://<user>:<password>@<host>:<port>[?addr=<host2>:<port2>&addr=<host3>:<port3>]
# valkeys+cluster://<user>:<password>@<host>:<port>[?addr=<host2>:<port2>&addr=<host3>:<port3>]
# mysql://[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
# postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full
# postgresql://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full
Expand Down
16 changes: 16 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,4 +503,20 @@ func (s *ValidateTestSuite) TestCacheStore() {
// Test that rediss+cluster:// prefix is accepted for cache_store
s.Database.CacheStore = "rediss+cluster://:password@192.168.1.11:6379?addr=192.168.0.5:6379"
s.NoError(s.Validate())

// Test that valkey:// prefix is accepted for cache_store
s.Database.CacheStore = "valkey://localhost:6379/0"
s.NoError(s.Validate())

// Test that valkeys:// prefix is accepted for cache_store
s.Database.CacheStore = "valkeys://localhost:6379/0"
s.NoError(s.Validate())

// Test that valkey+cluster:// prefix is accepted for cache_store
s.Database.CacheStore = "valkey+cluster://:password@192.168.1.11:6379?addr=192.168.0.5:6379&addr=192.168.0.7:6379"
s.NoError(s.Validate())

// Test that valkeys+cluster:// prefix is accepted for cache_store
s.Database.CacheStore = "valkeys+cluster://:password@192.168.1.11:6379?addr=192.168.0.5:6379"
s.NoError(s.Validate())
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/stretchr/testify v1.11.1
github.com/swaggest/swgui v1.8.5
github.com/tiktoken-go/tokenizer v0.7.0
github.com/valkey-io/valkey-go v1.0.74
github.com/weaviate/weaviate v1.27.0
github.com/weaviate/weaviate-go-client/v4 v4.16.1
github.com/yuin/goldmark v1.7.16
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,8 @@ github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+
github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus=
github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y=
github.com/onsi/gomega v1.37.0/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0=
github.com/onsi/gomega v1.38.3 h1:eTX+W6dobAYfFeGC2PV6RwXRu/MyT+cQguijutvkpSM=
github.com/onsi/gomega v1.38.3/go.mod h1:ZCU1pkQcXDO5Sl9/VVEGlDyp+zm0m1cmeG5TOzLgdh4=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/openzipkin/zipkin-go v0.4.3 h1:9EGwpqkgnwdEIJ+Od7QVSEIH+ocmm5nPat0G7sjsSdg=
github.com/openzipkin/zipkin-go v0.4.3/go.mod h1:M9wCJZFWCo2RiY+o1eBCEMe0Dp2S5LDHcMZmk3RmK7c=
Expand Down Expand Up @@ -921,6 +921,8 @@ github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVM
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valkey-io/valkey-go v1.0.74 h1:NqtBHzjybz+is+c71hsyZP7hoE5lwCHQX026me0Vb08=
github.com/valkey-io/valkey-go v1.0.74/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
Expand Down
149 changes: 149 additions & 0 deletions storage/cache/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,21 @@ func benchmark(b *testing.B, database Database) {
b.Run("UpdateScores", func(b *testing.B) {
benchmarkUpdateDocuments(b, database)
})
b.Run("TSIngestSingle", func(b *testing.B) {
benchmarkTimeSeriesIngestSingle(b, database)
})
b.Run("TSIngestBatch", func(b *testing.B) {
benchmarkTimeSeriesIngestBatch(b, database)
})
b.Run("TSQuerySmallRange", func(b *testing.B) {
benchmarkTimeSeriesQuerySmallRange(b, database)
})
b.Run("TSQueryLargeRange", func(b *testing.B) {
benchmarkTimeSeriesQueryLargeRange(b, database)
})
b.Run("TSQueryWideRange", func(b *testing.B) {
benchmarkTimeSeriesQueryWideRange(b, database)
})
}

func benchmarkAddDocuments(b *testing.B, database Database) {
Expand Down Expand Up @@ -658,3 +673,137 @@ func benchmarkUpdateDocuments(b *testing.B, database Database) {
assert.NoError(b, err)
}
}

// benchmarkTimeSeriesIngestSingle measures single-point ingestion throughput.
func benchmarkTimeSeriesIngestSingle(b *testing.B, database Database) {
	ctx := b.Context()
	start := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// One point per call, each one second after the previous.
		point := TimeSeriesPoint{
			Name:      "bench_single",
			Value:     float64(n),
			Timestamp: start.Add(time.Duration(n) * time.Second),
		}
		assert.NoError(b, database.AddTimeSeriesPoints(ctx, []TimeSeriesPoint{point}))
	}
}

// benchmarkTimeSeriesIngestBatch measures batch ingestion throughput (100 points per call).
func benchmarkTimeSeriesIngestBatch(b *testing.B, database Database) {
	ctx := b.Context()
	base := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	const batchSize = 100
	// Allocate the batch buffer once, outside the timed region, so the
	// benchmark measures ingestion cost rather than per-iteration slice
	// allocation. The buffer is refilled and handed to the database on each
	// iteration (assumes AddTimeSeriesPoints does not retain the slice
	// asynchronously — TODO confirm against implementations).
	points := make([]TimeSeriesPoint, batchSize)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := range batchSize {
			points[j] = TimeSeriesPoint{
				Name:      "bench_batch",
				Value:     float64(i*batchSize + j),
				Timestamp: base.Add(time.Duration(i*batchSize+j) * time.Second),
			}
		}
		err := database.AddTimeSeriesPoints(ctx, points)
		assert.NoError(b, err)
	}
}

// benchmarkTimeSeriesQuerySmallRange measures query latency for a small time range (60s window, 1s buckets).
// Pre-loads 10,000 points at 1-second intervals.
func benchmarkTimeSeriesQuerySmallRange(b *testing.B, database Database) {
	ctx := b.Context()
	start := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	const total = 10000
	const chunk = 500
	// Seed the series in fixed-size chunks before the timer starts.
	for lo := 0; lo < total; lo += chunk {
		hi := lo + chunk
		if hi > total {
			hi = total
		}
		batch := make([]TimeSeriesPoint, 0, hi-lo)
		for i := lo; i < hi; i++ {
			batch = append(batch, TimeSeriesPoint{
				Name:      "bench_query_small",
				Value:     float64(i),
				Timestamp: start.Add(time.Duration(i) * time.Second),
			})
		}
		assert.NoError(b, database.AddTimeSeriesPoints(ctx, batch))
	}
	// Query a 60-second window in the middle of the series, 1-second
	// buckets (~60 points returned per call).
	begin := start.Add(5000 * time.Second)
	finish := start.Add(5060 * time.Second)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		result, err := database.GetTimeSeriesPoints(ctx, "bench_query_small", begin, finish, time.Second)
		assert.NoError(b, err)
		assert.NotEmpty(b, result)
	}
}

// benchmarkTimeSeriesQueryLargeRange measures query latency with heavy aggregation.
// Pre-loads 10,000 points at 1-second intervals, queries full range with 60-second buckets (~167 buckets).
func benchmarkTimeSeriesQueryLargeRange(b *testing.B, database Database) {
	ctx := b.Context()
	start := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	const total = 10000
	const chunk = 500
	// Seed the series in fixed-size chunks before the timer starts.
	for lo := 0; lo < total; lo += chunk {
		hi := lo + chunk
		if hi > total {
			hi = total
		}
		batch := make([]TimeSeriesPoint, 0, hi-lo)
		for i := lo; i < hi; i++ {
			batch = append(batch, TimeSeriesPoint{
				Name:      "bench_query_large",
				Value:     float64(i),
				Timestamp: start.Add(time.Duration(i) * time.Second),
			})
		}
		assert.NoError(b, database.AddTimeSeriesPoints(ctx, batch))
	}
	// Query the entire range, aggregated into 60-second buckets.
	begin := start
	finish := start.Add(total * time.Second)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		result, err := database.GetTimeSeriesPoints(ctx, "bench_query_large", begin, finish, 60*time.Second)
		assert.NoError(b, err)
		assert.NotEmpty(b, result)
	}
}

// benchmarkTimeSeriesQueryWideRange measures worst-case query: 100K points aggregated into ~28 hourly buckets.
func benchmarkTimeSeriesQueryWideRange(b *testing.B, database Database) {
	ctx := b.Context()
	start := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	const total = 100000
	const chunk = 1000
	// Seed the series in fixed-size chunks before the timer starts.
	for lo := 0; lo < total; lo += chunk {
		hi := lo + chunk
		if hi > total {
			hi = total
		}
		batch := make([]TimeSeriesPoint, 0, hi-lo)
		for i := lo; i < hi; i++ {
			batch = append(batch, TimeSeriesPoint{
				Name:      "bench_query_wide",
				Value:     float64(i),
				Timestamp: start.Add(time.Duration(i) * time.Second),
			})
		}
		assert.NoError(b, database.AddTimeSeriesPoints(ctx, batch))
	}
	// Query the entire range, aggregated into 1-hour buckets.
	begin := start
	finish := start.Add(total * time.Second)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		result, err := database.GetTimeSeriesPoints(ctx, "bench_query_wide", begin, finish, time.Hour)
		assert.NoError(b, err)
		assert.NotEmpty(b, result)
	}
}
Loading
Loading