Merge branch 'main' into feat/doughnut-http-endpoint-1
edknv committed Nov 22, 2024
2 parents 187d13d + 68e7cc7 commit 51b5bca
Showing 9 changed files with 827 additions and 425 deletions.
1 change: 1 addition & 0 deletions client/client_examples/examples/python_client_usage.ipynb
@@ -630,6 +630,7 @@
"job_spec.add_task(extract_task)\n",
"job_spec.add_task(dedup_task)\n",
"job_spec.add_task(filter_task)\n",
"job_spec.add_task(split_task)\n",
"job_spec.add_task(store_task)\n",
"job_spec.add_task(embed_task)\n",
"job_spec.add_task(vdb_upload_task)"
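For context, a minimal sketch of how the new split task slots into the pipeline this cell builds; the import paths and constructor arguments are assumptions, since the notebook defines its tasks in earlier cells:

    from nv_ingest_client.primitives import JobSpec          # assumed import path
    from nv_ingest_client.primitives.tasks import SplitTask  # assumed import path

    job_spec = JobSpec(payload="...", document_type="pdf")   # hypothetical arguments
    split_task = SplitTask()                                 # hypothetical defaults
    # Task order mirrors the cell: extract, dedup, filter, split, store, embed, vdb_upload.
    job_spec.add_task(split_task)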
2 changes: 1 addition & 1 deletion client/src/nv_ingest_client/cli/util/processing.py
@@ -652,7 +652,7 @@ def create_and_process_jobs(
failed_jobs.append(f"{job_id}::{source_name}")
except RuntimeError as e:
source_name = job_id_map[job_id]
logger.error(f"Error while processing {job_id}({source_name}) {e}")
logger.error(f"Error while processing '{job_id}' - ({source_name}):\n{e}")
failed_jobs.append(f"{job_id}::{source_name}")
except Exception as e:
traceback.print_exc()
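The reworked log line quotes the job ID and moves the exception onto its own line; a quick before/after sketch with hypothetical values:

    import logging
    logger = logging.getLogger(__name__)

    job_id, source_name, e = "42", "report.pdf", RuntimeError("connection reset")  # hypothetical values
    # old: Error while processing 42(report.pdf) connection reset
    # new: Error while processing '42' - (report.pdf):
    #      connection reset
    logger.error(f"Error while processing '{job_id}' - ({source_name}):\n{e}")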
39 changes: 25 additions & 14 deletions client/src/nv_ingest_client/client/client.py
@@ -88,6 +88,7 @@ def __init__(

self._current_message_id = 0
self._job_states = {}
self._job_index_to_job_spec = {}
self._message_client_hostname = message_client_hostname or "localhost"
self._message_client_port = message_client_port or 7670
self._message_counter_id = msg_counter_id or "nv-ingest-message-id"
@@ -177,16 +178,19 @@ def _add_single_job(self, job_spec: JobSpec) -> str:

return job_index

def add_job(self, job_spec: Union[BatchJobSpec, JobSpec]) -> str:
def add_job(self, job_spec: Union[BatchJobSpec, JobSpec]) -> Union[str, List[str]]:
if isinstance(job_spec, JobSpec):
job_index = self._add_single_job(job_spec)
self._job_index_to_job_spec[job_index] = job_spec

return job_index
elif isinstance(job_spec, BatchJobSpec):
job_indexes = []
for _, job_specs in job_spec.job_specs.items():
for job in job_specs:
job_index = self._add_single_job(job)
job_indexes.append(job_index)
self._job_index_to_job_spec[job_index] = job
return job_indexes
else:
raise ValueError(f"Unexpected type: {type(job_spec)}")
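Because add_job now returns Union[str, List[str]], callers should be ready for either shape; a minimal usage sketch (the spec variable is hypothetical):

    client = NvIngestClient()  # defaults to localhost:7670 per __init__ above

    result = client.add_job(spec)  # JobSpec -> str, BatchJobSpec -> List[str]
    job_indexes = result if isinstance(result, list) else [result]
    for job_index in job_indexes:
        print(f"queued job index: {job_index}")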
@@ -241,7 +245,8 @@ def create_job(
extended_options=extended_options,
)

return self.add_job(job_spec)
job_id = self.add_job(job_spec)
return job_id

def add_task(self, job_index: str, task: Task) -> None:
job_state = self._get_and_check_job_state(job_index, required_state=JobStateEnum.PENDING)
@@ -295,7 +300,8 @@ def _fetch_job_result(self, job_index: str, timeout: float = 100, data_only: boo
"""

try:
job_state = self._get_and_check_job_state(job_index, required_state=[JobStateEnum.SUBMITTED, JobStateEnum.SUBMITTED_ASYNC])
job_state = self._get_and_check_job_state(job_index, required_state=[JobStateEnum.SUBMITTED,
JobStateEnum.SUBMITTED_ASYNC])
response = self._message_client.fetch_message(job_state.job_id, timeout)

if response is not None:
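required_state is now a list of acceptable states; a sketch of the membership check this implies. The helper body below is an assumption, since _get_and_check_job_state itself is not shown in this diff:

    def _check_required_state(job_state, required_state):
        # Accept a single JobStateEnum value or a list of acceptable states.
        states = required_state if isinstance(required_state, list) else [required_state]
        if job_state.state not in states:
            raise ValueError(f"Job is in state {job_state.state}; expected one of {states}")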
@@ -345,12 +351,12 @@ def _fetch_job_result_wait(self, job_id: str, timeout: float = 60, data_only: bo
# Direct, synchronous approach for retrieving jobs: timeouts are handled inside the
# function itself instead of being left to the caller.
def fetch_job_result(
self,
job_ids: Union[str, List[str]],
timeout: float = 100,
max_retries: Optional[int] = None,
retry_delay: float = 1,
verbose: bool = False,
self,
job_ids: Union[str, List[str]],
timeout: float = 100,
max_retries: Optional[int] = None,
retry_delay: float = 1,
verbose: bool = False,
) -> List[Tuple[Optional[Dict], str]]:
"""
Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.
@@ -410,14 +416,19 @@ def fetch_with_retries(job_id: str):
try:
result = handle_future_result(future, timeout=timeout)
results.append(result.get("data"))
del self._job_index_to_job_spec[job_id]
except concurrent.futures.TimeoutError:
logger.error(f"Timeout while fetching result for job ID {job_id}")
logger.error(
f"Timeout while fetching result for job ID {job_id}: {self._job_index_to_job_spec[job_id].source_id}")
except json.JSONDecodeError as e:
logger.error(f"Decoding while processing job ID {job_id}: {e}")
logger.error(
f"Decoding error while processing job ID {job_id}: {self._job_index_to_job_spec[job_id].source_id}\n{e}")
except RuntimeError as e:
logger.error(f"Error while processing job ID {job_id}: {e}")
logger.error(
f"Error while processing job ID {job_id}: {self._job_index_to_job_spec[job_id].source_id}\n{e}")
except Exception as e:
logger.error(f"Error while fetching result for job ID {job_id}: {e}")
logger.error(
f"Error while fetching result for job ID {job_id}: {self._job_index_to_job_spec[job_id].source_id}\n{e}")

return results
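A hedged usage sketch of the retry-aware fetch; submit_job and the queue id are assumptions not shown in this hunk:

    job_indexes = client.add_job(batch_job_spec)
    client.submit_job(job_indexes, job_queue_id="ingest_queue")  # hypothetical queue id

    # Each job gets its own timeout and up to max_retries attempts; failures are
    # logged with the source_id recorded in _job_index_to_job_spec.
    results = client.fetch_job_result(job_indexes, timeout=100, max_retries=5, retry_delay=1, verbose=True)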

@@ -585,7 +596,7 @@ def submit_job_async(self, job_indices: Union[str, List[str]], job_queue_id: str

return future_to_job_index

def create_jobs_for_batch(self, files_batch: List[str], tasks: Dict[str, Any]) -> List[JobSpec]:
def create_jobs_for_batch(self, files_batch: List[str], tasks: Dict[str, Any]) -> List[str]:
"""
Create and submit job specifications (JobSpecs) for a batch of files, returning the job IDs.
This function takes a batch of files, processes each file to extract its content and type,
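create_jobs_for_batch now advertises List[str] (job indexes) rather than List[JobSpec]; a short sketch under the assumption that tasks maps task names to Task instances:

    tasks = {"extract": extract_task, "split": split_task}  # hypothetical task map
    job_indexes = client.create_jobs_for_batch(["a.pdf", "b.pdf"], tasks)
    results = client.fetch_job_result(job_indexes, timeout=100)  # indexes feed fetches directly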
16 changes: 9 additions & 7 deletions client/src/nv_ingest_client/client/interface.py
@@ -65,11 +65,11 @@ class Ingestor:
"""

def __init__(
self,
documents: Optional[List[str]] = None,
client: Optional[NvIngestClient] = None,
job_queue_id: str = DEFAULT_JOB_QUEUE_ID,
**kwargs,
self,
documents: Optional[List[str]] = None,
client: Optional[NvIngestClient] = None,
job_queue_id: str = DEFAULT_JOB_QUEUE_ID,
**kwargs,
):
self._documents = documents or []
self._client = client
@@ -83,6 +83,7 @@ def __init__(
self._job_specs = None
self._job_ids = None
self._job_states = None
self._job_id_to_source_id = {}

if self._check_files_local():
self._job_specs = BatchJobSpec(self._documents)
@@ -242,6 +243,7 @@ def ingest_async(self, **kwargs: Any) -> Future:
self._prepare_ingest_run()

self._job_ids = self._client.add_job(self._job_specs)

future_to_job_id = self._client.submit_job_async(self._job_ids, self._job_queue_id, **kwargs)
self._job_states = {job_id: self._client._get_and_check_job_state(job_id) for job_id in self._job_ids}

@@ -300,8 +302,8 @@ def all_tasks(self) -> "Ingestor":
.filter() \
.split() \
.embed()
# .store() \
# .vdb_upload()
# .store() \
# .vdb_upload()
# fmt: on
return self

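For context, a hedged end-to-end sketch of the fluent interface that all_tasks composes; the import path and the synchronous ingest() entry point are assumptions:

    from nv_ingest_client.client import Ingestor  # assumed import path

    results = (
        Ingestor(documents=["data/sample.pdf"])
        .extract()
        .dedup()
        .filter()
        .split()
        .embed()
        .ingest()  # assumed synchronous counterpart to ingest_async()
    )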
16 changes: 13 additions & 3 deletions client/src/nv_ingest_client/util/util.py
@@ -24,7 +24,6 @@

logger = logging.getLogger(__name__)


# pylint: disable=invalid-name
# pylint: disable=missing-class-docstring
# pylint: disable=logging-fstring-interpolation
@@ -257,14 +256,25 @@ def check_ingest_result(json_payload: Dict) -> typing.Tuple[bool, str]:
)

is_failed = json_payload.get("status", "") in "failed"
description = json_payload.get("description", "")
description = ""
if is_failed:
    try:
        source_id = (
            json_payload.get("data", [])[0]
            .get("metadata", {})
            .get("source_metadata", {})
            .get("source_name", "")
        )
    except Exception:
        source_id = ""

    description = f"[{source_id}]: {json_payload.get('status', '')}\n"

description += json_payload.get("description", "")

# Look to see if we have any failure annotations to augment the description
if is_failed and "annotations" in json_payload:
for annot_id, value in json_payload["annotations"].items():
if "task_result" in value and value["task_result"] == "FAILURE":
message = value.get("message", "Unknown")
description = f"\n↪ Event that caused this failure: {annot_id} -> {message}"
description += f"\n↪ Event that caused this failure: {annot_id} -> {message}"
break

return is_failed, description
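With the new prefix logic, a failed payload yields a description that leads with the source name; a worked example assuming the usual metadata shape:

    payload = {  # hypothetical payload
        "status": "failed",
        "description": "Job failed during extraction",
        "data": [{"metadata": {"source_metadata": {"source_name": "report.pdf"}}}],
        "annotations": {"task_0": {"task_result": "FAILURE", "message": "OCR engine crashed"}},
    }
    is_failed, description = check_ingest_result(payload)
    # is_failed   -> True
    # description -> "[report.pdf]: failed\nJob failed during extraction
    #                 ↪ Event that caused this failure: task_0 -> OCR engine crashed"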