diff --git a/docs/book/component-guide/orchestrators/README.md b/docs/book/component-guide/orchestrators/README.md index de13007055c..e68be150d24 100644 --- a/docs/book/component-guide/orchestrators/README.md +++ b/docs/book/component-guide/orchestrators/README.md @@ -34,6 +34,7 @@ Out of the box, ZenML comes with a `local` orchestrator already part of the defa | [SkypilotGCPOrchestrator](skypilot-vm.md) | `vm_gcp` | `skypilot[gcp]` | Runs your pipelines in GCP VMs using SkyPilot | | [SkypilotAzureOrchestrator](skypilot-vm.md) | `vm_azure` | `skypilot[azure]` | Runs your pipelines in Azure VMs using SkyPilot | | [HyperAIOrchestrator](hyperai.md) | `hyperai` | `hyperai` | Runs your pipeline in HyperAI.ai instances. | +| [ModalOrchestrator](modal.md) | `modal` | `modal` | Runs your pipelines on Modal's serverless cloud platform. | | [Custom Implementation](custom.md) | _custom_ | | Extend the orchestrator abstraction and provide your own implementation | If you would like to see the available flavors of orchestrators, you can use the command: diff --git a/docs/book/component-guide/orchestrators/modal.md b/docs/book/component-guide/orchestrators/modal.md new file mode 100644 index 00000000000..4516ccc229e --- /dev/null +++ b/docs/book/component-guide/orchestrators/modal.md @@ -0,0 +1,718 @@ +--- +description: Orchestrating your pipelines to run on Modal's serverless cloud platform. +--- + +# Modal Orchestrator + +Using the ZenML `modal` integration, you can orchestrate and scale your ML pipelines on [Modal's](https://modal.com/) serverless cloud platform with minimal setup and maximum efficiency. + +The Modal orchestrator is designed for speed and cost-effectiveness, running entire pipelines using Modal sandboxes with persistent app architecture for maximum flexibility and efficiency. + +{% hint style="warning" %} +This component is only meant to be used within the context of a [remote ZenML deployment scenario](https://docs.zenml.io/getting-started/deploying-zenml/). Usage with a local ZenML deployment may lead to unexpected behavior! +{% endhint %} + +## When to use it + +You should use the Modal orchestrator if: + +* you want a serverless solution that scales to zero when not in use. +* you're looking for fast pipeline execution with minimal cold start overhead. +* you want cost-effective ML pipeline orchestration without managing infrastructure. +* you need easy access to GPUs and high-performance computing resources. +* you prefer a simple setup process without complex Kubernetes configurations. +* you need flexibility between fast pipeline execution (PIPELINE mode) and step-level resource isolation (PER_STEP mode). + +## When NOT to use it + +The Modal orchestrator may not be the best choice if: + +* **You need extremely fine-grained control beyond per-step isolation**: While the Modal orchestrator supports two execution modes (PIPELINE mode for speed and PER_STEP mode for step isolation), if you need even more granular control over individual step execution environments, consider the [Modal step operator](../step-operators/modal.md) instead. + +* **You have strict data locality requirements**: Modal runs in specific cloud regions and may not be suitable if you need to keep data processing within specific geographic boundaries or on-premises. + +* **You require very long-running pipelines**: While Modal supports up to 24-hour timeouts, extremely long-running batch jobs (days/weeks) might be better suited for other orchestrators. 
+ +* **You need complex workflow patterns**: Modal orchestrator is optimized for straightforward ML pipelines. If you need complex DAG patterns, conditional logic, or dynamic pipeline generation, other orchestrators might be more suitable. + +* **Cost optimization for infrequent workloads**: While Modal is cost-effective for regular workloads, very infrequent pipelines (running once per month) might benefit from traditional infrastructure that doesn't incur per-execution overhead. + +## How to deploy it + +The Modal orchestrator runs on Modal's cloud infrastructure, so you don't need to deploy or manage any servers. You just need: + +1. A [Modal account](https://modal.com/) (free tier available) +2. Modal CLI installed and authenticated +3. A [remote ZenML deployment](https://docs.zenml.io/getting-started/deploying-zenml/) for production use + +## How to use it + +### Quick Start (5 minutes) + +```bash +# 1. Install Modal integration +zenml integration install modal + +# 2. Setup Modal authentication +modal setup + +# 3. Register orchestrator and run +zenml orchestrator register modal_orch --flavor=modal --synchronous=true +zenml stack update -o modal_orch +python my_pipeline.py +``` + +### Full Setup Requirements + +To use the Modal orchestrator, you need: + +* The ZenML `modal` integration installed. If you haven't done so, run: + ```shell + zenml integration install modal + ``` +* [Docker](https://www.docker.com) installed and running. +* A [remote artifact store](../artifact-stores/README.md) as part of your stack. +* A [remote container registry](../container-registries/README.md) as part of your stack. +* Modal authenticated: + ```shell + modal setup + ``` + +### Setting up the orchestrator + +You can register the orchestrator with or without explicit Modal credentials: + +**Option 1: Using Modal CLI authentication (recommended for development)** + +```shell +# Register the orchestrator (uses Modal CLI credentials) +zenml orchestrator register \ + --flavor=modal \ + --synchronous=true + +# Register and activate a stack with the new orchestrator +zenml stack register -o ... --set +``` + +**Option 2: Using Modal API token (recommended for production)** + +```shell +# Register the orchestrator with explicit credentials +zenml orchestrator register \ + --flavor=modal \ + --token-id= \ + --token-secret= \ + --workspace= \ + --synchronous=true + +# Register and activate a stack with the new orchestrator +zenml stack register -o ... --set +``` + +You can get your Modal token from the [Modal dashboard](https://modal.com/settings/tokens). + +{% hint style="info" %} +ZenML will build a Docker image called `/zenml:` which includes your code and use it to run your pipeline steps in Modal functions. Check out [this page](https://docs.zenml.io/concepts/containerization) if you want to learn more about how ZenML builds these images and how you can customize them. +{% endhint %} + +You can now run any ZenML pipeline using the Modal orchestrator: + +```shell +python file_that_runs_a_zenml_pipeline.py +``` + +### Modal UI + +Modal provides an excellent web interface where you can monitor your pipeline runs in real-time, view logs, and track resource usage. + +You can access the Modal dashboard at [modal.com/apps](https://modal.com/apps) to see your running and completed functions. + +### Configuration overview + +{% hint style="info" %} +**Modal Orchestrator vs Step Operator** + +ZenML offers both a [Modal orchestrator](modal.md) and a [Modal step operator](../step-operators/modal.md). 
Choose based on your needs: + +| Feature | Modal Step Operator | Modal Orchestrator | +|---------|-------------------|-------------------| +| **Execution Scope** | Individual steps only | Entire pipeline | +| **Orchestration** | Local ZenML | Remote Modal | +| **Resource Flexibility** | Per-step resources | Pipeline-wide resources | +| **Cost Model** | Pay per step execution | Pay per pipeline execution | +| **Setup Complexity** | Simple | Requires remote ZenML | +| **Best For** | Hybrid workflows, selective GPU usage | Full cloud execution, production | + +**Quick Decision Guide**: +- **Use Step Operator**: Need GPUs for only some steps, have local data dependencies, want hybrid local/cloud workflow +- **Use Orchestrator**: Want full cloud execution, production deployment, consistent resource requirements +{% endhint %} + +The Modal orchestrator uses two types of settings following ZenML's standard pattern: + +1. **`ResourceSettings`** (standard ZenML) - for hardware resource quantities: + - `cpu_count` - Number of CPU cores + - `memory` - Memory allocation (e.g., "16GB") + - `gpu_count` - Number of GPUs to allocate + +2. **`ModalOrchestratorSettings`** (Modal-specific) - for Modal platform configuration: + - `gpu` - GPU type specification (e.g., "T4", "A100", "H100") + - `region` - Cloud region preference + - `cloud` - Cloud provider selection + - `modal_environment` - Modal environment name (e.g., "main", "dev", "prod") + - `app_name` - Custom Modal app name (defaults to pipeline name) + - `mode` - Execution strategy: "pipeline" (default) or "per_step" + - `max_parallelism` - Maximum concurrent steps (for "per_step" mode) + - `timeout` - Maximum execution time in seconds + - `synchronous` - Wait for completion (True) or fire-and-forget (False) + +{% hint style="info" %} +**GPU Configuration**: Use `ResourceSettings.gpu_count` to specify how many GPUs you need, and `ModalOrchestratorSettings.gpu` to specify what type of GPU. Modal will combine these automatically (e.g., `gpu_count=2` + `gpu="A100"` becomes `"A100:2"`). +{% endhint %} + +### Configuration Examples + +**Simple Configuration (Recommended):** + +```python +from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalOrchestratorSettings +) + +# Simple GPU pipeline +@pipeline( + settings={ + "orchestrator": ModalOrchestratorSettings(gpu="A100") + } +) +def my_gpu_pipeline(): + # Your pipeline steps here + ... +``` + +**Advanced Configuration:** + +```python +from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalOrchestratorSettings +) +from zenml.config import ResourceSettings + +# Configure Modal-specific settings +modal_settings = ModalOrchestratorSettings( + gpu="A100", # GPU type (optional) + region="us-east-1", # Preferred region + cloud="aws", # Cloud provider + modal_environment="production", # Modal environment name + app_name="ml-training-prod", # Custom Modal app name (optional) + mode="pipeline", # "pipeline" (default) or "per_step" + max_parallelism=3, # Max concurrent steps (per_step mode) + timeout=3600, # 1 hour timeout + synchronous=True, # Wait for completion +) + +# Configure hardware resources (quantities) +resource_settings = ResourceSettings( + cpu_count=16, # Number of CPU cores + memory="32GB", # 32GB RAM + gpu_count=1 # Number of GPUs +) + +@pipeline( + settings={ + "orchestrator": modal_settings, + "resources": resource_settings + } +) +def my_modal_pipeline(): + # Your pipeline steps here + ... 
+``` + +### Resource configuration + +{% hint style="info" %} +**Resource Configuration by Execution Mode**: + +**Pipeline Mode**: All steps share the same resources configured at the pipeline level. Configure resources at the `@pipeline` level for best results. + +**Per-Step Mode**: Each step can have its own resource configuration! You can mix different GPUs, CPU, and memory settings across steps. Pipeline-level settings serve as defaults that individual steps can override. + +**Resource Fallback Behavior**: If no pipeline-level resource settings are provided, the orchestrator will automatically use the highest resource requirements found across all steps in the pipeline. +{% endhint %} + +You can configure pipeline-wide resource requirements using `ResourceSettings` for hardware resources and `ModalOrchestratorSettings` for Modal-specific configurations: + +```python +from zenml.config import ResourceSettings +from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalOrchestratorSettings +) + +# Configure resources at the pipeline level (recommended) +@pipeline( + settings={ + "resources": ResourceSettings( + cpu_count=16, + memory="32GB", + gpu_count=1 # These resources apply to the entire pipeline + ), + "orchestrator": ModalOrchestratorSettings( + gpu="A100", # GPU type for the entire pipeline + region="us-west-2" + ) + } +) +def my_pipeline(): + first_step() # Runs with pipeline resources: 16 CPU, 32GB RAM, 1x A100 + second_step() # Runs with same resources: 16 CPU, 32GB RAM, 1x A100 + ... + +@step +def first_step(): + # Uses pipeline-level resource configuration + ... + +@step +def second_step(): + # Uses same pipeline-level resource configuration + ... +``` + +### Execution Modes + +The Modal orchestrator supports two execution modes: + +#### Pipeline Mode (Default - Recommended) + +```python +modal_settings = ModalOrchestratorSettings( + mode="pipeline", # Execute entire pipeline in one sandbox + gpu="A100" +) +``` + +**Benefits:** +- **Fastest execution**: Entire pipeline runs in single sandbox +- **Cost-effective**: Minimal overhead and resource usage +- **Simple**: All steps share same environment and resources + +**Best for:** Most ML pipelines, production workloads, cost optimization + +#### Per-Step Mode (Advanced) + +```python +modal_settings = ModalOrchestratorSettings( + mode="per_step", # Execute each step in separate sandbox + max_parallelism=3, # Run up to 3 steps concurrently + gpu="T4" # Default GPU for steps (can be overridden per step) +) +``` + +**Benefits:** +- **Granular control**: Each step runs in isolated sandbox with its own resources +- **Parallel execution**: Steps can run concurrently based on dependencies +- **Step-specific resources**: Each step can have different CPU, memory, GPU configurations +- **Resource optimization**: Use expensive GPUs only for steps that need them + +**Best for:** Complex pipelines with varying resource needs, debugging individual steps, cost optimization + +**Per-Step Resource Configuration:** +In per-step mode, you can configure different resources for each step, enabling powerful resource optimization: + +```python +@pipeline( + settings={ + "orchestrator": ModalOrchestratorSettings( + mode="per_step", + max_parallelism=2, + gpu="T4" # Default GPU for steps + ) + } +) +def mixed_resource_pipeline(): + # Light preprocessing - no GPU needed + preprocess_data() + + # Heavy training - needs A100 GPU + train_model() + + # Evaluation - T4 GPU sufficient + evaluate_model() + +@step( + settings={ + 
"resources": ResourceSettings(cpu_count=2, memory="4GB") # CPU-only step + } +) +def preprocess_data(): + # Light CPU work - no GPU, saves costs + pass + +@step( + settings={ + "orchestrator": ModalOrchestratorSettings(gpu="A100"), # Override to A100 + "resources": ResourceSettings(gpu_count=1, memory="32GB") + } +) +def train_model(): + # Heavy training with A100 GPU and 32GB RAM + pass + +@step( + settings={ + "resources": ResourceSettings(gpu_count=1, memory="16GB") # Uses pipeline default T4 + } +) +def evaluate_model(): + # Evaluation with T4 GPU and 16GB RAM + pass +``` + +**Key Benefits of Per-Step Resource Configuration:** +- **Cost optimization**: Use expensive GPUs (A100, H100) only for steps that need them +- **Resource efficiency**: Match CPU/memory to actual step requirements +- **Parallel execution**: Steps with different resources can run concurrently +- **Flexibility**: Each step gets exactly the resources it needs + +{% hint style="info" %} +**Docker Configuration Validation** + +The Modal orchestrator validates Docker configurations based on execution mode: + +- **PIPELINE mode**: Per-step Docker settings are not allowed since all steps run in the same sandbox. If you have step-specific Docker configurations, you'll get an error suggesting to either use PER_STEP mode or remove the step-specific settings. + +- **PER_STEP mode**: Per-step Docker settings are fully supported and each step can have its own Docker configuration. +{% endhint %} + +### Sandbox Architecture + +The Modal orchestrator uses a simplified sandbox-based architecture: + +- **Persistent apps per pipeline**: Each pipeline gets its own Modal app that stays alive +- **Dynamic sandboxes for execution**: Each pipeline run creates fresh sandboxes for complete isolation +- **Built-in output streaming**: Modal automatically handles log streaming and output capture +- **Maximum flexibility**: Sandboxes can execute arbitrary commands and provide better isolation + +This architecture provides optimal benefits: +- **Simplicity**: No complex app deployment or time window management +- **Flexibility**: Sandboxes offer more dynamic execution capabilities +- **Isolation**: Each run gets a completely fresh execution environment +- **Performance**: Persistent apps eliminate deployment overhead + +### Base image requirements + +{% hint style="info" %} +**Docker Image Customization** + +ZenML will automatically build a Docker image that includes your code and dependencies, then use it to run your pipeline on Modal. The base image and dependencies you configure will determine what's available in your Modal execution environment. + +Key considerations: +- **Base image**: Choose an appropriate base image for your workload (e.g., `python:3.9-slim`, `ubuntu:20.04`, or specialized ML images) +- **Dependencies**: Ensure all required packages are specified in your `requirements.txt` or Docker settings +- **System packages**: If you need system-level packages, configure them in your Docker settings +- **Environment variables**: Configure any necessary environment variables in your ZenML pipeline or Docker settings + +Check out the [ZenML Docker customization guide](https://docs.zenml.io/how-to/customize-docker-builds) for detailed information on customizing your execution environment. +{% endhint %} + +### Using GPUs + +Modal makes it easy to use GPUs for your ML workloads. 
Use `ResourceSettings` to specify the number of GPUs and `ModalOrchestratorSettings` to specify the GPU type: + +{% hint style="warning" %} +**Base Image Requirements for GPU Usage** + +When using GPUs, ensure your base Docker image includes the appropriate CUDA runtime and drivers. Modal's GPU instances come with CUDA pre-installed, but your application dependencies (like PyTorch, TensorFlow) must be compatible with the CUDA version. + +For optimal GPU performance: +- Use CUDA-compatible base images (e.g., `nvidia/cuda:11.8-runtime-ubuntu20.04`) +- Install GPU-compatible versions of ML frameworks in your Docker requirements +- Test your GPU setup locally before deploying to Modal + +ZenML will use your base image configuration from the container registry, so ensure GPU compatibility is built into your image. +{% endhint %} + +```python +from zenml.config import ResourceSettings +from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalOrchestratorSettings +) + +@step( + settings={ + "resources": ResourceSettings( + gpu_count=1 # Number of GPUs to allocate + ), + "orchestrator": ModalOrchestratorSettings( + gpu="A100", # GPU type: "T4", "A10G", "A100", "H100" + region="us-east-1" + ) + } +) +def train_model(): + # Your GPU-accelerated training code + # Modal will provision 1x A100 GPU (gpu_count=1 + gpu="A100") + import torch + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + print(f"Using device: {device}") + ... +``` + +Available GPU types include: +- `T4` - Cost-effective for inference and light training +- `A10G` - Balanced performance for training and inference +- `A100` - High-performance for large model training +- `H100` - Latest generation for maximum performance + +**Examples of GPU configurations:** + +```python +# Simple GPU pipeline (recommended) +@pipeline( + settings={ + "orchestrator": ModalOrchestratorSettings( + gpu="A100", + mode="pipeline" # Default: entire pipeline in one sandbox + ), + "resources": ResourceSettings(gpu_count=1) + } +) +def simple_gpu_pipeline(): + # All steps run in same sandbox with 1x A100 GPU + step_one() + step_two() + +# Per-step execution with multiple GPUs +@pipeline( + settings={ + "orchestrator": ModalOrchestratorSettings( + gpu="A100", + mode="per_step", # Each step in separate sandbox + max_parallelism=2 # Run up to 2 steps concurrently + ), + "resources": ResourceSettings(gpu_count=4) + } +) +def multi_gpu_pipeline(): + # Each step runs in separate sandbox with 4x A100 GPUs + training_step() # Sandbox 1: 4x A100 + evaluation_step() # Sandbox 2: 4x A100 (can run in parallel) +``` + +### Synchronous vs Asynchronous execution + +You can choose whether to wait for pipeline completion or run asynchronously: + +```python +# Wait for completion (default) +modal_settings = ModalOrchestratorSettings( + synchronous=True +) + +# Fire-and-forget execution +modal_settings = ModalOrchestratorSettings( + synchronous=False +) +``` + +### Authentication with different environments + +{% hint style="info" %} +**Best Practice: Separate Stacks for Different Environments** + +Consider creating separate ZenML stacks for different environments (development, staging, production), each configured with different Modal environments and workspaces. This provides better isolation and allows for different resource configurations per environment. 
+ +For example: +- **Development stack**: Uses Modal "dev" environment with smaller resource limits +- **Production stack**: Uses Modal "production" environment with production-grade resources and credentials + +This approach helps prevent accidental deployment to production and allows for environment-specific configurations. +{% endhint %} + +For production deployments, you can specify different Modal environments: + +```python +modal_settings = ModalOrchestratorSettings( + modal_environment="production", # or "staging", "dev", etc. + workspace="my-company" +) +``` + +### How it works: Persistent Apps + Dynamic Sandboxes + +{% hint style="info" %} +**Simplified Architecture for Maximum Flexibility** + +The ZenML Modal orchestrator uses a streamlined "Persistent Apps + Dynamic Sandboxes" architecture: + +**Persistent Pipeline Apps**: +- Each pipeline gets its own persistent Modal app (e.g., `zenml-pipeline-training-pipeline`) +- Apps stay alive across multiple runs using `modal.App.lookup(create_if_missing=True)` +- No complex time windows or deployment logic - truly persistent + +**Dynamic Execution Sandboxes**: +- Each pipeline run creates a fresh Modal sandbox for complete isolation +- Sandboxes execute arbitrary commands with maximum flexibility +- Built-in output streaming via `modal.enable_output()` +- Fresh execution environment prevents any conflicts between runs + +**Execution Flow**: +- Your entire pipeline runs in a single sandbox using `PipelineEntrypoint` +- Simple app lookup or creation, then sandbox execution +- Automatic log streaming and output capture +- Complete isolation between different pipeline runs + +**Benefits**: +- **Simplicity**: No complex app deployment or reuse logic +- **Flexibility**: Sandboxes can execute any commands dynamically +- **Isolation**: Each run gets completely fresh execution context +- **Performance**: Persistent apps eliminate deployment overhead +{% endhint %} + +### Fast execution with persistent apps + +Modal orchestrator uses persistent apps to minimize startup overhead: + +```python +modal_settings = ModalOrchestratorSettings( + region="us-east-1", # Preferred region + cloud="aws", # Cloud provider + modal_environment="main", # Modal environment + timeout=3600, # 1 hour timeout +) + +@pipeline( + settings={ + "orchestrator": modal_settings + } +) +def my_pipeline(): + ... +``` + +This ensures your pipelines start executing quickly by reusing persistent apps and creating fresh sandboxes for isolation. 
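+
+To put these pieces together, the following is a minimal, self-contained sketch of an asynchronous pipeline-mode run. The step and pipeline names are illustrative, and the exact shape of the object returned by calling a pipeline may vary between ZenML versions:
+
+```python
+from zenml import pipeline, step
+from zenml.config import ResourceSettings
+from zenml.integrations.modal.flavors.modal_orchestrator_flavor import (
+    ModalOrchestratorSettings,
+)
+
+
+@step
+def load_data() -> dict:
+    """Toy data-loading step."""
+    return {"features": [1, 2, 3], "labels": [0, 1, 0]}
+
+
+@step
+def train(data: dict) -> float:
+    """Toy training step returning a dummy metric."""
+    return 0.95
+
+
+@pipeline(
+    settings={
+        "orchestrator": ModalOrchestratorSettings(
+            mode="pipeline",    # entire pipeline in one sandbox
+            synchronous=False,  # fire-and-forget: don't block the client
+            timeout=3600,       # 1 hour timeout
+        ),
+        "resources": ResourceSettings(cpu_count=4, memory="8GB"),
+    }
+)
+def async_training_pipeline():
+    data = load_data()
+    train(data)
+
+
+if __name__ == "__main__":
+    run = async_training_pipeline()
+    # With synchronous=False the call returns once the run is submitted;
+    # monitor progress in the Modal or ZenML dashboards.
+    print(f"Submitted run: {run.name} (status: {run.status})")
+```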
+ +### Cost Optimization + +{% hint style="warning" %} +**Understanding Modal Costs** + +**Execution Model**: +- **Pay-per-use**: You only pay when sandboxes are actively running +- **No idle costs**: Persistent apps don't incur costs when not executing +- **Resource-based pricing**: Cost depends on CPU, memory, and GPU usage + +**Execution Mode Impact**: +- **Pipeline mode**: Most cost-effective - single sandbox for entire pipeline +- **Per-step mode**: Higher overhead - separate sandbox per step, but enables parallelism + +**Cost Examples (approximate)**: +```python +# Cost-effective: Pipeline mode +# Single A100 GPU for 30-minute pipeline = ~$0.80 +ModalOrchestratorSettings(mode="pipeline", gpu="A100") + +# Higher cost: Per-step mode +# A100 GPU per step (5 steps × 6 min each) = ~$0.80 +# But steps can run in parallel, reducing total time +ModalOrchestratorSettings(mode="per_step", gpu="A100") +``` + +**Cost Optimization Strategies**: +- **Use pipeline mode** for most workloads (fastest, cheapest) +- **Right-size resources**: Don't use A100s for simple preprocessing +- **Optimize pipeline execution time** to reduce sandbox runtime +- **Choose efficient regions** close to your data sources +- **Set appropriate timeouts** to avoid runaway costs + +Monitor your Modal dashboard to track sandbox execution time and resource usage. +{% endhint %} + + +## Best practices + +1. **Start with pipeline mode**: The default `pipeline` execution mode runs your entire pipeline in one sandbox, minimizing overhead and cost. Switch to `per_step` only if you need granular control. + +2. **Separate resource and orchestrator settings**: Use `ResourceSettings` for hardware (CPU, memory, GPU count) and `ModalOrchestratorSettings` for Modal-specific configurations (GPU type, region, etc.). + +3. **Configure appropriate timeouts**: Set realistic timeouts for your workloads: + ```python + modal_settings = ModalOrchestratorSettings( + timeout=7200, # 2 hours + mode="pipeline" # Recommended for most cases + ) + ``` + +4. **Choose execution mode based on needs**: + - **Pipeline mode**: For production, cost optimization, simple workflows + - **Per-step mode**: For debugging, heterogeneous resources, or parallel execution + +5. **Use appropriate GPU types**: Match GPU types to your workload requirements: + - `T4`: Inference, light training, cost-sensitive workloads + - `A100`: Large model training, high-performance computing + - `H100`: Latest generation, maximum performance + +6. **Optimize for your execution mode**: + - **Pipeline mode**: Optimize total pipeline runtime + - **Per-step mode**: Set appropriate `max_parallelism` (typically 2-4) + +7. **Monitor resource usage**: Use Modal's dashboard to track your resource consumption and optimize accordingly. + +8. **Environment separation**: Use separate Modal environments (`dev`, `staging`, `prod`) for different deployment stages. + +## Troubleshooting + +### Common issues + +1. **Authentication errors**: + ```bash + # Verify Modal setup + modal auth show + + # Re-authenticate if needed + modal setup + ``` + +2. **Image build failures**: + - Check Docker registry credentials in your ZenML stack + - Verify your Docker daemon is running + - Ensure base image compatibility with Modal's environment + +3. **Resource allocation errors**: + ``` + Error: No capacity for requested GPU type + ``` + **Solution**: Try different regions or GPU types, or reduce `max_parallelism` in per-step mode + +4. 
**Pipeline timeouts**: + ```python + # Increase timeout for long-running pipelines + ModalOrchestratorSettings(timeout=14400) # 4 hours + ``` + +5. **Per-step mode issues**: + - **Too many concurrent steps**: Reduce `max_parallelism` + - **Resource conflicts**: Ensure adequate quota for parallel execution + - **Step dependencies**: Verify your pipeline DAG allows for parallelism + +### Performance troubleshooting + +**Slow execution in per-step mode**: +- Reduce `max_parallelism` to avoid resource contention +- Consider switching to `pipeline` mode for better performance +- Check Modal dashboard for sandbox startup times + +**Memory issues**: +- Increase memory allocation in `ResourceSettings` +- For pipeline mode: ensure total memory covers all steps +- For per-step mode: configure per-step memory requirements + +### Getting help + +- Check the [Modal documentation](https://modal.com/docs) for platform-specific issues +- Monitor your sandboxes in the [Modal dashboard](https://modal.com/apps) +- Use `zenml logs` to view detailed pipeline execution logs +- Check ZenML step operator docs for [hybrid workflows](../step-operators/modal.md) + +For more information and a full list of configurable attributes of the Modal orchestrator, check out the [SDK Docs](https://sdkdocs.zenml.io/latest/integration_code_docs/integrations-modal.html#zenml.integrations.modal.orchestrators). \ No newline at end of file diff --git a/docs/book/component-guide/step-operators/modal.md b/docs/book/component-guide/step-operators/modal.md index 83c0ec99035..dc832474b3b 100644 --- a/docs/book/component-guide/step-operators/modal.md +++ b/docs/book/component-guide/step-operators/modal.md @@ -8,11 +8,43 @@ description: Executing individual steps in Modal. ### When to use it +{% hint style="info" %} +**Modal Step Operator vs Orchestrator** + +ZenML offers both a [Modal step operator](modal.md) and a [Modal orchestrator](../orchestrators/modal.md). Choose based on your needs: + +| Feature | Modal Step Operator | Modal Orchestrator | +|---------|-------------------|-------------------| +| **Execution Scope** | Individual steps only | Entire pipeline | +| **Orchestration** | Local ZenML | Remote Modal | +| **Resource Flexibility** | Per-step resources | Pipeline-wide resources | +| **Cost Model** | Pay per step execution | Pay per pipeline execution | +| **Setup Complexity** | Simple | Requires remote ZenML | +| **Best For** | Hybrid workflows, selective GPU usage | Full cloud execution, production | + +**Quick Decision Guide**: +- **Use Step Operator**: Need GPUs for only some steps, have local data dependencies, want hybrid local/cloud workflow +- **Use Orchestrator**: Want full cloud execution, production deployment, consistent resource requirements +{% endhint %} + You should use the Modal step operator if: -* You need fast execution time for steps that require computing resources (CPU, GPU, memory). -* You want to easily specify the exact hardware requirements (e.g., GPU type, CPU count, memory) for each step. -* You have access to Modal. +* You want to run only specific compute-intensive steps (like training or inference) on Modal while keeping other steps local. +* You need different hardware requirements for different steps in your pipeline. +* You want to leverage Modal's fast execution and GPU access for select steps without moving your entire pipeline to the cloud. +* You have a hybrid workflow where some steps need to access local resources or data. 
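+
+As a concrete illustration of the hybrid pattern, the sketch below keeps lightweight steps local and sends only the training step to Modal. It assumes a Modal step operator registered under the name `modal_step` (the same name used in the quick start below):
+
+```python
+from zenml import pipeline, step
+
+
+@step
+def prepare_data() -> list:
+    # Runs wherever your orchestrator runs (e.g., locally).
+    return [1.0, 2.0, 3.0]
+
+
+@step(step_operator="modal_step")  # only this step executes on Modal
+def train_model(data: list) -> float:
+    # Compute-intensive work happens on Modal's infrastructure.
+    return sum(data) / len(data)
+
+
+@step
+def report(metric: float) -> None:
+    # Back to local execution for lightweight reporting.
+    print(f"Training metric: {metric}")
+
+
+@pipeline
+def hybrid_pipeline():
+    data = prepare_data()
+    metric = train_model(data)
+    report(metric)
+```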
+ +### When NOT to use it + +The Modal step operator may not be the best choice if: + +* **You want to run entire pipelines on Modal**: Use the [Modal orchestrator](../orchestrators/modal.md) instead for complete pipeline execution with better cost efficiency and reduced overhead. + +* **You have simple, lightweight steps**: For steps that don't require significant compute resources, the overhead of running them on Modal may not be worth it. + +* **You need very low latency**: The step operator introduces some overhead for individual step execution compared to running steps locally. + +* **You have tight data locality requirements**: If your steps need to access large amounts of local data, transferring it to Modal for each step execution may be inefficient. ### How to deploy it @@ -23,7 +55,29 @@ To use the Modal step operator: ### How to use it -To use the Modal step operator, we need: +#### Quick Start (5 minutes) + +```bash +# 1. Install Modal integration +zenml integration install modal + +# 2. Setup Modal authentication +modal setup + +# 3. Register step operator +zenml step-operator register modal_step --flavor=modal +zenml stack update -s modal_step + +# 4. Use in your code +@step(step_operator="modal_step") +def train_model(): + # This step runs on Modal + pass +``` + +#### Full Setup Requirements + +To use the Modal step operator, you need: * The ZenML `modal` integration installed. If you haven't done so, run @@ -39,11 +93,29 @@ To use the Modal step operator, we need: We can then register the step operator: +**Option 1: Using Modal CLI authentication (recommended for development)** + ```shell +# Register the step operator (uses Modal CLI credentials) zenml step-operator register --flavor=modal zenml stack update -s ... ``` +**Option 2: Using Modal API token (recommended for production)** + +```shell +# Register the step operator with explicit credentials +zenml step-operator register \ + --flavor=modal \ + --token-id= \ + --token-secret= \ + --workspace= \ + --modal_environment= +zenml stack update -s ... +``` + +You can get your Modal token from the [Modal dashboard](https://modal.com/settings/tokens). + Once you added the step operator to your active stack, you can use it to execute individual steps of your pipeline by specifying it in the `@step` decorator as follows: ```python @@ -62,6 +134,24 @@ ZenML will build a Docker image which includes your code and use it to run your #### Additional configuration +The Modal step operator uses two types of settings following ZenML's standard pattern: + +1. **`ResourceSettings`** (standard ZenML) - for hardware resource quantities: + - `cpu_count` - Number of CPU cores + - `memory` - Memory allocation (e.g., "16GB") + - `gpu_count` - Number of GPUs to allocate + +2. **`ModalStepOperatorSettings`** (Modal-specific) - for Modal platform configuration: + - `gpu` - GPU type specification (e.g., "T4", "A100", "H100") + - `region` - Cloud region preference + - `cloud` - Cloud provider selection + - `modal_environment` - Modal environment name + - `timeout` - Maximum execution time in seconds + +{% hint style="info" %} +**GPU Configuration**: Use `ResourceSettings.gpu_count` to specify how many GPUs you need, and `ModalStepOperatorSettings.gpu` to specify what type of GPU. Modal will combine these automatically (e.g., `gpu_count=2` + `gpu="A100"` becomes `"A100:2"`). 
+{% endhint %} + You can specify the hardware requirements for each step using the `ResourceSettings` class as described in our documentation on [resource settings](https://docs.zenml.io/user-guides/tutorial/distributed-training): @@ -69,10 +159,20 @@ You can specify the hardware requirements for each step using the from zenml.config import ResourceSettings from zenml.integrations.modal.flavors import ModalStepOperatorSettings -modal_settings = ModalStepOperatorSettings(gpu="A100") +# Configure Modal-specific settings +modal_settings = ModalStepOperatorSettings( + gpu="A100", # GPU type (optional) + region="us-east-1", # Preferred region + cloud="aws", # Cloud provider + modal_environment="production", # Modal environment + timeout=3600, # 1 hour timeout +) + +# Configure hardware resources (quantities) resource_settings = ResourceSettings( - cpu=2, - memory="32GB" + cpu_count=2, # Number of CPU cores + memory="32GB", # 32GB RAM + gpu_count=1 # Number of GPUs (combined with gpu type above) ) @step( @@ -83,11 +183,12 @@ resource_settings = ResourceSettings( } ) def my_modal_step(): + # This step will run on Modal with 1x A100 GPU, 2 CPU cores, and 32GB RAM ... ``` {% hint style="info" %} -Note that the `cpu` parameter in `ResourceSettings` currently only accepts a single integer value. This specifies a soft minimum limit - Modal will guarantee at least this many physical cores, but the actual usage could be higher. The CPU cores/hour will also determine the minimum price paid for the compute resources. +Note that the `cpu_count` parameter in `ResourceSettings` specifies a soft minimum limit - Modal will guarantee at least this many physical cores, but the actual usage could be higher. The CPU cores/hour will also determine the minimum price paid for the compute resources. For example, with the configuration above (2 CPUs and 32GB memory), the minimum cost would be approximately $1.03 per hour ((0.135 * 2) + (0.024 * 32) = $1.03). {% endhint %} @@ -100,6 +201,45 @@ full list of supported GPU types and the [SDK docs](https://sdkdocs.zenml.io/latest/integration\_code\_docs/integrations-modal/#zenml.integrations.modal.flavors.modal\_step\_operator\_flavor.ModalStepOperatorSettings) for more details on the available settings. +### Authentication with different environments + +{% hint style="info" %} +**Best Practice: Separate Step Operators for Different Environments** + +Consider creating separate ZenML step operators for different environments (development, staging, production), each configured with different Modal environments and workspaces. This provides better isolation and allows for different resource configurations per environment. + +For example: +- **Development step operator**: Uses Modal "dev" environment with smaller resource limits +- **Production step operator**: Uses Modal "production" environment with production-grade resources and credentials + +This approach helps prevent accidental deployment to production and allows for environment-specific configurations. +{% endhint %} + +For production deployments, you can specify different Modal environments and workspaces: + +```python +modal_settings = ModalStepOperatorSettings( + modal_environment="production", # or "staging", "dev", etc. 
+ workspace="my-company", # Modal workspace name + gpu="A100", + region="us-east-1", + cloud="aws" +) +``` + +Or configure them when registering the step operator: + +```shell +zenml step-operator register modal_prod \ + --flavor=modal \ + --token-id= \ + --token-secret= \ + --workspace="production-workspace" \ + --modal_environment="production" +``` + +### Resource configuration notes + The settings do allow you to specify the region and cloud provider, but these settings are only available for Modal Enterprise and Team plan customers. Moreover, certain combinations of settings are not available. It is suggested to @@ -109,6 +249,70 @@ detailed error messages that can help identify what is incompatible. See more in the [Modal docs on region selection](https://modal.com/docs/guide/region-selection) for more details. +### Available GPU types + +Modal supports various GPU types for different workloads: + +- `T4` - Cost-effective for inference and light training +- `A10G` - Balanced performance for training and inference +- `A100` - High-performance for large model training +- `H100` - Latest generation for maximum performance + +**Examples of GPU configurations:** + +```python +# Single GPU step +@step( + settings={ + "step_operator": ModalStepOperatorSettings(gpu="A100"), + "resources": ResourceSettings(gpu_count=1) + } +) +def train_model(): + # Uses 1x A100 GPU + ... + +# Multiple GPU step +@step( + settings={ + "step_operator": ModalStepOperatorSettings(gpu="A100"), + "resources": ResourceSettings(gpu_count=4) + } +) +def distributed_training(): + # Uses 4x A100 GPUs + ... +``` + +### Base image requirements + +{% hint style="info" %} +**Docker Image Customization** + +ZenML will automatically build a Docker image that includes your code and dependencies, then use it to run your steps on Modal. The base image and dependencies you configure will determine what's available in your Modal execution environment. + +Key considerations: +- **Base image**: Choose an appropriate base image for your workload (e.g., `python:3.9-slim`, `ubuntu:20.04`, or specialized ML images) +- **Dependencies**: Ensure all required packages are specified in your `requirements.txt` or Docker settings +- **System packages**: If you need system-level packages, configure them in your Docker settings +- **Environment variables**: Configure any necessary environment variables in your ZenML step or Docker settings + +Check out the [ZenML Docker customization guide](https://docs.zenml.io/how-to/customize-docker-builds) for detailed information on customizing your execution environment. +{% endhint %} + +{% hint style="warning" %} +**Base Image Requirements for GPU Usage** + +When using GPUs, ensure your base Docker image includes the appropriate CUDA runtime and drivers. Modal's GPU instances come with CUDA pre-installed, but your application dependencies (like PyTorch, TensorFlow) must be compatible with the CUDA version. + +For optimal GPU performance: +- Use CUDA-compatible base images (e.g., `nvidia/cuda:11.8-runtime-ubuntu20.04`) +- Install GPU-compatible versions of ML frameworks in your Docker requirements +- Test your GPU setup locally before deploying to Modal + +ZenML will use your base image configuration from the container registry, so ensure GPU compatibility is built into your image. +{% endhint %} +
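+
+As a hedged sketch of how to wire a CUDA-compatible image into a Modal step, the snippet below uses ZenML's `DockerSettings` with a public PyTorch CUDA runtime image. The parent image tag, the package list, and the step operator name `modal_step` are illustrative; adjust them to the CUDA version and frameworks your workload actually needs:
+
+```python
+from zenml import step
+from zenml.config import DockerSettings
+
+# Hypothetical CUDA-enabled image configuration.
+docker_settings = DockerSettings(
+    parent_image="pytorch/pytorch:2.1.0-cuda11.8-cudnn8-runtime",
+    requirements=["transformers", "datasets"],
+)
+
+
+@step(
+    step_operator="modal_step",
+    settings={"docker": docker_settings},
+)
+def gpu_training_step() -> None:
+    import torch
+
+    # Fail fast if the CUDA runtime is not visible to the container.
+    assert torch.cuda.is_available(), "No CUDA device available"
+    ...
+```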
diff --git a/docs/book/component-guide/toc.md b/docs/book/component-guide/toc.md index 03359348d01..1d3a424c2de 100644 --- a/docs/book/component-guide/toc.md +++ b/docs/book/component-guide/toc.md @@ -19,6 +19,7 @@ * [Skypilot VM Orchestrator](orchestrators/skypilot-vm.md) * [HyperAI Orchestrator](orchestrators/hyperai.md) * [Lightning AI Orchestrator](orchestrators/lightning.md) + * [Modal Orchestrator](orchestrators/modal.md) * [Develop a custom orchestrator](orchestrators/custom.md) * [Deployers](deployers/README.md) * [Docker Deployer](deployers/docker.md) diff --git a/src/zenml/integrations/modal/__init__.py b/src/zenml/integrations/modal/__init__.py index 081628cb035..fe36ae7e6f3 100644 --- a/src/zenml/integrations/modal/__init__.py +++ b/src/zenml/integrations/modal/__init__.py @@ -11,10 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing # permissions and limitations under the License. -"""Modal integration for cloud-native step execution. +"""Modal integration for cloud-native step execution and orchestration. -The Modal integration sub-module provides a step operator flavor that allows -executing steps on Modal's cloud infrastructure. +The Modal integration sub-module provides a step operator flavor and an orchestrator +flavor that allow executing steps and complete pipelines on Modal's cloud infrastructure. """ from typing import List, Type @@ -22,6 +22,7 @@ from zenml.integrations.integration import Integration from zenml.stack import Flavor +MODAL_ORCHESTRATOR_FLAVOR = "modal" MODAL_STEP_OPERATOR_FLAVOR = "modal" @@ -29,7 +30,7 @@ class ModalIntegration(Integration): """Definition of Modal integration for ZenML.""" NAME = MODAL - REQUIREMENTS = ["modal>=0.64.49,<1"] + REQUIREMENTS = ["modal>=1"] @classmethod def flavors(cls) -> List[Type[Flavor]]: @@ -38,8 +39,13 @@ def flavors(cls) -> List[Type[Flavor]]: Returns: List of new stack component flavors. """ - from zenml.integrations.modal.flavors import ModalStepOperatorFlavor + from zenml.integrations.modal.flavors import ( + ModalOrchestratorFlavor, + ModalStepOperatorFlavor, + ) + + return [ModalOrchestratorFlavor, ModalStepOperatorFlavor] + - return [ModalStepOperatorFlavor] diff --git a/src/zenml/integrations/modal/flavors/__init__.py b/src/zenml/integrations/modal/flavors/__init__.py index 472e0bcb4f8..aee910296e1 100644 --- a/src/zenml/integrations/modal/flavors/__init__.py +++ b/src/zenml/integrations/modal/flavors/__init__.py @@ -13,6 +13,11 @@ # permissions and limitations under the License. """Modal integration flavors.""" +from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalOrchestratorConfig, + ModalOrchestratorFlavor, + ModalOrchestratorSettings, +) from zenml.integrations.modal.flavors.modal_step_operator_flavor import ( ModalStepOperatorConfig, ModalStepOperatorFlavor, @@ -20,6 +25,9 @@ ) __all__ = [ + "ModalOrchestratorConfig", + "ModalOrchestratorFlavor", + "ModalOrchestratorSettings", "ModalStepOperatorConfig", "ModalStepOperatorFlavor", "ModalStepOperatorSettings", diff --git a/src/zenml/integrations/modal/flavors/modal_orchestrator_flavor.py b/src/zenml/integrations/modal/flavors/modal_orchestrator_flavor.py new file mode 100644 index 00000000000..246f6046d89 --- /dev/null +++ b/src/zenml/integrations/modal/flavors/modal_orchestrator_flavor.py @@ -0,0 +1,231 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Modal orchestrator flavor.""" + +from enum import Enum +from typing import TYPE_CHECKING, Optional, Type + +from pydantic import Field + +from zenml.config.base_settings import BaseSettings +from zenml.orchestrators import BaseOrchestratorConfig, BaseOrchestratorFlavor +from zenml.utils.secret_utils import SecretField + +if TYPE_CHECKING: + from zenml.integrations.modal.orchestrators import ModalOrchestrator + +from zenml.integrations.modal import MODAL_ORCHESTRATOR_FLAVOR + + +class ModalExecutionMode(str, Enum): + """Execution modes for the Modal orchestrator. + + Attributes: + PIPELINE: Execute entire pipeline in one Modal sandbox (fastest, default). + PER_STEP: Execute each step in a separate Modal sandbox (granular control, + better for debugging, allows step-specific resources). + """ + + PIPELINE = "pipeline" + PER_STEP = "per_step" + + +class ModalOrchestratorSettings(BaseSettings): + """Modal orchestrator settings. + + Attributes: + gpu: The type of GPU to use for the pipeline execution (e.g., "T4", + "A100"). Use ResourceSettings.gpu_count to specify the number of GPUs. + region: The region to use for the pipeline execution. + cloud: The cloud provider to use for the pipeline execution. + modal_environment: The Modal environment to use for the pipeline execution. + app_name: Custom name for the Modal app (defaults to pipeline name). + timeout: Maximum execution time in seconds (default 24h). + mode: Execution mode controlling sandbox allocation. PIPELINE mode runs the + entire pipeline in a single Modal sandbox (fastest, shared resources). + PER_STEP mode runs each step in its own sandbox (granular control, + step-specific resources, better for debugging and resource isolation). + max_parallelism: Maximum number of parallel sandboxes (for PER_STEP mode). + synchronous: Wait for completion (True) or fire-and-forget (False). + """ + + gpu: Optional[str] = Field( + None, + description="GPU type for pipeline execution. Must be a valid Modal GPU type. " + "Examples: 'T4' (cost-effective), 'A100' (high-performance), 'V100' (training workloads). " + "Use ResourceSettings.gpu_count to specify number of GPUs. If not specified, uses CPU-only execution", + ) + region: Optional[str] = Field( + None, + description="Cloud region for pipeline execution. Must be a valid region for the selected cloud provider. " + "Examples: 'us-east-1', 'us-west-2', 'eu-west-1'. If not specified, Modal uses default region " + "based on cloud provider and availability", + ) + cloud: Optional[str] = Field( + None, + description="Cloud provider for pipeline execution. Must be a valid Modal-supported cloud provider. " + "Examples: 'aws', 'gcp'. If not specified, Modal uses default cloud provider " + "based on workspace configuration", + ) + modal_environment: Optional[str] = Field( + None, + description="Modal environment name for pipeline execution. Must be a valid environment " + "configured in your Modal workspace. Examples: 'main', 'staging', 'production'. 
" + "If not specified, uses the default environment for the workspace", + ) + app_name: Optional[str] = Field( + None, + description="Specifies custom name for the Modal app used for pipeline execution. " + "Must be a valid Modal app name containing only alphanumeric characters, " + "hyphens, and underscores. Examples: 'ml-training-app', 'data_pipeline_prod', " + "'zenml-experiments'. If not provided, defaults to 'zenml-pipeline-{pipeline_name}'", + ) + timeout: int = Field( + 86400, + description="Maximum execution time in seconds for pipeline completion. Must be between 1 and 86400 seconds. " + "Examples: 3600 (1 hour), 7200 (2 hours), 86400 (24 hours maximum). " + "Pipeline execution will be terminated if it exceeds this timeout", + ) + mode: ModalExecutionMode = Field( + ModalExecutionMode.PIPELINE, + description="Execution mode controlling sandbox allocation strategy. PIPELINE mode runs entire pipeline " + "in single Modal sandbox for fastest execution with shared resources. PER_STEP mode runs each step " + "in separate sandbox for granular control and resource isolation. Examples: 'pipeline', 'per_step'", + ) + max_parallelism: Optional[int] = Field( + None, + description="Maximum number of parallel sandboxes for PER_STEP execution mode. Must be positive integer. " + "Examples: 5 (up to 5 parallel steps), 10 (higher parallelism). Only applies when mode='per_step'. " + "If not specified, Modal determines optimal parallelism based on pipeline structure", + ) + synchronous: bool = Field( + True, + description="Controls whether pipeline execution blocks the client until completion. If True, " + "client waits for all steps to finish before returning. If False, returns immediately " + "and executes asynchronously. Useful for long-running production pipelines", + ) + + +class ModalOrchestratorConfig( + BaseOrchestratorConfig, ModalOrchestratorSettings +): + """Modal orchestrator config. + + Attributes: + token_id: Modal API token ID (ak-xxxxx format) for authentication. + token_secret: Modal API token secret (as-xxxxx format) for authentication. + workspace: Modal workspace name (optional). + + Note: If token_id and token_secret are not provided, falls back to + Modal's default authentication (~/.modal.toml). + All other configuration options (modal_environment, gpu, region, etc.) + are inherited from ModalOrchestratorSettings. + """ + + token_id: Optional[str] = SecretField( + default=None, + description="Modal API token ID for authentication. Must be in format 'ak-xxxxx' as provided by Modal. " + "Example: 'ak-1234567890abcdef'. If not provided, falls back to Modal's default authentication " + "from ~/.modal.toml file. Required for programmatic access to Modal API", + ) + token_secret: Optional[str] = SecretField( + default=None, + description="Modal API token secret for authentication. Must be in format 'as-xxxxx' as provided by Modal. " + "Example: 'as-abcdef1234567890'. Used together with token_id for API authentication. " + "If not provided, falls back to Modal's default authentication from ~/.modal.toml file", + ) + workspace: Optional[str] = Field( + None, + description="Modal workspace name for pipeline execution. Must be a valid workspace name " + "you have access to. Examples: 'my-company', 'ml-team', 'personal-workspace'. " + "If not specified, uses the default workspace from Modal configuration", + ) + + @property + def is_remote(self) -> bool: + """Checks if this stack component is running remotely. + + Returns: + True since Modal runs remotely. 
+ """ + return True + + @property + def is_synchronous(self) -> bool: + """Whether the orchestrator runs synchronous or not. + + Returns: + Whether the orchestrator runs synchronous or not. + """ + return self.synchronous + + +class ModalOrchestratorFlavor(BaseOrchestratorFlavor): + """Flavor for the Modal orchestrator.""" + + @property + def name(self) -> str: + """Name of the orchestrator flavor. + + Returns: + Name of the orchestrator flavor. + """ + return MODAL_ORCHESTRATOR_FLAVOR + + @property + def docs_url(self) -> Optional[str]: + """A url to point at docs explaining this flavor. + + Returns: + A flavor docs url. + """ + return self.generate_default_docs_url() + + @property + def sdk_docs_url(self) -> Optional[str]: + """A url to point at SDK docs explaining this flavor. + + Returns: + A flavor SDK docs url. + """ + return self.generate_default_sdk_docs_url() + + @property + def logo_url(self) -> str: + """A url to represent the flavor in the dashboard. + + Returns: + The flavor logo. + """ + return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/modal.png" + + @property + def config_class(self) -> Type[ModalOrchestratorConfig]: + """Config class for the Modal orchestrator flavor. + + Returns: + The config class. + """ + return ModalOrchestratorConfig + + @property + def implementation_class(self) -> Type["ModalOrchestrator"]: + """Implementation class for this flavor. + + Returns: + Implementation class for this flavor. + """ + from zenml.integrations.modal.orchestrators import ModalOrchestrator + + return ModalOrchestrator diff --git a/src/zenml/integrations/modal/flavors/modal_step_operator_flavor.py b/src/zenml/integrations/modal/flavors/modal_step_operator_flavor.py index a66580faa54..4585460e34f 100644 --- a/src/zenml/integrations/modal/flavors/modal_step_operator_flavor.py +++ b/src/zenml/integrations/modal/flavors/modal_step_operator_flavor.py @@ -15,9 +15,12 @@ from typing import TYPE_CHECKING, Optional, Type +from pydantic import Field + from zenml.config.base_settings import BaseSettings from zenml.integrations.modal import MODAL_STEP_OPERATOR_FLAVOR from zenml.step_operators import BaseStepOperatorConfig, BaseStepOperatorFlavor +from zenml.utils.secret_utils import SecretField if TYPE_CHECKING: from zenml.integrations.modal.step_operators import ModalStepOperator @@ -36,20 +39,80 @@ class ModalStepOperatorSettings(BaseSettings): incompatible. See more in the Modal docs at https://modal.com/docs/guide/region-selection. Attributes: - gpu: The type of GPU to use for the step execution. + gpu: The type of GPU to use for the step execution (e.g., "T4", "A100"). + Use ResourceSettings.gpu_count to specify the number of GPUs. region: The region to use for the step execution. cloud: The cloud provider to use for the step execution. + modal_environment: The Modal environment to use for the step execution. + timeout: Maximum execution time in seconds (default 24h). """ - gpu: Optional[str] = None - region: Optional[str] = None - cloud: Optional[str] = None + gpu: Optional[str] = Field( + None, + description="GPU type for step execution. Must be a valid Modal GPU type. " + "Examples: 'T4' (cost-effective), 'A100' (high-performance), 'V100' (training workloads). " + "Use ResourceSettings.gpu_count to specify number of GPUs. If not specified, uses CPU-only execution", + ) + region: Optional[str] = Field( + None, + description="Cloud region for step execution. Must be a valid region for the selected cloud provider. 
" + "Examples: 'us-east-1', 'us-west-2', 'eu-west-1'. If not specified, Modal uses default region " + "based on cloud provider and availability", + ) + cloud: Optional[str] = Field( + None, + description="Cloud provider for step execution. Must be a valid Modal-supported cloud provider. " + "Examples: 'aws', 'gcp'. If not specified, Modal uses default cloud provider " + "based on workspace configuration", + ) + modal_environment: Optional[str] = Field( + None, + description="Modal environment name for step execution. Must be a valid environment " + "configured in your Modal workspace. Examples: 'main', 'staging', 'production'. " + "If not specified, uses the default environment for the workspace", + ) + timeout: int = Field( + 86400, + description="Maximum execution time in seconds for step completion. Must be between 1 and 86400 seconds. " + "Examples: 3600 (1 hour), 7200 (2 hours), 86400 (24 hours maximum). " + "Step execution will be terminated if it exceeds this timeout", + ) class ModalStepOperatorConfig( BaseStepOperatorConfig, ModalStepOperatorSettings ): - """Configuration for the Modal step operator.""" + """Configuration for the Modal step operator. + + Attributes: + token_id: Modal API token ID (ak-xxxxx format) for authentication. + token_secret: Modal API token secret (as-xxxxx format) for authentication. + workspace: Modal workspace name (optional). + + Note: If token_id and token_secret are not provided, falls back to + Modal's default authentication (~/.modal.toml). + All other configuration options (modal_environment, gpu, region, etc.) + are inherited from ModalStepOperatorSettings. + """ + + token_id: Optional[str] = SecretField( + default=None, + description="Modal API token ID for authentication. Must be in format 'ak-xxxxx' as provided by Modal. " + "Example: 'ak-1234567890abcdef'. If not provided, falls back to Modal's default authentication " + "from ~/.modal.toml file. Required for programmatic access to Modal API", + ) + token_secret: Optional[str] = SecretField( + default=None, + description="Modal API token secret for authentication. Must be in format 'as-xxxxx' as provided by Modal. " + "Example: 'as-abcdef1234567890'. Used together with token_id for API authentication. " + "If not provided, falls back to Modal's default authentication from ~/.modal.toml file", + ) + workspace: Optional[str] = Field( + None, + description="Modal workspace name for step execution. Must be a valid workspace name " + "you have access to. Examples: 'my-company', 'ml-team', 'personal-workspace'. " + "If not specified, uses the default workspace from Modal configuration", + ) @property def is_remote(self) -> bool: diff --git a/src/zenml/integrations/modal/orchestrators/__init__.py b/src/zenml/integrations/modal/orchestrators/__init__.py new file mode 100644 index 00000000000..a83f67c36ca --- /dev/null +++ b/src/zenml/integrations/modal/orchestrators/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""Modal orchestrator implementation.""" + +from zenml.integrations.modal.orchestrators.modal_orchestrator import ( + ModalOrchestrator, +) + +__all__ = ["ModalOrchestrator"] \ No newline at end of file diff --git a/src/zenml/integrations/modal/orchestrators/modal_orchestrator.py b/src/zenml/integrations/modal/orchestrators/modal_orchestrator.py new file mode 100644 index 00000000000..9fd9175a294 --- /dev/null +++ b/src/zenml/integrations/modal/orchestrators/modal_orchestrator.py @@ -0,0 +1,244 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Implementation of a Modal orchestrator.""" + +import asyncio +import os +from typing import ( + TYPE_CHECKING, + Dict, + List, + Optional, + Type, + cast, +) + +from zenml.config.base_settings import BaseSettings +from zenml.config.build_configuration import BuildConfiguration +from zenml.constants import ORCHESTRATOR_DOCKER_IMAGE_KEY +from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalExecutionMode, +) +from zenml.integrations.modal.orchestrators.modal_sandbox_executor import ( + ModalSandboxExecutor, +) +from zenml.integrations.modal.utils import ( + ENV_ZENML_MODAL_ORCHESTRATOR_RUN_ID, + get_modal_stack_validator, + setup_modal_client, +) +from zenml.logger import get_logger +from zenml.orchestrators import ContainerizedOrchestrator, SubmissionResult +from zenml.stack import Stack, StackValidator + +if TYPE_CHECKING: + from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalOrchestratorConfig, + ModalOrchestratorSettings, + ) + from zenml.models import ( + PipelineDeploymentBase, + PipelineDeploymentResponse, + PipelineRunResponse, + ) + +logger = get_logger(__name__) + + +class ModalOrchestrator(ContainerizedOrchestrator): + """Orchestrator responsible for running entire pipelines on Modal.""" + + @property + def config(self) -> "ModalOrchestratorConfig": + """Returns the Modal orchestrator config. + + Returns: + The Modal orchestrator config. + """ + return cast("ModalOrchestratorConfig", self._config) + + @property + def settings_class(self) -> Optional[Type["BaseSettings"]]: + """Settings class for the Modal orchestrator. + + Returns: + The settings class. + """ + from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalOrchestratorSettings, + ) + + return ModalOrchestratorSettings + + def get_docker_builds( + self, deployment: "PipelineDeploymentBase" + ) -> List["BuildConfiguration"]: + """Gets the Docker builds required for the component. + + For Modal orchestrator in PIPELINE mode, per-step images are not allowed + since the entire pipeline runs in a single sandbox. + + Args: + deployment: The pipeline deployment for which to get the builds. + + Returns: + The required Docker builds. + + Raises: + ValueError: If PIPELINE mode is used with per-step Docker settings. 
+ """ + builds = super().get_docker_builds(deployment) + + # Get the execution mode from settings + settings = cast( + "ModalOrchestratorSettings", self.get_settings(deployment) + ) + mode = settings.mode + + # In PIPELINE mode, check if any builds have step-specific configurations + if mode == ModalExecutionMode.PIPELINE: + for build in builds: + if ( + build.key == ORCHESTRATOR_DOCKER_IMAGE_KEY + and build.step_name is not None + ): + raise ValueError( + f"Per-step Docker settings are not supported in PIPELINE " + f"execution mode. Step '{build.step_name}' has custom Docker " + f"settings but will be ignored since the entire pipeline runs " + f"in a single sandbox. Either use PER_STEP execution mode or " + f"remove step-specific Docker settings." + ) + + return builds + + def _setup_modal_client(self) -> None: + """Setup Modal client with authentication.""" + setup_modal_client( + token_id=self.config.token_id, + token_secret=self.config.token_secret, + workspace=self.config.workspace, + environment=self.config.modal_environment, + ) + + @property + def validator(self) -> Optional[StackValidator]: + """Ensures there is a container registry and artifact store in the stack. + + Returns: + A `StackValidator` instance. + """ + return get_modal_stack_validator() + + def get_orchestrator_run_id(self) -> str: + """Returns the active orchestrator run id. + + Raises: + RuntimeError: If the environment variable specifying the run id + is not set. + + Returns: + The orchestrator run id. + """ + try: + return os.environ[ENV_ZENML_MODAL_ORCHESTRATOR_RUN_ID] + except KeyError: + raise RuntimeError( + "Unable to read run id from environment variable " + f"{ENV_ZENML_MODAL_ORCHESTRATOR_RUN_ID}." + ) + + def submit_pipeline( + self, + deployment: "PipelineDeploymentResponse", + stack: "Stack", + environment: Dict[str, str], + placeholder_run: Optional["PipelineRunResponse"] = None, + ) -> Optional[SubmissionResult]: + """Submits a pipeline to Modal for execution. + + This method submits the pipeline to Modal and returns immediately unless + synchronous execution is configured, in which case it provides a wait + function in the submission result. + + Args: + deployment: The pipeline deployment to submit. + stack: The stack the pipeline will run on. + environment: Environment variables to set in the orchestration + environment. + placeholder_run: An optional placeholder run for the deployment. + + Returns: + Optional submission result with wait function if synchronous. + """ + self._setup_modal_client() + + if placeholder_run: + environment["ZENML_PIPELINE_RUN_ID"] = str(placeholder_run.id) + + # Get settings from pipeline configuration + settings = cast( + "ModalOrchestratorSettings", self.get_settings(deployment) + ) + + # Check execution mode + mode = settings.mode + logger.info(f"🚀 Executing pipeline with Modal ({mode.lower()} mode)") + + # Create sandbox executor + executor = ModalSandboxExecutor( + deployment=deployment, + stack=stack, + environment=environment, + settings=settings, + ) + + run_id = placeholder_run.id if placeholder_run else None + + if settings.synchronous: + + def _wait_for_completion() -> None: + # TODO: separate this into creating the sandbox, and + # monitoring/log streaming. 
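+                # The Modal SDK's sandbox creation and log streaming are
+                # async, so the blocking wait below drives a nested
+                # coroutine via asyncio.run().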
+ async def _execute_pipeline() -> None: + try: + await executor.execute_pipeline( + run_id=run_id, + synchronous=True, + ) + logger.info( + "✅ Pipeline execution completed successfully" + ) + except Exception as e: + logger.error(f"Pipeline execution failed: {e}") + raise + + asyncio.run(_execute_pipeline()) + + return SubmissionResult(wait_for_completion=_wait_for_completion) + else: + + async def _execute_pipeline() -> None: + try: + await executor.execute_pipeline( + run_id=run_id, + synchronous=False, + ) + logger.info("✅ Pipeline submitted successfully") + except Exception as e: + logger.error(f"Pipeline submission failed: {e}") + raise + + asyncio.run(_execute_pipeline()) + return None diff --git a/src/zenml/integrations/modal/orchestrators/modal_orchestrator_entrypoint.py b/src/zenml/integrations/modal/orchestrators/modal_orchestrator_entrypoint.py new file mode 100644 index 00000000000..01c4a425cd7 --- /dev/null +++ b/src/zenml/integrations/modal/orchestrators/modal_orchestrator_entrypoint.py @@ -0,0 +1,355 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Entrypoint of the Modal orchestrator sandbox.""" + +import argparse +import asyncio +import os +from typing import TYPE_CHECKING, Any, Dict, cast +from uuid import UUID, uuid4 + +import modal + +from zenml.client import Client + +if TYPE_CHECKING: + from zenml.models import PipelineDeploymentResponse + from zenml.stack import Stack +from zenml.entrypoints.pipeline_entrypoint_configuration import ( + PipelineEntrypointConfiguration, +) +from zenml.enums import ExecutionStatus +from zenml.exceptions import AuthorizationException +from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalExecutionMode, + ModalOrchestratorSettings, +) +from zenml.integrations.modal.orchestrators.modal_orchestrator import ( + ModalOrchestrator, +) +from zenml.integrations.modal.orchestrators.modal_sandbox_executor import ( + ModalSandboxExecutor, +) +from zenml.integrations.modal.utils import ( + ENV_ZENML_MODAL_ORCHESTRATOR_RUN_ID, + get_modal_app_name, + setup_modal_client, +) +from zenml.logger import get_logger +from zenml.orchestrators import publish_utils +from zenml.orchestrators.dag_runner import NodeStatus, ThreadedDagRunner +from zenml.orchestrators.utils import get_config_environment_vars + +logger = get_logger(__name__) + + +def parse_args() -> argparse.Namespace: + """Parse entrypoint arguments. + + Returns: + Parsed args. + """ + parser = argparse.ArgumentParser() + parser.add_argument("--deployment_id", type=str, required=True) + parser.add_argument("--run_id", type=str, required=False) + return parser.parse_args() + + +def run_step_on_modal( + step_name: str, + executor: ModalSandboxExecutor, +) -> None: + """Run a pipeline step in a separate Modal sandbox. + + Args: + step_name: Name of the step. + executor: The Modal sandbox executor. + + Raises: + Exception: If the sandbox fails to execute. 
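+
+    Example (illustrative): the per-step DAG runner invokes
+    ``run_step_on_modal("trainer", executor)`` and blocks until the
+    step's sandbox has exited ("trainer" being a hypothetical step name).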
+    """
+    logger.info(f"▶️ Running step: {step_name}")
+
+    try:
+        asyncio.run(executor.execute_step(step_name))
+        logger.info(f"✅ Step completed: {step_name}")
+    except Exception as e:
+        logger.error(f"❌ Step failed: {step_name} - {e}")
+        raise
+
+
+async def prepare_shared_image_cache(
+    deployment: "PipelineDeploymentResponse",
+    stack: "Stack",
+) -> Dict[str, modal.Image]:
+    """Pre-build the container images required by the pipeline's steps.
+
+    This function collects the unique images referenced by the deployment's
+    build and pre-builds them to avoid redundant builds during step execution.
+
+    Args:
+        deployment: The pipeline deployment.
+        stack: The ZenML stack.
+
+    Returns:
+        The shared image cache.
+
+    Raises:
+        ValueError: If the deployment has no associated build information.
+        Exception: For any unexpected error while building images.
+    """
+    from zenml.integrations.modal.utils import get_or_build_modal_image
+
+    logger.info("🔧 Preparing images for step execution")
+
+    # Check if deployment has a build
+    if deployment.build is None:
+        raise ValueError(
+            "Deployment build is None, cannot prepare image cache"
+        )
+
+    image_cache: Dict[str, modal.Image] = {}
+
+    for build_item in deployment.build.images.values():
+        try:
+            cached_image = get_or_build_modal_image(
+                stack=stack,
+                pipeline_name=deployment.pipeline_configuration.name,
+                build_item=build_item,
+                build_id=deployment.build.id,
+            )
+            image_cache[build_item.image] = cached_image
+        except Exception as e:
+            logger.error(f"Failed to build image {build_item.image}: {e}")
+            raise
+
+    logger.info(f"✅ Prepared {len(image_cache)} container images")
+    return image_cache
+
+
+def execute_pipeline_mode(args: argparse.Namespace) -> None:
+    """Execute entire pipeline in single sandbox mode.
+
+    Args:
+        args: Parsed command line arguments.
+    """
+    logger.debug("Executing pipeline sequentially in this sandbox")
+    entrypoint_args = PipelineEntrypointConfiguration.get_entrypoint_arguments(
+        deployment_id=args.deployment_id
+    )
+    config = PipelineEntrypointConfiguration(arguments=entrypoint_args)
+    config.run()
+
+
+def execute_per_step_mode(
+    deployment: "PipelineDeploymentResponse",
+    active_stack: "Stack",
+    environment: Dict[str, str],
+    pipeline_settings: ModalOrchestratorSettings,
+    args: argparse.Namespace,
+    orchestrator_run_id: str,
+) -> None:
+    """Execute pipeline with per-step sandboxes.
+
+    Args:
+        deployment: The pipeline deployment.
+        active_stack: The active ZenML stack.
+        environment: Environment variables.
+        pipeline_settings: Modal orchestrator settings.
+        args: Parsed command line arguments.
+        orchestrator_run_id: The orchestrator run ID.
+    """
+    logger.debug("Executing pipeline with per-step sandboxes")
+
+    app = modal.App.lookup(
+        get_modal_app_name(pipeline_settings, deployment),
+        create_if_missing=True,
+        environment_name=pipeline_settings.modal_environment,
+    )
+
+    shared_image_cache = asyncio.run(
+        prepare_shared_image_cache(
+            deployment=deployment,
+            stack=active_stack,
+        )
+    )
+
+    # Create shared executor instance that will be reused across steps
+    shared_executor = ModalSandboxExecutor(
+        deployment=deployment,
+        stack=active_stack,
+        environment=environment,
+        settings=pipeline_settings,
+        shared_image_cache=shared_image_cache,
+        shared_app=app,
+    )
+
+    def run_step_wrapper(step_name: str) -> None:
+        """Wrapper to execute a single pipeline step.
+
+        Args:
+            step_name: Name of the step to execute.
+ """ + run_step_on_modal(step_name, shared_executor) + + def finalize_wrapper(node_states: Dict[str, NodeStatus]) -> None: + """Wrapper to finalize pipeline execution. + + Args: + node_states: Mapping of node/step names to their execution + status after DAG completion. + """ + finalize_run(node_states, args, orchestrator_run_id) + + # Build DAG from deployment + pipeline_dag = { + step_name: step.spec.upstream_steps + for step_name, step in deployment.step_configurations.items() + } + + logger.info(f"🚀 Executing {len(pipeline_dag)} pipeline steps") + + # Run using ThreadedDagRunner with optimized execution + ThreadedDagRunner( + dag=pipeline_dag, + run_fn=run_step_wrapper, + finalize_fn=finalize_wrapper, + max_parallelism=getattr(pipeline_settings, "max_parallelism", None), + ).run() + + +def finalize_run( + node_states: Dict[str, NodeStatus], + args: argparse.Namespace, + orchestrator_run_id: str, +) -> None: + """Finalize the run by updating step and pipeline run statuses. + + Args: + node_states: The states of the nodes. + args: Parsed command line arguments. + orchestrator_run_id: The orchestrator run ID. + """ + try: + client = Client() + deployment = client.get_deployment(args.deployment_id) + + # Fetch the pipeline run + list_args: Dict[str, Any] = {} + if args.run_id: + list_args = dict(id=UUID(args.run_id)) + else: + list_args = dict(orchestrator_run_id=orchestrator_run_id) + + pipeline_runs = client.list_pipeline_runs( + hydrate=True, + project=deployment.project_id, + deployment_id=deployment.id, + **list_args, + ) + + if not len(pipeline_runs): + return + + pipeline_run = pipeline_runs[0] + pipeline_failed = False + + for step_name, node_state in node_states.items(): + if node_state != NodeStatus.FAILED: + continue + + pipeline_failed = True + + # Mark failed step runs as failed + step_run = pipeline_run.steps.get(step_name) + if step_run and step_run.status in { + ExecutionStatus.INITIALIZING, + ExecutionStatus.RUNNING, + }: + publish_utils.publish_failed_step_run(step_run.id) + + # Mark pipeline as failed if any steps failed + if pipeline_failed and pipeline_run.status in { + ExecutionStatus.INITIALIZING, + ExecutionStatus.RUNNING, + }: + publish_utils.publish_failed_pipeline_run(pipeline_run.id) + + except AuthorizationException: + # Token may be invalidated after completion, this is expected + pass + + +def main() -> None: + """Entrypoint of the Modal orchestrator sandbox. + + This entrypoint is used to execute the pipeline in a Modal sandbox. + + Raises: + Exception: If the pipeline execution fails. 
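+
+    Example invocation (illustrative, matching the entrypoint
+    configuration's command and options):
+
+        python -m zenml.integrations.modal.orchestrators.modal_orchestrator_entrypoint \
+            --deployment_id <deployment-uuid> [--run_id <run-uuid>]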
+ """ + logger.debug("Modal orchestrator sandbox started.") + + args = parse_args() + + # Generate orchestrator run ID locally since it's just a random UUID + orchestrator_run_id = str(uuid4()) + os.environ[ENV_ZENML_MODAL_ORCHESTRATOR_RUN_ID] = orchestrator_run_id + + client = Client() + active_stack = client.active_stack + orchestrator = active_stack.orchestrator + assert isinstance(orchestrator, ModalOrchestrator) + + deployment = client.get_deployment(args.deployment_id) + pipeline_settings = cast( + ModalOrchestratorSettings, + orchestrator.get_settings(deployment), + ) + + setup_modal_client( + token_id=orchestrator.config.token_id, + token_secret=orchestrator.config.token_secret, + workspace=orchestrator.config.workspace, + environment=orchestrator.config.modal_environment, + ) + + try: + if pipeline_settings.mode == ModalExecutionMode.PIPELINE: + execute_pipeline_mode(args) + else: + environment = get_config_environment_vars() + environment[ENV_ZENML_MODAL_ORCHESTRATOR_RUN_ID] = ( + orchestrator_run_id + ) + + execute_per_step_mode( + deployment, + active_stack, + environment, + pipeline_settings, + args, + orchestrator_run_id, + ) + + logger.debug("Pipeline execution completed successfully") + + except Exception as e: + logger.error(f"Pipeline execution failed: {e}") + raise + + +if __name__ == "__main__": + main() diff --git a/src/zenml/integrations/modal/orchestrators/modal_orchestrator_entrypoint_configuration.py b/src/zenml/integrations/modal/orchestrators/modal_orchestrator_entrypoint_configuration.py new file mode 100644 index 00000000000..f638f702819 --- /dev/null +++ b/src/zenml/integrations/modal/orchestrators/modal_orchestrator_entrypoint_configuration.py @@ -0,0 +1,78 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Entrypoint configuration for the Modal orchestrator sandbox.""" + +from typing import TYPE_CHECKING, List, Optional, Set + +if TYPE_CHECKING: + from uuid import UUID + +DEPLOYMENT_ID_OPTION = "deployment_id" +RUN_ID_OPTION = "run_id" + + +class ModalOrchestratorEntrypointConfiguration: + """Entrypoint configuration for the orchestrator sandbox.""" + + @classmethod + def get_entrypoint_options(cls) -> Set[str]: + """Gets all the options required for running this entrypoint. + + Returns: + Entrypoint options. + """ + options = { + DEPLOYMENT_ID_OPTION, + } + return options + + @classmethod + def get_entrypoint_command(cls) -> List[str]: + """Returns a command that runs the entrypoint module. + + Returns: + Entrypoint command. + """ + command = [ + "python", + "-m", + "zenml.integrations.modal.orchestrators.modal_orchestrator_entrypoint", + ] + return command + + @classmethod + def get_entrypoint_arguments( + cls, + deployment_id: "UUID", + run_id: Optional["UUID"] = None, + ) -> List[str]: + """Gets all arguments that the entrypoint command should be called with. + + Args: + deployment_id: ID of the deployment. + run_id: Optional ID of the pipeline run. + + Returns: + List of entrypoint arguments. 
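+
+        Example (illustrative): ``get_entrypoint_arguments(deployment_id=dep_id)``
+        returns ``["--deployment_id", str(dep_id)]``; passing ``run_id`` as
+        well appends ``["--run_id", str(run_id)]``.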
+ """ + args = [ + f"--{DEPLOYMENT_ID_OPTION}", + str(deployment_id), + ] + + if run_id: + args.append(f"--{RUN_ID_OPTION}") + args.append(str(run_id)) + + return args diff --git a/src/zenml/integrations/modal/orchestrators/modal_sandbox_executor.py b/src/zenml/integrations/modal/orchestrators/modal_sandbox_executor.py new file mode 100644 index 00000000000..0b96a70b956 --- /dev/null +++ b/src/zenml/integrations/modal/orchestrators/modal_sandbox_executor.py @@ -0,0 +1,470 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Modal sandbox executor.""" + +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast +from uuid import UUID + +import modal + +from zenml.client import Client +from zenml.config.resource_settings import ByteUnit, ResourceSettings +from zenml.entrypoints.step_entrypoint_configuration import ( + StepEntrypointConfiguration, +) +from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalOrchestratorSettings, +) +from zenml.integrations.modal.orchestrators.modal_orchestrator_entrypoint_configuration import ( + ModalOrchestratorEntrypointConfiguration, +) +from zenml.integrations.modal.utils import ( + generate_sandbox_tags, + get_modal_app_name, + get_or_build_modal_image, +) +from zenml.logger import get_logger + +if TYPE_CHECKING: + from zenml.config.step_configurations import Step + from zenml.models import PipelineDeploymentResponse + from zenml.stack import Stack + +logger = get_logger(__name__) + + +class ModalSandboxExecutor: + """Handles execution of ZenML pipelines and steps in Modal sandboxes.""" + + def __init__( + self, + deployment: "PipelineDeploymentResponse", + stack: "Stack", + environment: Dict[str, str], + settings: ModalOrchestratorSettings, + shared_image_cache: Optional[Dict[str, modal.Image]] = None, + shared_app: Optional[modal.App] = None, + ): + """Initialize the Modal sandbox executor. + + Args: + deployment: The pipeline deployment. + stack: The ZenML stack. + environment: Environment variables. + settings: Modal orchestrator settings. + shared_image_cache: Pre-built images shared across step executions. + shared_app: Shared Modal app for the entire pipeline execution. 
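+
+        Note: if ``shared_app`` is not provided, a per-pipeline Modal app
+        is looked up (and created if missing) based on the orchestrator
+        settings, as implemented below.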
+ """ + self.deployment = deployment + self.stack = stack + self.environment = environment + self.settings = settings + self.client = Client() + self.shared_image_cache = shared_image_cache or {} + + # Use shared app if provided, otherwise create new one + if shared_app: + self.app = shared_app + self.app_name = shared_app.name + else: + # Create Modal app for this pipeline + self.app_name = get_modal_app_name(settings, deployment) + self.app = modal.App.lookup( + self.app_name, + create_if_missing=True, + environment_name=settings.modal_environment, + ) + + # --------------------------------------------------------------------- + # Resource utilities + # --------------------------------------------------------------------- + + def _get_settings( + self, step_name: Optional[str] = None + ) -> ModalOrchestratorSettings: + """Get settings for a specific step or pipeline. + + Args: + step_name: Optional step name for which to fetch settings. If not + given, pipeline-level settings are returned. + + Returns: + Pipeline or step settings. + """ + container: Union["PipelineDeploymentResponse", "Step"] = ( + self.deployment.step_configurations[step_name] + if step_name + else self.deployment + ) + return cast( + ModalOrchestratorSettings, + self.stack.orchestrator.get_settings(container), + ) + + def _get_resource_settings( + self, step_name: Optional[str] = None + ) -> ResourceSettings: + """Return validated resource settings for either pipeline or step. + + Args: + step_name: Optional name of the step for which to fetch resource + settings. If ``None`` (default), pipeline-level settings are + returned. + + Returns: + A validated ``ResourceSettings`` object (never ``None``). + """ + if step_name: + return self.deployment.step_configurations[ + step_name + ].config.resource_settings + else: + return self.deployment.pipeline_configuration.resource_settings + + # TODO: Maybe use defaults? + # resource_settings = ResourceSettings( + # cpu_count=1, + # memory="1024MB", + # gpu_count=0, + # ) + + def _create_environment_secret(self) -> Optional[modal.Secret]: + """Create a Modal secret containing environment variables. + + Returns: + Modal secret with environment variables, or None if no env vars. + """ + if not self.environment: + return None + + return modal.Secret.from_dict( + cast(Dict[str, Optional[str]], self.environment) + ) + + def _get_resource_config( + self, step_name: Optional[str] = None + ) -> tuple[Optional[str], Optional[int], Optional[int]]: + """Get validated resource configuration for pipeline or step. + + Args: + step_name: Name of the step (None for pipeline-level). + + Returns: + Tuple of (gpu_values, cpu_count, memory_mb) with validated values. + """ + settings = self._get_settings(step_name) + resource_settings = self._get_resource_settings(step_name) + + cpu_count: Optional[int] = None + if resource_settings.cpu_count is not None: + cpu_count = int(resource_settings.cpu_count) + + memory_mb: Optional[int] = None + if memory_float := resource_settings.get_memory(ByteUnit.MB): + memory_mb = int(memory_float) + + gpu_value = None + gpu_type = settings.gpu + gpu_count = resource_settings.gpu_count + + if not gpu_type and gpu_count is not None: + gpu_type = "T4" + logger.debug( + f"No GPU type specified for {'step ' + step_name if step_name else 'pipeline'}, " + f"but gpu_count={gpu_count}. Defaulting to {gpu_type}." 
+ ) + + if gpu_count == 0: + gpu_value = None + elif gpu_count is None: + gpu_value = gpu_type + else: + gpu_value = f"{gpu_type}:{gpu_count}" + + return gpu_value, cpu_count, memory_mb + + def _prepare_modal_api_params( + self, + entrypoint_command: List[str], + image: Any, + gpu: Optional[str], + cpu: Optional[int], + memory: Optional[int], + cloud: Optional[str], + region: Optional[str], + app: Any, + timeout: int, + secrets: List[modal.Secret], + ) -> Dict[str, Any]: + """Prepare and validate Modal API parameters. + + This method ensures that all parameters passed to Modal API are valid + and handles None values appropriately. + + Args: + entrypoint_command: Command to execute. + image: Modal image. + gpu: GPU configuration string. + cpu: CPU count. + memory: Memory in MB. + cloud: Cloud provider. + region: Cloud region. + app: Modal app. + timeout: Timeout in seconds. + secrets: List of Modal secrets. + + Returns: + Dictionary of validated parameters for Modal API. + + Raises: + ValueError: If required parameters are invalid. + """ + if not entrypoint_command: + raise ValueError("Entrypoint command cannot be empty") + + if image is None: + raise ValueError("Modal image is required") + + if timeout <= 0: + raise ValueError(f"Timeout must be positive, got {timeout}") + + # Build parameters dictionary + # Note: entrypoint_command will be passed as *args separately + params = { + "image": image, + "app": app, + "timeout": timeout, + } + + # Add optional parameters only if they have valid values + if gpu is not None: + # Validate GPU format + if isinstance(gpu, str) and gpu.strip(): + params["gpu"] = gpu + else: + logger.warning(f"Invalid GPU value '{gpu}', ignoring") + + if cpu is not None and cpu > 0: + params["cpu"] = cpu + + if memory is not None and memory > 0: + params["memory"] = memory + + if cloud is not None and cloud.strip(): + params["cloud"] = cloud + + if region is not None and region.strip(): + params["region"] = region + + if secrets: + params["secrets"] = secrets + + # Log final parameters for debugging + param_summary = { + k: v + for k, v in params.items() + if k not in ["image", "app", "secrets"] # Skip complex objects + } + logger.debug(f"Modal sandbox parameters: {param_summary}") + + return params + + async def _execute_sandbox( + self, + entrypoint_command: List[str], + mode: str, + step_name: Optional[str] = None, + run_id: Optional[UUID] = None, + synchronous: bool = True, + ) -> None: + """Execute a sandbox with the given command. + + Args: + entrypoint_command: Command to execute in the sandbox. + mode: Execution mode for tagging. + step_name: Name of the step (for step execution). + run_id: Pipeline run ID for tagging. + synchronous: Whether to wait for completion. 
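+
+        Note: with ``synchronous=True`` the sandbox's stdout is streamed
+        to this process and the call blocks until the sandbox exits;
+        otherwise the sandbox is left running detached.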
+ """ + # Get resource configuration with validation + gpu_values, cpu_count, memory_mb = self._get_resource_config(step_name) + + # Get settings (step-specific for steps, pipeline-level for pipeline) + if step_name: + step_settings = self._get_settings(step_name) + cloud = step_settings.cloud + region = step_settings.region + timeout = step_settings.timeout + else: + cloud = self.settings.cloud + region = self.settings.region + timeout = self.settings.timeout + + modal_image = self._get_cached_or_build_image(step_name) + + env_secret = self._create_environment_secret() + secrets = [env_secret] if env_secret else [] + + tags = generate_sandbox_tags( + pipeline_name=self.deployment.pipeline_configuration.name, + deployment_id=str(self.deployment.id), + execution_mode=mode, + step_name=step_name, + run_id=run_id, + ) + + modal_params = self._prepare_modal_api_params( + entrypoint_command=entrypoint_command, + image=modal_image, + gpu=gpu_values, + cpu=cpu_count, + memory=memory_mb, + cloud=cloud, + region=region, + app=self.app, + timeout=timeout, + secrets=secrets, + ) + + with modal.enable_output(): + sb = await modal.Sandbox.create.aio( + *entrypoint_command, **modal_params + ) + sb.set_tags(tags) + + if synchronous: + async for line in sb.stdout: + print(line, end="") + await sb.wait.aio() + else: + logger.debug("Sandbox started asynchronously.") + + def _get_cached_or_build_image( + self, step_name: Optional[str] = None + ) -> modal.Image: + """Get cached Modal image or build new one if not in cache. + + This method first checks the shared image cache for an existing image. + If found, it returns the cached image. Otherwise, it falls back to + the standard image building process. + + Args: + step_name: Name of the step (None for pipeline-level). + + Returns: + The modal image. + """ + from zenml.integrations.modal.orchestrators.modal_orchestrator import ( + ModalOrchestrator, + ) + + assert self.deployment.build + + image_name = ModalOrchestrator.get_image( + deployment=self.deployment, step_name=step_name + ) + + if cached_image := self.shared_image_cache.get(image_name): + return cached_image + + build_item = self.deployment.build._get_item( + component_key="orchestrator", step=step_name + ) + + return get_or_build_modal_image( + stack=self.stack, + pipeline_name=self.deployment.pipeline_configuration.name, + build_item=build_item, + build_id=self.deployment.build.id, + ) + + def _get_image_cache_key( + self, image_name: str, step_name: Optional[str] = None + ) -> str: + """Generate a cache key for Modal images. + + Args: + image_name: The base image name. + step_name: Name of the step (None for pipeline-level). + + Returns: + Cache key for the image. + + Raises: + ValueError: If the deployment does not have a build ID which is + required to scope the cache key. + """ + # Use build ID and step name to create unique cache key + # Include a hash of the image name for uniqueness + if self.deployment.build is None: + raise ValueError( + "Deployment build is None, cannot generate cache key" + ) + + build_id = str(self.deployment.build.id) + image_hash = str(hash(image_name))[-8:] # Last 8 chars of hash + if step_name: + return f"{build_id}_{step_name}_{image_hash}" + else: + return f"{build_id}_pipeline_{image_hash}" + + async def execute_pipeline( + self, + run_id: Optional[UUID] = None, + synchronous: bool = True, + ) -> None: + """Execute the entire pipeline in a single sandbox. + + Args: + run_id: The pipeline run ID. + synchronous: Whether to wait for completion. 
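+
+        Note: this launches a single sandbox that runs the orchestrator
+        entrypoint for the whole deployment (PIPELINE mode), rather than
+        one sandbox per step.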
+ """ + logger.debug("Executing entire pipeline in single sandbox") + + command = ( + ModalOrchestratorEntrypointConfiguration.get_entrypoint_command() + ) + args = ( + ModalOrchestratorEntrypointConfiguration.get_entrypoint_arguments( + deployment_id=self.deployment.id, + run_id=run_id, + ) + ) + + await self._execute_sandbox( + entrypoint_command=command + args, + mode="PIPELINE", + run_id=run_id, + synchronous=synchronous, + ) + + async def execute_step(self, step_name: str) -> None: + """Execute a single step in its own sandbox. + + Args: + step_name: Name of the step to execute. + """ + logger.debug(f"Executing step '{step_name}' in separate sandbox") + + command = StepEntrypointConfiguration.get_entrypoint_command() + args = StepEntrypointConfiguration.get_entrypoint_arguments( + step_name=step_name, deployment_id=self.deployment.id + ) + + await self._execute_sandbox( + entrypoint_command=command + args, + mode="PER_STEP", + step_name=step_name, + synchronous=True, + ) diff --git a/src/zenml/integrations/modal/step_operators/modal_step_operator.py b/src/zenml/integrations/modal/step_operators/modal_step_operator.py index 9c654ca6541..f48be1ce11e 100644 --- a/src/zenml/integrations/modal/step_operators/modal_step_operator.py +++ b/src/zenml/integrations/modal/step_operators/modal_step_operator.py @@ -14,21 +14,25 @@ """Modal step operator implementation.""" import asyncio -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Type, cast +from typing import TYPE_CHECKING, Dict, List, Optional, Type, cast import modal -from modal_proto import api_pb2 from zenml.client import Client from zenml.config.build_configuration import BuildConfiguration -from zenml.config.resource_settings import ByteUnit, ResourceSettings -from zenml.enums import StackComponentType +from zenml.config.resource_settings import ByteUnit from zenml.integrations.modal.flavors import ( ModalStepOperatorConfig, ModalStepOperatorSettings, ) +from zenml.integrations.modal.utils import ( + build_modal_image, + get_gpu_values, + get_modal_stack_validator, + setup_modal_client, +) from zenml.logger import get_logger -from zenml.stack import Stack, StackValidator +from zenml.stack import StackValidator from zenml.step_operators import BaseStepOperator if TYPE_CHECKING: @@ -41,24 +45,6 @@ MODAL_STEP_OPERATOR_DOCKER_IMAGE_KEY = "modal_step_operator" -def get_gpu_values( - settings: ModalStepOperatorSettings, resource_settings: ResourceSettings -) -> Optional[str]: - """Get the GPU values for the Modal step operator. - - Args: - settings: The Modal step operator settings. - resource_settings: The resource settings. - - Returns: - The GPU string if a count is specified, otherwise the GPU type. - """ - if not settings.gpu: - return None - gpu_count = resource_settings.gpu_count - return f"{settings.gpu}:{gpu_count}" if gpu_count else settings.gpu - - class ModalStepOperator(BaseStepOperator): """Step operator to run a step on Modal. @@ -91,40 +77,7 @@ def validator(self) -> Optional[StackValidator]: Returns: The stack validator. """ - - def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]: - if stack.artifact_store.config.is_local: - return False, ( - "The Modal step operator runs code remotely and " - "needs to write files into the artifact store, but the " - f"artifact store `{stack.artifact_store.name}` of the " - "active stack is local. Please ensure that your stack " - "contains a remote artifact store when using the Modal " - "step operator." 
- ) - - container_registry = stack.container_registry - assert container_registry is not None - - if container_registry.config.is_local: - return False, ( - "The Modal step operator runs code remotely and " - "needs to push/pull Docker images, but the " - f"container registry `{container_registry.name}` of the " - "active stack is local. Please ensure that your stack " - "contains a remote container registry when using the " - "Modal step operator." - ) - - return True, "" - - return StackValidator( - required_components={ - StackComponentType.CONTAINER_REGISTRY, - StackComponentType.IMAGE_BUILDER, - }, - custom_validation_function=_validate_remote_components, - ) + return get_modal_stack_validator() def get_docker_builds( self, snapshot: "PipelineSnapshotBase" @@ -161,51 +114,25 @@ def launch( info: The step run information. entrypoint_command: The entrypoint command for the step. environment: The environment variables for the step. - - Raises: - RuntimeError: If no Docker credentials are found for the container registry. - ValueError: If no container registry is found in the stack. """ settings = cast(ModalStepOperatorSettings, self.get_settings(info)) image_name = info.get_image(key=MODAL_STEP_OPERATOR_DOCKER_IMAGE_KEY) zc = Client() stack = zc.active_stack - if not stack.container_registry: - raise ValueError( - "No Container registry found in the stack. " - "Please add a container registry and ensure " - "it is correctly configured." - ) - - if docker_creds := stack.container_registry.credentials: - docker_username, docker_password = docker_creds - else: - raise RuntimeError( - "No Docker credentials found for the container registry." - ) - - my_secret = modal.secret._Secret.from_dict( - { - "REGISTRY_USERNAME": docker_username, - "REGISTRY_PASSWORD": docker_password, - } - ) - - spec = modal.image.DockerfileSpec( - commands=[f"FROM {image_name}"], context_files={} + # Setup Modal authentication + setup_modal_client( + token_id=self.config.token_id, + token_secret=self.config.token_secret, + workspace=self.config.workspace, + environment=self.config.modal_environment, ) - zenml_image = modal.Image._from_args( - dockerfile_function=lambda *_, **__: spec, - force_build=False, - image_registry_config=modal.image._ImageRegistryConfig( - api_pb2.REGISTRY_AUTH_TYPE_STATIC_CREDS, my_secret - ), - ).env(environment) + # Build Modal image using shared utility + zenml_image = build_modal_image(image_name, stack, environment) resource_settings = info.config.resource_settings - gpu_values = get_gpu_values(settings, resource_settings) + gpu_values = get_gpu_values(settings.gpu, resource_settings) app = modal.App( f"zenml-{info.run_name}-{info.step_run_id}-{info.pipeline_step_name}" @@ -231,7 +158,7 @@ async def run_sandbox() -> asyncio.Future[None]: cloud=settings.cloud, region=settings.region, app=app, - timeout=86400, # 24h, the max Modal allows + timeout=settings.timeout, ) await sb.wait.aio() diff --git a/src/zenml/integrations/modal/utils.py b/src/zenml/integrations/modal/utils.py new file mode 100644 index 00000000000..1332760195c --- /dev/null +++ b/src/zenml/integrations/modal/utils.py @@ -0,0 +1,351 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Shared utilities for Modal integration components.""" + +import os +from typing import TYPE_CHECKING, Dict, Optional, Tuple +from uuid import UUID + +import modal + +from zenml.config import ResourceSettings +from zenml.enums import StackComponentType +from zenml.logger import get_logger +from zenml.stack import Stack, StackValidator + +if TYPE_CHECKING: + from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalOrchestratorSettings, + ) + from zenml.models import BuildItem, PipelineDeploymentResponse + +logger = get_logger(__name__) + +ENV_ZENML_MODAL_ORCHESTRATOR_RUN_ID = "ZENML_MODAL_ORCHESTRATOR_RUN_ID" + + +def setup_modal_client( + token_id: Optional[str] = None, + token_secret: Optional[str] = None, + workspace: Optional[str] = None, + environment: Optional[str] = None, +) -> None: + """Setup Modal client with authentication. + + Args: + token_id: Modal API token ID (ak-xxxxx format). + token_secret: Modal API token secret (as-xxxxx format). + workspace: Modal workspace name. + environment: Modal environment name. + """ + if token_id and token_secret: + # Validate token format + if not token_id.startswith("ak-"): + logger.warning( + f"Token ID format may be invalid. Expected format: ak-xxxxx, " + f"got: {token_id[:10]}... (truncated for security)" + ) + + if not token_secret.startswith("as-"): + logger.warning( + f"Token secret format may be invalid. Expected format: as-xxxxx, " + f"got: {token_secret[:10]}... (truncated for security)" + ) + + # Set both token ID and secret + os.environ["MODAL_TOKEN_ID"] = token_id + os.environ["MODAL_TOKEN_SECRET"] = token_secret + logger.debug("Using platform token ID and secret from config") + logger.debug(f"Token ID starts with: {token_id[:5]}...") + logger.debug(f"Token secret starts with: {token_secret[:5]}...") + + elif token_id: + # Validate token format + if not token_id.startswith("ak-"): + logger.warning( + f"Token ID format may be invalid. Expected format: ak-xxxxx, " + f"got: {token_id[:10]}... (truncated for security)" + ) + + # Only token ID provided + os.environ["MODAL_TOKEN_ID"] = token_id + logger.debug("Using platform token ID from config") + logger.warning( + "Only token ID provided. Make sure MODAL_TOKEN_SECRET is set " + "or platform authentication may fail." + ) + logger.debug(f"Token ID starts with: {token_id[:5]}...") + + elif token_secret: + # Validate token format + if not token_secret.startswith("as-"): + logger.warning( + f"Token secret format may be invalid. Expected format: as-xxxxx, " + f"got: {token_secret[:10]}... (truncated for security)" + ) + + # Only token secret provided (unusual) + os.environ["MODAL_TOKEN_SECRET"] = token_secret + logger.warning( + "Only token secret provided. Make sure MODAL_TOKEN_ID is set " + "or platform authentication may fail." 
+ ) + logger.debug(f"Token secret starts with: {token_secret[:5]}...") + + else: + logger.debug("Using default platform authentication (~/.modal.toml)") + # Check if default auth exists + modal_toml_path = os.path.expanduser("~/.modal.toml") + if os.path.exists(modal_toml_path): + logger.debug(f"Found platform config at {modal_toml_path}") + else: + logger.warning( + f"No platform config found at {modal_toml_path}. " + "Run 'modal token new' to set up authentication." + ) + + # Set workspace/environment if provided + if workspace: + os.environ["MODAL_WORKSPACE"] = workspace + if environment: + os.environ["MODAL_ENVIRONMENT"] = environment + + +# TODO: refactor step operator and remove this +def get_gpu_values( + gpu_type: Optional[str], resource_settings: ResourceSettings +) -> Optional[str]: + """Get the GPU values for Modal components. + + Args: + gpu_type: The GPU type from Modal settings (e.g., "T4", "A100"). + resource_settings: The resource settings containing GPU configuration. + + Returns: + The GPU string for Modal API, or None if no GPU requested. + """ + if not gpu_type: + return None + + gpu_count = resource_settings.gpu_count + if gpu_count == 0: + return None + elif gpu_count is None: + return gpu_type + else: + return f"{gpu_type}:{gpu_count}" + + +def build_modal_image( + image_name: str, + stack: "Stack", + environment: Optional[Dict[str, str]] = None, +) -> modal.Image: + """Build a Modal image from a Docker registry with authentication. + + This helper function centralizes the shared logic for building Modal images + from Docker registries, including credential validation, secret creation, + and image building with Modal installation. + + Args: + image_name: The name of the Docker image to use as base. + stack: The ZenML stack containing container registry. + environment: Optional environment variables to apply to the image. + + Returns: + The configured Modal image. + + Raises: + RuntimeError: If no Docker credentials are found. + """ + if not stack.container_registry: + raise RuntimeError( + "No Container registry found in the stack. " + "Please add a container registry and ensure " + "it is correctly configured." + ) + + if docker_creds := stack.container_registry.credentials: + docker_username, docker_password = docker_creds + else: + raise RuntimeError( + "No Docker credentials found for the container registry." + ) + + registry_secret = modal.Secret.from_dict( + { + "REGISTRY_USERNAME": docker_username, + "REGISTRY_PASSWORD": docker_password, + } + ) + + modal_image = modal.Image.from_registry( + image_name, secret=registry_secret + ).pip_install("modal") + + if environment: + modal_image = modal_image.env(environment) + + return modal_image + + +def get_or_build_modal_image( + stack: "Stack", + pipeline_name: str, + build_item: "BuildItem", + build_id: UUID, +) -> modal.Image: + """Get existing Modal image or build new one based on pipeline name and build ID. + + Args: + stack: The ZenML stack containing container registry. + pipeline_name: The pipeline name for caching. + build_item: The build item to use for the image. + build_id: The build ID for the image key. + + Returns: + The configured Modal image. 
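+
+    Example (illustrative): a build item with
+    ``settings_checksum="abc123"`` is cached under the key ``"abc123"``;
+    without a checksum the key falls back to ``"<build_id>-<image>"``, as
+    implemented below.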
+ """ + cache_key = ( + build_item.settings_checksum or f"{build_id}-{build_item.image}" + ) + + remote_image_cache = modal.Dict.from_name( + f"zenml-image-cache-{pipeline_name}", create_if_missing=True + ) + + try: + if modal_image_id := remote_image_cache.get(cache_key): + existing_image = modal.Image.from_id(modal_image_id) + logger.debug( + f"Using cached Modal image for image {build_item.image} with cache key {cache_key}" + ) + return existing_image + except (modal.exception.NotFoundError, KeyError): + pass + + new_image = build_modal_image( + image_name=build_item.image, + stack=stack, + ) + + new_image.hydrate() + try: + remote_image_cache[cache_key] = new_image.object_id + except Exception as e: + logger.warning(f"Failed to cache image: {e}") + + return new_image + + +def generate_sandbox_tags( + pipeline_name: str, + deployment_id: str, + execution_mode: str, + step_name: Optional[str] = None, + run_id: Optional[UUID] = None, +) -> Dict[str, str]: + """Generate tags for Modal sandboxes. + + Args: + pipeline_name: Name of the pipeline + deployment_id: ZenML deployment ID + execution_mode: Execution mode (PIPELINE or PER_STEP) + step_name: Step name (for PER_STEP mode) + run_id: Pipeline run ID + + Returns: + Dictionary of tags for the sandbox + """ + tags = { + "zenml_pipeline": pipeline_name, + "zenml_deployment_id": deployment_id, + "zenml_execution_mode": execution_mode, + "zenml_component": "modal_orchestrator", + } + + if step_name: + tags["zenml_step"] = step_name + + if run_id: + tags["zenml_run_id"] = str(run_id) + + return tags + + +def get_modal_stack_validator() -> StackValidator: + """Get a stack validator for Modal components. + + The validator ensures that the stack contains a remote artifact store and + container registry. + + Returns: + A stack validator for modal components. + """ + + def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]: + if stack.artifact_store.config.is_local: + return False, ( + "Serverless components run code remotely and " + "need to write files into the artifact store, but the " + f"artifact store `{stack.artifact_store.name}` of the " + "active stack is local. Please ensure that your stack " + "contains a remote artifact store when using serverless " + "components." + ) + + container_registry = stack.container_registry + assert container_registry is not None + + if container_registry.config.is_local: + return False, ( + "Serverless components run code remotely and " + "need to push/pull Docker images, but the " + f"container registry `{container_registry.name}` of the " + "active stack is local. Please ensure that your stack " + "contains a remote container registry when using serverless " + "components." + ) + + return True, "" + + return StackValidator( + required_components={ + StackComponentType.CONTAINER_REGISTRY, + StackComponentType.IMAGE_BUILDER, + }, + custom_validation_function=_validate_remote_components, + ) + + +def get_modal_app_name( + settings: "ModalOrchestratorSettings", + deployment: "PipelineDeploymentResponse", +) -> str: + """Get the Modal app name from settings or generate default from pipeline name. + + Args: + settings: Modal orchestrator settings object. + deployment: The pipeline deployment object. + + Returns: + The Modal app name to use. 
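+
+    Example (illustrative): with no explicit ``app_name`` configured, a
+    pipeline named ``my_pipeline`` resolves to the Modal app
+    ``zenml-pipeline-my-pipeline``.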
+ """ + if settings.app_name: + return settings.app_name + else: + pipeline_name = deployment.pipeline_configuration.name.replace( + "_", "-" + ) + return f"zenml-pipeline-{pipeline_name}" diff --git a/tests/integration/integrations/modal/orchestrators/__init__.py b/tests/integration/integrations/modal/orchestrators/__init__.py new file mode 100644 index 00000000000..47d03e07fbd --- /dev/null +++ b/tests/integration/integrations/modal/orchestrators/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Modal orchestrator integration tests.""" \ No newline at end of file diff --git a/tests/integration/integrations/modal/orchestrators/test_modal_sandbox_executor.py b/tests/integration/integrations/modal/orchestrators/test_modal_sandbox_executor.py new file mode 100644 index 00000000000..4a414f85df4 --- /dev/null +++ b/tests/integration/integrations/modal/orchestrators/test_modal_sandbox_executor.py @@ -0,0 +1,243 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""Tests for ModalSandboxExecutor pipeline and step resource merging.""" + +from unittest.mock import Mock, patch +from uuid import uuid4 + +from zenml.config.resource_settings import ResourceSettings +from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( + ModalOrchestratorSettings, +) + + +class TestModalSandboxExecutorResourceMerging: + """Test resource merging between pipeline and step settings.""" + + def setup_method(self): + """Set up test fixtures.""" + # Mock Modal library + self.modal_patcher = patch( + "zenml.integrations.modal.orchestrators.modal_sandbox_executor.modal" + ) + self.modal_patcher.start() + + # Mock utils functions + self.utils_patcher = patch( + "zenml.integrations.modal.orchestrators.modal_sandbox_executor.get_resource_settings_from_deployment" + ) + self.mock_get_resource_settings = self.utils_patcher.start() + + # Create mock deployment + self.mock_deployment = Mock() + self.mock_deployment.id = uuid4() + self.mock_deployment.build = Mock() + self.mock_deployment.build.id = uuid4() + self.mock_deployment.pipeline_configuration = Mock() + self.mock_deployment.pipeline_configuration.name = "test_pipeline" + + # Pipeline-level settings + self.pipeline_settings = ModalOrchestratorSettings( + gpu="A100", timeout=3600, cloud="aws", region="us-east-1" + ) + + def teardown_method(self): + """Clean up after each test method.""" + self.modal_patcher.stop() + self.utils_patcher.stop() + + def _create_executor(self, **kwargs): + """Helper to create ModalSandboxExecutor with mocked dependencies.""" + from zenml.integrations.modal.orchestrators.modal_sandbox_executor import ( + ModalSandboxExecutor, + ) + + return ModalSandboxExecutor( + deployment=self.mock_deployment, + stack=Mock(), + environment={}, + settings=self.pipeline_settings, + **kwargs, + ) + + def test_step_resources_override_pipeline_resources(self): + """Test that step-level resources override pipeline-level resources.""" + step_name = "test_step" + + # Step has specific resource requirements + step_resources = ResourceSettings( + cpu_count=8, memory="16GB", gpu_count=2 + ) + mock_step_config = Mock() + mock_step_config.resource_settings = step_resources + mock_step_config.settings = {} + + self.mock_deployment.step_configurations = { + step_name: Mock(config=mock_step_config) + } + + # Pipeline has different defaults + pipeline_resources = ResourceSettings( + cpu_count=4, memory="8GB", gpu_count=1 + ) + self.mock_get_resource_settings.return_value = pipeline_resources + + executor = self._create_executor() + + # Step should get its specific resources, not pipeline defaults + step_result = executor._get_resource_settings(step_name) + pipeline_result = executor._get_resource_settings(None) + + assert step_result == step_resources + assert pipeline_result == pipeline_resources + assert step_result.cpu_count == 8 # Step override + assert pipeline_result.cpu_count == 4 # Pipeline default + + def test_step_modal_settings_override_pipeline_settings(self): + """Test that step-level Modal settings override pipeline settings.""" + step_name = "test_step" + + # Step overrides GPU and region + step_modal_settings = Mock() + step_modal_settings.model_dump.return_value = { + "gpu": "V100", + "region": "us-west-2", + } + + mock_step_config = Mock() + mock_step_config.settings = {"orchestrator.modal": step_modal_settings} + + self.mock_deployment.step_configurations = { + step_name: Mock(config=mock_step_config) + } + + executor = self._create_executor() + result = executor._get_settings(step_name) + + 
+        # Step overrides should take precedence
+        assert result.gpu == "V100"  # Step override
+        assert result.region == "us-west-2"  # Step override
+        assert result.cloud == "aws"  # Pipeline default (not overridden)
+        assert result.timeout == 3600  # Pipeline default (not overridden)
+
+    def test_partial_step_overrides_preserve_pipeline_defaults(self):
+        """Test that partial step overrides preserve non-overridden pipeline settings."""
+        step_name = "test_step"
+
+        # Step only overrides GPU
+        step_modal_settings = Mock()
+        step_modal_settings.model_dump.return_value = {"gpu": "T4"}
+
+        mock_step_config = Mock()
+        mock_step_config.settings = {"orchestrator.modal": step_modal_settings}
+
+        self.mock_deployment.step_configurations = {
+            step_name: Mock(config=mock_step_config)
+        }
+
+        executor = self._create_executor()
+        result = executor._get_settings(step_name)
+
+        assert result.gpu == "T4"  # Step override
+        assert result.cloud == "aws"  # Pipeline default preserved
+        assert result.region == "us-east-1"  # Pipeline default preserved
+        assert result.timeout == 3600  # Pipeline default preserved
+
+    @patch(
+        "zenml.integrations.modal.orchestrators.modal_sandbox_executor.get_gpu_values"
+    )
+    @patch(
+        "zenml.integrations.modal.orchestrators.modal_sandbox_executor.get_resource_values"
+    )
+    def test_complete_resource_merging_integration(
+        self, mock_get_resource_values, mock_get_gpu_values
+    ):
+        """Integration test for complete resource merging between pipeline and step."""
+        step_name = "integration_step"
+
+        # Step has specific resources and Modal settings
+        step_resources = ResourceSettings(
+            cpu_count=16, memory="32GB", gpu_count=4
+        )
+        step_modal_settings = Mock()
+        step_modal_settings.model_dump.return_value = {
+            "gpu": "A100",
+            "cloud": "gcp",
+            "region": "us-central1",
+        }
+
+        mock_step_config = Mock()
+        mock_step_config.resource_settings = step_resources
+        mock_step_config.settings = {"orchestrator.modal": step_modal_settings}
+
+        self.mock_deployment.step_configurations = {
+            step_name: Mock(config=mock_step_config)
+        }
+
+        # Mock utility function returns
+        mock_get_gpu_values.return_value = "A100:4"
+        mock_get_resource_values.return_value = (16, 32000)
+
+        executor = self._create_executor()
+
+        # Test resource configuration
+        gpu_values, cpu_count, memory_mb = executor._get_resource_config(
+            step_name
+        )
+
+        # Test step settings
+        step_settings = executor._get_settings(step_name)
+
+        # Assert step resources are used
+        assert gpu_values == "A100:4"
+        assert cpu_count == 16
+        assert memory_mb == 32000
+
+        # Assert step Modal settings override pipeline where specified
+        assert step_settings.gpu == "A100"  # Step override
+        assert step_settings.cloud == "gcp"  # Step override
+        assert step_settings.region == "us-central1"  # Step override
+        assert (
+            step_settings.timeout == 3600
+        )  # Pipeline default (not overridden)
+
+        # Verify utility functions called with step resources
+        mock_get_gpu_values.assert_called_once_with("A100", step_resources)
+        mock_get_resource_values.assert_called_once_with(step_resources)
+
+    def test_fallback_to_pipeline_when_no_step_config(self):
+        """Test fallback to pipeline settings when step has no specific configuration."""
+        step_name = "minimal_step"
+
+        # Step has no resource settings or Modal settings
+        mock_step_config = Mock()
+        mock_step_config.resource_settings = None
+        mock_step_config.settings = {}
+
+        self.mock_deployment.step_configurations = {
+            step_name: Mock(config=mock_step_config)
+        }
+
+        executor = self._create_executor()
+
+        # Should get empty ResourceSettings (fallback)
+        step_resources = executor._get_resource_settings(step_name)
+        assert isinstance(step_resources, ResourceSettings)
+        assert step_resources.cpu_count is None
+
+        # Should get pipeline Modal settings (fallback)
+        step_settings = executor._get_settings(step_name)
+        assert step_settings.gpu == "A100"  # Pipeline setting
+        assert step_settings.cloud == "aws"  # Pipeline setting
+        assert step_settings.region == "us-east-1"  # Pipeline setting
diff --git a/tests/integration/integrations/modal/step_operators/test_modal_step_operator.py b/tests/integration/integrations/modal/step_operators/test_modal_step_operator.py
index 865f6d55265..a0325994f68 100644
--- a/tests/integration/integrations/modal/step_operators/test_modal_step_operator.py
+++ b/tests/integration/integrations/modal/step_operators/test_modal_step_operator.py
@@ -12,6 +12,12 @@
 # or implied. See the License for the specific language governing
 # permissions and limitations under the License.
 
+"""Integration tests for the Modal step operator utilities.
+
+This module verifies helper functions used by the Modal step operator,
+specifically the ``get_gpu_values`` helper which converts GPU type/count pairs
+into the string format expected by the Modal SDK.
+"""
 
 import pytest
 
@@ -30,17 +36,18 @@
         ("", 1, None),
         (None, 1, None),
         ("A100", None, "A100"),
-        ("A100", 0, "A100"),
+        ("A100", 0, None),
         ("A100", 1, "A100:1"),
         ("A100", 2, "A100:2"),
         ("V100", None, "V100"),
-        ("V100", 0, "V100"),
+        ("V100", 0, None),
         ("V100", 1, "V100:1"),
         ("V100", 2, "V100:2"),
     ],
 )
 def test_get_gpu_values(gpu, gpu_count, expected_result):
+    """Test the get_gpu_values function."""
     settings = ModalStepOperatorSettings(gpu=gpu)
     resource_settings = ResourceSettings(gpu_count=gpu_count)
-    result = get_gpu_values(settings, resource_settings)
+    result = get_gpu_values(settings.gpu, resource_settings)
     assert result == expected_result
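+
+
+def test_get_gpu_values_formats_type_and_count():
+    """Illustrative sketch (assuming the shared helper's signature above):
+    an explicit GPU count maps to Modal's "TYPE:COUNT" string format.
+    """
+    resource_settings = ResourceSettings(gpu_count=4)
+    assert get_gpu_values("A100", resource_settings) == "A100:4"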