Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch references to UC schemas to capture dependencies automatically #1989

Merged
merged 16 commits into from
Jan 16, 2025
136 changes: 136 additions & 0 deletions bundle/config/validate/schema_references.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package validate

import (
"context"
"fmt"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
)

// Validate that any references to UC schemas defined in the DAB use the ${resources.schemas...}
// syntax to capture the deploy time dependency.
func SchemaReferences() bundle.ReadOnlyMutator {
return &schemaReferences{}
}

type schemaReferences struct{}

func (v *schemaReferences) Name() string {
return "validate:schema_dependency"
}

func findSchemaInBundle(rb bundle.ReadOnlyBundle, catalogName, schemaName string) ([]dyn.Location, dyn.Path, bool) {
for k, s := range rb.Config().Resources.Schemas {
if s.CatalogName != catalogName || s.Name != schemaName {
continue
}
return rb.Config().GetLocations("resources.schemas." + k), dyn.NewPath(dyn.Key("resources"), dyn.Key("schemas"), dyn.Key(k)), true
}
return nil, nil, false
}

func (v *schemaReferences) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
diags := diag.Diagnostics{}
for k, p := range rb.Config().Resources.Pipelines {
// Skip if the pipeline uses hive metastore. The DLT API allows creating
// a pipeline without a schema or target when using hive metastore.
if p.Catalog == "" {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API allows creating a pipeline without a schema/target specified as long as you are not using UC. It's not a forward-looking use case anyway, so I did not look deeper into this.

continue
}

schemaName := ""
fieldPath := dyn.Path{}
schemaLocation := []dyn.Location{}
switch {
case p.Schema == "" && p.Target == "":
// The error message is identical to the one DLT backend returns when
// a schema is not defined for a UC DLT pipeline (date: 20 Dec 2024).
diags = append(diags, diag.Diagnostic{
Severity: diag.Error,
Summary: "Unity Catalog pipeline should have a schema or target defined",
shreyas-goenka marked this conversation as resolved.
Show resolved Hide resolved
Detail: `The target or schema field is required for UC pipelines. Reason: DLT
requires specifying a target schema for UC pipelines. Please use the
TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING
TABLE statement if you do not wish to publish your dataset.`,
Locations: rb.Config().GetLocations("resources.pipelines." + k),
Paths: []dyn.Path{
dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema")),
dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target")),
},
})
continue
case p.Schema != "" && p.Target != "":
locations := rb.Config().GetLocations("resources.pipelines." + k + ".schema")
locations = append(locations, rb.Config().GetLocations("resources.pipelines."+k+".target")...)

// The Databricks Terraform provider already has client side validation
// that does not allow this today. Having this here allows us to float
// this validation on `bundle validate` and provide location information.
diags = append(diags, diag.Diagnostic{
shreyas-goenka marked this conversation as resolved.
Show resolved Hide resolved
Severity: diag.Error,
Summary: "Both schema and target are defined in a Unity Catalog pipeline. Only one of them should be defined.",
Locations: locations,
Paths: []dyn.Path{
dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema")),
dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target")),
},
})
continue
case p.Schema != "":
schemaName = p.Schema
fieldPath = dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema"))
schemaLocation = rb.Config().GetLocations("resources.pipelines." + k + ".schema")
case p.Target != "":
schemaName = p.Target
fieldPath = dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target"))
schemaLocation = rb.Config().GetLocations("resources.pipelines." + k + ".target")
}

// Check if the schema is defined in the bundle.
matchLocations, matchPath, found := findSchemaInBundle(rb, p.Catalog, schemaName)
if !found {
continue
}

diags = append(diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: fmt.Sprintf("Use ${%s.name} syntax to refer to the UC schema instead of directly using its name %q", matchPath, schemaName),
Detail: fmt.Sprintf(`Using ${%s.name} will allow DABs to capture the deploy time dependency this DLT pipeline
has on the schema %q and deploy changes to the schema before deploying the pipeline.`, matchPath, schemaName),
Locations: append(schemaLocation, matchLocations...),
Paths: []dyn.Path{
fieldPath,
matchPath,
},
})
}

for k, v := range rb.Config().Resources.Volumes {
shreyas-goenka marked this conversation as resolved.
Show resolved Hide resolved
if v.CatalogName == "" || v.SchemaName == "" {
continue
}

matchLocations, matchPath, found := findSchemaInBundle(rb, v.CatalogName, v.SchemaName)
if !found {
continue
}

fieldLocations := rb.Config().GetLocations("resources.volumes." + k + ".schema_name")

diags = append(diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: fmt.Sprintf("Use ${%s.name} syntax to refer to the UC schema instead of directly using its name %q", matchPath, v.SchemaName),
Detail: fmt.Sprintf(`Using ${%s.name} will allow DABs to capture the deploy time dependency this Volume
has on the schema %q and deploy changes to the schema before deploying the Volume.`, matchPath, v.SchemaName),
Locations: append(matchLocations, fieldLocations...),
Paths: []dyn.Path{
dyn.NewPath(dyn.Key("resources"), dyn.Key("volumes"), dyn.Key(k), dyn.Key("schema")),
matchPath,
},
})
}

return diags
}
228 changes: 228 additions & 0 deletions bundle/config/validate/schema_references_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package validate

import (
"context"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
)

func TestValidateSchemaReferencesForPipelines(t *testing.T) {
pipelineTargetL := dyn.Location{File: "file1", Line: 1, Column: 1}
pipelineSchemaL := dyn.Location{File: "file2", Line: 2, Column: 2}
pipelineL := dyn.Location{File: "file3", Line: 3, Column: 3}
schemaL := dyn.Location{File: "file4", Line: 4, Column: 4}

for _, tc := range []struct {
schemaV string
targetV string
catalogV string
want diag.Diagnostics
}{
{
schemaV: "",
targetV: "",
catalogV: "",
want: diag.Diagnostics{},
},
{
schemaV: "",
targetV: "",
catalogV: "main",
want: diag.Diagnostics{{
Summary: "Unity Catalog pipeline should have a schema or target defined",
Severity: diag.Error,
Detail: `The target or schema field is required for UC pipelines. Reason: DLT
requires specifying a target schema for UC pipelines. Please use the
TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING
TABLE statement if you do not wish to publish your dataset.`,
Locations: []dyn.Location{pipelineL},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.pipelines.p1.schema"),
dyn.MustPathFromString("resources.pipelines.p1.target"),
},
}},
},
{
schemaV: "both",
targetV: "both",
catalogV: "main",
want: diag.Diagnostics{{
Severity: diag.Error,
Summary: "Both schema and target are defined in a Unity Catalog pipeline. Only one of them should be defined.",
Locations: []dyn.Location{pipelineSchemaL, pipelineTargetL},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.pipelines.p1.schema"),
dyn.MustPathFromString("resources.pipelines.p1.target"),
},
}},
},
{
schemaV: "schema1",
targetV: "",
catalogV: "other",
want: diag.Diagnostics{},
},
{
schemaV: "schema1",
targetV: "",
catalogV: "main",
want: diag.Diagnostics{{
Severity: diag.Warning,
Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`,
Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this DLT pipeline
has on the schema "schema1" and deploy changes to the schema before deploying the pipeline.`,
Locations: []dyn.Location{pipelineSchemaL, schemaL},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.pipelines.p1.schema"),
dyn.MustPathFromString("resources.schemas.s1"),
},
}},
},
{
schemaV: "",
targetV: "schema1",
catalogV: "main",
want: diag.Diagnostics{{
Severity: diag.Warning,
Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`,
Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this DLT pipeline
has on the schema "schema1" and deploy changes to the schema before deploying the pipeline.`,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be painful to update this test if the message changes, but it's not nice to assert on the message.

I have added a "golden files" test utilities here #2025 , it's a good fit for this cases.

Full output is stored in the file and can be updated by running tests with TESTS_OUTPUT=OVERWRITE

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, that PR only targets integration tests for now, so not ready for your use case yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine in this case since its unit tests that are colocated with the mutator that implements the functionality. It'll be a simple search and replace when updating.

Are you proposing to extend the golden file approach to mutators or unit tests? Possibly by serializing the diagnostics and persisting them in a file?

Locations: []dyn.Location{pipelineTargetL, schemaL},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.pipelines.p1.target"),
dyn.MustPathFromString("resources.schemas.s1"),
},
}},
},
{
schemaV: "${resources.schemas.s1.name}",
targetV: "",
catalogV: "main",
want: diag.Diagnostics{},
},
{
schemaV: "",
targetV: "${resources.schemas.s1.name}",
catalogV: "main",
want: diag.Diagnostics{},
},
} {

b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"s1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "main",
Name: "schema1",
},
},
},
Pipelines: map[string]*resources.Pipeline{
"p1": {
PipelineSpec: &pipelines.PipelineSpec{
Name: "abc",
Schema: tc.schemaV,
Target: tc.targetV,
Catalog: tc.catalogV,
},
},
},
},
},
}

bundletest.SetLocation(b, "resources.schemas.s1", []dyn.Location{schemaL})
bundletest.SetLocation(b, "resources.pipelines.p1", []dyn.Location{pipelineL})
if tc.schemaV != "" {
bundletest.SetLocation(b, "resources.pipelines.p1.schema", []dyn.Location{pipelineSchemaL})
}
if tc.targetV != "" {
bundletest.SetLocation(b, "resources.pipelines.p1.target", []dyn.Location{pipelineTargetL})
}

diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), SchemaReferences())
assert.Equal(t, tc.want, diags)
}
}

func TestValidateSchemaReferencesForVolumes(t *testing.T) {
schemaL := dyn.Location{File: "file1", Line: 1, Column: 1}
volumeSchemaL := dyn.Location{File: "file2", Line: 2, Column: 2}
for _, tc := range []struct {
catalogV string
schemaV string
want diag.Diagnostics
}{
{
catalogV: "main",
schemaV: "schema1",
want: diag.Diagnostics{{
Severity: diag.Warning,
Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`,
Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this Volume
has on the schema "schema1" and deploy changes to the schema before deploying the Volume.`,
Locations: []dyn.Location{schemaL, volumeSchemaL},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.volumes.v1.schema"),
dyn.MustPathFromString("resources.schemas.s1"),
},
}},
},
{
catalogV: "main",
schemaV: "${resources.schemas.s1.name}",
want: diag.Diagnostics{},
},
{
catalogV: "main",
schemaV: "other",
want: diag.Diagnostics{},
},
{
catalogV: "other",
schemaV: "schema1",
want: diag.Diagnostics{},
},
} {
b := bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"s1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "main",
Name: "schema1",
},
},
},
Volumes: map[string]*resources.Volume{
"v1": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
SchemaName: tc.schemaV,
CatalogName: tc.catalogV,
Name: "my_volume",
},
},
},
},
},
}

bundletest.SetLocation(&b, "resources.schemas.s1", []dyn.Location{schemaL})
bundletest.SetLocation(&b, "resources.volumes.v1.schema_name", []dyn.Location{volumeSchemaL})

diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(&b), SchemaReferences())
assert.Equal(t, tc.want, diags)
}
}
1 change: 1 addition & 0 deletions bundle/config/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
JobTaskClusterSpec(),
ValidateFolderPermissions(),
SingleNodeCluster(),
SchemaReferences(),
))
}

Expand Down
Loading