Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 96 additions & 24 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import requests
from airbyte_api import api, models

from airbyte.constants import CLOUD_API_ROOT, CLOUD_CONFIG_API_ROOT
from airbyte.constants import CLOUD_API_ROOT, CLOUD_CONFIG_API_ROOT, CLOUD_CONFIG_API_ROOT_ENV_VAR
from airbyte.exceptions import (
AirbyteConnectionSyncError,
AirbyteError,
Expand All @@ -30,6 +30,7 @@
PyAirbyteInputError,
)
from airbyte.secrets.base import SecretString
from airbyte.secrets.util import try_get_secret


if TYPE_CHECKING:
Expand Down Expand Up @@ -58,11 +59,36 @@ def status_ok(status_code: int) -> bool:


def get_config_api_root(api_root: str) -> str:
"""Get the configuration API root from the main API root."""
"""Get the configuration API root from the main API root.

Resolution order:
1. If `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable is set, use that value.
2. If `api_root` matches the default Cloud API root, return the default Config API root.
3. Otherwise, raise NotImplementedError (cannot derive Config API from custom API root).

Args:
api_root: The main API root URL being used.

Returns:
The configuration API root URL.

Raises:
NotImplementedError: If the Config API root cannot be determined.
"""
# First, check if the Config API URL is explicitly set via environment variable
config_api_override = try_get_secret(CLOUD_CONFIG_API_ROOT_ENV_VAR, default=None)
if config_api_override:
return str(config_api_override)

# Fall back to deriving from the main API root
if api_root == CLOUD_API_ROOT:
return CLOUD_CONFIG_API_ROOT

raise NotImplementedError("Configuration API root not implemented for this API root.")
raise NotImplementedError(
f"Configuration API root not implemented for api_root='{api_root}'. "
f"Set the '{CLOUD_CONFIG_API_ROOT_ENV_VAR}' environment variable "
"to specify the Config API URL."
)


def get_web_url_root(api_root: str) -> str:
Expand Down Expand Up @@ -171,7 +197,8 @@ def get_workspace(
resource_type="workspace",
context={
"workspace_id": workspace_id,
"response": response,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)

Expand Down Expand Up @@ -216,12 +243,13 @@ def list_connections(
has_more = bool(response.connections_response and response.connections_response.next)
offset += page_size

if not status_ok(response.status_code) and response.connections_response:
if not status_ok(response.status_code):
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)
assert response.connections_response is not None
result += [
Expand Down Expand Up @@ -265,12 +293,13 @@ def list_workspaces(
has_more = bool(response.workspaces_response and response.workspaces_response.next)
offset += page_size

if not status_ok(response.status_code) and response.workspaces_response:
if not status_ok(response.status_code):
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)

assert response.workspaces_response is not None
Expand Down Expand Up @@ -320,12 +349,13 @@ def list_sources(
has_more = bool(response.sources_response and response.sources_response.next)
offset += page_size

if not status_ok(response.status_code) and response.sources_response:
if not status_ok(response.status_code):
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)
assert response.sources_response is not None
result += [source for source in response.sources_response.data if name_filter(source.name)]
Expand Down Expand Up @@ -370,12 +400,13 @@ def list_destinations(
has_more = bool(response.destinations_response and response.destinations_response.next)
offset += page_size

if not status_ok(response.status_code) and response.destinations_response:
if not status_ok(response.status_code):
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)
assert response.destinations_response is not None
result += [
Expand Down Expand Up @@ -419,6 +450,10 @@ def get_connection(
resource_name_or_id=connection_id,
resource_type="connection",
log_text=response.raw_response.text,
context={
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)


Expand Down Expand Up @@ -457,6 +492,8 @@ def run_connection(
connection_id=connection_id,
context={
"workspace_id": workspace_id,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
response=response,
)
Expand Down Expand Up @@ -517,6 +554,8 @@ def get_job_logs( # noqa: PLR0913 # Too many arguments - needed for auth flexi
context={
"workspace_id": workspace_id,
"connection_id": connection_id,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)

Expand Down Expand Up @@ -548,6 +587,10 @@ def get_job_info(
resource_name_or_id=str(job_id),
resource_type="job",
log_text=response.raw_response.text,
context={
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)


Expand Down Expand Up @@ -589,6 +632,10 @@ def create_source(

raise AirbyteError(
message="Could not create source.",
context={
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
response=response,
)

Expand Down Expand Up @@ -620,6 +667,10 @@ def get_source(
resource_name_or_id=source_id,
resource_type="source",
log_text=response.raw_response.text,
context={
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)


Expand Down Expand Up @@ -695,7 +746,8 @@ def delete_source(
raise AirbyteError(
context={
"source_id": source_id,
"response": response,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)

Expand Down Expand Up @@ -749,6 +801,8 @@ def patch_source(
message="Could not update source.",
context={
"source_id": source_id,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
response=response,
)
Expand Down Expand Up @@ -814,6 +868,10 @@ def create_destination(

raise AirbyteError(
message="Could not create destination.",
context={
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
response=response,
)

Expand Down Expand Up @@ -863,6 +921,10 @@ def get_destination(
resource_name_or_id=destination_id,
resource_type="destination",
log_text=response.raw_response.text,
context={
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)


Expand Down Expand Up @@ -939,7 +1001,8 @@ def delete_destination(
raise AirbyteError(
context={
"destination_id": destination_id,
"response": response,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)

Expand Down Expand Up @@ -993,6 +1056,8 @@ def patch_destination(
message="Could not update destination.",
context={
"destination_id": destination_id,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
response=response,
)
Expand Down Expand Up @@ -1056,7 +1121,8 @@ def create_connection( # noqa: PLR0913 # Too many arguments
context={
"source_id": source_id,
"destination_id": destination_id,
"response": response,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)

Expand Down Expand Up @@ -1188,7 +1254,8 @@ def delete_connection(
raise AirbyteError(
context={
"connection_id": connection_id,
"response": response,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)

Expand Down Expand Up @@ -1251,6 +1318,8 @@ def patch_connection( # noqa: PLR0913 # Too many arguments
message="Could not update connection.",
context={
"connection_id": connection_id,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
response=response,
)
Expand Down Expand Up @@ -1497,7 +1566,8 @@ def list_custom_yaml_source_definitions(
message="Failed to list custom YAML source definitions",
context={
"workspace_id": workspace_id,
"response": response,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)
return response.declarative_source_definitions_response.data
Expand Down Expand Up @@ -1536,7 +1606,8 @@ def get_custom_yaml_source_definition(
context={
"workspace_id": workspace_id,
"definition_id": definition_id,
"response": response,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)
return response.declarative_source_definition_response
Expand Down Expand Up @@ -1580,7 +1651,8 @@ def update_custom_yaml_source_definition(
context={
"workspace_id": workspace_id,
"definition_id": definition_id,
"response": response,
"request_url": response.raw_response.url,
"status_code": response.status_code,
},
)
return response.declarative_source_definition_response
Expand Down Expand Up @@ -1792,8 +1864,8 @@ def list_organizations_for_user(
raise AirbyteError(
message="Failed to list organizations for user.",
context={
"request_url": response.raw_response.url,
"status_code": response.status_code,
"response": response,
},
)

Expand Down
18 changes: 18 additions & 0 deletions airbyte/cloud/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,21 @@ def resolve_cloud_workspace_id(
) -> str:
"""Get the Airbyte Cloud workspace ID from the environment, or return None if not set."""
return str(get_secret(constants.CLOUD_WORKSPACE_ID_ENV_VAR, default=input_value))


def resolve_cloud_config_api_url(
input_value: str | None = None,
/,
) -> str | None:
"""Get the Airbyte Cloud Config API URL from the environment, or return None if not set.

The Config API is a separate internal API used for certain operations like
connector builder projects and custom source definitions.

Returns:
The Config API URL if set via environment variable or input, None otherwise.
"""
result = try_get_secret(constants.CLOUD_CONFIG_API_ROOT_ENV_VAR, default=input_value)
if result:
return str(result)
return None
10 changes: 10 additions & 0 deletions airbyte/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ def _str_to_bool(value: str) -> bool:
CLOUD_API_ROOT_ENV_VAR: str = "AIRBYTE_CLOUD_API_URL"
"""The environment variable name for the Airbyte Cloud API URL."""

CLOUD_CONFIG_API_ROOT_ENV_VAR: str = "AIRBYTE_CLOUD_CONFIG_API_URL"
"""The environment variable name for the Airbyte Cloud Config API URL.

The Config API is a separate internal API used for certain operations like
connector builder projects and custom source definitions. This environment
variable allows overriding the default Config API URL, which is useful when
the public API URL has been overridden and the Config API cannot be derived
from it automatically.
"""

CLOUD_WORKSPACE_ID_ENV_VAR: str = "AIRBYTE_CLOUD_WORKSPACE_ID"
"""The environment variable name for the Airbyte Cloud workspace ID."""

Expand Down
Loading