Add ability to remove archive documents from ElasticSearch using original URL #364

Merged: 24 commits, merged Feb 26, 2025. Changes shown are from 8 of the 24 commits.

Commits (24)
48e9976
Add indexer/scripts/arch-eraser.py
philbudne Jan 10, 2025
6775aec
Add quick "indirect file" support to Queuer class
philbudne Jan 10, 2025
cc525ca
indexer/scripts/arch-eraser.py: update top docstring
philbudne Jan 13, 2025
50cadb2
Add script to invoke arch-eraser
m453h Jan 23, 2025
35916e8
Allow passing of extra args
m453h Jan 23, 2025
f01145d
Add ability to delete documents from ElasticSearch
m453h Jan 27, 2025
3b4c738
Clean up indexer/elastic.py
m453h Jan 27, 2025
c1dcbab
Add option to write output of WARC files to process to txt file
m453h Jan 29, 2025
b34d911
Improve bash script to handle both erase and generate erase list options
m453h Jan 29, 2025
da7080b
Replace f-strings with %-formatting in log messages
m453h Jan 29, 2025
012f837
Clean up run-arch-eraser script
m453h Jan 29, 2025
1fe69e8
Add option to set random delay(s) between elastic search deletions
m453h Feb 3, 2025
08a65e2
Improve type checking in queuer
m453h Feb 3, 2025
3dfea92
Update minimum and maximum delay time help message
m453h Feb 3, 2025
a4288ce
Simplify arch-eraser bash script help and remove redundant .gitignore …
m453h Feb 4, 2025
341f469
Rename ArchEraser to ArchLister and split WARC and URL listing scripts
m453h Feb 7, 2025
16e3e77
Add arch-eraser script
m453h Feb 9, 2025
bc74f94
Update indexer/scripts/arch-lister.py
m453h Feb 14, 2025
4d9b69f
Simplify arch-eraser by using only batch delete and use Ids to query
m453h Feb 18, 2025
35c9680
Merge branch 'arch-eraser-sketch' of github.com:m453h/story-indexer i…
m453h Feb 18, 2025
00cfc17
Fix error in arch-lister due to indentation
m453h Feb 18, 2025
e35bd5b
Improve arch-eraser and ensure arch-lister outputs to specified path
m453h Feb 18, 2025
1192ec7
Update run-arch-eraser.sh and run-arch-warc-lister help messages
m453h Feb 18, 2025
896d0c4
Modify arch-eraser to use delete_by_query API
m453h Feb 20, 2025
1 change: 1 addition & 0 deletions .gitignore
@@ -14,3 +14,4 @@ docker/deploy.log
docker/docker-compose.yml
docker/docker-compose.yml.dump
docker/docker-compose.yml.save-*
*.sqlite3
66 changes: 66 additions & 0 deletions bin/run-arch-eraser.sh
@@ -0,0 +1,66 @@
#!/bin/sh
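# Expand the date range [start_date, end_date] into one archive URL/prefix per
# day by substituting each date for the {pattern} placeholder, then pass the
# resulting list (plus any extra arguments) to indexer.scripts.arch-eraser.
# Example (hypothetical bucket/prefix; extra args are forwarded to arch-eraser):
#   ./bin/run-arch-eraser.sh 2024/05/22 2024/06/27 \
#       "s3://my-archive-bucket/mcrss/{pattern}/" --dry-run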

. bin/func.sh

is_valid_date() {
case "$1" in
[0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]) return 0 ;;
*) return 1 ;;
esac
}
increment_date() {
date_cmd=$(command -v date)
case "$(uname)" in
Darwin)
"$date_cmd" -j -v+1d -f "%Y/%m/%d" "$1" +"%Y/%m/%d"
;;
Linux)
"$date_cmd" -d "$1 + 1 day" +"%Y/%m/%d" 2>/dev/null
;;
*)
echo "Unsupported Environment" >&2
return 1
;;
esac
}

if [ $# -lt 3 ]; then
echo "Usage: $0 [start_date] [end_date] [pattern]" >&2
exit 1
fi

start_date="$1"
end_date="$2"
pattern="$3"
shift 3
other_params="$*"

if ! is_valid_date "$start_date" || ! is_valid_date "$end_date"; then
echo "Error: Invalid date format. Use YYYY/MM/DD" >&2
exit 1
fi

convert_date_to_int() {
input_date="$1"
if [ "$(uname)" = "Darwin" ]; then
date -j -f "%Y/%m/%d" "$input_date" +"%Y%m%d" 2>/dev/null
elif [ "$(uname)" = "Linux" ]; then
date -d "$input_date" +"%Y%m%d" 2>/dev/null
else
echo "Unsupported OS" >&2
return 1
fi
}

output_string=""
start_date_int=$(convert_date_to_int "$start_date")
end_date_int=$(convert_date_to_int "$end_date")

while [ "$start_date_int" -le "$end_date_int" ]; do
current_url=$(echo "$pattern" | sed "s|{pattern}|$start_date|g")
output_string="${output_string}${current_url} "
start_date=$(increment_date "$start_date")
start_date_int=$(convert_date_to_int "$start_date")
done

run_python indexer.scripts.arch-eraser ${output_string} ${other_params}
30 changes: 27 additions & 3 deletions indexer/queuer.py
@@ -27,7 +27,8 @@
import os
import sys
import tempfile
from typing import BinaryIO, cast
from contextlib import nullcontext
from typing import BinaryIO, cast, ContextManager, Optional, TextIO

import requests

@@ -71,6 +72,7 @@ def define_options(self, ap: argparse.ArgumentParser) -> None:
help="clean up old, incompletely processed files",
)
ap.add_argument("input_files", nargs="*", default=None)
ap.add_argument("--output-file", dest="output_file", default=None)

def process_file(self, fname: str, fobj: BinaryIO) -> None:
"""
@@ -189,8 +191,19 @@ def maybe_process_files(self, fname: str) -> None:
# more likely a queuer might actually need
# multiple keys (eg; reading CSVs from one bucket
# that reference objects in another bucket).
for key in sorted(bs.list_objects(prefix), reverse=True):
self.maybe_process_file(f"{scheme}://{bs.bucket}/{key}")
assert self.args
output_file: Optional[str] = self.args.output_file
context: ContextManager[Optional[object]] = nullcontext()
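# If --output-file was given, append each discovered object URL to that
# file (one per line); the resulting list can later be replayed through
# the "@file" indirect-file feature (see maybe_process_file below).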
with (
open(output_file, "a")
if output_file
else context
) as file:
for key in sorted(bs.list_objects(prefix), reverse=True):
if output_file:
assert isinstance(file, io.TextIOWrapper)
file.write(f"{scheme}://{bs.bucket}/{key}\n")
self.maybe_process_file(f"{scheme}://{bs.bucket}/{key}")
break # found working config: for store .... loop

except tuple(bs.EXCEPTIONS) as e:
@@ -212,6 +225,17 @@ def maybe_process_file(self, fname: str) -> None:
args = self.args
assert args

if fname[0] == "@":
# implement "indirect file" (file containing file names)
# NOTE! paths read from indirect files are NOT interpreted
# as relative to the path of the indirect file.
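# Example contents of an indirect file (hypothetical object names),
# selected on the command line by prefixing the file name with "@":
#   s3://my-archive-bucket/mcrss/2024/06/20/mcrss-0001.warc.gz
#   s3://my-archive-bucket/mcrss/2024/06/20/mcrss-0002.warc.gz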
logger.info("indirect file %s", fname)

f = self.open_file(fname[1:])
for line in f:
self.maybe_process_file(line.decode().rstrip())
return

if args.test:
logger.info("maybe_process_file %s", fname)
return
221 changes: 221 additions & 0 deletions indexer/scripts/arch-eraser.py
@@ -0,0 +1,221 @@
"""
Sketch of an "archive eraser"

Reads archive files (possibly from remote "blob stores"), extracts URLs
and removes objects from Elasticsearch.

NOTE!! Does not actually remove the archive files from their
off-site storage location(s)!!!!

This was written to remove stories for late Jan thru early March 2022
(database E) that were initially recovered in 2024 (with some link
rot) thru various means(*), so that stories "blindly" recovered from
S3 (without a known URL), whose canonical URLs were then extracted,
can be loaded in their place.

However, experimentation suggested that loading the canonical URLs
without first removing the first attempt could lead to a 10% duplicate
rate (initial vs. final URLs and other URL differences).

From the below, it appears all of the WARC files are available on S3,
with some available from B2 as well (may be cheaper to fetch from B2).

(*) The different ways the URLs were recovered:
1. From synthetic RSS files written at the time for IA (both by the legacy system
and the then "backup" rss-fetcher).

2. From RSS files blindly extracted from S3 (ignoring the HTML files!) into CSV files of URLs

Stories in index mc_search-00002 and mc_search-00003

All files on S3, some on B2 (starting 2024/05/31)

arch prefix start end archives
mccsv 2024/05/22 -> 2024/06/27 S3/(B2)
mc(rss) 2024/05/27 -> 2024/06/20 S3/(B2) [1]
mcrss 2024/06/20 -> 2024/08/16 S3/B2

(B2) means some of the date range on B2

[1] initial WARC files from RSS files written from 2024/05/27 thru
2024/06/20 start with mc- (see below):

THESE SHOULD BE VERIFIED!!! The "via" field in the metadata
should indicate how the URL was obtained!

dates container name (in WARC filename)
2024/05/27-2024/05/28 cf94b52abe5a S3 [154 files]
2024/05/29-2024/06/04 cefd3fdce464 S3 [882 files]
2024/06/05 0c501ed61cf4 S3 & B2 [497 files]
2024/06/05 446d55936e82 S3 & B2 [27 files]
2024/06/05 cefd3fdce464
2024/06/06-2024/06/09 0c501ed61cf4
2024/06/09 7e1b47c305f1 S3 & B2 [1 file]
2024/06/11-2024/06/20 6c55aaf9daaa

================

This is based on the "Queuer" class, which reads both local and remote
input files, and keeps track of which files have been processed.

No queues are involved (provide any value for --rabbitmq-url or RABBITMQ_URL)

The "tracker" uses SQLite3 (**), and should be multi-process safe,
although this application may experience more contention (SQLite3 does
full-table locks for row creation), and testing should be done (using
--dry-run) with multiple processes running to see if any errors or
exceptions are thrown due to lock contention!

(**) The author doesn't care how you pronounce it, but he says "ess cue ell ite"
(like it's a mineral): https://www.youtube.com/watch?v=Jib2AmRb_rk

Because the files involved span a wide range of dates, and have
various forms, rather than implement fancy wildcard or filtering
support, the idea is to collect all the (full) archive URLs into a
file (or files), and use the "indirect file" feature in the queuer
to read the files of URLs.
"""

import argparse
import logging
from typing import BinaryIO, List, Optional

from elasticsearch import Elasticsearch

from indexer.elastic import ElasticMixin
from indexer.queuer import Queuer
from indexer.story_archive_writer import StoryArchiveReader

logger = logging.getLogger("arch-eraser")


class ArchEraser(ElasticMixin, Queuer):
APP_BLOBSTORE = "HIST" # first choice for blobstore conf vars
HANDLE_GZIP = False # StoryArchiveReader handles compression

# don't want to talk to RabbitMQ, but too much work
# to refactor Queuer into a FileProcessor add-in

def __init__(self, process_name: str, descr: str):
super().__init__(process_name, descr)
self.is_batch_delete: bool = False
self.keep_alive: str = ""
self.fetch_batch_size: Optional[int] = None
self.indices: str = ""

def qconnect(self) -> None:
return

def check_output_queues(self) -> None:
return

def define_options(self, ap: argparse.ArgumentParser) -> None:
super().define_options(ap)
ap.add_argument(
"--fetch-batch-size",
dest="fetch_batch_size",
type=int,
default=1000,
help="The number of documents to fetch from Elasticsearch in each batch (default: 1000)",
)
ap.add_argument(
"--indices",
dest="indices",
help="The name of the Elasticsearch indices to delete",
)
ap.add_argument(
"--keep-alive",
dest="keep_alive",
default="1m",
help="How long should Elasticsearch keep the PIT alive e.g. 1m -> 1 minute",
)
ap.add_argument(
"--batch-delete",
dest="is_batch_delete",
action="store_true",
default=False,
help="Enable batch deletion of documents (default: False)",
)

def process_args(self) -> None:
"""
Process command line arguments and set instance variables.
"""
super().process_args()
assert self.args
self.fetch_batch_size = self.args.fetch_batch_size
self.indices = self.args.indices
self.keep_alive = self.args.keep_alive
self.is_batch_delete = self.args.is_batch_delete

def delete_documents(self, urls: List[Optional[str]]) -> None:
es = self.elasticsearch_client()
pit_id = None
total_deleted = 0
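# Open a point-in-time over the target indices, page through the matching
# documents with search_after, and delete each hit (one at a time, or via
# one bulk request per page when --batch-delete is set).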
try:
pit_id = es.open_point_in_time(
index=self.indices, keep_alive=self.keep_alive
).get("id")
logger.info("Opened Point-in-Time with ID %s", pit_id)
query = {
"size": self.fetch_batch_size,
Contributor (philbudne) commented:
I'm surprised to see a query!

If it's just to find the ID of the document to delete, then you can use mcmetadata.urls.unique_url_hash() on the URL.

Any reason to use the original_url field (below)?

It ended up in ES because it's one of the outputs of mcmetadata, and in normal operation it should be identical to url, and I don't think it's actually used by anything (if it was possible to remove fields in the move to the new cluster, it would be one of two I'd suggest tossing, the other is full_language)!

Contributor Author (m453h) replied:
Thanks for the added context, @philbudne! I have made the change to use the ID constructed from mcmetadata.urls.unique_url_hash().

Regarding why we have a query, it is to retrieve both the index and ID of the document, as Elasticsearch requires this information for deletion. Since the stories could be in either mc_search-00002 or mc_search-00003, I thought retrieving the story by original_url ID to identify its index before constructing the list of stories for bulk_delete would be the most appropriate approach.

An alternative to prefetching the stories to determine their index would be to use the delete_by_query API. However, this can potentially use the scroll API when handling large datasets, adding more load to Elasticsearch. That's why I opted for a query using search_after and PIT.

I’m definitely open to exploring any other approach you think could improve this!

Contributor (philbudne) replied:
@m453h haven't studied delete_by_query, but only one document should EVER match the "id" within a single index, and we try not to index a URL more than once by doing an id query against all indices before trying to import a story, so if the delete_by_query is limited to index ...02 and ...03 the worst case is two documents.

My concern is that the separate lookup and delete will have longer latency (two round trips thru the API) and more impact on the ES servers (locating the document twice). On the other hand I could be worrying about nothing!

Contributor Author (m453h) replied:
@philbudne I’ve updated the implementation to use delete_by_query. Could you take a look and let me know if this approach looks better? 🤞

Also, I noticed from the docs that we can throttle requests, which might help prevent the deletion process from overwhelming the cluster. I was thinking that this could also give us a more accurate way to estimate the total deletion time. I've currently not set any throttling; perhaps determining a realistic rate for document removal and setting it as the default value could be useful.
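For reference, a minimal sketch of a throttled delete_by_query along the lines discussed above (not part of this PR's diff): it assumes the Elasticsearch 8.x Python client, derives document ids with the unique_url_hash() helper named above (import path as given in the thread), and the function name and throttle value are illustrative only.

from typing import List

from elasticsearch import Elasticsearch
from mcmetadata.urls import unique_url_hash


def delete_stories_by_url(es: Elasticsearch, indices: str, urls: List[str]) -> int:
    # Derive each document's _id the same way the importer does, so at most
    # one document per index should match any given id.
    ids = [unique_url_hash(u) for u in urls]
    resp = es.delete_by_query(
        index=indices,  # e.g. "mc_search-00002,mc_search-00003"
        query={"ids": {"values": ids}},
        conflicts="proceed",  # don't abort the whole request on version conflicts
        requests_per_second=100,  # throttle so deletions don't overwhelm the cluster
    )
    return int(resp["deleted"])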

"query": {"terms": {"original_url": urls}},
"pit": {"id": pit_id, "keep_alive": self.keep_alive},
"sort": [{"_doc": "asc"}],
}
search_after = None
while True:
if search_after:
query["search_after"] = search_after
# Fetch the next batch of documents
response = es.search(body=query)
hits = response["hits"]["hits"]
# Each response returns a PIT ID, which may change, so keep it updated
pit_id = response.get("pit_id")
if not hits:
break
bulk_actions = []
for hit in hits:
document_index = hit["_index"]
document_id = hit["_id"]
if self.is_batch_delete:
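# queue a bulk "delete" action; delete actions need no source line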
bulk_actions.append(
{"delete": {"_index": document_index, "_id": document_id}}
)
else:
es.delete(index=document_index, id=document_id)
total_deleted += 1
if bulk_actions:
es.bulk(index=self.indices, body=bulk_actions)
total_deleted += len(bulk_actions)
search_after = hits[-1]["sort"]
except Exception as e:
logger.exception(e)
finally:
if total_deleted != len(urls):
logger.warning(
"Mismatch in document deletion count: [%d] deleted out of [%d] expected.",
total_deleted,
len(urls),
)
else:
logger.info("Deleted [%d/%d] documents.", total_deleted, len(urls))
if isinstance(es, Elasticsearch) and pit_id:
response = es.close_point_in_time(id=pit_id)
if response.get("succeeded"):
logger.info("Successfully closed Point-in-Time with ID %s", pit_id)

def process_file(self, fname: str, fobj: BinaryIO) -> None:
assert self.args
logger.info("process_file %s", fname)
# it may be possible to make this faster by NOT using
# StoryArchiveReader and warcio, but it came for "free":
reader = StoryArchiveReader(fobj)
urls = []
for story in reader.read_stories():
urls.append(story.content_metadata().url)
logger.info("collected %d urls from %s", len(urls), fname)
if not self.args.dry_run:
logger.warning("delete %d urls from %s here!", len(urls), fname)


if __name__ == "__main__":
app = ArchEraser("arch-eraser", "remove stories loaded from archive files from ES")
app.main()