
An empty RecordBatch will result in an invalid UDF call #5793

@dujl

Description


Describe the bug

After filtering, a micropartition may contain empty RecordBatches. When a UDF is applied downstream, each empty RecordBatch still triggers a UDF invocation, which is unnecessary and passes a zero-length batch to the UDF.

import random
import string

import logging
def configure_logging():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S")

    logging.getLogger("tracing.span").setLevel(logging.WARNING)
    logging.getLogger("daft_io.stats").setLevel(logging.WARNING)
    logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING)
    logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING)
    logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING)

configure_logging()

import daft
from daft import col

logger = logging.getLogger(__name__)
logger.info("Starting Daft task demo")

# 1. Generate 100 rows of sample data
def generate_data(n=100):
    data = {
        "id": list(range(n)),
        "value": [random.randint(1, 100) for _ in range(n)],
        "text": [
            "".join(random.choices(string.ascii_lowercase, k=5))
            for _ in range(n)
        ],
    }
    return data

import ray
from ray.job_config import LoggingConfig
ray.init(logging_config=LoggingConfig(
        log_level="INFO",
    ))
daft.context.set_runner_ray(address="auto")
# Build the Daft DataFrame
df = daft.from_pydict(generate_data())
# df = df.into_partitions(2)

# 2. Filter: only keep rows where id % 3 == 0
filtered_df = df.filter(col("id") % 3 == 0)


@daft.udf(return_dtype=daft.DataType.string(), concurrency=2, batch_size=3)
def to_upper(text_series):
    logger.info(f"batch size is {len(text_series)}")
    import time
    # Simulate some processing time
    time.sleep(1)  
    return text_series.str.upper()

# 3. Apply the UDF (note: Daft UDFs operate on batches, i.e. they are vectorized)
processed_df = filtered_df.with_column("text_upper", to_upper(col("text")))

# 4. Materialize the result
result = processed_df.to_pydict()

# Collect results and print
print("Filtered and processed rows:", result)

The UDF log shows an invocation with batch size 0:

2025-12-04 21:50:12,373	INFO <ipython-input-1-1ae1182d2276>:55 -- batch size is 0 job_id=01000000 worker_id=6d15be16e56ab7bfc7d0b7489fe395aad36a33053ea8966c7b7bb62b node_id=ce25f1a9cc778500068fe78cc5d0078f046f94110e751b304ca60533 actor_id=88902a7a8f95dfd5964e5d7601000000 task_id=3960fbd2f1670d9d88902a7a8f95dfd5964e5d7601000000 task_name=UDFActor.eval_input task_func_name=daft.execution.ray_actor_pool_udf.UDFActor.eval_input actor_name=__main__.to_upper:027b1a8c-1 timestamp_ns=1764856212373853930
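
A possible user-side workaround until Daft skips empty batches at dispatch time is to guard the UDF body so a zero-length batch returns immediately. This is only a sketch, not the fix requested here: it assumes the UDF receives a daft.Series (so len() and to_pylist() are available, consistent with the log above) and that returning an empty Python list is accepted for the declared return_dtype; the name to_upper_guarded is made up for illustration.

import daft

@daft.udf(return_dtype=daft.DataType.string(), concurrency=2, batch_size=3)
def to_upper_guarded(text_series):
    # Short-circuit empty batches: no work, no sleep, no output rows.
    if len(text_series) == 0:
        return []
    # Uppercase each value via plain Python so the sketch does not depend on
    # which Series string methods a given Daft version exposes.
    return [s.upper() for s in text_series.to_pylist()]

This only hides the symptom in user code; the underlying problem is that a UDF actor call is dispatched for an empty RecordBatch at all.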

To Reproduce

No response

Expected behavior

No response

Component(s)

UDFs (User Defined Functions)

Additional context

No response

Labels

bug (Something isn't working), p0 (Priority 0 - to be addressed immediately)
