diff --git a/config/config.go b/config/config.go index 859110492..9a6af82f9 100644 --- a/config/config.go +++ b/config/config.go @@ -71,14 +71,16 @@ type Config struct { // DatabaseConfig is the configuration for the database. type DatabaseConfig struct { - DataStore string `mapstructure:"data_store" validate:"required,data_store"` // database for data store - CacheStore string `mapstructure:"cache_store" validate:"required,cache_store"` // database for cache store - TablePrefix string `mapstructure:"table_prefix"` - DataTablePrefix string `mapstructure:"data_table_prefix"` - CacheTablePrefix string `mapstructure:"cache_table_prefix"` - MySQL MySQLConfig `mapstructure:"mysql"` - Postgres SQLConfig `mapstructure:"postgres"` - Redis RedisConfig `mapstructure:"redis"` + VectorStore string `mapstructure:"vector_store"` + DataStore string `mapstructure:"data_store" validate:"required,data_store"` // database for data store + CacheStore string `mapstructure:"cache_store" validate:"required,cache_store"` // database for cache store + TablePrefix string `mapstructure:"table_prefix"` + DataTablePrefix string `mapstructure:"data_table_prefix"` + CacheTablePrefix string `mapstructure:"cache_table_prefix"` + VectorTablePrefix string `mapstructure:"vector_table_prefix"` + MySQL MySQLConfig `mapstructure:"mysql"` + Postgres SQLConfig `mapstructure:"postgres"` + Redis RedisConfig `mapstructure:"redis"` } type MySQLConfig struct { @@ -660,9 +662,11 @@ type configBinding struct { var bindings = []configBinding{ {"database.cache_store", "GORSE_CACHE_STORE"}, {"database.data_store", "GORSE_DATA_STORE"}, + {"database.vector_store", "GORSE_VECTOR_STORE"}, {"database.table_prefix", "GORSE_TABLE_PREFIX"}, {"database.cache_table_prefix", "GORSE_CACHE_TABLE_PREFIX"}, {"database.data_table_prefix", "GORSE_DATA_TABLE_PREFIX"}, + {"database.vector_table_prefix", "GORSE_VECTOR_TABLE_PREFIX"}, {"master.port", "GORSE_MASTER_PORT"}, {"master.host", "GORSE_MASTER_HOST"}, {"master.ssl_mode", "GORSE_MASTER_SSL_MODE"}, diff --git a/config/config_test.go b/config/config_test.go index 59a11dcbe..238fee5c1 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -193,9 +193,11 @@ func TestBindEnv(t *testing.T) { variables := []environmentVariable{ {"GORSE_CACHE_STORE", "redis://"}, {"GORSE_DATA_STORE", "mysql://"}, + {"GORSE_VECTOR_STORE", "qdrant://"}, {"GORSE_TABLE_PREFIX", "gorse_"}, {"GORSE_DATA_TABLE_PREFIX", "gorse_data_"}, {"GORSE_CACHE_TABLE_PREFIX", "gorse_cache_"}, + {"GORSE_VECTOR_TABLE_PREFIX", "gorse_vector_"}, {"GORSE_MASTER_PORT", "123"}, {"GORSE_MASTER_HOST", ""}, {"GORSE_MASTER_SSL_MODE", "true"}, @@ -240,9 +242,11 @@ func TestBindEnv(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "redis://", config.Database.CacheStore) assert.Equal(t, "mysql://", config.Database.DataStore) + assert.Equal(t, "qdrant://", config.Database.VectorStore) assert.Equal(t, "gorse_", config.Database.TablePrefix) assert.Equal(t, "gorse_cache_", config.Database.CacheTablePrefix) assert.Equal(t, "gorse_data_", config.Database.DataTablePrefix) + assert.Equal(t, "gorse_vector_", config.Database.VectorTablePrefix) assert.Equal(t, 123, config.Master.Port) assert.Equal(t, "", config.Master.Host) assert.Equal(t, true, config.Master.SSLMode) diff --git a/master/master.go b/master/master.go index 788142708..b19d3f08c 100644 --- a/master/master.go +++ b/master/master.go @@ -40,6 +40,7 @@ import ( "github.com/gorse-io/gorse/storage/cache" "github.com/gorse-io/gorse/storage/data" "github.com/gorse-io/gorse/storage/meta" + "github.com/gorse-io/gorse/storage/vectors" "github.com/jellydator/ttlcache/v3" "github.com/juju/errors" "github.com/sashabaranov/go-openai" @@ -186,6 +187,18 @@ func (m *Master) Serve() { log.Logger().Fatal("failed to init database", zap.Error(err)) } + // open vector store + if m.Config.Database.VectorStore != "" { + log.Logger().Info("opening vector store", zap.String("path", m.Config.Database.VectorStore)) + m.VectorClient, err = vectors.Open(m.Config.Database.VectorStore, m.Config.Database.VectorTablePrefix) + if err != nil { + log.Logger().Fatal("failed to connect vector store", zap.Error(err)) + } + if err = m.VectorClient.Init(); err != nil { + log.Logger().Fatal("failed to init vector store", zap.Error(err)) + } + } + // load recommend config metaStr, err := m.metaStore.Get(meta.RECOMMEND_CONFIG) if err != nil && !errors.Is(err, errors.NotFound) { @@ -258,6 +271,7 @@ func (m *Master) Serve() { protocol.RegisterMasterServer(m.grpcServer, m) protocol.RegisterCacheStoreServer(m.grpcServer, cache.NewProxyServer(m.CacheClient)) protocol.RegisterDataStoreServer(m.grpcServer, data.NewProxyServer(m.DataClient)) + protocol.RegisterVectorStoreServer(m.grpcServer, vectors.NewProxyServer(m.VectorClient)) protocol.RegisterBlobStoreServer(m.grpcServer, m.blobServer) if err = m.grpcServer.Serve(lis); err != nil { log.Logger().Fatal("failed to start rpc server", zap.Error(err)) diff --git a/server/rest.go b/server/rest.go index 600d58183..83918c4e0 100644 --- a/server/rest.go +++ b/server/rest.go @@ -36,6 +36,7 @@ import ( "github.com/gorse-io/gorse/logics" "github.com/gorse-io/gorse/storage/cache" "github.com/gorse-io/gorse/storage/data" + "github.com/gorse-io/gorse/storage/vectors" "github.com/juju/errors" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/samber/lo" @@ -57,9 +58,10 @@ const ( // RestServer implements a REST-ful API server. type RestServer struct { - Config *config.Config - CacheClient cache.Database - DataClient data.Database + Config *config.Config + CacheClient cache.Database + DataClient data.Database + VectorClient vectors.Database HttpHost string HttpPort int diff --git a/server/server.go b/server/server.go index 7c7e296ab..57fd9c9ac 100644 --- a/server/server.go +++ b/server/server.go @@ -35,6 +35,7 @@ import ( "github.com/gorse-io/gorse/storage" "github.com/gorse-io/gorse/storage/cache" "github.com/gorse-io/gorse/storage/data" + "github.com/gorse-io/gorse/storage/vectors" "github.com/samber/lo" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" @@ -51,6 +52,8 @@ type Server struct { cachePrefix string dataPath string dataPrefix string + vectorPath string + vectorPrefix string conn *grpc.ClientConn masterClient protocol.MasterClient serverName string @@ -76,12 +79,13 @@ func NewServer( tlsConfig: tlsConfig, cacheFile: cacheFile, RestServer: RestServer{ - Config: config.GetDefaultConfig(), - CacheClient: new(cache.NoDatabase), - DataClient: new(data.NoDatabase), - HttpHost: serverHost, - HttpPort: serverPort, - WebService: new(restful.WebService), + Config: config.GetDefaultConfig(), + CacheClient: new(cache.NoDatabase), + DataClient: new(data.NoDatabase), + VectorClient: vectors.NoDatabase{}, + HttpHost: serverHost, + HttpPort: serverPort, + WebService: new(restful.WebService), }, } return s @@ -206,6 +210,21 @@ func (s *Server) Sync() { s.cachePrefix = s.Config.Database.CacheTablePrefix } + // connect to vector store + if s.vectorPath != s.Config.Database.VectorStore || s.vectorPrefix != s.Config.Database.VectorTablePrefix { + if strings.HasPrefix(s.Config.Database.VectorStore, storage.SQLitePrefix) { + log.Logger().Info("connect vector store via master") + s.VectorClient = vectors.NewProxyClient(s.conn) + } else { + if s.VectorClient, err = vectors.Open(s.Config.Database.VectorStore, s.Config.Database.VectorTablePrefix); err != nil { + log.Logger().Error("failed to connect vector store", zap.Error(err)) + goto sleep + } + } + s.vectorPath = s.Config.Database.VectorStore + s.vectorPrefix = s.Config.Database.VectorTablePrefix + } + // create trace provider if !s.traceConfig.Equal(s.Config.Tracing) { log.Logger().Info("create trace provider", zap.Any("tracing_config", s.Config.Tracing)) diff --git a/storage/vectors/no_database.go b/storage/vectors/no_database.go new file mode 100644 index 000000000..fca048c7a --- /dev/null +++ b/storage/vectors/no_database.go @@ -0,0 +1,46 @@ +// Copyright 2026 gorse Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package vectors + +import ( + "context" + "time" +) + +// NoDatabase is a no-op database that does nothing. +// Used when no vector store is configured. +type NoDatabase struct{} + +func (NoDatabase) Init() error { return nil } +func (NoDatabase) Optimize() error { + return nil +} +func (NoDatabase) Close() error { return nil } +func (NoDatabase) ListCollections(_ context.Context) ([]string, error) { + return nil, nil +} +func (NoDatabase) AddCollection(_ context.Context, _ string, _ int, _ Distance) error { + return nil +} +func (NoDatabase) DeleteCollection(_ context.Context, _ string) error { return nil } +func (NoDatabase) AddVectors(_ context.Context, _ string, _ []Vector) error { + return nil +} +func (NoDatabase) DeleteVectors(_ context.Context, _ string, _ time.Time) error { + return nil +} +func (NoDatabase) QueryVectors(_ context.Context, _ string, _ []float32, _ []string, _ int) ([]Vector, error) { + return nil, nil +} diff --git a/storage/vectors/no_database_test.go b/storage/vectors/no_database_test.go new file mode 100644 index 000000000..883f71f8f --- /dev/null +++ b/storage/vectors/no_database_test.go @@ -0,0 +1,54 @@ +// Copyright 2026 gorse Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package vectors + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNoDatabase(t *testing.T) { + ctx := t.Context() + var database NoDatabase + + err := database.Init() + assert.NoError(t, err) + err = database.Optimize() + assert.NoError(t, err) + err = database.Close() + assert.NoError(t, err) + + collections, err := database.ListCollections(ctx) + assert.NoError(t, err) + assert.Nil(t, collections) + + err = database.AddCollection(ctx, "test", 4, Cosine) + assert.NoError(t, err) + err = database.DeleteCollection(ctx, "test") + assert.NoError(t, err) + + err = database.AddVectors(ctx, "test", []Vector{ + {Id: "a", Vector: []float32{1, 0, 0, 0}}, + }) + assert.NoError(t, err) + err = database.DeleteVectors(ctx, "test", time.Now()) + assert.NoError(t, err) + + results, err := database.QueryVectors(ctx, "test", []float32{1, 0, 0, 0}, []string{"cat-a"}, 10) + assert.NoError(t, err) + assert.Nil(t, results) +} diff --git a/worker/worker.go b/worker/worker.go index 7884b8307..35e1cb04d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -40,6 +40,7 @@ import ( "github.com/gorse-io/gorse/storage/blob" "github.com/gorse-io/gorse/storage/cache" "github.com/gorse-io/gorse/storage/data" + "github.com/gorse-io/gorse/storage/vectors" "github.com/juju/errors" "github.com/lafikl/consistent" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -69,13 +70,16 @@ type Worker struct { cacheFile string // database connection path - cachePath string - cachePrefix string - dataPath string - dataPrefix string + cachePath string + cachePrefix string + dataPath string + dataPrefix string + vectorPath string + vectorPrefix string - blobConfig string - blobStore blob.Store + blobConfig string + blobStore blob.Store + vectorStore vectors.Database // master connection conn *grpc.ClientConn @@ -114,6 +118,7 @@ func NewWorker( DataClient: new(data.NoDatabase), Jobs: jobs, }, + vectorStore: vectors.NoDatabase{}, randGenerator: util.NewRand(time.Now().UTC().UnixNano()), // config cacheFile: cacheFile, @@ -202,6 +207,21 @@ func (w *Worker) Sync() { w.blobConfig = nextBlobConfig.URI } + // connect to vector store + if w.vectorPath != w.Config.Database.VectorStore || w.vectorPrefix != w.Config.Database.VectorTablePrefix { + if strings.HasPrefix(w.Config.Database.VectorStore, storage.SQLitePrefix) { + log.Logger().Info("connect vector store via master") + w.vectorStore = vectors.NewProxyClient(w.conn) + } else { + if w.vectorStore, err = vectors.Open(w.Config.Database.VectorStore, w.Config.Database.VectorTablePrefix); err != nil { + log.Logger().Error("failed to connect vector store", zap.Error(err)) + goto sleep + } + } + w.vectorPath = w.Config.Database.VectorStore + w.vectorPrefix = w.Config.Database.VectorTablePrefix + } + // synchronize collaborative filtering model w.latestCollaborativeFilteringModelId = meta.CollaborativeFilteringModelId if w.latestCollaborativeFilteringModelId > w.collaborativeFilteringModelId {