Skip to content

Commit

Permalink
chore: review comments 1
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhar-rudder committed Feb 7, 2025
1 parent c093c70 commit 28f7e95
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
7 changes: 5 additions & 2 deletions warehouse/internal/repo/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func parseWHSchemas(rows *sqlmiddleware.Rows) ([]*model.WHSchema, error) {
var (
whSchema model.WHSchema
schemaPayloadRawRaw []byte
expiresAt sql.NullTime
)
err := rows.Scan(
&whSchema.ID,
Expand All @@ -133,15 +134,17 @@ func parseWHSchemas(rows *sqlmiddleware.Rows) ([]*model.WHSchema, error) {
&schemaPayloadRawRaw,
&whSchema.CreatedAt,
&whSchema.UpdatedAt,
&whSchema.ExpiresAt,
&expiresAt,
)
if err != nil {
return nil, fmt.Errorf("scanning row: %w", err)
}

whSchema.CreatedAt = whSchema.CreatedAt.UTC()
whSchema.UpdatedAt = whSchema.UpdatedAt.UTC()
whSchema.ExpiresAt = whSchema.ExpiresAt.UTC()
if expiresAt.Valid {
whSchema.ExpiresAt = expiresAt.Time.UTC()
}

var schemaPayload model.Schema
err = json.Unmarshal(schemaPayloadRawRaw, &schemaPayload)
Expand Down
7 changes: 6 additions & 1 deletion warehouse/router/state_export_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (

"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

"github.com/rudderlabs/rudder-server/services/alerta"
"github.com/rudderlabs/rudder-server/warehouse/identity"
Expand Down Expand Up @@ -809,7 +811,10 @@ func (job *UploadJob) columnCountStat(tableName string) {
}
currentColumnsCount, err := job.schemaHandle.GetColumnsCountInWarehouseSchema(job.ctx, tableName)
if err != nil {
job.logger.Errorf("error getting column count for table %s: %v", tableName, err)
job.logger.Warnn("Getting column count in warehouse schema",
logger.NewStringField(logfield.TableName, tableName),
obskit.Error(err),
)
return
}

Expand Down
34 changes: 22 additions & 12 deletions warehouse/schema/schema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ type schemaV2 struct {
enableIDResolution bool
fetchSchemaRepo fetchSchemaRepo
now func() time.Time
cachedSchema model.Schema
cacheExpiry time.Time
}

func newSchemaV2(v1 *schema, warehouse model.Warehouse, log logger.Logger, schemaSize stats.Histogram, ttlInMinutes time.Duration, fetchSchemaRepo fetchSchemaRepo) *schemaV2 {
return &schemaV2{
v2 := &schemaV2{
warehouse: warehouse,
log: log,
ttlInMinutes: ttlInMinutes,
Expand All @@ -42,12 +44,9 @@ func newSchemaV2(v1 *schema, warehouse model.Warehouse, log logger.Logger, schem
fetchSchemaRepo: fetchSchemaRepo,
enableIDResolution: v1.enableIDResolution,
now: timeutil.Now,
stats: struct {
schemaSize stats.Histogram
}{
schemaSize: schemaSize,
},
}
v2.stats.schemaSize = schemaSize
return v2
}

func (sh *schemaV2) SyncRemoteSchema(ctx context.Context, _ fetchSchemaRepo, uploadID int64) (bool, error) {
Expand All @@ -57,7 +56,7 @@ func (sh *schemaV2) SyncRemoteSchema(ctx context.Context, _ fetchSchemaRepo, upl
func (sh *schemaV2) IsWarehouseSchemaEmpty(ctx context.Context) bool {
schema, err := sh.getSchema(ctx)
if err != nil {
sh.log.Errorw("error getting schema: %w", err)
sh.log.Warnf("error getting schema: %v", err)
return true
}
return len(schema) == 0
Expand All @@ -66,7 +65,7 @@ func (sh *schemaV2) IsWarehouseSchemaEmpty(ctx context.Context) bool {
func (sh *schemaV2) GetTableSchemaInWarehouse(ctx context.Context, tableName string) model.TableSchema {
schema, err := sh.getSchema(ctx)
if err != nil {
sh.log.Errorw("error getting schema: %w", err)
sh.log.Warnf("error getting schema: %w", err)
return model.TableSchema{}
}
return schema[tableName]
Expand Down Expand Up @@ -161,18 +160,27 @@ func (sh *schemaV2) fetchSchemaFromWarehouse(ctx context.Context) (model.Schema,
}

func (sh *schemaV2) saveSchema(ctx context.Context, newSchema model.Schema) error {
expiresAt := sh.now().Add(sh.ttlInMinutes)
_, err := sh.schemaRepo.Insert(ctx, &model.WHSchema{
SourceID: sh.warehouse.Source.ID,
Namespace: sh.warehouse.Namespace,
DestinationID: sh.warehouse.Destination.ID,
DestinationType: sh.warehouse.Type,
Schema: newSchema,
ExpiresAt: sh.now().Add(sh.ttlInMinutes),
ExpiresAt: expiresAt,
})
return err
if err != nil {
return fmt.Errorf("inserting schema: %w", err)
}
sh.cachedSchema = newSchema
sh.cacheExpiry = expiresAt
return nil
}

func (sh *schemaV2) getSchema(ctx context.Context) (model.Schema, error) {
if sh.cachedSchema != nil && sh.cacheExpiry.After(sh.now()) {
return sh.cachedSchema, nil
}
whSchema, err := sh.schemaRepo.GetForNamespace(
ctx,
sh.warehouse.Source.ID,
Expand All @@ -183,11 +191,13 @@ func (sh *schemaV2) getSchema(ctx context.Context) (model.Schema, error) {
return nil, fmt.Errorf("getting schema for namespace: %w", err)
}
if whSchema.Schema == nil {
return model.Schema{}, nil
sh.cachedSchema = model.Schema{}
sh.cacheExpiry = sh.now().Add(sh.ttlInMinutes)
return sh.cachedSchema, nil
}
if whSchema.ExpiresAt.Before(sh.now()) {
sh.log.Infof("Schema expired for destination id: %s, namespace: %s at %v", sh.warehouse.Destination.ID, sh.warehouse.Namespace, whSchema.ExpiresAt)
return sh.fetchSchemaFromWarehouse(ctx)
}
return whSchema.Schema, nil
return sh.cachedSchema, nil
}

0 comments on commit 28f7e95

Please sign in to comment.