Skip to content

Commit

Permalink
Add tests for new reader and writer api (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Oct 9, 2024
1 parent 953f30f commit 7703bcd
Showing 1 changed file with 78 additions and 0 deletions.
78 changes: 78 additions & 0 deletions paimon_python_java/tests/test_write_and_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,81 @@ def testParallelRead(self):
# check data (ignore index)
pd.testing.assert_frame_equal(
result.reset_index(drop=True), expected_df.reset_index(drop=True))

def testAllWriteAndReadApi(self):
    """Exercise every write entry point and every read entry point once.

    Three batches are committed to ``default.test_all_api``, one each via
    ``write_arrow``, ``write_arrow_batch`` and ``write_pandas``.  The table
    is then read back through ``to_arrow``, ``to_arrow_batch_reader`` and
    ``to_pandas`` and each result is compared against the nine rows written.
    """
    schema = Schema(self.simple_pa_schema)
    self.catalog.create_table('default.test_all_api', schema, False)
    table = self.catalog.get_table('default.test_all_api')
    write_builder = table.new_batch_write_builder()

    def write_and_commit(do_write):
        """Open a writer/committer pair, run ``do_write(writer)``, commit.

        Both handles are closed even when the write or the commit raises,
        so a failing step does not leak them.  The close order (writer
        first, then committer) is the same as on the success path.
        """
        table_write = write_builder.new_write()
        table_commit = None
        try:
            table_commit = write_builder.new_commit()
            do_write(table_write)
            table_commit.commit(table_write.prepare_commit())
        finally:
            try:
                table_write.close()
            finally:
                if table_commit is not None:
                    table_commit.close()

    # write_arrow
    data1 = {
        'f0': [1, 2, 3],
        'f1': ['a', 'b', 'c'],
    }
    pa_table = pa.Table.from_pydict(data1, schema=self.simple_pa_schema)
    write_and_commit(lambda writer: writer.write_arrow(pa_table))

    # write_arrow_batch
    data2 = {
        'f0': [4, 5, 6],
        'f1': ['d', 'e', 'f'],
    }
    record_batch = pa.RecordBatch.from_pandas(
        pd.DataFrame(data2), schema=self.simple_pa_schema)
    write_and_commit(lambda writer: writer.write_arrow_batch(record_batch))

    # write_pandas
    data3 = {
        'f0': [7, 8, 9],
        'f1': ['g', 'h', 'i'],
    }
    df = pd.DataFrame(data3)
    write_and_commit(lambda writer: writer.write_pandas(df))

    read_builder = table.new_read_builder()
    table_scan = read_builder.new_scan()
    table_read = read_builder.new_read()
    splits = table_scan.plan().splits()

    # NOTE(review): the comparisons below expect rows to come back in the
    # order they were committed -- confirm the scan guarantees that order.
    expected_rows = {
        'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
        'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
    }

    # to_arrow
    actual = table_read.to_arrow(splits)
    expected = pa.Table.from_pydict(
        expected_rows, schema=self.simple_pa_schema)
    self.assertEqual(actual, expected)

    # to_arrow_batch_reader
    data_frames = [
        batch.to_pandas()
        for batch in table_read.to_arrow_batch_reader(splits)
    ]
    actual = pd.concat(data_frames)
    expected = pd.DataFrame(expected_rows)
    # pandas infers int64 for the literal ints; the read path presumably
    # yields int32 for f0 (per simple_pa_schema), so align the dtype.
    expected['f0'] = expected['f0'].astype('int32')
    pd.testing.assert_frame_equal(
        actual.reset_index(drop=True), expected.reset_index(drop=True))

    # to_pandas
    actual = table_read.to_pandas(splits)
    pd.testing.assert_frame_equal(
        actual.reset_index(drop=True), expected.reset_index(drop=True))

0 comments on commit 7703bcd

Please sign in to comment.