Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions internal/testrunner/runners/system/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,8 +1093,8 @@ func (r *tester) prepareScenario(ctx context.Context, config *testConfig, stackC
}
scenario.kibanaDataStream = ds

scenario.indexTemplateName = BuildIndexTemplateName(ds, r.pkgManifest.Type, config.Vars)
scenario.dataStream = BuildDataStreamName(scenario.policyTemplateInput, ds, r.pkgManifest.Type, config.Vars)
scenario.indexTemplateName = BuildIndexTemplateName(ds, policyTemplate, r.pkgManifest.Type, config.Vars)
scenario.dataStream = BuildDataStreamName(scenario.policyTemplateInput, ds, policyTemplate, r.pkgManifest.Type, config.Vars)

r.cleanTestScenarioHandler = func(ctx context.Context) error {
logger.Debugf("Deleting data stream for testing %s", scenario.dataStream)
Expand Down Expand Up @@ -1263,8 +1263,8 @@ func (r *tester) prepareScenario(ctx context.Context, config *testConfig, stackC

// BuildIndexTemplateName builds the expected index template name that is installed in Elasticsearch
// when the package data stream is added to the policy.
func BuildIndexTemplateName(ds kibana.PackageDataStream, manType string, cfgVars common.MapStr) string {
dataStreamDataset := getExpectedDatasetForTest(manType, ds.Inputs[0].Streams[0].DataStream.Dataset, cfgVars)
func BuildIndexTemplateName(ds kibana.PackageDataStream, policyTemplate packages.PolicyTemplate, manType string, cfgVars common.MapStr) string {
dataStreamDataset := getExpectedDatasetForTest(manType, ds.Inputs[0].Streams[0].DataStream.Dataset, policyTemplate.Name, cfgVars)

indexTemplateName := fmt.Sprintf(
"%s-%s",
Expand All @@ -1276,8 +1276,8 @@ func BuildIndexTemplateName(ds kibana.PackageDataStream, manType string, cfgVars

// BuildDataStreamName builds the expected data stream name that is installed in Elasticsearch
// when the package data stream is added to the policy.
func BuildDataStreamName(policyTemplateInput string, ds kibana.PackageDataStream, manType string, cfgVars common.MapStr) string {
dataStreamDataset := getExpectedDatasetForTest(manType, ds.Inputs[0].Streams[0].DataStream.Dataset, cfgVars)
func BuildDataStreamName(policyTemplateInput string, ds kibana.PackageDataStream, policyTemplate packages.PolicyTemplate, manType string, cfgVars common.MapStr) string {
dataStreamDataset := getExpectedDatasetForTest(manType, ds.Inputs[0].Streams[0].DataStream.Dataset, policyTemplate.Name, cfgVars)

// Input packages using the otel collector input require to add a specific dataset suffix
if manType == "input" && policyTemplateInput == otelCollectorInputName {
Expand All @@ -1293,13 +1293,14 @@ func BuildDataStreamName(policyTemplateInput string, ds kibana.PackageDataStream
return dataStreamName
}

func getExpectedDatasetForTest(pkgType, dataset string, cfgVars common.MapStr) string {
func getExpectedDatasetForTest(pkgType, dataset, policyTemplateName string, cfgVars common.MapStr) string {
if pkgType == "input" {
// Input packages can set `data_stream.dataset` by convention to customize the dataset.
v, _ := cfgVars.GetValue("data_stream.dataset")
if ds, ok := v.(string); ok && ds != "" {
return ds
}
return policyTemplateName
}
return dataset
}
Expand Down Expand Up @@ -1768,6 +1769,7 @@ func (r *tester) expectedDatasets(scenario *scenarioTest, config *testConfig) ([
// get dataset directly from package policy added when preparing the scenario
expectedDataset := scenario.kibanaDataStream.Inputs[0].Streams[0].DataStream.Dataset
if r.pkgManifest.Type == "input" {
expectedDataset = scenario.policyTemplateName
if scenario.policyTemplateInput == otelCollectorInputName {
// Input packages whose input is `otelcol` must add the `.otel` suffix
// Example: httpcheck.metrics.otel
Expand Down Expand Up @@ -2015,22 +2017,26 @@ func createInputPackageDatastream(
},
}

dataset := fmt.Sprintf("%s.%s", pkg.Name, policyTemplate.Name)
streams := []kibana.Stream{
{
ID: fmt.Sprintf("%s-%s.%s", policyTemplate.Input, pkg.Name, policyTemplate.Name),
Enabled: true,
DataStream: kibana.DataStream{
Type: policyTemplate.Type,
Dataset: dataset,
Type: policyTemplate.Type,
// This dataset is the one Fleet uses to identify the stream,
// it must be <package name>.<policy template name>. This is not
// the same as the dataset used for the index template, configured
// with vars below.
Dataset: fmt.Sprintf("%s.%s", pkg.Name, policyTemplate.Name),
},
},
}

// Add policyTemplate-level vars.
vars := setKibanaVariables(policyTemplate.Vars, cfgVars)
if _, found := vars["data_stream.dataset"]; !found {
dataStreamDataset := dataset
// Fleet uses the policy template name as default dataset for input packages, do the same.
dataStreamDataset := policyTemplate.Name
v, _ := cfgVars.GetValue("data_stream.dataset")
if dataset, ok := v.(string); ok && dataset != "" {
dataStreamDataset = dataset
Expand Down