Skip to content

Commit

Permalink
Merge pull request #818 from checkmarble/feat/ingested-data-reader-en…
Browse files Browse the repository at this point in the history
…dpoints

feat: add endpoint to get ingested object data
  • Loading branch information
ChibiBlasphem authored Feb 5, 2025
2 parents 5cd69e1 + 984e941 commit 2dc71d9
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 6 deletions.
26 changes: 26 additions & 0 deletions api/handle_ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,29 @@ func handleListUploadLogs(uc usecases.Usecases) func(c *gin.Context) {
c.JSON(http.StatusOK, pure_utils.Map(uploadLogs, dto.AdaptUploadLogDto))
}
}

func handleGetIngestedObject(uc usecases.Usecases) func(c *gin.Context) {
return func(c *gin.Context) {
ctx := c.Request.Context()
organizationID, err := utils.OrganizationIdFromRequest(c.Request)
if presentError(ctx, c, err) {
return
}

objectType := c.Param("object_type")
objectId := c.Param("object_id")

usecase := usecasesWithCreds(ctx, uc).NewIngestedDataReaderUsecase()
objects, err := usecase.GetIngestedObject(ctx, organizationID, objectType, objectId)
if presentError(ctx, c, err) {
return
}

if len(objects) == 0 {
c.JSON(http.StatusNotFound, nil)
return
}

c.JSON(http.StatusOK, dto.DataModelObject{Data: objects[0].Data, Metadata: objects[0].Metadata})
}
}
1 change: 1 addition & 0 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func addRoutes(r *gin.Engine, conf Configuration, uc usecases.Usecases, auth Aut
router.POST("/ingestion/:object_type/multiple", tom, handleIngestionMultiple(uc))
router.POST("/ingestion/:object_type/batch", timeoutMiddleware(conf.BatchTimeout), handlePostCsvIngestion(uc))
router.GET("/ingestion/:object_type/upload-logs", tom, handleListUploadLogs(uc))
router.GET("/ingestion/:object_type/:object_id", tom, handleGetIngestedObject(uc))

router.GET("/scenarios", tom, listScenarios(uc))
router.POST("/scenarios", tom, createScenario(uc))
Expand Down
5 changes: 5 additions & 0 deletions dto/data_model_dto.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,8 @@ type CreateFieldInput struct {
IsEnum bool `json:"is_enum"`
IsUnique bool `json:"is_unique"`
}

type DataModelObject struct {
Data map[string]any `json:"data"`
Metadata map[string]any `json:"metadata"`
}
5 changes: 5 additions & 0 deletions models/data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,8 @@ type DataModelLinkCreateInput struct {
ChildTableID string
ChildFieldID string
}

type DataModelObject struct {
Data map[string]any
Metadata map[string]any
}
22 changes: 18 additions & 4 deletions repositories/ingested_data_read_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package repositories
import (
"context"
"fmt"
"slices"

"github.com/Masterminds/squirrel"
"github.com/cockroachdb/errors"
Expand All @@ -25,7 +26,7 @@ type IngestedDataReadRepository interface {
exec Executor,
table models.Table,
objectId string,
) ([]map[string]any, error)
) ([]models.DataModelObject, error)
QueryAggregatedValue(
ctx context.Context,
exec Executor,
Expand Down Expand Up @@ -236,7 +237,7 @@ func (repo *IngestedDataReadRepositoryImpl) QueryIngestedObject(
exec Executor,
table models.Table,
objectId string,
) ([]map[string]any, error) {
) ([]models.DataModelObject, error) {
if err := validateClientDbExecutor(exec); err != nil {
return nil, err
}
Expand All @@ -248,7 +249,7 @@ func (repo *IngestedDataReadRepositoryImpl) QueryIngestedObject(
ctx,
exec,
qualifiedTableName,
columnNames,
append(columnNames, "valid_from"),
[]models.Filter{{
LeftSql: fmt.Sprintf("%s.object_id", qualifiedTableName),
Operator: ast.FUNC_EQUAL,
Expand All @@ -259,7 +260,20 @@ func (repo *IngestedDataReadRepositoryImpl) QueryIngestedObject(
return nil, err
}

return objectsAsMap, nil
ingestedObjects := make([]models.DataModelObject, len(objectsAsMap))
for i, object := range objectsAsMap {
ingestedObject := models.DataModelObject{Data: map[string]any{}, Metadata: map[string]any{}}
for fieldName, fieldValue := range object {
if slices.Contains(columnNames, fieldName) {
ingestedObject.Data[fieldName] = fieldValue
} else {
ingestedObject.Metadata[fieldName] = fieldValue
}
}
ingestedObjects[i] = ingestedObject
}

return ingestedObjects, nil
}

func queryWithDynamicColumnList(
Expand Down
35 changes: 35 additions & 0 deletions usecases/ingested_data_reader_usecase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package usecases

import (
"context"

"github.com/checkmarble/marble-backend/models"
"github.com/checkmarble/marble-backend/repositories"
"github.com/checkmarble/marble-backend/usecases/executor_factory"
)

type IngestedDataReaderUsecase struct {
ingestedDataReadRepository repositories.IngestedDataReadRepository
dataModelRepository repositories.DataModelRepository
executorFactory executor_factory.ExecutorFactory
}

func (usecase *IngestedDataReaderUsecase) GetIngestedObject(ctx context.Context,
organizationId string, objectType string, objectId string,
) ([]models.DataModelObject, error) {
exec := usecase.executorFactory.NewExecutor()

dataModel, err := usecase.dataModelRepository.GetDataModel(ctx, exec, organizationId, true)
if err != nil {
return nil, err
}

table := dataModel.Tables[objectType]

db, err := usecase.executorFactory.NewClientDbExecutor(ctx, organizationId)
if err != nil {
return nil, err
}

return usecase.ingestedDataReadRepository.QueryIngestedObject(ctx, db, table, objectId)
}
2 changes: 1 addition & 1 deletion usecases/scheduled_execution/async_decision_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (w *AsyncDecisionWorker) createSingleDecisionForObjectId(
return false, nil, nil
}

object := models.ClientObject{TableName: table.Name, Data: objectMap[0]}
object := models.ClientObject{TableName: table.Name, Data: objectMap[0].Data}

evaluationParameters := evaluate_scenario.ScenarioEvaluationParameters{
Scenario: scenario,
Expand Down
2 changes: 1 addition & 1 deletion usecases/transfers_data_read/transfers_data_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (usecase TransferDataReader) QueryTransferDataFromMapping(
return make([]models.TransferData, 0), nil
}

readPartnerId, transferData := presentTransferData(ctx, objects[0])
readPartnerId, transferData := presentTransferData(ctx, objects[0].Data)
if err := usecase.enforceSecurity.ReadTransferData(ctx, readPartnerId); err != nil {
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions usecases/usecases_with_creds.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,11 @@ func (usecases UsecasesWithCreds) NewNewAsyncScheduledExecWorker() *scheduled_ex
)
return &w
}

func (usecases UsecasesWithCreds) NewIngestedDataReaderUsecase() IngestedDataReaderUsecase {
return IngestedDataReaderUsecase{
ingestedDataReadRepository: usecases.Repositories.IngestedDataReadRepository,
dataModelRepository: usecases.Repositories.DataModelRepository,
executorFactory: usecases.NewExecutorFactory(),
}
}

0 comments on commit 2dc71d9

Please sign in to comment.