Skip to content

Commit

Permalink
Merge branch 'main' into kafka-integration
Browse files Browse the repository at this point in the history
  • Loading branch information
hippalus authored Jan 6, 2025
2 parents bb12d04 + 3b02f75 commit 3fa6029
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 154 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "parseable"
version = "1.6.3"
version = "1.7.0"
authors = ["Parseable Team <[email protected]>"]
edition = "2021"
rust-version = "1.77.1"
Expand Down Expand Up @@ -121,8 +121,8 @@ rstest = "0.23.0"
arrow = "53.0.0"

[package.metadata.parseable_ui]
assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.12/build.zip"
assets-sha1 = "9d5a45f204d709a2dd96f6a5e0b21b3834ee0e36"
assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.14/build.zip"
assets-sha1 = "789e0888883250b928bd71e63522ab8159532992"

[features]
debug = []
Expand Down
62 changes: 0 additions & 62 deletions src/correlation/correlation_utils.rs

This file was deleted.

38 changes: 27 additions & 11 deletions src/correlation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@

use std::collections::HashSet;

use actix_web::http::header::ContentType;
use actix_web::{http::header::ContentType, Error};
use chrono::Utc;
use correlation_utils::user_auth_for_query;
use datafusion::error::DataFusionError;
use http::StatusCode;
use itertools::Itertools;
Expand All @@ -31,12 +30,15 @@ use tokio::sync::RwLock;
use tracing::{error, trace, warn};

use crate::{
handlers::http::rbac::RBACError, option::CONFIG, query::QUERY_SESSION, rbac::map::SessionKey,
storage::ObjectStorageError, users::filters::FilterQuery, utils::get_hash,
handlers::http::rbac::RBACError,
option::CONFIG,
query::QUERY_SESSION,
rbac::{map::SessionKey, Users},
storage::ObjectStorageError,
users::filters::FilterQuery,
utils::{get_hash, user_auth_for_query},
};

pub mod correlation_utils;

pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);

#[derive(Debug, Default)]
Expand Down Expand Up @@ -77,11 +79,15 @@ impl Correlation {
let correlations = self.0.read().await.iter().cloned().collect_vec();

let mut user_correlations = vec![];
let permissions = Users.get_permissions(session_key);

for c in correlations {
if user_auth_for_query(session_key, &c.table_configs)
.await
.is_ok()
{
let tables = &c
.table_configs
.iter()
.map(|t| t.table_name.clone())
.collect_vec();
if user_auth_for_query(&permissions, tables).is_ok() {
user_correlations.push(c);
}
}
Expand Down Expand Up @@ -220,7 +226,14 @@ impl CorrelationRequest {
}

// check if user has access to table
user_auth_for_query(session_key, &self.table_configs).await?;
let permissions = Users.get_permissions(session_key);
let tables = &self
.table_configs
.iter()
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_query(&permissions, tables)?;

// to validate table config, we need to check whether the mentioned fields
// are present in the table or not
Expand Down Expand Up @@ -271,6 +284,8 @@ pub enum CorrelationError {
Unauthorized,
#[error("DataFusion Error: {0}")]
DataFusion(#[from] DataFusionError),
#[error("{0}")]
ActixError(#[from] Error),
}

impl actix_web::ResponseError for CorrelationError {
Expand All @@ -283,6 +298,7 @@ impl actix_web::ResponseError for CorrelationError {
Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::Unauthorized => StatusCode::BAD_REQUEST,
Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::ActixError(_) => StatusCode::BAD_REQUEST,
}
}

Expand Down
13 changes: 6 additions & 7 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic_web::GrpcWebLayer;

use crate::handlers::http::cluster::get_ingestor_info;
use crate::handlers::http::query::{
authorize_and_set_filter_tags, into_query, update_schema_when_distributed,
};
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
use crate::handlers::livetail::cross_origin_config;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::CONFIG;
Expand All @@ -46,6 +44,7 @@ use crate::utils::arrow::flight::{
send_to_ingester,
};
use crate::utils::time::TimeRange;
use crate::utils::user_auth_for_query;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
Expand Down Expand Up @@ -157,12 +156,12 @@ impl FlightService for AirServiceImpl {
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
.to_owned();

update_schema_when_distributed(streams)
update_schema_when_distributed(&streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;

// map payload to query
let mut query = into_query(&ticket, &session_state, time_range)
let query = into_query(&ticket, &session_state, time_range)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;

Expand Down Expand Up @@ -202,7 +201,7 @@ impl FlightService for AirServiceImpl {
rbac::Response::Authorized => (),
rbac::Response::UnAuthorized => {
return Err(Status::permission_denied(
"user is not authenticated to access this resource",
"user is not authorized to access this resource",
))
}
rbac::Response::ReloadRequired => {
Expand All @@ -212,7 +211,7 @@ impl FlightService for AirServiceImpl {

let permissions = Users.get_permissions(&key);

authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
user_auth_for_query(&permissions, &streams).map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();
Expand Down
45 changes: 31 additions & 14 deletions src/handlers/http/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@

use actix_web::{web, HttpRequest, Responder};
use bytes::Bytes;
use itertools::Itertools;
use relative_path::RelativePathBuf;

use crate::rbac::Users;
use crate::utils::user_auth_for_query;
use crate::{
option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY,
utils::actix::extract_session_key_from_req,
};

use crate::correlation::{
correlation_utils::user_auth_for_query, CorrelationConfig, CorrelationError,
CorrelationRequest, CORRELATIONS,
};
use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS};

pub async fn list(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
let session_key = extract_session_key_from_req(&req)
Expand All @@ -52,14 +52,17 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {

let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;

if user_auth_for_query(&session_key, &correlation.table_configs)
.await
.is_ok()
{
Ok(web::Json(correlation))
} else {
Err(CorrelationError::Unauthorized)
}
let permissions = Users.get_permissions(&session_key);

let tables = &correlation
.table_configs
.iter()
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_query(&permissions, tables)?;

Ok(web::Json(correlation))
}

pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
Expand Down Expand Up @@ -93,7 +96,14 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor

// validate whether user has access to this correlation object or not
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
user_auth_for_query(&session_key, &correlation.table_configs).await?;
let permissions = Users.get_permissions(&session_key);
let tables = &correlation
.table_configs
.iter()
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_query(&permissions, tables)?;

let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
correlation_request.validate(&session_key).await?;
Expand Down Expand Up @@ -122,7 +132,14 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;

// validate user's query auth
user_auth_for_query(&session_key, &correlation.table_configs).await?;
let permissions = Users.get_permissions(&session_key);
let tables = &correlation
.table_configs
.iter()
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_query(&permissions, tables)?;

// Delete from disk
let store = CONFIG.storage().get_object_store();
Expand Down
25 changes: 20 additions & 5 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
use crate::metadata::{SchemaVersion, STREAM_INFO};
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::{Mode, CONFIG};
use crate::rbac::role::Action;
use crate::rbac::Users;
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
use crate::storage::StreamType;
use crate::storage::{retention::Retention, StorageDir, StreamInfo};
use crate::utils::actix::extract_session_key_from_req;
use crate::{event, stats};

use crate::{metadata, validator};
Expand All @@ -46,6 +49,7 @@ use arrow_schema::{Field, Schema};
use bytes::Bytes;
use chrono::Utc;
use http::{HeaderName, HeaderValue};
use itertools::Itertools;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
Expand Down Expand Up @@ -86,16 +90,27 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
}

pub async fn list(_: HttpRequest) -> impl Responder {
//list all streams from storage
pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
let key = extract_session_key_from_req(&req)
.map_err(|err| StreamError::Anyhow(anyhow::Error::msg(err.to_string())))?;

// list all streams from storage
let res = CONFIG
.storage()
.get_object_store()
.list_streams()
.await
.unwrap();
.unwrap()
.into_iter()
.filter(|logstream| {
warn!("logstream-\n{logstream:?}");

Users.authorize(key.clone(), Action::ListStream, Some(&logstream.name), None)
== crate::rbac::Response::Authorized
})
.collect_vec();

web::Json(res)
Ok(web::Json(res))
}

pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
Expand Down Expand Up @@ -130,7 +145,7 @@ pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
}
Err(err) => return Err(StreamError::from(err)),
};
match update_schema_when_distributed(vec![stream_name.clone()]).await {
match update_schema_when_distributed(&vec![stream_name.clone()]).await {
Ok(_) => {
let schema = STREAM_INFO.schema(&stream_name)?;
Ok((web::Json(schema), StatusCode::OK))
Expand Down
Loading

0 comments on commit 3fa6029

Please sign in to comment.