Skip to content

Commit 23d2c32

Browse files
swiatekm authored and leehinman committed
Move component working directory management to coordinator (#10857)
* Move component setup and teardown code to the Component struct
* Move teardown to coordinator
* Integration test to check data re-ingest when switching runtimes
* Add integration test for component working dirs
* Check workdir creation time in tests
* Fix linter warnings
* Fix a minor issue in endpoint integration tests
* Add changelog entry
* Completely remove workdir handling from command runtime
* Fix changelog summary
* Use fleet in the integration test

---------

Co-authored-by: Lee E. Hinman <[email protected]>

(cherry picked from commit 41fd24f)
1 parent 59e6509 commit 23d2c32

File tree

10 files changed

+917
-117
lines changed

10 files changed

+917
-117
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; an 80ish-character-long description of the change.
14+
summary: Fix issue where switching to OTEL runtime would cause data to be re-ingested
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodates a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: elastic-agent
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment was added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
#pr: https://github.com/owner/repo/1234
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -941,10 +941,15 @@ func (c *Coordinator) watchRuntimeComponents(
941941
state := make(map[string]runtime.ComponentState)
942942

943943
for {
944+
var componentStates []runtime.ComponentComponentState
944945
select {
945946
case <-ctx.Done():
946947
return
947948
case componentState := <-runtimeComponentStates:
949+
componentStates = append(componentStates, componentState)
950+
case componentStates = <-otelComponentStates:
951+
}
952+
for _, componentState := range componentStates {
948953
logComponentStateChange(c.logger, state, &componentState)
949954
// Forward the final changes back to Coordinator, unless our context
950955
// has ended.
@@ -953,21 +958,23 @@ func (c *Coordinator) watchRuntimeComponents(
953958
case <-ctx.Done():
954959
return
955960
}
956-
case componentStates := <-otelComponentStates:
957-
for _, componentState := range componentStates {
958-
logComponentStateChange(c.logger, state, &componentState)
959-
// Forward the final changes back to Coordinator, unless our context
960-
// has ended.
961-
select {
962-
case c.managerChans.runtimeManagerUpdate <- componentState:
963-
case <-ctx.Done():
964-
return
965-
}
966-
}
967961
}
968962
}
969963
}
970964

965+
// ensureComponentWorkDirs ensures the component working directories exist for current components. This method is
966+
// idempotent.
967+
func (c *Coordinator) ensureComponentWorkDirs() error {
968+
for _, comp := range c.componentModel {
969+
c.logger.Debugf("Ensuring a working directory exists for component: %s", comp.ID)
970+
err := comp.PrepareWorkDir(paths.Run())
971+
if err != nil {
972+
return fmt.Errorf("preparing a working directory for component %s failed: %w", comp.ID, err)
973+
}
974+
}
975+
return nil
976+
}
977+
971978
// logComponentStateChange emits a log message based on the new component state.
972979
func logComponentStateChange(
973980
logger *logger.Logger,
@@ -1750,6 +1757,12 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
17501757
return fmt.Errorf("generating component model: %w", err)
17511758
}
17521759

1760+
// ensure all components have working directories
1761+
err = c.ensureComponentWorkDirs()
1762+
if err != nil {
1763+
return fmt.Errorf("ensuring component work dirs exists: %w", err)
1764+
}
1765+
17531766
signed, err := component.SignedFromPolicy(c.derivedConfig)
17541767
if err != nil {
17551768
if !errors.Is(err, component.ErrNotFound) {

internal/pkg/agent/application/coordinator/coordinator_state.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@ package coordinator
66

77
import (
88
"fmt"
9+
"slices"
910

1011
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
1112
"go.opentelemetry.io/collector/component/componentstatus"
1213

14+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
15+
"github.com/elastic/elastic-agent/pkg/component"
16+
1317
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1418
"github.com/elastic/elastic-agent-libs/logp"
1519

@@ -176,6 +180,17 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState)
176180
break
177181
}
178182
}
183+
// If the component was stopped and isn't listed among components the coordinator knows about, it's safe to
184+
// remove its working directory. Otherwise, we may just be moving it between runtimes.
185+
if !slices.ContainsFunc(c.componentModel, func(c component.Component) bool {
186+
return state.Component.ID == c.ID
187+
}) {
188+
c.logger.Debugf("removing working directory for component '%s'", state.Component.ID)
189+
err := state.Component.RemoveWorkDir(paths.Run())
190+
if err != nil {
191+
c.logger.Warnf("failed to remove workdir for component %s: %v", state.Component.ID, err)
192+
}
193+
}
179194
}
180195

181196
c.stateNeedsRefresh = true

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,6 +1151,174 @@ service:
11511151

11521152
}
11531153

1154+
func TestCoordinatorManagesComponentWorkDirs(t *testing.T) {
1155+
// Send a test policy to the Coordinator as a Config Manager update,
1156+
// verify it creates a working directory for the component, keeps that working directory as the component
1157+
// moves to a different runtime, then deletes it after the component is stopped.
1158+
top := paths.Top()
1159+
paths.SetTop(t.TempDir())
1160+
t.Cleanup(func() {
1161+
paths.SetTop(top)
1162+
})
1163+
1164+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1165+
defer cancel()
1166+
logger := logp.NewLogger("testing")
1167+
1168+
configChan := make(chan ConfigChange, 1)
1169+
updateChan := make(chan runtime.ComponentComponentState, 1)
1170+
1171+
// Create a mocked runtime manager that will report the update call
1172+
runtimeManager := &fakeRuntimeManager{}
1173+
otelManager := &fakeOTelManager{}
1174+
1175+
// we need the filestream spec to be able to convert to Otel config
1176+
componentSpec := component.InputRuntimeSpec{
1177+
InputType: "filestream",
1178+
BinaryName: "agentbeat",
1179+
Spec: component.InputSpec{
1180+
Name: "filestream",
1181+
Command: &component.CommandSpec{
1182+
Args: []string{"filebeat"},
1183+
},
1184+
Platforms: []string{
1185+
"linux/amd64",
1186+
"linux/arm64",
1187+
"darwin/amd64",
1188+
"darwin/arm64",
1189+
"windows/amd64",
1190+
"container/amd64",
1191+
"container/arm64",
1192+
},
1193+
},
1194+
}
1195+
1196+
platform, err := component.LoadPlatformDetail()
1197+
require.NoError(t, err)
1198+
specs, err := component.NewRuntimeSpecs(platform, []component.InputRuntimeSpec{componentSpec})
1199+
require.NoError(t, err)
1200+
1201+
monitoringMgr := newTestMonitoringMgr()
1202+
coord := &Coordinator{
1203+
logger: logger,
1204+
agentInfo: &info.AgentInfo{},
1205+
stateBroadcaster: broadcaster.New(State{}, 0, 0),
1206+
managerChans: managerChans{
1207+
configManagerUpdate: configChan,
1208+
runtimeManagerUpdate: updateChan,
1209+
},
1210+
monitorMgr: monitoringMgr,
1211+
runtimeMgr: runtimeManager,
1212+
otelMgr: otelManager,
1213+
specs: specs,
1214+
vars: emptyVars(t),
1215+
componentPIDTicker: time.NewTicker(time.Second * 30),
1216+
secretMarkerFunc: testSecretMarkerFunc,
1217+
}
1218+
1219+
var workDirPath string
1220+
var workDirCreated time.Time
1221+
1222+
t.Run("run in process manager", func(t *testing.T) {
1223+
// Create a policy with one input and one output (no otel configuration)
1224+
cfg := config.MustNewConfigFrom(`
1225+
outputs:
1226+
default:
1227+
type: elasticsearch
1228+
hosts:
1229+
- localhost:9200
1230+
inputs:
1231+
- id: test-input
1232+
type: filestream
1233+
use_output: default
1234+
_runtime_experimental: process
1235+
`)
1236+
1237+
// Send the policy change and make sure it was acknowledged.
1238+
cfgChange := &configChange{cfg: cfg}
1239+
configChan <- cfgChange
1240+
coord.runLoopIteration(ctx)
1241+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1242+
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
1243+
require.Len(t, coord.componentModel, 1, "there should be one component")
1244+
workDirPath = coord.componentModel[0].WorkDirPath(paths.Run())
1245+
stat, err := os.Stat(workDirPath)
1246+
require.NoError(t, err, "component working directory should exist")
1247+
assert.True(t, stat.IsDir(), "component working directory should exist")
1248+
workDirCreated = stat.ModTime()
1249+
})
1250+
1251+
t.Run("run in otel manager", func(t *testing.T) {
1252+
// Create a policy with one input and one output (no otel configuration)
1253+
cfg := config.MustNewConfigFrom(`
1254+
outputs:
1255+
default:
1256+
type: elasticsearch
1257+
hosts:
1258+
- localhost:9200
1259+
inputs:
1260+
- id: test-input
1261+
type: filestream
1262+
use_output: default
1263+
_runtime_experimental: otel
1264+
`)
1265+
1266+
// Send the policy change and make sure it was acknowledged.
1267+
cfgChange := &configChange{cfg: cfg}
1268+
configChan <- cfgChange
1269+
coord.runLoopIteration(ctx)
1270+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1271+
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
1272+
require.Len(t, coord.componentModel, 1, "there should be one component")
1273+
compState := runtime.ComponentComponentState{
1274+
Component: component.Component{
1275+
ID: "filestream-default",
1276+
},
1277+
State: runtime.ComponentState{
1278+
State: client.UnitStateStopped,
1279+
},
1280+
}
1281+
updateChan <- compState
1282+
coord.runLoopIteration(ctx)
1283+
stat, err := os.Stat(workDirPath)
1284+
require.NoError(t, err, "component working directory should exist")
1285+
assert.True(t, stat.IsDir(), "component working directory should exist")
1286+
assert.Equal(t, workDirCreated, stat.ModTime(), "component working directory shouldn't have been modified")
1287+
})
1288+
t.Run("remove component", func(t *testing.T) {
1289+
// Create a policy with one input and one output (no otel configuration)
1290+
cfg := config.MustNewConfigFrom(`
1291+
outputs:
1292+
default:
1293+
type: elasticsearch
1294+
hosts:
1295+
- localhost:9200
1296+
inputs: []
1297+
`)
1298+
1299+
// Send the policy change and make sure it was acknowledged.
1300+
cfgChange := &configChange{cfg: cfg}
1301+
configChan <- cfgChange
1302+
coord.runLoopIteration(ctx)
1303+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1304+
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
1305+
require.Len(t, coord.componentModel, 0, "there should be one component")
1306+
1307+
compState := runtime.ComponentComponentState{
1308+
Component: component.Component{
1309+
ID: "filestream-default",
1310+
},
1311+
State: runtime.ComponentState{
1312+
State: client.UnitStateStopped,
1313+
},
1314+
}
1315+
updateChan <- compState
1316+
coord.runLoopIteration(ctx)
1317+
assert.NoDirExists(t, workDirPath, "component working directory shouldn't exist anymore")
1318+
})
1319+
1320+
}
1321+
11541322
func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
11551323
// Set a one-second timeout -- nothing here should block, but if it
11561324
// does let's report a failure instead of timing out the test runner.

pkg/component/component.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
"errors"
1010
"fmt"
1111
"maps"
12+
"os"
13+
"path/filepath"
14+
"runtime"
1215
"slices"
1316
"sort"
1417
"strings"
@@ -39,6 +42,7 @@ const (
3942
defaultUnitLogLevel = client.UnitLogLevelInfo
4043
headersKey = "headers"
4144
elasticsearchType = "elasticsearch"
45+
workDirPathMod = 0770
4246
ProcessRuntimeManager = RuntimeManager("process")
4347
OtelRuntimeManager = RuntimeManager("otel")
4448
DefaultRuntimeManager RuntimeManager = ProcessRuntimeManager
@@ -238,6 +242,44 @@ func (c *Component) BinaryName() string {
238242
return ""
239243
}
240244

245+
// WorkDirName returns the name of the component's working directory.
246+
func (c *Component) WorkDirName() string {
247+
return c.ID
248+
}
249+
250+
// WorkDirPath returns the full path of the component's working directory, placing it under the provided parent path.
251+
func (c *Component) WorkDirPath(parentDirPath string) string {
252+
return filepath.Join(parentDirPath, c.WorkDirName())
253+
}
254+
255+
// PrepareWorkDir prepares the component working directory under the provided parent path. This involves creating
256+
// it under the right ownership and ACLs. This method is idempotent.
257+
func (c *Component) PrepareWorkDir(parentDirPath string) error {
258+
uid, gid := os.Geteuid(), os.Getegid()
259+
path := c.WorkDirPath(parentDirPath)
260+
err := os.MkdirAll(path, workDirPathMod)
261+
if err != nil {
262+
return fmt.Errorf("failed to create path %q: %w", path, err)
263+
}
264+
if runtime.GOOS == Windows {
265+
return nil
266+
}
267+
err = os.Chown(path, uid, gid)
268+
if err != nil {
269+
return fmt.Errorf("failed to chown %q: %w", path, err)
270+
}
271+
err = os.Chmod(path, workDirPathMod)
272+
if err != nil {
273+
return fmt.Errorf("failed to chmod %q: %w", path, err)
274+
}
275+
return nil
276+
}
277+
278+
// RemoveWorkDir removes the component working directory under the provided parent path. This method is idempotent.
279+
func (c *Component) RemoveWorkDir(parentDirPath string) error {
280+
return os.RemoveAll(c.WorkDirPath(parentDirPath))
281+
}
282+
241283
// Model is the components model with signed policy data
242284
// This replaces former top level []Components with the top Model that captures signed policy data.
243285
// The signed data is a part of the policy since 8.8.0 release and contains the signed policy fragments and the signature that can be validated.

0 commit comments

Comments
 (0)