Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions code/go/internal/validator/semantic/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ type processor struct {
position position
}

// GetAttributeString looks up key in the processor's inline attributes and
// returns it as a string. The boolean reports whether the attribute exists
// and holds a string value; otherwise the empty string is returned.
func (p *processor) GetAttributeString(key string) (string, bool) {
	s, ok := p.Attributes[key].(string)
	return s, ok
}

func (p *processor) UnmarshalYAML(value *yaml.Node) error {
var procMap map[string]struct {
Attributes map[string]any `yaml:",inline"`
Expand All @@ -133,6 +142,7 @@ func (p *processor) UnmarshalYAML(value *yaml.Node) error {

// ingestPipeline models the subset of an Elasticsearch ingest pipeline
// definition needed by the semantic validators: the main processor list and
// the pipeline-level on_failure handlers.
type ingestPipeline struct {
	// Processors holds the pipeline's main processor chain.
	Processors []processor `yaml:"processors"`
	// OnFailure holds the pipeline-level failure handlers.
	OnFailure []processor `yaml:"on_failure"`
}

type field struct {
Expand Down Expand Up @@ -301,7 +311,7 @@ func readPipelinesFolder(fsys fspath.FS, pipelinesDir string) ([]string, error)
var pipelineFiles []string
entries, err := fs.ReadDir(fsys, pipelinesDir)
if errors.Is(err, os.ErrNotExist) {
return []string{}, nil
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("can't list pipelines directory (path: %s): %w", fsys.Path(pipelinesDir), err)
Expand Down Expand Up @@ -362,22 +372,42 @@ func listTransforms(fsys fspath.FS) ([]string, error) {

}

func listPipelineFiles(fsys fspath.FS, dataStream string) ([]pipelineFileMetadata, error) {
func listPipelineFiles(fsys fspath.FS) ([]pipelineFileMetadata, error) {
var pipelineFileMetadatas []pipelineFileMetadata

ingestPipelineDir := path.Join(dataStreamDir, dataStream, "elasticsearch", "ingest_pipeline")
pipelineFiles, err := readPipelinesFolder(fsys, ingestPipelineDir)
if err != nil {
return nil, fmt.Errorf("cannot read pipeline files from integration package: %w", err)
type pipelineDirMetadata struct {
dir string
dataStream string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, if the dataStream field can be removed from pipelineFileMetadata, this one could be removed too.

}

for _, file := range pipelineFiles {
pipelineFileMetadatas = append(pipelineFileMetadatas, pipelineFileMetadata{
filePath: file,
fullFilePath: fsys.Path(file),
dataStream: dataStream,
// Empty directory is used here to read ingest pipelines defined in the package root.
dirs := []pipelineDirMetadata{{dir: ""}}

dataStreams, err := listDataStreams(fsys)
if err != nil {
return nil, specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
}
for _, dataStream := range dataStreams {
dirs = append(dirs, pipelineDirMetadata{
dir: path.Join(dataStreamDir, dataStream),
dataStream: dataStream,
})
}

for _, d := range dirs {
pipelinePath := path.Join(d.dir, "elasticsearch", "ingest_pipeline")
pipelineFiles, err := readPipelinesFolder(fsys, pipelinePath)
if err != nil {
return nil, fmt.Errorf("cannot read pipeline files from integration package: %w", err)
}
for _, file := range pipelineFiles {
pipelineFileMetadatas = append(pipelineFileMetadatas, pipelineFileMetadata{
filePath: file,
fullFilePath: fsys.Path(file),
dataStream: d.dataStream,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this dataStream field from the pipelineFileMetadata struct used?
Taking a look at #1010 and this PR, it looks like it is not. Could you check it? If not, should it be kept or used somewhere?

Copy link
Contributor Author

@taylor-swanson taylor-swanson Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I haven't used the dataStream field yet, I've added it to keep it consistent with listFieldsFiles and fieldFileMetadata, which is what this function and struct were based on.

})
}
}

return pipelineFileMetadatas, nil
}
135 changes: 135 additions & 0 deletions code/go/internal/validator/semantic/validate_pipeline_on_failure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package semantic

import (
"fmt"
"io/fs"
"strings"

"gopkg.in/yaml.v3"

"github.com/elastic/package-spec/v3/code/go/internal/fspath"
"github.com/elastic/package-spec/v3/code/go/pkg/specerrors"
)

// requiredMessageValues lists the ingest metadata placeholders that an
// on_failure error.message value must reference, so that failure documents
// record the failing processor's type, tag, message, and pipeline name.
var requiredMessageValues = []string{
	"_ingest.on_failure_processor_type",
	"_ingest.on_failure_processor_tag",
	"_ingest.on_failure_message",
	"_ingest.pipeline",
}

// ValidatePipelineOnFailure validates ingest pipeline global on_failure handlers.
// It loads every pipeline file in the package, parses it as YAML, and checks
// that its on_failure handlers set event.kind and error.message as required.
// File-access and parse errors abort validation immediately; handler
// violations are accumulated across all files.
func ValidatePipelineOnFailure(fsys fspath.FS) specerrors.ValidationErrors {
	files, err := listPipelineFiles(fsys)
	if err != nil {
		return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
	}

	var errs specerrors.ValidationErrors
	for _, file := range files {
		data, err := fs.ReadFile(fsys, file.filePath)
		if err != nil {
			return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
		}

		var pipeline ingestPipeline
		if err := yaml.Unmarshal(data, &pipeline); err != nil {
			return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
		}

		// Appending an empty result is a no-op, so no emptiness guard is needed.
		errs = append(errs, validatePipelineOnFailure(&pipeline, file.fullFilePath)...)
	}

	return errs
}

// validatePipelineOnFailure runs every on_failure handler check against one
// parsed pipeline and aggregates the resulting validation errors. filename is
// used only to build error messages.
func validatePipelineOnFailure(pipeline *ingestPipeline, filename string) specerrors.ValidationErrors {
	checks := []func(*ingestPipeline, string) specerrors.ValidationErrors{
		checkSetEventKind,
		checkSetErrorMessage,
	}

	var errs specerrors.ValidationErrors
	for _, check := range checks {
		errs = append(errs, check(pipeline, filename)...)
	}

	return errs
}

// checkSetEventKind verifies that the pipeline's on_failure handlers include a
// set processor assigning event.kind the value "pipeline_error". The first set
// processor whose field is event.kind decides the outcome; later ones are
// ignored, matching the original short-circuit behavior.
func checkSetEventKind(pipeline *ingestPipeline, filename string) specerrors.ValidationErrors {
	// Same error is reported whether the processor is absent or sets a wrong value.
	eventKindErr := specerrors.NewStructuredError(
		fmt.Errorf("file %q is invalid: pipeline on_failure handler must set event.kind to \"pipeline_error\"", filename),
		specerrors.CodePipelineOnFailureEventKind,
	)

	for _, proc := range pipeline.OnFailure {
		if proc.Type != "set" {
			continue
		}
		field, ok := proc.GetAttributeString("field")
		if !ok || field != "event.kind" {
			continue
		}

		if value, ok := proc.GetAttributeString("value"); ok && value == "pipeline_error" {
			return nil
		}
		return specerrors.ValidationErrors{eventKindErr}
	}

	// No set processor targets event.kind at all.
	return specerrors.ValidationErrors{eventKindErr}
}

// checkSetErrorMessage verifies that the pipeline's on_failure handlers include
// a set or append processor populating error.message, and that its value
// references every placeholder in requiredMessageValues. The first processor
// whose field is error.message decides the outcome; later ones are ignored,
// matching the original short-circuit behavior.
func checkSetErrorMessage(pipeline *ingestPipeline, filename string) specerrors.ValidationErrors {
	for _, proc := range pipeline.OnFailure {
		if proc.Type != "set" && proc.Type != "append" {
			continue
		}
		field, ok := proc.GetAttributeString("field")
		if !ok || field != "error.message" {
			continue
		}

		// A missing or non-string value behaves like an empty message, so
		// every required placeholder is then reported as missing.
		value, _ := proc.GetAttributeString("value")

		var errs specerrors.ValidationErrors
		for _, required := range requiredMessageValues {
			if strings.Contains(value, required) {
				continue
			}
			errs = append(errs, specerrors.NewStructuredError(
				fmt.Errorf("file %q is invalid: pipeline on_failure error.message must include %q", filename, required),
				specerrors.CodePipelineOnFailureMessage),
			)
		}
		return errs
	}

	// No set/append processor targets error.message at all.
	return specerrors.ValidationErrors{specerrors.NewStructuredError(
		fmt.Errorf("file %q is invalid: pipeline on_failure handler must set error.message", filename),
		specerrors.CodePipelineOnFailureMessage),
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package semantic

import (
"testing"

"gopkg.in/yaml.v3"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestValidatePipelineOnFailure exercises validatePipelineOnFailure against
// inline pipeline definitions, covering compliant on_failure handlers and each
// failure mode of the event.kind and error.message checks.
// NOTE(review): the YAML indentation below was reconstructed from a
// whitespace-mangled paste — confirm it against the committed file.
func TestValidatePipelineOnFailure(t *testing.T) {
	testCases := []struct {
		name     string   // subtest name
		pipeline string   // inline YAML pipeline definition under test
		errors   []string // expected error strings; empty means the pipeline is valid
	}{
		{
			// Compliant: set for event.kind plus set for error.message with
			// all required placeholders present.
			name: "good-set",
			pipeline: `
on_failure:
  - set:
      field: event.kind
      value: pipeline_error
  - set:
      field: error.message
      value: >-
        Processor '{{{ _ingest.on_failure_processor_type }}}'
        with tag '{{{ _ingest.on_failure_processor_tag }}}'
        in pipeline '{{{ _ingest.pipeline }}}'
        failed with message '{{{ _ingest.on_failure_message }}}'
`,
		},
		{
			// Compliant: append is also accepted for error.message.
			name: "good-append",
			pipeline: `
on_failure:
  - set:
      field: event.kind
      value: pipeline_error
  - append:
      field: error.message
      value: >-
        Processor '{{{ _ingest.on_failure_processor_type }}}'
        with tag '{{{ _ingest.on_failure_processor_tag }}}'
        in pipeline '{{{ _ingest.pipeline }}}'
        failed with message '{{{ _ingest.on_failure_message }}}'
`,
		},
		{
			// No processor sets event.kind at all.
			name: "bad-event-kind-missing",
			pipeline: `
on_failure:
  - append:
      field: error.message
      value: >-
        Processor '{{{ _ingest.on_failure_processor_type }}}'
        with tag '{{{ _ingest.on_failure_processor_tag }}}'
        in pipeline '{{{ _ingest.pipeline }}}'
        failed with message '{{{ _ingest.on_failure_message }}}'
`,
			errors: []string{
				`file "default.yml" is invalid: pipeline on_failure handler must set event.kind to "pipeline_error" (SVR00008)`,
			},
		},
		{
			// event.kind is set, but to a value other than "pipeline_error".
			name: "bad-event-kind-wrong-value",
			pipeline: `
on_failure:
  - set:
      field: event.kind
      value: event
  - append:
      field: error.message
      value: >-
        Processor '{{{ _ingest.on_failure_processor_type }}}'
        with tag '{{{ _ingest.on_failure_processor_tag }}}'
        in pipeline '{{{ _ingest.pipeline }}}'
        failed with message '{{{ _ingest.on_failure_message }}}'
`,
			errors: []string{
				`file "default.yml" is invalid: pipeline on_failure handler must set event.kind to "pipeline_error" (SVR00008)`,
			},
		},
		{
			// No processor populates error.message.
			name: "bad-error-message-missing",
			pipeline: `
on_failure:
  - set:
      field: event.kind
      value: pipeline_error
`,
			errors: []string{
				`file "default.yml" is invalid: pipeline on_failure handler must set error.message (SVR00009)`,
			},
		},
		{
			// error.message is set but omits every required placeholder, so
			// one error per missing placeholder is expected.
			name: "bad-error-message-wrong-value",
			pipeline: `
on_failure:
  - set:
      field: event.kind
      value: pipeline_error
  - set:
      field: error.message
      value: Pipeline failed
`,
			errors: []string{
				`file "default.yml" is invalid: pipeline on_failure error.message must include "_ingest.on_failure_processor_type" (SVR00009)`,
				`file "default.yml" is invalid: pipeline on_failure error.message must include "_ingest.on_failure_processor_tag" (SVR00009)`,
				`file "default.yml" is invalid: pipeline on_failure error.message must include "_ingest.on_failure_message" (SVR00009)`,
				`file "default.yml" is invalid: pipeline on_failure error.message must include "_ingest.pipeline" (SVR00009)`,
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			var pipeline ingestPipeline
			err := yaml.Unmarshal([]byte(tc.pipeline), &pipeline)
			require.NoError(t, err)

			// Only error count and membership are asserted; ordering is not.
			errors := validatePipelineOnFailure(&pipeline, "default.yml")
			assert.Len(t, errors, len(tc.errors))
			for _, err := range errors {
				assert.Contains(t, tc.errors, err.Error())
			}
		})
	}
}
27 changes: 10 additions & 17 deletions code/go/internal/validator/semantic/validate_pipeline_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,25 @@ import (

// ValidatePipelineTags validates ingest pipeline processor tags.
func ValidatePipelineTags(fsys fspath.FS) specerrors.ValidationErrors {
dataStreams, err := listDataStreams(fsys)
var errors specerrors.ValidationErrors
pipelineFiles, err := listPipelineFiles(fsys)
if err != nil {
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
}

var errors specerrors.ValidationErrors
for _, dataStream := range dataStreams {
pipelineFiles, err := listPipelineFiles(fsys, dataStream)
for _, pipelineFile := range pipelineFiles {
content, err := fs.ReadFile(fsys, pipelineFile.filePath)
if err != nil {
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
}

for _, pipelineFile := range pipelineFiles {
content, err := fs.ReadFile(fsys, pipelineFile.filePath)
if err != nil {
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
}

var pipeline ingestPipeline
if err = yaml.Unmarshal(content, &pipeline); err != nil {
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
}
var pipeline ingestPipeline
if err = yaml.Unmarshal(content, &pipeline); err != nil {
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
}

if vErrs := validatePipelineTags(&pipeline, pipelineFile.fullFilePath); len(vErrs) > 0 {
errors = append(errors, vErrs...)
}
if vErrs := validatePipelineTags(&pipeline, pipelineFile.fullFilePath); len(vErrs) > 0 {
errors = append(errors, vErrs...)
}
}

Expand Down
Loading