Feat/update example books #423

Open

wants to merge 9 commits into main

Changes from 5 commits
12 changes: 7 additions & 5 deletions example/README.md
@@ -1,9 +1,11 @@
# Example Setup

0. Prerequisites:
- Python 3
- Docker and `docker-compose`
- Python 3.8+
- Docker
- ~16GiB of free storage
2. Launch the containers: `docker-compose up -d`
3. Download and ingest the documents into Solr: `./ingest.py`
4. Search the corpus: `http://localhost:8181`
1. Launch containers: `docker compose up -d`
- Make sure you have write permissions for the subdirectory `./data` since this is where the data will be downloaded to
2. Download and ingest the documents into Solr: `./ingest.py`
- Please consider additional args in case of problems: `./ingest.py --help`
3. Search the index: `http://localhost:8181`
1 change: 0 additions & 1 deletion example/docker-compose.yml
@@ -1,4 +1,3 @@
version: '2'
services:
solr:
image: solr:9.5
301 changes: 192 additions & 109 deletions example/ingest.py
@@ -1,28 +1,55 @@
#!/usr/bin/env python3

import itertools
import json
import re
import sys
import tarfile
import xml.etree.ElementTree as etree
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from argparse import (
ArgumentParser,
)
from concurrent.futures import ProcessPoolExecutor, as_completed
from logging import (
DEBUG,
INFO,
Formatter,
Logger,
StreamHandler,
)
from pathlib import Path
from urllib import request
from xml.etree import (
ElementTree as etree
)
from urllib.error import (
URLError,
)
from typing import (
Callable,
Dict,
)


# turn on/off diagnostic information
Member
Suggested change: remove the `# turn on/off diagnostic information` comment.

GOOGLE1000_PATH = './data/google1000'
GOOGLE1000_URL = 'https://ocrhl.jbaiter.de/data/google1000_texts.tar.gz'
GOOGLE1000_NUM_VOLUMES = 1000
GOOGLE1000_BATCH_SIZE = 4
LUNION_PATH = './data/bnl_lunion'
LUNION_TEXTS_URL = 'https://ocrhl.jbaiter.de/data/bnl_lunion_texts.tar.gz'
LUNION_NUM_ARTICLES = 41446
LUNION_BATCH_SIZE = 1000
SOLR_HOST = 'localhost:8983'
HOCR_METADATA_PAT = re.compile(
r'<meta name=[\'"]DC\.(?P<key>.+?)[\'"] content=[\'"](?P<value>.+?)[\'"]\s*/?>')
NSMAP = {
'mets': 'http://www.loc.gov/METS/',
'mods': 'http://www.loc.gov/mods/v3'
}
DEFAULT_N_WORKERS = 2
DEFAULT_LOG_LEVEL = INFO
LOGGER_NAME = 'ingest'
LOGGER = None


class SolrException(Exception):
@@ -31,56 +58,149 @@ def __init__(self, resp, payload):
self.payload = payload


def gbooks_are_volumes_missing(base_path):
for vol_no in range(1000):
vol_path = base_path / 'Volume_{:04}.hocr'.format(vol_no)
if not vol_path.exists():
return True
return False
def main_ingest(the_args):
Member
This function shouldn't have to know about argparse; just pass the parameters directly, there aren't that many 🙃
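A minimal sketch of what that decoupling could look like; the signature, defaults, and placeholder body are illustrative, not part of this PR:

```python
from argparse import ArgumentParser


def main_ingest(num_workers: int, books_only: bool) -> None:
    # placeholder body: the real function would download and index the corpora
    print(f"ingesting with {num_workers} workers (books only: {books_only})")


if __name__ == '__main__':
    parser = ArgumentParser(description='ingest example data into SOLR')
    parser.add_argument('--num-workers', type=int, default=2)
    parser.add_argument('--books-only', action='store_true')
    args = parser.parse_args()
    # the argparse namespace stays at the CLI boundary; plain values go inward
    main_ingest(args.num_workers, args.books_only)
```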

_n_worker = the_args.num_workers
Member
You don't have to mark variables as private in a function body; they're private to the function anyway.

_is_books_only = the_args.books_only
_book_base_path = Path(GOOGLE1000_PATH).absolute()
_book_base_path.mkdir(parents=True, exist_ok=True)
LOGGER.info("Load and Indexing Google %s Books into %s",
GOOGLE1000_NUM_VOLUMES, _book_base_path)

futs = []
with ProcessPoolExecutor(max_workers=_n_worker) as pool_exec:
gbooks_iter = load_documents(
GOOGLE1000_URL, _book_base_path, transform_gbook_to_document)
for idx, batch in enumerate(_generate_batches(gbooks_iter, GOOGLE1000_BATCH_SIZE)):
futs.append(pool_exec.submit(_index_documents, batch))
LOGGER.info("process %04d/%d",
((idx+1)*GOOGLE1000_BATCH_SIZE), GOOGLE1000_NUM_VOLUMES)
for fut in as_completed(futs):
fut.result()
if _is_books_only:
LOGGER.info("Only GBooks requested, job done.")
return

_lunion_bas_path = Path(LUNION_PATH).absolute()
_lunion_bas_path.mkdir(parents=True, exist_ok=True)
LOGGER.info("Load and Indexing BNL/L'Union articles into %s",
_lunion_bas_path)

futs = []
with ProcessPoolExecutor(max_workers=_n_worker) as pool_exec:
bnl_iter = bnl_load_documents(_lunion_bas_path)
for idx, batch in enumerate(_generate_batches(bnl_iter, LUNION_BATCH_SIZE)):
futs.append(pool_exec.submit(_index_documents, batch))
LOGGER.info("process %05d/%d",
(idx+1)*LUNION_BATCH_SIZE, LUNION_NUM_ARTICLES)
for fut in as_completed(futs):
fut.result()
LOGGER.info("All Jobs done.")


def gbooks_parse_metadata(hocr):
# I know, the <center> won't hold, but I think it's okay in this case,
# especially since we 100% know what data this script is going to work with
# and we don't want an external lxml dependency in here
raw_meta = {key: int(value) if value.isdigit() else value
for key, value in HOCR_METADATA_PAT.findall(hocr)}
return {
'author': [raw_meta.get('creator')] if 'creator' in raw_meta else [],
'title': [raw_meta['title']],
'date': '{}-01-01T00:00:00Z'.format(raw_meta['date']),
'date': f"{raw_meta['date']}-01-01T00:00:00Z",
**{k: v for k, v in raw_meta.items()
if k not in ('creator', 'title', 'date')}
}


def gbooks_load_documents(base_path):
if gbooks_are_volumes_missing(base_path):
print("Downloading missing volumes to {}".format(base_path))
base_path.mkdir(exist_ok=True)
with request.urlopen(GOOGLE1000_URL) as resp:
tf = tarfile.open(fileobj=resp, mode='r|gz')
for ti in tf:
if not ti.name.endswith('.hocr'):
continue
vol_id = ti.name.split('/')[-1].split('.')[0]
ocr_text = tf.extractfile(ti).read()
doc_path = base_path / '{}.hocr'.format(vol_id)
if not doc_path.exists():
with doc_path.open('wb') as fp:
fp.write(ocr_text)
hocr_header = ocr_text[:1024].decode('utf8')
yield {'id': vol_id.split("_")[-1],
'source': 'gbooks',
'ocr_text': '/data/google1000/' + doc_path.name,
**gbooks_parse_metadata(hocr_header)}
def transform_gbook_to_document(document_path: Path) -> Dict:
_content = document_path.read_text()
_doc_id = document_path.stem.split("_")[1]
_doc_name = document_path.name
return {'id': _doc_id,
'source': 'gbooks',
'ocr_text': f'/data/google1000/{_doc_name}',
**gbooks_parse_metadata(_content)}


def _gbook_doc_path_from_tar_entry(ti, base_path:Path) -> Path:
if not ti.name.endswith('.hocr'):
return None
vol_id = ti.name.split('/')[-1].split('.')[0]
return base_path / f'{vol_id}.hocr'


def load_documents(the_url, base_path: Path, transform_func: Callable):
try:
with request.urlopen(the_url) as resp:
try:
tf = tarfile.open(fileobj=resp, mode='r|gz')
for ti in tf:
_doc_path = _gbook_doc_path_from_tar_entry(ti, base_path)
if _doc_path is None:
continue
if not _doc_path.exists():
LOGGER.debug("Download %s", _doc_path)
try:
_local_file = tf.extractfile(ti).read()
with _doc_path.open('wb') as fp:
fp.write(_local_file)
except tarfile.ReadError as _entry_read_error:
LOGGER.error("Fail process %s: %s",
ti, _entry_read_error.args[0])
continue
LOGGER.debug("Extract metadata from %s", _doc_path)
yield transform_func(_doc_path)
except tarfile.ReadError as _tar_read_error:
LOGGER.error("Processing %s: %s",
tf, _tar_read_error.args[0])
except URLError as _exc:
LOGGER.error("Fail request %s: %s", the_url, _exc.args[0])


def bnl_load_documents(base_path: Path):
if bnl_are_volumes_missing(base_path):
LOGGER.info("Download missing BNL/L'Union issues to %s", base_path)
with request.urlopen(LUNION_TEXTS_URL) as resp:
try:
tf = tarfile.open(fileobj=resp, mode='r|gz')
last_vol = None
for ti in tf:
sanitized_name = re.sub(r'^\./?', '', ti.name)
if not sanitized_name:
continue
if ti.isdir() and '/' not in sanitized_name:
if last_vol is not None:
doc_path = base_path / last_vol
mets_path = next(iter(doc_path.glob("*-mets.xml")))
doc_id = last_vol.replace("newspaper_lunion_", "")
yield from bnl_extract_article_docs(
doc_id, mets_path, doc_path / 'text')
last_vol = sanitized_name
if ti.isdir():
(base_path / ti.name).mkdir(parents=True, exist_ok=True)
else:
out_path = base_path / ti.name
with out_path.open('wb') as fp:
fp.write(tf.extractfile(ti).read())
except tarfile.ReadError as _tar_read_error:
LOGGER.error("ERROR processing %s: %s",
tf, _tar_read_error.args[0])
doc_path = base_path / last_vol
mets_path = next(iter(doc_path.glob("*-mets.xml")))
doc_id = last_vol.replace("newspaper_lunion_", "")
yield from bnl_extract_article_docs(
doc_id, mets_path, doc_path / 'text')
else:
for doc_path in base_path.glob('*.hocr'):
hocr = doc_path.read_text()
yield {'id': doc_path.stem.split("_")[1],
'source': 'gbooks',
'ocr_text': '/data/google1000/' + doc_path.name,
**gbooks_parse_metadata(hocr)}
with ProcessPoolExecutor(max_workers=4) as pool:
futs = []
for issue_dir in base_path.iterdir():
if not issue_dir.is_dir() or not issue_dir.name.startswith('15'):
continue
mets_path = next(iter(issue_dir.glob("*-mets.xml")))
doc_id = issue_dir.name.replace("newspaper_lunion_", "")
futs.append(pool.submit(bnl_extract_article_docs, doc_id, mets_path, issue_dir / 'text'))
for fut in as_completed(futs):
yield from fut.result()


def bnl_get_metadata(mods_tree):
@@ -125,6 +245,8 @@ def bnl_get_article_pointer(path_regions):


def bnl_extract_article_docs(issue_id, mets_path, alto_basedir):
LOGGER.debug("bnl_extract_article_docs %s to %s",
issue_id, mets_path)
mets_tree = etree.parse(str(mets_path))
article_elems = mets_tree.findall(
".//mets:structMap[@TYPE='LOGICAL']//mets:div[@TYPE='ARTICLE']",
@@ -169,62 +291,21 @@ def bnl_are_volumes_missing(base_path):
return num_pages != 10880


def bnl_load_documents(base_path):
if not base_path.exists():
base_path.mkdir()
if bnl_are_volumes_missing(base_path):
print("Downloading missing BNL/L'Union issues to {}".format(base_path))
base_path.mkdir(exist_ok=True)
with request.urlopen(LUNION_TEXTS_URL) as resp:
tf = tarfile.open(fileobj=resp, mode='r|gz')
last_vol = None
for ti in tf:
sanitized_name = re.sub(r'^\./?', '', ti.name)
if not sanitized_name:
continue
if ti.isdir() and '/' not in sanitized_name:
if last_vol is not None:
vol_path = base_path / last_vol
mets_path = next(iter(vol_path.glob("*-mets.xml")))
vol_id = last_vol.replace("newspaper_lunion_", "")
yield from bnl_extract_article_docs(
vol_id, mets_path, vol_path / 'text')
last_vol = sanitized_name
if ti.isdir():
(base_path / ti.name).mkdir(parents=True, exist_ok=True)
else:
out_path = base_path / ti.name
with out_path.open('wb') as fp:
fp.write(tf.extractfile(ti).read())
vol_path = base_path / last_vol
mets_path = next(iter(vol_path.glob("*-mets.xml")))
vol_id = last_vol.replace("newspaper_lunion_", "")
yield from bnl_extract_article_docs(
vol_id, mets_path, vol_path / 'text')
else:
with ProcessPoolExecutor(max_workers=4) as pool:
futs = []
for issue_dir in base_path.iterdir():
if not issue_dir.is_dir() or not issue_dir.name.startswith('15'):
continue
mets_path = next(iter(issue_dir.glob("*-mets.xml")))
vol_id = issue_dir.name.replace("newspaper_lunion_", "")
futs.append(pool.submit(bnl_extract_article_docs, vol_id, mets_path, issue_dir / 'text'))
for fut in as_completed(futs):
yield from fut.result()


def index_documents(docs):
req = request.Request(
"http://{}/solr/ocr/update?softCommit=true".format(SOLR_HOST),
data=json.dumps(docs).encode('utf8'),
headers={'Content-Type': 'application/json'})
resp = request.urlopen(req)
if resp.status >= 400:
raise SolrException(json.loads(resp.read()), docs)
def _index_documents(docs):
_req_url = f"http://{SOLR_HOST}/solr/ocr/update?softCommit=true"
try:
LOGGER.debug("Push %d documents to %s", len(docs), _req_url)
req = request.Request(_req_url,
data=json.dumps(docs).encode('utf8'),
headers={'Content-Type': 'application/json'})
resp = request.urlopen(req)
if resp.status >= 400:
raise SolrException(json.loads(resp.read()), docs)
except URLError as _url_err:
LOGGER.error("Fail indexing %d documents: %s", len(docs), _url_err)


def generate_batches(it, chunk_size):
def _generate_batches(it, chunk_size):
cur_batch = []
for x in it:
cur_batch.append(x)
@@ -235,22 +316,24 @@ def generate_batches(it, chunk_size):
yield cur_batch


def _calculate_log_level(the_level) -> int:
if isinstance(the_level, str):
_level_str = str(the_level).lower()
if 'debug' in _level_str:
return DEBUG
return DEFAULT_LOG_LEVEL


if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=8) as pool:
print("Indexing BNL/L'Union articles")
futs = []
bnl_iter = bnl_load_documents(Path(LUNION_PATH))
for idx, batch in enumerate(generate_batches(bnl_iter, 1000)):
futs.append(pool.submit(index_documents, batch))
print("\r{:05}/{}".format((idx+1)*1000, LUNION_NUM_ARTICLES), end='')
for fut in as_completed(futs):
fut.result()
print("\nIndexing Google 1000 Books volumes")
futs = []
gbooks_iter = gbooks_load_documents(Path(GOOGLE1000_PATH))
for idx, batch in enumerate(generate_batches(gbooks_iter, 4)):
futs.append(pool.submit(index_documents, batch))
print("\r{:04}/{}".format((idx+1)*4, GOOGLE1000_NUM_VOLUMES), end='')
for fut in as_completed(futs):
fut.result()
print("\n")
PARSER = ArgumentParser(description='ingest example data into SOLR')
Member
Those aren't really constants either, even if they're in the global scope.
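One conventional way to sidestep the ALL_CAPS question is to build the parser inside a function, so nothing at module scope gets mistaken for a constant. A rough, self-contained sketch; the function name and the single argument are illustrative:

```python
from argparse import ArgumentParser


def main() -> None:
    # parser and args are ordinary local values here, so the ALL_CAPS
    # spelling that signals "constant" is no longer implied
    parser = ArgumentParser(description='ingest example data into SOLR')
    parser.add_argument('--books-only', action='store_true')
    args = parser.parse_args()
    print(args.books_only)


if __name__ == '__main__':
    main()
```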

PARSER.add_argument('--log-level', help='like "debug", "info", "error" (default:info)', required=False,
Member
If we only use logging for debug statements, I think we only need a --debug flag to toggle debug logging on.

default=DEFAULT_LOG_LEVEL)
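A sketch of the flag-only interface described in the comment above; the flag name follows that comment, everything else is illustrative:

```python
from argparse import ArgumentParser

parser = ArgumentParser(description='ingest example data into SOLR')
# one boolean switch instead of a free-form --log-level string
parser.add_argument('--debug', action='store_true', help='enable debug logging')
args = parser.parse_args()
```

The suggested change in the comment below shows how `args.debug` would then select the log level.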
PARSER.add_argument('--num-workers', help="how many concurrent processes to start (default:4)", required=False,
default=DEFAULT_N_WORKERS, type=int)
PARSER.add_argument('--books-only', help="if only interested in book corpus (default: False)", required=False, action='store_true')
ARGS = PARSER.parse_args()
LOGGER = Logger(LOGGER_NAME, _calculate_log_level(ARGS.log_level))
STDOUT_HANDLER = StreamHandler(sys.stdout)
STDOUT_HANDLER.setFormatter(Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%dT%H:%M:%S"))
Member
I'd probably stick to logging.basicConfig and not even create a dedicated logger, since this is just a small utility script:

Suggested change
LOGGER = Logger(LOGGER_NAME, _calculate_log_level(ARGS.log_level))
STDOUT_HANDLER = StreamHandler(sys.stdout)
STDOUT_HANDLER.setFormatter(Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%dT%H:%M:%S"))
logging.basicConfig(level=logging.DEBUG if args.debug else logging.WARN)

then you can just call logging.debug in your code.
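For illustration, how the call sites would look once the root logger is configured this way; the level, format string, and sample messages are just examples mirroring the ones in this PR:

```python
import logging

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s [%(levelname)s] %(message)s")
# the module-level helpers write to the root logger, so no LOGGER object is needed
logging.debug("Extract metadata from %s", "Volume_0001.hocr")
logging.info("All Jobs done.")
```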

LOGGER.addHandler(STDOUT_HANDLER)
main_ingest(ARGS)