diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 482c7ddf2..ebd8fe19b 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -81,10 +81,53 @@ def get_web_url_root(api_root: str) -> str: def get_airbyte_server_instance( *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> airbyte_api.AirbyteAPI: - """Get an Airbyte instance.""" + """Get an Airbyte API instance. + + Supports two authentication methods (mutually exclusive): + 1. OAuth2 client credentials (client_id + client_secret) + 2. Bearer token authentication + + Args: + api_root: The API root URL. + client_id: OAuth2 client ID (required if not using bearer_token). + client_secret: OAuth2 client secret (required if not using bearer_token). + bearer_token: Pre-generated bearer token (alternative to client credentials). + + Returns: + An authenticated AirbyteAPI instance. + + Raises: + PyAirbyteInputError: If authentication parameters are invalid. + """ + # Guard: must provide either bearer token OR both client credentials + if bearer_token is None and (client_id is None or client_secret is None): + raise PyAirbyteInputError( + message="No authentication credentials provided.", + guidance="Provide either client_id and client_secret, or bearer_token.", + ) + + # Guard: cannot provide both auth methods + if bearer_token is not None and (client_id is not None or client_secret is not None): + raise PyAirbyteInputError( + message="Cannot use both client credentials and bearer token authentication.", + guidance="Provide either client_id and client_secret, or bearer_token, but not both.", + ) + + # Option 1: Bearer token authentication + if bearer_token is not None: + return airbyte_api.AirbyteAPI( + security=models.Security( + bearer_auth=bearer_token, + ), + server_url=api_root, + ) + + # Option 2: Client credentials flow (guaranteed non-None by first guard) + return airbyte_api.AirbyteAPI( security=models.Security( client_credentials=models.SchemeClientCredentials( @@ -105,14 +148,16 @@ def get_workspace( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.WorkspaceResponse: """Get a workspace object.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) response = airbyte_instance.workspaces.get_workspace( api.GetWorkspaceRequest( @@ -138,8 +183,9 @@ def list_connections( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.ConnectionResponse]: @@ -153,6 +199,7 @@ def list_connections( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) result: list[models.ConnectionResponse] = [] @@ -189,8 +236,9 @@ def list_workspaces( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.WorkspaceResponse]: @@ -204,6 +252,7 @@ def list_workspaces( airbyte_instance: airbyte_api.AirbyteAPI = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) result: list[models.WorkspaceResponse] = [] @@ -238,8 +287,9 @@ def list_sources( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.SourceResponse]: @@ -253,6 +303,7 @@ def list_sources( airbyte_instance: airbyte_api.AirbyteAPI = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) result: list[models.SourceResponse] = [] @@ -286,8 +337,9 @@ def list_destinations( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.DestinationResponse]: @@ -301,6 +353,7 @@ def list_destinations( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) result: list[models.DestinationResponse] = [] @@ -342,14 +395,16 @@ def get_connection( connection_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.ConnectionResponse: """Get a connection.""" _ = workspace_id # Not used (yet) airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.connections.get_connection( @@ -372,8 +427,9 @@ def run_connection( connection_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.JobResponse: """Get a connection. @@ -385,6 +441,7 @@ def run_connection( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.jobs.create_job( @@ -408,14 +465,15 @@ def run_connection( # Get job info (logs) -def get_job_logs( +def get_job_logs( # noqa: PLR0913 # Too many arguments - needed for auth flexibility workspace_id: str, connection_id: str, limit: int = 100, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, offset: int | None = None, order_by: str | None = None, ) -> list[models.JobResponse]: @@ -428,6 +486,7 @@ def get_job_logs( api_root: The API root URL. client_id: The client ID for authentication. client_secret: The client secret for authentication. + bearer_token: Bearer token for authentication (alternative to client credentials). offset: Number of jobs to skip from the beginning. Defaults to None (0). order_by: Field and direction to order by (e.g., "createdAt|DESC"). Defaults to None. @@ -437,6 +496,7 @@ def get_job_logs( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response: api.ListJobsResponse = airbyte_instance.jobs.list_jobs( @@ -465,13 +525,15 @@ def get_job_info( job_id: int, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.JobResponse: """Get a job.""" airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.jobs.get_job( @@ -499,8 +561,9 @@ def create_source( config: models.SourceConfiguration | dict[str, Any], definition_id: str | None = None, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.SourceResponse: """Create a source connector instance. @@ -509,6 +572,7 @@ def create_source( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response: api.CreateSourceResponse = airbyte_instance.sources.create_source( @@ -533,13 +597,15 @@ def get_source( source_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.SourceResponse: """Get a connection.""" airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.sources.get_source( @@ -562,8 +628,9 @@ def delete_source( *, source_name: str | None = None, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, workspace_id: str | None = None, safe_mode: bool = True, ) -> None: @@ -576,6 +643,7 @@ def delete_source( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). workspace_id: The workspace ID (not currently used) safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. @@ -593,6 +661,7 @@ def delete_source( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) source_name = source_info.name @@ -614,6 +683,7 @@ def delete_source( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.sources.delete_source( @@ -634,8 +704,9 @@ def patch_source( source_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, config: models.SourceConfiguration | dict[str, Any] | None = None, ) -> models.SourceResponse: @@ -649,6 +720,7 @@ def patch_source( api_root: The API root URL client_id: Client ID for authentication client_secret: Client secret for authentication + bearer_token: Bearer token for authentication (alternative to client credentials). name: Optional new name for the source config: Optional new configuration for the source @@ -658,6 +730,7 @@ def patch_source( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.sources.patch_source( @@ -712,13 +785,15 @@ def create_destination( workspace_id: str, config: DestinationConfiguration | dict[str, Any], api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.DestinationResponse: """Get a connection.""" airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) definition_id_override: str | None = None @@ -747,13 +822,15 @@ def get_destination( destination_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.DestinationResponse: """Get a connection.""" airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.destinations.get_destination( @@ -794,8 +871,9 @@ def delete_destination( *, destination_name: str | None = None, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, workspace_id: str | None = None, safe_mode: bool = True, ) -> None: @@ -808,6 +886,7 @@ def delete_destination( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). workspace_id: The workspace ID (not currently used) safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. @@ -825,6 +904,7 @@ def delete_destination( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) destination_name = destination_info.name @@ -847,6 +927,7 @@ def delete_destination( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.destinations.delete_destination( @@ -867,8 +948,9 @@ def patch_destination( destination_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, config: DestinationConfiguration | dict[str, Any] | None = None, ) -> models.DestinationResponse: @@ -882,6 +964,7 @@ def patch_destination( api_root: The API root URL client_id: Client ID for authentication client_secret: Client secret for authentication + bearer_token: Bearer token for authentication (alternative to client credentials). name: Optional new name for the destination config: Optional new configuration for the destination @@ -891,6 +974,7 @@ def patch_destination( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.destinations.patch_destination( @@ -943,8 +1027,9 @@ def create_connection( # noqa: PLR0913 # Too many arguments source_id: str, destination_id: str, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, workspace_id: str | None = None, prefix: str, selected_stream_names: list[str], @@ -953,6 +1038,7 @@ def create_connection( # noqa: PLR0913 # Too many arguments airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) stream_configurations_obj = build_stream_configurations(selected_stream_names) @@ -982,8 +1068,9 @@ def get_connection_by_name( connection_name: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.ConnectionResponse: """Get a connection.""" connections = list_connections( @@ -991,6 +1078,7 @@ def get_connection_by_name( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) found: list[models.ConnectionResponse] = [ connection for connection in connections if connection.name == connection_name @@ -1033,8 +1121,9 @@ def delete_connection( *, api_root: str, workspace_id: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, safe_mode: bool = True, ) -> None: """Delete a connection. @@ -1047,6 +1136,7 @@ def delete_connection( workspace_id: The workspace ID client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. @@ -1062,6 +1152,7 @@ def delete_connection( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) connection_name = connection_info.name @@ -1085,6 +1176,7 @@ def delete_connection( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.connections.delete_connection( @@ -1105,8 +1197,9 @@ def patch_connection( # noqa: PLR0913 # Too many arguments connection_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, configurations: models.StreamConfigurationsInput | None = None, schedule: models.AirbyteAPIConnectionSchedule | None = None, @@ -1123,6 +1216,7 @@ def patch_connection( # noqa: PLR0913 # Too many arguments api_root: The API root URL client_id: Client ID for authentication client_secret: Client secret for authentication + bearer_token: Bearer token for authentication (alternative to client credentials). name: Optional new name for the connection configurations: Optional new stream configurations schedule: Optional new sync schedule @@ -1135,6 +1229,7 @@ def patch_connection( # noqa: PLR0913 # Too many arguments airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.connections.patch_connection( @@ -1197,15 +1292,24 @@ def _make_config_api_request( api_root: str, path: str, json: dict[str, Any], - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> dict[str, Any]: config_api_root = get_config_api_root(api_root) - bearer_token = get_bearer_token( - client_id=client_id, - client_secret=client_secret, - api_root=api_root, - ) + + # Use provided bearer token or generate one from client credentials + if bearer_token is None: + if client_id is None or client_secret is None: + raise PyAirbyteInputError( + message="No authentication credentials provided.", + guidance="Provide either client_id and client_secret, or bearer_token.", + ) + bearer_token = get_bearer_token( + client_id=client_id, + client_secret=client_secret, + api_root=api_root, + ) headers: dict[str, Any] = { "Content-Type": "application/json", "Authorization": f"Bearer {bearer_token}", @@ -1245,8 +1349,9 @@ def check_connector( *, actor_id: str, connector_type: Literal["source", "destination"], - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, workspace_id: str | None = None, api_root: str = CLOUD_API_ROOT, ) -> tuple[bool, str | None]: @@ -1267,6 +1372,7 @@ def check_connector( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) result, message = json_result.get("status"), json_result.get("message") @@ -1330,14 +1436,16 @@ def create_custom_yaml_source_definition( workspace_id: str, manifest: dict[str, Any], api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.DeclarativeSourceDefinitionResponse: """Create a custom YAML source definition.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) request_body = models.CreateDeclarativeSourceDefinitionRequest( @@ -1363,14 +1471,16 @@ def list_custom_yaml_source_definitions( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> list[models.DeclarativeSourceDefinitionResponse]: """List all custom YAML source definitions in a workspace.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) request = api.ListDeclarativeSourceDefinitionsRequest( @@ -1398,14 +1508,16 @@ def get_custom_yaml_source_definition( definition_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.DeclarativeSourceDefinitionResponse: """Get a specific custom YAML source definition.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) request = api.GetDeclarativeSourceDefinitionRequest( @@ -1436,14 +1548,16 @@ def update_custom_yaml_source_definition( *, manifest: dict[str, Any], api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.DeclarativeSourceDefinitionResponse: """Update a custom YAML source definition.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) request_body = models.UpdateDeclarativeSourceDefinitionRequest( @@ -1477,8 +1591,9 @@ def delete_custom_yaml_source_definition( definition_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, safe_mode: bool = True, ) -> None: """Delete a custom YAML source definition. @@ -1489,6 +1604,7 @@ def delete_custom_yaml_source_definition( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). safe_mode: If True, requires the connector name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. @@ -1503,6 +1619,7 @@ def delete_custom_yaml_source_definition( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) connector_name = definition_info.name @@ -1529,6 +1646,7 @@ def delete_custom_yaml_source_definition( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) request = api.DeleteDeclarativeSourceDefinitionRequest( @@ -1553,8 +1671,9 @@ def get_connector_builder_project_for_definition_id( workspace_id: str, definition_id: str, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> str | None: """Get the connector builder project ID for a declarative source definition. @@ -1569,6 +1688,7 @@ def get_connector_builder_project_for_definition_id( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). Returns: The builder project ID if found, None otherwise (can be null in API response) @@ -1582,6 +1702,7 @@ def get_connector_builder_project_for_definition_id( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) return json_result.get("builderProjectId") @@ -1593,8 +1714,9 @@ def update_connector_builder_project_testing_values( testing_values: dict[str, Any], spec: dict[str, Any], api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> dict[str, Any]: """Update the testing values for a connector builder project. @@ -1613,6 +1735,7 @@ def update_connector_builder_project_testing_values( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). Returns: The updated testing values from the API response @@ -1628,6 +1751,7 @@ def update_connector_builder_project_testing_values( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) @@ -1637,8 +1761,9 @@ def update_connector_builder_project_testing_values( def list_organizations_for_user( *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> list[models.OrganizationResponse]: """List all organizations accessible to the current user. @@ -1648,6 +1773,7 @@ def list_organizations_for_user( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). Returns: List of OrganizationResponse objects containing organization_id, organization_name, email @@ -1656,6 +1782,7 @@ def list_organizations_for_user( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) response = airbyte_instance.organizations.list_organizations_for_user() @@ -1675,8 +1802,9 @@ def list_workspaces_in_organization( organization_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name_contains: str | None = None, max_items_limit: int | None = None, ) -> list[dict[str, Any]]: @@ -1689,6 +1817,7 @@ def list_workspaces_in_organization( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). name_contains: Optional substring filter for workspace names (server-side) max_items_limit: Optional maximum number of workspaces to return @@ -1717,6 +1846,7 @@ def list_workspaces_in_organization( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) workspaces = json_result.get("workspaces", []) @@ -1745,8 +1875,9 @@ def get_workspace_organization_info( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> dict[str, Any]: """Get organization info for a workspace. @@ -1760,6 +1891,7 @@ def get_workspace_organization_info( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). Returns: Dictionary containing organization info: @@ -1774,6 +1906,7 @@ def get_workspace_organization_info( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) @@ -1781,8 +1914,9 @@ def get_connection_state( connection_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> dict[str, Any]: """Get the state for a connection. @@ -1793,6 +1927,7 @@ def get_connection_state( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). Returns: Dictionary containing the connection state. @@ -1803,6 +1938,7 @@ def get_connection_state( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) @@ -1810,8 +1946,9 @@ def get_connection_catalog( connection_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> dict[str, Any]: """Get the configured catalog for a connection. @@ -1825,6 +1962,7 @@ def get_connection_catalog( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Bearer token for authentication (alternative to client credentials). Returns: Dictionary containing the connection info with syncCatalog. @@ -1835,4 +1973,5 @@ def get_connection_catalog( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) diff --git a/airbyte/cloud/__init__.py b/airbyte/cloud/__init__.py index 8aa537c33..86edf8905 100644 --- a/airbyte/cloud/__init__.py +++ b/airbyte/cloud/__init__.py @@ -53,6 +53,7 @@ from typing import TYPE_CHECKING +from airbyte.cloud.client_config import CloudClientConfig from airbyte.cloud.connections import CloudConnection from airbyte.cloud.constants import JobStatusEnum from airbyte.cloud.sync_results import SyncResult @@ -62,7 +63,7 @@ # Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757 if TYPE_CHECKING: # ruff: noqa: TC004 - from airbyte.cloud import connections, constants, sync_results, workspaces + from airbyte.cloud import client_config, connections, constants, sync_results, workspaces __all__ = [ @@ -70,10 +71,12 @@ "workspaces", "connections", "constants", + "client_config", "sync_results", # Classes "CloudWorkspace", "CloudConnection", + "CloudClientConfig", "SyncResult", # Enums "JobStatusEnum", diff --git a/airbyte/cloud/auth.py b/airbyte/cloud/auth.py index 7925bdd95..990bfc44d 100644 --- a/airbyte/cloud/auth.py +++ b/airbyte/cloud/auth.py @@ -6,6 +6,31 @@ from airbyte.secrets.util import get_secret, try_get_secret +def resolve_cloud_bearer_token( + input_value: str | SecretString | None = None, + /, +) -> SecretString | None: + """Get the Airbyte Cloud bearer token from the environment. + + Unlike other resolve functions, this returns None if no bearer token is found, + since bearer token authentication is optional (client credentials can be used instead). + + Args: + input_value: Optional explicit bearer token value. If provided, it will be + returned directly (wrapped in SecretString if needed). + + Returns: + The bearer token as a SecretString, or None if not found. + """ + if input_value is not None: + return SecretString(input_value) + + result = try_get_secret(constants.CLOUD_BEARER_TOKEN_ENV_VAR, default=None) + if result: + return SecretString(result) + return None + + def resolve_cloud_client_secret( input_value: str | SecretString | None = None, /, diff --git a/airbyte/cloud/client_config.py b/airbyte/cloud/client_config.py new file mode 100644 index 000000000..411ea2164 --- /dev/null +++ b/airbyte/cloud/client_config.py @@ -0,0 +1,189 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Cloud client configuration for Airbyte Cloud API authentication. + +This module provides the CloudClientConfig class for managing authentication +credentials and API configuration when connecting to Airbyte Cloud, OSS, or +Enterprise instances. + +Two authentication methods are supported (mutually exclusive): +1. OAuth2 client credentials (client_id + client_secret) +2. Bearer token authentication + +Example usage with client credentials: + ```python + from airbyte.cloud.client_config import CloudClientConfig + + config = CloudClientConfig( + client_id="your-client-id", + client_secret="your-client-secret", + ) + ``` + +Example usage with bearer token: + ```python + from airbyte.cloud.client_config import CloudClientConfig + + config = CloudClientConfig( + bearer_token="your-bearer-token", + ) + ``` + +Example using environment variables: + ```python + from airbyte.cloud.client_config import CloudClientConfig + + # Resolves from AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, + # AIRBYTE_CLOUD_BEARER_TOKEN, and AIRBYTE_CLOUD_API_URL environment variables + config = CloudClientConfig.from_env() + ``` +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from airbyte._util import api_util +from airbyte.cloud.auth import ( + resolve_cloud_api_url, + resolve_cloud_bearer_token, + resolve_cloud_client_id, + resolve_cloud_client_secret, +) +from airbyte.exceptions import PyAirbyteInputError +from airbyte.secrets.base import SecretString + + +@dataclass +class CloudClientConfig: + """Client configuration for Airbyte Cloud API. + + This class encapsulates the authentication and API configuration needed to connect + to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually + exclusive authentication methods: + + 1. OAuth2 client credentials flow (client_id + client_secret) + 2. Bearer token authentication + + Exactly one authentication method must be provided. Providing both or neither + will raise a validation error. + + Attributes: + client_id: OAuth2 client ID for client credentials flow. + client_secret: OAuth2 client secret for client credentials flow. + bearer_token: Pre-generated bearer token for direct authentication. + api_root: The API root URL. Defaults to Airbyte Cloud API. + """ + + client_id: SecretString | None = None + """OAuth2 client ID for client credentials authentication.""" + + client_secret: SecretString | None = None + """OAuth2 client secret for client credentials authentication.""" + + bearer_token: SecretString | None = None + """Bearer token for direct authentication (alternative to client credentials).""" + + api_root: str = api_util.CLOUD_API_ROOT + """The API root URL. Defaults to Airbyte Cloud API.""" + + def __post_init__(self) -> None: + """Validate credentials and ensure secrets are properly wrapped.""" + # Wrap secrets in SecretString if they aren't already + if self.client_id is not None: + self.client_id = SecretString(self.client_id) + if self.client_secret is not None: + self.client_secret = SecretString(self.client_secret) + if self.bearer_token is not None: + self.bearer_token = SecretString(self.bearer_token) + + # Validate mutual exclusivity + has_client_credentials = self.client_id is not None or self.client_secret is not None + has_bearer_token = self.bearer_token is not None + + if has_client_credentials and has_bearer_token: + raise PyAirbyteInputError( + message="Cannot use both client credentials and bearer token authentication.", + guidance=( + "Provide either client_id and client_secret together, " + "or bearer_token alone, but not both." + ), + ) + + if has_client_credentials and (self.client_id is None or self.client_secret is None): + # If using client credentials, both must be provided + raise PyAirbyteInputError( + message="Incomplete client credentials.", + guidance=( + "When using client credentials authentication, " + "both client_id and client_secret must be provided." + ), + ) + + if not has_client_credentials and not has_bearer_token: + raise PyAirbyteInputError( + message="No authentication credentials provided.", + guidance=( + "Provide either client_id and client_secret together for OAuth2 " + "client credentials flow, or bearer_token for direct authentication." + ), + ) + + @property + def uses_bearer_token(self) -> bool: + """Return True if using bearer token authentication.""" + return self.bearer_token is not None + + @property + def uses_client_credentials(self) -> bool: + """Return True if using client credentials authentication.""" + return self.client_id is not None and self.client_secret is not None + + @classmethod + def from_env( + cls, + *, + api_root: str | None = None, + ) -> CloudClientConfig: + """Create CloudClientConfig from environment variables. + + This factory method resolves credentials from environment variables, + providing a convenient way to create credentials without explicitly + passing secrets. + + Environment variables used: + - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow). + - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow). + - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials). + - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud). + + The method will first check for a bearer token. If not found, it will + attempt to use client credentials. + + Args: + api_root: The API root URL. If not provided, will be resolved from + the `AIRBYTE_CLOUD_API_URL` environment variable, or default to + the Airbyte Cloud API. + + Returns: + A CloudClientConfig instance configured with credentials from the environment. + + Raises: + PyAirbyteSecretNotFoundError: If required credentials are not found in + the environment. + """ + resolved_api_root = resolve_cloud_api_url(api_root) + + # Try bearer token first + bearer_token = resolve_cloud_bearer_token() + if bearer_token: + return cls( + bearer_token=bearer_token, + api_root=resolved_api_root, + ) + + # Fall back to client credentials + return cls( + client_id=resolve_cloud_client_id(), + client_secret=resolve_cloud_client_secret(), + api_root=resolved_api_root, + ) diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index 95531f6eb..34a98322e 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -62,6 +62,7 @@ def _fetch_connection_info(self) -> ConnectionResponse: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) @classmethod @@ -180,6 +181,7 @@ def run_sync( workspace_id=self.workspace.workspace_id, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) sync_result = SyncResult( workspace=self.workspace, @@ -242,6 +244,7 @@ def get_previous_sync_logs( order_by=order_by, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return [ SyncResult( @@ -298,6 +301,7 @@ def get_state_artifacts(self) -> list[dict[str, Any]] | None: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) if state_response.get("stateType") == "not_set": return None @@ -319,6 +323,7 @@ def get_catalog_artifact(self) -> dict[str, Any] | None: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return connection_response.get("syncCatalog") @@ -336,6 +341,7 @@ def rename(self, name: str) -> CloudConnection: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, name=name, ) self._connection_info = updated_response @@ -355,6 +361,7 @@ def set_table_prefix(self, prefix: str) -> CloudConnection: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, prefix=prefix, ) self._connection_info = updated_response @@ -379,6 +386,7 @@ def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, configurations=configurations, ) self._connection_info = updated_response diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py index 2707b4e74..094b904ce 100644 --- a/airbyte/cloud/connectors.py +++ b/airbyte/cloud/connectors.py @@ -165,6 +165,7 @@ def check( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) check_result = CheckResult( success=result[0], @@ -197,6 +198,7 @@ def _fetch_connector_info(self) -> api_models.SourceResponse: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) def rename(self, name: str) -> CloudSource: @@ -213,6 +215,7 @@ def rename(self, name: str) -> CloudSource: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, name=name, ) self._connector_info = updated_response @@ -235,6 +238,7 @@ def update_config(self, config: dict[str, Any]) -> CloudSource: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, config=config, ) self._connector_info = updated_response @@ -279,6 +283,7 @@ def _fetch_connector_info(self) -> api_models.DestinationResponse: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) def rename(self, name: str) -> CloudDestination: @@ -295,6 +300,7 @@ def rename(self, name: str) -> CloudDestination: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, name=name, ) self._connector_info = updated_response @@ -317,6 +323,7 @@ def update_config(self, config: dict[str, Any]) -> CloudDestination: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, config=config, ) self._connector_info = updated_response @@ -377,6 +384,7 @@ def _fetch_definition_info( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) raise NotImplementedError( "Docker custom source definitions are not yet supported. " @@ -463,6 +471,7 @@ def connector_builder_project_id(self) -> str | None: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) ) @@ -510,6 +519,7 @@ def permanently_delete( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, safe_mode=safe_mode, ) else: @@ -579,6 +589,7 @@ def update_definition( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return CustomCloudSourceDefinition._from_yaml_response(self.workspace, result) @@ -685,6 +696,7 @@ def deploy_source( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return CloudSource._from_source_response( # noqa: SLF001 # Accessing Non-Public API workspace=self.workspace, @@ -754,6 +766,7 @@ def set_testing_values( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return self diff --git a/airbyte/cloud/sync_results.py b/airbyte/cloud/sync_results.py index 931c33733..c59ae2012 100644 --- a/airbyte/cloud/sync_results.py +++ b/airbyte/cloud/sync_results.py @@ -255,6 +255,7 @@ def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResp connection_id=self.connection.connection_id, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return self._connection_response @@ -266,6 +267,7 @@ def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return asdict(destination_response.configuration) @@ -287,6 +289,7 @@ def _fetch_latest_job_info(self) -> JobResponse: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return self._latest_job_info @@ -313,6 +316,7 @@ def start_time(self) -> datetime: json={"id": self.job_id}, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) raw_start_time = job_info_raw.get("startTime") if raw_start_time: @@ -332,6 +336,7 @@ def _fetch_job_with_attempts(self) -> dict[str, Any]: }, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return self._job_with_attempts_info diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index aa27560d0..05d0c0a68 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -35,7 +35,7 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from functools import cached_property from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, overload @@ -47,10 +47,12 @@ from airbyte._util.api_util import get_web_url_root from airbyte.cloud.auth import ( resolve_cloud_api_url, + resolve_cloud_bearer_token, resolve_cloud_client_id, resolve_cloud_client_secret, resolve_cloud_workspace_id, ) +from airbyte.cloud.client_config import CloudClientConfig from airbyte.cloud.connections import CloudConnection from airbyte.cloud.connectors import ( CloudDestination, @@ -88,17 +90,55 @@ class CloudWorkspace: By overriding `api_root`, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise. + + Two authentication methods are supported (mutually exclusive): + 1. OAuth2 client credentials (client_id + client_secret) + 2. Bearer token authentication + + Example with client credentials: + ```python + workspace = CloudWorkspace( + workspace_id="...", + client_id="...", + client_secret="...", + ) + ``` + + Example with bearer token: + ```python + workspace = CloudWorkspace( + workspace_id="...", + bearer_token="...", + ) + ``` """ workspace_id: str - client_id: SecretString - client_secret: SecretString + client_id: SecretString | None = None + client_secret: SecretString | None = None api_root: str = api_util.CLOUD_API_ROOT + bearer_token: SecretString | None = None + + # Internal credentials object (set in __post_init__, excluded from __init__) + _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False) def __post_init__(self) -> None: - """Ensure that the client ID and secret are handled securely.""" - self.client_id = SecretString(self.client_id) - self.client_secret = SecretString(self.client_secret) + """Validate and initialize credentials.""" + # Wrap secrets in SecretString if provided + if self.client_id is not None: + self.client_id = SecretString(self.client_id) + if self.client_secret is not None: + self.client_secret = SecretString(self.client_secret) + if self.bearer_token is not None: + self.bearer_token = SecretString(self.bearer_token) + + # Create internal CloudClientConfig object (validates mutual exclusivity) + self._credentials = CloudClientConfig( + client_id=self.client_id, + client_secret=self.client_secret, + bearer_token=self.bearer_token, + api_root=self.api_root, + ) @classmethod def from_env( @@ -113,9 +153,14 @@ def from_env( providing a convenient way to create a workspace without explicitly passing credentials. + Two authentication methods are supported (mutually exclusive): + 1. Bearer token (checked first) + 2. OAuth2 client credentials (fallback) + Environment variables used: - - `AIRBYTE_CLOUD_CLIENT_ID`: Required. The OAuth client ID. - - `AIRBYTE_CLOUD_CLIENT_SECRET`: Required. The OAuth client secret. + - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials). + - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow). + - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow). - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument). - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud). @@ -142,11 +187,23 @@ def from_env( workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id") ``` """ + resolved_api_root = resolve_cloud_api_url(api_root) + + # Try bearer token first + bearer_token = resolve_cloud_bearer_token() + if bearer_token: + return cls( + workspace_id=resolve_cloud_workspace_id(workspace_id), + bearer_token=bearer_token, + api_root=resolved_api_root, + ) + + # Fall back to client credentials return cls( workspace_id=resolve_cloud_workspace_id(workspace_id), client_id=resolve_cloud_client_id(), client_secret=resolve_cloud_client_secret(), - api_root=resolve_cloud_api_url(api_root), + api_root=resolved_api_root, ) @property @@ -166,6 +223,7 @@ def _organization_info(self) -> dict[str, Any]: api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) @overload @@ -248,6 +306,7 @@ def connect(self) -> None: workspace_id=self.workspace_id, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) print(f"Successfully connected to workspace: {self.workspace_url}") @@ -337,6 +396,7 @@ def deploy_source( config=source_config_dict, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return CloudSource( workspace=self, @@ -392,6 +452,7 @@ def deploy_destination( config=destination_conf_dict, # Wants a dataclass but accepts dict client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return CloudDestination( workspace=self, @@ -425,6 +486,7 @@ def permanently_delete_source( api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, safe_mode=safe_mode, ) @@ -461,6 +523,7 @@ def permanently_delete_destination( api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, safe_mode=safe_mode, ) @@ -507,6 +570,7 @@ def deploy_connection( prefix=table_prefix or "", client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return CloudConnection( @@ -551,6 +615,7 @@ def permanently_delete_connection( workspace_id=self.workspace_id, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, safe_mode=safe_mode, ) @@ -584,6 +649,7 @@ def list_connections( name_filter=name_filter, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return [ CloudConnection._from_connection_response( # noqa: SLF001 (non-public API) @@ -611,6 +677,7 @@ def list_sources( name_filter=name_filter, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return [ CloudSource._from_source_response( # noqa: SLF001 (non-public API) @@ -638,6 +705,7 @@ def list_destinations( name_filter=name_filter, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return [ CloudDestination._from_destination_response( # noqa: SLF001 (non-public API) @@ -738,6 +806,7 @@ def publish_custom_source_definition( api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) custom_definition = CustomCloudSourceDefinition._from_yaml_response( # noqa: SLF001 self, result @@ -773,6 +842,7 @@ def list_custom_source_definitions( api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return [ CustomCloudSourceDefinition._from_yaml_response(self, d) # noqa: SLF001 @@ -806,6 +876,7 @@ def get_custom_source_definition( api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 diff --git a/airbyte/constants.py b/airbyte/constants.py index 465afffc1..dfc943ca2 100644 --- a/airbyte/constants.py +++ b/airbyte/constants.py @@ -222,6 +222,14 @@ def _str_to_bool(value: str) -> bool: CLOUD_WORKSPACE_ID_ENV_VAR: str = "AIRBYTE_CLOUD_WORKSPACE_ID" """The environment variable name for the Airbyte Cloud workspace ID.""" +CLOUD_BEARER_TOKEN_ENV_VAR: str = "AIRBYTE_CLOUD_BEARER_TOKEN" +"""The environment variable name for the Airbyte Cloud bearer token. + +When set, this bearer token will be used for authentication instead of +client credentials (client_id + client_secret). This is useful when you +already have a valid bearer token and want to skip the OAuth2 token exchange. +""" + CLOUD_API_ROOT: str = "https://api.airbyte.com/v1" """The Airbyte Cloud API root URL. diff --git a/airbyte/mcp/_util.py b/airbyte/mcp/_util.py index f3d3ff5b6..d93974d8f 100644 --- a/airbyte/mcp/_util.py +++ b/airbyte/mcp/_util.py @@ -1,6 +1,8 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. """Internal utility functions for MCP.""" +from __future__ import annotations + import json import os from pathlib import Path @@ -8,14 +10,24 @@ import dotenv import yaml +from fastmcp.server.dependencies import get_http_headers from airbyte._util.meta import is_interactive +from airbyte.cloud.auth import ( + resolve_cloud_api_url, + resolve_cloud_bearer_token, + resolve_cloud_client_id, + resolve_cloud_client_secret, + resolve_cloud_workspace_id, +) +from airbyte.cloud.client_config import CloudClientConfig from airbyte.secrets import ( DotenvSecretManager, GoogleGSMSecretManager, SecretSourceEnum, register_secret_manager, ) +from airbyte.secrets.base import SecretString from airbyte.secrets.config import disable_secret_source from airbyte.secrets.hydration import deep_update, detect_hardcoded_secrets from airbyte.secrets.util import get_secret, is_secret_available @@ -23,6 +35,12 @@ AIRBYTE_MCP_DOTENV_PATH_ENVVAR = "AIRBYTE_MCP_ENV_FILE" +# HTTP header names for Airbyte Cloud authentication (X-Airbyte-Cloud-* convention) +HEADER_CLIENT_ID = "X-Airbyte-Cloud-Client-Id" +HEADER_CLIENT_SECRET = "X-Airbyte-Cloud-Client-Secret" +HEADER_WORKSPACE_ID = "X-Airbyte-Cloud-Workspace-Id" +HEADER_API_URL = "X-Airbyte-Cloud-Api-Url" + def _load_dotenv_file(dotenv_path: Path | str) -> None: """Load environment variables from a .env file.""" @@ -224,3 +242,218 @@ def _raise_invalid_type(file_config: object) -> None: ) return config_dict + + +def _get_header_value(headers: dict[str, str], header_name: str) -> str | None: + """Get a header value from a headers dict, case-insensitively. + + Args: + headers: Dictionary of HTTP headers. + header_name: The header name to look for (case-insensitive). + + Returns: + The header value if found, None otherwise. + """ + header_name_lower = header_name.lower() + for key, value in headers.items(): + if key.lower() == header_name_lower: + return value + return None + + +def get_bearer_token_from_headers() -> SecretString | None: + """Extract bearer token from HTTP Authorization header. + + This function extracts the bearer token from the standard HTTP + `Authorization: Bearer ` header when running as an MCP HTTP server. + + Returns: + The bearer token as a SecretString, or None if not found or not in HTTP context. + """ + headers = get_http_headers() + if not headers: + return None + + auth_header = _get_header_value(headers, "Authorization") + if not auth_header: + return None + + # Parse "Bearer " format + if auth_header.lower().startswith("bearer "): + token = auth_header[7:].strip() # Remove "Bearer " prefix + if token: + return SecretString(token) + + return None + + +def get_client_id_from_headers() -> SecretString | None: + """Extract client ID from HTTP headers. + + Returns: + The client ID as a SecretString, or None if not found. + """ + headers = get_http_headers() + if not headers: + return None + + value = _get_header_value(headers, HEADER_CLIENT_ID) + if value: + return SecretString(value) + return None + + +def get_client_secret_from_headers() -> SecretString | None: + """Extract client secret from HTTP headers. + + Returns: + The client secret as a SecretString, or None if not found. + """ + headers = get_http_headers() + if not headers: + return None + + value = _get_header_value(headers, HEADER_CLIENT_SECRET) + if value: + return SecretString(value) + return None + + +def get_workspace_id_from_headers() -> str | None: + """Extract workspace ID from HTTP headers. + + Returns: + The workspace ID, or None if not found. + """ + headers = get_http_headers() + if not headers: + return None + + return _get_header_value(headers, HEADER_WORKSPACE_ID) + + +def get_api_url_from_headers() -> str | None: + """Extract API URL from HTTP headers. + + Returns: + The API URL, or None if not found. + """ + headers = get_http_headers() + if not headers: + return None + + return _get_header_value(headers, HEADER_API_URL) + + +def resolve_cloud_credentials( + *, + client_id: SecretString | str | None = None, + client_secret: SecretString | str | None = None, + bearer_token: SecretString | str | None = None, + api_root: str | None = None, +) -> CloudClientConfig: + """Resolve CloudClientConfig from multiple sources. + + This function resolves authentication credentials for Airbyte Cloud + from multiple sources in the following priority order: + + 1. Explicit parameters passed to this function + 2. HTTP headers (when running as MCP HTTP server) + 3. Environment variables + + For bearer token authentication, the resolution order is: + 1. Explicit `bearer_token` parameter + 2. HTTP `Authorization: Bearer ` header + 3. `AIRBYTE_CLOUD_BEARER_TOKEN` environment variable + + For client credentials authentication, the resolution order is: + 1. Explicit `client_id` and `client_secret` parameters + 2. HTTP `X-Airbyte-Cloud-Client-Id` and `X-Airbyte-Cloud-Client-Secret` headers + 3. `AIRBYTE_CLOUD_CLIENT_ID` and `AIRBYTE_CLOUD_CLIENT_SECRET` environment variables + + Args: + client_id: Optional explicit client ID. + client_secret: Optional explicit client secret. + bearer_token: Optional explicit bearer token. + api_root: Optional explicit API root URL. + + Returns: + A CloudClientConfig instance with resolved authentication. + + Raises: + PyAirbyteInputError: If no valid authentication can be resolved. + """ + # Resolve API root (explicit -> header -> env var -> default) + resolved_api_root = api_root or get_api_url_from_headers() or resolve_cloud_api_url() + + # Try to resolve bearer token first (explicit -> header -> env var) + resolved_bearer_token: SecretString | None = None + if bearer_token is not None: + resolved_bearer_token = SecretString(bearer_token) + else: + # Try HTTP header + resolved_bearer_token = get_bearer_token_from_headers() + if resolved_bearer_token is None: + # Try env var + resolved_bearer_token = resolve_cloud_bearer_token() + + if resolved_bearer_token: + return CloudClientConfig( + bearer_token=resolved_bearer_token, + api_root=resolved_api_root, + ) + + # Fall back to client credentials (explicit -> header -> env var) + resolved_client_id: SecretString | None = None + resolved_client_secret: SecretString | None = None + + if client_id is not None: + resolved_client_id = SecretString(client_id) + else: + resolved_client_id = get_client_id_from_headers() + if resolved_client_id is None: + resolved_client_id = resolve_cloud_client_id() + + if client_secret is not None: + resolved_client_secret = SecretString(client_secret) + else: + resolved_client_secret = get_client_secret_from_headers() + if resolved_client_secret is None: + resolved_client_secret = resolve_cloud_client_secret() + + return CloudClientConfig( + client_id=resolved_client_id, + client_secret=resolved_client_secret, + api_root=resolved_api_root, + ) + + +def resolve_workspace_id( + workspace_id: str | None = None, +) -> str: + """Resolve workspace ID from multiple sources. + + Resolution order: + 1. Explicit `workspace_id` parameter + 2. HTTP `X-Airbyte-Cloud-Workspace-Id` header + 3. `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable + + Args: + workspace_id: Optional explicit workspace ID. + + Returns: + The resolved workspace ID. + + Raises: + PyAirbyteSecretNotFoundError: If no workspace ID can be resolved. + """ + if workspace_id is not None: + return workspace_id + + # Try HTTP header + header_workspace_id = get_workspace_id_from_headers() + if header_workspace_id: + return header_workspace_id + + # Fall back to env var + return resolve_cloud_workspace_id() diff --git a/airbyte/mcp/cloud_ops.py b/airbyte/mcp/cloud_ops.py index 841db4fe1..ec21818e4 100644 --- a/airbyte/mcp/cloud_ops.py +++ b/airbyte/mcp/cloud_ops.py @@ -13,7 +13,6 @@ resolve_cloud_api_url, resolve_cloud_client_id, resolve_cloud_client_secret, - resolve_cloud_workspace_id, ) from airbyte.cloud.connectors import CustomCloudSourceDefinition from airbyte.cloud.constants import FAILED_STATUSES @@ -26,7 +25,12 @@ register_guid_created_in_session, register_tools, ) -from airbyte.mcp._util import resolve_config, resolve_list_of_strings +from airbyte.mcp._util import ( + resolve_cloud_credentials, + resolve_config, + resolve_list_of_strings, + resolve_workspace_id, +) from airbyte.secrets import SecretString @@ -214,15 +218,23 @@ class SyncJobListResult(BaseModel): def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace: """Get an authenticated CloudWorkspace. + Resolves credentials from multiple sources in order: + 1. HTTP headers (when running as MCP server with HTTP/SSE transport) + 2. Environment variables + Args: - workspace_id: Optional workspace ID. If not provided, uses the - AIRBYTE_CLOUD_WORKSPACE_ID environment variable. + workspace_id: Optional workspace ID. If not provided, uses HTTP headers + or the AIRBYTE_CLOUD_WORKSPACE_ID environment variable. """ + credentials = resolve_cloud_credentials() + resolved_workspace_id = resolve_workspace_id(workspace_id) + return CloudWorkspace( - workspace_id=resolve_cloud_workspace_id(workspace_id), - client_id=resolve_cloud_client_id(), - client_secret=resolve_cloud_client_secret(), - api_root=resolve_cloud_api_url(), + workspace_id=resolved_workspace_id, + client_id=credentials.client_id, + client_secret=credentials.client_secret, + bearer_token=credentials.bearer_token, + api_root=credentials.api_root, ) @@ -503,16 +515,14 @@ def check_airbyte_cloud_workspace( Returns workspace details including workspace ID, name, and organization info. """ workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - api_root = resolve_cloud_api_url() - client_id = resolve_cloud_client_id() - client_secret = resolve_cloud_client_secret() - # Get workspace details from the public API + # Get workspace details from the public API using workspace's credentials workspace_response = api_util.get_workspace( workspace_id=workspace.workspace_id, - api_root=api_root, - client_id=client_id, - client_secret=client_secret, + api_root=workspace.api_root, + client_id=workspace.client_id, + client_secret=workspace.client_secret, + bearer_token=workspace.bearer_token, ) # Try to get organization info, but fail gracefully if we don't have permissions. @@ -1273,6 +1283,7 @@ def _resolve_organization( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=None, # Organization listing requires client credentials ) if organization_id: @@ -1396,6 +1407,7 @@ def list_cloud_workspaces( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=None, # Workspace listing requires client credentials name_contains=name_contains, max_items_limit=max_items_limit, ) diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index c0ba2c999..7e00276b4 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -38,6 +38,7 @@ def test_get_workspace( api_root=airbyte_cloud_api_root, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) assert workspace.workspace_id == workspace_id @@ -53,6 +54,7 @@ def test_list_workspaces( api_root=airbyte_cloud_api_root, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) assert result assert len(result) > 0 @@ -70,6 +72,7 @@ def test_list_sources( api_root=airbyte_cloud_api_root, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) assert ( result @@ -89,6 +92,7 @@ def test_list_destinations( api_root=airbyte_cloud_api_root, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) assert ( result @@ -112,6 +116,7 @@ def test_create_and_delete_source( api_root=airbyte_cloud_api_root, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) assert source.name == new_resource_name assert source.source_type == "faker" @@ -123,6 +128,7 @@ def test_create_and_delete_source( workspace_id=workspace_id, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) @@ -148,6 +154,7 @@ def test_create_and_delete_destination( config=destination_config, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) assert destination.name == new_resource_name assert destination.destination_type == "duckdb" @@ -159,6 +166,7 @@ def test_create_and_delete_destination( workspace_id=workspace_id, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) @@ -183,6 +191,7 @@ def test_create_and_delete_connection( config=SourceFaker(), client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) assert source.name == new_source_name assert source.source_type == "faker" @@ -198,6 +207,7 @@ def test_create_and_delete_connection( ), client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) assert destination.name == new_destination_name assert destination.destination_type == "duckdb" @@ -213,6 +223,7 @@ def test_create_and_delete_connection( selected_stream_names=["users", "purchases", "products"], client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) assert connection.source_id == source.source_id assert connection.destination_id == destination.destination_id @@ -224,6 +235,7 @@ def test_create_and_delete_connection( workspace_id=workspace_id, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) api_util.delete_source( source_id=source.source_id, @@ -231,6 +243,7 @@ def test_create_and_delete_connection( workspace_id=workspace_id, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) api_util.delete_destination( destination_id=destination.destination_id, @@ -238,6 +251,7 @@ def test_create_and_delete_connection( workspace_id=workspace_id, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) @@ -283,6 +297,7 @@ def test_check_connector( connector_type=connector_type, client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + bearer_token=None, ) assert result == expect_success except AirbyteError as e: