Skip to content

Commit

Permalink
Yield during planning
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy committed Oct 31, 2023
1 parent d994134 commit cb72f3d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
7 changes: 7 additions & 0 deletions src/brad/planner/beam/query_based.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import heapq
import json
import logging
Expand Down Expand Up @@ -129,6 +130,12 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:

# 5. Run beam search to formulate the table placements.
for j, query_idx in enumerate(query_indices[1:]):
if j % 5 == 0:
# This is a long-running process. We should yield every so often
# to allow other tasks to run on the daemon (e.g., processing
# metrics messages).
await asyncio.sleep(0)

logger.debug("Processing index %d of %d", j, len(query_indices[1:]))

next_top_k: List[BlueprintCandidate] = []
Expand Down
10 changes: 8 additions & 2 deletions src/brad/planner/beam/table_based.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import heapq
import itertools
import json
Expand Down Expand Up @@ -130,8 +131,13 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:

# 5. Run beam search to formulate the rest of the table placements.
for j, cluster in enumerate(clusters[1:]):
if j % 100 == 0:
logger.debug("Processing index %d of %d", j, len(clusters[1:]))
if j % 5 == 0:
# This is a long-running process. We should yield every so often
# to allow other tasks to run on the daemon (e.g., processing
# metrics messages).
await asyncio.sleep(0)

logger.debug("Processing index %d of %d", j, len(clusters[1:]))

next_top_k: List[BlueprintCandidate] = []
tables, queries, _ = cluster
Expand Down

0 comments on commit cb72f3d

Please sign in to comment.