Skip to content

Commit

Permalink
fix bug: enum values insertion is broken with separate client db (bad…
Browse files Browse the repository at this point in the history
… separation of executor scope)
  • Loading branch information
Pascal-Delange committed Nov 8, 2024
1 parent df101a8 commit 8e91044
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 108 deletions.
5 changes: 5 additions & 0 deletions mocks/data_model_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,8 @@ func (d *DataModelRepository) GetPivot(ctx context.Context, exec repositories.Ex
args := d.Called(ctx, exec, pivotId)
return args.Get(0).(models.PivotMetadata), args.Error(1)
}

func (d *DataModelRepository) BatchInsertEnumValues(ctx context.Context, exec repositories.Executor, enumValues models.EnumValues, table models.Table) error {
args := d.Called(ctx, exec, enumValues, table)
return args.Error(0)
}
12 changes: 12 additions & 0 deletions models/data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@ type UpdateFieldInput struct {
IsUnique *bool
}

type EnumValues map[string]map[any]struct{}

// CollectEnumValues mutates the EnumValues object to collect all the enum values from the payload
func (enumValues EnumValues) CollectEnumValues(payload ClientObject) {
for fieldName := range enumValues {
value := payload.Data[fieldName]
if value != nil && value != "" {
enumValues[fieldName][value] = struct{}{}
}
}
}

// ///////////////////////////////
// Data Model Link
// ///////////////////////////////
Expand Down
52 changes: 52 additions & 0 deletions repositories/data_model_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type DataModelRepository interface {
GetLinks(ctx context.Context, exec Executor, organizationId string) ([]models.LinkToSingle, error)
DeleteDataModel(ctx context.Context, exec Executor, organizationID string) error
GetDataModelField(ctx context.Context, exec Executor, fieldId string) (models.FieldMetadata, error)
BatchInsertEnumValues(ctx context.Context, exec Executor, enumValues models.EnumValues, table models.Table) error

CreatePivot(ctx context.Context, exec Executor, id string, pivot models.CreatePivotInput) error
ListPivots(ctx context.Context, exec Executor, organization_id string, tableId *string) ([]models.PivotMetadata, error)
Expand Down Expand Up @@ -500,3 +501,54 @@ func (repo *DataModelRepositoryPostgresql) GetPivot(ctx context.Context, exec Ex
dbmodels.AdaptPivotMetadata,
)
}

func (repo *DataModelRepositoryPostgresql) BatchInsertEnumValues(ctx context.Context, exec Executor, enumValues models.EnumValues, table models.Table) error {
if err := validateMarbleDbExecutor(exec); err != nil {
return err
}

// This has to be done in 2 queries because there cannot be multiple ON CONFLICT clauses per query
textQuery := NewQueryBuilder().
Insert("data_model_enum_values").
Columns("field_id", "text_value").
Suffix("ON CONFLICT ON CONSTRAINT unique_data_model_enum_text_values_field_id_value DO NOTHING")

floatQuery := NewQueryBuilder().
Insert("data_model_enum_values").
Columns("field_id", "float_value").
Suffix("ON CONFLICT ON CONSTRAINT unique_data_model_enum_float_values_field_id_value DO NOTHING")

// Hack to avoid empty query, which would cause an execution error
var shouldInsertTextValues bool
var shouldInsertFloatValues bool

for fieldName, values := range enumValues {
fieldId := table.Fields[fieldName].ID
dataType := table.Fields[fieldName].DataType

for value := range values {
if dataType == models.String {
textQuery = textQuery.Values(fieldId, value)
shouldInsertTextValues = true
} else if dataType == models.Float {
floatQuery = floatQuery.Values(fieldId, value)
shouldInsertFloatValues = true
}
}
}

if shouldInsertTextValues {
err := ExecBuilder(ctx, exec, textQuery)
if err != nil {
return err
}
}
if shouldInsertFloatValues {
err := ExecBuilder(ctx, exec, floatQuery)
if err != nil {
return err
}
}

return nil
}
86 changes: 3 additions & 83 deletions repositories/ingestion_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package repositories

import (
"context"
"fmt"
"time"

"github.com/Masterminds/squirrel"
Expand Down Expand Up @@ -53,7 +52,7 @@ func (repo *IngestionRepositoryImpl) IngestObjects(
}

if len(payloadsToInsert) > 0 {
if err := repo.batchInsertPayloadsAndEnumValues(ctx, tx, payloadsToInsert, table); err != nil {
if err := repo.batchInsertPayloads(ctx, tx, payloadsToInsert, table); err != nil {
return 0, err
}
}
Expand Down Expand Up @@ -157,113 +156,34 @@ func (repo *IngestionRepositoryImpl) batchUpdateValidUntilOnObsoleteObjects(ctx
return err
}

func (repo *IngestionRepositoryImpl) batchInsertPayloadsAndEnumValues(ctx context.Context,
exec Executor, payloads []models.ClientObject, table models.Table,
func (repo *IngestionRepositoryImpl) batchInsertPayloads(ctx context.Context, exec Executor, payloads []models.ClientObject, table models.Table,
) error {
columnNames := models.ColumnNames(table)
query := NewQueryBuilder().Insert(tableNameWithSchema(exec, table.Name))

enumValues := buildEnumValuesWithEnumFields(table)

for _, payload := range payloads {
collectEnumValues(payload, enumValues)

insertValues := generateInsertValues(payload, columnNames)
// Add UUID to the insert values for the "id" field
insertValues = append(insertValues, uuid.NewString())
query = query.Values(insertValues...)
}

err := batchInsertEnumValues(ctx, exec, enumValues, table)
if err != nil {
return fmt.Errorf("batchInsertEnumValues error: %w", err)
}

columnNames = append(columnNames, "id")
query = query.Columns(columnNames...)

err = ExecBuilder(ctx, exec, query)
err := ExecBuilder(ctx, exec, query)
if IsUniqueViolationError(err) {
return errors.Wrap(models.ConflictError, "unique constraint violation during ingestion")
}

return err
}

type EnumValues map[string]map[any]bool

func buildEnumValuesWithEnumFields(table models.Table) EnumValues {
enumValues := make(EnumValues)
for fieldName := range table.Fields {
dataType := table.Fields[fieldName].DataType
if table.Fields[fieldName].IsEnum && (dataType == models.String || dataType == models.Float) {
enumValues[fieldName] = make(map[any]bool)
}
}
return enumValues
}

// mutates enumValues
func collectEnumValues(payload models.ClientObject, enumValues EnumValues) {
for fieldName := range enumValues {
value := payload.Data[fieldName]
if value != nil && value != "" {
enumValues[fieldName][value] = true
}
}
}

func generateInsertValues(payload models.ClientObject, columnNames []string) []any {
insertValues := make([]any, len(columnNames))
for i, fieldName := range columnNames {
insertValues[i] = payload.Data[fieldName]
}
return insertValues
}

// This has to be done in 2 queries because there cannot be multiple ON CONFLICT clauses per query
func batchInsertEnumValues(ctx context.Context, exec Executor, enumValues EnumValues, table models.Table) error {
textQuery := NewQueryBuilder().
Insert("data_model_enum_values").
Columns("field_id", "text_value").
Suffix("ON CONFLICT ON CONSTRAINT unique_data_model_enum_text_values_field_id_value DO NOTHING")

floatQuery := NewQueryBuilder().
Insert("data_model_enum_values").
Columns("field_id", "float_value").
Suffix("ON CONFLICT ON CONSTRAINT unique_data_model_enum_float_values_field_id_value DO NOTHING")

// Hack to avoid empty query, which would cause an execution error
var shouldInsertTextValues bool
var shouldInsertFloatValues bool

for fieldName, values := range enumValues {
fieldId := table.Fields[fieldName].ID
dataType := table.Fields[fieldName].DataType

for value := range values {
if dataType == models.String {
textQuery = textQuery.Values(fieldId, value)
shouldInsertTextValues = true
} else if dataType == models.Float {
floatQuery = floatQuery.Values(fieldId, value)
shouldInsertFloatValues = true
}
}
}

if shouldInsertTextValues {
err := ExecBuilder(ctx, exec, textQuery)
if err != nil {
return err
}
}
if shouldInsertFloatValues {
err := ExecBuilder(ctx, exec, floatQuery)
if err != nil {
return err
}
}

return nil
}
88 changes: 63 additions & 25 deletions usecases/ingestion_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,10 @@ func (usecase *IngestionUseCase) IngestObject(
}

var nb int
ingestClosure := func() error {
return usecase.transactionFactory.TransactionInOrgSchema(ctx, organizationId, func(tx repositories.Transaction) error {
nb, err = usecase.ingestionRepository.IngestObjects(ctx, tx, []models.ClientObject{payload}, table)
return err
})
}
err = retryIngestion(ctx, ingestClosure)
err = retryIngestion(ctx, func() error {
nb, err = usecase.insertEnumValuesAndIngest(ctx, organizationId, []models.ClientObject{payload}, table)
return err
})
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -179,13 +176,10 @@ func (usecase *IngestionUseCase) IngestObjects(
}

var nb int
ingestClosure := func() error {
return usecase.transactionFactory.TransactionInOrgSchema(ctx, organizationId, func(tx repositories.Transaction) error {
nb, err = usecase.ingestionRepository.IngestObjects(ctx, tx, clientObjects, table)
return err
})
}
err = retryIngestion(ctx, ingestClosure)
err = retryIngestion(ctx, func() error {
nb, err = usecase.insertEnumValuesAndIngest(ctx, organizationId, clientObjects, table)
return err
})
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -516,22 +510,17 @@ func (usecase *IngestionUseCase) ingestObjectsFromCSV(ctx context.Context, organ
clientObjects = append(clientObjects, clientObject)
}

ingestClosure := func() error {
return usecase.transactionFactory.TransactionInOrgSchema(
ctx,
organizationId,
func(tx repositories.Transaction) error {
_, err := usecase.ingestionRepository.IngestObjects(ctx, tx, clientObjects, table)
return err
})
}
if err := retryIngestion(ctx, ingestClosure); err != nil {
var nb int
if err := retryIngestion(ctx, func() error {
nb, err = usecase.insertEnumValuesAndIngest(ctx, organizationId, clientObjects, table)
return err
}); err != nil {
return ingestionResult{
numRowsIngested: total,
err: err,
}
}
total += len(clientObjects)
total += nb
}

return ingestionResult{
Expand Down Expand Up @@ -626,3 +615,52 @@ func retryIngestion(ctx context.Context, f func() error) error {
}),
)
}

func (usecase *IngestionUseCase) insertEnumValuesAndIngest(
ctx context.Context,
organizationId string,
payloads []models.ClientObject,
table models.Table,
) (int, error) {
var nb int
var err error
err = usecase.transactionFactory.TransactionInOrgSchema(ctx, organizationId, func(tx repositories.Transaction) error {
nb, err = usecase.ingestionRepository.IngestObjects(ctx, tx, payloads, table)
return err
})
if err != nil {
return 0, err
}

go func() {
// I'm giving it a short deadline because it's not critical to the user - in any situation i'd rather it fails
// than take more than 10ms
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Millisecond*10)
defer cancel()
enumValues := buildEnumValuesContainersFromTable(table)
for _, payload := range payloads {
enumValues.CollectEnumValues(payload)
}
exec := usecase.executorFactory.NewExecutor()
err := usecase.dataModelRepository.BatchInsertEnumValues(ctx, exec, enumValues, table)
if err != nil {
utils.LogAndReportSentryError(ctx, err)
} else if errors.Is(err, context.DeadlineExceeded) {
logger := utils.LoggerFromContext(ctx)
logger.WarnContext(ctx, "Deadline exceeded while inserting enum values")
}
}()

return nb, nil
}

func buildEnumValuesContainersFromTable(table models.Table) models.EnumValues {
enumValues := make(models.EnumValues)
for fieldName := range table.Fields {
dataType := table.Fields[fieldName].DataType
if table.Fields[fieldName].IsEnum && (dataType == models.String || dataType == models.Float) {
enumValues[fieldName] = make(map[any]struct{})
}
}
return enumValues
}

0 comments on commit 8e91044

Please sign in to comment.