diff --git a/Cargo.lock b/Cargo.lock index e1e3eec0..435669ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -861,10 +861,13 @@ dependencies = [ "atoma-auth", "atoma-state", "atoma-sui", + "atoma-utils", "axum 0.8.4", "base64 0.22.1", + "blake2", "chrono", "config", + "fastcrypto 0.1.8", "rand 0.8.5", "reqwest", "serde", diff --git a/atoma-proxy-service/Cargo.toml b/atoma-proxy-service/Cargo.toml index 47e9b26d..2591f8c1 100644 --- a/atoma-proxy-service/Cargo.toml +++ b/atoma-proxy-service/Cargo.toml @@ -9,10 +9,13 @@ anyhow.workspace = true atoma-auth.workspace = true atoma-state.workspace = true atoma-sui.workspace = true +atoma-utils.workspace = true axum.workspace = true base64.workspace = true +blake2.workspace = true chrono.workspace = true config.workspace = true +fastcrypto.workspace = true rand.workspace = true reqwest.workspace = true serde = { workspace = true, features = [ "derive" ] } diff --git a/atoma-proxy-service/docs/openapi.yml b/atoma-proxy-service/docs/openapi.yml index 2898b5a4..9c0578f5 100644 --- a/atoma-proxy-service/docs/openapi.yml +++ b/atoma-proxy-service/docs/openapi.yml @@ -666,38 +666,81 @@ paths: description: Pricing updated successfully '500': description: Failed to update pricing - /google_oauth: - post: + /v1/get_node_attestation/{node_small_id}: + get: tags: - - Auth - summary: |- - Logs in a user with the proxy service using Google OAuth. - This endpoint is used to verify a Google ID token and return an access token. + - Nodes + summary: Retrieves the node attestation for a specific node. description: |- # Arguments - * `proxy_service_state` - The shared state containing the state manager - * `body` - The request body containing the Google ID token + * `node_small_id` - The small ID of the node for which to retrieve the attestation # Returns + * `Result>` - A JSON response containing the node attestation + - `Ok(Json)` - Successfully retrieved node attestation + - `Err(StatusCode::INTERNAL_SERVER_ERROR)` - Failed to retrieve node attestation from state manager - * `Result>` - A JSON response containing the access and refresh tokens - operationId: google_oauth + # Example Response + Returns a JSON object representing the node attestation: + ```json + { + "node_small_id": 123, + "attestation": [0,1,2,3] + } + operationId: get_node_attestation + parameters: + - name: node_small_id + in: path + description: The small ID of the node for which to retrieve the attestation + required: true + schema: + type: integer + format: int64 + responses: + '200': + description: Retrieves node attestation for a specific node + '500': + description: Failed to get node attestation + /v1/update_node_attestation: + put: + tags: + - Nodes + summary: ' Updates the node attestation for a specific node.' + description: |- + # Arguments + + * `proxy_service_state` - The shared state containing the state manager + * `node_small_id` - The small ID of the node for which to update the attestation + * `attestation` - The new attestation data for the node + + # Returns + + * `Result` - A status code indicating the result of the operation + - `Ok(StatusCode::OK)` - Successfully updated node attestation + - `Err(StatusCode::INTERNAL_SERVER_ERROR)` - Failed to update node attestation in state manager + + # Example Request + + ```json + { + "node_small_id": 123, + "attestation": [0,1,2,3], + "signature": [4,5,6,7] + } + ``` + operationId: update_node_attestation requestBody: content: - text/plain: + application/json: schema: - type: string + $ref: '#/components/schemas/NodeAttestation' required: true responses: '200': - description: Logs in a user with Google OAuth - content: - text/plain: - schema: - type: string + description: Successfully updated node attestation '500': - description: Failed to verify Google ID token + description: Failed to update node attestation components: schemas: CreateTokenRequest: @@ -725,6 +768,21 @@ components: password: type: string description: The user's password + NodeAttestation: + type: object + required: + - node_small_id + - attestation + properties: + attestation: + type: array + items: + type: integer + format: int32 + minimum: 0 + node_small_id: + type: integer + format: int64 ProofRequest: type: object description: |- diff --git a/atoma-proxy-service/src/components/openapi.rs b/atoma-proxy-service/src/components/openapi.rs index 7a797479..de7ab0f9 100644 --- a/atoma-proxy-service/src/components/openapi.rs +++ b/atoma-proxy-service/src/components/openapi.rs @@ -18,6 +18,10 @@ use crate::{ LOGIN_PATH, REGISTER_PATH, REVOKE_API_TOKEN_PATH, UPDATE_SUI_ADDRESS_PATH, USDC_PAYMENT_PATH, }, + nodes::{ + GetNodeAttestationOpenApi, UpdateNodeAttestationOpenApi, GET_NODE_ATTESTATION_PATH, + UPDATE_NODE_ATTESTATION_PATH, + }, settings::{SetPricingForUserOpenApi, SET_PRICE_FOR_USER_PATH}, stacks::{ GetCurrentStacksOpenApi, GetStacksByUserId, GET_ALL_STACKS_FOR_USER_PATH, @@ -63,6 +67,8 @@ pub fn openapi_router() -> Router { (path = GET_GRAPHS_PATH, api = GetGraphs, tags = ["Stats"]), (path = GET_STATS_PATH, api = GetStats, tags = ["Stats"]), (path = SET_PRICE_FOR_USER_PATH, api = SetPricingForUserOpenApi, tags = ["Settings"]), + (path = GET_NODE_ATTESTATION_PATH, api = GetNodeAttestationOpenApi, tags = ["Nodes"]), + (path = UPDATE_NODE_ATTESTATION_PATH, api = UpdateNodeAttestationOpenApi, tags = ["Nodes"]), ), tags( (name = "Health", description = "Health check endpoints"), diff --git a/atoma-proxy-service/src/handlers/mod.rs b/atoma-proxy-service/src/handlers/mod.rs index c68ec7f3..4dab3cdc 100644 --- a/atoma-proxy-service/src/handlers/mod.rs +++ b/atoma-proxy-service/src/handlers/mod.rs @@ -1,4 +1,5 @@ pub mod auth; +pub mod nodes; pub mod settings; pub mod stacks; pub mod stats; diff --git a/atoma-proxy-service/src/handlers/nodes.rs b/atoma-proxy-service/src/handlers/nodes.rs new file mode 100644 index 00000000..8907ff20 --- /dev/null +++ b/atoma-proxy-service/src/handlers/nodes.rs @@ -0,0 +1,259 @@ +use std::str::FromStr; + +use atoma_state::types::NodeAttestation; +use atoma_utils::constants::SIGNATURE; +use axum::{ + extract::{Path, State}, + http::{HeaderMap, StatusCode}, + routing::{get, put}, + Json, Router, +}; +use blake2::Digest; +use fastcrypto::{ + ed25519::{Ed25519PublicKey, Ed25519Signature}, + secp256k1::{Secp256k1PublicKey, Secp256k1Signature}, + secp256r1::{Secp256r1PublicKey, Secp256r1Signature}, + traits::VerifyingKey, +}; +use sui_sdk::types::{ + base_types::SuiAddress, + crypto::{PublicKey, Signature, SignatureScheme, SuiSignature, ToFromBytes}, +}; +use tracing::{error, instrument}; +use utoipa::OpenApi; + +use crate::ProxyServiceState; + +type Result = std::result::Result; + +/// The path for the update of node attestation endpoint. +pub const UPDATE_NODE_ATTESTATION_PATH: &str = "/v1/update_node_attestation"; + +/// The path for the get node attestation endpoint. +pub const GET_NODE_ATTESTATION_PATH: &str = "/v1/get_node_attestation"; + +/// Returns a router with the subscriptions endpoint. +/// +/// # Returns +/// * `Router` - A router with the subscriptions endpoint +pub fn nodes_router() -> Router { + Router::new() + .route(UPDATE_NODE_ATTESTATION_PATH, put(update_node_attestation)) + .route( + &format!("{GET_NODE_ATTESTATION_PATH}/{{id}}"), + get(get_node_attestation), + ) +} + +/// OpenAPI documentation for the update_node_attestation endpoint. +/// +/// This struct is used to generate OpenAPI documentation for the update_node_attestation +/// endpoint. It uses the `utoipa` crate's derive macro to automatically generate +/// the OpenAPI specification from the code. +#[derive(OpenApi)] +#[openapi(paths(update_node_attestation))] +pub struct UpdateNodeAttestationOpenApi; + +/// Updates the node attestation for a specific node. +/// +///# Arguments +/// +/// * `proxy_service_state` - The shared state containing the state manager +/// * `node_small_id` - The small ID of the node for which to update the attestation +/// * `attestation` - The new attestation data for the node +/// +/// # Returns +/// +/// * `Result` - A status code indicating the result of the operation +/// - `Ok(StatusCode::OK)` - Successfully updated node attestation +/// - `Err(StatusCode::INTERNAL_SERVER_ERROR)` - Failed to update node attestation in state manager +/// +/// # Example Request +/// +/// ```json +/// { +/// "node_small_id": 123, +/// "attestation": [0,1,2,3], +/// "signature": [4,5,6,7] +/// } +/// ``` +#[utoipa::path( + put, + path = "", + request_body = NodeAttestation, + responses( + (status = OK, description = "Successfully updated node attestation"), + (status = INTERNAL_SERVER_ERROR, description = "Failed to update node attestation") + ) +)] +#[instrument(level = "trace", skip_all)] +pub async fn update_node_attestation( + State(proxy_service_state): State, + headers: HeaderMap, + Json(update_attestation): Json, +) -> Result { + let signature = headers + .get(SIGNATURE) + .ok_or_else(|| { + error!("Signature header is missing"); + StatusCode::BAD_REQUEST + })? + .to_str() + .map_err(|_| { + error!("Invalid signature format"); + StatusCode::BAD_REQUEST + })?; + let signature = Signature::from_str(signature).map_err(|_| { + error!("Invalid signature format"); + StatusCode::BAD_REQUEST + })?; + + let signature_bytes = signature.signature_bytes(); + let public_key_bytes = signature.public_key_bytes(); + let signature_scheme = signature.scheme(); + let public_key = + PublicKey::try_from_bytes(signature_scheme, public_key_bytes).map_err(|e| { + error!("Failed to extract public key from bytes, with error: {e}"); + StatusCode::BAD_REQUEST + })?; + let mut hasher = blake2::Blake2b::new(); + hasher.update(update_attestation.node_small_id.to_le_bytes()); + hasher.update(&update_attestation.attestation); + let attestation_hash: [u8; 32] = hasher.finalize().into(); + + match signature_scheme { + SignatureScheme::ED25519 => { + let public_key = Ed25519PublicKey::from_bytes(public_key.as_ref()).unwrap(); + let signature = Ed25519Signature::from_bytes(signature_bytes).unwrap(); + public_key + .verify(&attestation_hash, &signature) + .map_err(|_| { + error!("Failed to verify signature"); + StatusCode::UNAUTHORIZED + })?; + } + SignatureScheme::Secp256k1 => { + let public_key = Secp256k1PublicKey::from_bytes(public_key.as_ref()).unwrap(); + let signature = Secp256k1Signature::from_bytes(signature_bytes).unwrap(); + public_key + .verify(&attestation_hash, &signature) + .map_err(|_| { + error!("Failed to verify signature"); + StatusCode::UNAUTHORIZED + })?; + } + SignatureScheme::Secp256r1 => { + let public_key = Secp256r1PublicKey::from_bytes(public_key.as_ref()).unwrap(); + let signature = Secp256r1Signature::from_bytes(signature_bytes).unwrap(); + public_key + .verify(&attestation_hash, &signature) + .map_err(|_| { + error!("Failed to verify signature"); + StatusCode::UNAUTHORIZED + })?; + } + _ => { + error!("Currently unsupported signature scheme"); + return Err(StatusCode::BAD_REQUEST); + } + } + + let public_address = proxy_service_state + .atoma_state + .get_node_sui_address(update_attestation.node_small_id) + .await + .map_err(|_| { + error!("Failed to get node sui address"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .ok_or_else(|| { + error!( + "Node sui address not found for node_small_id: {}", + update_attestation.node_small_id + ); + StatusCode::NOT_FOUND + })?; + + let sui_address = SuiAddress::from(&public_key); + + if public_address != sui_address.to_string() { + error!( + "Public key does not match the sui address for node_small_id: {}", + update_attestation.node_small_id + ); + return Err(StatusCode::UNAUTHORIZED); + } + + proxy_service_state + .atoma_state + .update_node_attestation(update_attestation) + .await + .map_err(|_| { + error!("Failed to update node attestation"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + Ok(StatusCode::OK) +} + +/// OpenAPI documentation for the get_node_attestation endpoint. +/// +/// This struct is used to generate OpenAPI documentation for the get_node_attestation +/// endpoint. It uses the `utoipa` crate's derive macro to automatically generate +/// the OpenAPI specification from the code. +#[derive(OpenApi)] +#[openapi(paths(get_node_attestation))] +pub struct GetNodeAttestationOpenApi; + +/// Retrieves the node attestation for a specific node. +/// +/// # Arguments +/// * `proxy_service_state` - The shared state containing the state manager +/// * `node_small_id` - The small ID of the node for which to retrieve the attestation +/// +/// # Returns +/// * `Result>` - A JSON response containing the node attestation +/// - `Ok(Json)` - Successfully retrieved node attestation +/// - `Err(StatusCode::INTERNAL_SERVER_ERROR)` - Failed to retrieve node attestation from state manager +/// +/// # Example Response +/// Returns a JSON object representing the node attestation: +/// ```json +/// { +/// "node_small_id": 123, +/// "attestation": [0,1,2,3] +/// } +#[utoipa::path( + get, + path = "/{node_small_id}", + params( + ("node_small_id" = i64, description = "The small ID of the node for which to retrieve the attestation") + ), + responses( + (status = OK, description = "Retrieves node attestation for a specific node"), + (status = INTERNAL_SERVER_ERROR, description = "Failed to get node attestation") + ) +)] +#[instrument(level = "trace", skip_all)] +pub async fn get_node_attestation( + State(proxy_service_state): State, + Path(node_small_id): Path, +) -> Result> { + Ok(Json( + proxy_service_state + .atoma_state + .get_node_attestation(node_small_id) + .await + .map_err(|_| { + error!("Failed to get nodes subscriptions"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .ok_or_else(|| { + error!( + "Node attestation not found for node_small_id: {}", + node_small_id + ); + StatusCode::NOT_FOUND + })?, + )) +} diff --git a/atoma-proxy-service/src/proxy_service.rs b/atoma-proxy-service/src/proxy_service.rs index 95f5248d..6a8705cc 100644 --- a/atoma-proxy-service/src/proxy_service.rs +++ b/atoma-proxy-service/src/proxy_service.rs @@ -16,8 +16,8 @@ use utoipa::OpenApi; use crate::{ components::{grafana::Grafana, openapi::openapi_router}, handlers::{ - auth::auth_router, settings::settings_router, stacks::stacks_router, stats::stats_router, - subscriptions::subscriptions_router, tasks::tasks_router, + auth::auth_router, nodes::nodes_router, settings::settings_router, stacks::stacks_router, + stats::stats_router, subscriptions::subscriptions_router, tasks::tasks_router, }, ModelModality, }; @@ -161,7 +161,7 @@ pub async fn run_proxy_service( pub fn create_proxy_service_router(proxy_service_state: ProxyServiceState) -> Router { let cors = CorsLayer::new() .allow_origin(Any) - .allow_methods(vec![Method::GET, Method::POST]) + .allow_methods(vec![Method::GET, Method::POST, Method::PUT]) .allow_headers(Any); Router::new() .merge(auth_router()) @@ -170,6 +170,7 @@ pub fn create_proxy_service_router(proxy_service_state: ProxyServiceState) -> Ro .merge(subscriptions_router()) .merge(tasks_router()) .merge(stats_router()) + .merge(nodes_router()) .layer(cors) .with_state(proxy_service_state) .route(HEALTH_PATH, get(health)) diff --git a/atoma-state/src/migrations/20250722090829_node-attestations.sql b/atoma-state/src/migrations/20250722090829_node-attestations.sql new file mode 100644 index 00000000..63da49a4 --- /dev/null +++ b/atoma-state/src/migrations/20250722090829_node-attestations.sql @@ -0,0 +1,6 @@ +CREATE TABLE + node_attestations ( + node_small_id BIGINT PRIMARY KEY NOT NULL, + attestation BYTEA NOT NULL, + updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ); diff --git a/atoma-state/src/state_manager.rs b/atoma-state/src/state_manager.rs index d5a50a69..6e937509 100644 --- a/atoma-state/src/state_manager.rs +++ b/atoma-state/src/state_manager.rs @@ -4,8 +4,9 @@ use crate::handlers::{handle_atoma_event, handle_p2p_event, handle_state_manager use crate::network::NetworkMetrics; use crate::types::{ AtomaAtomaStateManagerEvent, CheapestNode, ComputedUnitsProcessedResponse, LatencyResponse, - NodeDistribution, NodePublicKey, NodeSubscription, Pricing, Stack, StackAttestationDispute, - StackSettlementTicket, StatsStackResponse, Task, TokenResponse, UserProfile, + NodeAttestation, NodeDistribution, NodePublicKey, NodeSubscription, Pricing, Stack, + StackAttestationDispute, StackSettlementTicket, StatsStackResponse, Task, TokenResponse, + UserProfile, }; use crate::{build_query_with_in, AtomaStateManagerError}; @@ -1141,6 +1142,95 @@ impl AtomaState { .collect() } + /// Retrieves the attestation for a specific node. + /// This method fetches the attestation record from the `node_attestations` table + /// for a given node identified by its small ID. + /// + /// # Arguments + /// + /// * `node_small_id` - The unique identifier of the node for which the attestation is requested + /// + /// # Returns + /// + /// - `Result>`: A result containing either: + /// - `Ok(Some(NodeAttestation))`: The `NodeAttestation` object if found. + /// - `Ok(None)`: If no attestation exists for the specified node. + /// - `Err(AtomaStateManagerError)`: An error if the database query fails or if there's an issue parsing the results. + /// + /// # Errors + /// + /// This function will return an error if: + /// - The database query fails to execute. + /// - There's an issue converting the database row into a `NodeAttestation` object. + /// + /// # Example + /// + /// ```rust,ignore + /// use atoma_node::atoma_state::{AtomaStateManager, NodeAttestation}; + /// + /// async fn get_node_attestation(state_manager: &AtomaStateManager, node_small_id: i64) -> Result, AtomaStateManagerError> { + /// state_manager.get_node_attestation(node_small_id).await + /// } + /// ``` + #[instrument(level = "trace", skip_all, fields(%node_small_id))] + pub async fn get_node_attestation( + &self, + node_small_id: i64, + ) -> Result> { + let attestation = sqlx::query("SELECT * FROM node_attestations WHERE node_small_id = $1") + .bind(node_small_id) + .fetch_optional(&self.db) + .await?; + attestation + .map(|attestation| { + NodeAttestation::from_row(&attestation).map_err(AtomaStateManagerError::from) + }) + .transpose() + } + + /// Updates or inserts a node attestation in the database. + /// This method either updates an existing attestation for a node or inserts a new one + /// if it does not already exist. + /// + /// # Arguments + /// + /// * `attestation` - The `NodeAttestation` object containing the node's small ID, + /// the attestation data, and its validity status. + /// + /// # Returns + /// + /// - `Result<()>`: A result indicating success (Ok(())) or failure (Err(AtomaStateManagerError)). + /// + /// # Errors + /// + /// This function will return an error if: + /// - The database query fails to execute. + /// - There's an issue with the data being inserted or updated. + /// # Example + /// + /// ```rust,ignore + /// use atoma_node::atoma_state::{AtomaStateManager, NodeAttestation}; + /// + /// async fn update_node_attestation(state_manager: &AtomaStateManager, attestation: NodeAttestation) -> Result<(), AtomaStateManagerError> { + /// state_manager.update_node_attestation(attestation).await + /// } + /// ``` + #[instrument(level = "trace", skip_all, fields(node_small_id = %attestation.node_small_id))] + pub async fn update_node_attestation(&self, attestation: NodeAttestation) -> Result<()> { + sqlx::query( + "INSERT INTO node_attestations (node_small_id, attestation) + VALUES ($1, $2) + ON CONFLICT (node_small_id) DO UPDATE SET + attestation = EXCLUDED.attestation, + updated_at = NOW()", + ) + .bind(attestation.node_small_id) + .bind(attestation.attestation) + .execute(&self.db) + .await?; + Ok(()) + } + /// Retrieves all stacks that are not settled. /// /// This method fetches all stack records from the `stacks` table that are not in the settle period. diff --git a/atoma-state/src/types.rs b/atoma-state/src/types.rs index 19e34c7c..c01002d1 100644 --- a/atoma-state/src/types.rs +++ b/atoma-state/src/types.rs @@ -598,6 +598,12 @@ pub struct NodePublicKey { pub stack_small_id: Option, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromRow, ToSchema)] +pub struct NodeAttestation { + pub node_small_id: i64, + pub attestation: Vec, +} + pub enum AtomaAtomaStateManagerEvent { /// Locks a stack LockStack {