Add checks for reindex task failures and number of docs created #1291

Open

wants to merge 1 commit into master
34 changes: 32 additions & 2 deletions curator/actions.py
@@ -1280,7 +1280,7 @@ def _get_reindex_args(self, source, dest):
del reindex_args['slices']
return reindex_args

def _post_run_quick_check(self, index_name):
def _post_run_quick_check(self, index_name, task_id):
# Verify the destination index is there after the fact
index_exists = self.client.indices.exists(index=index_name)
alias_instead = self.client.indices.exists_alias(name=index_name)
@@ -1303,6 +1303,36 @@ def _post_run_quick_check(self, index_name):
'not found.'.format(index_name)
)

# Verify that the reindex task finished without errors
response = self.client.tasks.get(task_id)
failures = response['response']['failures']

if len(failures) > 0:
self.loggit.error(
'The reindex task completed with {0} errors. '
'Check task: {1} for more information.'.format(len(failures), task_id)
)

raise exceptions.FailedExecution(
'Reindex completed with {0} errors. '
'Check task: {1} for more information.'.format(len(failures), task_id)
)

# Verify that the number of docs in the source and destination index match
total_docs = response['task']['status']['total']
created_docs = response['task']['status']['created']

if created_docs != total_docs:
Member

So, devil's advocate here. What happens if someone does a reindex from query instead of a full index? Will these still be equal? What if someone reindexes from multiple indices to 1 index (e.g. 1 week's worth of daily indices into a single weekly index)? Do these numbers match up? If the answer is not "yes," then I can't use the code—at least not as-is.

The draft code works just fine in the event of a 1:1 reindex. I could merge it if the test is only made when a verify_doc_count flag is present (or something like it). It would, of course, have to be vetted by conditional tests. It wouldn't be valid, for example, if Curator were able to detect a many-to-one reindex (code which would have to be added).
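
A minimal sketch of that suggestion, assuming a hypothetical verify_doc_count option (defaulting to False) wired into the Reindex action and its schema; none of this is part of the PR as submitted:

# Sketch only: run the strict doc-count comparison when the user opts in.
# self.verify_doc_count is an assumed option, not an existing attribute.
if self.verify_doc_count:
    total_docs = response['task']['status']['total']
    created_docs = response['task']['status']['created']
    if created_docs != total_docs:
        raise exceptions.FailedExecution(
            'Reindex incomplete. Created {0} of {1} documents. '
            'Check task: {2} for more information.'.format(
                created_docs, total_docs, task_id
            )
        )

A check like this would still need conditional tests so that it is skipped for reindex-from-query and many-to-one cases, as noted above.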

rahst12

If you reindex from a mapping file without nested documents to a mapping file with nested documents, the document count will be different. I assume the doc count verification should be optional...

Member

Thanks for providing a concrete example, @rahst12

self.loggit.error(
'The number of created documents ({0}) in the destination index '
'does not match the number of total documents in the source index ({1}). '
'Check task: {2} for more information.'.format(created_docs, total_docs, task_id)
)
raise exceptions.FailedExecution(
'Reindex incomplete. The number of created documents ({0}) in the destination index '
'does not match the number of total documents in the source index ({1}).'.format(created_docs, total_docs)
)

def sources(self):
# Generator for sources & dests
dest = self.body['dest']['index']
@@ -1375,7 +1405,7 @@ def do_action(self):
self.client, 'reindex', task_id=response['task'],
wait_interval=self.wait_interval, max_wait=self.max_wait
)
self._post_run_quick_check(dest)
self._post_run_quick_check(dest, response['task'])

else:
self.loggit.warn(
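
For reference, a rough sketch of the completed-task response that the new checks read. The values below are invented; the keys mirror the Elasticsearch tasks API fields accessed in _post_run_quick_check:

# Illustrative shape of client.tasks.get() output for a finished reindex
# task; the numbers are made up.
sample_task_response = {
    'completed': True,
    'task': {
        'status': {
            'total': 1000,    # documents the reindex set out to process
            'created': 1000,  # documents created in the destination index
        },
    },
    'response': {
        'failures': [],       # per-document errors; empty on a clean run
    },
}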