18 changes: 13 additions & 5 deletions examples/fstest.py
@@ -1,8 +1,13 @@
# Test the correctness of file system read and write.
#
# This script runs multiple tasks to write and read data to/from the file system.
# Each task writes to an individual file in the given directory.
# Then it reads the data back and verifies the correctness.
"""Filesystem Testing with Smallpond's Distributed Execution

Demonstrates Smallpond's distributed computation capabilities through filesystem I/O:
1. Uses sp.from_items() to create distributed tasks
2. Uses df.repartition() for parallel task distribution
3. Uses df.map() to execute I/O operations in parallel
4. Collects results with to_pandas() for analysis

Note: Smallpond provides read/write methods such as read_parquet() and write_parquet(), but this example intentionally uses low-level file I/O wrapped in Smallpond's distributed execution framework to benchmark the filesystem. Smallpond handles the distribution (parallel execution across workers), while the actual I/O uses Python's native file operations for the precise control that benchmarking requires.
"""

import argparse
import glob
@@ -153,8 +158,11 @@ def fstest(

if output_path is not None:
os.makedirs(output_path, exist_ok=True)
# Creates a distributed DataFrame
df = sp.from_items([{"path": os.path.join(output_path, f"{i}")} for i in range(npartitions)])
# Distributes the work across partitions
df = df.repartition(npartitions, by_rows=True)
# Runs the write operations in parallel across those partitions
stats = df.map(lambda x: fswrite(x["path"], size, blocksize)).to_pandas()
logging.info(f"write stats:\n{stats}")

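To make the distributed-map pattern above concrete end to end, here is a minimal self-contained sketch that writes and then verifies one file per task using the same from_items → repartition → map → to_pandas flow. The write_and_verify helper, the /tmp path, and the sizes are illustrative, and it assumes (as fstest.py does with fswrite) that the mapped function returns a row-like dict:

import hashlib
import os

import smallpond

sp = smallpond.init()


def write_and_verify(path: str, size: int) -> dict:
    # Write `size` bytes of pseudo-random data, read it back, and compare checksums.
    os.makedirs(os.path.dirname(path), exist_ok=True)
    data = os.urandom(size)
    with open(path, "wb") as f:
        f.write(data)
    with open(path, "rb") as f:
        ok = hashlib.sha256(f.read()).hexdigest() == hashlib.sha256(data).hexdigest()
    return {"path": path, "bytes": size, "ok": ok}


# One row per file, one partition per task, executed in parallel by Smallpond.
df = sp.from_items([{"path": f"/tmp/fstest/{i}"} for i in range(4)])
df = df.repartition(4, by_rows=True)
print(df.map(lambda x: write_and_verify(x["path"], 1 << 20)).to_pandas())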
27 changes: 27 additions & 0 deletions examples/read_write_parquet.py
@@ -0,0 +1,27 @@
"""
This script demonstrates the basic usage of Smallpond for processing Parquet files.
===================================
It shows how to:
- Read data from Parquet files
- Repartition data based on a column
- Apply SQL transformations
- Write results back to Parquet format
"""

import smallpond

# Initialize session
sp = smallpond.init()

# Load data
df = sp.read_parquet("prices.parquet")
print(df.to_pandas())

# Process data
df = df.repartition(3, hash_by="ticker")
df = sp.partial_sql("SELECT ticker, min(price), max(price) FROM {0} GROUP BY ticker", df)

# Save results
df.write_parquet("output/")
# Show results
print(df.to_pandas())
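The example above assumes a prices.parquet input already exists. A hypothetical snippet to create a matching file (column names taken from the SQL, values made up for illustration):

import pandas as pd

# Tiny sample input with the ticker/price columns the example's SQL expects.
pd.DataFrame(
    {"ticker": ["AAPL", "AAPL", "MSFT"], "price": [190.1, 191.3, 410.0]}
).to_parquet("prices.parquet")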
23 changes: 22 additions & 1 deletion examples/shuffle_data.py
@@ -1,3 +1,17 @@
"""
Large-Scale Data Shuffling Example
================================

This script demonstrates advanced data processing capabilities of Smallpond using
the Driver class for complex pipeline execution. It shows how to:
- Create multi-stage data processing pipelines
- Implement different partitioning strategies (row and hash-based)
- Apply SQL transformations on partitioned data
- Use memory-efficient streaming writes

Note: Uses Driver class for advanced execution control instead of smallpond.init()
"""

from smallpond.contrib.copy_table import StreamCopy
from smallpond.execution.driver import Driver
from smallpond.logical.dataset import ParquetDataSet
@@ -22,16 +36,18 @@ def shuffle_data(
ctx = Context()
dataset = ParquetDataSet(input_paths, union_by_name=True)
data_files = DataSourceNode(ctx, dataset)
# Node 1: Partitions data by splitting based on file boundaries or row counts
data_partitions = DataSetPartitionNode(
ctx,
(data_files,),
npartitions=num_data_partitions,
partition_by_rows=True,
partition_by_rows=True, # distributes rows across partitions
random_shuffle=skip_hash_partition,
)
if skip_hash_partition:
urls_partitions = data_partitions
else:
# Node 2: HashPartitionNode to ensure even distribution of rows across partitions
urls_partitions = HashPartitionNode(
ctx,
(data_partitions,),
@@ -40,20 +56,24 @@
random_shuffle=True,
engine_type=engine_type,
)
# Node 3: Example of adding SQL transformations; here we add a random sort_key column and sort on it
shuffled_urls = SqlEngineNode(
ctx,
(urls_partitions,),
r"select *, cast(random() * 2147483647 as integer) as sort_key from {0} order by sort_key",
cpu_limit=16,
)
# Node 4: Repartition again to match the output partition count
repartitioned = DataSetPartitionNode(
ctx,
(shuffled_urls,),
npartitions=num_out_data_partitions,
partition_by_rows=True,
)
# Node 5: Write output in partitioned Parquet format, using streaming writes (StreamCopy) for memory efficiency
shuffled_urls = StreamCopy(ctx, (repartitioned,), output_name="data_copy", cpu_limit=1)

# Build the logical plan DAG with all node dependencies; this creates a lazy execution plan
plan = LogicalPlan(ctx, shuffled_urls)
return plan

@@ -67,6 +87,7 @@ def main():
driver.add_argument("-e", "--engine_type", default="duckdb", choices=("duckdb", "arrow"))
driver.add_argument("-x", "--skip_hash_partition", action="store_true")
plan = shuffle_data(**driver.get_arguments())
# Executes the logical plan in parallel, handling task scheduling, dependencies, and resource management
driver.run(plan)


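For comparison with the Driver-based pipeline above, here is a rough sketch of a similar shuffle expressed with the high-level DataFrame API used in the other examples. It is a simplification (it omits the hash-partitioning stage), and the glob input path, output path, and partition count are illustrative:

import smallpond

sp = smallpond.init()
df = sp.read_parquet("input/*.parquet")
df = df.repartition(10, by_rows=True)
# Attach a random sort_key and order by it, mirroring the SqlEngineNode stage above.
df = df.map("select *, cast(random() * 2147483647 as integer) as sort_key from {0} order by sort_key")
df.write_parquet("output/")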
20 changes: 20 additions & 0 deletions examples/sort_mock_urls.py
@@ -1,3 +1,23 @@
"""Low-Level URL Sorting Example with Custom Node Implementation
=================================================

This script demonstrates advanced usage of Smallpond's low-level Node API for URL sorting.
It shows how to build a complex data processing pipeline using custom nodes and multiple
execution engines (DuckDB and Arrow).

Key Features:
- Uses Driver class for advanced execution control
- Demonstrates both SQL and Arrow-based sorting approaches
- Shows how to implement custom processing nodes
- Multi-stage pipeline with explicit node dependencies:
1. Data Loading (DataSourceNode)
2. Initial Partitioning (DataSetPartitionNode)
3. URL Parsing (SqlEngineNode)
4. Hash-based Partitioning (HashPartitionNode)
5. Sorting (SqlEngineNode or SortUrlsNode)
6. Final Result Collection (DataSetPartitionNode)
"""

import logging
import os.path
from typing import List, Optional, OrderedDict
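Since the diff shows only the new docstring, here is a skeletal sketch of the six-stage pipeline it describes, assembled from the node classes used in shuffle_data.py. The smallpond.logical.node import path, the hash_columns parameter name, and the SQL snippets are assumptions for illustration, not the file's actual code:

from smallpond.logical.dataset import ParquetDataSet
from smallpond.logical.node import (  # assumed module path
    Context,
    DataSetPartitionNode,
    DataSourceNode,
    HashPartitionNode,
    LogicalPlan,
    SqlEngineNode,
)


def build_sort_plan(input_paths, npartitions):
    ctx = Context()
    # 1. Data loading
    urls = DataSourceNode(ctx, ParquetDataSet(input_paths))
    # 2. Initial partitioning
    parts = DataSetPartitionNode(ctx, (urls,), npartitions=npartitions, partition_by_rows=True)
    # 3. URL parsing (extract the host, as sort_mock_urls_v2.py does)
    parsed = SqlEngineNode(ctx, (parts,), r"select split_part(urlstr, '/', 1) as host, * from {0}")
    # 4. Hash-based partitioning by host (hash_columns is an assumed parameter name)
    by_host = HashPartitionNode(ctx, (parsed,), npartitions=npartitions, hash_columns=["host"])
    # 5. Sorting within each partition
    sorted_urls = SqlEngineNode(ctx, (by_host,), r"select * from {0} order by host")
    # 6. Final result collection into a single partition
    result = DataSetPartitionNode(ctx, (sorted_urls,), npartitions=1, partition_by_rows=True)
    return LogicalPlan(ctx, result)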
20 changes: 20 additions & 0 deletions examples/sort_mock_urls_v2.py
@@ -1,12 +1,30 @@
"""High-Level URL Sorting Example using DataFrame API
==========================================

This script demonstrates the simplified DataFrame API approach to URL sorting in Smallpond.
It shows how to process and sort URLs using a high-level, pandas-like interface that
automatically handles distributed execution.

Key Features:
- Uses smallpond.init() for simplified setup
- Supports both local and distributed execution via Ray
- Partitioned sorting for better scalability:
* First partitions data by host (hash_by="host")
* Then sorts within each partition (partial_sort)
- For global sorting across all partitions, use:
df.map("SELECT * FROM {0} ORDER BY column")
"""
import argparse
from typing import List

import smallpond
from smallpond.dataframe import Session



def sort_mock_urls_v2(sp: Session, input_paths: List[str], output_path: str, npartitions: int):
dataset = sp.read_csv(input_paths, schema={"urlstr": "varchar", "valstr": "varchar"}, delim=r"\t").repartition(npartitions)
# Creates a DataFrame with host, url, and payload columns, using DuckDB SQL syntax for the transformation.
urls = dataset.map(
"""
split_part(urlstr, '/', 1) as host,
@@ -15,6 +33,8 @@ def sort_mock_urls_v2(sp: Session, input_paths: List[str], output_path: str, npa
"""
)
urls = urls.repartition(npartitions, hash_by="host")
# Sorts each partition independently: sorting happens locally within each partition, not globally across partitions.
# For a global sort, use SQL like df.map("""SELECT * FROM {0} ORDER BY sort_column""")
sorted_urls = urls.partial_sort(by=["host"])
sorted_urls.write_parquet(output_path)

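For contrast with the per-partition partial_sort above, a short sketch of the global-sort alternative mentioned in the docstring; the urls.tsv input name is hypothetical and the sort column is chosen only for illustration:

import smallpond

sp = smallpond.init()
urls = sp.read_csv(["urls.tsv"], schema={"urlstr": "varchar", "valstr": "varchar"}, delim=r"\t")
# Global sort across all partitions, as suggested in the docstring's note.
urls.map("SELECT * FROM {0} ORDER BY urlstr").write_parquet("sorted_output/")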