Skip to content

Commit

Permalink
Add multi-thread support for Parser._add, Labeler._add and Featurizer…
Browse files Browse the repository at this point in the history
…._add
  • Loading branch information
YasushiMiyata authored and senwu committed Jun 10, 2021
1 parent 5ab8d4f commit 9d794b9
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 19 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ Changed
* `@YasushiMiyata`_: Changed :class:`UDFRunner`'s and :class:`UDF`'s data commit process as follows:
(`#545 <https://github.com/HazyResearch/fonduer/pull/545>`_)

* Removed ``add`` process in :func:`_apply` in :class:`UDFRunner`.
* Added ``add`` and ``commit`` of ``y`` to :class:`UDF`.
* Removed the single-threaded ``add`` step from :func:`_apply` in :class:`UDFRunner`.
* Added a call to ``UDFRunner._add`` for ``y`` in each worker (multi-threaded), implemented by :class:`Parser`, :class:`Labeler` and :class:`Featurizer`.
* Removed ``y`` of document parsed result from ``out_queue`` in :class:`UDF`.

Fixed
Expand Down
4 changes: 2 additions & 2 deletions src/fonduer/features/featurizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ def get_keys(self) -> List[FeatureKey]:
"""
return list(get_sparse_matrix_keys(self.session, FeatureKey))

def _add(self, records_list: List[List[Dict[str, Any]]]) -> None:
def _add(self, session: Session, records_list: List[List[Dict[str, Any]]]) -> None:
# Make a flat list of all records from the list of list of records.
# This helps reduce the number of queries needed to update.
all_records = list(itertools.chain.from_iterable(records_list))
batch_upsert_records(self.session, Feature, all_records)
batch_upsert_records(session, Feature, all_records)

def clear(self, train: bool = False, split: int = 0) -> None: # type: ignore
"""Delete Features of each class from the database.
Expand Down
12 changes: 9 additions & 3 deletions src/fonduer/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ def apply( # type: ignore
progress_bar=progress_bar,
)

def _add(self, doc: Union[Document, None]) -> None:
def _add(self, session: Session, doc: Union[Document, None]) -> None:
# Persist the object if no error happens during parsing.
if doc:
self.session.add(doc)
self.session.commit()
session.add(doc)
session.commit()

def clear(self) -> None: # type: ignore
"""Clear all of the ``Context`` objects in the database."""
Expand All @@ -156,6 +156,12 @@ def get_documents(self) -> List[Document]:
:return: A list of all ``Documents`` in the database ordered by name.
"""
# return (
# self.session.query(Document, Sentence)
# .join(Sentence, Document.id == Sentence.document_id)
# .all()
# )
# return self.session.query(Sentence).order_by(Sentence.name).all()
return self.session.query(Document).order_by(Document.name).all()


Expand Down
4 changes: 2 additions & 2 deletions src/fonduer/supervision/labeler.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,9 @@ def drop_keys(

drop_keys(self.session, LabelKey, key_map)

def _add(self, records_list: List[List[Dict[str, Any]]]) -> None:
def _add(self, session: Session, records_list: List[List[Dict[str, Any]]]) -> None:
for records in records_list:
batch_upsert_records(self.session, self.table, records)
batch_upsert_records(session, self.table, records)

def clear( # type: ignore
self,
Expand Down
25 changes: 15 additions & 10 deletions src/fonduer/utils/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, Collection, Dict, List, Optional, Set, Type, Union

from sqlalchemy import inspect
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, scoped_session, sessionmaker

from fonduer.meta import Meta, new_sessionmaker
from fonduer.parser.models.document import Document
Expand Down Expand Up @@ -94,7 +94,7 @@ def _after_apply(self, **kwargs: Any) -> None:
"""Execute this method by a single process after apply."""
pass

def _add(self, instance: Any) -> None:
def _add(self, session: Session, instance: Any) -> None:
pass

def _apply(
Expand All @@ -114,9 +114,13 @@ def _apply(
# Clear the last documents parsed by the last run
self.last_docs = set()

# Create a DB session factory used to insert data from each UDF (#545)
session_factory = new_sessionmaker()
# Create UDF Processes
for i in range(parallelism):
udf = self.udf_class(
session_factory=session_factory,
runner=self,
in_queue=in_queue,
out_queue=out_queue,
worker_id=i,
Expand Down Expand Up @@ -164,8 +168,6 @@ def in_thread_func() -> None:
# Flush the processes
self.udfs = []

self.session.commit()


class UDF(Process):
"""UDF class."""
Expand All @@ -174,6 +176,8 @@ class UDF(Process):

def __init__(
self,
session_factory: sessionmaker = None,
runner: UDFRunner = None,
in_queue: Optional[Queue] = None,
out_queue: Optional[Queue] = None,
worker_id: int = 0,
Expand All @@ -187,6 +191,8 @@ def __init__(
"""
super().__init__()
self.daemon = True
self.session_factory = session_factory
self.runner = runner
self.in_queue = in_queue
self.out_queue = out_queue
self.worker_id = worker_id
Expand All @@ -201,9 +207,9 @@ def run(self) -> None:
multiprocess setting. The basic routine is: get from JoinableQueue,
apply, put / add outputs, loop.
"""
# Each UDF starts its own Engine
# See SQLalchemy, using connection pools with multiprocessing.
Session = new_sessionmaker()
# Each UDF gets a thread-local (scoped) session from the connection pool.
# See SQLAlchemy: using a scoped session with multiprocessing.
Session = scoped_session(self.session_factory)
session = Session()
while True:
doc = self.in_queue.get() # block until an item is available
Expand All @@ -214,12 +220,11 @@ def run(self) -> None:
if not inspect(doc).transient:
doc = session.merge(doc, load=False)
y = self.apply(doc, **self.apply_kwargs)
if y:
session.add(y)
session.commit()
self.runner._add(session, y)
self.out_queue.put(doc.name)
session.commit()
session.close()
Session.remove()

def apply(
self, doc: Document, **kwargs: Any
Expand Down

0 comments on commit 9d794b9

Please sign in to comment.