Skip to content

Commit 299d556

Browse files
feat(cloud): Add workspace validation to CloudConnection (#929)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 0f76d41 commit 299d556

2 files changed

Lines changed: 103 additions & 4 deletions

File tree

airbyte/cloud/connections.py

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from airbyte._util import api_util
99
from airbyte.cloud.connectors import CloudDestination, CloudSource
1010
from airbyte.cloud.sync_results import SyncResult
11+
from airbyte.exceptions import AirbyteWorkspaceMismatchError
1112

1213

1314
if TYPE_CHECKING:
@@ -16,7 +17,7 @@
1617
from airbyte.cloud.workspaces import CloudWorkspace
1718

1819

19-
class CloudConnection:
20+
class CloudConnection: # noqa: PLR0904 # Too many public methods
2021
"""A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
2122
2223
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
@@ -54,9 +55,41 @@ def __init__(
5455
self._cloud_destination_object: CloudDestination | None = None
5556
"""The destination object. (Cached.)"""
5657

57-
def _fetch_connection_info(self) -> ConnectionResponse:
58-
"""Populate the connection with data from the API."""
59-
return api_util.get_connection(
58+
def _fetch_connection_info(
59+
self,
60+
*,
61+
force_refresh: bool = False,
62+
verify: bool = True,
63+
) -> ConnectionResponse:
64+
"""Fetch and cache connection info from the API.
65+
66+
By default, this method will only fetch from the API if connection info is not
67+
already cached. It also verifies that the connection belongs to the expected
68+
workspace unless verification is explicitly disabled.
69+
70+
Args:
71+
force_refresh: If True, always fetch from the API even if cached.
72+
If False (default), only fetch if not already cached.
73+
verify: If True (default), verify that the connection is valid (e.g., that
74+
the workspace_id matches this object's workspace). Raises an error if
75+
validation fails.
76+
77+
Returns:
78+
The ConnectionResponse from the API.
79+
80+
Raises:
81+
AirbyteWorkspaceMismatchError: If verify is True and the connection's
82+
workspace_id doesn't match the expected workspace.
83+
AirbyteMissingResourceError: If the connection doesn't exist.
84+
"""
85+
if not force_refresh and self._connection_info is not None:
86+
# Use cached info, but still verify if requested
87+
if verify:
88+
self._verify_workspace_match(self._connection_info)
89+
return self._connection_info
90+
91+
# Fetch from API
92+
connection_info = api_util.get_connection(
6093
workspace_id=self.workspace.workspace_id,
6194
connection_id=self.connection_id,
6295
api_root=self.workspace.api_root,
@@ -65,6 +98,51 @@ def _fetch_connection_info(self) -> ConnectionResponse:
6598
bearer_token=self.workspace.bearer_token,
6699
)
67100

101+
# Cache the result first (before verification may raise)
102+
self._connection_info = connection_info
103+
104+
# Verify if requested
105+
if verify:
106+
self._verify_workspace_match(connection_info)
107+
108+
return connection_info
109+
110+
def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
111+
"""Verify that the connection belongs to the expected workspace.
112+
113+
Raises:
114+
AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
115+
"""
116+
if connection_info.workspace_id != self.workspace.workspace_id:
117+
raise AirbyteWorkspaceMismatchError(
118+
resource_type="connection",
119+
resource_id=self.connection_id,
120+
workspace=self.workspace,
121+
expected_workspace_id=self.workspace.workspace_id,
122+
actual_workspace_id=connection_info.workspace_id,
123+
message=(
124+
f"Connection '{self.connection_id}' belongs to workspace "
125+
f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
126+
),
127+
)
128+
129+
def check_is_valid(self) -> bool:
130+
"""Check if this connection exists and belongs to the expected workspace.
131+
132+
This method fetches connection info from the API (if not already cached) and
133+
verifies that the connection's workspace_id matches the workspace associated
134+
with this CloudConnection object.
135+
136+
Returns:
137+
True if the connection exists and belongs to the expected workspace.
138+
139+
Raises:
140+
AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
141+
AirbyteMissingResourceError: If the connection doesn't exist.
142+
"""
143+
self._fetch_connection_info(force_refresh=False, verify=True)
144+
return True
145+
68146
@classmethod
69147
def _from_connection_response(
70148
cls,

airbyte/exceptions.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,27 @@ class AirbyteConnectionSyncError(AirbyteConnectionError):
490490
"""An error occurred while executing the remote Airbyte job."""
491491

492492

493+
@dataclass
class AirbyteWorkspaceMismatchError(AirbyteError):
    """A fetched resource belongs to a different workspace than the one expected.

    Raised after a resource (connection, source, or destination) has been retrieved
    from the API and the workspace ID reported in the response differs from the
    workspace ID the caller expected. Every field is optional and defaults to `None`.
    """

    resource_type: str | None = None
    """The type of resource (e.g., 'connection', 'source', 'destination')."""

    resource_id: str | None = None
    """The ID of the resource that was fetched."""

    expected_workspace_id: str | None = None
    """The workspace ID that was expected."""

    actual_workspace_id: str | None = None
    """The workspace ID returned by the API."""
512+
513+
493514
@dataclass
494515
class AirbyteConnectionSyncTimeoutError(AirbyteConnectionSyncError):
495516
"""A timeout occurred while waiting for the remote Airbyte job to complete."""

0 commit comments

Comments
 (0)