Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions veadk/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from veadk.tracing.base_tracer import BaseTracer
from veadk.utils.logger import get_logger
from veadk.utils.patches import patch_asyncio, patch_tracer
from veadk.tools.builtin_tools.agent_authorization import check_agent_authorization
from veadk.version import VERSION

patch_tracer()
Expand Down Expand Up @@ -123,6 +124,8 @@ class Agent(LlmAgent):
)
"""

enable_authz: bool = False

def model_post_init(self, __context: Any) -> None:
super().model_post_init(None) # for sub_agents init

Expand Down Expand Up @@ -184,6 +187,18 @@ def model_post_init(self, __context: Any) -> None:
load_memory.custom_metadata["backend"] = self.long_term_memory.backend
self.tools.append(load_memory)

if self.enable_authz:
if self.before_agent_callback:
if isinstance(self.before_agent_callback, list):
self.before_agent_callback.append(check_agent_authorization)
else:
self.before_agent_callback = [
self.before_agent_callback,
check_agent_authorization,
]
else:
self.before_agent_callback = check_agent_authorization

logger.info(f"VeADK version: {VERSION}")

logger.info(f"{self.__class__.__name__} `{self.name}` init done.")
Expand Down
11 changes: 11 additions & 0 deletions veadk/configs/auth_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ class VeIdentityConfig(BaseSettings):
If not provided, the endpoint will be auto-generated based on the region.
"""

role_trn: str = ""
"""The TRN of the role, in the format: trn:iam::${AccountId}:role/${RoleName}

When accessing with AK/SK (without session_token), this role will be automatically assumed to obtain temporary credentials.
For example: trn:iam::2000012345:role/MyWorkloadRole
"""

role_session_name: str = "veadk_assume_role_session"
"""Role session name, used to distinguish different sessions in audit logs.
"""

def get_endpoint(self) -> str:
"""Get the endpoint URL for Identity service.

Expand Down
105 changes: 105 additions & 0 deletions veadk/integrations/ve_identity/identity_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@
import aiohttp
import volcenginesdkid
import volcenginesdkcore
import volcenginesdksts

from veadk.integrations.ve_identity.models import (
AssumeRoleCredential,
DCRRegistrationRequest,
DCRRegistrationResponse,
OAuth2TokenResponse,
WorkloadToken,
)
from veadk.auth.veauth.utils import get_credential_from_vefaas_iam
from veadk.configs.auth_configs import VeIdentityConfig

from veadk.utils.logger import get_logger

Expand Down Expand Up @@ -77,6 +80,20 @@ def _refresh_creds(self: IdentityClient):
except Exception as e:
logger.warning(f"Failed to retrieve credentials from VeFaaS IAM: {e}")

# If there is no session_token and role_trn is configured, execute AssumeRole
if not session_token and self._identity_config.role_trn and ak and sk:
try:
logger.info(
f"No session token found, attempting AssumeRole with role: {self._identity_config.role_trn}"
)
sts_credentials = self._assume_role(ak, sk)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉后面得给这个 sts_credentials 做个缓存每次调用identity接口都需要请求assume role开销有点大

ak = sts_credentials.access_key_id
sk = sts_credentials.secret_access_key
session_token = sts_credentials.session_token
logger.info("Successfully assumed role and obtained STS credentials")
except Exception as e:
logger.warning(f"Failed to assume role: {e}")

# Update configuration with the credentials
self._api_client.api_client.configuration.ak = ak
self._api_client.api_client.configuration.sk = sk
Expand Down Expand Up @@ -115,6 +132,7 @@ def __init__(
secret_key: Optional[str] = None,
session_token: Optional[str] = None,
region: str = "cn-beijing",
identity_config: Optional[VeIdentityConfig] = None,
):
"""Initialize the identity client.

Expand All @@ -128,6 +146,8 @@ def __init__(
KeyError: If required environment variables are not set.
"""
self.region = region
self._identity_config = identity_config or VeIdentityConfig()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里直接类似 https://github.com/volcengine/veadk-python/blob/main/veadk/integrations/ve_identity/auth_config.py#L30 用settings里全局的配置 可能就行 不太需要放到实例里 包括下面 self._identity_config的使用 都可以直接用 settings.veidentity


# Store initial credentials for fallback
self._initial_access_key = access_key or os.getenv("VOLCENGINE_ACCESS_KEY", "")
self._initial_secret_key = secret_key or os.getenv("VOLCENGINE_SECRET_KEY", "")
Expand All @@ -146,6 +166,56 @@ def __init__(
volcenginesdkcore.ApiClient(configuration)
)

def _assume_role(self, access_key: str, secret_key: str) -> AssumeRoleCredential:
"""Execute AssumeRole to get STS temporary credentials.

Args:
access_key: VolcEngine access key
secret_key: VolcEngine secret key

Returns:
AssumeRoleCredential containing temporary credentials

Raises:
Exception: If AssumeRole fails
"""
# Create STS client configuration
sts_config = volcenginesdkcore.Configuration()
sts_config.region = self.region
sts_config.ak = access_key
sts_config.sk = secret_key

# Create an STS API client
sts_client = volcenginesdksts.STSApi(volcenginesdkcore.ApiClient(sts_config))

# Construct an AssumeRole request
assume_role_request = volcenginesdksts.AssumeRoleRequest(
role_trn=self._identity_config.role_trn,
role_session_name=self._identity_config.role_session_name,
)

logger.info(
f"Executing AssumeRole for role: {self._identity_config.role_trn}, "
f"session: {self._identity_config.role_session_name}"
)

response: volcenginesdksts.AssumeRoleResponse = sts_client.assume_role(
assume_role_request
)

if not response.credentials:
raise Exception("AssumeRole returned no credentials")

access_key = response["access_key_id"]
secret_key = response["secret_access_key"]
session_token = response["session_token"]

return AssumeRoleCredential(
access_key_id=access_key,
secret_access_key=secret_key,
session_token=session_token,
)

@refresh_credentials
def create_oauth2_credential_provider(
self, request_params: Dict[str, Any]
Expand Down Expand Up @@ -533,3 +603,38 @@ async def create_oauth2_credential_provider_with_dcr(

# Create the credential provider with updated config
return self.create_oauth2_credential_provider(request_params)

@refresh_credentials
def check_permission(
self, principal_id, operation, resource_id, namespace="default"
) -> bool:
"""Check if the principal has permission to perform the operation on the resource.

Args:
principal_id: The ID of the principal (user or service).
operation: The operation to check permission for.
resource_id: The ID of the resource.
namespace: The namespace of the resource. Defaults to "default".

Returns:
True if the principal has permission, False otherwise.
"""
logger.info(
f"Checking permission for principal {principal_id} on resource {resource_id} for operation {operation}..."
)

request = volcenginesdkid.CheckPermissionRequest(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里request的字段不太对
class CheckPermissionRequest(
namespace_name: Any | None = None,
operation: Any | None = None,
original_callers: Any | None = None,
principal: Any | None = None,
references: Any | None = None,
resource: Any | None = None,
_configuration: Any | None = None
)

principal_id=principal_id,
operation=operation,
resource_id=resource_id,
namespace=namespace,
)

response: volcenginesdkid.CheckPermissionResponse = (
self._api_client.check_permission(request)
)

logger.info(
f"Permission check result for principal {principal_id} on resource {resource_id}: {response.allowed}"
)
return response.allowed
6 changes: 6 additions & 0 deletions veadk/integrations/ve_identity/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,9 @@ def validate_expires_at_positive(cls, v: int) -> int:
if v <= 0:
raise ValueError("expires_at must be a positive Unix timestamp")
return v


class AssumeRoleCredential(BaseModel):
access_key_id: str
secret_access_key: str
session_token: str
74 changes: 74 additions & 0 deletions veadk/tools/builtin_tools/agent_authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

from google.genai import types
from google.adk.agents.callback_context import CallbackContext

from veadk.integrations.ve_identity.auth_config import _get_default_region
from veadk.integrations.ve_identity.identity_client import IdentityClient
from veadk.integrations.ve_identity.token_manager import get_workload_token
from veadk.utils.logger import get_logger

logger = get_logger(__name__)


region = _get_default_region()
identity_client = IdentityClient(region=region)


async def check_agent_authorization(
callback_context: CallbackContext,
) -> Optional[types.Content]:
"""Check if the agent is authorized to run using VeIdentity."""
workload_token = await get_workload_token(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里目前 在workload_name为空的情况下 会取agent_name 调用到identity_client的get_workload_access_token,需要先改一下 get_workload_access_token 方法签名的 workload_name 参数改为 optional 然后token_manager不传递agent_name

tool_context=callback_context,
identity_client=identity_client,
)

# Parse role_id from workload_token
# Format: trn:id:${Region}:${Account}:workloadpool/default/workload/${RoleId}
role_id = None
if workload_token:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try:
role_id = workload_token.split("/")[-1]
logger.debug(f"Parsed role_id: {role_id}")
except Exception as e:
logger.warning(f"Failed to parse role_id from workload_token: {e}")

agent_name = callback_context.agent_name
user_id = callback_context._invocation_context.user_id

namespace = "default"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

和方法的默认参数重复

user_id = user_id
action = "invoke"
workload_id = role_id if role_id else agent_name

allowed = identity_client.check_permission(
principal_id=user_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里 principal, operation, resource 都是 包含 Type和Id的结构体,可能适合先定义一个数据类直接传递,operation可以固定成type: action, id: invoke

operation=action,
resource_id=workload_id,
namespace=namespace,
)

if allowed:
logger.debug("Agent is authorized to run.")
return None
else:
logger.warning("Agent is not authorized to run.")
return types.Content(
parts=[types.Part(text=f"Agent {agent_name} is not authorized to run.")],
role="model",
)