diff --git a/datafusion_ray/__init__.py b/datafusion_ray/__init__.py index a920253..d26afa8 100644 --- a/datafusion_ray/__init__.py +++ b/datafusion_ray/__init__.py @@ -20,6 +20,6 @@ except ImportError: import importlib_metadata -from .core import RayContext, prettify, runtime_env +from .core import RayContext, prettify, runtime_env, RayStagePool __version__ = importlib_metadata.version(__name__) diff --git a/datafusion_ray/core.py b/datafusion_ray/core.py index 3d36941..d9c9742 100644 --- a/datafusion_ray/core.py +++ b/datafusion_ray/core.py @@ -17,14 +17,16 @@ from collections import defaultdict +from dataclasses import dataclass import logging import os import pyarrow as pa import asyncio import ray -import uuid +import json import time -from typing import Optional + +from .friendly import new_friendly_name from datafusion_ray._datafusion_ray_internal import ( RayContext as RayContextInternal, @@ -65,15 +67,11 @@ def call_sync(coro): """call a coroutine in the current event loop or run a new one, and synchronously return the result""" try: - try: - loop = asyncio.get_running_loop() - except RuntimeError: - return asyncio.run(coro) - else: - return loop.run_until_complete(coro) - except Exception as e: - log.error(f"Error in call: {e}") - log.exception(e) + loop = asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(coro) + else: + return loop.run_until_complete(coro) # work around for https://github.com/ray-project/ray/issues/31606 @@ -82,30 +80,376 @@ async def _ensure_coro(maybe_obj_ref): async def wait_for(coros, name=""): + """Wait for all coros to complete and return their results. + Does not preserve ordering.""" + return_values = [] # wrap the coro in a task to work with python 3.10 and 3.11+ where asyncio.wait semantics # changed to not accept any awaitable + start = time.time() done, _ = await asyncio.wait([asyncio.create_task(_ensure_coro(c)) for c in coros]) + end = time.time() + log.info(f"waiting for {name} took {end - start}s") for d in done: e = d.exception() if e is not None: log.error(f"Exception waiting {name}: {e}") + raise e else: return_values.append(d.result()) return return_values +class RayStagePool: + """A pool of RayStage actors that can be acquired and released""" + + # TODO: We can probably manage this set in a better way + # This is not a threadsafe implementation. + # This is simple though and will suffice for now + + def __init__(self, min_workers: int, max_workers: int): + self.min_workers = min_workers + self.max_workers = max_workers + + # a map of stage_key (a random identifier) to stage actor reference + self.pool = {} + # a map of stage_key to listening address + self.addrs = {} + + # holds object references from the start_up method for each stage + # we know all stages are listening when all of these refs have + # been waited on. 
+        # When they are ready we remove them from this set
+        self.stages_started = set()
+
+        # an event that is set when all stages are ready to serve
+        self.stages_ready = asyncio.Event()
+
+        # stages that are started but we need to get their address
+        self.need_address = set()
+
+        # stages that we have the address for but need to start serving
+        self.need_serving = set()
+
+        # stages in use
+        self.acquired = set()
+
+        # stages available
+        self.available = set()
+
+        for _ in range(min_workers):
+            self._new_stage()
+
+        log.info(
+            f"created ray stage pool (min_workers: {min_workers}, max_workers: {max_workers})"
+        )
+
+    async def start(self):
+        if not self.stages_ready.is_set():
+            await self._wait_for_stages_started()
+            await self._wait_for_get_addrs()
+            await self._wait_for_serve()
+            self.stages_ready.set()
+
+    async def wait_for_ready(self):
+        await self.stages_ready.wait()
+
+    async def acquire(self, need=1):
+        stage_keys = []
+
+        have = len(self.available)
+        total = len(self.available) + len(self.acquired)
+        can_make = self.max_workers - total
+
+        need_to_make = need - have
+
+        if need_to_make > can_make:
+            raise Exception(f"Cannot allocate workers above {self.max_workers}")
+
+        if need_to_make > 0:
+            log.debug(f"creating {need_to_make} additional stages")
+            for _ in range(need_to_make):
+                self._new_stage()
+            await wait_for([self.start()], "waiting for created stages")
+
+        assert len(self.available) >= need
+
+        for _ in range(need):
+            stage_key = self.available.pop()
+            self.acquired.add(stage_key)
+
+            stage_keys.append(stage_key)
+
+        stages = [self.pool[sk] for sk in stage_keys]
+        addrs = [self.addrs[sk] for sk in stage_keys]
+        return (stages, stage_keys, addrs)
+
+    def release(self, stage_keys: list[str]):
+        for stage_key in stage_keys:
+            self.acquired.remove(stage_key)
+            self.available.add(stage_key)
+
+    def _new_stage(self):
+        self.stages_ready.clear()
+        stage_key = new_friendly_name()
+        log.debug(f"starting stage: {stage_key}")
+        stage = RayStage.options(name=f"Stage: {stage_key}").remote(stage_key)
+        self.pool[stage_key] = stage
+        self.stages_started.add(stage.start_up.remote())
+        self.available.add(stage_key)
+
+    async def _wait_for_stages_started(self):
+        log.info("waiting for stages to be ready")
+        started_keys = await wait_for(self.stages_started, "stages to be started")
+        # we still need the addresses of these stages
+        self.need_address.update(set(started_keys))
+        # we've started all the stages we know about
+        self.stages_started = set()
+        log.info("stages are all listening")
+
+    async def _wait_for_get_addrs(self):
+        # get the addresses in a pipelined fashion
+        refs = []
+        stage_keys = []
+        for stage_key in self.need_address:
+            stage = self.pool[stage_key]
+            refs.append(stage.addr.remote())
+            stage_keys.append(stage_key)
+
+            self.need_serving.add(stage_key)
+
+        addrs = await wait_for(refs, "stage addresses")
+
+        for key, addr in addrs:
+            self.addrs[key] = addr
+
+        self.need_address = set()
+
+    async def _wait_for_serve(self):
+        log.info("running stages")
+        try:
+            for stage_key in self.need_serving:
+                log.info(f"starting serving of stage {stage_key}")
+                stage = self.pool[stage_key]
+                stage.serve.remote()
+            self.need_serving = set()
+
+        except Exception as e:
+            log.error(f"StagePool: Unhandled Exception in serve: {e}")
+            raise e
+
+    async def all_done(self):
+        log.info("calling stage all done")
+        refs = [stage.all_done.remote() for stage in self.pool.values()]
+        await wait_for(refs, "stages to be all done")
+        log.info("all stages shutdown")
+
+
+@ray.remote(num_cpus=0)
+class RayStage:
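+    # Created once by the RayStagePool and then reused across queries:
+    # update_plan() installs the plan for the next query while the wrapped
+    # StageService keeps listening on the same address.
+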
def __init__(self, stage_key): + self.stage_key = stage_key + + # import this here so ray doesn't try to serialize the rust extension + from datafusion_ray._datafusion_ray_internal import StageService + + self.stage_service = StageService(stage_key) + + async def start_up(self): + # this method is sync + self.stage_service.start_up() + return self.stage_key + + async def all_done(self): + await self.stage_service.all_done() + + async def addr(self): + return (self.stage_key, self.stage_service.addr()) + + async def update_plan( + self, + stage_id: int, + stage_addrs: dict[int, dict[int, list[str]]], + partition_group: list[int], + plan_bytes: bytes, + ): + await self.stage_service.update_plan( + stage_id, + stage_addrs, + partition_group, + plan_bytes, + ) + + async def serve(self): + log.info(f"[{self.stage_key}] serving on {self.stage_service.addr()}") + await self.stage_service.serve() + log.info(f"[{self.stage_key}] done serving") + + +@dataclass +class StageData: + stage_id: int + plan_bytes: bytes + partition_group: list[int] + child_stage_ids: list[int] + num_output_partitions: int + full_partitions: bool + + +@dataclass +class InternalStageData: + stage_id: int + plan_bytes: bytes + partition_group: list[int] + child_stage_ids: list[int] + num_output_partitions: int + full_partitions: bool + remote_stage: ... # ray.actor.ActorHandle[RayStage] + remote_addr: str + + def __str__(self): + return f"""Stage: {self.stage_id}, pg: {self.partition_group}, child_stages:{self.child_stage_ids}, listening addr:{self.remote_addr}""" + + +@ray.remote(num_cpus=0) +class RayContextSupervisor: + def __init__( + self, + worker_pool_min: int, + worker_pool_max: int, + ) -> None: + log.info(f"Creating RayContextSupervisor worker_pool_min: {worker_pool_min}") + self.pool = RayStagePool(worker_pool_min, worker_pool_max) + self.stages: dict[str, InternalStageData] = {} + log.info("Created RayContextSupervisor") + + async def start(self): + await self.pool.start() + + async def wait_for_ready(self): + await self.pool.wait_for_ready() + + async def get_stage_addrs(self, stage_id: int): + addrs = [ + sd.remote_addr for sd in self.stages.values() if sd.stage_id == stage_id + ] + return addrs + + async def new_query( + self, + stage_datas: list[StageData], + ): + if len(self.stages) > 0: + self.pool.release(list(self.stages.keys())) + + remote_stages, remote_stage_keys, remote_addrs = await self.pool.acquire( + len(stage_datas) + ) + self.stages = {} + + for i, sd in enumerate(stage_datas): + remote_stage = remote_stages[i] + remote_stage_key = remote_stage_keys[i] + remote_addr = remote_addrs[i] + self.stages[remote_stage_key] = InternalStageData( + sd.stage_id, + sd.plan_bytes, + sd.partition_group, + sd.child_stage_ids, + sd.num_output_partitions, + sd.full_partitions, + remote_stage, + remote_addr, + ) + + # sort out the mess of who talks to whom and ensure we can supply the correct + # addresses to each of them + addrs_by_stage_key = await self.sort_out_addresses() + if log.level <= logging.DEBUG: + # TODO: string builder here + out = "" + for stage_key, stage in self.stages.items(): + out += f"[{stage_key}]: {stage}\n" + out += f"child addrs: {addrs_by_stage_key[stage_key]}\n" + log.debug(out) + + refs = [] + # now tell the stages what they are doing for this query + for stage_key, isd in self.stages.items(): + log.info(f"going to update plan for {stage_key}") + kid = addrs_by_stage_key[stage_key] + refs.append( + isd.remote_stage.update_plan.remote( + isd.stage_id, + {stage_id: val["child_addrs"] for 
(stage_id, val) in kid.items()},
+                    isd.partition_group,
+                    isd.plan_bytes,
+                )
+            )
+        log.info("that's all of them")
+
+        await wait_for(refs, "updating plans")
+
+    async def sort_out_addresses(self):
+        """Iterate through our stages and gather all of their listening addresses.
+        Then, provide the addresses of peer stages to each stage.
+        """
+        addrs_by_stage_key = {}
+        for stage_key, isd in self.stages.items():
+            stage_addrs = defaultdict(dict)
+
+            # "isd" is shorthand for InternalStageData, as a reminder
+
+            for child_stage_id in isd.child_stage_ids:
+                addrs = defaultdict(list)
+                child_stage_keys, child_stage_datas = zip(
+                    *filter(
+                        lambda x: x[1].stage_id == child_stage_id,
+                        self.stages.items(),
+                    )
+                )
+                output_partitions = [
+                    c_isd.num_output_partitions for c_isd in child_stage_datas
+                ]
+
+                # sanity check
+                assert all([op == output_partitions[0] for op in output_partitions])
+                output_partitions = output_partitions[0]
+
+                for child_stage_isd in child_stage_datas:
+                    if child_stage_isd.full_partitions:
+                        for partition in range(output_partitions):
+                            # this stage is the definitive place to read this output partition
+                            addrs[partition] = [child_stage_isd.remote_addr]
+                    else:
+                        for partition in range(output_partitions):
+                            # this output partition must be gathered from all stages with this stage_id
+                            addrs[partition] = [
+                                c.remote_addr for c in child_stage_datas
+                            ]
+
+                stage_addrs[child_stage_id]["child_addrs"] = addrs
+                # not necessary but useful for debug logs
+                stage_addrs[child_stage_id]["stage_keys"] = child_stage_keys
+
+            addrs_by_stage_key[stage_key] = stage_addrs
+
+        return addrs_by_stage_key
+
+    async def all_done(self):
+        await self.pool.all_done()
+
+
 class RayDataFrame:
     def __init__(
         self,
         ray_internal_df: RayDataFrameInternal,
-        query_id: str,
+        supervisor,  # ray.actor.ActorHandle[RayContextSupervisor],
         batch_size=8192,
-        partitions_per_worker: Optional[int] = None,
+        partitions_per_worker: int | None = None,
         prefetch_buffer_size=0,
     ):
         self.df = ray_internal_df
-        self.query_id = query_id
+        self.supervisor = supervisor
         self._stages = None
         self._batches = None
         self.batch_size = batch_size
@@ -119,12 +463,6 @@ def stages(self):
             self.batch_size, self.prefetch_buffer_size, self.partitions_per_worker
         )
 
-        self.coord = RayStageCoordinator.options(
-            name="RayQueryCoordinator:" + self.query_id,
-        ).remote(
-            self.query_id,
-        )
-
         return self._stages
 
     def execution_plan(self):
@@ -143,19 +481,19 @@ def collect(self) -> list[pa.RecordBatch]:
             t2 = time.time()
             log.debug(f"creating stages took {t2 -t1}s")
 
-            last_stage = max([stage.stage_id for stage in self._stages])
-            log.debug(f"last stage is {last_stage}")
+            last_stage_id = max([stage.stage_id for stage in self._stages])
+            log.debug(f"last stage is {last_stage_id}")
 
             self.create_ray_stages()
-            t3 = time.time()
-            log.debug(f"creating ray stage actors took {t3 -t2}s")
-            self.run_stages()
 
-            addrs = ray.get(self.coord.get_stage_addrs.remote())
+            last_stage_addrs = ray.get(
+                self.supervisor.get_stage_addrs.remote(last_stage_id)
+            )
+            log.debug(f"last stage addrs {last_stage_addrs}")
 
-            reader = self.df.read_final_stage(last_stage, addrs[last_stage][0][0])
+            reader = self.df.read_final_stage(last_stage_id, last_stage_addrs[0])
+            log.debug("got reader")
             self._batches = list(reader)
-            self.coord.all_done.remote()
         return self._batches
 
     def show(self) -> None:
@@ -163,31 +501,27 @@ def show(self) -> None:
         print(prettify(batches))
 
     def create_ray_stages(self):
+        stage_datas = []
 
-        # if we are doing each partition separate
(isolate_partitions =True)
-        # then the plan generated will include a PartitionIsolator which
-        # will take care of that. Our job is to then launch a stage for each
-        # partition.
-        #
-        refs = []
+        # note: whereas the PyDataFrameStage object contained in self.stages()
+        # holds information for a numbered stage, when we tell the supervisor
+        # about our query it wants a StageData object per actor that will be
+        # created, hence the loop over partition_groups
         for stage in self.stages():
             for partition_group in stage.partition_groups:
-                refs.append(
-                    self.coord.new_stage.remote(
+                stage_datas.append(
+                    StageData(
                         stage.stage_id,
                         stage.plan_bytes(),
                         partition_group,
+                        stage.child_stage_ids,
                         stage.num_output_partitions,
                         stage.full_partitions,
                     )
                 )
 
-        # wait for all stages to be created
-        # ray.wait(refs, num_returns=len(refs))
-        call_sync(wait_for(refs, "creating ray stages"))
-
-    def run_stages(self):
-        self.coord.serve.remote()
+        ref = self.supervisor.new_query.remote(stage_datas)
+        call_sync(wait_for([ref], "creating ray stages"))
 
 
 class RayContext:
@@ -195,13 +529,32 @@ def __init__(
         self,
         batch_size: int = 8192,
         prefetch_buffer_size: int = 0,
-        partitions_per_worker: Optional[int] = None,
+        partitions_per_worker: int | None = None,
+        worker_pool_min: int = 1,
+        worker_pool_max: int = 100,
     ) -> None:
         self.ctx = RayContextInternal()
         self.batch_size = batch_size
         self.partitions_per_worker = partitions_per_worker
         self.prefetch_buffer_size = prefetch_buffer_size
 
+        self.supervisor = RayContextSupervisor.options(
+            name="RayContextSupervisor",
+        ).remote(
+            worker_pool_min,
+            worker_pool_max,
+        )
+
+        # start up our supervisor and don't check in on it until it's
+        # time to query, then we will await this ref
+        start_ref = self.supervisor.start.remote()
+
+        # ensure we are ready
+        s = time.time()
+        call_sync(wait_for([start_ref], "RayContextSupervisor start"))
+        e = time.time()
+        log.info(f"RayContext::__init__ waiting for supervisor to be ready took {e-s}s")
+
     def register_parquet(self, name: str, path: str):
         self.ctx.register_parquet(name, path)
 
@@ -209,12 +562,12 @@ def register_listing_table(self, name: str, path: str, file_extention="parquet")
         self.ctx.register_listing_table(name, path, file_extention)
 
     def sql(self, query: str) -> RayDataFrame:
-        query_id = str(uuid.uuid4())
         df = self.ctx.sql(query)
+
         return RayDataFrame(
             df,
-            query_id,
+            self.supervisor,
             self.batch_size,
             self.partitions_per_worker,
             self.prefetch_buffer_size,
@@ -223,175 +576,9 @@
     def set(self, option: str, value: str) -> None:
         self.ctx.set(option, value)
 
+    def __del__(self):
+        log.info("RayContext, cleaning up remote resources")
+        ref = self.supervisor.all_done.remote()
+        call_sync(wait_for([ref], "RayContextSupervisor all done"))
 
-@ray.remote(num_cpus=0)
-class RayStageCoordinator:
-    def __init__(
-        self,
-        query_id: str,
-    ) -> None:
-        self.query_id = query_id
-        self.stages = {}
-        self.stage_addrs = defaultdict(lambda: defaultdict(list))
-        self.output_partitions = {}
-        self.stages_started = []
-        self.stages_ready = asyncio.Event()
-
-    async def all_done(self):
-        log.debug("calling stage all done")
-        refs = [stage.all_done.remote() for stage in self.stages.values()]
-        # ray.wait(refs, num_returns=len(refs))
-        await wait_for(refs, "stages to be all done")
-        log.debug("done stage all done")
-
-    async def new_stage(
-        self,
-        stage_id: int,
-        plan_bytes: bytes,
-        partition_group: list[int],
-        num_output_partitions: int,
-        full_partitions: bool,
-    ):
-
-        try:
-            if stage_id in
self.output_partitions: - assert self.output_partitions[stage_id] == num_output_partitions - else: - self.output_partitions[stage_id] = num_output_partitions - - # we need a tuple so its hashable - partition_set = tuple(partition_group) - stage_key = (stage_id, partition_set, full_partitions) - - log.debug(f"creating new stage {stage_key} from bytes {len(plan_bytes)}") - stage = RayStage.options( - name=f"Stage: {stage_key}, query_id:{self.query_id}", - ).remote(stage_id, plan_bytes, partition_group) - self.stages[stage_key] = stage - self.stages_started.append(stage.start_up.remote()) - - except Exception as e: - log.error( - f"RayQueryCoordinator[{self.query_id}] Unhandled Exception in new stage! {e}" - ) - raise e - - async def wait_for_stages_ready(self): - log.debug("waiting for stages to be ready") - await self.stages_ready.wait() - - async def ensure_stages_ready(self): - # ray.wait(self.stages_started, num_returns=len(self.stages_started)) - log.debug(f"going to wait for {self.stages_started}") - await wait_for(self.stages_started, "stages to be started") - await self.sort_out_addresses() - log.info("all stages ready") - self.stages_ready.set() - - async def get_stage_addrs(self) -> dict[int, list[str]]: - log.debug("Checking to ensure stages are ready before returning addrs") - await self.wait_for_stages_ready() - log.debug("Looks like they are ready") - return self.stage_addrs - - async def sort_out_addresses(self): - """Iterate through our stages and gather all of their listening addresses. - Then, provide the addresses to of peer stages to each stage. - """ - - # first go get all addresses from the stages we launched, concurrently - # pipeline this by firing up all tasks before awaiting any results - addrs_by_stage = defaultdict(list) - addrs_by_stage_partition = defaultdict(dict) - for stage_key, stage in self.stages.items(): - stage_id, partition_set, full_partitions = stage_key - a_future = stage.addr.remote() - addrs_by_stage[stage_id].append(a_future) - for partition in partition_set: - addrs_by_stage_partition[stage_id][partition] = a_future - - for stage_key, stage in self.stages.items(): - stage_id, partition_set, full_partitions = stage_key - if full_partitions: - for partition in range(self.output_partitions[stage_id]): - self.stage_addrs[stage_id][partition] = await wait_for( - [addrs_by_stage_partition[stage_id][partition]] - ) - else: - for partition in range(self.output_partitions[stage_id]): - self.stage_addrs[stage_id][partition] = await wait_for( - addrs_by_stage[stage_id] - ) - - if log.level <= logging.DEBUG: - out = "" - for stage_id, partition_addrs in self.stage_addrs.items(): - out += f"Stage {stage_id}: \n" - for partition, addrs in partition_addrs.items(): - out += f" partition {partition}: {addrs}\n" - log.debug(f"stage_addrs:\n{out}") - # now update all the stages with the addresses of peers such - # that they can contact their child stages - refs = [] - for stage_key, stage in self.stages.items(): - refs.append(stage.set_stage_addrs.remote(self.stage_addrs)) - - # ray.wait(refs, num_returns=len(refs)) - await wait_for(refs, "stages to to have addrs set") - log.debug("all stage addrs set? or should be") - - async def serve(self): - await self.ensure_stages_ready() - log.info("running stages") - try: - for stage_key, stage in self.stages.items(): - log.info(f"starting serving of stage {stage_key}") - stage.serve.remote() - - except Exception as e: - log.error( - f"RayQueryCoordinator[{self.query_id}] Unhandled Exception in run stages! 
{e}" - ) - raise e - - -@ray.remote(num_cpus=0) -class RayStage: - def __init__( - self, - stage_id: int, - plan_bytes: bytes, - partition_group: list[int], - ): - - from datafusion_ray._datafusion_ray_internal import StageService - - try: - self.stage_id = stage_id - self.stage_service = StageService( - stage_id, - plan_bytes, - partition_group, - ) - except Exception as e: - log.error( - f"StageService[{self.stage_id}{partition_group}] Unhandled Exception in init: {e}!" - ) - raise - - async def start_up(self): - # this method is sync - self.stage_service.start_up() - - async def all_done(self): - await self.stage_service.all_done() - - async def addr(self): - return self.stage_service.addr() - - async def set_stage_addrs(self, stage_addrs: dict[int, list[str]]): - await self.stage_service.set_stage_addrs(stage_addrs) - - async def serve(self): - await self.stage_service.serve() - log.info("StageService done serving") + # log.debug("all stage addrs set? or should be") diff --git a/datafusion_ray/friendly.py b/datafusion_ray/friendly.py new file mode 100644 index 0000000..49d31e7 --- /dev/null +++ b/datafusion_ray/friendly.py @@ -0,0 +1,1588 @@ +# Adapted from http://en.wikipedia.org/wiki/List_of_animal_names +# and https://github.com/bryanmylee/zoo-ids + +import random + + +def new_friendly_name(): + name = random.choice(animals) + adj = adjectives.pop() + return f"{adj}-{name}" + + +# In total, 221 animals +animals = [ + "aardvark", + "albatross", + "alligator", + "alpaca", + "ant", + "anteater", + "antelope", + "ape", + "armadillo", + "donkey", + "baboon", + "badger", + "barracuda", + "bat", + "bear", + "beaver", + "bee", + "bison", + "boar", + "buffalo", + "butterfly", + "camel", + "capybara", + "caribou", + "cassowary", + "cat", + "caterpillar", + "cattle", + "chamois", + "cheetah", + "chicken", + "chimpanzee", + "chinchilla", + "chough", + "clam", + "cobra", + "cockroach", + "cod", + "cormorant", + "coyote", + "crab", + "crane", + "crocodile", + "crow", + "curlew", + "deer", + "dinosaur", + "dog", + "dogfish", + "dolphin", + "dotterel", + "dove", + "dragonfly", + "duck", + "dugong", + "dunlin", + "eagle", + "echidna", + "eel", + "eland", + "elephant", + "elk", + "emu", + "falcon", + "ferret", + "finch", + "fish", + "flamingo", + "fly", + "fox", + "frog", + "gaur", + "gazelle", + "gerbil", + "giraffe", + "gnat", + "gnu", + "goat", + "goldfinch", + "goldfish", + "goose", + "gorilla", + "goshawk", + "grasshopper", + "grouse", + "guanaco", + "gull", + "hamster", + "hare", + "hawk", + "hedgehog", + "heron", + "herring", + "hippopotamus", + "hornet", + "horse", + "human", + "hummingbird", + "hyena", + "ibex", + "ibis", + "jackal", + "jaguar", + "jay", + "jellyfish", + "kangaroo", + "kingfisher", + "koala", + "kookabura", + "kouprey", + "kudu", + "lapwing", + "lark", + "lemur", + "leopard", + "lion", + "llama", + "lobster", + "locust", + "loris", + "louse", + "lyrebird", + "magpie", + "mallard", + "manatee", + "mandrill", + "mantis", + "marten", + "meerkat", + "mink", + "mole", + "mongoose", + "monkey", + "moose", + "mosquito", + "mouse", + "mule", + "narwhal", + "newt", + "nightingale", + "octopus", + "okapi", + "opossum", + "oryx", + "ostrich", + "otter", + "owl", + "oyster", + "panther", + "parrot", + "partridge", + "peafowl", + "pelican", + "penguin", + "pheasant", + "pig", + "pigeon", + "pony", + "porcupine", + "porpoise", + "quail", + "quelea", + "quetzal", + "rabbit", + "raccoon", + "rail", + "ram", + "rat", + "raven", + "reindeer", + "rhinoceros", + "rook", + "salamander", + 
"salmon", + "sandpiper", + "sardine", + "scorpion", + "seahorse", + "seal", + "shark", + "sheep", + "shrew", + "skunk", + "snail", + "snake", + "sparrow", + "spider", + "spoonbill", + "squid", + "squirrel", + "starling", + "stingray", + "stinkbug", + "stork", + "swallow", + "swan", + "tapir", + "tarsier", + "termite", + "tiger", + "toad", + "trout", + "turkey", + "turtle", + "viper", + "vulture", + "wallaby", + "walrus", + "wasp", + "weasel", + "whale", + "wildcat", + "wolf", + "wolverine", + "wombat", + "woodcock", + "woodpecker", + "worm", + "wren", + "yak", + "zebra", +] + +adjectives = set( + [ + "abandoned", + "able", + "absolute", + "adorable", + "adventurous", + "academic", + "acceptable", + "acclaimed", + "accomplished", + "accurate", + "aching", + "acidic", + "acrobatic", + "active", + "actual", + "adept", + "admirable", + "admired", + "adolescent", + "adorable", + "adored", + "advanced", + "afraid", + "affectionate", + "aged", + "aggravating", + "aggressive", + "agile", + "agitated", + "agonizing", + "agreeable", + "ajar", + "alarmed", + "alarming", + "alert", + "alienated", + "alive", + "all", + "altruistic", + "amazing", + "ambitious", + "ample", + "amused", + "amusing", + "anchored", + "ancient", + "angelic", + "angry", + "anguished", + "animated", + "annual", + "another", + "antique", + "anxious", + "any", + "apprehensive", + "appropriate", + "apt", + "arctic", + "arid", + "aromatic", + "artistic", + "ashamed", + "assured", + "astonishing", + "athletic", + "attached", + "attentive", + "attractive", + "austere", + "authentic", + "authorized", + "automatic", + "avaricious", + "average", + "aware", + "awesome", + "awful", + "awkward", + "babyish", + "bad", + "back", + "baggy", + "bare", + "barren", + "basic", + "beautiful", + "belated", + "beloved", + "beneficial", + "better", + "best", + "bewitched", + "big", + "big-hearted", + "biodegradable", + "bite-sized", + "bitter", + "black", + "black-and-white", + "bland", + "blank", + "blaring", + "bleak", + "blind", + "blissful", + "blond", + "blue", + "blushing", + "bogus", + "boiling", + "bold", + "bony", + "boring", + "bossy", + "both", + "bouncy", + "bountiful", + "bowed", + "brave", + "breakable", + "brief", + "bright", + "brilliant", + "brisk", + "broken", + "bronze", + "brown", + "bruised", + "bubbly", + "bulky", + "bumpy", + "buoyant", + "burdensome", + "burly", + "bustling", + "busy", + "buttery", + "buzzing", + "calculating", + "calm", + "candid", + "canine", + "capital", + "carefree", + "careful", + "careless", + "caring", + "cautious", + "cavernous", + "celebrated", + "charming", + "cheap", + "cheerful", + "cheery", + "chief", + "chilly", + "chubby", + "circular", + "classic", + "clean", + "clear", + "clear-cut", + "clever", + "close", + "closed", + "cloudy", + "clueless", + "clumsy", + "cluttered", + "coarse", + "cold", + "colorful", + "colorless", + "colossal", + "comfortable", + "common", + "compassionate", + "competent", + "complete", + "complex", + "complicated", + "composed", + "concerned", + "concrete", + "confused", + "conscious", + "considerate", + "constant", + "content", + "conventional", + "cooked", + "cool", + "cooperative", + "coordinated", + "corny", + "corrupt", + "costly", + "courageous", + "courteous", + "crafty", + "crazy", + "creamy", + "creative", + "creepy", + "criminal", + "crisp", + "critical", + "crooked", + "crowded", + "cruel", + "crushing", + "cuddly", + "cultivated", + "cultured", + "cumbersome", + "curly", + "curvy", + "cute", + "cylindrical", + "damaged", + "damp", + "dangerous", + "dapper", 
+ "daring", + "darling", + "dark", + "dazzling", + "dead", + "deadly", + "deafening", + "dear", + "dearest", + "decent", + "decimal", + "decisive", + "deep", + "defenseless", + "defensive", + "defiant", + "deficient", + "definite", + "definitive", + "delayed", + "delectable", + "delicious", + "delightful", + "delirious", + "demanding", + "dense", + "dental", + "dependable", + "dependent", + "descriptive", + "deserted", + "detailed", + "determined", + "devoted", + "different", + "difficult", + "digital", + "diligent", + "dim", + "dimpled", + "dimwitted", + "direct", + "disastrous", + "discrete", + "disfigured", + "disgusting", + "disloyal", + "dismal", + "distant", + "downright", + "dreary", + "dirty", + "disguised", + "dishonest", + "dismal", + "distant", + "distinct", + "distorted", + "dizzy", + "dopey", + "doting", + "double", + "downright", + "drab", + "drafty", + "dramatic", + "dreary", + "droopy", + "dry", + "dual", + "dull", + "dutiful", + "each", + "eager", + "earnest", + "early", + "easy", + "easy-going", + "ecstatic", + "edible", + "educated", + "elaborate", + "elastic", + "elated", + "elderly", + "electric", + "elegant", + "elementary", + "elliptical", + "embarrassed", + "embellished", + "eminent", + "emotional", + "empty", + "enchanted", + "enchanting", + "energetic", + "enlightened", + "enormous", + "enraged", + "entire", + "envious", + "equal", + "equatorial", + "essential", + "esteemed", + "ethical", + "euphoric", + "even", + "evergreen", + "everlasting", + "every", + "evil", + "exalted", + "excellent", + "exemplary", + "exhausted", + "excitable", + "excited", + "exciting", + "exotic", + "expensive", + "experienced", + "expert", + "extraneous", + "extroverted", + "extra-large", + "extra-small", + "fabulous", + "failing", + "faint", + "fair", + "faithful", + "fake", + "false", + "familiar", + "famous", + "fancy", + "fantastic", + "far", + "faraway", + "far-flung", + "far-off", + "fast", + "fat", + "fatal", + "fatherly", + "favorable", + "favorite", + "fearful", + "fearless", + "feisty", + "feline", + "female", + "feminine", + "few", + "fickle", + "filthy", + "fine", + "finished", + "firm", + "first", + "firsthand", + "fitting", + "fixed", + "flaky", + "flamboyant", + "flashy", + "flat", + "flawed", + "flawless", + "flickering", + "flimsy", + "flippant", + "flowery", + "fluffy", + "fluid", + "flustered", + "focused", + "fond", + "foolhardy", + "foolish", + "forceful", + "forked", + "formal", + "forsaken", + "forthright", + "fortunate", + "fragrant", + "frail", + "frank", + "frayed", + "free", + "French", + "fresh", + "frequent", + "friendly", + "frightened", + "frightening", + "frigid", + "frilly", + "frizzy", + "frivolous", + "front", + "frosty", + "frozen", + "frugal", + "fruitful", + "full", + "fumbling", + "functional", + "funny", + "fussy", + "fuzzy", + "gargantuan", + "gaseous", + "general", + "generous", + "gentle", + "genuine", + "giant", + "giddy", + "gigantic", + "gifted", + "giving", + "glamorous", + "glaring", + "glass", + "gleaming", + "gleeful", + "glistening", + "glittering", + "gloomy", + "glorious", + "glossy", + "glum", + "golden", + "good", + "good-natured", + "gorgeous", + "graceful", + "gracious", + "grand", + "grandiose", + "granular", + "grateful", + "grave", + "gray", + "great", + "greedy", + "green", + "gregarious", + "grim", + "grimy", + "gripping", + "grizzled", + "gross", + "grotesque", + "grouchy", + "grounded", + "growing", + "growling", + "grown", + "grubby", + "gruesome", + "grumpy", + "guilty", + "gullible", + "gummy", + "hairy", + "half", + 
"handmade", + "handsome", + "handy", + "happy", + "happy-go-lucky", + "hard", + "hard-to-find", + "harmful", + "harmless", + "harmonious", + "harsh", + "hasty", + "hateful", + "haunting", + "healthy", + "heartfelt", + "hearty", + "heavenly", + "heavy", + "hefty", + "helpful", + "helpless", + "hidden", + "hideous", + "high", + "high-level", + "hilarious", + "hoarse", + "hollow", + "homely", + "honest", + "honorable", + "honored", + "hopeful", + "horrible", + "hospitable", + "hot", + "huge", + "humble", + "humiliating", + "humming", + "humongous", + "hungry", + "hurtful", + "husky", + "icky", + "icy", + "ideal", + "idealistic", + "identical", + "idle", + "idiotic", + "idolized", + "ignorant", + "ill", + "illegal", + "ill-fated", + "ill-informed", + "illiterate", + "illustrious", + "imaginary", + "imaginative", + "immaculate", + "immaterial", + "immediate", + "immense", + "impassioned", + "impeccable", + "impartial", + "imperfect", + "imperturbable", + "impish", + "impolite", + "important", + "impossible", + "impractical", + "impressionable", + "impressive", + "improbable", + "impure", + "inborn", + "incomparable", + "incompatible", + "incomplete", + "inconsequential", + "incredible", + "indelible", + "inexperienced", + "indolent", + "infamous", + "infantile", + "infatuated", + "inferior", + "infinite", + "informal", + "innocent", + "insecure", + "insidious", + "insignificant", + "insistent", + "instructive", + "insubstantial", + "intelligent", + "intent", + "intentional", + "interesting", + "internal", + "international", + "intrepid", + "ironclad", + "irresponsible", + "irritating", + "itchy", + "jaded", + "jagged", + "jam-packed", + "jaunty", + "jealous", + "jittery", + "joint", + "jolly", + "jovial", + "joyful", + "joyous", + "jubilant", + "judicious", + "juicy", + "jumbo", + "junior", + "jumpy", + "juvenile", + "kaleidoscopic", + "keen", + "key", + "kind", + "kindhearted", + "kindly", + "klutzy", + "knobby", + "knotty", + "knowledgeable", + "knowing", + "known", + "kooky", + "kosher", + "lame", + "lanky", + "large", + "last", + "lasting", + "late", + "lavish", + "lawful", + "lazy", + "leading", + "lean", + "leafy", + "left", + "legal", + "legitimate", + "light", + "lighthearted", + "likable", + "likely", + "limited", + "limp", + "limping", + "linear", + "lined", + "liquid", + "little", + "live", + "lively", + "livid", + "loathsome", + "lone", + "lonely", + "long", + "long-term", + "loose", + "lopsided", + "lost", + "loud", + "lovable", + "lovely", + "loving", + "low", + "loyal", + "lucky", + "lumbering", + "luminous", + "lumpy", + "lustrous", + "luxurious", + "mad", + "made-up", + "magnificent", + "majestic", + "major", + "male", + "mammoth", + "married", + "marvelous", + "masculine", + "massive", + "mature", + "meager", + "mealy", + "mean", + "measly", + "meaty", + "medical", + "mediocre", + "medium", + "meek", + "mellow", + "melodic", + "memorable", + "menacing", + "merry", + "messy", + "metallic", + "mild", + "milky", + "mindless", + "miniature", + "minor", + "minty", + "miserable", + "miserly", + "misguided", + "misty", + "mixed", + "modern", + "modest", + "moist", + "monstrous", + "monthly", + "monumental", + "moral", + "mortified", + "motherly", + "motionless", + "mountainous", + "muddy", + "muffled", + "multicolored", + "mundane", + "murky", + "mushy", + "musty", + "muted", + "mysterious", + "naive", + "narrow", + "nasty", + "natural", + "naughty", + "nautical", + "near", + "neat", + "necessary", + "needy", + "negative", + "neglected", + "negligible", + "neighboring", + "nervous", 
+ "new", + "next", + "nice", + "nifty", + "nimble", + "nippy", + "nocturnal", + "noisy", + "nonstop", + "normal", + "notable", + "noted", + "noteworthy", + "novel", + "noxious", + "numb", + "nutritious", + "nutty", + "obedient", + "obese", + "oblong", + "oily", + "oblong", + "obvious", + "occasional", + "odd", + "oddball", + "offbeat", + "offensive", + "official", + "old", + "old-fashioned", + "only", + "open", + "optimal", + "optimistic", + "opulent", + "orange", + "orderly", + "organic", + "ornate", + "ornery", + "ordinary", + "original", + "other", + "our", + "outlying", + "outgoing", + "outlandish", + "outrageous", + "outstanding", + "oval", + "overcooked", + "overdue", + "overjoyed", + "overlooked", + "palatable", + "pale", + "paltry", + "parallel", + "parched", + "partial", + "passionate", + "past", + "pastel", + "peaceful", + "peppery", + "perfect", + "perfumed", + "periodic", + "perky", + "personal", + "pertinent", + "pesky", + "pessimistic", + "petty", + "phony", + "physical", + "piercing", + "pink", + "pitiful", + "plain", + "plaintive", + "plastic", + "playful", + "pleasant", + "pleased", + "pleasing", + "plump", + "plush", + "polished", + "polite", + "political", + "pointed", + "pointless", + "poised", + "poor", + "popular", + "portly", + "posh", + "positive", + "possible", + "potable", + "powerful", + "powerless", + "practical", + "precious", + "present", + "prestigious", + "pretty", + "precious", + "previous", + "pricey", + "prickly", + "primary", + "prime", + "pristine", + "private", + "prize", + "probable", + "productive", + "profitable", + "profuse", + "proper", + "proud", + "prudent", + "punctual", + "pungent", + "puny", + "pure", + "purple", + "pushy", + "putrid", + "puzzled", + "puzzling", + "quaint", + "qualified", + "quarrelsome", + "quarterly", + "queasy", + "querulous", + "questionable", + "quick", + "quick-witted", + "quiet", + "quintessential", + "quirky", + "quixotic", + "quizzical", + "radiant", + "ragged", + "rapid", + "rare", + "rash", + "raw", + "recent", + "reckless", + "rectangular", + "ready", + "real", + "realistic", + "reasonable", + "red", + "reflecting", + "regal", + "regular", + "reliable", + "relieved", + "remarkable", + "remorseful", + "remote", + "repentant", + "required", + "respectful", + "responsible", + "repulsive", + "revolving", + "rewarding", + "rich", + "rigid", + "right", + "ringed", + "ripe", + "roasted", + "robust", + "rosy", + "rotating", + "rotten", + "rough", + "round", + "rowdy", + "royal", + "rubbery", + "rundown", + "ruddy", + "rude", + "runny", + "rural", + "rusty", + "sad", + "safe", + "salty", + "same", + "sandy", + "sane", + "sarcastic", + "sardonic", + "satisfied", + "scaly", + "scarce", + "scared", + "scary", + "scented", + "scholarly", + "scientific", + "scornful", + "scratchy", + "scrawny", + "second", + "secondary", + "second-hand", + "secret", + "self-assured", + "self-reliant", + "selfish", + "sentimental", + "separate", + "serene", + "serious", + "serpentine", + "several", + "severe", + "shabby", + "shadowy", + "shady", + "shallow", + "shameful", + "shameless", + "sharp", + "shimmering", + "shiny", + "shocked", + "shocking", + "shoddy", + "short", + "short-term", + "showy", + "shrill", + "shy", + "sick", + "silent", + "silky", + "silly", + "silver", + "similar", + "simple", + "simplistic", + "sinful", + "single", + "sizzling", + "skeletal", + "skinny", + "sleepy", + "slight", + "slim", + "slimy", + "slippery", + "slow", + "slushy", + "small", + "smart", + "smoggy", + "smooth", + "smug", + "snappy", + "snarling", + 
"sneaky", + "sniveling", + "snoopy", + "sociable", + "soft", + "soggy", + "solid", + "somber", + "some", + "spherical", + "sophisticated", + "sore", + "sorrowful", + "soulful", + "soupy", + "sour", + "Spanish", + "sparkling", + "sparse", + "specific", + "spectacular", + "speedy", + "spicy", + "spiffy", + "spirited", + "spiteful", + "splendid", + "spotless", + "spotted", + "spry", + "square", + "squeaky", + "squiggly", + "stable", + "staid", + "stained", + "stale", + "standard", + "starchy", + "stark", + "starry", + "steep", + "sticky", + "stiff", + "stimulating", + "stingy", + "stormy", + "straight", + "strange", + "steel", + "strict", + "strident", + "striking", + "striped", + "strong", + "studious", + "stunning", + "stupendous", + "stupid", + "sturdy", + "stylish", + "subdued", + "submissive", + "substantial", + "subtle", + "suburban", + "sudden", + "sugary", + "sunny", + "super", + "superb", + "superficial", + "superior", + "supportive", + "sure-footed", + "surprised", + "suspicious", + "svelte", + "sweaty", + "sweet", + "sweltering", + "swift", + "sympathetic", + "tall", + "talkative", + "tame", + "tan", + "tangible", + "tart", + "tasty", + "tattered", + "taut", + "tedious", + "teeming", + "tempting", + "tender", + "tense", + "tepid", + "terrible", + "terrific", + "testy", + "thankful", + "that", + "these", + "thick", + "thin", + "third", + "thirsty", + "this", + "thorough", + "thorny", + "those", + "thoughtful", + "threadbare", + "thrifty", + "thunderous", + "tidy", + "tight", + "timely", + "tinted", + "tiny", + "tired", + "torn", + "total", + "tough", + "traumatic", + "treasured", + "tremendous", + "tragic", + "trained", + "tremendous", + "triangular", + "tricky", + "trifling", + "trim", + "trivial", + "troubled", + "true", + "trusting", + "trustworthy", + "trusty", + "truthful", + "tubby", + "turbulent", + "twin", + "ugly", + "ultimate", + "unacceptable", + "unaware", + "uncomfortable", + "uncommon", + "unconscious", + "understated", + "unequaled", + "uneven", + "unfinished", + "unfit", + "unfolded", + "unfortunate", + "unhappy", + "unhealthy", + "uniform", + "unimportant", + "unique", + "united", + "unkempt", + "unknown", + "unlawful", + "unlined", + "unlucky", + "unnatural", + "unpleasant", + "unrealistic", + "unripe", + "unruly", + "unselfish", + "unsightly", + "unsteady", + "unsung", + "untidy", + "untimely", + "untried", + "untrue", + "unused", + "unusual", + "unwelcome", + "unwieldy", + "unwilling", + "unwitting", + "unwritten", + "upbeat", + "upright", + "upset", + "urban", + "usable", + "used", + "useful", + "useless", + "utilized", + "utter", + "vacant", + "vague", + "vain", + "valid", + "valuable", + "vapid", + "variable", + "vast", + "velvety", + "venerated", + "vengeful", + "verifiable", + "vibrant", + "vicious", + "victorious", + "vigilant", + "vigorous", + "villainous", + "violet", + "violent", + "virtual", + "virtuous", + "visible", + "vital", + "vivacious", + "vivid", + "voluminous", + "wan", + "warlike", + "warm", + "warmhearted", + "warped", + "wary", + "wasteful", + "watchful", + "waterlogged", + "watery", + "wavy", + "wealthy", + "weak", + "weary", + "webbed", + "wee", + "weekly", + "weepy", + "weighty", + "weird", + "welcome", + "well-documented", + "well-groomed", + "well-informed", + "well-lit", + "well-made", + "well-off", + "well-to-do", + "well-worn", + "wet", + "which", + "whimsical", + "whirlwind", + "whispered", + "white", + "whole", + "whopping", + "wicked", + "wide", + "wide-eyed", + "wiggly", + "wild", + "willing", + "wilted", + "winding", + "windy", 
+ "winged", + "wiry", + "wise", + "witty", + "wobbly", + "woeful", + "wonderful", + "wooden", + "woozy", + "wordy", + "worldly", + "worn", + "worried", + "worrisome", + "worse", + "worst", + "worthless", + "worthwhile", + "worthy", + "wrathful", + "wretched", + "writhing", + "wrong", + "wry", + "yawning", + "yearly", + "yellow", + "yellowish", + "young", + "youthful", + "yummy", + "zany", + "zealous", + "zesty", + "zigzag", + ] +) diff --git a/src/dataframe.rs b/src/dataframe.rs index 7bc00f2..3878975 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -397,7 +397,7 @@ impl PyDataFrameStage { /// returns the stage ids of that we need to read from in order to execute #[getter] - pub fn input_stage_ids(&self) -> PyResult> { + pub fn child_stage_ids(&self) -> PyResult> { let mut result = vec![]; self.plan .clone() diff --git a/src/ray_stage_reader.rs b/src/ray_stage_reader.rs index 093ed4e..317fdcd 100644 --- a/src/ray_stage_reader.rs +++ b/src/ray_stage_reader.rs @@ -111,8 +111,11 @@ impl ExecutionPlan for RayStageReaderExec { let clients = client_map .get(&(self.stage_id, partition)) .ok_or(internal_datafusion_err!( - "No flight clients found for {}", - self.stage_id + "{} No flight clients found for {}:{}, have {:?}", + name, + self.stage_id, + partition, + client_map.keys() ))? .lock() .iter() @@ -138,17 +141,20 @@ impl ExecutionPlan for RayStageReaderExec { let mut streams = vec![]; for mut client in clients { + let name = name.clone(); + trace!("{name} Getting flight stream" ); match client.do_get(ticket.clone()).await { Ok(flight_stream) => { + trace!("{name} Got flight stream. headers:{:?}", flight_stream.headers()); let rbr_stream = RecordBatchStreamAdapter::new(schema.clone(), flight_stream - .map_err(|e| internal_datafusion_err!("Error consuming flight stream: {}", e))); + .map_err(move |e| internal_datafusion_err!("{} Error consuming flight stream: {}", name, e))); streams.push(Box::pin(rbr_stream) as SendableRecordBatchStream); }, Err(e) => { error = true; - yield internal_err!("Error getting flight stream: {}", e); + yield internal_err!("{} Error getting flight stream: {}", name, e); } } } diff --git a/src/stage_service.rs b/src/stage_service.rs index 040906d..0307a7b 100644 --- a/src/stage_service.rs +++ b/src/stage_service.rs @@ -20,15 +20,18 @@ use std::collections::HashMap; use std::error::Error; use std::sync::Arc; +use arrow::array::RecordBatch; +use arrow::datatypes::Schema; use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::error::FlightError; use arrow_flight::FlightClient; use datafusion::common::internal_datafusion_err; use datafusion::execution::SessionStateBuilder; +use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_python::utils::wait_for_future; -use futures::TryStreamExt; +use futures::{Stream, TryStreamExt}; use local_ip_address::local_ip; use log::{debug, error, info, trace}; use tokio::net::TcpListener; @@ -42,11 +45,11 @@ use arrow_flight::{flight_service_server::FlightServiceServer, Ticket}; use pyo3::prelude::*; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use tokio::sync::mpsc::{channel, Receiver, Sender}; -use crate::flight::{FlightHandler, FlightServ}; +use crate::flight::{DoGetStream, FlightHandler, FlightServ}; use crate::isolator::PartitionGroup; use crate::util::{ bytes_to_physical_plan, display_plan_with_partition_counts, extract_ticket, fix_plan, @@ -61,45 +64,67 @@ pub(crate) struct 
ServiceClients(pub HashMap<(usize, usize), Mutex>, +} + +struct StageHandlerInner { /// our stage id that we are hosting pub(crate) stage_id: usize, /// the physical plan that comprises our stage - plan: Arc, + pub(crate) plan: Arc, /// the session context we will use to execute the plan - ctx: Mutex>, - /// The partitions we will be hosting from this plan. - partition_group: Vec, + pub(crate) ctx: SessionContext, } impl StageHandler { + pub fn new(name: String) -> Self { + let inner = RwLock::new(None); + + Self { name, inner } + } + async fn update_plan( + &self, + stage_id: usize, + stage_addrs: HashMap>>, + plan: Arc, + partition_group: Vec, + ) -> DFResult<()> { + let inner = StageHandlerInner::new(stage_id, stage_addrs, plan, partition_group).await?; + self.inner.write().replace(inner); + Ok(()) + } + + fn stage_id(&self) -> Option { + self.inner.read().as_ref().map(|i| i.stage_id) + } +} + +impl StageHandlerInner { pub async fn new( stage_id: usize, - plan_bytes: &[u8], + stage_addrs: HashMap>>, + plan: Arc, partition_group: Vec, ) -> DFResult { - let plan = bytes_to_physical_plan(&SessionContext::new(), plan_bytes)?; - let plan = fix_plan(plan)?; - debug!( - "StageHandler::new [Stage:{}], plan:\n{}", - stage_id, - display_plan_with_partition_counts(&plan) - ); - - let ctx = Mutex::new(None); + let ctx = Self::configure_ctx(stage_id, stage_addrs, &plan, partition_group).await?; Ok(Self { stage_id, plan, ctx, - partition_group, }) } async fn configure_ctx( - &self, + stage_id: usize, stage_addrs: HashMap>>, - ) -> DFResult<()> { - let stage_ids_i_need = input_stage_ids(&self.plan)?; + plan: &Arc, + partition_group: Vec, + ) -> DFResult { + let stage_ids_i_need = input_stage_ids(&plan)?; // map of stage_id, partition -> Vec let mut client_map = HashMap::new(); @@ -141,7 +166,7 @@ impl StageHandler { // this only matters if the plan includes an PartitionIsolatorExec, which looks for this // for this extension and will be ignored otherwise - config = config.with_extension(Arc::new(PartitionGroup(self.partition_group.clone()))); + config = config.with_extension(Arc::new(PartitionGroup(partition_group.clone()))); let state = SessionStateBuilder::new() .with_default_features() @@ -149,12 +174,33 @@ impl StageHandler { .build(); let ctx = SessionContext::new_with_state(state); - self.ctx.lock().replace(ctx); - trace!("ctx configured for stage {}", self.stage_id); - Ok(()) + trace!("ctx configured for stage {}", stage_id); + + Ok(ctx) } } +fn make_stream( + inner: &StageHandlerInner, + partition: usize, +) -> Result> + Send + 'static, Status> { + let task_ctx = inner.ctx.task_ctx(); + + let stream = inner + .plan + .execute(partition, task_ctx) + .inspect_err(|e| { + error!( + "{}", + format!("Could not get partition stream from plan {e}") + ) + }) + .map_err(|e| Status::internal(format!("Could not get partition stream from plan {e}")))? 
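+/// Build the stream of record batches for `partition` by executing the
+/// currently installed plan. Kept as a free function so the read guard on
+/// the handler's inner state only has to be held while the stream is
+/// constructed, not while it is consumed.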
+fn make_stream(
+    inner: &StageHandlerInner,
+    partition: usize,
+) -> Result<impl Stream<Item = Result<RecordBatch, FlightError>> + Send + 'static, Status> {
+    let task_ctx = inner.ctx.task_ctx();
+
+    let stream = inner
+        .plan
+        .execute(partition, task_ctx)
+        .inspect_err(|e| {
+            error!(
+                "{}",
+                format!("Could not get partition stream from plan {e}")
+            )
+        })
+        .map_err(|e| Status::internal(format!("Could not get partition stream from plan {e}")))?
+        .map_err(|e| FlightError::from_external_error(Box::new(e)));
+
+    Ok(stream)
+}
+
 #[async_trait]
 impl FlightHandler for StageHandler {
     async fn get_stream(
@@ -168,41 +214,33 @@
 
         let ticket = request.into_inner();
 
-        let partition = extract_ticket(ticket)
-            .map_err(|e| Status::internal(format!("Unexpected error extracting ticket {e}")))?;
+        let partition = extract_ticket(ticket).map_err(|e| {
+            Status::internal(format!(
+                "{}, Unexpected error extracting ticket {e}",
+                self.name
+            ))
+        })?;
 
         trace!(
-            "StageService[Stage:{}], request for partition {} from {}",
-            self.stage_id,
+            "{}, request for partition {} from {}",
+            self.name,
             partition,
             remote_addr
         );
 
-        let task_ctx = self
-            .ctx
-            .lock()
-            .as_ref()
-            .ok_or(Status::internal(format!(
-                "Stage [{}] get_stream cannot find ctx",
-                self.stage_id
-            )))?
-            .task_ctx();
-
+        let name = self.name.clone();
         let stream = self
-            .plan
-            .execute(partition, task_ctx)
-            .inspect_err(|e| {
-                error!(
-                    "{}",
-                    format!("Could not get partition stream from plan {e}")
-                )
-            })
-            .map_err(|e| Status::internal(format!("Could not get partition stream from plan {e}")))?
-            .map_err(|e| FlightError::from_external_error(Box::new(e)));
+            .inner
+            .read()
+            .as_ref()
+            .map(|inner| make_stream(inner, partition))
+            .ok_or_else(|| Status::internal(format!("{} No inner found", &name)))??;
 
         let out_stream = FlightDataEncoderBuilder::new()
             .build(stream)
-            .map_err(|e| Status::internal(format!("Unexpected error building stream {e}")));
+            .map_err(move |e| {
+                Status::internal(format!("{} Unexpected error building stream {e}", name))
+            });
 
         Ok(Response::new(Box::pin(out_stream)))
     }
@@ -225,22 +263,15 @@ pub struct StageService {
 #[pymethods]
 impl StageService {
     #[new]
-    pub fn new(
-        py: Python,
-        stage_id: usize,
-        plan_bytes: &[u8],
-        partition_group: Vec<usize>,
-    ) -> PyResult<Self> {
+    pub fn new(name: String) -> PyResult<Self> {
+        let name = format!("[{}]", name);
         let listener = None;
         let addr = None;
         let (all_done_tx, all_done_rx) = channel(1);
         let all_done_tx = Arc::new(Mutex::new(all_done_tx));
 
-        let name = format!("StageService[{}]", stage_id);
-        let fut = StageHandler::new(stage_id, plan_bytes, partition_group);
-
-        let handler = Arc::new(wait_for_future(py, fut).to_py_err()?);
+        let handler = Arc::new(StageHandler::new(name.clone()));
 
         Ok(Self {
             name,
@@ -272,33 +303,65 @@ impl StageService {
 
     /// get the address of the listing socket for this service
     pub fn addr(&self) -> PyResult<String> {
-        self.addr
-            .clone()
-            .ok_or_else(|| PyErr::new::<PyRuntimeError, _>("Couldn't get addr"))
+        self.addr.clone().ok_or_else(|| {
+            PyErr::new::<PyRuntimeError, _>(format!(
+                "{}, Couldn't get addr",
+                self.name
+            ))
+        })
     }
 
-    pub fn set_stage_addrs<'a>(
-        &mut self,
-        py: Python<'a>,
-        stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
-    ) -> PyResult<Bound<'a, PyAny>> {
-        let handler = self.handler.clone();
+    /// signal to the service that we can shutdown
+    ///
+    /// returns a python coroutine that should be awaited
+    pub fn all_done<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
+        let sender = self.all_done_tx.lock().clone();
+
         let fut = async move {
-            handler.configure_ctx(stage_addrs).await.to_py_err()?;
+            sender.send(()).await.to_py_err()?;
             Ok(())
         };
 
         pyo3_async_runtimes::tokio::future_into_py(py, fut)
     }
 
-    /// signal to the service that we can shutdown
+    /// replace the plan that this service was providing, we will do this when we want
+    /// to reuse the StageService for a subsequent query
+    ///
     /// returns a python coroutine that should be awaited
-    pub fn all_done<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
-        let sender = self.all_done_tx.lock().clone();
+    pub fn update_plan<'a>(
+        &self,
+        py: Python<'a>,
+        stage_id: usize,
+        stage_addrs: HashMap<usize, HashMap<usize, Vec<String>>>,
+        partition_group: Vec<usize>,
+        plan_bytes: &[u8],
+    ) -> PyResult<Bound<'a, PyAny>> {
+        let plan = bytes_to_physical_plan(&SessionContext::new(), plan_bytes)?;
+        debug!(
+            "{} Received New Plan: Stage:{} my addr: {}, partition_group {:?}, stage_addrs:\n{:?}\nplan:\n{}",
+            self.name,
+            stage_id,
+            self.addr()?,
+            partition_group,
+            stage_addrs,
+            display_plan_with_partition_counts(&plan)
+        );
+
+        let handler = self.handler.clone();
+        let name = self.name.clone();
         let fut = async move {
-            sender.send(()).await.to_py_err()?;
+            handler
+                .update_plan(stage_id, stage_addrs, plan, partition_group.clone())
+                .await
+                .to_py_err()?;
+            info!(
+                "{} [stage: {} pg:{:?}] updated plan",
+                name, stage_id, partition_group
+            );
             Ok(())
         };
+
         pyo3_async_runtimes::tokio::future_into_py(py, fut)
     }
 
@@ -309,7 +372,7 @@ impl StageService {
 
         let signal = async move {
             // TODO: handle Result
-            let _ = all_done_rx.recv().await;
+            let result = all_done_rx.recv().await;
        };
 
         let service = FlightServ {
@@ -319,11 +382,9 @@ impl StageService {
 
         let svc = FlightServiceServer::new(service);
 
         let listener = self.listener.take().unwrap();
         let name = self.name.clone();
-        let stage_id = self.handler.stage_id;
+
         let serv = async move {
-            trace!("StageService [{}] Serving", stage_id);
             Server::builder()
                 .add_service(svc)
                 .serve_with_incoming_shutdown(
                     tokio_stream::wrappers::TcpListenerStream::new(listener),
                     signal,
                 )
                 .await
-                .inspect_err(|e| error!("StageService [{}] ERROR serving {e}", name))
+                .inspect_err(|e| error!("{}, ERROR serving {e}", name))
                 .map_err(|e| PyErr::new::<PyRuntimeError, _>(format!("{e}")))?;
-            info!("tageService [{}] DONE serving", name);
 
             Ok::<(), Box<dyn Error + Send + Sync>>(())
         };
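Together, the Python pool above and the reworked StageService implement one lifecycle: stage actors are acquired per query, handed a fresh plan via update_plan, and released back for the next query. The standalone sketch below shows that contract with plain asyncio and no Ray; the Pool class and worker names are illustrative only, not part of the patch.

    import asyncio


    class Pool:
        """Toy stand-in for RayStagePool: grow on demand, never exceed max."""

        def __init__(self, min_workers: int, max_workers: int):
            self.max_workers = max_workers
            self.available = {f"worker-{i}" for i in range(min_workers)}
            self.acquired = set()

        async def acquire(self, need: int) -> list[str]:
            total = len(self.available) + len(self.acquired)
            if need - len(self.available) > self.max_workers - total:
                raise Exception(f"Cannot allocate workers above {self.max_workers}")
            while len(self.available) < need:  # grow, like _new_stage()
                self.available.add(f"worker-{len(self.acquired) + len(self.available)}")
            keys = [self.available.pop() for _ in range(need)]
            self.acquired.update(keys)
            return keys

        def release(self, keys: list[str]) -> None:
            self.acquired.difference_update(keys)
            self.available.update(keys)


    async def main():
        pool = Pool(min_workers=1, max_workers=4)
        q1 = await pool.acquire(3)  # first query: the pool grows from 1 to 3 workers
        pool.release(q1)            # the next query releases the old grant first...
        q2 = await pool.acquire(2)  # ...then reuses the same workers; nothing new is created
        print(q1, q2)


    asyncio.run(main())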
diff --git a/tpch/tpc.py b/tpch/tpc.py
index 6c26bf8..8fa5ae8 100644
--- a/tpch/tpc.py
+++ b/tpch/tpc.py
@@ -14,18 +14,28 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+#
+# This file is useful for running a query against a TPCH dataset.
+#
+# You can run an arbitrary query by passing --query 'select...' or you can run a
+# TPCH query by passing --qnum 1-22.
 
 import argparse
 import ray
-from datafusion import SessionContext, SessionConfig
-from datafusion_ray import RayContext, prettify, runtime_env
-from datetime import datetime
-import json
+from datafusion_ray import RayContext, runtime_env
 import os
+import sys
 import time
-import duckdb
-from datafusion.object_store import AmazonS3
+try:
+    import duckdb
+except ImportError:
+    print(
+        "duckdb is not installed; it is used in this file to retrieve the TPCH queries"
+    )
+    sys.exit(1)
 
 
 def make_ctx(
@@ -33,6 +43,7 @@ def make_ctx(
     concurrency: int,
     batch_size: int,
     partitions_per_worker: int | None,
+    worker_pool_min: int,
     listing_tables: bool,
 ):
 
@@ -51,7 +62,11 @@ def make_ctx(
     # use ray job submit
     ray.init(runtime_env=runtime_env)
 
-    ctx = RayContext(batch_size=batch_size, partitions_per_worker=partitions_per_worker)
+    ctx = RayContext(
+        batch_size=batch_size,
+        partitions_per_worker=partitions_per_worker,
+        worker_pool_min=worker_pool_min,
+    )
 
     ctx.set("datafusion.execution.target_partitions", f"{concurrency}")
     # ctx.set("datafusion.execution.parquet.pushdown_filters", "true")
@@ -75,19 +90,19 @@ def main(
     batch_size: int,
     query: str,
     partitions_per_worker: int | None,
-    validate: bool,
+    worker_pool_min: int,
     listing_tables,
 ) -> None:
     ctx = make_ctx(
-        data_path, concurrency, batch_size, partitions_per_worker, listing_tables
+        data_path,
+        concurrency,
+        batch_size,
+        partitions_per_worker,
+        worker_pool_min,
+        listing_tables,
     )
 
     df = ctx.sql(query)
-    for stage in df.stages():
-        print(
-            f"Stage {stage.stage_id} output partitions:{stage.num_output_partitions} partition_groups: {stage.partition_groups}"
-        )
-        print(stage.execution_plan().display_indent())
-
+    time.sleep(3)
     df.show()
 
@@ -110,7 +125,11 @@ def tpch_query(qnum: int) -> str:
         type=int,
         help="Max partitions per Stage Service Worker",
     )
-    parser.add_argument("--validate", action="store_true")
+    parser.add_argument(
+        "--worker-pool-min",
+        type=int,
+        help="Minimum number of RayStages to keep in pool",
+    )
     parser.add_argument("--listing-tables", action="store_true")
 
     args = parser.parse_args()
@@ -125,6 +144,6 @@
         int(args.batch_size),
         query,
         args.partitions_per_worker,
-        args.validate,
+        args.worker_pool_min,
         args.listing_tables,
     )
diff --git a/tpch/tpcbench.py b/tpch/tpcbench.py
index 1a93360..f8a9dbc 100644
--- a/tpch/tpcbench.py
+++ b/tpch/tpcbench.py
@@ -24,7 +24,13 @@
 import os
 import time
 
-import duckdb
+try:
+    import duckdb
+except ImportError:
+    print(
+        "duckdb is not installed; it is used in this file to retrieve the TPCH queries"
+    )
+    sys.exit(1)
 
 
 def tpch_query(qnum: int) -> str:
@@ -38,6 +44,7 @@ def main(
     concurrency: int,
     batch_size: int,
     partitions_per_worker: int | None,
+    worker_pool_min: int,
     listing_tables: bool,
     validate: bool,
     prefetch_buffer_size: int,
@@ -62,6 +69,7 @@ def main(
         batch_size=batch_size,
         partitions_per_worker=partitions_per_worker,
         prefetch_buffer_size=prefetch_buffer_size,
+        worker_pool_min=worker_pool_min,
     )
 
     ctx.set("datafusion.execution.target_partitions", f"{concurrency}")
@@ -189,6 +197,11 @@ def main(
         type=int,
         help="How many batches each stage should eagerly buffer",
     )
+    parser.add_argument(
+        "--worker-pool-min",
+        type=int,
+        help="Minimum number of RayStages to keep in pool",
+    )
 
     args = parser.parse_args()
 
@@ -198,6 +211,7 @@ def main(
         int(args.concurrency),
         int(args.batch_size),
         args.partitions_per_worker,
+        args.worker_pool_min,
         args.listing_tables,
         args.validate,
         args.prefetch_buffer_size,
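With the pool in place, the user-facing API stays small. A minimal driver script under the new API might look like the following (the Parquet path and table name are hypothetical):

    import ray
    from datafusion_ray import RayContext, runtime_env

    ray.init(runtime_env=runtime_env)

    # the worker pool is created and warmed up here, before any query arrives
    ctx = RayContext(batch_size=8192, worker_pool_min=2, worker_pool_max=16)
    ctx.register_parquet("lineitem", "/data/lineitem.parquet")  # hypothetical path

    # each sql() call reuses pooled RayStage actors via RayContextSupervisor.new_query
    df = ctx.sql("select count(*) from lineitem")
    df.show()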