@@ -22,6 +22,7 @@ import (
2222 "time"
2323
2424 "github.com/elastic/elastic-agent/pkg/component"
25+ "github.com/elastic/elastic-agent/pkg/testing/tools/fleettools"
2526 "github.com/elastic/go-elasticsearch/v8"
2627
2728 "github.com/gofrs/uuid/v5"
@@ -1355,13 +1356,14 @@ func setStrictMapping(client *elasticsearch.Client, index string) error {
13551356// re-ingest logs.
13561357//
13571358// Flow
1358- // 1) Create policy in Kibana with just monitoring and download it
1359- // 2) Install with monitoring with "process" runtime
1360- // 3) restart agent, to roll log files
1361- // 4) Switch to monitoring "otel" runtime
1362- // 5) restart agent 3 times, making sure healthy between restarts
1363- // 6) query ES for monitoring logs with aggregation on fingerprint and line number, should be 0 duplicates
1364- // 7) uninstall
1359+ // 1. Create policy in Kibana with just monitoring and "process" runtime
1360+ // 2. Install and Enroll
1361+ // 3. Switch to monitoring "otel" runtime
1362+ // 4. restart agent 3 times, making sure healthy between restarts
1363+ // 5. switch back to "process" runtime
1364+ // 6. query ES for monitoring logs with aggregation on fingerprint and line number,
1365+ // ideally 0 duplicates but possible to have a small number
1366+ // 7. uninstall
13651367func TestMonitoringNoDuplicates (t * testing.T ) {
13661368 info := define .Require (t , define.Requirements {
13671369 Group : integration .Default ,
@@ -1375,96 +1377,61 @@ func TestMonitoringNoDuplicates(t *testing.T) {
13751377 Sudo : true ,
13761378 })
13771379
1378- installOpts := atesting.InstallOpts {
1379- NonInteractive : true ,
1380- Privileged : true ,
1381- Force : true ,
1382- Develop : true ,
1383- }
1384-
1385- ctx , cancel := testcontext .WithDeadline (t , context .Background (), time .Now ().Add (5 * time .Minute ))
1380+ ctx , cancel := testcontext .WithDeadline (t ,
1381+ context .Background (),
1382+ time .Now ().Add (5 * time .Minute ))
13861383 t .Cleanup (cancel )
13871384
1388- policyCtx , policyCancel := testcontext .WithDeadline (t , context .Background (), time .Now ().Add (5 * time .Minute ))
1389- t .Cleanup (policyCancel )
1390-
1385+ policyName := fmt .Sprintf ("%s-%s" , t .Name (), uuid .Must (uuid .NewV4 ()).String ())
13911386 createPolicyReq := kibana.AgentPolicy {
1392- Name : fmt . Sprintf ( "%s-%s" , t . Name (), uuid . Must ( uuid . NewV4 ()). String ()) ,
1387+ Name : policyName ,
13931388 Namespace : info .Namespace ,
13941389 Description : fmt .Sprintf ("%s policy" , t .Name ()),
13951390 MonitoringEnabled : []kibana.MonitoringEnabledOption {
13961391 kibana .MonitoringEnabledLogs ,
13971392 kibana .MonitoringEnabledMetrics ,
13981393 },
1394+ Overrides : map [string ]any {
1395+ "agent" : map [string ]any {
1396+ "monitoring" : map [string ]any {
1397+ "_runtime_experimental" : "process" ,
1398+ },
1399+ },
1400+ },
13991401 }
1400- policyResponse , err := info .KibanaClient .CreatePolicy (policyCtx , createPolicyReq )
1402+ policyResponse , err := info .KibanaClient .CreatePolicy (ctx , createPolicyReq )
14011403 require .NoError (t , err , "error creating policy" )
14021404
1403- downloadURL := fmt .Sprintf ("/api/fleet/agent_policies/%s/download" , policyResponse .ID )
1404- resp , err := info .KibanaClient .Connection .SendWithContext (policyCtx , http .MethodGet , downloadURL , nil , nil , nil )
1405- require .NoError (t , err , "error downloading policy" )
1406- policyBytes , err := io .ReadAll (resp .Body )
1407- require .NoError (t , err , "error reading policy response" )
1408- defer resp .Body .Close ()
1409-
1410- apiKeyResponse , err := createESApiKey (info .ESClient )
1411- require .NoError (t , err , "failed to get api key" )
1412- require .True (t , len (apiKeyResponse .Encoded ) > 1 , "api key is invalid %q" , apiKeyResponse )
1413- apiKey , err := getDecodedApiKey (apiKeyResponse )
1414- require .NoError (t , err , "error decoding api key" )
1415-
1416- type PolicyOutputs struct {
1417- Type string `yaml:"type"`
1418- Hosts []string `yaml:"hosts"`
1419- Preset string `yaml:"preset"`
1420- ApiKey string `yaml:"api_key"`
1421- }
1422- type PolicyStruct struct {
1423- ID string `yaml:"id"`
1424- Revision int `yaml:"revision"`
1425- Outputs map [string ]PolicyOutputs `yaml:"outputs"`
1426- Fleet map [string ]any `yaml:"fleet"`
1427- OutputPermissions map [string ]any `yaml:"output_permissions"`
1428- Agent struct {
1429- Monitoring map [string ]any `yaml:"monitoring"`
1430- Rest map [string ]any `yaml:",inline"`
1431- } `yaml:"agent"`
1432- Inputs []map [string ]any `yaml:"inputs"`
1433- Signed map [string ]any `yaml:"signed"`
1434- SecretReferences []map [string ]any `yaml:"secret_references"`
1435- Namespaces []string `yaml:"namespaces"`
1436- }
1437-
1438- policy := PolicyStruct {}
1439- err = yaml .Unmarshal (policyBytes , & policy )
1440- require .NoError (t , err , "error unmarshalling policy: %s" , string (policyBytes ))
1441- d , prs := policy .Outputs ["default" ]
1442- require .True (t , prs , "default must be in outputs" )
1443- d .ApiKey = string (apiKey )
1444- policy .Outputs ["default" ] = d
1445- policy .Agent .Monitoring ["_runtime_experimental" ] = "process"
1446-
1447- updatedPolicyBytes , err := yaml .Marshal (policy )
1448- require .NoErrorf (t , err , "error marshalling policy, struct was %v" , policy )
1449- t .Cleanup (func () {
1450- if t .Failed () {
1451- t .Logf ("policy was %s" , string (updatedPolicyBytes ))
1452- }
1453- })
1405+ enrollmentToken , err := info .KibanaClient .CreateEnrollmentAPIKey (ctx ,
1406+ kibana.CreateEnrollmentAPIKeyRequest {
1407+ PolicyID : policyResponse .ID ,
1408+ })
14541409
14551410 fut , err := define .NewFixtureFromLocalBuild (t , define .Version ())
14561411 require .NoError (t , err )
1412+
14571413 err = fut .Prepare (ctx )
14581414 require .NoError (t , err )
1459- err = fut .Configure (ctx , updatedPolicyBytes )
1460- require .NoError (t , err )
1461- combinedOutput , err := fut .InstallWithoutEnroll (ctx , & installOpts )
1462- require .NoErrorf (t , err , "error install without enroll: %s\n combinedoutput:\n %s" , err , string (combinedOutput ))
14631415
1464- // store timestamp to filter otel docs with timestamp greater than this value
1416+ fleetServerURL , err := fleettools .DefaultURL (ctx , info .KibanaClient )
1417+ require .NoError (t , err , "failed getting Fleet Server URL" )
1418+
1419+ installOpts := atesting.InstallOpts {
1420+ NonInteractive : true ,
1421+ Privileged : true ,
1422+ Force : true ,
1423+ EnrollOpts : atesting.EnrollOpts {
1424+ URL : fleetServerURL ,
1425+ EnrollmentToken : enrollmentToken .APIKey ,
1426+ },
1427+ }
1428+ combinedOutput , err := fut .Install (ctx , & installOpts )
1429+ require .NoErrorf (t , err , "error install with enroll: %s\n combinedoutput:\n %s" , err , string (combinedOutput ))
1430+
1431+ // store timestamp to filter duplicate docs with timestamp greater than this value
14651432 installTimestamp := time .Now ().UTC ().Format ("2006-01-02T15:04:05.000Z" )
14661433
1467- healthcheck := func (ctx context.Context , message string , runtime component.RuntimeManager , componentCount int , timestamp string ) {
1434+ healthCheck := func (ctx context.Context , message string , runtime component.RuntimeManager , componentCount int , timestamp string ) {
14681435 require .EventuallyWithT (t , func (collect * assert.CollectT ) {
14691436 var statusErr error
14701437 status , statusErr := fut .ExecStatus (ctx )
@@ -1501,56 +1468,94 @@ func TestMonitoringNoDuplicates(t *testing.T) {
15011468 }
15021469
15031470 // make sure running and logs are making it to ES
1504- healthcheck (ctx , "control checkin v2 protocol has chunking enabled" , component .ProcessRuntimeManager , 3 , installTimestamp )
1471+ healthCheck (ctx ,
1472+ "control checkin v2 protocol has chunking enabled" ,
1473+ component .ProcessRuntimeManager ,
1474+ 3 ,
1475+ installTimestamp )
15051476
1506- // restart process monitoring, gives us multiple files to track in registry
1507- restartTimestamp := time .Now ().UTC ().Format ("2006-01-02T15:04:05.000Z" )
1508- restartBytes , err := fut .Exec (ctx , []string {"restart" })
1509- require .NoErrorf (t , err , "Restart error: %s, output was: %s" , err , string (restartBytes ))
1510- healthcheck (ctx , "control checkin v2 protocol has chunking enabled" , component .ProcessRuntimeManager , 3 , restartTimestamp )
1477+ // Switch to otel monitoring
1478+ otelMonUpdateReq := kibana.AgentPolicyUpdateRequest {
1479+ Name : policyName ,
1480+ Namespace : info .Namespace ,
1481+ Overrides : map [string ]any {
1482+ "agent" : map [string ]any {
1483+ "monitoring" : map [string ]any {
1484+ "_runtime_experimental" : "otel" ,
1485+ },
1486+ },
1487+ },
1488+ }
15111489
1512- // turn off monitoring
1513- policy .Agent .Monitoring ["enabled" ] = false
1514- updatedPolicyBytes , err = yaml .Marshal (policy )
1515- require .NoErrorf (t , err , "error marshalling policy, struct was %v" , policy )
1516- err = fut .Configure (ctx , updatedPolicyBytes )
1490+ otelMonResp , err := info .KibanaClient .UpdatePolicy (ctx ,
1491+ policyResponse .ID , otelMonUpdateReq )
15171492 require .NoError (t , err )
15181493
1519- // restart to make sure beat processes have exited
1520- restartBytes , err = fut .Exec (ctx , []string {"restart" })
1521- require .NoErrorf (t , err , "Restart error: %s, output was: %s" , err , string (restartBytes ))
1522-
1523- // make sure agent is running. with no processes.
1524- require .EventuallyWithT (t , func (collect * assert.CollectT ) {
1525- var statusErr error
1526- status , statusErr := fut .ExecStatus (ctx )
1527- assert .NoError (collect , statusErr )
1528- assertBeatsHealthy (collect , & status , component .ProcessRuntimeManager , 0 )
1529- }, 1 * time .Minute , 1 * time .Second )
1494+ otelTimestamp := time .Now ().UTC ().Format ("2006-01-02T15:04:05.000Z" )
15301495
1531- // Switch to otel monitoring
1532- policy .Agent .Monitoring ["enabled" ] = true
1533- policy .Agent .Monitoring ["_runtime_experimental" ] = "otel"
1534- updatedPolicyBytes , err = yaml .Marshal (policy )
1535- require .NoErrorf (t , err , "error marshalling policy, struct was %v" , policy )
1536- err = fut .Configure (ctx , updatedPolicyBytes )
1537- require .NoError (t , err )
1496+ // wait until policy is applied
1497+ policyCheck := func (expectedRevision int ) {
1498+ require .Eventually (t , func () bool {
1499+ inspectOutput , err := fut .ExecInspect (ctx )
1500+ require .NoError (t , err )
1501+ return expectedRevision == inspectOutput .Revision
1502+ }, 3 * time .Minute , 1 * time .Second )
1503+ }
1504+ policyCheck (otelMonResp .Revision )
15381505
15391506 // make sure running and logs are making it to ES
1540- otelTimestamp := time .Now ().UTC ().Format ("2006-01-02T15:04:05.000Z" )
1541- healthcheck (ctx , "Everything is ready. Begin running and processing data." , component .OtelRuntimeManager , 4 , otelTimestamp )
1507+ healthCheck (ctx ,
1508+ "Everything is ready. Begin running and processing data." ,
1509+ component .OtelRuntimeManager ,
1510+ 4 ,
1511+ otelTimestamp )
15421512
15431513 // restart 3 times, checks path definition is stable
15441514 for range 3 {
15451515 restartTimestamp := time .Now ().UTC ().Format ("2006-01-02T15:04:05.000Z" )
15461516 restartBytes , err := fut .Exec (ctx , []string {"restart" })
1547- require .NoErrorf (t , err , "Restart error: %s, output was: %s" , err , string (restartBytes ))
1548- healthcheck (ctx , "Everything is ready. Begin running and processing data." , component .OtelRuntimeManager , 4 , restartTimestamp )
1517+ require .NoErrorf (t ,
1518+ err ,
1519+ "Restart error: %s, output was: %s" ,
1520+ err ,
1521+ string (restartBytes ))
1522+ healthCheck (ctx ,
1523+ "Everything is ready. Begin running and processing data." ,
1524+ component .OtelRuntimeManager ,
1525+ 4 ,
1526+ restartTimestamp )
15491527 }
15501528
1529+ // Switch back to process monitoring
1530+ processMonUpdateReq := kibana.AgentPolicyUpdateRequest {
1531+ Name : policyName ,
1532+ Namespace : info .Namespace ,
1533+ Overrides : map [string ]any {
1534+ "agent" : map [string ]any {
1535+ "monitoring" : map [string ]any {
1536+ "_runtime_experimental" : "process" ,
1537+ },
1538+ },
1539+ },
1540+ }
1541+
1542+ processMonResp , err := info .KibanaClient .UpdatePolicy (ctx ,
1543+ policyResponse .ID , processMonUpdateReq )
1544+ require .NoError (t , err )
1545+
1546+ processTimestamp := time .Now ().UTC ().Format ("2006-01-02T15:04:05.000Z" )
1547+
1548+ // wait until policy is applied
1549+ policyCheck (processMonResp .Revision )
1550+
1551+ // make sure running and logs are making it to ES
1552+ healthCheck (ctx ,
1553+ "control checkin v2 protocol has chunking enabled" ,
1554+ component .ProcessRuntimeManager ,
1555+ 3 ,
1556+ processTimestamp )
1557+
15511558 // duplicate check
1552- dupCtx , dupCancel := context .WithTimeout (ctx , 10 * time .Second )
1553- defer dupCancel ()
15541559 rawQuery := map [string ]any {
15551560 "runtime_mappings" : map [string ]any {
15561561 "log.offset" : map [string ]any {
@@ -1573,6 +1578,7 @@ func TestMonitoringNoDuplicates(t *testing.T) {
15731578 "aggs" : map [string ]any {
15741579 "duplicates" : map [string ]any {
15751580 "multi_terms" : map [string ]any {
1581+ "size" : 500 ,
15761582 "min_doc_count" : 2 ,
15771583 "terms" : []map [string ]any {
15781584 {"field" : "log.file.fingerprint" },
@@ -1592,12 +1598,13 @@ func TestMonitoringNoDuplicates(t *testing.T) {
15921598 es .Search .WithSize (0 ),
15931599 es .Search .WithBody (& buf ),
15941600 es .Search .WithPretty (),
1595- es .Search .WithContext (dupCtx ),
1601+ es .Search .WithContext (ctx ),
15961602 )
15971603 require .NoError (t , err )
15981604 require .Falsef (t , (res .StatusCode >= http .StatusMultipleChoices || res .StatusCode < http .StatusOK ), "status should be 2xx was: %d" , res .StatusCode )
15991605 resultBuf , err := io .ReadAll (res .Body )
16001606 require .NoError (t , err )
1607+
16011608 aggResults := map [string ]any {}
16021609 err = json .Unmarshal (resultBuf , & aggResults )
16031610 aggs , ok := aggResults ["aggregations" ].(map [string ]any )
@@ -1606,7 +1613,15 @@ func TestMonitoringNoDuplicates(t *testing.T) {
16061613 require .Truef (t , ok , "'duplicates' wasn't a map[string]any, result was %s" , string (resultBuf ))
16071614 buckets , ok := dups ["buckets" ].([]any )
16081615 require .Truef (t , ok , "'buckets' wasn't a []any, result was %s" , string (resultBuf ))
1609- require .Equalf (t , 0 , len (buckets ), "buckets contained duplicates, result was %s" , string (resultBuf ))
1616+
1617+ hits , ok := aggResults ["hits" ].(map [string ]any )
1618+ require .Truef (t , ok , "'hits' wasn't a map[string]any, result was %s" , string (resultBuf ))
1619+ total , ok := hits ["total" ].(map [string ]any )
1620+ require .Truef (t , ok , "'total' wasn't a map[string]any, result was %s" , string (resultBuf ))
1621+ value , ok := total ["value" ].(float64 )
1622+ require .Truef (t , ok , "'total' wasn't an int, result was %s" , string (resultBuf ))
1623+
1624+ require .Equalf (t , 0 , len (buckets ), "len(buckets): %d, hits.total.value: %d, result was %s" , len (buckets ), value , string (resultBuf ))
16101625
16111626 // Uninstall
16121627 combinedOutput , err = fut .Uninstall (ctx , & atesting.UninstallOpts {Force : true })
0 commit comments