Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,16 @@ type Config struct {

// DatabaseConfig is the configuration for the database.
type DatabaseConfig struct {
DataStore string `mapstructure:"data_store" validate:"required,data_store"` // database for data store
CacheStore string `mapstructure:"cache_store" validate:"required,cache_store"` // database for cache store
TablePrefix string `mapstructure:"table_prefix"`
DataTablePrefix string `mapstructure:"data_table_prefix"`
CacheTablePrefix string `mapstructure:"cache_table_prefix"`
MySQL MySQLConfig `mapstructure:"mysql"`
Postgres SQLConfig `mapstructure:"postgres"`
Redis RedisConfig `mapstructure:"redis"`
VectorStore string `mapstructure:"vector_store"`
DataStore string `mapstructure:"data_store" validate:"required,data_store"` // database for data store
CacheStore string `mapstructure:"cache_store" validate:"required,cache_store"` // database for cache store
TablePrefix string `mapstructure:"table_prefix"`
DataTablePrefix string `mapstructure:"data_table_prefix"`
CacheTablePrefix string `mapstructure:"cache_table_prefix"`
VectorTablePrefix string `mapstructure:"vector_table_prefix"`
MySQL MySQLConfig `mapstructure:"mysql"`
Postgres SQLConfig `mapstructure:"postgres"`
Redis RedisConfig `mapstructure:"redis"`
}

type MySQLConfig struct {
Expand Down Expand Up @@ -660,9 +662,11 @@ type configBinding struct {
var bindings = []configBinding{
{"database.cache_store", "GORSE_CACHE_STORE"},
{"database.data_store", "GORSE_DATA_STORE"},
{"database.vector_store", "GORSE_VECTOR_STORE"},
{"database.table_prefix", "GORSE_TABLE_PREFIX"},
{"database.cache_table_prefix", "GORSE_CACHE_TABLE_PREFIX"},
{"database.data_table_prefix", "GORSE_DATA_TABLE_PREFIX"},
{"database.vector_table_prefix", "GORSE_VECTOR_TABLE_PREFIX"},
{"master.port", "GORSE_MASTER_PORT"},
{"master.host", "GORSE_MASTER_HOST"},
{"master.ssl_mode", "GORSE_MASTER_SSL_MODE"},
Expand Down
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,11 @@ func TestBindEnv(t *testing.T) {
variables := []environmentVariable{
{"GORSE_CACHE_STORE", "redis://<cache_store>"},
{"GORSE_DATA_STORE", "mysql://<data_store>"},
{"GORSE_VECTOR_STORE", "qdrant://<vector_store>"},
{"GORSE_TABLE_PREFIX", "gorse_"},
{"GORSE_DATA_TABLE_PREFIX", "gorse_data_"},
{"GORSE_CACHE_TABLE_PREFIX", "gorse_cache_"},
{"GORSE_VECTOR_TABLE_PREFIX", "gorse_vector_"},
{"GORSE_MASTER_PORT", "123"},
{"GORSE_MASTER_HOST", "<master_host>"},
{"GORSE_MASTER_SSL_MODE", "true"},
Expand Down Expand Up @@ -240,9 +242,11 @@ func TestBindEnv(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "redis://<cache_store>", config.Database.CacheStore)
assert.Equal(t, "mysql://<data_store>", config.Database.DataStore)
assert.Equal(t, "qdrant://<vector_store>", config.Database.VectorStore)
assert.Equal(t, "gorse_", config.Database.TablePrefix)
assert.Equal(t, "gorse_cache_", config.Database.CacheTablePrefix)
assert.Equal(t, "gorse_data_", config.Database.DataTablePrefix)
assert.Equal(t, "gorse_vector_", config.Database.VectorTablePrefix)
assert.Equal(t, 123, config.Master.Port)
assert.Equal(t, "<master_host>", config.Master.Host)
assert.Equal(t, true, config.Master.SSLMode)
Expand Down
14 changes: 14 additions & 0 deletions master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/gorse-io/gorse/storage/cache"
"github.com/gorse-io/gorse/storage/data"
"github.com/gorse-io/gorse/storage/meta"
"github.com/gorse-io/gorse/storage/vectors"
"github.com/jellydator/ttlcache/v3"
"github.com/juju/errors"
"github.com/sashabaranov/go-openai"
Expand Down Expand Up @@ -186,6 +187,18 @@ func (m *Master) Serve() {
log.Logger().Fatal("failed to init database", zap.Error(err))
}

// open vector store
if m.Config.Database.VectorStore != "" {
log.Logger().Info("opening vector store", zap.String("path", m.Config.Database.VectorStore))
m.VectorClient, err = vectors.Open(m.Config.Database.VectorStore, m.Config.Database.VectorTablePrefix)
if err != nil {
log.Logger().Fatal("failed to connect vector store", zap.Error(err))
}
if err = m.VectorClient.Init(); err != nil {
log.Logger().Fatal("failed to init vector store", zap.Error(err))
}
}

// load recommend config
metaStr, err := m.metaStore.Get(meta.RECOMMEND_CONFIG)
if err != nil && !errors.Is(err, errors.NotFound) {
Expand Down Expand Up @@ -258,6 +271,7 @@ func (m *Master) Serve() {
protocol.RegisterMasterServer(m.grpcServer, m)
protocol.RegisterCacheStoreServer(m.grpcServer, cache.NewProxyServer(m.CacheClient))
protocol.RegisterDataStoreServer(m.grpcServer, data.NewProxyServer(m.DataClient))
protocol.RegisterVectorStoreServer(m.grpcServer, vectors.NewProxyServer(m.VectorClient))
protocol.RegisterBlobStoreServer(m.grpcServer, m.blobServer)
if err = m.grpcServer.Serve(lis); err != nil {
log.Logger().Fatal("failed to start rpc server", zap.Error(err))
Expand Down
8 changes: 5 additions & 3 deletions server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/gorse-io/gorse/logics"
"github.com/gorse-io/gorse/storage/cache"
"github.com/gorse-io/gorse/storage/data"
"github.com/gorse-io/gorse/storage/vectors"
"github.com/juju/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/samber/lo"
Expand All @@ -57,9 +58,10 @@ const (

// RestServer implements a REST-ful API server.
type RestServer struct {
Config *config.Config
CacheClient cache.Database
DataClient data.Database
Config *config.Config
CacheClient cache.Database
DataClient data.Database
VectorClient vectors.Database

HttpHost string
HttpPort int
Expand Down
31 changes: 25 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/gorse-io/gorse/storage"
"github.com/gorse-io/gorse/storage/cache"
"github.com/gorse-io/gorse/storage/data"
"github.com/gorse-io/gorse/storage/vectors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
Expand All @@ -51,6 +52,8 @@ type Server struct {
cachePrefix string
dataPath string
dataPrefix string
vectorPath string
vectorPrefix string
conn *grpc.ClientConn
masterClient protocol.MasterClient
serverName string
Expand All @@ -76,12 +79,13 @@ func NewServer(
tlsConfig: tlsConfig,
cacheFile: cacheFile,
RestServer: RestServer{
Config: config.GetDefaultConfig(),
CacheClient: new(cache.NoDatabase),
DataClient: new(data.NoDatabase),
HttpHost: serverHost,
HttpPort: serverPort,
WebService: new(restful.WebService),
Config: config.GetDefaultConfig(),
CacheClient: new(cache.NoDatabase),
DataClient: new(data.NoDatabase),
VectorClient: vectors.NoDatabase{},
HttpHost: serverHost,
HttpPort: serverPort,
WebService: new(restful.WebService),
},
}
return s
Expand Down Expand Up @@ -206,6 +210,21 @@ func (s *Server) Sync() {
s.cachePrefix = s.Config.Database.CacheTablePrefix
}

// connect to vector store
if s.vectorPath != s.Config.Database.VectorStore || s.vectorPrefix != s.Config.Database.VectorTablePrefix {
if strings.HasPrefix(s.Config.Database.VectorStore, storage.SQLitePrefix) {
log.Logger().Info("connect vector store via master")
s.VectorClient = vectors.NewProxyClient(s.conn)
} else {
if s.VectorClient, err = vectors.Open(s.Config.Database.VectorStore, s.Config.Database.VectorTablePrefix); err != nil {
log.Logger().Error("failed to connect vector store", zap.Error(err))
goto sleep
}
}
s.vectorPath = s.Config.Database.VectorStore
s.vectorPrefix = s.Config.Database.VectorTablePrefix
}

// create trace provider
if !s.traceConfig.Equal(s.Config.Tracing) {
log.Logger().Info("create trace provider", zap.Any("tracing_config", s.Config.Tracing))
Expand Down
46 changes: 46 additions & 0 deletions storage/vectors/no_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2026 gorse Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vectors

import (
"context"
"time"
)

// NoDatabase is a no-op database that does nothing.
// Used when no vector store is configured.
type NoDatabase struct{}

func (NoDatabase) Init() error { return nil }
func (NoDatabase) Optimize() error {
return nil
}
func (NoDatabase) Close() error { return nil }
func (NoDatabase) ListCollections(_ context.Context) ([]string, error) {
return nil, nil
}
func (NoDatabase) AddCollection(_ context.Context, _ string, _ int, _ Distance) error {
return nil
}
func (NoDatabase) DeleteCollection(_ context.Context, _ string) error { return nil }
func (NoDatabase) AddVectors(_ context.Context, _ string, _ []Vector) error {
return nil
}
func (NoDatabase) DeleteVectors(_ context.Context, _ string, _ time.Time) error {
return nil
}
func (NoDatabase) QueryVectors(_ context.Context, _ string, _ []float32, _ []string, _ int) ([]Vector, error) {
return nil, nil
}
54 changes: 54 additions & 0 deletions storage/vectors/no_database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2026 gorse Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vectors

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNoDatabase(t *testing.T) {
ctx := t.Context()
var database NoDatabase

err := database.Init()
assert.NoError(t, err)
err = database.Optimize()
assert.NoError(t, err)
err = database.Close()
assert.NoError(t, err)

collections, err := database.ListCollections(ctx)
assert.NoError(t, err)
assert.Nil(t, collections)

err = database.AddCollection(ctx, "test", 4, Cosine)
assert.NoError(t, err)
err = database.DeleteCollection(ctx, "test")
assert.NoError(t, err)

err = database.AddVectors(ctx, "test", []Vector{
{Id: "a", Vector: []float32{1, 0, 0, 0}},
})
assert.NoError(t, err)
err = database.DeleteVectors(ctx, "test", time.Now())
assert.NoError(t, err)

results, err := database.QueryVectors(ctx, "test", []float32{1, 0, 0, 0}, []string{"cat-a"}, 10)
assert.NoError(t, err)
assert.Nil(t, results)
}
32 changes: 26 additions & 6 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/gorse-io/gorse/storage/blob"
"github.com/gorse-io/gorse/storage/cache"
"github.com/gorse-io/gorse/storage/data"
"github.com/gorse-io/gorse/storage/vectors"
"github.com/juju/errors"
"github.com/lafikl/consistent"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -69,13 +70,16 @@ type Worker struct {
cacheFile string

// database connection path
cachePath string
cachePrefix string
dataPath string
dataPrefix string
cachePath string
cachePrefix string
dataPath string
dataPrefix string
vectorPath string
vectorPrefix string

blobConfig string
blobStore blob.Store
blobConfig string
blobStore blob.Store
vectorStore vectors.Database

// master connection
conn *grpc.ClientConn
Expand Down Expand Up @@ -114,6 +118,7 @@ func NewWorker(
DataClient: new(data.NoDatabase),
Jobs: jobs,
},
vectorStore: vectors.NoDatabase{},
randGenerator: util.NewRand(time.Now().UTC().UnixNano()),
// config
cacheFile: cacheFile,
Expand Down Expand Up @@ -202,6 +207,21 @@ func (w *Worker) Sync() {
w.blobConfig = nextBlobConfig.URI
}

// connect to vector store
if w.vectorPath != w.Config.Database.VectorStore || w.vectorPrefix != w.Config.Database.VectorTablePrefix {
if strings.HasPrefix(w.Config.Database.VectorStore, storage.SQLitePrefix) {
log.Logger().Info("connect vector store via master")
w.vectorStore = vectors.NewProxyClient(w.conn)
} else {
if w.vectorStore, err = vectors.Open(w.Config.Database.VectorStore, w.Config.Database.VectorTablePrefix); err != nil {
log.Logger().Error("failed to connect vector store", zap.Error(err))
goto sleep
}
}
w.vectorPath = w.Config.Database.VectorStore
w.vectorPrefix = w.Config.Database.VectorTablePrefix
}

// synchronize collaborative filtering model
w.latestCollaborativeFilteringModelId = meta.CollaborativeFilteringModelId
if w.latestCollaborativeFilteringModelId > w.collaborativeFilteringModelId {
Expand Down
Loading