Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions code/go/internal/validator/semantic/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ type processor struct {
position position
}

func (p *processor) GetAttributeString(key string) (string, bool) {
s, ok := p.Attributes[key].(string)
if !ok {
return "", false
}

return s, true
}

func (p *processor) UnmarshalYAML(value *yaml.Node) error {
var procMap map[string]struct {
Attributes map[string]any `yaml:",inline"`
Expand All @@ -133,6 +142,7 @@ func (p *processor) UnmarshalYAML(value *yaml.Node) error {

type ingestPipeline struct {
Processors []processor `yaml:"processors"`
OnFailure []processor `yaml:"on_failure"`
}

type field struct {
Expand Down
141 changes: 141 additions & 0 deletions code/go/internal/validator/semantic/validate_pipeline_on_failure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package semantic

import (
"fmt"
"io/fs"
"strings"

"gopkg.in/yaml.v3"

"github.com/elastic/package-spec/v3/code/go/internal/fspath"
"github.com/elastic/package-spec/v3/code/go/pkg/specerrors"
)

var requiredMessageValues = []string{
"_ingest.on_failure_processor_type",
"_ingest.on_failure_processor_tag",
"_ingest.on_failure_message",
"_ingest.pipeline",
}

// ValidatePipelineOnFailure validates ingest pipeline global on_failure handlers.
func ValidatePipelineOnFailure(fsys fspath.FS) specerrors.ValidationErrors {
dataStreams, err := listDataStreams(fsys)
if err != nil {
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
}

var errs specerrors.ValidationErrors
for _, dataStream := range dataStreams {
pipelineFiles, err := listPipelineFiles(fsys, dataStream)
if err != nil {
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
}

for _, pipelineFile := range pipelineFiles {
content, err := fs.ReadFile(fsys, pipelineFile.filePath)
if err != nil {
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
}

var pipeline ingestPipeline
if err = yaml.Unmarshal(content, &pipeline); err != nil {
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
}

if vErrs := validatePipelineOnFailure(&pipeline, pipelineFile.fullFilePath); len(vErrs) > 0 {
errs = append(errs, vErrs...)
}
}
}

return errs
}

func validatePipelineOnFailure(pipeline *ingestPipeline, filename string) specerrors.ValidationErrors {
var errs specerrors.ValidationErrors

if e := checkSetEventKind(pipeline, filename); len(e) > 0 {
errs = append(errs, e...)
}
if e := checkSetErrorMessage(pipeline, filename); len(e) > 0 {
errs = append(errs, e...)
}

return errs
}

func checkSetEventKind(pipeline *ingestPipeline, filename string) specerrors.ValidationErrors {
var errs specerrors.ValidationErrors
var found bool

for _, proc := range pipeline.OnFailure {
if proc.Type != "set" {
continue
}
if s, ok := proc.GetAttributeString("field"); !ok || s != "event.kind" {
continue
}

found = true

if s, ok := proc.GetAttributeString("value"); !ok || s != "pipeline_error" {
errs = append(errs, specerrors.NewStructuredError(
fmt.Errorf("file %q is invalid: pipeline on_failure handler must set event.kind to \"pipeline_error\"", filename),
specerrors.CodePipelineOnFailureEventKind),
)
}

break
}

if !found {
errs = append(errs, specerrors.NewStructuredError(
fmt.Errorf("file %q is invalid: pipeline on_failure handler must set event.kind to \"pipeline_error\"", filename),
specerrors.CodePipelineOnFailureEventKind),
)
}

return errs
}

func checkSetErrorMessage(pipeline *ingestPipeline, filename string) specerrors.ValidationErrors {
var errs specerrors.ValidationErrors
var found bool

for _, proc := range pipeline.OnFailure {
if proc.Type != "set" && proc.Type != "append" {
continue
}
if s, ok := proc.GetAttributeString("field"); !ok || s != "error.message" {
continue
}

found = true

value, _ := proc.GetAttributeString("value")
for _, reqMessageValue := range requiredMessageValues {
if !strings.Contains(value, reqMessageValue) {
errs = append(errs, specerrors.NewStructuredError(
fmt.Errorf("file %q is invalid: pipeline on_failure error.message must include %q", filename, reqMessageValue),
specerrors.CodePipelineOnFailureMessage),
)
}
}

break
}

if !found {
errs = append(errs, specerrors.NewStructuredError(
fmt.Errorf("file %q is invalid: pipeline on_failure handler must set error.message", filename),
specerrors.CodePipelineOnFailureMessage),
)
}

return errs
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package semantic

import (
"testing"

"gopkg.in/yaml.v3"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestValidatePipelineOnFailure(t *testing.T) {
testCases := []struct {
name string
pipeline string
errors []string
}{
{
name: "good-set",
pipeline: `
on_failure:
- set:
field: event.kind
value: pipeline_error
- set:
field: error.message
value: >-
Processor '{{{ _ingest.on_failure_processor_type }}}'
with tag '{{{ _ingest.on_failure_processor_tag }}}'
in pipeline '{{{ _ingest.pipeline }}}'
failed with message '{{{ _ingest.on_failure_message }}}'
`,
},
{
name: "good-append",
pipeline: `
on_failure:
- set:
field: event.kind
value: pipeline_error
- append:
field: error.message
value: >-
Processor '{{{ _ingest.on_failure_processor_type }}}'
with tag '{{{ _ingest.on_failure_processor_tag }}}'
in pipeline '{{{ _ingest.pipeline }}}'
failed with message '{{{ _ingest.on_failure_message }}}'
`,
},
{
name: "bad-event-kind-missing",
pipeline: `
on_failure:
- append:
field: error.message
value: >-
Processor '{{{ _ingest.on_failure_processor_type }}}'
with tag '{{{ _ingest.on_failure_processor_tag }}}'
in pipeline '{{{ _ingest.pipeline }}}'
failed with message '{{{ _ingest.on_failure_message }}}'
`,
errors: []string{
`file "default.yml" is invalid: pipeline on_failure handler must set event.kind to "pipeline_error" (SVR00007)`,
},
},
{
name: "bad-event-kind-wrong-value",
pipeline: `
on_failure:
- set:
field: event.kind
value: event
- append:
field: error.message
value: >-
Processor '{{{ _ingest.on_failure_processor_type }}}'
with tag '{{{ _ingest.on_failure_processor_tag }}}'
in pipeline '{{{ _ingest.pipeline }}}'
failed with message '{{{ _ingest.on_failure_message }}}'
`,
errors: []string{
`file "default.yml" is invalid: pipeline on_failure handler must set event.kind to "pipeline_error" (SVR00007)`,
},
},
{
name: "bad-error-message-missing",
pipeline: `
on_failure:
- set:
field: event.kind
value: pipeline_error
`,
errors: []string{
`file "default.yml" is invalid: pipeline on_failure handler must set error.message (SVR00008)`,
},
},
{
name: "bad-error-message-wrong-value",
pipeline: `
on_failure:
- set:
field: event.kind
value: pipeline_error
- set:
field: error.message
value: Pipeline failed
`,
errors: []string{
`file "default.yml" is invalid: pipeline on_failure error.message must include "_ingest.on_failure_processor_type" (SVR00008)`,
`file "default.yml" is invalid: pipeline on_failure error.message must include "_ingest.on_failure_processor_tag" (SVR00008)`,
`file "default.yml" is invalid: pipeline on_failure error.message must include "_ingest.on_failure_message" (SVR00008)`,
`file "default.yml" is invalid: pipeline on_failure error.message must include "_ingest.pipeline" (SVR00008)`,
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var pipeline ingestPipeline
err := yaml.Unmarshal([]byte(tc.pipeline), &pipeline)
require.NoError(t, err)

errors := validatePipelineOnFailure(&pipeline, "default.yml")
assert.Len(t, errors, len(tc.errors))
for _, err := range errors {
assert.Contains(t, tc.errors, err.Error())
}
})
}
}
1 change: 1 addition & 0 deletions code/go/internal/validator/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (s Spec) rules(pkgType string, rootSpec spectypes.ItemSpec) validationRules
{fn: semantic.ValidateMinimumAgentVersion},
{fn: semantic.ValidateIntegrationPolicyTemplates, types: []string{"integration"}},
{fn: semantic.ValidatePipelineTags, types: []string{"integration"}, since: semver.MustParse("3.6.0")},
{fn: semantic.ValidatePipelineOnFailure, types: []string{"integration"}, since: semver.MustParse("3.6.0")},
}

var validationRules validationRules
Expand Down
2 changes: 2 additions & 0 deletions code/go/pkg/specerrors/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ const (
CodeVisualizationByValue = "SVR00004"
CodeMinimumKibanaVersion = "SVR00005"
CodePipelineTagRequired = "SVR00006"
CodePipelineOnFailureEventKind = "SVR00007"
CodePipelineOnFailureMessage = "SVR00008"
)
13 changes: 13 additions & 0 deletions code/go/pkg/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,19 @@ func TestValidateIngestPipelines(t *testing.T) {
"set processor at line 15 has duplicate tag value: \"set_sample_field\"",
},
},
"bad_pipeline_on_failure": {
"missing": []string{
`pipeline on_failure handler must set event.kind to "pipeline_error" (SVR00007)`,
`pipeline on_failure handler must set error.message (SVR00008)`,
},
"incorrect": []string{
`pipeline on_failure handler must set event.kind to "pipeline_error" (SVR00007)`,
`pipeline on_failure error.message must include "_ingest.on_failure_processor_type" (SVR00008)`,
`pipeline on_failure error.message must include "_ingest.on_failure_processor_tag" (SVR00008)`,
`pipeline on_failure error.message must include "_ingest.on_failure_message" (SVR00008)`,
`pipeline on_failure error.message must include "_ingest.pipeline" (SVR00008)`,
},
},
}

for pkgName, pipelines := range tests {
Expand Down
Loading