[infra] replace pycln with ruff (#1485)
* pre-commit autoupdate

* run ruff linter and formatter

* remove pycln

* ignore some rules

* make lint

* poetry add ruff --dev

* remove ruff from dev dep

* git checkout apache/main poetry.lock

* add back --exit-non-zero-on-fix
kevinjqliu authored Jan 5, 2025
1 parent acd6f5a commit 59fffe3
Showing 27 changed files with 1,535 additions and 1,324 deletions.
15 changes: 4 additions & 11 deletions .pre-commit-config.yaml
@@ -28,26 +28,19 @@ repos:
       - id: check-yaml
       - id: check-ast
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    # Ruff version (Used for linting)
-    rev: v0.7.4
+    rev: v0.8.6
     hooks:
       - id: ruff
-        args: [ --fix, --exit-non-zero-on-fix, --preview ]
+        args: [ --fix, --exit-non-zero-on-fix ]
       - id: ruff-format
-        args: [ --preview ]
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v1.8.0
+    rev: v1.14.1
     hooks:
       - id: mypy
         args:
           [--install-types, --non-interactive, --config=pyproject.toml]
-  - repo: https://github.com/hadialqattan/pycln
-    rev: v2.4.0
-    hooks:
-      - id: pycln
-        args: [--config=pyproject.toml]
   - repo: https://github.com/igorshubovych/markdownlint-cli
-    rev: v0.42.0
+    rev: v0.43.0
     hooks:
       - id: markdownlint
         args: ["--fix"]
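Note on this hunk: pycln's only job in the hook chain was deleting unused imports, and that is now covered by ruff's pyflakes rules (F401 is auto-fixable, and the ruff hook runs with --fix), provided the F rules are selected in the project's ruff configuration, which is not shown in this excerpt. Separately, dropping --preview from ruff-format reverts the formatter to its stable style, which no longer lets list and dict literals "hug" the enclosing call parentheses; that is presumably what drives the purely mechanical bracket-expansion churn in the Python diffs below (for example, self._out([...]) in pyiceberg/cli/output.py becoming the expanded self._out( [ ... ] ) form).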
12 changes: 7 additions & 5 deletions pyiceberg/cli/output.py
@@ -242,8 +242,10 @@ def version(self, version: str) -> None:
         self._out({"version": version})

     def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
-        self._out([
-            {"name": name, "type": type, detail_key: detail_val}
-            for name, type, detail in refs
-            for detail_key, detail_val in detail.items()
-        ])
+        self._out(
+            [
+                {"name": name, "type": type, detail_key: detail_val}
+                for name, type, detail in refs
+                for detail_key, detail_val in detail.items()
+            ]
+        )
16 changes: 8 additions & 8 deletions pyiceberg/expressions/visitors.py
@@ -1228,7 +1228,7 @@ def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
                 # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                 return ROWS_MIGHT_MATCH

-            if lower_bound >= literal.value:
+            if lower_bound >= literal.value:  # type: ignore[operator]
                 return ROWS_CANNOT_MATCH

         return ROWS_MIGHT_MATCH
@@ -1249,7 +1249,7 @@ def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> b
                 # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                 return ROWS_MIGHT_MATCH

-            if lower_bound > literal.value:
+            if lower_bound > literal.value:  # type: ignore[operator]
                 return ROWS_CANNOT_MATCH

         return ROWS_MIGHT_MATCH
@@ -1266,7 +1266,7 @@ def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:

         if upper_bound_bytes := self.upper_bounds.get(field_id):
             upper_bound = from_bytes(field.field_type, upper_bound_bytes)
-            if upper_bound <= literal.value:
+            if upper_bound <= literal.value:  # type: ignore[operator]
                 if self._is_nan(upper_bound):
                     # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                     return ROWS_MIGHT_MATCH
@@ -1287,7 +1287,7 @@ def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -

         if upper_bound_bytes := self.upper_bounds.get(field_id):
             upper_bound = from_bytes(field.field_type, upper_bound_bytes)
-            if upper_bound < literal.value:
+            if upper_bound < literal.value:  # type: ignore[operator]
                 if self._is_nan(upper_bound):
                     # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                     return ROWS_MIGHT_MATCH
@@ -1312,7 +1312,7 @@ def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
                 # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                 return ROWS_MIGHT_MATCH

-            if lower_bound > literal.value:
+            if lower_bound > literal.value:  # type: ignore[operator]
                 return ROWS_CANNOT_MATCH

         if upper_bound_bytes := self.upper_bounds.get(field_id):
@@ -1321,7 +1321,7 @@ def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
                 # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                 return ROWS_MIGHT_MATCH

-            if upper_bound < literal.value:
+            if upper_bound < literal.value:  # type: ignore[operator]
                 return ROWS_CANNOT_MATCH

         return ROWS_MIGHT_MATCH
@@ -1349,7 +1349,7 @@ def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
                 # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                 return ROWS_MIGHT_MATCH

-            literals = {lit for lit in literals if lower_bound <= lit}
+            literals = {lit for lit in literals if lower_bound <= lit}  # type: ignore[operator]
             if len(literals) == 0:
                 return ROWS_CANNOT_MATCH

@@ -1359,7 +1359,7 @@ def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
             if self._is_nan(upper_bound):
                 return ROWS_MIGHT_MATCH

-            literals = {lit for lit in literals if upper_bound >= lit}
+            literals = {lit for lit in literals if upper_bound >= lit}  # type: ignore[operator]
             if len(literals) == 0:
                 return ROWS_CANNOT_MATCH

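The added "# type: ignore[operator]" comments track the mypy bump to v1.14.1 above: mypy reports error code [operator] when it cannot prove that both operands of a comparison support the operator, which is presumably what starts happening here once the bound decoded by from_bytes is typed as a broad union of primitive values. A minimal, self-contained sketch of that error class, using deliberately simplified stand-in types rather than pyiceberg's actual signatures:

    # Hypothetical illustration only -- these names and types are not from pyiceberg.
    from datetime import date
    from typing import Union

    def decoded_lower_bound() -> Union[int, date]:
        # Stand-in for a column statistic decoded from bytes; mypy only knows the union.
        return 0

    literal_value: int = 10

    # Without the ignore, mypy reports:
    #   Unsupported operand types for >= ("date" and "int")  [operator]
    # because the "date" branch of the union has no ">=" defined against int.
    if decoded_lower_bound() >= literal_value:  # type: ignore[operator]
        print("bounds exclude all matching rows")

The ignore is scoped to that single error code on that line, so any other type error on the same comparison would still be reported.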
44 changes: 25 additions & 19 deletions pyiceberg/io/pyarrow.py
@@ -2449,27 +2449,31 @@ def _dataframe_to_data_files(
         yield from write_file(
             io=io,
             table_metadata=table_metadata,
-            tasks=iter([
-                WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
-                for batches in bin_pack_arrow_table(df, target_file_size)
-            ]),
+            tasks=iter(
+                [
+                    WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
+                    for batches in bin_pack_arrow_table(df, target_file_size)
+                ]
+            ),
         )
     else:
         partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
         yield from write_file(
             io=io,
             table_metadata=table_metadata,
-            tasks=iter([
-                WriteTask(
-                    write_uuid=write_uuid,
-                    task_id=next(counter),
-                    record_batches=batches,
-                    partition_key=partition.partition_key,
-                    schema=task_schema,
-                )
-                for partition in partitions
-                for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
-            ]),
+            tasks=iter(
+                [
+                    WriteTask(
+                        write_uuid=write_uuid,
+                        task_id=next(counter),
+                        record_batches=batches,
+                        partition_key=partition.partition_key,
+                        schema=task_schema,
+                    )
+                    for partition in partitions
+                    for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
+                ]
+            ),
         )


@@ -2534,10 +2538,12 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
     partition_columns: List[Tuple[PartitionField, NestedField]] = [
         (partition_field, schema.find_field(partition_field.source_id)) for partition_field in spec.fields
     ]
-    partition_values_table = pa.table({
-        str(partition.field_id): partition.transform.pyarrow_transform(field.field_type)(arrow_table[field.name])
-        for partition, field in partition_columns
-    })
+    partition_values_table = pa.table(
+        {
+            str(partition.field_id): partition.transform.pyarrow_transform(field.field_type)(arrow_table[field.name])
+            for partition, field in partition_columns
+        }
+    )

     # Sort by partitions
     sort_indices = pa.compute.sort_indices(
56 changes: 31 additions & 25 deletions pyiceberg/manifest.py
@@ -292,28 +292,32 @@ def __repr__(self) -> str:


 def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
-    data_file_partition_type = StructType(*[
-        NestedField(
-            field_id=field.field_id,
-            name=field.name,
-            field_type=field.field_type,
-            required=field.required,
-        )
-        for field in partition_type.fields
-    ])
+    data_file_partition_type = StructType(
+        *[
+            NestedField(
+                field_id=field.field_id,
+                name=field.name,
+                field_type=field.field_type,
+                required=field.required,
+            )
+            for field in partition_type.fields
+        ]
+    )

-    return StructType(*[
-        NestedField(
-            field_id=102,
-            name="partition",
-            field_type=data_file_partition_type,
-            required=True,
-            doc="Partition data tuple, schema based on the partition spec",
-        )
-        if field.field_id == 102
-        else field
-        for field in DATA_FILE_TYPE[format_version].fields
-    ])
+    return StructType(
+        *[
+            NestedField(
+                field_id=102,
+                name="partition",
+                field_type=data_file_partition_type,
+                required=True,
+                doc="Partition data tuple, schema based on the partition spec",
+            )
+            if field.field_id == 102
+            else field
+            for field in DATA_FILE_TYPE[format_version].fields
+        ]
+    )


 class DataFile(Record):
@@ -398,10 +402,12 @@ def __eq__(self, other: Any) -> bool:


 def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema:
-    return Schema(*[
-        NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
-        for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
-    ])
+    return Schema(
+        *[
+            NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
+            for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
+        ]
+    )


 class ManifestEntry(Record):
14 changes: 8 additions & 6 deletions pyiceberg/schema.py
@@ -1707,12 +1707,14 @@ def list(self, list_type: ListType, element_result: Callable[[], bool]) -> bool:
         return self._is_field_compatible(list_type.element_field) and element_result()

     def map(self, map_type: MapType, key_result: Callable[[], bool], value_result: Callable[[], bool]) -> bool:
-        return all([
-            self._is_field_compatible(map_type.key_field),
-            self._is_field_compatible(map_type.value_field),
-            key_result(),
-            value_result(),
-        ])
+        return all(
+            [
+                self._is_field_compatible(map_type.key_field),
+                self._is_field_compatible(map_type.value_field),
+                key_result(),
+                value_result(),
+            ]
+        )

     def primitive(self, primitive: PrimitiveType) -> bool:
         return True
32 changes: 17 additions & 15 deletions pyiceberg/table/__init__.py
@@ -629,18 +629,20 @@ def delete(
                 if len(filtered_df) == 0:
                     replaced_files.append((original_file.file, []))
                 elif len(df) != len(filtered_df):
-                    replaced_files.append((
-                        original_file.file,
-                        list(
-                            _dataframe_to_data_files(
-                                io=self._table.io,
-                                df=filtered_df,
-                                table_metadata=self.table_metadata,
-                                write_uuid=commit_uuid,
-                                counter=counter,
-                            )
-                        ),
-                    ))
+                    replaced_files.append(
+                        (
+                            original_file.file,
+                            list(
+                                _dataframe_to_data_files(
+                                    io=self._table.io,
+                                    df=filtered_df,
+                                    table_metadata=self.table_metadata,
+                                    write_uuid=commit_uuid,
+                                    counter=counter,
+                                )
+                            ),
+                        )
+                    )

         if len(replaced_files) > 0:
             with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot:
@@ -680,9 +682,9 @@ def add_files(
             raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")

         if self.table_metadata.name_mapping() is None:
-            self.set_properties(**{
-                TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()
-            })
+            self.set_properties(
+                **{TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()}
+            )
         with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
             data_files = _parquet_files_to_data_files(
                 table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
