Skip to content

Commit 7017ba2

Browse files
authored
Merge pull request #1 from wvaske/parquet_support
Parquet support
2 parents 3c2be85 + acde3f9 commit 7017ba2

File tree

6 files changed

+723
-11
lines changed

6 files changed

+723
-11
lines changed

dlio_benchmark/common/enumerations.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ class FormatType(Enum):
133133
INDEXED_BINARY = 'indexed_binary'
134134
MMAP_INDEXED_BINARY = 'mmap_indexed_binary'
135135
SYNTHETIC = 'synthetic'
136-
136+
PARQUET = 'parquet'
137+
137138
def __str__(self):
138139
return self.value
139140

@@ -161,6 +162,8 @@ def get_enum(value):
161162
return FormatType.MMAP_INDEXED_BINARY
162163
elif FormatType.SYNTHETIC.value == value:
163164
return FormatType.SYNTHETIC
165+
elif FormatType.PARQUET.value == value:
166+
return FormatType.PARQUET
164167

165168
class DataLoaderType(Enum):
166169
"""
@@ -275,6 +278,9 @@ class Compression(Enum):
275278
BZIP2 = 'bz2'
276279
ZIP = 'zip'
277280
XZ = 'xz'
281+
LZ4 = 'lz4'
282+
ZSTD = 'zstd'
283+
SNAPPY = 'snappy'
278284

279285
def __str__(self):
280286
return self.value

dlio_benchmark/data_generator/generator_factory.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,8 @@ def get_generator(type):
6161
elif type == FormatType.INDEXED_BINARY or type == FormatType.MMAP_INDEXED_BINARY:
6262
from dlio_benchmark.data_generator.indexed_binary_generator import IndexedBinaryGenerator
6363
return IndexedBinaryGenerator()
64+
elif type == FormatType.PARQUET:
65+
from dlio_benchmark.data_generator.parquet_generator import ParquetGenerator
66+
return ParquetGenerator()
6467
else:
6568
raise Exception(str(ErrorCodes.EC1001))
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
"""
2+
Copyright (c) 2025, UChicago Argonne, LLC
3+
All Rights Reserved
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
"""
17+
import os
18+
19+
import numpy as np
20+
import pyarrow as pa
21+
import pyarrow.parquet as pq
22+
23+
from dlio_benchmark.common.enumerations import Compression
24+
from dlio_benchmark.data_generator.data_generator import DataGenerator
25+
from dlio_benchmark.utils.utility import progress
26+
27+
# Translate the benchmark's Compression enum into the codec names accepted by
# pyarrow.parquet. Codecs parquet cannot use (bz2, zip, xz) are deliberately
# absent, so lookups for them fall through to uncompressed output.
COMPRESSION_MAP = dict([
    (Compression.NONE, None),
    (Compression.SNAPPY, 'snappy'),
    (Compression.GZIP, 'gzip'),
    (Compression.LZ4, 'lz4'),
    (Compression.ZSTD, 'zstd'),
])
35+
36+
37+
class ParquetGenerator(DataGenerator):
    """
    Schema-driven Parquet data generator with compression and partitioning support.

    When ``parquet_columns`` is configured, multi-column files are generated with
    the specified dtypes (int8/float16/float32/float64 as scalars or fixed-size
    lists, plus string, binary, bool). When it is empty, the generator falls back
    to the legacy single 'data' column of uint8 lists for backward compatibility.

    Memory strategy:
    - Batched writing: samples are generated and streamed to disk in batches of
      ``generation_batch_size`` through a ParquetWriter, bounding peak memory.
    - Vectorized numpy-to-Arrow conversion: flat arrays wrapped into list arrays
      instead of per-row Python objects.
    """

    def __init__(self):
        super().__init__()
        # All knobs come from the parsed benchmark config, exposed by the
        # DataGenerator base class as self._args.
        self.parquet_columns = self._args.parquet_columns
        self.row_group_size = self._args.parquet_row_group_size
        self.partition_by = self._args.parquet_partition_by
        # A non-positive batch size means "unset": default to one row group per
        # generation batch.
        self.generation_batch_size = self._args.parquet_generation_batch_size
        if self.generation_batch_size <= 0:
            self.generation_batch_size = self.row_group_size

    @staticmethod
    def _parse_col_spec(col_spec):
        """Normalize one column spec (dict / Hydra DictConfig / bare name) to
        a native-typed ``(name, dtype, size)`` tuple."""
        if hasattr(col_spec, 'get'):  # dict-like (dict or DictConfig)
            name = str(col_spec.get('name', 'data'))
            dtype = str(col_spec.get('dtype', 'float32'))
            size = int(col_spec.get('size', 1))
        else:
            name = str(col_spec)
            dtype = 'float32'
            size = 1
        return name, dtype, size

    @staticmethod
    def _fixed_size_list(flat_np, size, pa_type=None):
        """Wrap a flat numpy array into a FixedSizeListArray of width ``size``."""
        if pa_type is not None:
            arrow_flat = pa.array(flat_np, type=pa_type)
        else:
            arrow_flat = pa.array(flat_np)
        return pa.FixedSizeListArray.from_arrays(arrow_flat, size)

    def _build_schema(self):
        """
        Build the PyArrow schema handed to ParquetWriter.

        The types here must match exactly what the batch-generation helpers
        emit, because ParquetWriter.write_table() rejects tables whose schema
        differs from the writer's schema.
        """
        if not self.parquet_columns:
            # Legacy layout: one variable-length list<uint8> column named 'data'.
            return pa.schema([('data', pa.list_(pa.uint8()))])

        # Scalar PyArrow type map for numeric dtypes.
        SCALAR_PA_TYPES = {
            'int8': pa.int8(),
            'float16': pa.float16(),
            'float32': pa.float32(),
            'float64': pa.float64(),
        }

        fields = []
        for col_spec in self.parquet_columns:
            name, dtype, size = self._parse_col_spec(col_spec)

            if size == 1 and dtype in SCALAR_PA_TYPES:
                # Scalar path: single-element numeric columns — most efficient
                # layout for readers.
                fields.append(pa.field(name, SCALAR_PA_TYPES[dtype]))
            elif dtype in SCALAR_PA_TYPES:
                # Multi-element numeric columns are fixed-size lists.
                fields.append(pa.field(name, pa.list_(SCALAR_PA_TYPES[dtype], size)))
            elif dtype == 'list':
                # 'list' behaves like a float32 fixed-size list.
                fields.append(pa.field(name, pa.list_(pa.float32(), size)))
            elif dtype == 'string':
                fields.append(pa.field(name, pa.string()))
            elif dtype == 'binary':
                fields.append(pa.field(name, pa.binary()))
            elif dtype == 'bool':
                fields.append(pa.field(name, pa.bool_()))
            else:
                # Fallback: treat an unknown dtype as a float32 fixed-size list
                # (mirrored by the fallback in _generate_column_data_batch).
                fields.append(pa.field(name, pa.list_(pa.float32(), size)))

        return pa.schema(fields)

    def _generate_column_data_batch(self, col_spec, batch_size):
        """
        Generate ``batch_size`` random values for a single column.

        Returns ``(column_name, pyarrow.Array)``. Numeric columns go through
        flat numpy arrays wrapped into Arrow arrays, avoiding intermediate
        per-row Python objects.
        """
        name, dtype, size = self._parse_col_spec(col_spec)

        # Scalar path: size=1 numeric columns avoid the list wrapper entirely.
        if size == 1 and dtype == 'int8':
            data = np.random.randint(-128, 128, batch_size, dtype=np.int8)
            return name, pa.array(data, type=pa.int8())
        if size == 1 and dtype == 'float16':
            data = np.random.rand(batch_size).astype(np.float16)
            return name, pa.array(data, type=pa.float16())
        if size == 1 and dtype in ('float32', 'float64'):
            np_dtype = np.float32 if dtype == 'float32' else np.float64
            pa_type = pa.float32() if dtype == 'float32' else pa.float64()
            data = np.random.rand(batch_size).astype(np_dtype)
            return name, pa.array(data, type=pa_type)

        # List path: multi-element numeric columns use FixedSizeListArray.
        if dtype == 'int8':
            data = np.random.randint(-128, 128, (batch_size, size), dtype=np.int8)
            return name, self._fixed_size_list(data.ravel(), size, pa.int8())
        if dtype == 'float16':
            data = np.random.rand(batch_size, size).astype(np.float16)
            return name, self._fixed_size_list(data.ravel(), size, pa.float16())
        if dtype in ('float32', 'float64'):
            np_dtype = np.float32 if dtype == 'float32' else np.float64
            data = np.random.rand(batch_size, size).astype(np_dtype)
            return name, self._fixed_size_list(data.ravel(), size)
        if dtype == 'list':
            # 'list' is treated like float32 with a configurable size.
            data = np.random.rand(batch_size, size).astype(np.float32)
            return name, self._fixed_size_list(data.ravel(), size)

        if dtype == 'string':
            data = [f"text_{j}" for j in range(batch_size)]
            return name, pa.array(data, type=pa.string())
        if dtype == 'binary':
            data = [np.random.bytes(size) for _ in range(batch_size)]
            return name, pa.array(data, type=pa.binary())
        if dtype == 'bool':
            data = np.random.choice([True, False], batch_size)
            return name, pa.array(data, type=pa.bool_())

        # Fallback: unknown dtype generated as float32 (matches _build_schema).
        data = np.random.rand(batch_size, size).astype(np.float32)
        return name, self._fixed_size_list(data.ravel(), size)

    def _generate_batch_columns(self, batch_size):
        """Generate every configured column for one batch of samples."""
        columns = {}
        for col_spec in self.parquet_columns:
            name, arrow_data = self._generate_column_data_batch(col_spec, batch_size)
            columns[name] = arrow_data
        return columns

    def _generate_legacy_batch(self, dim1, dim2, batch_size):
        """
        Generate the backward-compatible single 'data' column: ``batch_size``
        identical rows of ``dim1 * dim2`` random uint8 values.

        BUGFIX: the column is built as a variable-length list<uint8> via
        ListArray.from_arrays so its type matches the legacy schema declared in
        _build_schema() (pa.list_(pa.uint8())). The previous FixedSizeListArray
        made ParquetWriter.write_table() fail with a schema mismatch on every
        legacy non-partitioned write.
        """
        record_size = dim1 * dim2
        # NOTE(review): randint(255) draws from [0, 255), so 254 is the max
        # byte value — kept as-is for parity with the other DLIO generators.
        record = np.random.randint(255, size=record_size, dtype=np.uint8)
        # batch_size copies of the same record via numpy tiling.
        records = np.tile(record, (batch_size, 1))
        arrow_flat = pa.array(records.ravel())
        # Offsets [0, n, 2n, ...] describe batch_size equal-length sublists.
        offsets = np.arange(0, (batch_size + 1) * record_size, record_size, dtype=np.int32)
        arrow_data = pa.ListArray.from_arrays(pa.array(offsets, type=pa.int32()), arrow_flat)
        return {'data': arrow_data}

    def _generate_column_data(self, col_spec, num_samples):
        """Backward-compatible alias that delegates to the batched generator."""
        return self._generate_column_data_batch(col_spec, num_samples)

    def generate(self):
        """
        Generate parquet data files with a config-driven schema, or the
        backward-compatible single 'data' column when none is configured.

        Non-partitioned path: opens a ParquetWriter with the schema from
        _build_schema() and streams batches of ``generation_batch_size``
        samples, keeping peak memory bounded. Partitioned path: builds one
        full table and hands it to pq.write_to_dataset, since Hive-style
        partitioned writes do not support streaming.
        """
        super().generate()
        # Fixed seed so generated file contents are reproducible across runs.
        np.random.seed(10)
        dim = self.get_dimension(self.total_files_to_generate)

        # Resolve the pyarrow codec name; unsupported codecs (bz2/zip/xz)
        # silently fall back to None, i.e. uncompressed output.
        compression = COMPRESSION_MAP.get(self.compression, None)

        # Round-robin the file list across MPI ranks.
        for i in range(self.my_rank, int(self.total_files_to_generate), self.comm_size):
            progress(i + 1, self.total_files_to_generate, "Generating Parquet Data")

            out_path_spec = self.storage.get_uri(self._file_list[i])

            if self.partition_by:
                # Partitioned writes are table-at-once, but still use the
                # vectorized column generation.
                if self.parquet_columns:
                    table = pa.table(self._generate_batch_columns(self.num_samples))
                else:
                    dim1, dim2 = dim[2 * i], dim[2 * i + 1]
                    table = pa.table(self._generate_legacy_batch(dim1, dim2, self.num_samples))

                pq.write_to_dataset(
                    table,
                    root_path=os.path.dirname(out_path_spec),
                    partition_cols=[self.partition_by],
                    compression=compression,
                    row_group_size=self.row_group_size,
                )
            else:
                # Batched streaming write for memory efficiency.
                schema = self._build_schema()

                # Ensure the parent directory exists before opening the writer.
                parent_dir = os.path.dirname(out_path_spec)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)

                with pq.ParquetWriter(out_path_spec, schema, compression=compression) as writer:
                    num_batches = (self.num_samples + self.generation_batch_size - 1) // self.generation_batch_size

                    for batch_idx in range(num_batches):
                        batch_start = batch_idx * self.generation_batch_size
                        batch_end = min(batch_start + self.generation_batch_size, self.num_samples)
                        current_batch_size = batch_end - batch_start

                        if self.parquet_columns:
                            columns = self._generate_batch_columns(current_batch_size)
                        else:
                            dim1, dim2 = dim[2 * i], dim[2 * i + 1]
                            columns = self._generate_legacy_batch(dim1, dim2, current_batch_size)

                        writer.write_table(pa.table(columns), row_group_size=self.row_group_size)

                        # Log batch progress for large files.
                        if num_batches > 1 and self.my_rank == 0:
                            # NOTE(review): self.logger is presumably provided
                            # by the DataGenerator base class — confirm.
                            self.logger.debug(
                                f"File {i+1}/{self.total_files_to_generate}: "
                                f"Wrote batch {batch_idx+1}/{num_batches} "
                                f"({current_batch_size} samples)"
                            )

        # Re-seed from OS entropy so later random use is not deterministic.
        np.random.seed()

0 commit comments

Comments (0)