Skip to content

Commit

Permalink
complete the distribution doc en version
Browse files Browse the repository at this point in the history
  • Loading branch information
yxdyc committed Jan 6, 2025
1 parent 24f73da commit 1f131df
Showing 1 changed file with 49 additions and 39 deletions.
88 changes: 49 additions & 39 deletions docs/Distributed.md
Original file line number Diff line number Diff line change
@@ -1,53 +1,62 @@
# Distributed Data Processing in Data-Juicer

In Data-Juicer, we implement distributed data processing based on the famous [Ray](https://github.com/ray-project/ray) framework.
Based on Ray, we optimize the strategy of tuning the number of split blocks of the input dataset in Ray,
and contributed for streaming reading of json files to Ray and Apache Arrow.
Then, we scale distributed data processing of Data-Juicer based on Ray and our patches up to datasets containing billions of samples on tens of thousands of CPU cores for ultimate efficiency.
We also equipped with a MinHash-based deduplication operator based on Ray, which could deduplicate TB-sized datasets on thousand of CPU cores in 3 hours.
## Overview

For more details, please refer to our Data-Juicer 2.0 paper.
Data-Juicer supports large-scale distributed data processing based on [Ray](https://github.com/ray-project/ray) and Alibaba's [PAI](https://www.aliyun.com/product/bigdata/learn).

## Functional Optimizations for Ray
With a dedicated design, almost all operators of Data-Juicer implemented in standalone mode can be seamlessly executed in Ray distributed mode. We continuously conduct engine-specific optimizations for large-scale scenarios, such as data subset splitting strategies that balance the number of files and workers, and streaming I/O patches for JSON files to Ray and Apache Arrow.

### Subset splitting
For reference, in our experiments with 25 to 100 Alibaba Cloud nodes, Data-Juicer in Ray mode processes datasets containing 70 billion samples on 6400 CPU cores in 2 hours and 7 billion samples on 3200 CPU cores in 0.45 hours. Additionally, a MinHash-LSH-based deduplication operator in Ray mode can deduplicate terabyte-sized datasets on 8 nodes with 1280 CPU cores in 3 hours.

When there are tens of thousands of nodes but with only a few dataset files, Ray would split the dataset files according to the available resources and distribute the blocks of the dataset to all nodes, which brings a huge network communication cost and decreases the CPU utilization of each node.
More details can be found in our paper, [Data-Juicer 2.0: Cloud-Scale Adaptive Data Processing for Foundation Models](arXiv_link_coming_soon).

Thus, we split the original dataset into smaller 128MB files in advance automatically according to the dataset size and the number of distributed nodes, trying to adapt the features of Arrow and Ray for better performance.
This approach reduces location and reprocessing costs associated with fault tolerance and helps mitigate network exchange overheads, especially beneficial in contexts involving large-scale multimodal data, as well as in scenarios that require handling global objects of Ray Actor in distributed modes.
![Arch-Overview](
https://img.alicdn.com/imgextra/i4/O1CN01uawwRu1JMSdafy5lF_!!6000000001014-2-tps-4034-4146.png)

### Streaming Reading of Json Files
## Implementation and Optimizations

We offer a streaming loading interface, addressing the current lack of native support in the Arrow framework underlying Hugging Face and Ray Datasets for streaming JSON data.
We developed an in-house patch for Apache Arrow ([PR](https://github.com/apache/arrow/pull/45084)) to alleviate Out-of-Memory (OOM) issues.
### Ray Mode in Data-Juicer

## Distributed Dataset Processing
- For most implementations of Data-Juicer [operators](Operators.md), the core processing functions are engine-agnostic. Interoperability is primarily managed in [RayDataset](../data_juicer/core/ray_data.py) and [RayExecutor](../data_juicer/core/ray_executor.py), which are subclasses of the base `DJDataset` and `BaseExecutor`, respectively, and support both Ray [Tasks](https://docs.ray.io/en/latest/ray-core/tasks.html) and [Actors](https://docs.ray.io/en/latest/ray-core/actors.html).
- The exception is the deduplication operators, which are challenging to scale in standalone mode. We provide these operators with the prefix [`ray_xx_deduplication`](../data_juicer/ops/deduplicator/).

Based on the two optimizations above, we conduct experiments on datasets with billions of samples.
We prepare a 560k-sample multimodal dataset and expand it by different factors to get datasets with different sizes.
The experimental results are shown in the figure below, which demonstrates the scalability.
And our optimizations for Ray offers 2x∼3x speedups in our experiments.
### Subset Splitting

![Overview](https://img.alicdn.com/imgextra/i3/O1CN01JV8wcC1oxn0G2xnBT_!!6000000005292-0-tps-1328-1742.jpg)
When dealing with tens of thousands of nodes but only a few dataset files, Ray would split the dataset files according to available resources and distribute the blocks across all nodes, incurring huge network communication costs and reduces CPU utilization. For more details, see [Ray's autodetect_parallelism](https://github.com/ray-project/ray/blob/2dbd08a46f7f08ea614d8dd20fd0bca5682a3078/python/ray/data/_internal/util.py#L201-L205) and [tuning output blocks for Ray](https://docs.ray.io/en/latest/data/performance-tips.html#tuning-output-blocks-for-read).

To optimize performance, we automatically split the original dataset into smaller files in advance, considering the features of Arrow and Ray. By default, the single file size is set to 128MB if it results in the number of sub-files being larger than five times the total number of CPU cores in the cluster.

### Streaming Reading of JSON Files

To address the lack of native support in the Arrow framework underlying Ray Datasets for streaming JSON data, we have developed a streaming loading interface and contributed an in-house [patch](https://github.com/modelscope/data-juicer/pull/515) for Apache Arrow ([PR to the repo](https://github.com/apache/arrow/pull/45084)). This patch helps alleviate Out-of-Memory issues.

### Deduplication

An optimized MinHash-LSH-based Deduplicator is provided in Ray mode. We implement a multi-process Union-Find set in Ray Actors and a load-balanced distributed algorithm, [BTS](https://ieeexplore.ieee.org/document/10598116), to complete equivalence class merging. This operator can deduplicate terabyte-sized datasets on 1280 CPU cores in 3 hours. Our ablation study shows 2x to 3x speedups with our dedicated optimizations for Ray mode compared to the vanilla version of this deduplication operator.

## Distributed Deduplication on Large-Scale Datasets
## Performance Results

We conduct MinHash-based RayDeduplicator on datasets sized at 200GB, 1TB, and 5TB, using CPU counts ranging from 640 to 1280 cores.
As the table below shows, when the data size increases by 5x, the processing time increases by 4.02x∼5.62x.
When the number of CPU cores doubles, the processing time decreases to 58.9%∼67.1% of the original time.
### Processing with Varied Scales

We conducted experiments on datasets with billions of samples. We prepared a 560k-sample multimodal dataset and expanded it by different factors (1x to 125000x) to create datasets of varying sizes. The experimental results, shown in the figure below, demonstrate good scalability.

![Overview](https://img.alicdn.com/imgextra/i3/O1CN01JV8wcC1oxn0G2xnBT_!!6000000005292-0-tps-1328-1742.jpg)

### Distributed Deduplication on Large-Scale Datasets

We tested the MinHash-based RayDeduplicator on datasets sized at 200GB, 1TB, and 5TB, using CPU counts ranging from 640 to 1280 cores. As the table below shows, when the data size increases by 5x, the processing time increases by 4.02x to 5.62x. When the number of CPU cores doubles, the processing time decreases to 58.9% to 67.1% of the original time.

| # CPU | 200GB Time | 1TB Time | 5TB Time |
|---------|------------|-----------|------------|
| 4 * 160 | 11.13 min | 50.83 min | 285.43 min |
| 8 * 160 | 7.47 min | 30.08 min | 168.10 min |

## Examples of Data Processing based on Ray
## Quick Start

### Running Example of Ray Mode

### Simple Demo of Data Processing Based on Ray Using Data-Juicer OPs
We provide a simple demo in the directory `demos/process_on_ray/`, which includes a config file and two test datasets.

We already prepare a simple demo in the directory `demos/process_on_ray/`, where we put a config file and two test datasets.
```text
demos/process_on_ray/
├── configs
Expand All @@ -57,38 +66,39 @@ demos/process_on_ray/
└── demo-dataset.jsonl
```

We already set the executor type to "ray" and set an auto ray address in the config file.
In the config file, set the executor type to "ray" and specify an automatic Ray address.

```yaml
...
dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl'
export_path: './outputs/demo/demo-processed'

executor_type: 'ray' # set the executor type to "ray"
ray_address: 'auto' # set an auto ray address.
executor_type: 'ray' # Set the executor type to "ray"
ray_address: 'auto' # Set an automatic Ray address
...
```

Before running, we need to install Data-Juicer and its `dist` requirements:
Before running, install Data-Juicer and its `dist` requirements:

```shell
pip install -v -e . # installing the minimal requirements of Data-Juicer
pip install -v -e ".[dist]" # including dependencies on ray and other distributed libs
pip install -v -e . # Install the minimal requirements of Data-Juicer
pip install -v -e ".[dist]" # Include dependencies on Ray and other distributed libraries
```

Then we need to start a Ray cluster:
Start a Ray cluster:

```shell
ray start --head # start a local cluster as the head node
ray start --head # Start a local cluster as the head node
```

And we can run this demo with the `dj-process` tool:
Run the demo using the `dj-process` tool:

```shell
# run the tool from source
# Run the tool from source
python tools/process_data.py --config demos/process_on_ray/configs/demo.yaml

# use command line tool
# Use the command-line tool
dj-process --config demos/process_on_ray/configs/demo.yaml
```

Data-Juicer will process the demo dataset with the demo config file and export the result datasets in the directory specified by the `export_path` argument in the config file.
Data-Juicer will process the demo dataset with the demo config file and export the result datasets to the directory specified by the `export_path` argument in the config file.

0 comments on commit 1f131df

Please sign in to comment.