33 changes: 33 additions & 0 deletions docs/book/component-guide/orchestrators/kubernetes.md
@@ -130,6 +130,7 @@ Some configuration options for the Kubernetes orchestrator can only be set throu
- **`skip_local_validations`** (default: False): If `True`, skips the local validations that would otherwise be performed when `local` is set.
- **`parallel_step_startup_waiting_period`**: How long (in seconds) to wait between starting parallel steps, useful for distributing server load in highly parallel pipelines.
- **`pass_zenml_token_as_secret`** (default: False): By default, the Kubernetes orchestrator passes a short-lived API token for authenticating to the ZenML server as an environment variable in the Pod manifest. If you want this token to be stored in a Kubernetes secret instead, set `pass_zenml_token_as_secret=True` when registering your orchestrator. If you do so, make sure the service connector that you configure for your orchestrator has permissions to create Kubernetes secrets. Additionally, the service account used for the Pods running your pipeline must have permissions to delete secrets; otherwise the cleanup will fail and you'll be left with orphaned secrets.
- **`auto_generate_image_pull_secrets`** (default: True): If `True`, automatically generates imagePullSecrets from container registry credentials in the stack. If `False`, relies on manually configured imagePullSecrets. When enabled, the service account used for running pipelines must have permissions to create and list secrets in the target namespace.

The following configuration options can be set either through the orchestrator config or overridden using `KubernetesOrchestratorSettings` (at the pipeline or step level):

@@ -464,6 +465,38 @@ kubectl logs job/<job-name> -n zenml

Common issues include incorrect cron expressions, insufficient permissions for the service account, or resource constraints.

#### Required Kubernetes RBAC Permissions

When using the automatic imagePullSecret generation feature (`auto_generate_image_pull_secrets=True`), the service account used by ZenML must have the following additional permissions in the target namespace:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: zenml  # or your configured namespace
  name: zenml-image-pull-secret-manager
rules:
- apiGroups: [""]
  resources: ["secrets"]
  verbs: ["create", "get", "list", "update", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: zenml-image-pull-secret-binding
  namespace: zenml  # or your configured namespace
subjects:
- kind: ServiceAccount
  name: zenml-service-account  # or your configured service account
  namespace: zenml
roleRef:
  kind: Role
  name: zenml-image-pull-secret-manager
  apiGroup: rbac.authorization.k8s.io
```

If you're using a custom service account (via `service_account_name`), ensure it has these permissions. The default `edit` role granted to `zenml-service-account` includes these permissions.
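
As a rough way to sanity-check a Role against the verbs listed above, the following sketch compares a Role manifest (loaded as a plain Python dict) with a required verb set. The helper `missing_secret_verbs` and the `REQUIRED_VERBS` constant are hypothetical names, not part of ZenML; the verb list simply mirrors the Role shown in this section.

```python
from typing import Any, Dict, Set

# Verbs copied from the Role manifest in this section (assumption: this is
# the full set the feature needs in your cluster).
REQUIRED_VERBS: Set[str] = {"create", "get", "list", "update", "delete"}


def missing_secret_verbs(
    role: Dict[str, Any], required: Set[str] = REQUIRED_VERBS
) -> Set[str]:
    """Return the required verbs that the Role does not grant on core secrets."""
    granted: Set[str] = set()
    for rule in role.get("rules", []):
        groups = rule.get("apiGroups", [])
        resources = rule.get("resources", [])
        in_core_group = "" in groups or "*" in groups
        covers_secrets = "secrets" in resources or "*" in resources
        if in_core_group and covers_secrets:
            granted.update(rule.get("verbs", []))
    if "*" in granted:
        return set()
    return required - granted


role = {
    "apiVersion": "rbac.authorization.k8s.io/v1",
    "kind": "Role",
    "metadata": {"namespace": "zenml", "name": "zenml-image-pull-secret-manager"},
    "rules": [
        {
            "apiGroups": [""],
            "resources": ["secrets"],
            "verbs": ["create", "get", "list", "update", "delete"],
        }
    ],
}

print(missing_secret_verbs(role))  # empty set: every listed verb is granted
```

Kubernetes RBAC treats `patch` as a separate verb from `update`, so if secrets are modified through a patch call, it may be worth checking `missing_secret_verbs(role, REQUIRED_VERBS | {"patch"})` as well.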

For a tutorial on how to work with schedules in ZenML, check out our ['Managing
Scheduled
Pipelines'](https://docs.zenml.io/user-guides/tutorial/managing-scheduled-pipelines)
Original file line number Diff line number Diff line change
@@ -92,8 +92,7 @@ The 'dockerhub' Docker Service Connector connector was used to successfully conf
The Docker Service Connector can be used by all Container Registry stack component flavors to authenticate to a remote Docker/OCI container registry. This allows container images to be built and published to private container registries without the need to configure explicit Docker credentials in the target environment or the Stack Component.

{% hint style="warning" %}
ZenML does not yet support automatically configuring Docker credentials in container runtimes such as Kubernetes clusters (i.e. via imagePullSecrets) to allow container images to be pulled from the private container registries. This will be added in a future release.
ZenML does not yet support automatically configuring Docker credentials in some container runtimes to allow container images to be pulled from the private container registries. This will be added in a future release. In the meantime please make sure your orchestration environment is configured appropriately to pull from the docker registry.
{% endhint %}

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
1 change: 1 addition & 0 deletions docs/book/component-guide/step-operators/kubernetes.md
@@ -126,6 +126,7 @@ The following configuration options can be set either through the step operator
- **`pod_failure_max_retries`** (default: 3): The maximum number of times to retry a step pod if it fails to start.
- **`pod_failure_retry_delay`** (default: 10): The delay (in seconds) between pod failure retries and pod startup retries.
- **`pod_failure_backoff`** (default: 1.0): The backoff factor for pod failure retries and pod startup retries.
- **`auto_generate_image_pull_secrets`** (default: True): If `True`, automatically generates imagePullSecrets from container registry credentials in the stack. If `False`, relies on manually configured imagePullSecrets. When enabled, the service account used for running steps must have permissions to create and list secrets in the target namespace.

```python
from zenml.integrations.kubernetes.flavors import KubernetesStepOperatorSettings
129 changes: 123 additions & 6 deletions src/zenml/container_registries/base_container_registry.py
@@ -94,16 +94,13 @@ def requires_authentication(self) -> bool:
    def credentials(self) -> Optional[Tuple[str, str]]:
        """Username and password to authenticate with this container registry.

        Service connector credentials take precedence over direct authentication secrets.

        Returns:
            Tuple with username and password if this container registry
            requires authentication, `None` otherwise.
        """
        secret = self.get_typed_authentication_secret(
            expected_schema_type=BasicAuthSecretSchema
        )
        if secret:
            return secret.username, secret.password

        # Check service connector credentials first as they take precedence
        connector = self.get_connector()
        if connector:
            from zenml.service_connectors.docker_service_connector import (
@@ -116,6 +113,13 @@ def credentials(self) -> Optional[Tuple[str, str]]:
                    connector.config.password.get_secret_value(),
                )

        # Fall back to direct authentication secrets
        secret = self.get_typed_authentication_secret(
            expected_schema_type=BasicAuthSecretSchema
        )
        if secret:
            return secret.username, secret.password

@stefannica according to your comment the connector should be checked first, but in the credentials implementation it was the other way around - let me know if I am missing something or if it made sense to flip here
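
The precedence being discussed can be sketched in isolation as follows. This is a simplified stand-in, not the ZenML implementation: `resolve_credentials` and its two arguments are hypothetical names, and plain tuples replace the connector and secret objects.

```python
from typing import Optional, Tuple

Credentials = Tuple[str, str]


def resolve_credentials(
    connector_credentials: Optional[Credentials],
    secret_credentials: Optional[Credentials],
) -> Optional[Credentials]:
    """Prefer service connector credentials, then fall back to the secret."""
    if connector_credentials:
        return connector_credentials
    if secret_credentials:
        return secret_credentials
    return None


# Both sources configured: the connector wins.
print(resolve_credentials(("conn-user", "conn-pass"), ("secret-user", "secret-pass")))
# Only the authentication secret configured: it is used as the fallback.
print(resolve_credentials(None, ("secret-user", "secret-pass")))
```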

        return None

    @property
@@ -232,6 +236,119 @@ def get_image_repo_digest(self, image_name: str) -> Optional[str]:

        return cast(str, metadata.id.split(":")[-1])

    def get_kubernetes_image_pull_secret_data(
        self,
    ) -> Optional[Tuple[str, str, str]]:
        """Get container registry credentials for Kubernetes imagePullSecrets.

        This method only returns credentials when running with a Kubernetes-based
        orchestrator (kubernetes, kubeflow, etc.). For local orchestrators, it
        returns None since imagePullSecrets are not needed.

        Returns:
            Tuple of (registry_uri, username, password) if credentials are available
            and running with a Kubernetes orchestrator, None otherwise. The
            registry_uri is normalized for use in Kubernetes imagePullSecrets.
        """
        from zenml.client import Client
        from zenml.logger import get_logger

        logger = get_logger(__name__)

        # Check if we're using a Kubernetes-based orchestrator
        try:
            stack = Client().active_stack
            orchestrator_flavor = stack.orchestrator.flavor

            # List of orchestrator flavors that use Kubernetes and need imagePullSecrets
            kubernetes_orchestrators = {
                "kubernetes",
                "kubeflow",
                "vertex",
                "sagemaker",
                "tekton",
                "airflow",  # when running on Kubernetes
            }

            if orchestrator_flavor not in kubernetes_orchestrators:
                logger.debug(
                    f"Skipping ImagePullSecret generation for non-Kubernetes orchestrator: {orchestrator_flavor}"
                )
                return None

            # Additional check for Kubernetes orchestrator with local flag
            if orchestrator_flavor == "kubernetes" and hasattr(
                stack.orchestrator.config, "local"
            ):
                if stack.orchestrator.config.local:
                    logger.debug(
                        "Skipping ImagePullSecret generation for local Kubernetes orchestrator"
                    )
                    return None

        except Exception as e:
            logger.debug(
                f"Could not determine orchestrator type: {e}. Proceeding with ImagePullSecret generation."
            )

        logger.debug(
            f"Getting ImagePullSecret data for registry: {self.config.uri}"
        )

        credentials = self.credentials
        if not credentials:
            logger.debug("No credentials found for container registry")
            return None

        username, password = credentials
        registry_uri = self.config.uri

        logger.debug(
            f"Found credentials - username: {username[:3]}***, registry_uri: {registry_uri}"
        )

        # Check if there's a service connector with a different registry setting
        connector = self.get_connector()
        if connector:
            from zenml.service_connectors.docker_service_connector import (
                DockerServiceConnector,
            )

            if (
                isinstance(connector, DockerServiceConnector)
                and connector.config.registry
            ):
                # Use the service connector's registry setting
                original_registry_uri = registry_uri
                registry_uri = connector.config.registry
                logger.debug(
                    f"Service connector override: {original_registry_uri} -> {registry_uri}"
                )

        # Normalize registry URI for consistency
        original_registry_uri = registry_uri
        if registry_uri.startswith("https://"):
            registry_uri = registry_uri[8:]
        elif registry_uri.startswith("http://"):
            registry_uri = registry_uri[7:]

        if original_registry_uri != registry_uri:
            logger.debug(
                f"Normalized registry URI: {original_registry_uri} -> {registry_uri}"
            )

        # Validate the final result
        if not registry_uri or not username or not password:
            logger.warning(
                f"Invalid ImagePullSecret data: registry_uri='{registry_uri}', username='{username}', password_length={len(password) if password else 0}"
            )
            return None

        logger.debug(
            f"Returning ImagePullSecret data: registry='{registry_uri}', username='{username[:3]}***'"
        )
        return registry_uri, username, password
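
For illustration, the `(registry_uri, username, password)` tuple returned above could be turned into a `kubernetes.io/dockerconfigjson` secret manifest roughly as follows. This is a standalone sketch, not the helper added in this PR: `normalize_registry_uri`, `build_image_pull_secret_manifest`, and the example secret name are hypothetical, while the `.dockerconfigjson` layout follows the standard Kubernetes format.

```python
import base64
import json
from typing import Any, Dict


def normalize_registry_uri(registry_uri: str) -> str:
    """Strip an http(s):// scheme, mirroring the normalization above."""
    for scheme in ("https://", "http://"):
        if registry_uri.startswith(scheme):
            return registry_uri[len(scheme):]
    return registry_uri


def build_image_pull_secret_manifest(
    name: str, namespace: str, registry_uri: str, username: str, password: str
) -> Dict[str, Any]:
    """Build a kubernetes.io/dockerconfigjson secret manifest as a dict."""
    # Docker config stores "user:password" base64-encoded under "auth".
    auth = base64.b64encode(f"{username}:{password}".encode()).decode()
    docker_config = {
        "auths": {
            normalize_registry_uri(registry_uri): {
                "username": username,
                "password": password,
                "auth": auth,
            }
        }
    }
    # Secret `data` values must themselves be base64-encoded.
    encoded = base64.b64encode(json.dumps(docker_config).encode()).decode()
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name, "namespace": namespace},
        "type": "kubernetes.io/dockerconfigjson",
        "data": {".dockerconfigjson": encoded},
    }


manifest = build_image_pull_secret_manifest(
    "example-registry-secret", "zenml", "https://registry.example.com", "user", "pass"
)
print(manifest["type"])
```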


class BaseContainerRegistryFlavor(Flavor):
    """Base flavor for container registries."""
Original file line number Diff line number Diff line change
@@ -69,6 +69,9 @@ class KubernetesOrchestratorSettings(BaseSettings):
            scheduling a pipeline.
        prevent_orchestrator_pod_caching: If `True`, the orchestrator pod will
            not try to compute cached steps before starting the step pods.
        auto_generate_image_pull_secrets: If `True`, automatically generates
            imagePullSecrets from container registry credentials in the stack.
            If `False`, relies on manually configured imagePullSecrets.
    """

    synchronous: bool = True
@@ -88,6 +91,7 @@ class KubernetesOrchestratorSettings(BaseSettings):
    failed_jobs_history_limit: Optional[NonNegativeInt] = None
    ttl_seconds_after_finished: Optional[NonNegativeInt] = None
    prevent_orchestrator_pod_caching: bool = False
    auto_generate_image_pull_secrets: bool = True


class KubernetesOrchestratorConfig(
Original file line number Diff line number Diff line change
@@ -43,6 +43,9 @@ class KubernetesStepOperatorSettings(BaseSettings):
            failure retries and pod startup retries (in seconds)
        pod_failure_backoff: The backoff factor for pod failure retries and
            pod startup retries.
        auto_generate_image_pull_secrets: If `True`, automatically generates
            imagePullSecrets from container registry credentials in the stack.
            If `False`, relies on manually configured imagePullSecrets.
    """

    pod_settings: Optional[KubernetesPodSettings] = None
@@ -52,6 +55,7 @@ class KubernetesStepOperatorSettings(BaseSettings):
    pod_failure_max_retries: int = 3
    pod_failure_retry_delay: int = 10
    pod_failure_backoff: float = 1.0
    auto_generate_image_pull_secrets: bool = True


class KubernetesStepOperatorConfig(
33 changes: 33 additions & 0 deletions src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
@@ -450,6 +450,39 @@ def create_or_update_secret(
update_secret(core_api, namespace, secret_name, data)


def create_or_update_secret_from_manifest(
    core_api: k8s_client.CoreV1Api,
    secret_manifest: Dict[str, Any],
) -> None:
    """Create or update a Kubernetes secret from a complete manifest.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        secret_manifest: Complete Kubernetes secret manifest dict.

    Raises:
        ApiException: If the secret creation failed for any reason other than
            the secret already existing.
    """
    namespace = secret_manifest["metadata"]["namespace"]
    secret_name = secret_manifest["metadata"]["name"]

    try:
        core_api.create_namespaced_secret(
            namespace=namespace,
            body=secret_manifest,
        )
    except ApiException as e:
        if e.status == 409:  # Already exists, update it
            core_api.patch_namespaced_secret(
                name=secret_name,
                namespace=namespace,
                body=secret_manifest,
            )
        else:
            raise
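
The create-then-patch fallback can be exercised without a cluster by using a small in-memory stand-in. Everything here is hypothetical scaffolding: `FakeApiException` and `FakeCoreApi` only imitate the two client calls used above, and `apply_secret` repeats the same control flow against them.

```python
from typing import Any, Dict, Tuple


class FakeApiException(Exception):
    """Stand-in for the Kubernetes client's ApiException (status only)."""

    def __init__(self, status: int) -> None:
        super().__init__(f"status {status}")
        self.status = status


class FakeCoreApi:
    """In-memory stand-in that stores secrets keyed by (namespace, name)."""

    def __init__(self) -> None:
        self.secrets: Dict[Tuple[str, str], Dict[str, Any]] = {}

    def create_namespaced_secret(
        self, namespace: str, body: Dict[str, Any]
    ) -> None:
        key = (namespace, body["metadata"]["name"])
        if key in self.secrets:
            raise FakeApiException(status=409)  # conflict: already exists
        self.secrets[key] = dict(body)

    def patch_namespaced_secret(
        self, name: str, namespace: str, body: Dict[str, Any]
    ) -> None:
        # Shallow merge is enough for this illustration.
        self.secrets[(namespace, name)].update(body)


def apply_secret(core_api: FakeCoreApi, manifest: Dict[str, Any]) -> str:
    """Create the secret, or patch it if it already exists."""
    namespace = manifest["metadata"]["namespace"]
    name = manifest["metadata"]["name"]
    try:
        core_api.create_namespaced_secret(namespace=namespace, body=manifest)
        return "created"
    except FakeApiException as e:
        if e.status != 409:
            raise
        core_api.patch_namespaced_secret(
            name=name, namespace=namespace, body=manifest
        )
        return "patched"


api = FakeCoreApi()
first = {"metadata": {"name": "demo", "namespace": "zenml"}, "data": {"k": "v1"}}
second = {"metadata": {"name": "demo", "namespace": "zenml"}, "data": {"k": "v2"}}
print(apply_secret(api, first))   # created
print(apply_secret(api, second))  # patched
```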


def delete_secret(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@
from zenml.integrations.kubernetes.orchestrators.manifest_utils import (
    build_cron_job_manifest,
    build_pod_manifest,
    create_image_pull_secrets_from_manifests,
)
from zenml.integrations.kubernetes.pod_settings import KubernetesPodSettings
from zenml.logger import get_logger
Expand Down Expand Up @@ -517,7 +518,7 @@ def submit_pipeline(
"schedule. Use `Schedule(cron_expression=...)` instead."
)
cron_expression = deployment.schedule.cron_expression
cron_job_manifest = build_cron_job_manifest(
cron_job_manifest, secret_manifests = build_cron_job_manifest(
cron_expression=cron_expression,
run_name=orchestrator_run_name,
pod_name=pod_name,
Expand All @@ -533,6 +534,18 @@ def submit_pipeline(
successful_jobs_history_limit=settings.successful_jobs_history_limit,
failed_jobs_history_limit=settings.failed_jobs_history_limit,
ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
namespace=self.config.kubernetes_namespace,
container_registry=stack.container_registry,
auto_generate_image_pull_secrets=settings.auto_generate_image_pull_secrets,
core_api=self._k8s_core_api,
)

# Create imagePullSecrets first
create_image_pull_secrets_from_manifests(
secret_manifests=secret_manifests,
core_api=self._k8s_core_api,
namespace=self.config.kubernetes_namespace,
reuse_existing=False, # Orchestrator creates/updates all secrets
)

self._k8s_batch_api.create_namespaced_cron_job(
@@ -546,7 +559,7 @@ def submit_pipeline(
            return None
        else:
            # Create and run the orchestrator pod.
            pod_manifest = build_pod_manifest(
            pod_manifest, secret_manifests = build_pod_manifest(
                run_name=orchestrator_run_name,
                pod_name=pod_name,
                pipeline_name=pipeline_name,
@@ -558,6 +571,18 @@ def submit_pipeline(
                service_account_name=service_account_name,
                env=environment,
                mount_local_stores=self.config.is_local,
                namespace=self.config.kubernetes_namespace,
                container_registry=stack.container_registry,
                auto_generate_image_pull_secrets=settings.auto_generate_image_pull_secrets,
                core_api=self._k8s_core_api,
            )

            # Create imagePullSecrets first
            create_image_pull_secrets_from_manifests(
                secret_manifests=secret_manifests,
                core_api=self._k8s_core_api,
                namespace=self.config.kubernetes_namespace,
                reuse_existing=False,  # Orchestrator creates/updates all secrets
            )

            kube_utils.create_and_wait_for_pod_to_start(