diff --git a/changelog/fragments/1761679059-move-component-workdir-coordinator.yaml b/changelog/fragments/1761679059-move-component-workdir-coordinator.yaml new file mode 100644 index 00000000000..b006fbafd51 --- /dev/null +++ b/changelog/fragments/1761679059-move-component-workdir-coordinator.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Fix issue where switching to OTEL runtime would cause data to be re-ingested + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. 
+#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index ed4f5f65b4f..a62bdb5d93f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -941,10 +941,15 @@ func (c *Coordinator) watchRuntimeComponents( state := make(map[string]runtime.ComponentState) for { + var componentStates []runtime.ComponentComponentState select { case <-ctx.Done(): return case componentState := <-runtimeComponentStates: + componentStates = append(componentStates, componentState) + case componentStates = <-otelComponentStates: + } + for _, componentState := range componentStates { logComponentStateChange(c.logger, state, &componentState) // Forward the final changes back to Coordinator, unless our context // has ended. @@ -953,21 +958,23 @@ func (c *Coordinator) watchRuntimeComponents( case <-ctx.Done(): return } - case componentStates := <-otelComponentStates: - for _, componentState := range componentStates { - logComponentStateChange(c.logger, state, &componentState) - // Forward the final changes back to Coordinator, unless our context - // has ended. - select { - case c.managerChans.runtimeManagerUpdate <- componentState: - case <-ctx.Done(): - return - } - } } } } +// ensureComponentWorkDirs ensures the component working directories exist for current components. This method is +// idempotent. 
+func (c *Coordinator) ensureComponentWorkDirs() error { + for _, comp := range c.componentModel { + c.logger.Debugf("Ensuring a working directory exists for component: %s", comp.ID) + err := comp.PrepareWorkDir(paths.Run()) + if err != nil { + return fmt.Errorf("preparing a working directory for component %s failed: %w", comp.ID, err) + } + } + return nil +} + // logComponentStateChange emits a log message based on the new component state. func logComponentStateChange( logger *logger.Logger, @@ -1750,6 +1757,12 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) { return fmt.Errorf("generating component model: %w", err) } + // ensure all components have working directories + err = c.ensureComponentWorkDirs() + if err != nil { + return fmt.Errorf("ensuring component work dirs exists: %w", err) + } + signed, err := component.SignedFromPolicy(c.derivedConfig) if err != nil { if !errors.Is(err, component.ErrNotFound) { diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index 974c1c539f9..71b5d7d3d71 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -6,10 +6,14 @@ package coordinator import ( "fmt" + "slices" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.opentelemetry.io/collector/component/componentstatus" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/logp" @@ -176,6 +180,17 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) break } } + // If the component was stopped and isn't listed among components the coordinator knows about, it's safe to + // remove its working directory. 
Otherwise, we may just be moving it between runtimes. + if !slices.ContainsFunc(c.componentModel, func(c component.Component) bool { + return state.Component.ID == c.ID + }) { + c.logger.Debugf("removing working directory for component '%s'", state.Component.ID) + err := state.Component.RemoveWorkDir(paths.Run()) + if err != nil { + c.logger.Warnf("failed to remove workdir for component %s: %v", state.Component.ID, err) + } + } } c.stateNeedsRefresh = true diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 272bf555bcd..99491680787 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -1151,6 +1151,174 @@ service: } +func TestCoordinatorManagesComponentWorkDirs(t *testing.T) { + // Send a test policy to the Coordinator as a Config Manager update, + // verify it creates a working directory for the component, keeps that working directory as the component + // moves to a different runtime, then deletes it after the component is stopped. 
+ top := paths.Top() + paths.SetTop(t.TempDir()) + t.Cleanup(func() { + paths.SetTop(top) + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + logger := logp.NewLogger("testing") + + configChan := make(chan ConfigChange, 1) + updateChan := make(chan runtime.ComponentComponentState, 1) + + // Create a mocked runtime manager that will report the update call + runtimeManager := &fakeRuntimeManager{} + otelManager := &fakeOTelManager{} + + // we need the filestream spec to be able to convert to Otel config + componentSpec := component.InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Name: "filestream", + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + Platforms: []string{ + "linux/amd64", + "linux/arm64", + "darwin/amd64", + "darwin/arm64", + "windows/amd64", + "container/amd64", + "container/arm64", + }, + }, + } + + platform, err := component.LoadPlatformDetail() + require.NoError(t, err) + specs, err := component.NewRuntimeSpecs(platform, []component.InputRuntimeSpec{componentSpec}) + require.NoError(t, err) + + monitoringMgr := newTestMonitoringMgr() + coord := &Coordinator{ + logger: logger, + agentInfo: &info.AgentInfo{}, + stateBroadcaster: broadcaster.New(State{}, 0, 0), + managerChans: managerChans{ + configManagerUpdate: configChan, + runtimeManagerUpdate: updateChan, + }, + monitorMgr: monitoringMgr, + runtimeMgr: runtimeManager, + otelMgr: otelManager, + specs: specs, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), + secretMarkerFunc: testSecretMarkerFunc, + } + + var workDirPath string + var workDirCreated time.Time + + t.Run("run in process manager", func(t *testing.T) { + // Create a policy with one input and one output (no otel configuration) + cfg := config.MustNewConfigFrom(` +outputs: + default: + type: elasticsearch + hosts: + - localhost:9200 +inputs: + - id: test-input + type: filestream + 
use_output: default + _runtime_experimental: process +`) + + // Send the policy change and make sure it was acknowledged. + cfgChange := &configChange{cfg: cfg} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + assert.NoError(t, cfgChange.err, "config processing shouldn't report an error") + require.Len(t, coord.componentModel, 1, "there should be one component") + workDirPath = coord.componentModel[0].WorkDirPath(paths.Run()) + stat, err := os.Stat(workDirPath) + require.NoError(t, err, "component working directory should exist") + assert.True(t, stat.IsDir(), "component working directory should exist") + workDirCreated = stat.ModTime() + }) + + t.Run("run in otel manager", func(t *testing.T) { + // Create a policy with one input and one output (no otel configuration) + cfg := config.MustNewConfigFrom(` +outputs: + default: + type: elasticsearch + hosts: + - localhost:9200 +inputs: + - id: test-input + type: filestream + use_output: default + _runtime_experimental: otel +`) + + // Send the policy change and make sure it was acknowledged. 
+ cfgChange := &configChange{cfg: cfg} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + assert.NoError(t, cfgChange.err, "config processing shouldn't report an error") + require.Len(t, coord.componentModel, 1, "there should be one component") + compState := runtime.ComponentComponentState{ + Component: component.Component{ + ID: "filestream-default", + }, + State: runtime.ComponentState{ + State: client.UnitStateStopped, + }, + } + updateChan <- compState + coord.runLoopIteration(ctx) + stat, err := os.Stat(workDirPath) + require.NoError(t, err, "component working directory should exist") + assert.True(t, stat.IsDir(), "component working directory should exist") + assert.Equal(t, workDirCreated, stat.ModTime(), "component working directory shouldn't have been modified") + }) + t.Run("remove component", func(t *testing.T) { + // Create a policy with no inputs and one output (no otel configuration) + cfg := config.MustNewConfigFrom(` +outputs: + default: + type: elasticsearch + hosts: + - localhost:9200 +inputs: [] +`) + + // Send the policy change and make sure it was acknowledged. 
+ cfgChange := &configChange{cfg: cfg} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + assert.NoError(t, cfgChange.err, "config processing shouldn't report an error") + require.Len(t, coord.componentModel, 0, "there should be no components") + + compState := runtime.ComponentComponentState{ + Component: component.Component{ + ID: "filestream-default", + }, + State: runtime.ComponentState{ + State: client.UnitStateStopped, + }, + } + updateChan <- compState + coord.runLoopIteration(ctx) + assert.NoDirExists(t, workDirPath, "component working directory shouldn't exist anymore") + }) + +} + func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { // Set a one-second timeout -- nothing here should block, but if it // does let's report a failure instead of timing out the test runner. diff --git a/pkg/component/component.go b/pkg/component/component.go index 79cc677fd8e..ec8d3d333a8 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -9,6 +9,9 @@ import ( "errors" "fmt" "maps" + "os" + "path/filepath" + "runtime" "slices" "sort" "strings" @@ -39,6 +42,7 @@ const ( defaultUnitLogLevel = client.UnitLogLevelInfo headersKey = "headers" elasticsearchType = "elasticsearch" + workDirPathMod = 0770 ProcessRuntimeManager = RuntimeManager("process") OtelRuntimeManager = RuntimeManager("otel") DefaultRuntimeManager RuntimeManager = ProcessRuntimeManager @@ -238,6 +242,44 @@ func (c *Component) BinaryName() string { return "" } +// WorkDirName returns the name of the component's working directory. +func (c *Component) WorkDirName() string { + return c.ID +} + +// WorkDirPath returns the full path of the component's working directory, placing it under the provided parent path. 
+func (c *Component) WorkDirPath(parentDirPath string) string { + return filepath.Join(parentDirPath, c.WorkDirName()) +} + +// PrepareWorkDir prepares the component working directory under the provided parent path. This involves creating +// it under the right ownership and ACLs. This method is idempotent. +func (c *Component) PrepareWorkDir(parentDirPath string) error { + uid, gid := os.Geteuid(), os.Getegid() + path := c.WorkDirPath(parentDirPath) + err := os.MkdirAll(path, workDirPathMod) + if err != nil { + return fmt.Errorf("failed to create path %q: %w", path, err) + } + if runtime.GOOS == Windows { + return nil + } + err = os.Chown(path, uid, gid) + if err != nil { + return fmt.Errorf("failed to chown %q: %w", path, err) + } + err = os.Chmod(path, workDirPathMod) + if err != nil { + return fmt.Errorf("failed to chmod %q: %w", path, err) + } + return nil +} + +// RemoveWorkDir removes the component working directory under the provided parent path. This method is idempotent. +func (c *Component) RemoveWorkDir(parentDirPath string) error { + return os.RemoveAll(c.WorkDirPath(parentDirPath)) +} + // Model is the components model with signed policy data // This replaces former top level []Components with the top Model that captures signed policy data. // The signed data is a part of the policy since 8.8.0 release and contains the signed policy fragments and the signature that can be validated. 
diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 0721f84473c..fcd990aa3d4 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "reflect" + "runtime" "sort" "strconv" "strings" @@ -3138,3 +3139,47 @@ func makeMapStructureErr(t *testing.T) error { } return mapstructure.Decode(data, &output) } + +func TestComponent_WorkDir(t *testing.T) { + c := &Component{ID: "my-component"} + + t.Run("WorkDirName", func(t *testing.T) { + require.Equal(t, "my-component", c.WorkDirName()) + }) + + t.Run("WorkDirPath", func(t *testing.T) { + require.Equal(t, filepath.Join("parent", "my-component"), c.WorkDirPath("parent")) + }) + + t.Run("Prepare and Remove", func(t *testing.T) { + tempDir := t.TempDir() + + // Prepare + err := c.PrepareWorkDir(tempDir) + require.NoError(t, err) + + workDir := c.WorkDirPath(tempDir) + info, err := os.Stat(workDir) + require.NoError(t, err) + require.True(t, info.IsDir()) + + // On non-windows, check permissions and ownership + // On windows, only directory creation is checked. 
+ if runtime.GOOS != "windows" { + require.Equal(t, os.FileMode(workDirPathMod), info.Mode().Perm()) + } + + // Prepare again (idempotency) + err = c.PrepareWorkDir(tempDir) + require.NoError(t, err) + + // Remove + err = c.RemoveWorkDir(tempDir) + require.NoError(t, err) + assert.NoDirExists(t, workDir) + + // Remove again (idempotency) + err = c.RemoveWorkDir(tempDir) + require.NoError(t, err) + }) +} diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index 2abfb2254b8..23b549d60a3 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -11,7 +11,6 @@ import ( "os" "os/exec" "path/filepath" - "runtime" "strings" "time" @@ -34,8 +33,6 @@ const ( actionStop = actionMode(0) actionStart = actionMode(1) - runDirMod = 0770 - envAgentComponentID = "AGENT_COMPONENT_ID" envAgentComponentType = "AGENT_COMPONENT_TYPE" @@ -44,8 +41,6 @@ const ( func (m actionMode) String() string { switch m { - case actionTeardown: - return "teardown" case actionStop: return "stop" case actionStart: @@ -368,11 +363,8 @@ func (c *commandRuntime) start(comm Communicator) error { } env = append(env, fmt.Sprintf("%s=%s", envAgentComponentID, c.current.ID)) env = append(env, fmt.Sprintf("%s=%s", envAgentComponentType, c.getSpecType())) - uid, gid := os.Geteuid(), os.Getegid() - workDir, err := c.workDir(uid, gid) - if err != nil { - return err - } + uid := os.Geteuid() + workDir := c.current.WorkDirPath(paths.Run()) path, err := filepath.Abs(c.getSpecBinaryPath()) if err != nil { return fmt.Errorf("failed to determine absolute path: %w", err) @@ -388,9 +380,7 @@ func (c *commandRuntime) start(comm Communicator) error { args := c.monitor.EnrichArgs(c.current.ID, c.getSpecBinaryName(), cmdSpec.Args) // differentiate data paths - dataPath := filepath.Join(paths.Run(), c.current.ID) - _ = os.MkdirAll(dataPath, 0755) - args = append(args, "-E", "path.data="+dataPath) + args = append(args, "-E", "path.data="+workDir) // reset checkin state 
before starting the process. c.lastCheckin = time.Time{} @@ -478,40 +468,13 @@ func (c *commandRuntime) handleProc(state *os.ProcessState) bool { return true case actionStop, actionTeardown: // stopping (should have exited) - if c.actionState == actionTeardown { - // teardown so the entire component has been removed (cleanup work directory) - _ = os.RemoveAll(c.workDirPath()) - } + // Component workdir creation and deletion happens in the coordinator, nothing to do here. stopMsg := fmt.Sprintf("Stopped: pid '%d' exited with code '%d'", state.Pid(), state.ExitCode()) c.forceCompState(client.UnitStateStopped, stopMsg) } return false } -func (c *commandRuntime) workDirPath() string { - return filepath.Join(paths.Run(), c.current.ID) -} - -func (c *commandRuntime) workDir(uid int, gid int) (string, error) { - path := c.workDirPath() - err := os.MkdirAll(path, runDirMod) - if err != nil { - return "", fmt.Errorf("failed to create path %q: %w", path, err) - } - if runtime.GOOS == component.Windows { - return path, nil - } - err = os.Chown(path, uid, gid) - if err != nil { - return "", fmt.Errorf("failed to chown %q: %w", path, err) - } - err = os.Chmod(path, runDirMod) - if err != nil { - return "", fmt.Errorf("failed to chmod %q: %w", path, err) - } - return path, nil -} - func (c *commandRuntime) getSpecType() string { if c.current.InputSpec != nil { return c.current.InputSpec.InputType diff --git a/pkg/component/runtime/manager_fake_input_test.go b/pkg/component/runtime/manager_fake_input_test.go index e177994196d..472083f7173 100644 --- a/pkg/component/runtime/manager_fake_input_test.go +++ b/pkg/component/runtime/manager_fake_input_test.go @@ -8,7 +8,6 @@ import ( "context" "errors" "fmt" - "os" "path/filepath" "regexp" "runtime" @@ -138,6 +137,11 @@ func (suite *FakeInputSuite) TestManager_Features() { }, }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) 
subscriptionCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -338,6 +342,11 @@ func (suite *FakeInputSuite) TestManager_APM() { }, }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) subscriptionCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -610,6 +619,12 @@ func (suite *FakeInputSuite) TestManager_Limits() { Units: []component.Unit{}, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) + subscriptionCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() subscriptionErrCh := make(chan error) @@ -774,6 +789,12 @@ func (suite *FakeInputSuite) TestManager_BadUnitToGood() { }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) + subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() subErrCh := make(chan error) @@ -792,9 +813,10 @@ func (suite *FakeInputSuite) TestManager_BadUnitToGood() { } else { unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] if ok { - if unit.State == client.UnitStateFailed { + switch unit.State { + case client.UnitStateFailed: subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) - } else if unit.State == client.UnitStateHealthy { + case client.UnitStateHealthy: // update the bad unit to be good; so it will transition to healthy updatedComp := comp updatedComp.Units = make([]component.Unit, len(comp.Units)) @@ -816,9 +838,9 @@ func (suite *FakeInputSuite) TestManager_BadUnitToGood() { if err != nil { subErrCh <- err } - } else if unit.State == client.UnitStateStopped || unit.State == client.UnitStateStarting { + case client.UnitStateStopped, client.UnitStateStarting: // acceptable - } 
else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) } @@ -832,24 +854,25 @@ func (suite *FakeInputSuite) TestManager_BadUnitToGood() { subErrCh <- errors.New("bad-input unit should be failed") } } else { - if unit.State == client.UnitStateFailed { + switch unit.State { + case client.UnitStateFailed: if unit.Message == "hard-error for config" { // still hard-error; wait for it to go healthy } else { subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) } - } else if unit.State == client.UnitStateHealthy { + case client.UnitStateHealthy: // bad unit is now healthy; stop the component m.Update(component.Model{Components: []component.Component{}}) err := <-m.errCh if err != nil { subErrCh <- err } - } else if unit.State == client.UnitStateStopped { + case client.UnitStateStopped: subErrCh <- nil - } else if unit.State == client.UnitStateStarting { + case client.UnitStateStarting: // acceptable - } else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) } @@ -949,6 +972,14 @@ func (suite *FakeInputSuite) TestManager_GoodUnitToBad() { } goodUnitKey := ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "good-input"} + runPath := paths.Run() + for _, comp := range []component.Component{healthyComp, unhealthyComp} { + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) + } + // Wait for Manager to start up timedWaitForReady(t, m, 1*time.Second) @@ -1067,6 +1098,7 @@ func noDeadlockTestComponent(t *testing.T, index int) component.Component { } func (suite *FakeInputSuite) TestManager_NoDeadlock() { + runPath := paths.Run() t := suite.T() // NOTE: This is a long-running test that spams the runtime managers `Update` function to try and // trigger a deadlock. 
This test takes 2 minutes to run trying to re-produce issue: @@ -1105,6 +1137,10 @@ func (suite *FakeInputSuite) TestManager_NoDeadlock() { defer close(updateResultChan) for i := 0; updateLoopCtx.Err() == nil; i++ { comp := noDeadlockTestComponent(t, i) + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) m.Update(component.Model{Components: []component.Component{comp}}) err := <-m.errCh updateResultChan <- err @@ -1193,6 +1229,11 @@ func (suite *FakeInputSuite) TestManager_Configure() { }, }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -1210,9 +1251,10 @@ func (suite *FakeInputSuite) TestManager_Configure() { } else { unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] if ok { - if unit.State == client.UnitStateFailed { + switch unit.State { + case client.UnitStateFailed: subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) - } else if unit.State == client.UnitStateHealthy { + case client.UnitStateHealthy: // update config to change the state to degraded comp.Units[0].Config = component.MustExpectedConfig(map[string]interface{}{ "type": "fake", @@ -1224,11 +1266,11 @@ func (suite *FakeInputSuite) TestManager_Configure() { if err != nil { subErrCh <- err } - } else if unit.State == client.UnitStateDegraded { + case client.UnitStateDegraded: subErrCh <- nil - } else if unit.State == client.UnitStateStarting { + case client.UnitStateStarting: // acceptable - } else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) } @@ -1325,6 +1367,11 @@ func (suite *FakeInputSuite) TestManager_RemoveUnit() { }, }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + 
t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -1344,11 +1391,12 @@ func (suite *FakeInputSuite) TestManager_RemoveUnit() { } else { unit0, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input-0"}] if ok { - if unit0.State == client.UnitStateFailed { + switch unit0.State { + case client.UnitStateFailed: subErrCh <- fmt.Errorf("unit 0 failed: %s", unit0.Message) - } else if unit0.State == client.UnitStateStarting || unit0.State == client.UnitStateHealthy { + case client.UnitStateStarting, client.UnitStateHealthy: // acceptable - } else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit 0 reported unexpected state: %v", unit0.State) } @@ -1357,9 +1405,10 @@ func (suite *FakeInputSuite) TestManager_RemoveUnit() { } unit1, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input-1"}] if ok { - if unit1.State == client.UnitStateFailed { + switch unit1.State { + case client.UnitStateFailed: subErrCh <- fmt.Errorf("unit 1 failed: %s", unit1.Message) - } else if unit1.State == client.UnitStateHealthy { + case client.UnitStateHealthy: // unit1 is healthy lets remove it from the component comp.Units = comp.Units[0:1] m.Update(component.Model{Components: []component.Component{comp}}) @@ -1367,12 +1416,12 @@ func (suite *FakeInputSuite) TestManager_RemoveUnit() { if err != nil { subErrCh <- err } - } else if unit1.State == client.UnitStateStarting || unit1.State == client.UnitStateStopping { + case client.UnitStateStarting, client.UnitStateStopping: // acceptable - } else if unit1.State == client.UnitStateStopped { + case client.UnitStateStopped: // unit should have been reported stopped before being removed unit1Stopped = true - } else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit 1 reported unexpected state: %v", unit1.State) 
} @@ -1470,6 +1519,11 @@ func (suite *FakeInputSuite) TestManager_ActionState() { }, }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -1487,9 +1541,10 @@ func (suite *FakeInputSuite) TestManager_ActionState() { } else { unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] if ok { - if unit.State == client.UnitStateFailed { + switch unit.State { + case client.UnitStateFailed: subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) - } else if unit.State == client.UnitStateHealthy { + case client.UnitStateHealthy: // must be called in a separate go routine because it cannot block receiving from the // subscription channel go func() { @@ -1503,12 +1558,12 @@ func (suite *FakeInputSuite) TestManager_ActionState() { subErrCh <- err } }() - } else if unit.State == client.UnitStateDegraded { + case client.UnitStateDegraded: // action set it to degraded subErrCh <- nil - } else if unit.State == client.UnitStateStarting { + case client.UnitStateStarting: // acceptable - } else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) } @@ -1595,6 +1650,11 @@ func (suite *FakeInputSuite) TestManager_Restarts() { }, }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -1616,11 +1676,12 @@ func (suite *FakeInputSuite) TestManager_Restarts() { } else { unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] if ok { - if unit.State == client.UnitStateFailed { + switch unit.State { + case client.UnitStateFailed: if !killed { subErrCh <- fmt.Errorf("unit 
failed: %s", unit.Message) } - } else if unit.State == client.UnitStateHealthy { + case client.UnitStateHealthy: // force the input to exit and it should be restarted if !killed { killed = true @@ -1642,9 +1703,9 @@ func (suite *FakeInputSuite) TestManager_Restarts() { // got back to healthy after kill subErrCh <- nil } - } else if unit.State == client.UnitStateStarting { + case client.UnitStateStarting: // acceptable - } else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) } @@ -1738,6 +1799,11 @@ func (suite *FakeInputSuite) TestManager_Restarts_ConfigKill() { }, }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -1759,11 +1825,12 @@ func (suite *FakeInputSuite) TestManager_Restarts_ConfigKill() { } else { unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] if ok { - if unit.State == client.UnitStateFailed { + switch unit.State { + case client.UnitStateFailed: if !killed { subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) } - } else if unit.State == client.UnitStateHealthy { + case client.UnitStateHealthy: // force the input to exit and it should be restarted if !killed { killed = true @@ -1786,9 +1853,9 @@ func (suite *FakeInputSuite) TestManager_Restarts_ConfigKill() { // got back to healthy after kill subErrCh <- nil } - } else if unit.State == client.UnitStateStarting || unit.State == client.UnitStateStopped { + case client.UnitStateStarting, client.UnitStateStopped: // acceptable - } else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) } @@ -1883,6 +1950,11 @@ func (suite *FakeInputSuite) TestManager_KeepsRestarting() { }, }, } + runPath := paths.Run() + 
require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -1904,10 +1976,11 @@ func (suite *FakeInputSuite) TestManager_KeepsRestarting() { } else { unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] if ok { - if unit.State == client.UnitStateFailed { + switch unit.State { + case client.UnitStateFailed: // unit should not be failed because we allow restart per period subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) - } else if unit.State == client.UnitStateHealthy { + case client.UnitStateHealthy: if lastStoppedCount != stoppedCount { lastStoppedCount = stoppedCount @@ -1928,11 +2001,11 @@ func (suite *FakeInputSuite) TestManager_KeepsRestarting() { // got stopped 3 times and got back to healthy subErrCh <- nil } - } else if unit.State == client.UnitStateStarting { + case client.UnitStateStarting: // acceptable - } else if unit.State == client.UnitStateStopped { + case client.UnitStateStopped: stoppedCount += 1 - } else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) } @@ -2028,6 +2101,11 @@ func (suite *FakeInputSuite) TestManager_RestartsOnMissedCheckins() { }, }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -2137,6 +2215,11 @@ func (suite *FakeInputSuite) TestManager_InvalidAction() { }, }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -2154,9 +2237,10 @@ func (suite *FakeInputSuite) 
TestManager_InvalidAction() { } else { unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] if ok { - if unit.State == client.UnitStateFailed { + switch unit.State { + case client.UnitStateFailed: subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) - } else if unit.State == client.UnitStateHealthy { + case client.UnitStateHealthy: actionCtx, actionCancel := context.WithTimeout(context.Background(), 5*time.Second) _, err := m.PerformAction(actionCtx, comp, comp.Units[0], "invalid_missing_action", nil) actionCancel() @@ -2167,9 +2251,9 @@ func (suite *FakeInputSuite) TestManager_InvalidAction() { } else { subErrCh <- nil } - } else if unit.State == client.UnitStateStarting { + case client.UnitStateStarting: // acceptable - } else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) } @@ -2349,6 +2433,13 @@ func (suite *FakeInputSuite) TestManager_MultiComponent() { }, }, } + runPath := paths.Run() + for _, c := range components { + require.NoError(t, c.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, c.RemoveWorkDir(runPath)) + }) + } subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -2476,6 +2567,11 @@ func (suite *FakeInputSuite) TestManager_LogLevel() { }, }, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) subCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() @@ -2493,9 +2589,10 @@ func (suite *FakeInputSuite) TestManager_LogLevel() { } else { unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] if ok { - if unit.State == client.UnitStateFailed { + switch unit.State { + case client.UnitStateFailed: subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) - } else if unit.State == client.UnitStateHealthy { + case 
client.UnitStateHealthy: updatedComp := comp updatedComp.Units = make([]component.Unit, len(comp.Units)) copy(updatedComp.Units, comp.Units) @@ -2520,9 +2617,9 @@ func (suite *FakeInputSuite) TestManager_LogLevel() { } else { subErrCh <- nil } - } else if unit.State == client.UnitStateStarting { + case client.UnitStateStarting: // acceptable - } else { + default: // unknown state that should not have occurred subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) } @@ -2677,6 +2774,19 @@ func (suite *FakeInputSuite) TestManager_StartStopComponent() { }, }, } + runPath := paths.Run() + for _, c := range components { + require.NoError(t, c.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, c.RemoveWorkDir(runPath)) + }) + } + for _, c := range components2 { + require.NoError(t, c.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, c.RemoveWorkDir(runPath)) + }) + } select { case err := <-managerErrCh: @@ -2824,6 +2934,12 @@ func (suite *FakeInputSuite) TestManager_Chunk() { Units: units, } + runPath := paths.Run() + require.NoError(t, comp.PrepareWorkDir(runPath)) + t.Cleanup(func() { + assert.NoError(t, comp.RemoveWorkDir(runPath)) + }) + waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() if err := waitForReady(waitCtx, m); err != nil { @@ -2846,15 +2962,16 @@ func (suite *FakeInputSuite) TestManager_Chunk() { healthyCount := 0 stoppedCount := 0 for _, unit := range state.Units { - if unit.State == client.UnitStateFailed { + switch unit.State { + case client.UnitStateFailed: subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) - } else if unit.State == client.UnitStateHealthy { + case client.UnitStateHealthy: healthyCount += 1 - } else if unit.State == client.UnitStateStopped { + case client.UnitStateStopped: stoppedCount += 1 - } else if unit.State == client.UnitStateStarting { + case client.UnitStateStarting: // acceptable - } else { + default: // unknown state that should not have 
occurred subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) } @@ -2901,10 +3018,6 @@ LOOP: err = <-errCh require.NoError(t, err) - - workDir := filepath.Join(paths.Run(), comp.ID) - _, err = os.Stat(workDir) - require.ErrorIs(t, err, os.ErrNotExist) } func signalState(subErrCh chan error, state *ComponentState, acceptableStates []client.UnitState) { diff --git a/testing/integration/ess/beat_receivers_test.go b/testing/integration/ess/beat_receivers_test.go index 5a42e6d4f5a..cfd3af140b8 100644 --- a/testing/integration/ess/beat_receivers_test.go +++ b/testing/integration/ess/beat_receivers_test.go @@ -14,6 +14,7 @@ import ( "io" "net/http" "os" + "path/filepath" "runtime" "strings" "testing" @@ -21,6 +22,7 @@ import ( "time" "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/testing/tools/fleettools" "github.com/elastic/go-elasticsearch/v8" "github.com/gofrs/uuid/v5" @@ -34,6 +36,7 @@ import ( "github.com/elastic/elastic-agent/pkg/testing/define" "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext" "github.com/elastic/elastic-agent/testing/integration" + "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -251,7 +254,6 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { status, statusErr := classicFixture.ExecStatus(ctx) assert.NoError(collect, statusErr) assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 3) - return }, 1*time.Minute, 1*time.Second) // 2. Assert monitoring logs and metrics are available on ES @@ -322,7 +324,6 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { status, statusErr := beatReceiverFixture.ExecStatus(ctx) assert.NoError(collect, statusErr) assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 4) - return }, 1*time.Minute, 1*time.Second) // 5. 
Assert monitoring logs and metrics are available on ES (for otel mode) @@ -785,7 +786,6 @@ agent.monitoring.enabled: false status, statusErr := fixture.ExecStatus(ctx) require.NoError(collect, statusErr) assertBeatsReady(collect, &status, component.ProcessRuntimeManager) - return }, 2*time.Minute, 5*time.Second) // change configuration and wait until the beats receiver is healthy @@ -797,7 +797,6 @@ agent.monitoring.enabled: false status, statusErr := fixture.ExecStatus(ctx) require.NoError(collect, statusErr) assertBeatsReady(collect, &status, component.OtelRuntimeManager) - return }, 2*time.Minute, 5*time.Second) logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"}) @@ -874,7 +873,6 @@ agent.monitoring.enabled: false assert.NoError(collect, statusErr) // we should be running beats processes even though the otel runtime was requested assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 1) - return }, 1*time.Minute, 1*time.Second) logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"}) require.NoError(t, err) @@ -911,6 +909,138 @@ agent.monitoring.enabled: false assert.Equal(t, expectedMessage, message) } +// TestComponentWorkDir verifies that the component working directory is not deleted when moving the component from +// the process runtime to the otel runtime. 
+func TestComponentWorkDir(t *testing.T) { + _ = define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + Sudo: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: nil, + }) + + type configOptions struct { + RuntimeExperimental string + } + configTemplate := `agent.logging.level: info +agent.logging.to_stderr: true +agent.logging.to_files: false +inputs: + # Collecting system metrics + - type: system/metrics + id: unique-system-metrics-input + _runtime_experimental: {{.RuntimeExperimental}} + streams: + - metricsets: + - cpu +outputs: + default: + type: elasticsearch + hosts: [http://localhost:9200] + api_key: placeholder +agent.monitoring.enabled: false +` + + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + configOptions{ + RuntimeExperimental: string(component.ProcessRuntimeManager), + })) + processConfig := configBuffer.Bytes() + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + configOptions{ + RuntimeExperimental: string(component.OtelRuntimeManager), + })) + receiverConfig := configBuffer.Bytes() + // this is the context for the whole test, with a global timeout defined + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) + defer cancel() + + // set up a standalone agent + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + err = fixture.Prepare(ctx) + require.NoError(t, err) + err = fixture.Configure(ctx, processConfig) + require.NoError(t, err) + + output, err := fixture.Install(ctx, &atesting.InstallOpts{Privileged: true, Force: true}) + require.NoError(t, err, "failed to install agent: %s", output) + + var componentID, componentWorkDir string + var workDirCreated time.Time + + // wait for component to appear in status + 
require.EventuallyWithT(t, func(collect *assert.CollectT) { + var statusErr error + status, statusErr := fixture.ExecStatus(ctx) + require.NoError(collect, statusErr) + require.Equal(collect, 1, len(status.Components)) + componentStatus := status.Components[0] + componentID = componentStatus.ID + }, 2*time.Minute, 5*time.Second) + + runDir, err := atesting.FindRunDir(fixture) + require.NoError(t, err) + + componentWorkDir = filepath.Join(runDir, componentID) + stat, err := os.Stat(componentWorkDir) + require.NoError(t, err, "component working directory should exist") + assert.True(t, stat.IsDir(), "component working directory should exist") + workDirCreated = stat.ModTime() + + // change configuration and wait until the beats receiver is present in status + err = fixture.Configure(ctx, receiverConfig) + require.NoError(t, err) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + var statusErr error + status, statusErr := fixture.ExecStatus(ctx) + require.NoError(collect, statusErr) + require.Equal(collect, 1, len(status.Components)) + componentStatus := status.Components[0] + assert.Equal(collect, "beats-receiver", componentStatus.VersionInfo.Name) + }, 2*time.Minute, 5*time.Second) + + // the component working directory should still exist + stat, err = os.Stat(componentWorkDir) + require.NoError(t, err, "component working directory should exist") + assert.True(t, stat.IsDir(), "component working directory should exist") + assert.Equal(t, workDirCreated, stat.ModTime(), "component working directory shouldn't have been modified") + + configNoComponents := `agent.logging.level: info +agent.logging.to_stderr: true +agent.logging.to_files: false +inputs: [] +outputs: + default: + type: elasticsearch + hosts: [http://localhost:9200] + api_key: placeholder +agent.monitoring.enabled: false +` + err = fixture.Configure(ctx, []byte(configNoComponents)) + require.NoError(t, err) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + var statusErr 
error + status, statusErr := fixture.ExecStatus(ctx) + require.NoError(collect, statusErr) + require.Equal(collect, 0, len(status.Components)) + }, 2*time.Minute, 5*time.Second) + + // the component working directory shouldn't exist anymore + require.NoDirExists(t, componentWorkDir) +} + func assertCollectorComponentsHealthy(t *assert.CollectT, status *atesting.AgentStatusCollectorOutput) { assert.Equal(t, int(cproto.CollectorComponentStatus_StatusOK), status.Status, "component status should be ok") assert.Equal(t, "", status.Error, "component status should not have an error") @@ -1219,3 +1349,281 @@ func setStrictMapping(client *elasticsearch.Client, index string) error { } return nil } + +// TestMonitoringNoDuplicates checks to see if switching to otel +// runtime re-ingests logs. Also checks to make sure restarting +// elastic-agent when using otel runtime for monitoring doesn't +// re-ingest logs. +// +// Flow +// 1. Create policy in Kibana with just monitoring and "process" runtime +// 2. Install and Enroll +// 3. Switch to monitoring "otel" runtime +// 4. restart agent 3 times, making sure healthy between restarts +// 5. switch back to "process" runtime +// 6. query ES for monitoring logs with aggregation on fingerprint and line number, +// ideally 0 duplicates but possible to have a small number +// 7. 
uninstall +func TestMonitoringNoDuplicates(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + OS: []define.OS{ + {Type: define.Linux}, + {Type: define.Darwin}, + {Type: define.Windows}, + }, + Stack: &define.Stack{}, + Sudo: true, + }) + + ctx, cancel := testcontext.WithDeadline(t, + context.Background(), + time.Now().Add(5*time.Minute)) + t.Cleanup(cancel) + + policyName := fmt.Sprintf("%s-%s", t.Name(), uuid.Must(uuid.NewV4()).String()) + createPolicyReq := kibana.AgentPolicy{ + Name: policyName, + Namespace: info.Namespace, + Description: fmt.Sprintf("%s policy", t.Name()), + MonitoringEnabled: []kibana.MonitoringEnabledOption{ + kibana.MonitoringEnabledLogs, + kibana.MonitoringEnabledMetrics, + }, + Overrides: map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "_runtime_experimental": "process", + }, + }, + }, + } + policyResponse, err := info.KibanaClient.CreatePolicy(ctx, createPolicyReq) + require.NoError(t, err, "error creating policy") + + enrollmentToken, err := info.KibanaClient.CreateEnrollmentAPIKey(ctx, + kibana.CreateEnrollmentAPIKeyRequest{ + PolicyID: policyResponse.ID, + }) + + fut, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + err = fut.Prepare(ctx) + require.NoError(t, err) + + fleetServerURL, err := fleettools.DefaultURL(ctx, info.KibanaClient) + require.NoError(t, err, "failed getting Fleet Server URL") + + installOpts := atesting.InstallOpts{ + NonInteractive: true, + Privileged: true, + Force: true, + EnrollOpts: atesting.EnrollOpts{ + URL: fleetServerURL, + EnrollmentToken: enrollmentToken.APIKey, + }, + } + combinedOutput, err := fut.Install(ctx, &installOpts) + require.NoErrorf(t, err, "error install with enroll: %s\ncombinedoutput:\n%s", err, string(combinedOutput)) + + // store timestamp to filter duplicate docs with timestamp greater than this value + installTimestamp := 
time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + + healthCheck := func(ctx context.Context, message string, runtime component.RuntimeManager, componentCount int, timestamp string) { + require.EventuallyWithT(t, func(collect *assert.CollectT) { + var statusErr error + status, statusErr := fut.ExecStatus(ctx) + assert.NoError(collect, statusErr) + assertBeatsHealthy(collect, &status, runtime, componentCount) + }, 1*time.Minute, 1*time.Second) + require.Eventuallyf(t, + func() bool { + findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second) + defer findCancel() + mustClauses := []map[string]any{ + {"match_phrase": map[string]any{"message": message}}, + {"match": map[string]any{"data_stream.type": "logs"}}, + {"match": map[string]any{"data_stream.dataset": "elastic_agent"}}, + {"match": map[string]any{"data_stream.namespace": info.Namespace}}, + } + rawQuery := map[string]any{ + "query": map[string]any{ + "bool": map[string]any{ + "must": mustClauses, + "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}}, + }, + }, + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, "logs-*", info.ESClient) + require.NoError(t, err) + return docs.Hits.Total.Value > 0 + }, + 4*time.Minute, 5*time.Second, + "health check failed: timestamp: %s", timestamp) + } + + // make sure running and logs are making it to ES + healthCheck(ctx, + "control checkin v2 protocol has chunking enabled", + component.ProcessRuntimeManager, + 3, + installTimestamp) + + // Switch to otel monitoring + otelMonUpdateReq := kibana.AgentPolicyUpdateRequest{ + Name: policyName, + Namespace: info.Namespace, + Overrides: map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "_runtime_experimental": "otel", + }, + }, + }, + } + + otelMonResp, err := info.KibanaClient.UpdatePolicy(ctx, + policyResponse.ID, otelMonUpdateReq) + 
require.NoError(t, err) + + otelTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + + // wait until policy is applied + policyCheck := func(expectedRevision int) { + require.Eventually(t, func() bool { + inspectOutput, err := fut.ExecInspect(ctx) + require.NoError(t, err) + return expectedRevision == inspectOutput.Revision + }, 3*time.Minute, 1*time.Second) + } + policyCheck(otelMonResp.Revision) + + // make sure running and logs are making it to ES + healthCheck(ctx, + "Everything is ready. Begin running and processing data.", + component.OtelRuntimeManager, + 4, + otelTimestamp) + + // restart 3 times, checks path definition is stable + for range 3 { + restartTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + restartBytes, err := fut.Exec(ctx, []string{"restart"}) + require.NoErrorf(t, + err, + "Restart error: %s, output was: %s", + err, + string(restartBytes)) + healthCheck(ctx, + "Everything is ready. Begin running and processing data.", + component.OtelRuntimeManager, + 4, + restartTimestamp) + } + + // Switch back to process monitoring + processMonUpdateReq := kibana.AgentPolicyUpdateRequest{ + Name: policyName, + Namespace: info.Namespace, + Overrides: map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "_runtime_experimental": "process", + }, + }, + }, + } + + processMonResp, err := info.KibanaClient.UpdatePolicy(ctx, + policyResponse.ID, processMonUpdateReq) + require.NoError(t, err) + + processTimestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + + // wait until policy is applied + policyCheck(processMonResp.Revision) + + // make sure running and logs are making it to ES + healthCheck(ctx, + "control checkin v2 protocol has chunking enabled", + component.ProcessRuntimeManager, + 3, + processTimestamp) + + // duplicate check + rawQuery := map[string]any{ + "runtime_mappings": map[string]any{ + "log.offset": map[string]any{ + "type": "keyword", + }, + "log.file.fingerprint": 
map[string]any{ + "type": "keyword", + }, + }, + "query": map[string]any{ + "bool": map[string]any{ + "must": []map[string]any{ + {"match": map[string]any{"data_stream.type": "logs"}}, + {"match": map[string]any{"data_stream.dataset": "elastic_agent"}}, + {"match": map[string]any{"data_stream.namespace": info.Namespace}}, + }, + "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": installTimestamp}}}, + }, + }, + "aggs": map[string]any{ + "duplicates": map[string]any{ + "multi_terms": map[string]any{ + "size": 500, + "min_doc_count": 2, + "terms": []map[string]any{ + {"field": "log.file.fingerprint"}, + {"field": "log.offset"}, + }, + }, + }, + }, + } + var buf bytes.Buffer + err = json.NewEncoder(&buf).Encode(rawQuery) + require.NoError(t, err) + + es := esapi.New(info.ESClient) + res, err := es.Search( + es.Search.WithIndex("logs-*"), + es.Search.WithSize(0), + es.Search.WithBody(&buf), + es.Search.WithPretty(), + es.Search.WithContext(ctx), + ) + require.NoError(t, err) + require.Falsef(t, (res.StatusCode >= http.StatusMultipleChoices || res.StatusCode < http.StatusOK), "status should be 2xx was: %d", res.StatusCode) + resultBuf, err := io.ReadAll(res.Body) + require.NoError(t, err) + + aggResults := map[string]any{} + err = json.Unmarshal(resultBuf, &aggResults) + aggs, ok := aggResults["aggregations"].(map[string]any) + require.Truef(t, ok, "'aggregations' wasn't a map[string]any, result was %s", string(resultBuf)) + dups, ok := aggs["duplicates"].(map[string]any) + require.Truef(t, ok, "'duplicates' wasn't a map[string]any, result was %s", string(resultBuf)) + buckets, ok := dups["buckets"].([]any) + require.Truef(t, ok, "'buckets' wasn't a []any, result was %s", string(resultBuf)) + + hits, ok := aggResults["hits"].(map[string]any) + require.Truef(t, ok, "'hits' wasn't a map[string]any, result was %s", string(resultBuf)) + total, ok := hits["total"].(map[string]any) + require.Truef(t, ok, "'total' wasn't a map[string]any, 
result was %s", string(resultBuf)) + value, ok := total["value"].(float64) + require.Truef(t, ok, "'total' wasn't an int, result was %s", string(resultBuf)) + + require.Equalf(t, 0, len(buckets), "len(buckets): %d, hits.total.value: %d, result was %s", len(buckets), value, string(resultBuf)) + + // Uninstall + combinedOutput, err = fut.Uninstall(ctx, &atesting.UninstallOpts{Force: true}) + require.NoErrorf(t, err, "error uninstalling beat receiver agent monitoring, err: %s, combined output: %s", err, string(combinedOutput)) +} diff --git a/testing/integration/ess/endpoint_security_test.go b/testing/integration/ess/endpoint_security_test.go index d2504522832..35cfac59fd7 100644 --- a/testing/integration/ess/endpoint_security_test.go +++ b/testing/integration/ess/endpoint_security_test.go @@ -714,10 +714,11 @@ func testInstallWithEndpointSecurityAndRemoveEndpointIntegration(t *testing.T, i // Verify that the Endpoint directory was correctly removed. // Regression test for https://github.com/elastic/elastic-agent/issues/3077 agentInstallPath := fixture.WorkDir() - files, err := os.ReadDir(filepath.Clean(filepath.Join(agentInstallPath, ".."))) + elasticInstallPath := filepath.Clean(filepath.Join(agentInstallPath, "..")) + files, err := os.ReadDir(elasticInstallPath) require.NoError(t, err) - t.Logf("Checking directories at install path %s", agentInstallPath) + t.Logf("Checking directories at install path %s", elasticInstallPath) for _, f := range files { if !f.IsDir() { continue