fix(ingestion/s3): groupby group-splitting issue
eagle-25 committed Jan 4, 2025
1 parent d2b67ca commit 4019f41
Showing 2 changed files with 31 additions and 11 deletions.
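The commit carries no description, but the diff points to a well-known itertools.groupby pitfall: groupby only merges consecutive elements, so items sharing a grouping key must already be adjacent in the input. The old get_folder_info sorted S3 objects by last_modified before grouping by directory name, so objects from the same directory could be split across several groups. A minimal sketch of the pitfall and of the defaultdict-based grouping the commit switches to (the keys below are illustrative, not taken from the commit):

from collections import defaultdict
from itertools import groupby

def dirname(key: str) -> str:
    # Same grouping key the source uses: everything before the last "/".
    return key.rsplit("/", 1)[0]

# Keys deliberately not ordered by directory, as happens when objects
# are sorted by last_modified instead of by key.
keys = ["dir1/a.txt", "dir2/b.txt", "dir1/c.txt"]

# itertools.groupby only groups *consecutive* items, so "dir1" appears twice.
print([(d, list(g)) for d, g in groupby(keys, key=dirname)])
# [('dir1', ['dir1/a.txt']), ('dir2', ['dir2/b.txt']), ('dir1', ['dir1/c.txt'])]

# A defaultdict groups by key regardless of input order, which is what the
# new _group_s3_objects_by_dirname helper does for S3 object summaries.
grouped = defaultdict(list)
for k in keys:
    grouped[dirname(k)].append(k)
print(dict(grouped))
# {'dir1': ['dir1/a.txt', 'dir1/c.txt'], 'dir2': ['dir2/b.txt']}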
21 changes: 11 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -5,8 +5,8 @@
 import pathlib
 import re
 import time
+from collections import defaultdict
 from datetime import datetime
-from itertools import groupby
 from pathlib import PurePath
 from typing import Any, Dict, Iterable, List, Optional, Tuple
 from urllib.parse import urlparse
@@ -139,6 +139,14 @@ def partitioned_folder_comparator(folder1: str, folder2: str) -> int:
     return 1 if folder1 > folder2 else -1
 
 
+def _group_s3_objects_by_dirname(s3_objects: Any) -> Dict[str, List[Any]]:
+    grouped_objects = defaultdict(list)
+    for obj in s3_objects:
+        dirname = obj.key.rsplit("/", 1)[0]
+        grouped_objects[dirname].append(obj)
+    return grouped_objects
+
+
 @dataclasses.dataclass
 class Folder:
     creation_time: datetime
@@ -863,16 +871,9 @@ def get_folder_info(
         Returns:
             List[Folder]: A list of Folder objects representing the partitions found.
         """
-
-        prefix_to_list = prefix
-        files = list(
-            bucket.objects.filter(Prefix=f"{prefix_to_list}").page_size(PAGE_SIZE)
-        )
-        files = sorted(files, key=lambda a: a.last_modified)
-        grouped_files = groupby(files, lambda x: x.key.rsplit("/", 1)[0])
-
         partitions: List[Folder] = []
-        for key, group in grouped_files:
+        s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
+        for key, group in _group_s3_objects_by_dirname(s3_objects).items():
             file_size = 0
             creation_time = None
             modification_time = None

Codecov / codecov/patch: added lines #L875-L876 in source.py (the new s3_objects assignment and for loop) were not covered by tests.
21 changes: 20 additions & 1 deletion metadata-ingestion/tests/unit/s3/test_s3_source.py
@@ -1,12 +1,16 @@
 from typing import List, Tuple
+from unittest.mock import Mock
 
 import pytest
 
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator
 from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
-from datahub.ingestion.source.s3.source import partitioned_folder_comparator
+from datahub.ingestion.source.s3.source import (
+    _group_s3_objects_by_dirname,
+    partitioned_folder_comparator,
+)
 
 
 def test_partition_comparator_numeric_folder_name():
@@ -240,3 +244,18 @@ def container_properties_filter(x: MetadataWorkUnit) -> bool:
         "folder_abs_path": "my-bucket/my-dir/my-dir2",
         "platform": "s3",
     }
+
+
+def test_group_s3_objects_by_dirname():
+    s3_objects = [
+        Mock(key="/dir1/file1.txt"),
+        Mock(key="/dir1/file2.txt"),
+        Mock(key="/dir2/file3.txt"),
+        Mock(key="/dir2/file4.txt"),
+    ]
+
+    grouped_objects = _group_s3_objects_by_dirname(s3_objects)
+
+    assert len(grouped_objects) == 2
+    assert grouped_objects["/dir1"] == [s3_objects[0], s3_objects[1]]
+    assert grouped_objects["/dir2"] == [s3_objects[2], s3_objects[3]]

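A side note on the new test: its mocked keys are already adjacent by directory, so it does not by itself exercise the ordering bug being fixed. A hypothetical extra check (not part of this commit) that does would interleave keys from different directories:

def test_group_s3_objects_by_dirname_with_interleaved_keys():
    # Interleaved order mimics objects sorted by last_modified rather than by key,
    # which is what split groups under the old itertools.groupby approach.
    s3_objects = [
        Mock(key="/dir1/file1.txt"),
        Mock(key="/dir2/file3.txt"),
        Mock(key="/dir1/file2.txt"),
    ]

    grouped_objects = _group_s3_objects_by_dirname(s3_objects)

    assert grouped_objects["/dir1"] == [s3_objects[0], s3_objects[2]]
    assert grouped_objects["/dir2"] == [s3_objects[1]]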