Skip to content

Commit

Permalink
[streams] wired stream lifecycle (elastic#207083)
Browse files Browse the repository at this point in the history
Allows configuration of a wired stream's retention (root stream
included) with dsl or ilm policy. The UI changes will be implemented
separately, as well as the support for classic streams.

A retention set on a stream is inherited to its eligible children when
updated (ie not overriden).
A lifecycle can be set with a `PUT kbn:/api/streams/:id` call:

```
// sets dlm policy with optional data_retention - no retention means data is kept indefinitely
{ "stream": { "ingest": { "lifecycle": { "dsl": { "data_retention": "7d" } }, ... } } }

// sets ilm policy
{ "stream": { "ingest": { "lifecycle": { "ilm": { "policy": "my-policy" } }, ... } } }

// removes any dlm/ilm set explicitly set on a stream and inherits the lifecycle of the closest parent
// note that this cannot be applied to root stream
{ "stream": { "ingest": { "lifecycle": { "inherit": {} }, ... } } }

// disables a lifecycle and stops inheriting parent's lifecycle and is also propagated to eligible children. this is effectively similar to dsl and ilm regarding the inheritance
{ "stream": { "ingest": { "lifecycle": { "disabled": {} }, ... } } }
```

When we set a retention, we update the targeted stream component
template and data stream such as any new backing index created after the
update will pick up for the new lifecycle, but we have to do extra work
to apply the lifecycle to the existing backing indices.

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
klacabane and kibanamachine authored Jan 24, 2025
1 parent 02455ff commit 6370aa2
Show file tree
Hide file tree
Showing 29 changed files with 890 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { ingestStream } from './ingest_stream';

export const ingestReadStream = {
...ingestStream,
lifecycle: { type: 'dlm' },
effective_lifecycle: { dsl: { data_retention: '7d' } },
inherited_fields: {
'@timestamp': {
type: 'date',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { wiredStream } from './wired_stream';

export const wiredReadStream = {
...wiredStream,
lifecycle: { type: 'dlm' },
effective_lifecycle: { dsl: { data_retention: '7d' }, from: 'logs.nginx' },
inherited_fields: {
'@timestamp': {
type: 'date',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

export const wiredStreamConfig = {
ingest: {
lifecycle: { dsl: { data_retention: '7d' } },
processing: [
{
config: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import {
wiredIngestSchema,
wiredStreamDefinitionSchemaBase,
} from './base';
import { ElasticsearchAsset, elasticsearchAssetSchema } from './common';
import { createIsNarrowSchema, createAsSchemaOrThrow } from '../../helpers';
import {
ElasticsearchAsset,
IngestStreamLifecycle,
elasticsearchAssetSchema,
InheritedIngestStreamLifecycle,
ingestStreamLifecycleSchema,
} from './common';
import { createIsNarrowSchema, createAsSchemaOrThrow } from '../../helpers';
inheritedIngestStreamLifecycleSchema,
} from './lifecycle';

/**
* Ingest get response
Expand Down Expand Up @@ -71,18 +72,16 @@ const ingestUpsertRequestSchema: z.Schema<IngestUpsertRequest> = z.union([
/**
* Stream get response
*/
interface IngestStreamGetResponseBase extends StreamGetResponseBase {
lifecycle: IngestStreamLifecycle;
}

interface WiredStreamGetResponse extends IngestStreamGetResponseBase {
interface WiredStreamGetResponse extends StreamGetResponseBase {
stream: Omit<WiredStreamDefinition, 'name'>;
inherited_fields: InheritedFieldDefinition;
effective_lifecycle: InheritedIngestStreamLifecycle;
}

interface UnwiredStreamGetResponse extends IngestStreamGetResponseBase {
interface UnwiredStreamGetResponse extends StreamGetResponseBase {
stream: Omit<UnwiredStreamDefinition, 'name'>;
elasticsearch_assets: ElasticsearchAsset[];
effective_lifecycle: IngestStreamLifecycle;
}

type IngestStreamGetResponse = WiredStreamGetResponse | UnwiredStreamGetResponse;
Expand Down Expand Up @@ -120,26 +119,21 @@ const ingestStreamUpsertRequestSchema: z.Schema<IngestStreamUpsertRequest> = z.u
unwiredStreamUpsertRequestSchema,
]);

const ingestStreamGetResponseSchemaBase: z.Schema<IngestStreamGetResponseBase> = z.intersection(
streamGetResponseSchemaBase,
z.object({
lifecycle: ingestStreamLifecycleSchema,
})
);

const wiredStreamGetResponseSchema: z.Schema<WiredStreamGetResponse> = z.intersection(
ingestStreamGetResponseSchemaBase,
streamGetResponseSchemaBase,
z.object({
stream: wiredStreamDefinitionSchemaBase,
inherited_fields: inheritedFieldDefinitionSchema,
effective_lifecycle: inheritedIngestStreamLifecycleSchema,
})
);

const unwiredStreamGetResponseSchema: z.Schema<UnwiredStreamGetResponse> = z.intersection(
ingestStreamGetResponseSchemaBase,
streamGetResponseSchemaBase,
z.object({
stream: unwiredStreamDefinitionSchemaBase,
elasticsearch_assets: z.array(elasticsearchAssetSchema),
effective_lifecycle: ingestStreamLifecycleSchema,
})
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { StreamDefinitionBase } from '../base';
import { FieldDefinition, fieldDefinitionSchema } from './fields';
import { ProcessorDefinition, processorDefinitionSchema } from './processors';
import { RoutingDefinition, routingDefinitionSchema } from './routing';
import { IngestStreamLifecycle, ingestStreamLifecycleSchema } from './lifecycle';

interface IngestBase {
processing: ProcessorDefinition[];
Expand All @@ -21,6 +22,7 @@ interface WiredIngest extends IngestBase {
wired: {
fields: FieldDefinition;
};
lifecycle: IngestStreamLifecycle;
}

interface UnwiredIngest extends IngestBase {
Expand Down Expand Up @@ -63,6 +65,7 @@ const wiredIngestSchema: z.Schema<WiredIngest> = z.intersection(
wired: z.object({
fields: fieldDefinitionSchema,
}),
lifecycle: ingestStreamLifecycleSchema,
})
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,3 @@ export const elasticsearchAssetSchema: z.Schema<ElasticsearchAsset> = z.object({
type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']),
id: NonEmptyString,
});

export interface IngestStreamLifecycleDLM {
type: 'dlm';
data_retention?: string;
}

export interface IngestStreamLifecycleILM {
type: 'ilm';
policy: string;
}

export type IngestStreamLifecycle = IngestStreamLifecycleDLM | IngestStreamLifecycleILM;

export const ingestStreamLifecycleSchema: z.Schema<IngestStreamLifecycle> = z.discriminatedUnion(
'type',
[
z.object({ type: z.literal('dlm'), data_retention: z.optional(NonEmptyString) }),
z.object({ type: z.literal('ilm'), policy: NonEmptyString }),
]
);
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ export * from './processors';
export * from './conditions';
export * from './routing';
export * from './common';
export * from './lifecycle';
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
import { createIsNarrowSchema } from '../../../helpers';

export interface IngestStreamLifecycleDSL {
dsl: {
data_retention?: string;
};
}

export interface IngestStreamLifecycleILM {
ilm: {
policy: string;
};
}

export interface IngestStreamLifecycleInherit {
inherit: {};
}

export interface IngestStreamLifecycleDisabled {
disabled: {};
}

export type IngestStreamLifecycle =
| IngestStreamLifecycleDSL
| IngestStreamLifecycleILM
| IngestStreamLifecycleInherit
| IngestStreamLifecycleDisabled;

const dslLifecycleSchema = z.object({
dsl: z.object({ data_retention: z.optional(NonEmptyString) }),
});
const ilmLifecycleSchema = z.object({ ilm: z.object({ policy: NonEmptyString }) });
const inheritLifecycleSchema = z.object({ inherit: z.strictObject({}) });
const disabledLifecycleSchema = z.object({ disabled: z.strictObject({}) });

export const ingestStreamLifecycleSchema: z.Schema<IngestStreamLifecycle> = z.union([
dslLifecycleSchema,
ilmLifecycleSchema,
inheritLifecycleSchema,
disabledLifecycleSchema,
]);

export type InheritedIngestStreamLifecycle = IngestStreamLifecycle & { from: string };

export const inheritedIngestStreamLifecycleSchema: z.Schema<InheritedIngestStreamLifecycle> =
ingestStreamLifecycleSchema.and(z.object({ from: NonEmptyString }));

export const isDslLifecycle = createIsNarrowSchema(ingestStreamLifecycleSchema, dslLifecycleSchema);

export const isIlmLifecycle = createIsNarrowSchema(ingestStreamLifecycleSchema, ilmLifecycleSchema);

export const isInheritLifecycle = createIsNarrowSchema(
ingestStreamLifecycleSchema,
inheritLifecycleSchema
);

export const isDisabledLifecycle = createIsNarrowSchema(
ingestStreamLifecycleSchema,
disabledLifecycleSchema
);
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
import {
IngestStreamLifecycle,
InheritedFieldDefinition,
InheritedIngestStreamLifecycle,
UnwiredStreamDefinition,
WiredStreamDefinition,
ingestStreamLifecycleSchema,
inheritedFieldDefinitionSchema,
inheritedIngestStreamLifecycleSchema,
unwiredStreamDefinitionSchema,
wiredStreamDefinitionSchema,
} from './ingest';
import {
ElasticsearchAsset,
IngestStreamLifecycle,
elasticsearchAssetSchema,
ingestStreamLifecycleSchema,
} from './ingest/common';
import { ElasticsearchAsset, elasticsearchAssetSchema } from './ingest/common';
import { createIsNarrowSchema } from '../helpers';

/**
Expand All @@ -31,16 +30,17 @@ interface ReadStreamDefinitionBase {
name: string;
dashboards: string[];
elasticsearch_assets: ElasticsearchAsset[];
lifecycle: IngestStreamLifecycle;
inherited_fields: InheritedFieldDefinition;
}

interface WiredReadStreamDefinition extends ReadStreamDefinitionBase {
stream: WiredStreamDefinition;
effective_lifecycle: InheritedIngestStreamLifecycle;
}

interface UnwiredReadStreamDefinition extends ReadStreamDefinitionBase {
stream: UnwiredStreamDefinition;
effective_lifecycle: IngestStreamLifecycle;
}

type ReadStreamDefinition = WiredReadStreamDefinition | UnwiredReadStreamDefinition;
Expand All @@ -50,20 +50,21 @@ const readStreamDefinitionSchemaBase: z.Schema<ReadStreamDefinitionBase> = z.obj
dashboards: z.array(NonEmptyString),
elasticsearch_assets: z.array(elasticsearchAssetSchema),
inherited_fields: inheritedFieldDefinitionSchema,
lifecycle: ingestStreamLifecycleSchema,
});

const wiredReadStreamDefinitionSchema: z.Schema<WiredReadStreamDefinition> = z.intersection(
readStreamDefinitionSchemaBase,
z.object({
stream: wiredStreamDefinitionSchema,
effective_lifecycle: inheritedIngestStreamLifecycleSchema,
})
);

const unwiredReadStreamDefinitionSchema: z.Schema<UnwiredReadStreamDefinition> = z.intersection(
readStreamDefinitionSchemaBase,
z.object({
stream: unwiredStreamDefinitionSchema,
effective_lifecycle: ingestStreamLifecycleSchema,
})
);

Expand Down
Loading

0 comments on commit 6370aa2

Please sign in to comment.