Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
# Local configuration file (in .metaflow) containing overrides per-project
LOCAL_CONFIG_FILE = "config.json"

# Current runtime configuration
CURRENT_RUNTIME = from_conf("CURRENT_RUNTIME", "local")

###
# Default configuration
###
Expand Down
28 changes: 27 additions & 1 deletion metaflow/metaflow_current.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
from typing import Any, Optional, TYPE_CHECKING

from metaflow.metaflow_config import TEMPDIR
from metaflow.metaflow_config import TEMPDIR, CURRENT_RUNTIME

Parallel = namedtuple(
"Parallel", ["main_ip", "num_nodes", "node_index", "control_task_id"]
Expand All @@ -25,6 +25,7 @@ def __init__(self):
self._metadata_str = None
self._is_running = False
self._tempdir = TEMPDIR
self._compute = "local"

def _raise(ex):
raise ex
Expand All @@ -46,6 +47,7 @@ def _set_env(
metadata_str=None,
is_running=True,
tags=None,
compute="local",
):
if flow is not None:
self._flow_name = flow.name
Expand All @@ -61,6 +63,7 @@ def _set_env(
self._metadata_str = metadata_str
self._is_running = is_running
self._tags = tags
self._compute = compute

def _update_env(self, env):
for k, v in env.items():
Expand Down Expand Up @@ -283,6 +286,29 @@ def tempdir(self) -> Optional[str]:
"""
return self._tempdir

@property
def is_local(self) -> bool:
"""
Is the current execution environment local
"""

return self.runtime == "local"

@property
def runtime(self) -> str:
"""
Name of the current runtime.
"""

return CURRENT_RUNTIME

@property
def compute(self) -> str:
"""
Name of the compute implementation
"""
return self._compute


# instantiate the Current singleton. This will be populated
# by task.MetaflowTask before a task is executed.
Expand Down
1 change: 1 addition & 0 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2246,6 +2246,7 @@ def _container_templates(self):
"METAFLOW_KUBERNETES_FETCH_EC2_METADATA": KUBERNETES_FETCH_EC2_METADATA,
"METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
"METAFLOW_OWNER": self.username,
"METAFLOW_CURRENT_RUNTIME": "argo-workflows",
},
**{
# Configuration for Argo Events. Keep these in sync with the
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/aws/batch/batch_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ def task_pre_step(
self.metadata = metadata
self.task_datastore = task_datastore

# Set compute environment info
current._update_env({"compute": "batch"})

# current.tempdir reflects the value of METAFLOW_TEMPDIR (the current working
# directory by default), or the value of tmpfs_path if tmpfs_tempdir=False.
if not self.attributes["tmpfs_tempdir"]:
Expand Down
1 change: 1 addition & 0 deletions metaflow/plugins/aws/step_functions/step_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ def _batch(self, node):
env["METAFLOW_PRODUCTION_TOKEN"] = self.production_token
env["SFN_STATE_MACHINE"] = self.name
env["METAFLOW_OWNER"] = attrs["metaflow.owner"]
env["METAFLOW_CURRENT_RUNTIME"] = "step-functions"
# Can't set `METAFLOW_TASK_ID` due to lack of run-scoped identifiers.
# We will instead rely on `AWS_BATCH_JOB_ID` as the task identifier.
# Can't set `METAFLOW_RETRY_COUNT` either due to integer casting issue.
Expand Down
2 changes: 2 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ def task_pre_step(
):
self.metadata = metadata
self.task_datastore = task_datastore
# Set compute environment info
current._update_env({"compute": "kubernetes"})

# current.tempdir reflects the value of METAFLOW_TEMPDIR (the current working
# directory by default), or the value of tmpfs_path if tmpfs_tempdir=False.
Expand Down
Loading