-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[python] Add schema short-circuit to SplitRead and FileScanner read paths #8217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
6f56c7c
e9f1267
9f5d521
8817e84
d43d8d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -167,6 +167,14 @@ def _compute_nested_path_by_name(self) -> Optional[Dict[str, List[str]]]: | |
| def _nested_path_by_name(self) -> Optional[Dict[str, List[str]]]: | ||
| return self._cached_nested_path_by_name | ||
|
|
||
| def _resolve_schema(self, schema_id: int): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is still another direct schema-manager access in DataEvolutionSplitRead._create_union_reader(): when a regular file has no write_cols, it calls self.table.schema_manager.get_schema(first_file.schema_id) to derive field ids. If first_file.schema_id is the current schema id, this bypasses _resolve_schema() and can trigger the same REST-catalog 403 on data-evolution reads. Please switch that remaining call to self._resolve_schema(first_file.schema_id) too. |
||
| """Resolve schema, short-circuiting current table schema id to avoid | ||
| filesystem access (REST catalog would get 403). | ||
| """ | ||
| if schema_id == self.table.table_schema.id: | ||
| return self.table.table_schema | ||
| return self.table.schema_manager.get_schema(schema_id) | ||
|
|
||
| def _push_down_predicate(self) -> Optional[Predicate]: | ||
| if self.predicate is None: | ||
| return None | ||
|
|
@@ -304,8 +312,7 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, | |
| if has_nested: | ||
| raise NotImplementedError( | ||
| "Nested-field projection is not supported on ROW files") | ||
| file_schema = self.table.schema_manager.get_schema( | ||
| file.schema_id) | ||
| file_schema = self._resolve_schema(file.schema_id) | ||
| if file.write_cols: | ||
| field_map = {f.name: f for f in file_schema.fields} | ||
| row_full_fields = [field_map[n] for n in file.write_cols | ||
|
|
@@ -389,7 +396,7 @@ def _get_fields_and_predicate(self, schema_id: int, read_fields): | |
| key = (schema_id, tuple(read_fields)) | ||
| if key not in self.schema_id_2_fields: | ||
| nested_path_by_name = self._nested_path_by_name() | ||
| schema = self.table.schema_manager.get_schema(schema_id) | ||
| schema = self._resolve_schema(schema_id) | ||
| schema_fields = ( | ||
| SpecialFields.row_type_with_row_tracking(schema.fields) | ||
| if self.row_tracking_enabled else schema.fields | ||
|
|
@@ -461,7 +468,7 @@ def _file_read_fields(self, file: DataFileMeta) -> Optional[List[DataField]]: | |
| nested-projection reads.""" | ||
| if self._nested_path_by_name() is not None: | ||
| return None | ||
| file_schema = self.table.schema_manager.get_schema(file.schema_id) | ||
| file_schema = self._resolve_schema(file.schema_id) | ||
| if file_schema is None: | ||
| return None | ||
| return self._final_data_fields_from( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| import unittest | ||
| from unittest.mock import MagicMock | ||
|
|
||
| from pypaimon.read.scanner.file_scanner import FileScanner | ||
| from pypaimon.schema.data_types import AtomicType, DataField | ||
|
|
||
|
|
||
| class FileScannerSchemaFieldsShortCircuitTest(unittest.TestCase): | ||
| """Test _schema_fields short-circuits current schema id (REST catalog 403 fix).""" | ||
|
|
||
| def _make_scanner(self, current_schema_id, current_fields, | ||
| historical_fields_map=None): | ||
| """Build a FileScanner bypassing __init__ to isolate _schema_fields.""" | ||
| scanner = FileScanner.__new__(FileScanner) | ||
| scanner.table = MagicMock() | ||
| scanner.table.table_schema.id = current_schema_id | ||
| scanner.table.table_schema.fields = current_fields | ||
| scanner.table.schema_manager = MagicMock() | ||
|
|
||
| def get_schema(schema_id): | ||
| if historical_fields_map is None or schema_id not in historical_fields_map: | ||
| raise AssertionError( | ||
| f"schema_manager.get_schema({schema_id}) was called " | ||
| "but no historical schema was registered for it" | ||
| ) | ||
| historical = MagicMock() | ||
| historical.fields = historical_fields_map[schema_id] | ||
| return historical | ||
|
|
||
| scanner.table.schema_manager.get_schema.side_effect = get_schema | ||
| return scanner | ||
|
|
||
| def test_short_circuits_current_schema_id(self): | ||
| """Current schema id returns in-memory fields without filesystem access.""" | ||
| current_fields = [DataField(0, "a", AtomicType("INT"))] | ||
| scanner = self._make_scanner( | ||
| current_schema_id=5, current_fields=current_fields | ||
| ) | ||
|
|
||
| result = scanner._schema_fields(5) | ||
|
|
||
| self.assertIs(result, current_fields) | ||
| scanner.table.schema_manager.get_schema.assert_not_called() | ||
|
|
||
| def test_delegates_for_historical_schema(self): | ||
| """Historical schema id delegates to schema_manager.get_schema().""" | ||
| current_fields = [DataField(0, "a", AtomicType("INT"))] | ||
| historical_fields = [ | ||
| DataField(0, "a", AtomicType("INT")), | ||
| DataField(1, "b", AtomicType("STRING")), | ||
| ] | ||
| scanner = self._make_scanner( | ||
| current_schema_id=5, | ||
| current_fields=current_fields, | ||
| historical_fields_map={3: historical_fields}, | ||
| ) | ||
|
|
||
| result = scanner._schema_fields(3) | ||
|
|
||
| self.assertEqual(result, historical_fields) | ||
| scanner.table.schema_manager.get_schema.assert_called_once_with(3) | ||
|
|
||
| def test_short_circuit_works_for_zero_schema_id(self): | ||
| """Schema id == 0 still short-circuits (guards against truthiness bugs).""" | ||
| current_fields = [DataField(0, "x", AtomicType("INT"))] | ||
| scanner = self._make_scanner( | ||
| current_schema_id=0, current_fields=current_fields | ||
| ) | ||
|
|
||
| result = scanner._schema_fields(0) | ||
|
|
||
| self.assertIs(result, current_fields) | ||
| scanner.table.schema_manager.get_schema.assert_not_called() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This only protects the SimpleStatsEvolutions callback, but a normal scan still resolves the file schema while decoding manifest entries before _filter_manifest_entry() can use this helper. ManifestFileManager.read() still calls table.schema_manager.get_schema(file_dict["_SCHEMA_ID"]).fields for every entry, so current-schema files from a REST catalog can still hit the same 403 before this short-circuit is reached. Please apply the same current-schema short-circuit in the manifest decode path as well, or pass a resolver into ManifestFileManager.