Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix issue where switching to OTEL runtime would cause data to be re-ingested

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
35 changes: 24 additions & 11 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,10 +941,15 @@ func (c *Coordinator) watchRuntimeComponents(
state := make(map[string]runtime.ComponentState)

for {
var componentStates []runtime.ComponentComponentState
select {
case <-ctx.Done():
return
case componentState := <-runtimeComponentStates:
componentStates = append(componentStates, componentState)
case componentStates = <-otelComponentStates:
}
for _, componentState := range componentStates {
logComponentStateChange(c.logger, state, &componentState)
// Forward the final changes back to Coordinator, unless our context
// has ended.
Expand All @@ -953,21 +958,23 @@ func (c *Coordinator) watchRuntimeComponents(
case <-ctx.Done():
return
}
case componentStates := <-otelComponentStates:
for _, componentState := range componentStates {
logComponentStateChange(c.logger, state, &componentState)
// Forward the final changes back to Coordinator, unless our context
// has ended.
select {
case c.managerChans.runtimeManagerUpdate <- componentState:
case <-ctx.Done():
return
}
}
}
}
}

// ensureComponentWorkDirs ensures the component working directories exist for current components. This method is
// idempotent.
func (c *Coordinator) ensureComponentWorkDirs() error {
for _, comp := range c.componentModel {
c.logger.Debugf("Ensuring a working directory exists for component: %s", comp.ID)
err := comp.PrepareWorkDir(paths.Run())
if err != nil {
return fmt.Errorf("preparing a working directory for component %s failed: %w", comp.ID, err)
}
}
return nil
}

// logComponentStateChange emits a log message based on the new component state.
func logComponentStateChange(
logger *logger.Logger,
Expand Down Expand Up @@ -1750,6 +1757,12 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
return fmt.Errorf("generating component model: %w", err)
}

// ensure all components have working directories
err = c.ensureComponentWorkDirs()
if err != nil {
return fmt.Errorf("ensuring component work dirs exists: %w", err)
}

signed, err := component.SignedFromPolicy(c.derivedConfig)
if err != nil {
if !errors.Is(err, component.ErrNotFound) {
Expand Down
15 changes: 15 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ package coordinator

import (
"fmt"
"slices"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/component/componentstatus"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/component"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"

Expand Down Expand Up @@ -176,6 +180,17 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState)
break
}
}
// If the component was stopped and isn't listed among components the coordinator knows about, it's safe to
// remove its working directory. Otherwise, we may just be moving it between runtimes.
if !slices.ContainsFunc(c.componentModel, func(c component.Component) bool {
return state.Component.ID == c.ID
}) {
c.logger.Debugf("removing working directory for component '%s'", state.Component.ID)
err := state.Component.RemoveWorkDir(paths.Run())
if err != nil {
c.logger.Warnf("failed to remove workdir for component %s: %v", state.Component.ID, err)
}
}
}

c.stateNeedsRefresh = true
Expand Down
168 changes: 168 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,174 @@ service:

}

func TestCoordinatorManagesComponentWorkDirs(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it creates a working directory for the component, keeps that working directory as the component
// moves to a different runtime, then deletes it after the component is stopped.
top := paths.Top()
paths.SetTop(t.TempDir())
t.Cleanup(func() {
paths.SetTop(top)
})

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
logger := logp.NewLogger("testing")

configChan := make(chan ConfigChange, 1)
updateChan := make(chan runtime.ComponentComponentState, 1)

// Create a mocked runtime manager that will report the update call
runtimeManager := &fakeRuntimeManager{}
otelManager := &fakeOTelManager{}

// we need the filestream spec to be able to convert to Otel config
componentSpec := component.InputRuntimeSpec{
InputType: "filestream",
BinaryName: "agentbeat",
Spec: component.InputSpec{
Name: "filestream",
Command: &component.CommandSpec{
Args: []string{"filebeat"},
},
Platforms: []string{
"linux/amd64",
"linux/arm64",
"darwin/amd64",
"darwin/arm64",
"windows/amd64",
"container/amd64",
"container/arm64",
},
},
}

platform, err := component.LoadPlatformDetail()
require.NoError(t, err)
specs, err := component.NewRuntimeSpecs(platform, []component.InputRuntimeSpec{componentSpec})
require.NoError(t, err)

monitoringMgr := newTestMonitoringMgr()
coord := &Coordinator{
logger: logger,
agentInfo: &info.AgentInfo{},
stateBroadcaster: broadcaster.New(State{}, 0, 0),
managerChans: managerChans{
configManagerUpdate: configChan,
runtimeManagerUpdate: updateChan,
},
monitorMgr: monitoringMgr,
runtimeMgr: runtimeManager,
otelMgr: otelManager,
specs: specs,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
secretMarkerFunc: testSecretMarkerFunc,
}

var workDirPath string
var workDirCreated time.Time

t.Run("run in process manager", func(t *testing.T) {
// Create a policy with one input and one output (no otel configuration)
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
hosts:
- localhost:9200
inputs:
- id: test-input
type: filestream
use_output: default
_runtime_experimental: process
`)

// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
require.Len(t, coord.componentModel, 1, "there should be one component")
workDirPath = coord.componentModel[0].WorkDirPath(paths.Run())
stat, err := os.Stat(workDirPath)
require.NoError(t, err, "component working directory should exist")
assert.True(t, stat.IsDir(), "component working directory should exist")
workDirCreated = stat.ModTime()
})

t.Run("run in otel manager", func(t *testing.T) {
// Create a policy with one input and one output (no otel configuration)
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
hosts:
- localhost:9200
inputs:
- id: test-input
type: filestream
use_output: default
_runtime_experimental: otel
`)

// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
require.Len(t, coord.componentModel, 1, "there should be one component")
compState := runtime.ComponentComponentState{
Component: component.Component{
ID: "filestream-default",
},
State: runtime.ComponentState{
State: client.UnitStateStopped,
},
}
updateChan <- compState
coord.runLoopIteration(ctx)
stat, err := os.Stat(workDirPath)
require.NoError(t, err, "component working directory should exist")
assert.True(t, stat.IsDir(), "component working directory should exist")
assert.Equal(t, workDirCreated, stat.ModTime(), "component working directory shouldn't have been modified")
})
t.Run("remove component", func(t *testing.T) {
// Create a policy with one input and one output (no otel configuration)
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
hosts:
- localhost:9200
inputs: []
`)

// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
require.Len(t, coord.componentModel, 0, "there should be one component")

compState := runtime.ComponentComponentState{
Component: component.Component{
ID: "filestream-default",
},
State: runtime.ComponentState{
State: client.UnitStateStopped,
},
}
updateChan <- compState
coord.runLoopIteration(ctx)
assert.NoDirExists(t, workDirPath, "component working directory shouldn't exist anymore")
})

}

func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
// Set a one-second timeout -- nothing here should block, but if it
// does let's report a failure instead of timing out the test runner.
Expand Down
42 changes: 42 additions & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"errors"
"fmt"
"maps"
"os"
"path/filepath"
"runtime"
"slices"
"sort"
"strings"
Expand Down Expand Up @@ -39,6 +42,7 @@ const (
defaultUnitLogLevel = client.UnitLogLevelInfo
headersKey = "headers"
elasticsearchType = "elasticsearch"
workDirPathMod = 0770
ProcessRuntimeManager = RuntimeManager("process")
OtelRuntimeManager = RuntimeManager("otel")
DefaultRuntimeManager RuntimeManager = ProcessRuntimeManager
Expand Down Expand Up @@ -238,6 +242,44 @@ func (c *Component) BinaryName() string {
return ""
}

// WorkDirName returns the name of the component's working directory.
func (c *Component) WorkDirName() string {
return c.ID
}

// WorkDirPath returns the full path of the component's working directory, placing it under the provided parent path.
func (c *Component) WorkDirPath(parentDirPath string) string {
return filepath.Join(parentDirPath, c.WorkDirName())
}

// PrepareWorkDir prepares the component working directory under the provided parent path. This involves creating
// it under the right ownership and ACLs. This method is idempotent.
func (c *Component) PrepareWorkDir(parentDirPath string) error {
uid, gid := os.Geteuid(), os.Getegid()
path := c.WorkDirPath(parentDirPath)
err := os.MkdirAll(path, workDirPathMod)
if err != nil {
return fmt.Errorf("failed to create path %q: %w", path, err)
}
if runtime.GOOS == Windows {
return nil
}
err = os.Chown(path, uid, gid)
if err != nil {
return fmt.Errorf("failed to chown %q: %w", path, err)
}
err = os.Chmod(path, workDirPathMod)
if err != nil {
return fmt.Errorf("failed to chmod %q: %w", path, err)
}
return nil
}

// RemoveWorkDir removes the component working directory under the provided parent path. This method is idempotent.
func (c *Component) RemoveWorkDir(parentDirPath string) error {
return os.RemoveAll(c.WorkDirPath(parentDirPath))
}

// Model is the components model with signed policy data
// This replaces former top level []Components with the top Model that captures signed policy data.
// The signed data is a part of the policy since 8.8.0 release and contains the signed policy fragments and the signature that can be validated.
Expand Down
Loading
Loading