diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 4143f6d973..718625b6fd 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -5,6 +5,8 @@ import (
 	"flag"
 	"fmt"
 	"io"
+	"os"
+	"path/filepath"
 	"sync"
 	"time"
 
@@ -193,6 +195,38 @@ func (i *Ingester) getOrCreateInstance(tenantID string) (*instance, error) {
 	return inst, nil
 }
 
+func (i *Ingester) getOrOpenInstance(tenantID string) (*instance, error) {
+	inst, ok := i.getInstanceByID(tenantID)
+	if ok {
+		return inst, nil
+	}
+
+	i.instancesMtx.Lock()
+	defer i.instancesMtx.Unlock()
+	inst, ok = i.instances[tenantID]
+	if ok {
+		return inst, nil
+	}
+
+	if _, err := os.Stat(filepath.Join(i.dbConfig.DataPath, tenantID)); err != nil {
+		if os.IsNotExist(err) {
+			return nil, nil
+		}
+		return nil, err
+	}
+
+	var err error
+	limiter := NewLimiter(tenantID, i.limits, i.lifecycler, i.cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
+	inst, err = newInstance(i.phlarectx, i.dbConfig, tenantID, i.localBucket, i.storageBucket, limiter)
+	if err != nil {
+		return nil, err
+	}
+
+	i.instances[tenantID] = inst
+	activeTenantsStats.Set(int64(len(i.instances)))
+	return inst, nil
+}
+
 func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
 	i.instancesMtx.RLock()
 	defer i.instancesMtx.RUnlock()
@@ -220,8 +254,11 @@ func forInstanceUnary[T any](ctx context.Context, i *Ingester, f func(*instance)
 	if err != nil {
 		return connect.NewResponse[T](new(T)), connect.NewError(connect.CodeInvalidArgument, err)
 	}
-	instance, ok := i.getInstanceByID(tenantID)
-	if ok {
+	instance, err := i.getOrOpenInstance(tenantID)
+	if err != nil {
+		return nil, connect.NewError(connect.CodeInternal, err)
+	}
+	if instance != nil {
 		return f(instance)
 	}
 	return connect.NewResponse[T](new(T)), nil
@@ -232,8 +269,11 @@ func forInstanceStream[Req, Resp any](ctx context.Context, i *Ingester, stream *
 	if err != nil {
 		return connect.NewError(connect.CodeInvalidArgument, err)
 	}
-	instance, ok := i.getInstanceByID(tenantID)
-	if ok {
+	instance, err := i.getOrOpenInstance(tenantID)
+	if err != nil {
+		return connect.NewError(connect.CodeInternal, err)
+	}
+	if instance != nil {
 		return f(instance)
 	}
 	// The client blocks awaiting the response.
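For context on the new `getOrOpenInstance`: it is a double-checked lookup — a cheap read-locked check first, then a re-check under the write lock before the expensive open — so concurrent queries for the same tenant open its DB only once. Returning `(nil, nil)` when nothing exists on disk lets callers distinguish "tenant has no local data" from a real failure. Below is a minimal, self-contained sketch of that pattern; the `store`, `handle`, and `exists` names are illustrative, not from this PR:

```go
package main

import (
	"fmt"
	"sync"
)

// handle stands in for the per-tenant DB instance.
type handle struct{ tenantID string }

type store struct {
	mu      sync.RWMutex
	tenants map[string]*handle
}

// getByID is the fast path: read lock only.
func (s *store) getByID(id string) (*handle, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	h, ok := s.tenants[id]
	return h, ok
}

// getOrOpen mirrors the control flow of getOrOpenInstance: it
// returns (nil, nil) when there is nothing on disk to open, so
// callers can tell "unknown tenant" apart from a real error.
func (s *store) getOrOpen(id string, exists func(string) (bool, error)) (*handle, error) {
	if h, ok := s.getByID(id); ok {
		return h, nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	// Re-check under the write lock: another goroutine may have
	// opened this tenant between the read-locked lookup and here.
	if h, ok := s.tenants[id]; ok {
		return h, nil
	}

	ok, err := exists(id) // plays the role of the os.Stat check
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, nil // no local data for this tenant
	}

	h := &handle{tenantID: id}
	s.tenants[id] = h
	return h, nil
}

func main() {
	s := &store{tenants: map[string]*handle{}}
	exists := func(id string) (bool, error) { return id == "foo", nil }

	h, err := s.getOrOpen("foo", exists)
	fmt.Println(h, err) // opened and cached

	h, err = s.getOrOpen("bar", exists)
	fmt.Println(h, err) // nil, nil: tenant unknown, not an error
}
```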
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index 18b2ee4bb9..9ae2d4d1f1 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -117,7 +117,7 @@ func Test_MultitenantReadWrite(t *testing.T) {
 	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
 }
 
-func Test_ReadNotFound(t *testing.T) {
+func Test_Query_TenantNotFound(t *testing.T) {
 	dbPath := t.TempDir()
 	logger := log.NewJSONLogger(os.Stdout)
 	reg := prometheus.NewRegistry()
@@ -152,3 +152,76 @@
 
 	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
 }
+
+func Test_Query_TenantFound(t *testing.T) {
+	dbPath := t.TempDir()
+	logger := log.NewJSONLogger(os.Stdout)
+	phlareCtx := phlarecontext.WithLogger(context.Background(), logger)
+
+	cfg := client.Config{
+		StorageBackendConfig: client.StorageBackendConfig{
+			Backend: client.Filesystem,
+			Filesystem: filesystem.Config{
+				Directory: dbPath,
+			},
+		},
+	}
+
+	fs, err := client.NewBucket(phlareCtx, cfg, "storage")
+	require.NoError(t, err)
+
+	ing, err := New(phlareCtx, defaultIngesterTestConfig(t), phlaredb.Config{
+		DataPath:         dbPath,
+		MaxBlockDuration: 30 * time.Hour,
+	}, fs, &fakeLimits{}, 0)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
+
+	req := &connect.Request[pushv1.PushRequest]{
+		Msg: &pushv1.PushRequest{
+			Series: []*pushv1.RawProfileSeries{
+				{
+					Labels: phlaremodel.LabelsFromStrings("foo", "bar"),
+					Samples: []*pushv1.RawSample{
+						{
+							ID:         uuid.NewString(),
+							RawProfile: testProfile(t),
+						},
+					},
+				},
+			},
+		},
+	}
+
+	ctx := tenant.InjectTenantID(context.Background(), "foo")
+	_, err = ing.Push(ctx, req)
+	require.NoError(t, err)
+
+	query := &typesv1.LabelValuesRequest{
+		Name:  "foo",
+		Start: time.Now().Add(-1 * time.Hour).UnixMilli(),
+		End:   time.Now().Add(time.Hour).UnixMilli(),
+	}
+
+	labelsValues, err := ing.LabelValues(ctx, connect.NewRequest(query))
+	require.NoError(t, err)
+	require.Equal(t, []string{"bar"}, labelsValues.Msg.Names)
+
+	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
+
+	// Open the ingester again and check that the data is
+	// available for queries before the first push request.
+
+	ing, err = New(phlareCtx, defaultIngesterTestConfig(t), phlaredb.Config{
+		DataPath:         dbPath,
+		MaxBlockDuration: 30 * time.Hour,
+	}, fs, &fakeLimits{}, 0)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
+
+	labelsValues, err = ing.LabelValues(ctx, connect.NewRequest(query))
+	require.NoError(t, err)
+	require.Equal(t, []string{"bar"}, labelsValues.Msg.Names)
+
+	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
+}
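The renamed tests pin down the read-path contract this change establishes: a query for a tenant with no local data yields an empty response (not a not-found error), a tenant whose DB exists on disk is opened lazily even after a restart, and a genuine failure surfaces as an internal error. A minimal sketch of that three-way outcome, using plain errors in place of connect error codes and hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
)

// instance stands in for the per-tenant DB handle.
type instance struct{ tenant string }

// getOrOpen mimics getOrOpenInstance's contract:
// (inst, nil) when the tenant is open or present on disk,
// (nil, nil)  when the tenant has no local data,
// (nil, err)  on a real failure.
func getOrOpen(tenant string) (*instance, error) {
	switch tenant {
	case "known":
		return &instance{tenant: tenant}, nil
	case "broken":
		return nil, errors.New("stat failed")
	default:
		return nil, nil
	}
}

// query mirrors forInstanceUnary: unknown tenants get an
// empty response rather than a not-found error.
func query(tenant string) (string, error) {
	inst, err := getOrOpen(tenant)
	if err != nil {
		return "", fmt.Errorf("internal: %w", err)
	}
	if inst != nil {
		return "result for " + inst.tenant, nil
	}
	return "", nil // empty response, no error
}

func main() {
	fmt.Println(query("known"))   // result, no error
	fmt.Println(query("missing")) // empty result, no error
	fmt.Println(query("broken"))  // internal error
}
```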