Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; an 80ish-character-long description of the change.
summary: Move component working directory management to coordinator

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
35 changes: 24 additions & 11 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,10 +946,15 @@ func (c *Coordinator) watchRuntimeComponents(
state := make(map[string]runtime.ComponentState)

for {
var componentStates []runtime.ComponentComponentState
select {
case <-ctx.Done():
return
case componentState := <-runtimeComponentStates:
componentStates = append(componentStates, componentState)
case componentStates = <-otelComponentStates:
}
for _, componentState := range componentStates {
logComponentStateChange(c.logger, state, &componentState)
// Forward the final changes back to Coordinator, unless our context
// has ended.
Expand All @@ -958,21 +963,23 @@ func (c *Coordinator) watchRuntimeComponents(
case <-ctx.Done():
return
}
case componentStates := <-otelComponentStates:
for _, componentState := range componentStates {
logComponentStateChange(c.logger, state, &componentState)
// Forward the final changes back to Coordinator, unless our context
// has ended.
select {
case c.managerChans.runtimeManagerUpdate <- componentState:
case <-ctx.Done():
return
}
}
}
}
}

// ensureComponentWorkDirs walks the coordinator's current component model and prepares a working directory for
// each component under the agent run path. It stops at the first failure and returns that error wrapped with the
// ID of the offending component; it returns nil once every directory has been prepared. Calling it again for
// components that already have a directory is harmless, since PrepareWorkDir is documented as idempotent.
func (c *Coordinator) ensureComponentWorkDirs() error {
	for _, cur := range c.componentModel {
		c.logger.Debugf("Ensuring a working directory exists for component: %s", cur.ID)
		if prepErr := cur.PrepareWorkDir(paths.Run()); prepErr != nil {
			return fmt.Errorf("preparing a working directory for component %s failed: %w", cur.ID, prepErr)
		}
	}
	return nil
}

// logComponentStateChange emits a log message based on the new component state.
func logComponentStateChange(
logger *logger.Logger,
Expand Down Expand Up @@ -1755,6 +1762,12 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
return fmt.Errorf("generating component model: %w", err)
}

// ensure all components have working directories
err = c.ensureComponentWorkDirs()
if err != nil {
return fmt.Errorf("ensuring component work dirs exists: %w", err)
}

signed, err := component.SignedFromPolicy(c.derivedConfig)
if err != nil {
if !errors.Is(err, component.ErrNotFound) {
Expand Down
15 changes: 15 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ package coordinator

import (
"fmt"
"slices"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/component/componentstatus"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/component"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"

Expand Down Expand Up @@ -176,6 +180,17 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState)
break
}
}
// If the component was stopped and isn't listed among components the coordinator knows about, it's safe to
// remove its working directory. Otherwise, we may just be moving it between runtimes.
if !slices.ContainsFunc(c.componentModel, func(c component.Component) bool {
return state.Component.ID == c.ID
}) {
c.logger.Debugf("removing working directory for component '%s'", state.Component.ID)
err := state.Component.RemoveWorkDir(paths.Run())
if err != nil {
c.logger.Warnf("failed to remove workdir for component %s: %v", state.Component.ID, err)
}
}
}

c.stateNeedsRefresh = true
Expand Down
168 changes: 168 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,174 @@ service:

}

func TestCoordinatorManagesComponentWorkDirs(t *testing.T) {
	// Send test policies to the Coordinator as Config Manager updates and
	// verify that it creates a working directory for the component, keeps
	// that working directory as the component moves to a different runtime,
	// then deletes it once the component is gone from the policy and is
	// reported as stopped.
	//
	// The subtests run sequentially and share state (coord, workDirPath,
	// workDirCreated); they are not meant to be run in isolation.

	// Point the agent paths at a temporary directory so the test never
	// touches a real installation, and restore the original value afterwards.
	top := paths.Top()
	paths.SetTop(t.TempDir())
	t.Cleanup(func() {
		paths.SetTop(top)
	})

	// Nothing here should block; the timeout turns an unexpected hang into
	// a test failure.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	logger := logp.NewLogger("testing")

	// Buffered so the test can enqueue an update and then drive the
	// Coordinator with a single runLoopIteration call.
	configChan := make(chan ConfigChange, 1)
	updateChan := make(chan runtime.ComponentComponentState, 1)

	// Create a mocked runtime manager that will report the update call
	runtimeManager := &fakeRuntimeManager{}
	otelManager := &fakeOTelManager{}

	// we need the filestream spec to be able to convert to Otel config
	componentSpec := component.InputRuntimeSpec{
		InputType:  "filestream",
		BinaryName: "agentbeat",
		Spec: component.InputSpec{
			Name: "filestream",
			Command: &component.CommandSpec{
				Args: []string{"filebeat"},
			},
			Platforms: []string{
				"linux/amd64",
				"linux/arm64",
				"darwin/amd64",
				"darwin/arm64",
				"windows/amd64",
				"container/amd64",
				"container/arm64",
			},
		},
	}

	platform, err := component.LoadPlatformDetail()
	require.NoError(t, err)
	specs, err := component.NewRuntimeSpecs(platform, []component.InputRuntimeSpec{componentSpec})
	require.NoError(t, err)

	monitoringMgr := newTestMonitoringMgr()
	coord := &Coordinator{
		logger:           logger,
		agentInfo:        &info.AgentInfo{},
		stateBroadcaster: broadcaster.New(State{}, 0, 0),
		managerChans: managerChans{
			configManagerUpdate:  configChan,
			runtimeManagerUpdate: updateChan,
		},
		monitorMgr:         monitoringMgr,
		runtimeMgr:         runtimeManager,
		otelMgr:            otelManager,
		specs:              specs,
		vars:               emptyVars(t),
		componentPIDTicker: time.NewTicker(time.Second * 30),
		secretMarkerFunc:   testSecretMarkerFunc,
	}

	// Runtime update reporting the test component as stopped. The ID is
	// assumed to match the one the Coordinator derives for the filestream
	// input on the default output.
	stoppedState := runtime.ComponentComponentState{
		Component: component.Component{
			ID: "filestream-default",
		},
		State: runtime.ComponentState{
			State: client.UnitStateStopped,
		},
	}

	// Recorded by the first subtest and checked by the following ones.
	var workDirPath string
	var workDirCreated time.Time

	t.Run("run in process manager", func(t *testing.T) {
		// Create a policy with one input and one output (no otel configuration)
		cfg := config.MustNewConfigFrom(`
outputs:
  default:
    type: elasticsearch
    hosts:
      - localhost:9200
inputs:
  - id: test-input
    type: filestream
    use_output: default
    _runtime_experimental: process
`)

		// Send the policy change and make sure it was acknowledged.
		cfgChange := &configChange{cfg: cfg}
		configChan <- cfgChange
		coord.runLoopIteration(ctx)
		assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
		assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
		require.Len(t, coord.componentModel, 1, "there should be one component")

		// The working directory must have been created as part of applying the policy.
		workDirPath = coord.componentModel[0].WorkDirPath(paths.Run())
		stat, err := os.Stat(workDirPath)
		require.NoError(t, err, "component working directory should exist")
		assert.True(t, stat.IsDir(), "component working directory should exist")
		workDirCreated = stat.ModTime()
	})

	t.Run("run in otel manager", func(t *testing.T) {
		require.NotEmpty(t, workDirPath, "the previous subtest should have recorded the working directory path")

		// Same input and output as before, but the input now requests the otel runtime.
		cfg := config.MustNewConfigFrom(`
outputs:
  default:
    type: elasticsearch
    hosts:
      - localhost:9200
inputs:
  - id: test-input
    type: filestream
    use_output: default
    _runtime_experimental: otel
`)

		// Send the policy change and make sure it was acknowledged.
		cfgChange := &configChange{cfg: cfg}
		configChan <- cfgChange
		coord.runLoopIteration(ctx)
		assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
		assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
		require.Len(t, coord.componentModel, 1, "there should be one component")

		// Report the component as stopped while it is still part of the
		// component model: the working directory must be left untouched.
		updateChan <- stoppedState
		coord.runLoopIteration(ctx)
		stat, err := os.Stat(workDirPath)
		require.NoError(t, err, "component working directory should exist")
		assert.True(t, stat.IsDir(), "component working directory should exist")
		assert.Equal(t, workDirCreated, stat.ModTime(), "component working directory shouldn't have been modified")
	})

	t.Run("remove component", func(t *testing.T) {
		// Without this guard the final assertion would pass vacuously on an empty path.
		require.NotEmpty(t, workDirPath, "the first subtest should have recorded the working directory path")

		// Create a policy with one output and no inputs, so no components remain.
		cfg := config.MustNewConfigFrom(`
outputs:
  default:
    type: elasticsearch
    hosts:
      - localhost:9200
inputs: []
`)

		// Send the policy change and make sure it was acknowledged.
		cfgChange := &configChange{cfg: cfg}
		configChan <- cfgChange
		coord.runLoopIteration(ctx)
		assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
		assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
		require.Len(t, coord.componentModel, 0, "there should be no components")

		// The component is no longer in the model, so a stopped report
		// should now cause its working directory to be removed.
		updateChan <- stoppedState
		coord.runLoopIteration(ctx)
		assert.NoDirExists(t, workDirPath, "component working directory shouldn't exist anymore")
	})
}

func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
// Set a one-second timeout -- nothing here should block, but if it
// does let's report a failure instead of timing out the test runner.
Expand Down
42 changes: 42 additions & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"errors"
"fmt"
"maps"
"os"
"path/filepath"
"runtime"
"slices"
"sort"
"strings"
Expand Down Expand Up @@ -39,6 +42,7 @@ const (
defaultUnitLogLevel = client.UnitLogLevelInfo
headersKey = "headers"
elasticsearchType = "elasticsearch"
workDirPathMod = 0770
ProcessRuntimeManager = RuntimeManager("process")
OtelRuntimeManager = RuntimeManager("otel")
DefaultRuntimeManager RuntimeManager = ProcessRuntimeManager
Expand Down Expand Up @@ -238,6 +242,44 @@ func (c *Component) BinaryName() string {
return ""
}

// WorkDirName reports the directory name used for the component's working directory, which is the component ID.
func (c *Component) WorkDirName() string {
	dirName := c.ID
	return dirName
}

// WorkDirPath builds the full path of the component's working directory by joining the provided parent directory
// path with the component's working directory name.
func (c *Component) WorkDirPath(parentDirPath string) string {
	dirName := c.WorkDirName()
	return filepath.Join(parentDirPath, dirName)
}

// PrepareWorkDir makes sure the component working directory exists under the provided parent path, creating it
// (and any missing parents) with workDirPathMod. On every platform except Windows it then sets the directory's
// owner to the effective user and group of the current process and applies workDirPathMod explicitly, because
// the mode passed to os.MkdirAll is filtered by the process umask. Each failure is returned wrapped with the
// affected path. This method is idempotent.
func (c *Component) PrepareWorkDir(parentDirPath string) error {
	workDir := c.WorkDirPath(parentDirPath)
	if mkErr := os.MkdirAll(workDir, workDirPathMod); mkErr != nil {
		return fmt.Errorf("failed to create path %q: %w", workDir, mkErr)
	}

	// Ownership and mode bits are only adjusted on non-Windows platforms.
	if runtime.GOOS == Windows {
		return nil
	}

	if chownErr := os.Chown(workDir, os.Geteuid(), os.Getegid()); chownErr != nil {
		return fmt.Errorf("failed to chown %q: %w", workDir, chownErr)
	}
	if chmodErr := os.Chmod(workDir, workDirPathMod); chmodErr != nil {
		return fmt.Errorf("failed to chmod %q: %w", workDir, chmodErr)
	}
	return nil
}

// RemoveWorkDir removes the component working directory, together with everything inside it, from under the
// provided parent path. Removing a directory that does not exist is not an error, so this method is idempotent.
//
// It refuses to act on a component whose working directory name is empty: filepath.Join drops empty elements, so
// the computed path would collapse to parentDirPath itself and os.RemoveAll would delete the whole parent
// directory rather than a single component's directory. An error is returned in that case and nothing is removed.
func (c *Component) RemoveWorkDir(parentDirPath string) error {
	if c.WorkDirName() == "" {
		return fmt.Errorf("refusing to remove working directory under %q: component has no ID", parentDirPath)
	}
	return os.RemoveAll(c.WorkDirPath(parentDirPath))
}

// Model is the components model with signed policy data
// This replaces former top level []Components with the top Model that captures signed policy data.
// The signed data is a part of the policy since 8.8.0 release and contains the signed policy fragments and the signature that can be validated.
Expand Down
Loading