Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 182 additions & 91 deletions airbyte/_util/api_util.py

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions airbyte/cloud/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ def resolve_cloud_client_id(
return get_secret(constants.CLOUD_CLIENT_ID_ENV_VAR, default=input_value)


def resolve_cloud_bearer_token(
input_value: str | SecretString | None = None,
/,
) -> SecretString | None:
"""Get the Airbyte Cloud bearer token from the environment."""
return try_get_secret(constants.CLOUD_BEARER_TOKEN_ENV_VAR, default=input_value)


def resolve_cloud_api_url(
input_value: str | None = None,
/,
Expand Down
30 changes: 18 additions & 12 deletions airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ def _fetch_connection_info(self) -> ConnectionResponse:
workspace_id=self.workspace.workspace_id,
connection_id=self.connection_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
)

@classmethod
Expand Down Expand Up @@ -178,8 +179,9 @@ def run_sync(
connection_id=self.connection_id,
api_root=self.workspace.api_root,
workspace_id=self.workspace.workspace_id,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
)
sync_result = SyncResult(
workspace=self.workspace,
Expand Down Expand Up @@ -240,8 +242,9 @@ def get_previous_sync_logs(
limit=limit,
offset=offset,
order_by=order_by,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
)
return [
SyncResult(
Expand Down Expand Up @@ -334,8 +337,9 @@ def rename(self, name: str) -> CloudConnection:
updated_response = api_util.patch_connection(
connection_id=self.connection_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
name=name,
)
self._connection_info = updated_response
Expand All @@ -353,8 +357,9 @@ def set_table_prefix(self, prefix: str) -> CloudConnection:
updated_response = api_util.patch_connection(
connection_id=self.connection_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
prefix=prefix,
)
self._connection_info = updated_response
Expand All @@ -377,8 +382,9 @@ def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
updated_response = api_util.patch_connection(
connection_id=self.connection_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
configurations=configurations,
)
self._connection_info = updated_response
Expand Down
60 changes: 36 additions & 24 deletions airbyte/cloud/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ def check(
connector_type=self.connector_type,
actor_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
)
check_result = CheckResult(
success=result[0],
Expand Down Expand Up @@ -195,8 +196,9 @@ def _fetch_connector_info(self) -> api_models.SourceResponse:
return api_util.get_source(
source_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
)

def rename(self, name: str) -> CloudSource:
Expand All @@ -211,8 +213,9 @@ def rename(self, name: str) -> CloudSource:
updated_response = api_util.patch_source(
source_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
name=name,
)
self._connector_info = updated_response
Expand All @@ -233,8 +236,9 @@ def update_config(self, config: dict[str, Any]) -> CloudSource:
updated_response = api_util.patch_source(
source_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
config=config,
)
self._connector_info = updated_response
Expand Down Expand Up @@ -277,8 +281,9 @@ def _fetch_connector_info(self) -> api_models.DestinationResponse:
return api_util.get_destination(
destination_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
)

def rename(self, name: str) -> CloudDestination:
Expand All @@ -293,8 +298,9 @@ def rename(self, name: str) -> CloudDestination:
updated_response = api_util.patch_destination(
destination_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
name=name,
)
self._connector_info = updated_response
Expand All @@ -315,8 +321,9 @@ def update_config(self, config: dict[str, Any]) -> CloudDestination:
updated_response = api_util.patch_destination(
destination_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
config=config,
)
self._connector_info = updated_response
Expand Down Expand Up @@ -375,8 +382,9 @@ def _fetch_definition_info(
workspace_id=self.workspace.workspace_id,
definition_id=self.definition_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
)
raise NotImplementedError(
"Docker custom source definitions are not yet supported. "
Expand Down Expand Up @@ -461,8 +469,9 @@ def connector_builder_project_id(self) -> str | None:
workspace_id=self.workspace.workspace_id,
definition_id=self.definition_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
)
)

Expand Down Expand Up @@ -508,8 +517,9 @@ def permanently_delete(
workspace_id=self.workspace.workspace_id,
definition_id=self.definition_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
safe_mode=safe_mode,
)
else:
Expand Down Expand Up @@ -577,8 +587,9 @@ def update_definition(
definition_id=self.definition_id,
manifest=manifest_dict,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
)
return CustomCloudSourceDefinition._from_yaml_response(self.workspace, result)

Expand Down Expand Up @@ -683,8 +694,9 @@ def deploy_source(
workspace_id=self.workspace.workspace_id,
config=config,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
client_id=self.workspace.client_id, # type: ignore[arg-type]
client_secret=self.workspace.client_secret, # type: ignore[arg-type]
bearer_token=self.workspace.bearer_token,
)
return CloudSource._from_source_response( # noqa: SLF001 # Accessing Non-Public API
workspace=self.workspace,
Expand Down
5 changes: 5 additions & 0 deletions airbyte/cloud/sync_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResp
connection_id=self.connection.connection_id,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return self._connection_response

Expand All @@ -266,6 +267,7 @@ def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return asdict(destination_response.configuration)

Expand All @@ -287,6 +289,7 @@ def _fetch_latest_job_info(self) -> JobResponse:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return self._latest_job_info

Expand All @@ -313,6 +316,7 @@ def start_time(self) -> datetime:
json={"id": self.job_id},
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
raw_start_time = job_info_raw.get("startTime")
if raw_start_time:
Expand All @@ -332,6 +336,7 @@ def _fetch_job_with_attempts(self) -> dict[str, Any]:
},
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return self._job_with_attempts_info

Expand Down
Loading
Loading