[Dataset quality] šŸž Rely solely on _index instead of data_stream properties (elastic#210329)

Closes elastic/logs-dev#192.

## Background

This has been a long-running issue on the dataset quality page, and it became more noticeable with the introduction of the failure store. Before this change, the `Dataset quality details` page already relied solely on `_index`, while the main page was still filtering documents by their `data_stream` properties, so the two pages could disagree about which documents belong to a dataset.
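As a rough sketch of the difference (illustrative aggregation bodies only, not the PR's code): grouping on the built-in `_index` metadata field accounts for every document, and backing indices are mapped back to dataset names afterwards, whereas grouping on `data_stream.*` properties can silently drop documents whose fields are missing or inconsistent — the mismatch this PR removes.

```ts
// Old approach (main page): group on data_stream.* fields. Documents whose
// data_stream properties are absent or inconsistent never appear in a bucket.
const byDataStreamProps = {
  size: 0,
  aggs: {
    datasets: {
      composite: {
        size: 10000,
        sources: [
          { type: { terms: { field: 'data_stream.type' } } },
          { dataset: { terms: { field: 'data_stream.dataset' } } },
          { namespace: { terms: { field: 'data_stream.namespace' } } },
        ],
      },
    },
  },
};

// New approach (both pages): group on the built-in _index metadata field,
// which every document has, then derive the dataset name from the backing
// index name in code.
const byIndex = {
  size: 0,
  aggs: {
    datasets: {
      composite: {
        size: 10000,
        sources: [{ dataset: { terms: { field: '_index' } } }],
      },
    },
  },
};
```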

### Before 


https://github.com/user-attachments/assets/02d14cb9-81a6-4f61-a199-5d1e55443a20

### After


https://github.com/user-attachments/assets/09a4e523-b927-4147-99d1-6ceff40f1027
yngrdyn authored Feb 11, 2025 · 1 parent a1356ff · commit d26f9ff
Showing 2 changed files with 14 additions and 95 deletions.
File 1 of 2 (defines `getFailedDocsPaginated`):

@@ -7,82 +7,9 @@
 
 import type { ElasticsearchClient } from '@kbn/core/server';
 import { DataStreamDocsStat } from '../../../../common/api_types';
-import { FAILURE_STORE_SELECTOR } from '../../../../common/constants';
 import { DataStreamType } from '../../../../common/types';
-import {
-  extractIndexNameFromBackingIndex,
-  streamPartsToIndexPattern,
-} from '../../../../common/utils';
-import { createDatasetQualityESClient } from '../../../utils';
-import { DatasetQualityESClient } from '../../../utils/create_dataset_quality_es_client';
-import { rangeQuery } from '../../../utils/queries';
-
-const SIZE_LIMIT = 10000;
-
-async function getPaginatedResults(options: {
-  datasetQualityESClient: DatasetQualityESClient;
-  index: string;
-  start: number;
-  end: number;
-  after?: { dataset: string };
-  prevResults?: Record<string, number>;
-}) {
-  const { datasetQualityESClient, index, start, end, after, prevResults = {} } = options;
-
-  const bool = {
-    filter: [...rangeQuery(start, end)],
-  };
-
-  const response = await datasetQualityESClient.search({
-    index: `${index}${FAILURE_STORE_SELECTOR}`,
-    size: 0,
-    query: {
-      bool,
-    },
-    aggs: {
-      datasets: {
-        composite: {
-          ...(after ? { after } : {}),
-          size: SIZE_LIMIT,
-          sources: [{ dataset: { terms: { field: '_index' } } }],
-        },
-      },
-    },
-  });
-
-  const currResults = (response.aggregations?.datasets.buckets ?? []).reduce((acc, curr) => {
-    const datasetName = extractIndexNameFromBackingIndex(curr.key.dataset as string);
-
-    return {
-      ...acc,
-      [datasetName]: (acc[datasetName] ?? 0) + curr.doc_count,
-    };
-  }, {} as Record<string, number>);
-
-  const results = {
-    ...prevResults,
-    ...currResults,
-  };
-
-  if (
-    response.aggregations?.datasets.after_key &&
-    response.aggregations?.datasets.buckets.length === SIZE_LIMIT
-  ) {
-    return getPaginatedResults({
-      datasetQualityESClient,
-      index,
-      start,
-      end,
-      after:
-        (response.aggregations?.datasets.after_key as {
-          dataset: string;
-        }) || after,
-      prevResults: results,
-    });
-  }
-
-  return results;
-}
+import { streamPartsToIndexPattern } from '../../../../common/utils';
+import { getAggregatedDatasetPaginatedResults } from '../get_dataset_aggregated_paginated_results';
 
 export async function getFailedDocsPaginated(options: {
   esClient: ElasticsearchClient;
@@ -102,17 +29,10 @@ export async function getFailedDocsPaginated(options: {
     })
   );
 
-  const datasetQualityESClient = createDatasetQualityESClient(esClient);
-
-  const datasets = await getPaginatedResults({
-    datasetQualityESClient,
+  return await getAggregatedDatasetPaginatedResults({
+    esClient,
     index: datasetNames.join(','),
     start,
     end,
   });
-
-  return Object.entries(datasets).map(([dataset, count]) => ({
-    dataset,
-    count,
-  }));
 }
File 2 of 2 (defines `getAggregatedDatasetPaginatedResults`):

@@ -7,14 +7,13 @@
 
 import { QueryDslBoolQuery } from '@elastic/elasticsearch/lib/api/types';
 import type { ElasticsearchClient } from '@kbn/core/server';
+import { extractIndexNameFromBackingIndex } from '../../../common/utils';
 import { DataStreamDocsStat } from '../../../common/api_types';
 import { createDatasetQualityESClient } from '../../utils';
 import { rangeQuery } from '../../utils/queries';
 
 interface Dataset {
-  type: string;
   dataset: string;
-  namespace: string;
 }
 
 const SIZE_LIMIT = 10000;
@@ -37,11 +36,7 @@ export async function getAggregatedDatasetPaginatedResults(options: {
      composite: {
        ...(afterKey ? { after: afterKey } : {}),
        size: SIZE_LIMIT,
-       sources: [
-         { type: { terms: { field: 'data_stream.type' } } },
-         { dataset: { terms: { field: 'data_stream.dataset' } } },
-         { namespace: { terms: { field: 'data_stream.namespace' } } },
-       ],
+       sources: [{ dataset: { terms: { field: '_index' } } }],
      },
    },
  });
@@ -65,7 +60,7 @@ export async function getAggregatedDatasetPaginatedResults(options: {
 
   const currResults =
     response.aggregations?.datasets.buckets.map((bucket) => ({
-      dataset: `${bucket.key.type}-${bucket.key.dataset}-${bucket.key.namespace}`,
+      dataset: bucket.key.dataset as string,
       count: bucket.doc_count,
     })) ?? [];
 
@@ -82,13 +77,17 @@ export async function getAggregatedDatasetPaginatedResults(options: {
      end,
      after:
        (response.aggregations?.datasets.after_key as {
-         type: string;
         dataset: string;
-         namespace: string;
        }) || after,
      prevResults: results,
    });
  }
 
-  return results;
+  return Object.entries(
+    results.reduce((acc, curr) => {
+      const dataset = extractIndexNameFromBackingIndex(curr.dataset);
+      acc[dataset] = (acc[dataset] ?? 0) + curr.count;
+      return acc;
+    }, {} as Record<string, number>)
+  ).map(([dataset, count]) => ({ dataset, count }));
 }
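For reference, here is a minimal, self-contained sketch of the pagination pattern used by `getAggregatedDatasetPaginatedResults`: paging a composite aggregation with `after_key` and folding backing-index names into per-dataset counts. The helper name `countDocsByDataset` and the simplified `extractIndexNameFromBackingIndex` regex are assumptions for illustration; Kibana's real utilities differ.

```ts
import { Client } from '@elastic/elasticsearch';

// Simplified stand-in for Kibana's extractIndexNameFromBackingIndex, e.g.
// '.ds-logs-nginx.access-default-2025.02.11-000001' -> 'logs-nginx.access-default'
function extractIndexNameFromBackingIndex(index: string): string {
  const match = index.match(/^\.ds-(.*)-\d{4}\.\d{2}\.\d{2}-\d+$/);
  return match ? match[1] : index;
}

// Hypothetical helper: count documents per dataset by paging a composite
// aggregation keyed on _index until after_key runs out.
async function countDocsByDataset(client: Client, index: string) {
  const SIZE_LIMIT = 10000;
  const counts: Record<string, number> = {};
  let after: Record<string, unknown> | undefined;

  do {
    const response = await client.search({
      index,
      size: 0,
      aggs: {
        datasets: {
          composite: {
            ...(after ? { after } : {}),
            size: SIZE_LIMIT,
            sources: [{ dataset: { terms: { field: '_index' } } }],
          },
        },
      },
    });

    // Aggregation typings are generic; cast for the sketch.
    const agg = (response.aggregations as any)?.datasets;
    for (const bucket of agg?.buckets ?? []) {
      const dataset = extractIndexNameFromBackingIndex(bucket.key.dataset as string);
      counts[dataset] = (counts[dataset] ?? 0) + bucket.doc_count;
    }
    // A full page may mean more buckets remain; resume from after_key.
    after = agg?.buckets?.length === SIZE_LIMIT ? agg.after_key : undefined;
  } while (after);

  return Object.entries(counts).map(([dataset, count]) => ({ dataset, count }));
}
```

The recursive version in this commit is equivalent; an iterative loop simply avoids growing the call stack when many pages come back.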
