Skip to content
297 changes: 218 additions & 79 deletions airbyte/_util/api_util.py

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion airbyte/cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

from typing import TYPE_CHECKING

from airbyte.cloud.client_config import CloudClientConfig
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
Expand All @@ -62,18 +63,20 @@
# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
# ruff: noqa: TC004
from airbyte.cloud import connections, constants, sync_results, workspaces
from airbyte.cloud import client_config, connections, constants, sync_results, workspaces


__all__ = [
# Submodules
"workspaces",
"connections",
"constants",
"client_config",
"sync_results",
# Classes
"CloudWorkspace",
"CloudConnection",
"CloudClientConfig",
"SyncResult",
# Enums
"JobStatusEnum",
Expand Down
25 changes: 25 additions & 0 deletions airbyte/cloud/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,31 @@
from airbyte.secrets.util import get_secret, try_get_secret


def resolve_cloud_bearer_token(
input_value: str | SecretString | None = None,
/,
) -> SecretString | None:
"""Get the Airbyte Cloud bearer token from the environment.

Unlike other resolve functions, this returns None if no bearer token is found,
since bearer token authentication is optional (client credentials can be used instead).

Args:
input_value: Optional explicit bearer token value. If provided, it will be
returned directly (wrapped in SecretString if needed).

Returns:
The bearer token as a SecretString, or None if not found.
"""
if input_value is not None:
return SecretString(input_value)

result = try_get_secret(constants.CLOUD_BEARER_TOKEN_ENV_VAR, default=None)
if result:
return SecretString(result)
return None


def resolve_cloud_client_secret(
input_value: str | SecretString | None = None,
/,
Expand Down
189 changes: 189 additions & 0 deletions airbyte/cloud/client_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Cloud client configuration for Airbyte Cloud API authentication.

This module provides the CloudClientConfig class for managing authentication
credentials and API configuration when connecting to Airbyte Cloud, OSS, or
Enterprise instances.

Two authentication methods are supported (mutually exclusive):
1. OAuth2 client credentials (client_id + client_secret)
2. Bearer token authentication

Example usage with client credentials:
```python
from airbyte.cloud.client_config import CloudClientConfig

config = CloudClientConfig(
client_id="your-client-id",
client_secret="your-client-secret",
)
```

Example usage with bearer token:
```python
from airbyte.cloud.client_config import CloudClientConfig

config = CloudClientConfig(
bearer_token="your-bearer-token",
)
```

Example using environment variables:
```python
from airbyte.cloud.client_config import CloudClientConfig

# Resolves from AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET,
# AIRBYTE_CLOUD_BEARER_TOKEN, and AIRBYTE_CLOUD_API_URL environment variables
config = CloudClientConfig.from_env()
```
"""

from __future__ import annotations

from dataclasses import dataclass

from airbyte._util import api_util
from airbyte.cloud.auth import (
resolve_cloud_api_url,
resolve_cloud_bearer_token,
resolve_cloud_client_id,
resolve_cloud_client_secret,
)
from airbyte.exceptions import PyAirbyteInputError
from airbyte.secrets.base import SecretString


@dataclass
class CloudClientConfig:
"""Client configuration for Airbyte Cloud API.

This class encapsulates the authentication and API configuration needed to connect
to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually
exclusive authentication methods:

1. OAuth2 client credentials flow (client_id + client_secret)
2. Bearer token authentication

Exactly one authentication method must be provided. Providing both or neither
will raise a validation error.

Attributes:
client_id: OAuth2 client ID for client credentials flow.
client_secret: OAuth2 client secret for client credentials flow.
bearer_token: Pre-generated bearer token for direct authentication.
api_root: The API root URL. Defaults to Airbyte Cloud API.
"""

client_id: SecretString | None = None
"""OAuth2 client ID for client credentials authentication."""

client_secret: SecretString | None = None
"""OAuth2 client secret for client credentials authentication."""

bearer_token: SecretString | None = None
"""Bearer token for direct authentication (alternative to client credentials)."""

api_root: str = api_util.CLOUD_API_ROOT
"""The API root URL. Defaults to Airbyte Cloud API."""

def __post_init__(self) -> None:
"""Validate credentials and ensure secrets are properly wrapped."""
# Wrap secrets in SecretString if they aren't already
if self.client_id is not None:
self.client_id = SecretString(self.client_id)
if self.client_secret is not None:
self.client_secret = SecretString(self.client_secret)
if self.bearer_token is not None:
self.bearer_token = SecretString(self.bearer_token)

# Validate mutual exclusivity
has_client_credentials = self.client_id is not None or self.client_secret is not None
has_bearer_token = self.bearer_token is not None

if has_client_credentials and has_bearer_token:
raise PyAirbyteInputError(
message="Cannot use both client credentials and bearer token authentication.",
guidance=(
"Provide either client_id and client_secret together, "
"or bearer_token alone, but not both."
),
)

if has_client_credentials and (self.client_id is None or self.client_secret is None):
# If using client credentials, both must be provided
raise PyAirbyteInputError(
message="Incomplete client credentials.",
guidance=(
"When using client credentials authentication, "
"both client_id and client_secret must be provided."
),
)

if not has_client_credentials and not has_bearer_token:
raise PyAirbyteInputError(
message="No authentication credentials provided.",
guidance=(
"Provide either client_id and client_secret together for OAuth2 "
"client credentials flow, or bearer_token for direct authentication."
),
)

@property
def uses_bearer_token(self) -> bool:
"""Return True if using bearer token authentication."""
return self.bearer_token is not None

@property
def uses_client_credentials(self) -> bool:
"""Return True if using client credentials authentication."""
return self.client_id is not None and self.client_secret is not None

@classmethod
def from_env(
cls,
*,
api_root: str | None = None,
) -> CloudClientConfig:
"""Create CloudClientConfig from environment variables.

This factory method resolves credentials from environment variables,
providing a convenient way to create credentials without explicitly
passing secrets.

Environment variables used:
- `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
- `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
- `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
- `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

The method will first check for a bearer token. If not found, it will
attempt to use client credentials.

Args:
api_root: The API root URL. If not provided, will be resolved from
the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
the Airbyte Cloud API.

Returns:
A CloudClientConfig instance configured with credentials from the environment.

Raises:
PyAirbyteSecretNotFoundError: If required credentials are not found in
the environment.
"""
resolved_api_root = resolve_cloud_api_url(api_root)

# Try bearer token first
bearer_token = resolve_cloud_bearer_token()
if bearer_token:
return cls(
bearer_token=bearer_token,
api_root=resolved_api_root,
)

# Fall back to client credentials
return cls(
client_id=resolve_cloud_client_id(),
client_secret=resolve_cloud_client_secret(),
api_root=resolved_api_root,
)
8 changes: 8 additions & 0 deletions airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def _fetch_connection_info(self) -> ConnectionResponse:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)

@classmethod
Expand Down Expand Up @@ -180,6 +181,7 @@ def run_sync(
workspace_id=self.workspace.workspace_id,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
sync_result = SyncResult(
workspace=self.workspace,
Expand Down Expand Up @@ -242,6 +244,7 @@ def get_previous_sync_logs(
order_by=order_by,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return [
SyncResult(
Expand Down Expand Up @@ -298,6 +301,7 @@ def get_state_artifacts(self) -> list[dict[str, Any]] | None:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
if state_response.get("stateType") == "not_set":
return None
Expand All @@ -319,6 +323,7 @@ def get_catalog_artifact(self) -> dict[str, Any] | None:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return connection_response.get("syncCatalog")

Expand All @@ -336,6 +341,7 @@ def rename(self, name: str) -> CloudConnection:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
name=name,
)
self._connection_info = updated_response
Expand All @@ -355,6 +361,7 @@ def set_table_prefix(self, prefix: str) -> CloudConnection:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
prefix=prefix,
)
self._connection_info = updated_response
Expand All @@ -379,6 +386,7 @@ def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
configurations=configurations,
)
self._connection_info = updated_response
Expand Down
Loading