Merge pull request #364 from m453h/arch-eraser-sketch
Add ability to remove archive documents from ElasticSearch using original URL
m453h authored Feb 26, 2025
2 parents b3cf2b8 + 896d0c4 commit caf6306
Showing 6 changed files with 553 additions and 0 deletions.
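
Taken together, the three new bin/ scripts form a small pipeline: the WARC lister enumerates archive files for a date range, the URL lister extracts each story's original URL from those WARCs, and the eraser deletes the matching documents from Elasticsearch. A rough end-to-end sketch, assuming a local Elasticsearch at http://localhost:9200 and the illustrative paths taken from the scripts' help texts (real deployments will use their own paths and hosts):

# 1. List WARC archive files for a date range into an indirect file
./bin/run-arch-warc-lister.sh 2024/12/15 2024/12/31 'b2://archives/{pattern}/mchist2022' data/arch-lister/file-1.txt

# 2. Extract the original URLs from those WARCs (defaults to data/arch-lister/url_list/)
./bin/run-arch-url-lister.sh data/arch-lister/file-1.txt

# 3. Delete the matching documents from the named indices
./bin/run-arch-eraser.sh data/arch-lister/url_list --elasticsearch-hosts=http://localhost:9200 --indices=index1,index2 --batch-size=1000 --min-delay=1 --max-delay=3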
38 changes: 38 additions & 0 deletions bin/run-arch-eraser.sh
@@ -0,0 +1,38 @@
#!/bin/sh

SCRIPT_DIR="$(dirname "$0")"
. "$SCRIPT_DIR/func.sh"

print_help(){
echo ""
echo "Usage: $0 <path_to_url_list_file> [OPTIONS]"
echo ""
echo "Description:"
echo " Deletes documents from Elasticsearch based on URLs provided in input files"
echo ""
echo "Options:"
echo " --elasticsearch-hosts Elasticsearch host URL(s)"
echo " --indices The comma-separated list of Elasticsearch indices to delete documents from"
echo " --min-delay The minimum time to wait between delete operations (default: 0.5 seconds)"
echo " --max-delay The maximum time to wait between delete operations (default: 3.0 seconds)"
echo " --batch-size The number of documents to send in a delete request to Elasticsearch (default: 1000)"
echo ""
echo " Example:"
echo " $0 data/arch-lister/url_list --elasticsearch-hosts=http://localhost:9200 --indices=index1,index2 --batch-size=1000 --min-delay=1 --max-delay=3"
}

# Handle help flag
if [ "$1" = "-h" ] || [ "$1" = "--help" ]; then
print_help
exit 0
fi

if [ $# -lt 1 ]; then
print_help
exit 1
fi

input_path=$1
shift 1

run_python indexer.scripts.arch-eraser "$input_path" "$@" --rabbitmq-url='-'
46 changes: 46 additions & 0 deletions bin/run-arch-url-lister.sh
@@ -0,0 +1,46 @@
#!/bin/sh

SCRIPT_DIR="$(dirname "$0")"
. "$SCRIPT_DIR/func.sh"

print_help(){
echo ""
echo "Usage: $0 <input_file_path> [output_file_path]"
echo ""
echo "Description:"
echo " Writes the list of URLs contained in WARC files to a text file"
echo ""
echo "Arguments:"
echo " <input_file_path> Path to an indirect file (a file that contains a list of files to process)."
echo " [output_file_path] Optional. Path to output file for the URL list."
echo " Default: <PROJECT_DIR>/data/arch-lister/url_list/<WARC_FILE_NAME>.txt"
echo ""
echo " Example:"
echo " $0 arch-lister/file-1.txt"
echo " $0 arch-lister/file-1.txt url-lists.txt"
}

# Handle help flag
if [ "$1" = "-h" ] || [ "$1" = "--help" ]; then
print_help
exit 0
fi

# We expect one argument when processing files and listing URLs
if [ $# -lt 1 ]; then
print_help
exit 1
fi

# Verify that the input file path exists
if [ ! -f "$1" ]; then
echo "Error: The input file '$1' does not exist; please check the path and try again"
exit 1
fi

OUTPUT_PARAM=""
if [ -n "$2" ]; then
OUTPUT_PARAM="-o $2"
fi

run_python indexer.scripts.arch-lister "@$1" $OUTPUT_PARAM --rabbitmq-url='-'
90 changes: 90 additions & 0 deletions bin/run-arch-warc-lister.sh
@@ -0,0 +1,90 @@
#!/bin/sh

SCRIPT_DIR="$(dirname "$0")"
. "$SCRIPT_DIR/func.sh"

is_valid_date() {
case "$1" in
[0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]) return 0 ;;
*) return 1 ;;
esac
}

increment_date() {
date_cmd=$(command -v date)
case "$(uname)" in
Darwin)
"$date_cmd" -j -v+1d -f "%Y/%m/%d" "$1" +"%Y/%m/%d"
;;
Linux)
"$date_cmd" -d "$1 + 1 day" +"%Y/%m/%d" 2>/dev/null
;;
*)
echo "Unsupported Environment" >&2
return 1
;;
esac
}

convert_date_to_int() {
input_date="$1"
if [ "$(uname)" = "Darwin" ]; then
date -j -f "%Y/%m/%d" "$input_date" +"%Y%m%d" 2>/dev/null
elif [ "$(uname)" = "Linux" ]; then
date -d "$input_date" +"%Y%m%d" 2>/dev/null
else
echo "Unsupported OS" >&2
return 1
fi
}

print_help(){
echo ""
echo "Usage: $0 <start_date> <end_date> <pattern> <output>"
echo ""
echo "Description:"
echo " Outputs a list of files from archives based on a specified date range and matching pattern."
echo ""
echo "Arguments:"
echo " <start_date> Start date for filtering records (format: YYYY/MM/DD)."
echo " <end_date> End date for filtering records (format: YYYY/MM/DD)."
echo " <pattern> String pattern used to construct file paths (e.g. 'b2://archives/{pattern}/mchist2022')"
echo " <output> The path to the output file where the archive list will be written"
echo ""
echo " Example:"
echo " $0 2024/12/15 2024/12/31 'b2://archives/{pattern}/mchist2022' data/arch-lister/file-1.txt"
}

# Handle help flag
if [ "$1" = "-h" ] || [ "$1" = "--help" ]; then
print_help
exit 0
fi

# We expect four arguments when listing files by date
if [ $# -lt 4 ]; then
print_help
exit 1
fi

start_date="$1"
end_date="$2"
pattern="$3"

if ! is_valid_date "$start_date" || ! is_valid_date "$end_date"; then
echo "Error: Invalid date format. Use YYYY/MM/DD" >&2
exit 1
fi

search_pattern=""
start_date_int=$(convert_date_to_int "$start_date")
end_date_int=$(convert_date_to_int "$end_date")

while [ "$start_date_int" -le "$end_date_int" ]; do
current_url=$(echo "$pattern" | sed "s|{pattern}|$start_date|g")
search_pattern="${search_pattern}${current_url} "
start_date=$(increment_date "$start_date")
start_date_int=$(convert_date_to_int "$start_date")
done

run_python indexer.scripts.arch-lister $search_pattern -o "$4" -w --rabbitmq-url='-'
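
To make the loop above concrete: each date in the range is substituted into the {pattern} placeholder, so a three-day range expands into one archive URL per day before everything is handed to the lister in a single invocation. A small sketch of the substitution, using the same sed expression as the script:

pattern='b2://archives/{pattern}/mchist2022'
for d in 2024/12/15 2024/12/16 2024/12/17; do
    echo "$pattern" | sed "s|{pattern}|$d|g"
done
# b2://archives/2024/12/15/mchist2022
# b2://archives/2024/12/16/mchist2022
# b2://archives/2024/12/17/mchist2022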
11 changes: 11 additions & 0 deletions indexer/queuer.py
@@ -212,6 +212,17 @@ def maybe_process_file(self, fname: str) -> None:
args = self.args
assert args

if fname[0] == "@":
# implement "indirect file" (file containing file names)
# NOTE! paths read from indirect files are NOT interpreted
# as relative to the path of the indirect file.
logger.info("indirect file %s", fname)

f = self.open_file(fname[1:])
for line in f:
self.maybe_process_file(line.decode().rstrip())
return

if args.test:
logger.info("maybe_process_file %s", fname)
return
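
The new branch above treats an '@'-prefixed argument as an indirect file: a plain text file whose lines are themselves file names, each fed back through maybe_process_file (and, since the recursion goes through the same method, nested '@' entries appear to work too). A minimal sketch of how run-arch-url-lister.sh exercises it, with hypothetical WARC paths:

# files.txt is an indirect file: one path per line
# (paths are NOT interpreted relative to files.txt itself)
cat > files.txt <<EOF
data/warcs/mc-20241215-0000.warc.gz
data/warcs/mc-20241216-0001.warc.gz
EOF

# run-arch-url-lister.sh prepends the "@", which triggers the branch above
./bin/run-arch-url-lister.sh files.txt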
175 changes: 175 additions & 0 deletions indexer/scripts/arch-eraser.py
@@ -0,0 +1,175 @@
import argparse
import logging
import random
import sys
import time
from typing import BinaryIO, List, Optional

import mcmetadata
from elasticsearch import Elasticsearch

from indexer.elastic import ElasticMixin
from indexer.queuer import Queuer

logger = logging.getLogger("arch-eraser")


class ArchEraser(ElasticMixin, Queuer):
"""
A class for deleting documents from Elasticsearch based on URLs from txt files.
Supports both single and batch deletion operations with configurable delays.
"""

HANDLE_GZIP = False
APP_BLOBSTORE = ""
MAX_RETRY_TIME = 60

def __init__(self, process_name: str, descr: str):
super().__init__(process_name, descr)
self._es_client: Optional[Elasticsearch] = None
self.indices: str = ""
self.min_delay: float = 0
self.max_delay: float = 0
self.batch_size: int = 0
self.successful_operations_count: int = 0
self.display_stats: bool = True

def qconnect(self) -> None:
return

def check_output_queues(self) -> None:
return

def define_options(self, ap: argparse.ArgumentParser) -> None:
super().define_options(ap)
ap.add_argument(
"--batch-size",
dest="batch_size",
type=int,
default=1000,
help="The number of documents to send in a delete request to Elasticsearch.",
)
ap.add_argument(
"--indices",
dest="indices",
help="The comma-separated list of Elasticsearch indices to delete from",
)
ap.add_argument(
"--min-delay",
dest="min_delay",
type=float,
default=0.5,
help="The minimum time to wait between delete operations (default: 0.5 seconds)",
)
ap.add_argument(
"--max-delay",
dest="max_delay",
type=float,
default=3.0,
help="The maximum time to wait between delete operations (default: 3.0 seconds)",
)

def process_args(self) -> None:
super().process_args()
assert self.args
self.indices = self.args.indices
self.min_delay = self.args.min_delay
self.max_delay = self.args.max_delay
self.batch_size = self.args.batch_size

@property
def es_client(self) -> Elasticsearch:
if self._es_client is None:
self._es_client = self.elasticsearch_client()
return self._es_client

def delete_documents(self, urls: List[str]) -> None:
total_urls = len(urls)
try:
ids = []
for url in urls:
ids.append(mcmetadata.urls.unique_url_hash(url))
if len(ids) >= self.batch_size:
delete_count = self._delete_documents_by_ids(ids)
self._update_stats(delete_count)
self._apply_delay()
ids.clear()
if ids:
delete_count = self._delete_documents_by_ids(ids)
self._update_stats(delete_count)
except Exception as e:
logger.exception(e)
finally:
self._log_deletion_stats(total_urls)

def _delete_documents_by_ids(self, ids: List[str]) -> int | None:
sec = 1 / 16
while True:
try:
response = self.es_client.delete_by_query(
index=self.indices,
body={"query": {"terms": {"_id": ids}}},
wait_for_completion=True,
)
if self.display_stats and response.get("deleted") is not None:
return int(response.get("deleted"))
return None
except Exception as e:
self.display_stats = False # If there is an exception we lose all stats and should not display them
sec *= 2
if sec > self.MAX_RETRY_TIME:
# If retries exceed MAX_RETRY_TIME we exit to ensure that the file tracker
# doesn't mark the file as processed at the end of processing
logger.exception(e)
sys.exit(1)
logger.warning("retrying delete operation after %s second(s)", sec)
time.sleep(sec)

def _update_stats(self, delete_count: int | None) -> None:
if delete_count is None or not self.display_stats:
return
self.successful_operations_count += delete_count

def _log_deletion_stats(self, total_urls: int) -> None:
if self.display_stats:
if self.successful_operations_count == total_urls:
log_level = logging.INFO
else:
log_level = logging.WARNING
logger.log(
log_level,
"Deleted [%s] out of [%s] documents.",
self.successful_operations_count,
total_urls,
)
else:
logger.warning("Unable to get deletion stats")

def _apply_delay(self) -> None:
delay = random.uniform(self.min_delay, self.max_delay)
logger.info("Waiting %0.2f seconds before deleting the next batch...", delay)
time.sleep(delay)

def process_file(self, fname: str, fobj: BinaryIO) -> None:
assert self.args
urls = []
with open(fname, "r") as file:
for line in file:
urls.append(line.strip())
logger.info("collected %d urls from %s", len(urls), fname)

if self.args.dry_run:
return

# Not a dry run, do the actual deletion
logger.warning("deleting %d urls from %s here!", len(urls), fname)
start_time = time.time()
self.delete_documents(urls)
end_time = time.time()
elapsed_time = end_time - start_time
logger.info("Time taken: %.2f seconds", elapsed_time)


if __name__ == "__main__":
app = ArchEraser("arch-eraser", "remove stories loaded from archive files from ES")
app.main()
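
For reference, the request built in _delete_documents_by_ids is a plain delete-by-query on document _id values, where each id is the hash of the story's original URL from mcmetadata.urls.unique_url_hash. A roughly equivalent manual call against a local cluster (placeholder ids shown; real ids are the URL hashes) might look like:

curl -s -X POST 'http://localhost:9200/index1,index2/_delete_by_query' \
  -H 'Content-Type: application/json' \
  -d '{"query": {"terms": {"_id": ["<hash-of-url-1>", "<hash-of-url-2>"]}}}'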