Skip to content

Commit

Permalink
backward compat (apache#1151)
Browse files Browse the repository at this point in the history
  • Loading branch information
sungwy committed Sep 13, 2024
1 parent d8d509f commit 1d4ed06
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 34 deletions.
65 changes: 63 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,15 @@
UpgradeFormatVersionUpdate,
update_table_metadata,
)
from pyiceberg.table.update.schema import UpdateSchema
from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot
from pyiceberg.table.update.schema import UpdateSchema, _Move, _MoveOperation
from pyiceberg.table.update.snapshot import (
ManageSnapshots,
UpdateSnapshot,
_DeleteFiles,
_FastAppendFiles,
_MergeAppendFiles,
_OverwriteFiles,
)
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.typedef import (
EMPTY_DICT,
Expand Down Expand Up @@ -1464,3 +1471,57 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
from pyiceberg.io.pyarrow import parquet_files_to_data_files

yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))


@deprecated(
    deprecated_in="0.8.0",
    removed_in="0.9.0",
    help_message="pyiceberg.table.Move has been changed to private class pyiceberg.table.update.schema._Move",
)
def Move(*args: Any, **kwargs: Any) -> _Move:
    """Build a ``_Move`` under its former public name (backward-compatibility shim).

    Every positional and keyword argument is forwarded unchanged to
    ``_Move``, and the resulting instance is returned.
    """
    move = _Move(*args, **kwargs)
    return move


@deprecated(
    deprecated_in="0.8.0",
    removed_in="0.9.0",
    help_message="pyiceberg.table.MoveOperation has been changed to private class pyiceberg.table.update.schema._MoveOperation",
)
def MoveOperation(*args: Any, **kwargs: Any) -> _MoveOperation:
    """Resolve a ``_MoveOperation`` member under its former public name.

    Backward-compatibility shim: the arguments are forwarded unchanged to
    ``_MoveOperation``, so ``MoveOperation(1)`` looks a member up by value.
    """
    return _MoveOperation(*args, **kwargs)


# An enum is normally consumed through its members (``MoveOperation.First``)
# rather than by calling it. Because the public name is now a function and no
# longer the Enum class, plain attribute access would raise AttributeError.
# Mirror every member onto the shim so existing ``MoveOperation.<member>``
# code keeps working. Attribute access does not pass through the decorated
# call, so only calling the shim runs the ``@deprecated`` wrapper.
# NOTE(review): assumes ``deprecated`` returns an object with a ``__dict__``
# (e.g. a plain wrapper function) - confirm against pyiceberg.utils.deprecated.
vars(MoveOperation).update(_MoveOperation.__members__)


@deprecated(
    deprecated_in="0.8.0",
    removed_in="0.9.0",
    help_message="pyiceberg.table.DeleteFiles has been changed to private class pyiceberg.table.update.snapshot._DeleteFiles",
)
def DeleteFiles(*args: Any, **kwargs: Any) -> _DeleteFiles:
    """Build a ``_DeleteFiles`` under its former public name (backward-compatibility shim).

    Every positional and keyword argument is forwarded unchanged to
    ``_DeleteFiles``, and the resulting instance is returned.
    """
    producer = _DeleteFiles(*args, **kwargs)
    return producer


@deprecated(
    deprecated_in="0.8.0",
    removed_in="0.9.0",
    help_message="pyiceberg.table.FastAppendFiles has been changed to private class pyiceberg.table.update.snapshot._FastAppendFiles",
)
def FastAppendFiles(*args: Any, **kwargs: Any) -> _FastAppendFiles:
    """Build a ``_FastAppendFiles`` under its former public name (backward-compatibility shim).

    Every positional and keyword argument is forwarded unchanged to
    ``_FastAppendFiles``, and the resulting instance is returned.
    """
    producer = _FastAppendFiles(*args, **kwargs)
    return producer


@deprecated(
    deprecated_in="0.8.0",
    removed_in="0.9.0",
    help_message="pyiceberg.table.MergeAppendFiles has been changed to private class pyiceberg.table.update.snapshot._MergeAppendFiles",
)
def MergeAppendFiles(*args: Any, **kwargs: Any) -> _MergeAppendFiles:
    """Build a ``_MergeAppendFiles`` under its former public name (backward-compatibility shim).

    Every positional and keyword argument is forwarded unchanged to
    ``_MergeAppendFiles``, and the resulting instance is returned.
    """
    producer = _MergeAppendFiles(*args, **kwargs)
    return producer


@deprecated(
    deprecated_in="0.8.0",
    removed_in="0.9.0",
    help_message="pyiceberg.table.OverwriteFiles has been changed to private class pyiceberg.table.update.snapshot._OverwriteFiles",
)
def OverwriteFiles(*args: Any, **kwargs: Any) -> _OverwriteFiles:
    """Build an ``_OverwriteFiles`` under its former public name (backward-compatibility shim).

    Every positional and keyword argument is forwarded unchanged to
    ``_OverwriteFiles``, and the resulting instance is returned.
    """
    producer = _OverwriteFiles(*args, **kwargs)
    return producer
42 changes: 23 additions & 19 deletions pyiceberg/table/update/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@
TABLE_ROOT_ID = -1


class MoveOperation(Enum):
class _MoveOperation(Enum):
First = 1
Before = 2
After = 3


@dataclass
class Move:
class _Move:
field_id: int
full_name: str
op: MoveOperation
op: _MoveOperation
other_field_id: Optional[int] = None


Expand All @@ -79,7 +79,7 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
_adds: Dict[int, List[NestedField]] = {}
_updates: Dict[int, NestedField] = {}
_deletes: Set[int] = set()
_moves: Dict[int, List[Move]] = {}
_moves: Dict[int, List[_Move]] = {}

_added_name_to_id: Dict[str, int] = {}
# Part of https://github.com/apache/iceberg/pull/8393
Expand Down Expand Up @@ -146,7 +146,7 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
visit_with_partner(
Catalog._convert_schema_if_needed(new_schema),
-1,
UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
_UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
# type: ignore
PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
)
Expand Down Expand Up @@ -410,13 +410,13 @@ def _find_for_move(self, name: str) -> Optional[int]:

return self._added_name_to_id.get(name)

def _move(self, move: Move) -> None:
def _move(self, move: _Move) -> None:
if parent_name := self._id_to_parent.get(move.field_id):
parent_field = self._schema.find_field(parent_name, case_sensitive=self._case_sensitive)
if not parent_field.field_type.is_struct:
raise ValueError(f"Cannot move fields in non-struct type: {parent_field.field_type}")

if move.op == MoveOperation.After or move.op == MoveOperation.Before:
if move.op == _MoveOperation.After or move.op == _MoveOperation.Before:
if move.other_field_id is None:
raise ValueError("Expected other field when performing before/after move")

Expand All @@ -426,7 +426,7 @@ def _move(self, move: Move) -> None:
self._moves[parent_field.field_id] = self._moves.get(parent_field.field_id, []) + [move]
else:
# In the top level field
if move.op == MoveOperation.After or move.op == MoveOperation.Before:
if move.op == _MoveOperation.After or move.op == _MoveOperation.Before:
if move.other_field_id is None:
raise ValueError("Expected other field when performing before/after move")

Expand All @@ -451,7 +451,7 @@ def move_first(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
if field_id is None:
raise ValueError(f"Cannot move missing column: {full_name}")

self._move(Move(field_id=field_id, full_name=full_name, op=MoveOperation.First))
self._move(_Move(field_id=field_id, full_name=full_name, op=_MoveOperation.First))

return self

Expand Down Expand Up @@ -485,7 +485,7 @@ def move_before(self, path: Union[str, Tuple[str, ...]], before_path: Union[str,
if field_id == before_field_id:
raise ValueError(f"Cannot move {full_name} before itself")

self._move(Move(field_id=field_id, full_name=full_name, other_field_id=before_field_id, op=MoveOperation.Before))
self._move(_Move(field_id=field_id, full_name=full_name, other_field_id=before_field_id, op=_MoveOperation.Before))

return self

Expand Down Expand Up @@ -514,7 +514,7 @@ def move_after(self, path: Union[str, Tuple[str, ...]], after_name: Union[str, T
if field_id == after_field_id:
raise ValueError(f"Cannot move {full_name} after itself")

self._move(Move(field_id=field_id, full_name=full_name, other_field_id=after_field_id, op=MoveOperation.After))
self._move(_Move(field_id=field_id, full_name=full_name, other_field_id=after_field_id, op=_MoveOperation.After))

return self

Expand Down Expand Up @@ -592,10 +592,14 @@ class _ApplyChanges(SchemaVisitor[Optional[IcebergType]]):
_adds: Dict[int, List[NestedField]]
_updates: Dict[int, NestedField]
_deletes: Set[int]
_moves: Dict[int, List[Move]]
_moves: Dict[int, List[_Move]]

def __init__(
self, adds: Dict[int, List[NestedField]], updates: Dict[int, NestedField], deletes: Set[int], moves: Dict[int, List[Move]]
self,
adds: Dict[int, List[NestedField]],
updates: Dict[int, NestedField],
deletes: Set[int],
moves: Dict[int, List[_Move]],
) -> None:
self._adds = adds
self._updates = updates
Expand Down Expand Up @@ -715,7 +719,7 @@ def primitive(self, primitive: PrimitiveType) -> Optional[IcebergType]:
return primitive


class UnionByNameVisitor(SchemaWithPartnerVisitor[int, bool]):
class _UnionByNameVisitor(SchemaWithPartnerVisitor[int, bool]):
update_schema: UpdateSchema
existing_schema: Schema
case_sensitive: bool
Expand Down Expand Up @@ -873,20 +877,20 @@ def _add_fields(fields: Tuple[NestedField, ...], adds: Optional[List[NestedField
return fields + tuple(adds)


def _move_fields(fields: Tuple[NestedField, ...], moves: List[Move]) -> Tuple[NestedField, ...]:
def _move_fields(fields: Tuple[NestedField, ...], moves: List[_Move]) -> Tuple[NestedField, ...]:
reordered = list(copy(fields))
for move in moves:
# Find the field that we're about to move
field = next(field for field in reordered if field.field_id == move.field_id)
# Remove the field that we're about to move from the list
reordered = [field for field in reordered if field.field_id != move.field_id]

if move.op == MoveOperation.First:
if move.op == _MoveOperation.First:
reordered = [field] + reordered
elif move.op == MoveOperation.Before or move.op == MoveOperation.After:
elif move.op == _MoveOperation.Before or move.op == _MoveOperation.After:
other_field_id = move.other_field_id
other_field_pos = next(i for i, field in enumerate(reordered) if field.field_id == other_field_id)
if move.op == MoveOperation.Before:
if move.op == _MoveOperation.Before:
reordered.insert(other_field_pos, field)
else:
reordered.insert(other_field_pos + 1, field)
Expand All @@ -897,7 +901,7 @@ def _move_fields(fields: Tuple[NestedField, ...], moves: List[Move]) -> Tuple[Ne


def _add_and_move_fields(
fields: Tuple[NestedField, ...], adds: List[NestedField], moves: List[Move]
fields: Tuple[NestedField, ...], adds: List[NestedField], moves: List[_Move]
) -> Optional[Tuple[NestedField, ...]]:
if len(adds) > 0:
# always apply adds first so that added fields can be moved
Expand Down
26 changes: 13 additions & 13 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = T
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)


class DeleteFiles(_SnapshotProducer["DeleteFiles"]):
class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
"""Will delete manifest entries from the current snapshot based on the predicate.
This will produce a DELETE snapshot:
Expand Down Expand Up @@ -443,7 +443,7 @@ def files_affected(self) -> bool:
return len(self._deleted_entries()) > 0


class FastAppendFiles(_SnapshotProducer["FastAppendFiles"]):
class _FastAppendFiles(_SnapshotProducer["_FastAppendFiles"]):
def _existing_manifests(self) -> List[ManifestFile]:
"""To determine if there are any existing manifest files.
Expand Down Expand Up @@ -472,7 +472,7 @@ def _deleted_entries(self) -> List[ManifestEntry]:
return []


class MergeAppendFiles(FastAppendFiles):
class _MergeAppendFiles(_FastAppendFiles):
_target_size_bytes: int
_min_count_to_merge: int
_merge_enabled: bool
Expand Down Expand Up @@ -507,7 +507,7 @@ def __init__(
def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
"""To perform any post-processing on the manifests before writing them to the new snapshot.
In MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge
In _MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge
if automatic merge is enabled.
"""
unmerged_data_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DATA]
Expand All @@ -523,7 +523,7 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile
return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests


class OverwriteFiles(_SnapshotProducer["OverwriteFiles"]):
class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
"""Overwrites data from the table. This will produce an OVERWRITE snapshot.
Data and delete files were added and removed in a logical overwrite operation.
Expand Down Expand Up @@ -610,18 +610,18 @@ def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Di
self._io = io
self._snapshot_properties = snapshot_properties

def fast_append(self) -> FastAppendFiles:
return FastAppendFiles(
def fast_append(self) -> _FastAppendFiles:
return _FastAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)

def merge_append(self) -> MergeAppendFiles:
return MergeAppendFiles(
def merge_append(self) -> _MergeAppendFiles:
return _MergeAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)

def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
return OverwriteFiles(
def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
return _OverwriteFiles(
commit_uuid=commit_uuid,
operation=Operation.OVERWRITE
if self._transaction.table_metadata.current_snapshot() is not None
Expand All @@ -631,8 +631,8 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
snapshot_properties=self._snapshot_properties,
)

def delete(self) -> DeleteFiles:
return DeleteFiles(
def delete(self) -> _DeleteFiles:
return _DeleteFiles(
operation=Operation.DELETE,
transaction=self._transaction,
io=self._io,
Expand Down
15 changes: 15 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -1243,3 +1243,18 @@ def test_update_metadata_log_overflow(table_v2: Table) -> None:
table_v2.metadata_location,
)
assert len(new_metadata.metadata_log) == 1


def test_table_module_refactoring_backward_compatibility() -> None:
    """Check that names moved out of ``pyiceberg.table`` can still be imported from it.

    The test fails (rather than errors) when the import raises, and reports
    the underlying exception in the failure message.
    """
    # TODO: Remove this in 0.9.0
    try:
        from pyiceberg.table import (  # noqa: F401
            DeleteFiles,
            FastAppendFiles,
            MergeAppendFiles,
            Move,
            MoveOperation,
            OverwriteFiles,
        )
    except Exception as exc:
        # pytest.fail() raises on its own, so wrapping it in `raise ... from exc`
        # never reaches the raise statement; call it directly and surface the
        # original error in the message instead.
        pytest.fail(f"Importing moved modules should not raise an exception: {exc!r}")

0 comments on commit 1d4ed06

Please sign in to comment.