Skip to content

Commit 0f6268e

Browse files
swiatekmleehinman
authored andcommitted
Move component working directory management to coordinator (#10857)
* Move component setup and teardown code to the Component struct * Move teardown to coordinator * Integration test to check data re-ingest when switching runtimes * Add integration test for component working dirs * Check workdir creation time in tests * Fix linter warnings * Fix a minor issue in endpoint integration tests * Add changelog entry * Completely remove workdir handling from command runtime * Fix changelog summary * Use fleet in the integration test --------- Co-authored-by: Lee E. Hinman <[email protected]> (cherry picked from commit 41fd24f)
1 parent 948ba76 commit 0f6268e

File tree

10 files changed

+917
-117
lines changed

10 files changed

+917
-117
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Fix issue where switching to OTEL runtime would cause data to be re-ingested
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: elastic-agent
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
#pr: https://github.com/owner/repo/1234
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -786,10 +786,15 @@ func (c *Coordinator) watchRuntimeComponents(
786786
state := make(map[string]runtime.ComponentState)
787787

788788
for {
789+
var componentStates []runtime.ComponentComponentState
789790
select {
790791
case <-ctx.Done():
791792
return
792793
case componentState := <-runtimeComponentStates:
794+
componentStates = append(componentStates, componentState)
795+
case componentStates = <-otelComponentStates:
796+
}
797+
for _, componentState := range componentStates {
793798
logComponentStateChange(c.logger, state, &componentState)
794799
// Forward the final changes back to Coordinator, unless our context
795800
// has ended.
@@ -798,21 +803,23 @@ func (c *Coordinator) watchRuntimeComponents(
798803
case <-ctx.Done():
799804
return
800805
}
801-
case componentStates := <-otelComponentStates:
802-
for _, componentState := range componentStates {
803-
logComponentStateChange(c.logger, state, &componentState)
804-
// Forward the final changes back to Coordinator, unless our context
805-
// has ended.
806-
select {
807-
case c.managerChans.runtimeManagerUpdate <- componentState:
808-
case <-ctx.Done():
809-
return
810-
}
811-
}
812806
}
813807
}
814808
}
815809

810+
// ensureComponentWorkDirs ensures the component working directories exist for current components. This method is
811+
// idempotent.
812+
func (c *Coordinator) ensureComponentWorkDirs() error {
813+
for _, comp := range c.componentModel {
814+
c.logger.Debugf("Ensuring a working directory exists for component: %s", comp.ID)
815+
err := comp.PrepareWorkDir(paths.Run())
816+
if err != nil {
817+
return fmt.Errorf("preparing a working directory for component %s failed: %w", comp.ID, err)
818+
}
819+
}
820+
return nil
821+
}
822+
816823
// logComponentStateChange emits a log message based on the new component state.
817824
func logComponentStateChange(
818825
logger *logger.Logger,
@@ -1599,6 +1606,12 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
15991606
return fmt.Errorf("generating component model: %w", err)
16001607
}
16011608

1609+
// ensure all components have working directories
1610+
err = c.ensureComponentWorkDirs()
1611+
if err != nil {
1612+
return fmt.Errorf("ensuring component work dirs exists: %w", err)
1613+
}
1614+
16021615
signed, err := component.SignedFromPolicy(c.derivedConfig)
16031616
if err != nil {
16041617
if !errors.Is(err, component.ErrNotFound) {

internal/pkg/agent/application/coordinator/coordinator_state.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@ package coordinator
66

77
import (
88
"fmt"
9+
"slices"
910

1011
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
1112
"go.opentelemetry.io/collector/component/componentstatus"
1213

14+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
15+
"github.com/elastic/elastic-agent/pkg/component"
16+
1317
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1418
"github.com/elastic/elastic-agent-libs/logp"
1519

@@ -176,6 +180,17 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState)
176180
break
177181
}
178182
}
183+
// If the component was stopped and isn't listed among components the coordinator knows about, it's safe to
184+
// remove its working directory. Otherwise, we may just be moving it between runtimes.
185+
if !slices.ContainsFunc(c.componentModel, func(c component.Component) bool {
186+
return state.Component.ID == c.ID
187+
}) {
188+
c.logger.Debugf("removing working directory for component '%s'", state.Component.ID)
189+
err := state.Component.RemoveWorkDir(paths.Run())
190+
if err != nil {
191+
c.logger.Warnf("failed to remove workdir for component %s: %v", state.Component.ID, err)
192+
}
193+
}
179194
}
180195

181196
c.stateNeedsRefresh = true

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,6 +1138,174 @@ service:
11381138

11391139
}
11401140

1141+
func TestCoordinatorManagesComponentWorkDirs(t *testing.T) {
1142+
// Send a test policy to the Coordinator as a Config Manager update,
1143+
// verify it creates a working directory for the component, keeps that working directory as the component
1144+
// moves to a different runtime, then deletes it after the component is stopped.
1145+
top := paths.Top()
1146+
paths.SetTop(t.TempDir())
1147+
t.Cleanup(func() {
1148+
paths.SetTop(top)
1149+
})
1150+
1151+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1152+
defer cancel()
1153+
logger := logp.NewLogger("testing")
1154+
1155+
configChan := make(chan ConfigChange, 1)
1156+
updateChan := make(chan runtime.ComponentComponentState, 1)
1157+
1158+
// Create a mocked runtime manager that will report the update call
1159+
runtimeManager := &fakeRuntimeManager{}
1160+
otelManager := &fakeOTelManager{}
1161+
1162+
// we need the filestream spec to be able to convert to Otel config
1163+
componentSpec := component.InputRuntimeSpec{
1164+
InputType: "filestream",
1165+
BinaryName: "agentbeat",
1166+
Spec: component.InputSpec{
1167+
Name: "filestream",
1168+
Command: &component.CommandSpec{
1169+
Args: []string{"filebeat"},
1170+
},
1171+
Platforms: []string{
1172+
"linux/amd64",
1173+
"linux/arm64",
1174+
"darwin/amd64",
1175+
"darwin/arm64",
1176+
"windows/amd64",
1177+
"container/amd64",
1178+
"container/arm64",
1179+
},
1180+
},
1181+
}
1182+
1183+
platform, err := component.LoadPlatformDetail()
1184+
require.NoError(t, err)
1185+
specs, err := component.NewRuntimeSpecs(platform, []component.InputRuntimeSpec{componentSpec})
1186+
require.NoError(t, err)
1187+
1188+
monitoringMgr := newTestMonitoringMgr()
1189+
coord := &Coordinator{
1190+
logger: logger,
1191+
agentInfo: &info.AgentInfo{},
1192+
stateBroadcaster: broadcaster.New(State{}, 0, 0),
1193+
managerChans: managerChans{
1194+
configManagerUpdate: configChan,
1195+
runtimeManagerUpdate: updateChan,
1196+
},
1197+
monitorMgr: monitoringMgr,
1198+
runtimeMgr: runtimeManager,
1199+
otelMgr: otelManager,
1200+
specs: specs,
1201+
vars: emptyVars(t),
1202+
componentPIDTicker: time.NewTicker(time.Second * 30),
1203+
secretMarkerFunc: testSecretMarkerFunc,
1204+
}
1205+
1206+
var workDirPath string
1207+
var workDirCreated time.Time
1208+
1209+
t.Run("run in process manager", func(t *testing.T) {
1210+
// Create a policy with one input and one output (no otel configuration)
1211+
cfg := config.MustNewConfigFrom(`
1212+
outputs:
1213+
default:
1214+
type: elasticsearch
1215+
hosts:
1216+
- localhost:9200
1217+
inputs:
1218+
- id: test-input
1219+
type: filestream
1220+
use_output: default
1221+
_runtime_experimental: process
1222+
`)
1223+
1224+
// Send the policy change and make sure it was acknowledged.
1225+
cfgChange := &configChange{cfg: cfg}
1226+
configChan <- cfgChange
1227+
coord.runLoopIteration(ctx)
1228+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1229+
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
1230+
require.Len(t, coord.componentModel, 1, "there should be one component")
1231+
workDirPath = coord.componentModel[0].WorkDirPath(paths.Run())
1232+
stat, err := os.Stat(workDirPath)
1233+
require.NoError(t, err, "component working directory should exist")
1234+
assert.True(t, stat.IsDir(), "component working directory should exist")
1235+
workDirCreated = stat.ModTime()
1236+
})
1237+
1238+
t.Run("run in otel manager", func(t *testing.T) {
1239+
// Create a policy with one input and one output (no otel configuration)
1240+
cfg := config.MustNewConfigFrom(`
1241+
outputs:
1242+
default:
1243+
type: elasticsearch
1244+
hosts:
1245+
- localhost:9200
1246+
inputs:
1247+
- id: test-input
1248+
type: filestream
1249+
use_output: default
1250+
_runtime_experimental: otel
1251+
`)
1252+
1253+
// Send the policy change and make sure it was acknowledged.
1254+
cfgChange := &configChange{cfg: cfg}
1255+
configChan <- cfgChange
1256+
coord.runLoopIteration(ctx)
1257+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1258+
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
1259+
require.Len(t, coord.componentModel, 1, "there should be one component")
1260+
compState := runtime.ComponentComponentState{
1261+
Component: component.Component{
1262+
ID: "filestream-default",
1263+
},
1264+
State: runtime.ComponentState{
1265+
State: client.UnitStateStopped,
1266+
},
1267+
}
1268+
updateChan <- compState
1269+
coord.runLoopIteration(ctx)
1270+
stat, err := os.Stat(workDirPath)
1271+
require.NoError(t, err, "component working directory should exist")
1272+
assert.True(t, stat.IsDir(), "component working directory should exist")
1273+
assert.Equal(t, workDirCreated, stat.ModTime(), "component working directory shouldn't have been modified")
1274+
})
1275+
t.Run("remove component", func(t *testing.T) {
1276+
// Create a policy with one input and one output (no otel configuration)
1277+
cfg := config.MustNewConfigFrom(`
1278+
outputs:
1279+
default:
1280+
type: elasticsearch
1281+
hosts:
1282+
- localhost:9200
1283+
inputs: []
1284+
`)
1285+
1286+
// Send the policy change and make sure it was acknowledged.
1287+
cfgChange := &configChange{cfg: cfg}
1288+
configChan <- cfgChange
1289+
coord.runLoopIteration(ctx)
1290+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1291+
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
1292+
require.Len(t, coord.componentModel, 0, "there should be one component")
1293+
1294+
compState := runtime.ComponentComponentState{
1295+
Component: component.Component{
1296+
ID: "filestream-default",
1297+
},
1298+
State: runtime.ComponentState{
1299+
State: client.UnitStateStopped,
1300+
},
1301+
}
1302+
updateChan <- compState
1303+
coord.runLoopIteration(ctx)
1304+
assert.NoDirExists(t, workDirPath, "component working directory shouldn't exist anymore")
1305+
})
1306+
1307+
}
1308+
11411309
func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
11421310
// Set a one-second timeout -- nothing here should block, but if it
11431311
// does let's report a failure instead of timing out the test runner.

pkg/component/component.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
"errors"
1010
"fmt"
1111
"maps"
12+
"os"
13+
"path/filepath"
14+
"runtime"
1215
"slices"
1316
"sort"
1417
"strings"
@@ -39,6 +42,7 @@ const (
3942
defaultUnitLogLevel = client.UnitLogLevelInfo
4043
headersKey = "headers"
4144
elasticsearchType = "elasticsearch"
45+
workDirPathMod = 0770
4246
ProcessRuntimeManager = RuntimeManager("process")
4347
OtelRuntimeManager = RuntimeManager("otel")
4448
DefaultRuntimeManager RuntimeManager = ProcessRuntimeManager
@@ -238,6 +242,44 @@ func (c *Component) BinaryName() string {
238242
return ""
239243
}
240244

245+
// WorkDirName returns the name of the component's working directory.
246+
func (c *Component) WorkDirName() string {
247+
return c.ID
248+
}
249+
250+
// WorkDirPath returns the full path of the component's working directory, placing it under the provided parent path.
251+
func (c *Component) WorkDirPath(parentDirPath string) string {
252+
return filepath.Join(parentDirPath, c.WorkDirName())
253+
}
254+
255+
// PrepareWorkDir prepares the component working directory under the provided parent path. This involves creating
256+
// it under the right ownership and ACLs. This method is idempotent.
257+
func (c *Component) PrepareWorkDir(parentDirPath string) error {
258+
uid, gid := os.Geteuid(), os.Getegid()
259+
path := c.WorkDirPath(parentDirPath)
260+
err := os.MkdirAll(path, workDirPathMod)
261+
if err != nil {
262+
return fmt.Errorf("failed to create path %q: %w", path, err)
263+
}
264+
if runtime.GOOS == Windows {
265+
return nil
266+
}
267+
err = os.Chown(path, uid, gid)
268+
if err != nil {
269+
return fmt.Errorf("failed to chown %q: %w", path, err)
270+
}
271+
err = os.Chmod(path, workDirPathMod)
272+
if err != nil {
273+
return fmt.Errorf("failed to chmod %q: %w", path, err)
274+
}
275+
return nil
276+
}
277+
278+
// RemoveWorkDir removes the component working directory under the provided parent path. This method is idempotent.
279+
func (c *Component) RemoveWorkDir(parentDirPath string) error {
280+
return os.RemoveAll(c.WorkDirPath(parentDirPath))
281+
}
282+
241283
// Model is the components model with signed policy data
242284
// This replaces former top level []Components with the top Model that captures signed policy data.
243285
// The signed data is a part of the policy since 8.8.0 release and contains the signed policy fragments and the signature that can be validated.

0 commit comments

Comments
 (0)