Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel Table.append #428

Closed
bigluck opened this issue Feb 14, 2024 · 32 comments
Closed

Parallel Table.append #428

bigluck opened this issue Feb 14, 2024 · 32 comments

Comments

@bigluck
Copy link
Contributor

bigluck commented Feb 14, 2024

Apache Iceberg version

main (development)

Please describe the bug 🐞

While doing some tests with the latest RC (v0.6.0rc5), I generated a ~6.7GB arrow table and appended it to a new table.

In terms of performances, I got similar results (writing to S3) on these 2 type of EC2 machines:

  • c5ad.8xlarge 32 core, 64 ram, 10gbps nic -> wrote 1 parquet file of 2GB in 31s
  • c5ad.16xlarge 64 core, 128 ram, 20gbps nic -> wrote 1 parquet file of 1.6GB in 28s

By using htop I notice that the code was only using a thread during the append operation, which means that it's not parallelizing the write operation.

Screenshot 2024-02-13 at 14 26 35

@Fokko
Copy link
Contributor

Fokko commented Feb 14, 2024

@bigluck Thanks for raising this. This is on my list to look into!

Parallelization of this is always hard since it is hard to exactly know how big the Parquet file will be. Efficient encoding and compression can vary a lot. However, just using one core does not make any sense.

@sungwy sungwy added this to the PyIceberg 0.7.0 release milestone Feb 14, 2024
@kevinjqliu
Copy link
Contributor

Copy/paste my comments from Slack.

PyIceberg currently uses ParquetWriter's write_table function to write data files (see write_file) which I believe only uses 1 core (need to verify this)

ParquetWriter also has the write_dataset function for writing multiple files which has a use_threads argument to toggle and write in parallel.

https://arrow.apache.org/cookbook/r/reading-and-writing-data---multiple-files.html#write-partitioned-data---parquet
Describes using write_dataset to write multiple files

@kevinjqliu
Copy link
Contributor

kevinjqliu commented Feb 17, 2024

There's not a lot mentions of using Arrow to write a single file using multiple threads. The only thing I found was use_threads() here

I don't think this is exposed to writing a single file (write_table). But it is for writing multiple files (write_dataset)

@kevinjqliu
Copy link
Contributor

Looks like we have to use a higher level API to force parallelism, i.e. write_dataset to write multiple parquet files using multiple threads

@kevinjqliu
Copy link
Contributor

Looks like write_file already gets an iterator of WriteTasks.

Maybe we can use the ParquetWriter.write_dataset implementation to parallelize the data file writes.

@kevinjqliu
Copy link
Contributor

Support encoding a single parquet file using multiple threads

Looks like the bottleneck might be encoding instead of IO

@kevinjqliu
Copy link
Contributor

Related to #208 and #353

@zeddit
Copy link

zeddit commented Feb 18, 2024

I wonder if pyiceberg write will keep the row order when reading out later.
Why I ask this is because some engine like spark won't keep this, even specifying the sortOrder field.

@Fokko
Copy link
Contributor

Fokko commented Feb 18, 2024

There's not a lot mentions of using Arrow to write a single file using multiple threads. The only thing I found was use_threads() here

This is for reading, so each core reads a separate row group.

Maybe we can use the ParquetWriter.write_dataset implementation to parallelize the data file writes.

How would we control the size of a file (or the number of records is a second best).

I wonder if we generate a fair amount of data, like @bigluck is doing:

import multiprocessing
import os
import time
import uuid
from functools import partial
from typing import Any

import pyarrow as pa
from faker import Faker
from pyiceberg.catalog import load_catalog


def generate_fake_rows(batch_size, i) -> list[dict[str, Any]]:
    fake = Faker()
    return [
        {
            **fake.profile(),
            'password': fake.sha256(),
            'device_id': fake.uuid4(),
            'device_ip': fake.ipv4(),
            'device_user_agent': fake.user_agent(),
            'phone': fake.phone_number(),
            'salary': fake.random_int(15_000, 250_000),
            'job_description': fake.sentence(nb_words=15),
            'cc_expire': fake.credit_card_expire(),
            'cc_number': fake.credit_card_number(),
            'cc_security_code': fake.credit_card_security_code(),
        }
        for _ in range(batch_size)
    ]


def generate_fake_table(num_records: int, batch_size: int) -> pa.Table:
    num_batches = num_records // batch_size

    generate_fake_rows_partial = partial(generate_fake_rows, batch_size)

    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 1) as pool:
        results = pool.map(generate_fake_rows_partial, range(num_batches))
    return pa.Table.from_pylist([row for batch in results for row in batch])

How many Arrow buffers do we get, and if we can smartly join these buffers (we already have a bin-packing algorithm in the code).

We could use the write_batch operation of ParquetWriter to write certain batches to a file.

@kevinjqliu
Copy link
Contributor

kevinjqliu commented Feb 18, 2024

I took the above code and did some investigation.
Here's the notebook to see it in action https://colab.research.google.com/drive/12O4ARckCwJqP2U6L4WxREZbyPv_AmWC2?usp=sharing

I'll summarize.

I used the above functions to generate 1 million records and save them in a feather file.
The file on disk is 447 MB in size.
Reading the feather file back to Python, the pyarrow.Table is 685.46 MB in size, using nbytes

We can use the to_batches function to chop the table into RecordBatchs, using the default setting.
This turns the table into 16 batches with around 40MB each.

Given this, it seems plausible to chunk the table into batches and use the write_batch function. We can set the default chunk to size 516MB or some recommended setting.

Note, to_batches is a "zero-copy" method, so there shouldn't be performance impact

@kevinjqliu
Copy link
Contributor

It seems like there's an upper bound to the size of the RecordBatch produced by to_batches. I tried setting max_chunksize from 16 MB to 256 MB. All the batches produced are around 45MB in size.

I guess is this what you mean by bin-packing. We can bin-pack these batches into 512 MB parquet files.

@kevinjqliu
Copy link
Contributor

Also, @bigluck, while running the code to generate the data using faker, I opened htop and saw that it was using 6 CPUs. I'm using a M1 Mac

@bigluck
Copy link
Contributor Author

bigluck commented Feb 19, 2024

Thanks @kevinjqliu
Last week, I didn't test the code on my MBP; I did all the tests directly on the EC2 instance.

BTW it seems to use all the cores on my M2 Max:

Screenshot 2024-02-19 at 08 50 57

@Fokko
Copy link
Contributor

Fokko commented Feb 19, 2024

Do we know if this is for data generation, or also when writing? In the end, it would be good to be able to split the data into multiple files. The MacBooks have huge IO, so it might be that the CPU is the bottleneck and not the IO

@kevinjqliu
Copy link
Contributor

It was for data generation only. I can't seem to reproduce the parallelism issue for append, probably due to MacBook's huge IO.

@kevinjqliu
Copy link
Contributor

we already have a bin-packing algorithm in the code

@Fokko can you point me to that? I couldn't find it

@Fokko
Copy link
Contributor

Fokko commented Feb 19, 2024

@kevinjqliu It is under utils/bin_packing.py.

@kevinjqliu
Copy link
Contributor

thanks! I found it, had to fuzzy search in vscode :)

Here's an example of bin-packing an Arrow table.
https://colab.research.google.com/drive/1FM8mdr4j5KgsjBYmsp9_eH8SBlwVC6C6?usp=sharing

I didn't fully understand lookback and largest_bin_first so I just picked an arbitrary value by looking at the test examples

@kevinjqliu
Copy link
Contributor

kevinjqliu commented Feb 19, 2024

Integrating this with the write path, I have 2 approaches

  1. refactoring write_file so that it can write multiple parquet files. This means 1 WriteTask can produce multiple DataFiles
  2. Bin-pack earlier (in _dataframe_to_data_files) and create multiple WriteTasks that is then passed to write_file

I like the second option, but we need to coordinate with how we implement partitioned writes.

@kevinjqliu
Copy link
Contributor

kevinjqliu commented Feb 19, 2024

#444 something like this.

wrote out 3 files

(env) ➜  iceberg-python git:(kevinjqliu/bin-pack-write) ✗ ll -h /tmp/iceberg_warehouse8868767412496944120/iceberg_data/fake/fake_data/data   
total 498312
-rw-r--r--  1 kevinliu  wheel    80M Feb 19 10:19 00000-0-a61f9655-0d76-45ca-b85d-4d8dc8dbcbd9.parquet
-rw-r--r--  1 kevinliu  wheel    84M Feb 19 10:19 00000-1-a61f9655-0d76-45ca-b85d-4d8dc8dbcbd9.parquet
-rw-r--r--  1 kevinliu  wheel    80M Feb 19 10:19 00000-2-a61f9655-0d76-45ca-b85d-4d8dc8dbcbd9.parquet

and reading it back returns the same number of records

@kevinjqliu
Copy link
Contributor

Oh interesting, the input is 1M records, 685.46 MB in memory. We bin-pack the Arrow representation into 256MB chunks (['224.61 MB', '236.23 MB', '224.62 MB']), but writing these into parquet turns them into only ~80 MB.

We'd want to take into account the size of the parquet file when written to disk, not the size of the Arrow representation in memory.

@kevinjqliu
Copy link
Contributor

Here's how it's done in Java. BaseTaskWriter.java checks targetFileSize when writing and will create a new file when the file size target is met.

targetFileSize is set in SparkWrite which refers to the targetDataFileSize property

Default setting is 512 MB

@kevinjqliu
Copy link
Contributor

@bigluck #444 should allow parallelized write. Do you mind giving it a try?

@bigluck
Copy link
Contributor Author

bigluck commented Feb 24, 2024

Ciao @kevinjqliu, thanks!

I've tested it on the same c5ad.16xlarge machine, but the results are pretty similar, 27s vs 28s for this table:

$ pip install git+https://github.com/kevinjqliu/iceberg-python.git@kevinjqliu/bin-pack-write
$ python3 benchmark.py
...
Generating 10,000,000 records in 1,000 batches
 - generate_users: done (in 582.4395 seconds)
 - table size: 7188266296 bytes, 6.69 GB, 10,000,000 records (in 0.0002 seconds)
 - create empty table: users done (in 0.6575 seconds)
 - append data: done (in 27.7934 seconds)
...

-rw-rw-r--. 1 ec2-user ec2-user 6.7G Feb 24 09:04 table_10000000.arrow
Screenshot 2024-02-24 at 10 05 26

This is the final table parquet file on s3:

Screenshot 2024-02-24 at 10 19 04

@kevinjqliu
Copy link
Contributor

hm. Looks like something weird is going on if the resulting parquet file is 1.6 GB. Each parquet file size should be at most 512 MB, if not less. See the bin packing logic.

Here's something we can run for diagnostics,

# https://stackoverflow.com/questions/12523586/python-format-size-application-converting-b-to-kb-mb-gb-tb
def humanbytes(B):
    """Return the given bytes as a human friendly KB, MB, GB, or TB string."""
    B = float(B)
    KB = float(1024)
    MB = float(KB ** 2) # 1,048,576
    GB = float(KB ** 3) # 1,073,741,824
    TB = float(KB ** 4) # 1,099,511,627,776

    if B < KB:
        return '{0} {1}'.format(B,'Bytes' if 0 == B > 1 else 'Byte')
    elif KB <= B < MB:
        return '{0:.2f} KB'.format(B / KB)
    elif MB <= B < GB:
        return '{0:.2f} MB'.format(B / MB)
    elif GB <= B < TB:
        return '{0:.2f} GB'.format(B / GB)
    elif TB <= B:
        return '{0:.2f} TB'.format(B / TB)

import pyarrow as pa
from pyarrow import feather

arrow_tbl = feather.read_table('fake_data.feather')
print(f"Table has {len(arrow_tbl)} records")

from pyiceberg.catalog import load_catalog
catalog = load_catalog()
try:
    iceberg_tbl = catalog.drop_table("fake.fake_data")
except:
    pass
iceberg_tbl = catalog.create_table("fake.fake_data", arrow_tbl.schema)

from pyiceberg.io.pyarrow import bin_pack_arrow_table
bins = bin_pack_arrow_table(arrow_tbl,iceberg_tbl.properties)
for bin in bins:
    print(f"total={humanbytes(sum(map(lambda x:x.nbytes, bin)))}, chunks={[humanbytes(batch.nbytes) for batch in bin]}")

You might have to change the arrow_tbl and iceberg_tbl.
This will show us how the arrow table is bin-packed during writes.

For Write operations (append/overwrite), parallelism only kicks in during the actual writing. In order to take advantage of the parallelism, you'd have to set the PYICEBERG_MAX_WORKERS env variable to the number of CPUs.

@bigluck
Copy link
Contributor Author

bigluck commented Feb 25, 2024

Hey @kevinjqliu , we're currently debugging the issue on Slack, but I thought it would be helpful to report our findings here as well. In my tests, the pyarrow table is generated using the following code:

return pa.Table.from_pylist([row for batch in results for row in batch])

I've also cached the table on disk to save time, and it's read using the following code:

with pa.memory_map('table_10000000.arrow', 'r') as source:
    pa_table = pa.ipc.open_file(source).read_all()

Although I know that using a record batch would be the right way to read the file, I'm explicitly using read_all() because I can't control what table will be generated by users' code. This is to evaluate the edge case where the table has to be read without using streaming functions.

After importing, the bin_pack_arrow_table returns only one chunk:

total=6.69 GB, chunks=['6.69 GB']

Let me know if you have any questions, and thanks for your time!

@bigluck
Copy link
Contributor Author

bigluck commented Feb 26, 2024

@kevinjqliu, your latest changes are mind-blowing (#444 (comment) for reference)

I have tested your last changes on c5ad.8xlarge and c5ad.16xlarge instances using my 10,000,000 table.

  • On a c5ad.8xlarge instance with 10Gbps NIC, 32 cores, and a 64 RAM, it took an average of 3.9s to write 14 Parquet files. Previously, using pynessie 0.6.0, it took 31s.
  • On a c5ad.16xlarge instance with 20Gbps NIC, 64 cores, and a 128 RAM, it took approximately 3.6s to complete the same task, compared to 28.2s using pynessie 0.6.0.

I have been experimenting with different settings to improve the writing performances, but I failed.
I tried adjusting the PYICEBERG_MAX_WORKERS variable, but it did not make much difference. This might be due to the small size of my dataset (only 6.69 GB in arrow format), which resulted in only 14 output files.
I also tested the write.target-file-size-bytes property, which produced 27 files when set to 268435456 and 54 files when set to 134217728.
However, even when I set PYICEBERG_MAX_WORKERS to 64, the total write operation still took 3.6 seconds.

Overall, I am very impressed with how it works now! Well done!

@kevinjqliu
Copy link
Contributor

As a way to benchmark multithreaded writes to multiple parquet files, I've noticed that Duckdb's COPY command has the per_thread_output and file_size_bytes options.

Using the duckdb CLI,

.timer on
CREATE TABLE tbl AS SELECT * FROM read_parquet('table_10000000.parquet');

SELECT COUNT(*) FROM tbl;

COPY
    (SELECT * FROM tbl)
    TO 'parquet_files'
    (FORMAT PARQUET, PER_THREAD_OUTPUT true, FILE_SIZE_BYTES '512MB', OVERWRITE_OR_IGNORE true);

Result,

Run Time (s): real 14.588 user 66.250569 sys 8.554207

Screenshot 2024-02-27 at 2 04 10 PM
Screenshot 2024-02-27 at 2 04 22 PM

And setting FILE_SIZE_BYTES to 256MB,

COPY
    (SELECT * FROM tbl)
    TO 'parquet_files'
    (FORMAT PARQUET, PER_THREAD_OUTPUT true, FILE_SIZE_BYTES '256MB', OVERWRITE_OR_IGNORE true);
Run Time (s): real 15.575 user 66.483984 sys 10.547852

Screenshot 2024-02-27 at 2 05 03 PM
Screenshot 2024-02-27 at 2 05 40 PM

I'm not sure if there's a way to specify the number of threads Duckdb can use. But with htop while executing the statements, I can see that all the cores are used

@bigluck
Copy link
Contributor Author

bigluck commented Feb 27, 2024

@kevinjqliu nice, duckdb should use https://duckdb.org/docs/sql/configuration.html

@kevinjqliu
Copy link
Contributor

Thanks! It
thanks! I can see that its using 8 threads with

SELECT * FROM duckdb_settings();

I also ran

SET threads TO 8;

just in case

@kevinjqliu
Copy link
Contributor

kevinjqliu commented Feb 27, 2024

Here's the script I used to run the append() function to use 8 threads to write multiple parquet files
https://gist.github.com/kevinjqliu/e738641ec8f96de554c5ed39ead3f09a
Screenshot 2024-02-27 at 2 37 33 PM
Screenshot 2024-02-27 at 2 38 34 PM

Wrote 14 files in around 16.5 seconds

And on main branch without parallel write (#444),
Screenshot 2024-02-27 at 4 08 36 PM
Wrote a single 1.6G file in around 32.8 seconds

@Fokko
Copy link
Contributor

Fokko commented May 13, 2024

Fixed in #444

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants