Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions worker_plan/worker_plan_internal/luigi_util/parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Custom luigi.Parameter subclasses for PlanExe.

Plain `luigi.Parameter` warns ("not of type string") whenever the
value isn't a `str`. The check auto-skips any subclass — Luigi's
`Parameter._warn_on_wrong_param_type` returns early when
`self.__class__ != Parameter`. Subclassing is enough to silence
the warning while keeping the rest of the parameter behaviour.
"""
from pathlib import Path
import luigi


class PathParameter(luigi.Parameter):
"""Path-valued task parameter. Accepts str or Path; stores Path."""

def parse(self, x):
return Path(x)

def serialize(self, x):
return str(x)

def normalize(self, x):
if x is None:
return None
return Path(x)


class CallableParameter(luigi.Parameter):
"""Holds an arbitrary Python object (e.g. a bound method).

Intended for non-significant private parameters where Luigi's
string-type warning would otherwise fire.
"""
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pathlib import Path
import luigi
from worker_plan_api.filenames import FilenameEnum
from worker_plan_internal.luigi_util.parameters import PathParameter


class InitialPlanRawTask(luigi.ExternalTask):
Expand All @@ -10,7 +11,7 @@ class InitialPlanRawTask(luigi.ExternalTask):
Tasks that want the bare prompt (rather than SetupTask's
formatted plan.txt) require this and read it via PlanFile.
"""
run_id_dir = luigi.Parameter()
run_id_dir = PathParameter()

def output(self):
return luigi.LocalTarget(str(Path(self.run_id_dir) / FilenameEnum.INITIAL_PLAN_RAW.value))
5 changes: 3 additions & 2 deletions worker_plan/worker_plan_internal/plan/run_plan_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from worker_plan_internal.llm_factory import get_llm_names_by_priority, SPECIAL_AUTO_ID, is_valid_llm_name
from worker_plan_api.model_profile import ModelProfileEnum, normalize_model_profile
from worker_plan_internal.luigi_util.obtain_output_files import ObtainOutputFiles
from worker_plan_internal.luigi_util.parameters import PathParameter, CallableParameter
from worker_plan_internal.plan.pipeline_environment import PipelineEnvironment
from worker_plan_internal.plan.ping_llm import run_ping_llm_report

Expand Down Expand Up @@ -62,7 +63,7 @@ class PlanTask(luigi.Task):
# Default it to the current timestamp, eg. 19841231_235959
# Path to the 'run/{run_id}' directory
_default_outputs_dir = os.getenv('PLANEXE_OUTPUTS_DIR', 'run')
run_id_dir = luigi.Parameter(default=Path(_default_outputs_dir) / datetime.now().strftime("%Y%m%d_%H%M%S"))
run_id_dir = PathParameter(default=Path(_default_outputs_dir) / datetime.now().strftime("%Y%m%d_%H%M%S"))

# By default, run everything but it's slow.
# This can be overridden in developer mode, where a quick turnaround is needed, and the details are not important.
Expand All @@ -76,7 +77,7 @@ class PlanTask(luigi.Task):
# If the callback raises exceptions different than PipelineStopRequested, the pipeline will be aborted. This means that something went wrong, and we should not continue.
# If the callback doesn't raise an exception, the pipeline will continue.
# If the callback is not provided, the pipeline will run until completion.
_pipeline_executor_callback = luigi.Parameter(default=None, significant=False, visibility=luigi.parameter.ParameterVisibility.PRIVATE)
_pipeline_executor_callback = CallableParameter(default=None, significant=False, visibility=luigi.parameter.ParameterVisibility.PRIVATE)

@classmethod
def description(cls) -> str:
Expand Down