# [Streams 🌊] Schema editor UI (elastic#202372)
## Summary

Implements https://github.com/elastic/observability-dev/issues/4133.

Opening this up for a first pass as the PR is getting quite big. I've
listed below some things that can be improved in further iterations.

## High-level notes

- Support for `format` has been added to the field definition (see the sketch after this list).
- UI:
  - View inherited, mapped, and unmapped fields.
  - Edit mapped and unmapped fields.
  - Map unmapped fields and unmap mapped fields.
  - Simulation / preview of results.
  - Filtering.
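
A minimal sketch of a field definition under the updated zod schema. The schema below mirrors `fieldDefinitionSchema` from the diff further down; the `format` value is illustrative, and in this PR `format` is currently only wired into the generated mapping for `date` fields:

```ts
import { z } from '@kbn/zod';

// Mirrors the updated fieldDefinitionSchema (see the diff below).
const fieldDefinitionSchema = z.object({
  name: z.string(),
  type: z.enum(['keyword', 'match_only_text', 'long', 'double', 'date', 'boolean', 'ip']),
  format: z.optional(z.string()),
});

// A date field carrying an explicit date format; generateLayer only applies
// `format` to the mapping when the field type is `date`.
const timestampField = fieldDefinitionSchema.parse({
  name: '@timestamp',
  type: 'date',
  format: 'strict_date_optional_time',
});
```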

## Followups

- Filter dropdowns (on the right):

![Screenshot 2024-12-05 at 19 31 05](https://github.com/user-attachments/assets/31f22cd6-bf39-49bf-ba1c-1e94e42ebbd6)

- We could potentially use a separate API for the mapping edits, rather
than the core edit route, to be more performant, but for now the core
route is used to keep the surface area small and avoid deviation.

- State management is rudimentary right now. It could be improved with a
`useReducer` approach to avoid potential `useState` race conditions, and
later even something like XState once things are more concrete (see the
sketch at the end of this section). No state syncs with the URL currently.
- Because there is no URL state syncing, the "Edit in parent stream"
button doesn't navigate with things like a pre-selected field. We could
potentially coordinate this between the hooks in the schema editor and
the detail view parent, but it's unneeded complexity at the moment.

- We could provide a lot more assistance with `format`: for example, a
dropdown with predefined options plus a toggle for custom values. (The
refined designs actually show a dropdown, so this will probably become a
select with predefined options.)
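
As a rough sketch of the `useReducer` direction mentioned above (the state shape and action names are purely illustrative, not the PR's actual implementation):

```ts
import { useReducer } from 'react';

// Illustrative only: the schema editor currently uses plain useState.
interface SchemaEditorState {
  editingField: string | null;
  pendingChanges: Record<string, { type: string; format?: string }>;
  simulationStatus: 'unknown' | 'running' | 'success' | 'failure';
}

type SchemaEditorAction =
  | { type: 'startEditing'; fieldName: string }
  | { type: 'stageFieldChange'; fieldName: string; change: { type: string; format?: string } }
  | { type: 'simulationStarted' }
  | { type: 'simulationFinished'; status: 'success' | 'failure' };

function schemaEditorReducer(
  state: SchemaEditorState,
  action: SchemaEditorAction
): SchemaEditorState {
  switch (action.type) {
    case 'startEditing':
      return { ...state, editingField: action.fieldName };
    case 'stageFieldChange':
      return {
        ...state,
        pendingChanges: { ...state.pendingChanges, [action.fieldName]: action.change },
      };
    case 'simulationStarted':
      return { ...state, simulationStatus: 'running' };
    case 'simulationFinished':
      return { ...state, simulationStatus: action.status };
    default:
      return state;
  }
}

export function useSchemaEditorState() {
  return useReducer(schemaEditorReducer, {
    editingField: null,
    pendingChanges: {},
    simulationStatus: 'unknown',
  });
}
```

Routing every transition through a single dispatch avoids the interleaved `useState` updates the note above is concerned about.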


## Issues

- There seems to be a bug in the Elasticsearch JS library we use: calls
to `simulate.ingest` don't work because `body` just ends up set to
`undefined` (chasing this up). You can apply the following patch in
node_modules to get things going (then run `yarn start` again):

![Screenshot 2024-12-05 at 19 52 08](https://github.com/user-attachments/assets/73e8e067-ca36-472f-81fc-f8158653f0c8)
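
In the meantime, the new fields simulation route sidesteps the typed helper and issues the simulate call through the low-level transport client (this is the workaround already in the route added by this PR; response types aren't available yet, hence the `any` cast):

```ts
const simulation = (await scopedClusterClient.asCurrentUser.transport.request({
  method: 'POST',
  path: `_ingest/_simulate`,
  body: simulationBody,
})) as any;
```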

- Runtime mappings don't seem to work with `match_only_text`:
`mapper_parsing_exception: No handler for type [match_only_text]`
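
The simulation route works around this by downgrading `match_only_text` to `keyword` when building the runtime mappings used for the preview (taken from the route in the diff below):

```ts
// match_only_text isn't supported as a runtime field type, so fall back to keyword.
const propertiesCompatibleWithRuntimeMappings = Object.fromEntries(
  params.body.field_definitions.map((field) => [
    field.name,
    {
      type: field.type === 'match_only_text' ? 'keyword' : field.type,
      ...(field.format ? { format: field.format } : {}),
    },
  ])
);
```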

## Open questions

- We might freeze changes to the root stream.
- A failure on simulation doesn't do a hard block on saving changes. I
don't think it should, but open to other opinions.

## Screenshots

![Screenshot 2024-12-05 at 19 50 33](https://github.com/user-attachments/assets/bcccc223-1c65-47c5-8b06-7c79ed4004e6)
![Screenshot 2024-12-05 at 19 50 42](https://github.com/user-attachments/assets/c9cc24d6-738f-4d9a-a8a9-114403548f69)
![Screenshot 2024-12-05 at 19 50 54](https://github.com/user-attachments/assets/c19e5d37-b194-449e-ba46-6bd7eb0784cd)
![Screenshot 2024-12-05 at 19 41 15](https://github.com/user-attachments/assets/f2b4306c-1d6b-4899-914b-8796151ed2c2)
![Screenshot 2024-12-05 at 19 41 27](https://github.com/user-attachments/assets/effea5bd-b0fb-4c16-a758-a37fa25cb965)
![Screenshot 2024-12-05 at 19 49 53](https://github.com/user-attachments/assets/8f963162-9d7e-4fb2-b702-5af0d9c4f6a7)
![Screenshot 2024-12-05 at 19 50 03](https://github.com/user-attachments/assets/2c34b320-b0b2-4c16-8e78-018b461f7969)

---------

Co-authored-by: Joe Reuter <[email protected]>
Co-authored-by: kibanamachine <[email protected]>
3 people authored and CAWilson94 committed Jan 10, 2025
1 parent 5e0edc4 commit a92633f
Showing 29 changed files with 1,929 additions and 20 deletions.
@@ -73,6 +73,7 @@ export type ProcessingDefinition = z.infer<typeof processingDefinitionSchema>;
export const fieldDefinitionSchema = z.object({
name: z.string(),
type: z.enum(['keyword', 'match_only_text', 'long', 'double', 'date', 'boolean', 'ip']),
format: z.optional(z.string()),
});

export type FieldDefinition = z.infer<typeof fieldDefinitionSchema>;
@@ -29,6 +29,9 @@ export function generateLayer(
// @timestamp can't ignore malformed dates as it's used for sorting in logsdb
(property as MappingDateProperty).ignore_malformed = false;
}
if (field.type === 'date' && field.format) {
(property as MappingDateProperty).format = field.format;
}
properties[field.name] = property;
});
return {
@@ -15,6 +15,8 @@ import { listStreamsRoute } from './streams/list';
import { readStreamRoute } from './streams/read';
import { resyncStreamsRoute } from './streams/resync';
import { sampleStreamRoute } from './streams/sample';
import { schemaFieldsSimulationRoute } from './streams/schema/fields_simulation';
import { unmappedFieldsRoute } from './streams/schema/unmapped_fields';
import { streamsStatusRoutes } from './streams/settings';

export const streamsRouteRepository = {
@@ -29,6 +31,8 @@ export const streamsRouteRepository = {
...esqlRoutes,
...disableStreamsRoute,
...sampleStreamRoute,
...unmappedFieldsRoute,
...schemaFieldsSimulationRoute,
};

export type StreamsRouteRepository = typeof streamsRouteRepository;
@@ -0,0 +1,207 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { z } from '@kbn/zod';
import { notFound, internal } from '@hapi/boom';
import { getFlattenedObject } from '@kbn/std';
import { fieldDefinitionSchema } from '../../../../common/types';
import { createServerRoute } from '../../create_server_route';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { checkReadAccess } from '../../../lib/streams/stream_crud';

const SAMPLE_SIZE = 200;

export const schemaFieldsSimulationRoute = createServerRoute({
endpoint: 'POST /api/streams/{id}/schema/fields_simulation',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
body: z.object({
field_definitions: z.array(fieldDefinitionSchema),
}),
}),
handler: async ({
response,
params,
request,
logger,
getScopedClients,
}): Promise<{
status: 'unknown' | 'success' | 'failure';
simulationError: string | null;
documentsWithRuntimeFieldsApplied: unknown[] | null;
}> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });

const hasAccess = await checkReadAccess({ id: params.path.id, scopedClusterClient });
if (!hasAccess) {
throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`);
}

const propertiesForSample = Object.fromEntries(
params.body.field_definitions.map((field) => [field.name, { type: 'keyword' }])
);

const documentSamplesSearchBody = {
// Add keyword runtime mappings so we can pair with exists, this is to attempt to "miss" less documents for the simulation.
runtime_mappings: propertiesForSample,
query: {
bool: {
filter: Object.keys(propertiesForSample).map((field) => ({
exists: { field },
})),
},
},
sort: [
{
'@timestamp': {
order: 'desc',
},
},
],
size: SAMPLE_SIZE,
};

const sampleResults = await scopedClusterClient.asCurrentUser.search({
index: params.path.id,
...documentSamplesSearchBody,
});

if (
(typeof sampleResults.hits.total === 'object' && sampleResults.hits.total?.value === 0) ||
sampleResults.hits.total === 0 ||
!sampleResults.hits.total
) {
return {
status: 'unknown',
simulationError: null,
documentsWithRuntimeFieldsApplied: null,
};
}

const propertiesForSimulation = Object.fromEntries(
params.body.field_definitions.map((field) => [
field.name,
{ type: field.type, ...(field.format ? { format: field.format } : {}) },
])
);

const fieldDefinitionKeys = Object.keys(propertiesForSimulation);

const sampleResultsAsSimulationDocs = sampleResults.hits.hits.map((hit) => ({
_index: params.path.id,
_id: hit._id,
_source: Object.fromEntries(
Object.entries(getFlattenedObject(hit._source as Record<string, unknown>)).filter(
([k]) => fieldDefinitionKeys.includes(k) || k === '@timestamp'
)
),
}));

const simulationBody = {
docs: sampleResultsAsSimulationDocs,
component_template_substitutions: {
[`${params.path.id}@stream.layer`]: {
template: {
mappings: {
dynamic: 'strict',
properties: propertiesForSimulation,
},
},
},
},
};

// TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() but the ES JS lib currently has a bug. The types also aren't available yet, so we use any.
const simulation = (await scopedClusterClient.asCurrentUser.transport.request({
method: 'POST',
path: `_ingest/_simulate`,
body: simulationBody,
})) as any;

const hasErrors = simulation.docs.some((doc: any) => doc.doc.error !== undefined);

if (hasErrors) {
const documentWithError = simulation.docs.find((doc: any) => {
return doc.doc.error !== undefined;
});

return {
status: 'failure',
simulationError: JSON.stringify(
// Use the first error as a representative error
documentWithError.doc.error
),
documentsWithRuntimeFieldsApplied: null,
};
}

// Convert the field definitions to a format that can be used in runtime mappings (match_only_text -> keyword)
const propertiesCompatibleWithRuntimeMappings = Object.fromEntries(
params.body.field_definitions.map((field) => [
field.name,
{
type: field.type === 'match_only_text' ? 'keyword' : field.type,
...(field.format ? { format: field.format } : {}),
},
])
);

const runtimeFieldsSearchBody = {
runtime_mappings: propertiesCompatibleWithRuntimeMappings,
sort: [
{
'@timestamp': {
order: 'desc',
},
},
],
size: SAMPLE_SIZE,
fields: params.body.field_definitions.map((field) => field.name),
_source: false,
};

// This gives us a "fields" representation rather than _source from the simulation
const runtimeFieldsResult = await scopedClusterClient.asCurrentUser.search({
index: params.path.id,
...runtimeFieldsSearchBody,
});

return {
status: 'success',
simulationError: null,
documentsWithRuntimeFieldsApplied: runtimeFieldsResult.hits.hits
.map((hit) => {
if (!hit.fields) {
return {};
}
return Object.keys(hit.fields).reduce<Record<string, unknown>>((acc, field) => {
acc[field] = hit.fields![field][0];
return acc;
}, {});
})
.filter((doc) => Object.keys(doc).length > 0),
};
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);
}

throw internal(e);
}
},
});
@@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { z } from '@kbn/zod';
import { internal, notFound } from '@hapi/boom';
import { getFlattenedObject } from '@kbn/std';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { checkReadAccess, readAncestors, readStream } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';

const SAMPLE_SIZE = 500;

export const unmappedFieldsRoute = createServerRoute({
endpoint: 'GET /api/streams/{id}/schema/unmapped_fields',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({
response,
params,
request,
logger,
getScopedClients,
}): Promise<{ unmappedFields: string[] }> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });

const hasAccess = await checkReadAccess({ id: params.path.id, scopedClusterClient });
if (!hasAccess) {
throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`);
}

const streamEntity = await readStream({
scopedClusterClient,
id: params.path.id,
});

const searchBody = {
sort: [
{
'@timestamp': {
order: 'desc',
},
},
],
size: SAMPLE_SIZE,
};

const results = await scopedClusterClient.asCurrentUser.search({
index: params.path.id,
...searchBody,
});

const sourceFields = new Set<string>();

results.hits.hits.forEach((hit) => {
Object.keys(getFlattenedObject(hit._source as Record<string, unknown>)).forEach((field) => {
sourceFields.add(field);
});
});

// Mapped fields from the stream's definition and inherited from ancestors
const mappedFields = new Set<string>();

streamEntity.definition.fields.forEach((field) => mappedFields.add(field.name));

const { ancestors } = await readAncestors({
id: params.path.id,
scopedClusterClient,
});

for (const ancestor of ancestors) {
ancestor.definition.fields.forEach((field) => mappedFields.add(field.name));
}

const unmappedFields = Array.from(sourceFields)
.filter((field) => !mappedFields.has(field))
.sort();

return { unmappedFields };
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);
}

throw internal(e);
}
},
});
@@ -30,6 +30,7 @@
"@kbn/server-route-repository-client",
"@kbn/observability-utils-server",
"@kbn/observability-utils-common",
"@kbn/std",
"@kbn/safer-lodash-set"
]
}
@@ -8,7 +8,13 @@ import { EuiDataGrid } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import React, { useEffect, useMemo, useState } from 'react';

export function PreviewTable({ documents }: { documents: unknown[] }) {
export function PreviewTable({
documents,
displayColumns,
}: {
documents: unknown[];
displayColumns?: string[];
}) {
const [height, setHeight] = useState('100px');
useEffect(() => {
// set height to 100% after a short delay otherwise it doesn't calculate correctly
@@ -19,6 +25,8 @@ export function PreviewTable({ documents }: { documents: unknown[] }) {
}, []);

const columns = useMemo(() => {
if (displayColumns) return displayColumns;

const cols = new Set<string>();
documents.forEach((doc) => {
if (!doc || typeof doc !== 'object') {
@@ -29,7 +37,7 @@ export function PreviewTable({ documents }: { documents: unknown[] }) {
});
});
return Array.from(cols);
}, [documents]);
}, [displayColumns, documents]);

const gridColumns = useMemo(() => {
return Array.from(columns).map((column) => ({