Skip to content

Commit

Permalink
[Entity Analytics] Adding changes for event.ingested in riskScore and…
Browse files Browse the repository at this point in the history
… assetCriticality (elastic#203975)

## Summary

This pull request introduces changes to the asset criticality and risk
score data clients to utilize a new ingest pipeline for adding event
timestamps. The changes include the addition of utility functions for
creating and retrieving the ingest pipeline, updates to the field
mappings, and modifications to the data clients to integrate the new
pipeline.

### Ingest Pipeline Integration:

*
[`x-pack/plugins/security_solution/server/lib/entity_analytics/utils/create_ingest_pipeline.ts`](diffhunk://#diff-0011b86f0b91d8a6bb1c91ea0ff59830905e90436af01f5893b14d054b4e69f5R1-R50):
Added new utility functions `getIngestPipelineName` and
`createIngestTimestampPipeline` to manage the ingest pipeline for adding
event timestamps.

### Asset Criticality Data Client:

*
[`x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/asset_criticality_data_client.ts`](diffhunk://#diff-31b32ff8816e16c97f0d702225b9e13d7417331850c88b33435079419db94b62R26-R29):
Imported the new utility functions and updated the `init` method to
create the ingest timestamp pipeline. Additionally, modified the index
settings to use the new ingest pipeline.
### Risk Score Data Client:

*
[`x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_data_client.ts`](diffhunk://#diff-5a33102890d8bc4948e5d3d7df3901c23146bde3dee7bd15563bd1169358e43aR43-R46):
Imported the new utility functions, updated the `init` method to create
the ingest timestamp pipeline, and modified the index settings to use
the new ingest pipeline.

### Field Mapping Updates:

*
[`x-pack/plugins/security_solution/server/lib/entity_analytics/asset_criticality/constants.ts`](diffhunk://#diff-d0e75953a3b6d040a296cb4cd7513428a18b152808231819f28d7329dc86a92cL20-R20):
Added the field mapping `event.ingested` for asset criticality.
*
[`x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/configurations.ts`](diffhunk://#diff-43b70e77669c1f7c9608f8d26095db18f6fa0380beeb5990701656ae920602d7L102-R102):
Added the field mapping `event.ingested` for risk score.


### Checklist

Check the PR satisfies following conditions. 

Reviewers should verify this PR satisfies this list as well.

- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] [Flaky Test
Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was
used on any tests changed
- [ ] The PR description includes the appropriate Release Notes section,
and the correct `release_note:*` label is applied per the
[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

### Testing steps :

- Checkout main branch
- Setup and start kibana
- Enable Risk Engine

- Execute below query, result should not have event.ingested
```
GET /*asset-criticality.asset-criticality-*/_mapping
GET /*risk-score.risk-score-latest-*/_mapping
```

- Add data using document generator
- Execute below query
```
GET /*asset-criticality.asset-criticality-*/_search
{
    "_source": ["event.ingested", "@timestamp"],
    "query": {
    "exists": {
      "field": "event.ingested"
    }
  }
}
```
### Expected output
```
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 0,
      "relation": "eq"
    },
    "max_score": null,
    "hits": []
  }
}
```
- Same output as above for below query too
```
GET /*risk-score.risk-score-latest-*/_search
{
    "_source": ["event.ingested", "@timestamp"],
    "query": {
    "exists": {
      "field": "event.ingested"
    }
  }
}
```

- The below query should give results but `event.ingested` should not be
present in the results

```
GET /*asset-criticality.asset-criticality-*/_search
{
    "_source": ["@timestamp", "event.ingested"]
}

GET /*risk-score.risk-score-latest-*/_search
{
    "_source": ["@timestamp", "event.ingested"]
}
```

### Expected output

```
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "user.name:user-001",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-09T14:20:24.221Z"
        }
      },
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "user.name:user-002",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-09T14:20:24.221Z"
        }
      },
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "host.name:host-001",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-09T14:20:24.222Z"
        }
      }
    ]
  }
}
```

### - Checkout this PR and restart Kibana

(Try running the Risk Score engine using the Run Engine option if you
have added data after enabling the Risk Engine)

All the above queries should contain data/results with `event.ingested`
as below :

```
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 11,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19B5MlF3Loy86u-U-mC6BrCwAAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.757784Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19DYvlD0CQ6h1VE9n-ScWnjqwAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.757971Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19DQLgfYH-Zr4z01uVnAImoTgAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758039Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19IqrXmM5aDk2qno3rUL5TI3gAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758108Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19K9okuf9lAZcd2Y7t-QFWJAQAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758163Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19K95CQyZSvT-ZQVwx_6jJTzgAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758222Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19LMkPHJ-L99JamiiYkt9WB1wAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758272Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19M4c0tojXVhK5aOwVA46RNVgAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758462Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19M7j9nZmY4g5bEDPJc20zNHgAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758573Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      },
      {
        "_index": "risk-score.risk-score-latest-default",
        "_id": "X19TVbTGATHGj2iG_rFIUx2_1QAAAAAA",
        "_score": 1,
        "_source": {
          "event": {
            "ingested": "2025-01-10T07:51:30.758629Z"
          },
          "@timestamp": "2025-01-10T07:51:30.363Z"
        }
      }
    ]
  }
}
```


```
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "user.name:user-001",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-10T07:50:19.522Z",
          "event": {
            "ingested": "2025-01-10T07:50:19.532122Z"
          }
        }
      },
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "user.name:user-002",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-10T07:50:19.523Z",
          "event": {
            "ingested": "2025-01-10T07:50:19.535465Z"
          }
        }
      },
      {
        "_index": ".asset-criticality.asset-criticality-default",
        "_id": "host.name:host-001",
        "_score": 1,
        "_source": {
          "@timestamp": "2025-01-10T07:50:19.523Z",
          "event": {
            "ingested": "2025-01-10T07:50:19.535536Z"
          }
        }
      }
    ]
  }
}
```
The ingest pipeline should also be visible as below 

```
GET /_ingest/pipeline/entity_analytics_create_eventIngest_from_timestamp-pipeline*
```


![image](https://github.com/user-attachments/assets/42d4167b-575c-43ea-9219-34b31ded12fb)

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
abhishekbhatia1710 and kibanamachine authored Jan 17, 2025
1 parent 7987527 commit e266c83
Show file tree
Hide file tree
Showing 17 changed files with 618 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('AssetCriticalityDataClient', () => {
index: '.asset-criticality.asset-criticality-default',
mappings: {
_meta: {
version: 2,
version: 3,
},
dynamic: 'strict',
properties: {
Expand All @@ -56,6 +56,13 @@ describe('AssetCriticalityDataClient', () => {
criticality_level: {
type: 'keyword',
},
event: {
properties: {
ingested: {
type: 'date',
},
},
},
'@timestamp': {
type: 'date',
ignore_malformed: false,
Expand Down Expand Up @@ -114,6 +121,9 @@ describe('AssetCriticalityDataClient', () => {
},
},
},
settings: {
default_pipeline: 'entity_analytics_create_eventIngest_from_timestamp-pipeline-default',
},
},
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import {
import { AssetCriticalityAuditActions } from './audit';
import { AUDIT_CATEGORY, AUDIT_OUTCOME, AUDIT_TYPE } from '../audit';
import { getImplicitEntityFields } from './helpers';
import {
getIngestPipelineName,
createEventIngestedFromTimestamp,
} from '../utils/create_ingest_pipeline';

interface AssetCriticalityClientOpts {
logger: Logger;
Expand Down Expand Up @@ -62,6 +66,7 @@ export class AssetCriticalityDataClient {
* Initialize asset criticality resources.
*/
public async init() {
await createEventIngestedFromTimestamp(this.options.esClient, this.options.namespace);
await this.createOrUpdateIndex();

this.options.auditLogger?.log({
Expand Down Expand Up @@ -90,6 +95,9 @@ export class AssetCriticalityDataClient {
version: ASSET_CRITICALITY_MAPPINGS_VERSIONS,
},
},
settings: {
default_pipeline: getIngestPipelineName(this.options.namespace),
},
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,36 @@ export class AssetCriticalityMigrationClient {
}
);
};

public copyTimestampToEventIngestedForAssetCriticality = (abortSignal?: AbortSignal) => {
return this.options.esClient.updateByQuery(
{
index: this.assetCriticalityDataClient.getIndex(),
conflicts: 'proceed',
ignore_unavailable: true,
allow_no_indices: true,
body: {
query: {
bool: {
must_not: {
exists: {
field: 'event.ingested',
},
},
},
},
script: {
source: 'ctx._source.event.ingested = ctx._source.@timestamp',
lang: 'painless',
},
},
},
{
requestTimeout: '5m',
retryOnTimeout: true,
maxRetries: 2,
signal: abortSignal,
}
);
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { ASSET_CRITICALITY_MAPPINGS_VERSIONS, assetCriticalityFieldMap } from '.

describe('asset criticality - constants', () => {
it("please bump 'ASSET_CRITICALITY_MAPPINGS_VERSIONS' when mappings change", () => {
expect(ASSET_CRITICALITY_MAPPINGS_VERSIONS).toEqual(2);
expect(ASSET_CRITICALITY_MAPPINGS_VERSIONS).toEqual(3);
expect(assetCriticalityFieldMap).toMatchInlineSnapshot(`
Object {
"@timestamp": Object {
Expand All @@ -27,6 +27,11 @@ describe('asset criticality - constants', () => {
"required": false,
"type": "keyword",
},
"event.ingested": Object {
"array": false,
"required": false,
"type": "date",
},
"host.asset.criticality": Object {
"array": false,
"required": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const assetCriticalityMapping = {
};

// Upgrade this value to force a mappings update on the next Kibana startup
export const ASSET_CRITICALITY_MAPPINGS_VERSIONS = 2;
export const ASSET_CRITICALITY_MAPPINGS_VERSIONS = 3;

export const assetCriticalityFieldMap: FieldMap = {
'@timestamp': {
Expand Down Expand Up @@ -58,6 +58,11 @@ export const assetCriticalityFieldMap: FieldMap = {
required: false,
},
'user.asset.criticality': assetCriticalityMapping,
'event.ingested': {
type: 'date',
array: false,
required: false,
},
'service.name': {
type: 'keyword',
array: false,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { EntityAnalyticsMigrationsParams } from '.';
import { AssetCriticalityMigrationClient } from '../asset_criticality/asset_criticality_migration_client';

const TASK_TYPE = 'security-solution-ea-asset-criticality-copy-timestamp-to-event-ingested';
const TASK_ID = `${TASK_TYPE}-task-id`;
const TASK_TIMEOUT = '15m';
const TASK_SCOPE = ['securitySolution'];

export const assetCrticalityCopyTimestampToEventIngested = async ({
auditLogger,
taskManager,
logger,
getStartServices,
}: EntityAnalyticsMigrationsParams) => {
if (!taskManager) {
return;
}

logger.debug(`Register task "${TASK_TYPE}"`);

taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: `Copy Asset Criticality @timestamp value to events.ingested`,
timeout: TASK_TIMEOUT,
createTaskRunner: createMigrationTask({ auditLogger, logger, getStartServices }),
},
});

const [_, depsStart] = await getStartServices();
const taskManagerStart = depsStart.taskManager;

if (taskManagerStart) {
logger.debug(`Task scheduled: "${TASK_TYPE}"`);

const now = new Date();
try {
await taskManagerStart.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
scheduledAt: now,
runAt: now,
scope: TASK_SCOPE,
params: {},
state: {},
});
} catch (e) {
logger.error(`Error scheduling ${TASK_ID}, received ${e.message}`);
}
}
};

export const createMigrationTask =
({
getStartServices,
logger,
auditLogger,
}: Pick<EntityAnalyticsMigrationsParams, 'getStartServices' | 'logger' | 'auditLogger'>) =>
() => {
let abortController: AbortController;
return {
run: async () => {
abortController = new AbortController();
const [coreStart] = await getStartServices();
const esClient = coreStart.elasticsearch.client.asInternalUser;
const assetCrticalityClient = new AssetCriticalityMigrationClient({
esClient,
logger,
auditLogger,
});

const assetCriticalityResponse =
await assetCrticalityClient.copyTimestampToEventIngestedForAssetCriticality(
abortController.signal
);

const failures = assetCriticalityResponse.failures?.map((failure) => failure.cause);
const hasFailures = failures && failures?.length > 0;

logger.info(
`Task "${TASK_TYPE}" finished. Updated documents: ${
assetCriticalityResponse.updated
}, failures: ${hasFailures ? failures.join('\n') : 0}`
);
},

cancel: async () => {
abortController.abort();
logger.debug(`Task cancelled: "${TASK_TYPE}"`);
},
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import type { AuditLogger, Logger, StartServicesAccessor } from '@kbn/core/serve
import type { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import type { StartPlugins } from '../../../plugin';
import { scheduleAssetCriticalityEcsCompliancyMigration } from '../asset_criticality/migrations/schedule_ecs_compliancy_migration';
import { assetCrticalityCopyTimestampToEventIngested } from './asset_criticality_copy_timestamp_to_event_ingested';
import { riskScoreCopyTimestampToEventIngested } from './risk_score_copy_timestamp_to_event_ingested';
import { updateAssetCriticalityMappings } from '../asset_criticality/migrations/update_asset_criticality_mappings';
import { updateRiskScoreMappings } from '../risk_engine/migrations/update_risk_score_mappings';

Expand Down Expand Up @@ -43,3 +45,21 @@ export const scheduleEntityAnalyticsMigration = async (params: EntityAnalyticsMi
await scheduleAssetCriticalityEcsCompliancyMigration({ ...params, logger: scopedLogger });
await updateRiskScoreMappings({ ...params, logger: scopedLogger });
};

export const scheduleAssetCriticalityCopyTimestampToEventIngested = async (
params: EntityAnalyticsMigrationsParams
) => {
const scopedLogger = params.logger.get(
'entityAnalytics.assetCriticality.copyTimestampToEventIngested'
);

await assetCrticalityCopyTimestampToEventIngested({ ...params, logger: scopedLogger });
};

export const scheduleRiskScoreCopyTimestampToEventIngested = async (
params: EntityAnalyticsMigrationsParams
) => {
const scopedLogger = params.logger.get('entityAnalytics.riskScore.copyTimestampToEventIngested');

await riskScoreCopyTimestampToEventIngested({ ...params, logger: scopedLogger });
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { EntityAnalyticsMigrationsParams } from '.';
import { RiskScoreDataClient } from '../risk_score/risk_score_data_client';
import { buildScopedInternalSavedObjectsClientUnsafe } from '../risk_score/tasks/helpers';

const TASK_TYPE = 'security-solution-ea-risk-score-copy-timestamp-to-event-ingested';
const TASK_ID = `${TASK_TYPE}-task-id`;
const TASK_TIMEOUT = '15m';
const TASK_SCOPE = ['securitySolution'];

export const riskScoreCopyTimestampToEventIngested = async ({
auditLogger,
taskManager,
logger,
getStartServices,
}: EntityAnalyticsMigrationsParams) => {
if (!taskManager) {
return;
}

logger.debug(`Register task "${TASK_TYPE}"`);

taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: `Copy Risk Score @timestamp value to events.ingested`,
timeout: TASK_TIMEOUT,
createTaskRunner: createMigrationTask({ auditLogger, logger, getStartServices }),
},
});

const [_, depsStart] = await getStartServices();
const taskManagerStart = depsStart.taskManager;

if (taskManagerStart) {
logger.debug(`Task scheduled: "${TASK_TYPE}"`);

const now = new Date();
try {
await taskManagerStart.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
scheduledAt: now,
runAt: now,
scope: TASK_SCOPE,
params: {},
state: {},
});
} catch (e) {
logger.error(`Error scheduling ${TASK_ID}, received ${e.message}`);
}
}
};

export const createMigrationTask =
({
getStartServices,
logger,
auditLogger,
}: Pick<EntityAnalyticsMigrationsParams, 'getStartServices' | 'logger' | 'auditLogger'>) =>
() => {
let abortController: AbortController;
return {
run: async () => {
abortController = new AbortController();
const [coreStart] = await getStartServices();
const esClient = coreStart.elasticsearch.client.asInternalUser;
const soClient = buildScopedInternalSavedObjectsClientUnsafe({ coreStart, namespace: '*' });

const riskScoreClient = new RiskScoreDataClient({
esClient,
logger,
auditLogger,
namespace: '*',
soClient,
kibanaVersion: '*',
});
const riskScoreResponse = await riskScoreClient.copyTimestampToEventIngestedForRiskScore(
abortController.signal
);
const failures = riskScoreResponse.failures?.map((failure) => failure.cause);
const hasFailures = failures && failures?.length > 0;

logger.info(
`Task "${TASK_TYPE}" finished. Updated documents: ${
riskScoreResponse.updated
}, failures: ${hasFailures ? failures.join('\n') : 0}`
);
},

cancel: async () => {
abortController.abort();
logger.debug(`Task cancelled: "${TASK_TYPE}"`);
},
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@ describe('#getDefaultRiskEngineConfiguration', () => {
const namespace = 'default';
const config = getDefaultRiskEngineConfiguration({ namespace });

expect(config._meta.mappingsVersion).toEqual(2);
expect(config._meta.mappingsVersion).toEqual(3);
expect(riskScoreFieldMap).toMatchInlineSnapshot(`
Object {
"@timestamp": Object {
"array": false,
"required": false,
"type": "date",
},
"event.ingested": Object {
"array": false,
"required": false,
"type": "date",
},
"host.name": Object {
"array": false,
"required": false,
Expand Down
Loading

0 comments on commit e266c83

Please sign in to comment.