Skip to content

Commit 56bcb5e

Browse files
authored
Introduce batch write api (#4)
1 parent 107ab37 commit 56bcb5e

File tree

8 files changed

+204
-7
lines changed

8 files changed

+204
-7
lines changed

.github/workflows/paimon-python-checks.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ jobs:
3535
runs-on: ubuntu-latest
3636

3737
steps:
38+
- name: Checkout code
39+
uses: actions/checkout@v2
3840
- name: Run lint-python.sh
3941
run: |
4042
chmod +x dev/lint-python.sh

dev/lint-python.sh

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,9 @@ function get_os_index() {
149149
local sys_os=$(uname -s)
150150
echo "Detected OS: ${sys_os}"
151151
if [ ${sys_os} == "Darwin" ]; then
152-
return 0
152+
echo 0
153153
elif [[ ${sys_os} == "Linux" ]]; then
154-
return 1
154+
echo 1
155155
else
156156
echo "Unsupported OS: ${sys_os}"
157157
exit 1
@@ -360,8 +360,13 @@ function install_environment() {
360360
print_function "STAGE" "installing environment"
361361

362362
# Get the index of the SUPPORT_OS array for convenience when installing tools.
363-
get_os_index $sys_os
364-
local os_index=$?
363+
local os_index=$(get_os_index | tail -n1)
364+
365+
# In some Linux distributions, md5sum is installed instead of md5, but our miniconda installation shell script uses md5.
366+
if [ "$os_index" -eq 1 ] && [ ! -f /usr/local/bin/md5 ]; then
367+
echo "Creating symlink for md5 to md5sum..."
368+
sudo ln -s $(which md5sum) /usr/local/bin/md5
369+
fi
365370

366371
# step-1 install wget
367372
# the file size of the miniconda.sh is too big to use "wget" tool to download instead

java_based_implementation/api_impl.py

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818

1919
from java_based_implementation.java_gateway import get_gateway
2020
from java_based_implementation.util.java_utils import to_j_catalog_context
21-
from paimon_python_api import catalog, read_builder, table_scan, split, table_read
22-
from paimon_python_api import table
23-
from pyarrow import RecordBatchReader
21+
from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read,
22+
write_builder, table_write, commit_message, table_commit)
23+
from pyarrow import RecordBatchReader, RecordBatch
2424
from typing import List
2525
from typing_extensions import Self
2626

@@ -53,6 +53,10 @@ def new_read_builder(self) -> 'ReadBuilder':
5353
j_read_builder = self._j_table.newReadBuilder()
5454
return ReadBuilder(j_read_builder)
5555

56+
def new_batch_write_builder(self) -> 'BatchWriteBuilder':
57+
j_batch_write_builder = self._j_table.newBatchWriteBuilder()
58+
return BatchWriteBuilder(j_batch_write_builder)
59+
5660

5761
class ReadBuilder(read_builder.ReadBuilder):
5862

@@ -110,3 +114,54 @@ class TableRead(table_read.TableRead):
110114
def create_reader(self, split: Split) -> RecordBatchReader:
111115
# TODO
112116
pass
117+
118+
119+
class BatchWriteBuilder(write_builder.BatchWriteBuilder):
120+
121+
def __init__(self, j_batch_write_builder):
122+
self._j_batch_write_builder = j_batch_write_builder
123+
124+
def with_overwrite(self, static_partition: dict) -> Self:
125+
self._j_batch_write_builder.withOverwrite(static_partition)
126+
return self
127+
128+
def new_write(self) -> 'BatchTableWrite':
129+
j_batch_table_write = self._j_batch_write_builder.newWrite()
130+
return BatchTableWrite(j_batch_table_write)
131+
132+
def new_commit(self) -> 'BatchTableCommit':
133+
j_batch_table_commit = self._j_batch_write_builder.newCommit()
134+
return BatchTableCommit(j_batch_table_commit)
135+
136+
137+
class BatchTableWrite(table_write.BatchTableWrite):
138+
139+
def __init__(self, j_batch_table_write):
140+
self._j_batch_table_write = j_batch_table_write
141+
142+
def write(self, record_batch: RecordBatch):
143+
# TODO
144+
pass
145+
146+
def prepare_commit(self) -> List['CommitMessage']:
147+
j_commit_messages = self._j_batch_table_write.prepareCommit()
148+
return list(map(lambda cm: CommitMessage(cm), j_commit_messages))
149+
150+
151+
class CommitMessage(commit_message.CommitMessage):
152+
153+
def __init__(self, j_commit_message):
154+
self._j_commit_message = j_commit_message
155+
156+
def to_j_commit_message(self):
157+
return self._j_commit_message
158+
159+
160+
class BatchTableCommit(table_commit.BatchTableCommit):
161+
162+
def __init__(self, j_batch_table_commit):
163+
self._j_batch_table_commit = j_batch_table_commit
164+
165+
def commit(self, commit_messages: List[CommitMessage]):
166+
j_commit_messages = list(map(lambda cm: cm.to_j_commit_message(), commit_messages))
167+
self._j_batch_table_commit.commit(j_commit_messages)

paimon_python_api/commit_message.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#################################################################################
18+
19+
from abc import ABC
20+
21+
22+
class CommitMessage(ABC):
23+
"""Commit message collected from writer."""

paimon_python_api/table.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
from abc import ABC, abstractmethod
2020
from read_builder import ReadBuilder
21+
from write_builder import BatchWriteBuilder
2122

2223

2324
class Table(ABC):
@@ -26,3 +27,7 @@ class Table(ABC):
2627
@abstractmethod
2728
def new_read_builder(self) -> ReadBuilder:
2829
"""Return a builder for building table scan and table read."""
30+
31+
@abstractmethod
32+
def new_batch_write_builder(self) -> BatchWriteBuilder:
33+
"""Return a builder for building batch table write and table commit."""

paimon_python_api/table_commit.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#################################################################################
18+
19+
from abc import ABC, abstractmethod
20+
from commit_message import CommitMessage
21+
from typing import List
22+
23+
24+
class BatchTableCommit(ABC):
25+
"""A table commit for batch processing. Recommended for one-time committing."""
26+
27+
@abstractmethod
28+
def commit(self, commit_messages: List[CommitMessage]):
29+
"""
30+
Commit the commit messages to generate snapshots. One commit may generate
31+
up to two snapshots, one for adding new files and the other for compaction.
32+
"""

paimon_python_api/table_write.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#################################################################################
18+
19+
from abc import ABC, abstractmethod
20+
from commit_message import CommitMessage
21+
from pyarrow import RecordBatch
22+
from typing import List
23+
24+
25+
class BatchTableWrite(ABC):
26+
"""A table write for batch processing. Recommended for one-time committing."""
27+
28+
@abstractmethod
29+
def write(self, record_batch: RecordBatch):
30+
"""Write a batch to the writer."""
31+
32+
@abstractmethod
33+
def prepare_commit(self) -> List[CommitMessage]:
34+
"""Prepare commit message for TableCommit. Collect incremental files for this writer."""

paimon_python_api/write_builder.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#################################################################################
18+
19+
from abc import ABC, abstractmethod
20+
from table_commit import BatchTableCommit
21+
from table_write import BatchTableWrite
22+
from typing_extensions import Self
23+
24+
25+
class BatchWriteBuilder(ABC):
26+
"""An interface for building the BatchTableWrite and BatchTableCommit."""
27+
28+
@abstractmethod
29+
def with_overwrite(self, static_partition: dict) -> Self:
30+
"""
31+
Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL.
32+
If you pass an empty dict, the whole table is overwritten.
33+
"""
34+
35+
@abstractmethod
36+
def new_write(self) -> BatchTableWrite:
37+
"""Create a BatchTableWrite to perform batch writing."""
38+
39+
@abstractmethod
40+
def new_commit(self) -> BatchTableCommit:
41+
"""Create a BatchTableCommit to perform batch committing."""

0 commit comments

Comments
 (0)