diff --git a/src/event/mod.rs b/src/event/mod.rs
index b641643cb..3da339d55 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -17,22 +17,17 @@
  *
  */

-pub mod format;
+use std::{collections::HashMap, sync::Arc};

 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
-use itertools::Itertools;
-use std::sync::Arc;
-
-use self::error::EventError;
-use crate::{
-    metadata::update_stats,
-    parseable::{StagingError, PARSEABLE},
-    storage::StreamType,
-    LOCK_EXPECT,
-};
+use arrow_schema::Field;
 use chrono::NaiveDateTime;
-use std::collections::HashMap;
+use error::EventError;
+use itertools::Itertools;
+
+use crate::{metadata::update_stats, parseable::PARSEABLE, storage::StreamType};
+
+pub mod format;

 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 pub const USER_AGENT_KEY: &str = "p_user_agent";
@@ -67,11 +62,12 @@ impl Event {
             }
         }

+        let stream = PARSEABLE.get_or_create_stream(&self.stream_name);
         if self.is_first_event {
-            commit_schema(&self.stream_name, self.rb.schema())?;
+            stream.commit_schema(self.rb.schema())?;
         }

-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        stream.push(
             &key,
             &self.rb,
             self.parsed_timestamp,
@@ -117,26 +113,12 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
     let hash = hasher.digest();
     format!("{hash:x}")
 }

-pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), StagingError> {
-    let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned");
-
-    let map = &mut stream_metadata
-        .get_mut(stream_name)
-        .expect("map has entry for this stream name")
-        .metadata
-        .write()
-        .expect(LOCK_EXPECT)
-        .schema;
-    let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
-    let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
-    map.clear();
-    map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
-    Ok(())
-}
-
 pub mod error {
-    use crate::{parseable::StagingError, storage::ObjectStorageError};
+    use crate::{
+        parseable::{StagingError, StreamNotFound},
+        storage::ObjectStorageError,
+    };

     #[derive(Debug, thiserror::Error)]
     pub enum EventError {
@@ -144,5 +126,7 @@ pub mod error {
         Staging(#[from] StagingError),
         #[error("ObjectStorage Error: {0}")]
         ObjectStorage(#[from] ObjectStorageError),
+        #[error("{0}")]
+        NotFound(#[from] StreamNotFound),
     }
 }
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 7d9c33a45..94dd5334c 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -35,8 +35,6 @@ use tracing::error;

 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
-
-use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::Mode;
 use crate::parseable::{StreamNotFound, PARSEABLE};
@@ -45,7 +43,6 @@ use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
-use crate::storage::object_storage::commit_schema_to_storage;
 use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
 use crate::utils::time::{TimeParseError, TimeRange};
@@ -173,9 +170,15 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
         for table in tables {
             if let Ok(new_schema) = fetch_schema(table).await {
                 // commit schema merges the schema internally and updates the schema in storage.
-                commit_schema_to_storage(table, new_schema.clone()).await?;
-
-                commit_schema(table, Arc::new(new_schema))?;
+                PARSEABLE
+                    .storage
+                    .get_object_store()
+                    .commit_schema(table, new_schema.clone())
+                    .await?;
+
+                PARSEABLE
+                    .get_stream(table)?
+                    .commit_schema(Arc::new(new_schema))?;
             }
         }
     }
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index c67c60043..ba1ccab25 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -28,7 +28,7 @@ use std::{
 };

 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
+use arrow_schema::{Field, Fields, Schema, SchemaRef};
 use chrono::{NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
@@ -591,6 +591,18 @@ impl Stream {
         self.metadata.read().expect(LOCK_EXPECT).schema_version
     }

+    /// Stores updated schema in-memory
+    pub fn commit_schema(&self, schema: SchemaRef) -> Result<(), StagingError> {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        let current_schema = Schema::new(metadata.schema.values().cloned().collect::<Fields>());
+        let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
+        metadata.schema.clear();
+        metadata
+            .schema
+            .extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
+        Ok(())
+    }
+
     pub fn get_schema(&self) -> Arc<Schema> {
         let metadata = self.metadata.read().expect(LOCK_EXPECT);

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b6bc9bb25..aca514e5f 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -262,6 +262,9 @@ pub enum ObjectStorageError {

     #[error("JoinError: {0}")]
     JoinError(#[from] JoinError),
+
+    #[error("Arrow Error: {0}")]
+    Arrow(#[from] arrow_schema::ArrowError),
 }

 pub fn to_object_store_path(path: &RelativePath) -> Path {
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index dea669975..eaaa07f40 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -869,7 +869,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         for path in stream.schema_files() {
             let file = File::open(&path)?;
             let schema: Schema = serde_json::from_reader(file)?;
-            commit_schema_to_storage(&stream_name, schema).await?;
+            self.commit_schema(&stream_name, schema).await?;
             if let Err(e) = remove_file(path) {
                 warn!("Failed to remove staged file: {e}");
             }
@@ -878,16 +878,17 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {

         Ok(())
     }
-}

-pub async fn commit_schema_to_storage(
-    stream_name: &str,
-    schema: Schema,
-) -> Result<(), ObjectStorageError> {
-    let storage = PARSEABLE.storage().get_object_store();
-    let stream_schema = storage.get_schema(stream_name).await?;
-    let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
-    storage.put_schema(stream_name, &new_schema).await
+    /// Stores updated schema in storage
+    async fn commit_schema(
+        &self,
+        stream_name: &str,
+        schema: Schema,
+    ) -> Result<(), ObjectStorageError> {
+        let stream_schema = self.get_schema(stream_name).await?;
+        let new_schema = Schema::try_merge(vec![schema, stream_schema])?;
+        self.put_schema(stream_name, &new_schema).await
+    }
 }

 #[inline(always)]
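Note (not part of the patch): the refactor replaces the free functions `commit_schema` and `commit_schema_to_storage` with methods on the stream handle and the `ObjectStorage` trait, so callers now go through `PARSEABLE.get_stream(..)?` and receive a typed `StreamNotFound` error instead of hitting the old `expect("map has entry for this stream name")` panic path. What follows is a minimal standalone sketch of the merge-into-map pattern behind the new `Stream::commit_schema`. It assumes only the `arrow-schema` crate; `FieldMap` and the free `commit_schema` function here are simplified stand-ins for the lock-guarded `metadata.schema` map and the method in the real code.

// Standalone sketch of the in-memory merge pattern used by the new
// `Stream::commit_schema`. `FieldMap` is a hypothetical stand-in for the
// `metadata.schema` map that the real code guards behind the stream's RwLock.

use std::{collections::HashMap, sync::Arc};

use arrow_schema::{ArrowError, DataType, Field, Fields, Schema, SchemaRef};

type FieldMap = HashMap<String, Arc<Field>>;

/// Replace `map` with the union of the fields it already holds and `incoming`,
/// mirroring the body of the new `Stream::commit_schema`.
fn commit_schema(map: &mut FieldMap, incoming: SchemaRef) -> Result<(), ArrowError> {
    // Rebuild a Schema from the known fields, then let arrow merge the two.
    let current = Schema::new(map.values().cloned().collect::<Fields>());
    let merged = Schema::try_merge(vec![current, incoming.as_ref().clone()])?;
    // Swap the merged field set back into the map.
    map.clear();
    map.extend(merged.fields.iter().map(|f| (f.name().clone(), f.clone())));
    Ok(())
}

fn main() -> Result<(), ArrowError> {
    let mut map = FieldMap::new();
    map.insert(
        "p_timestamp".to_owned(),
        Arc::new(Field::new("p_timestamp", DataType::Utf8, true)),
    );

    // An event arrives carrying a new column: the map grows to the union.
    let incoming = Arc::new(Schema::new(vec![Field::new("level", DataType::Utf8, true)]));
    commit_schema(&mut map, incoming)?;
    assert_eq!(map.len(), 2);

    // An incompatible redefinition of an existing column is rejected by
    // `Schema::try_merge`; in the patched code paths this ArrowError surfaces
    // as StagingError (in-memory) or the new ObjectStorageError::Arrow variant.
    let conflicting = Arc::new(Schema::new(vec![Field::new("level", DataType::Int64, true)]));
    assert!(commit_schema(&mut map, conflicting).is_err());
    Ok(())
}

The same merge is performed storage-side by the new `ObjectStorage::commit_schema` (get, `Schema::try_merge`, put), and the distributed query path in `update_schema_when_distributed` deliberately commits to storage first and then to the in-memory map, matching the order of the two calls it replaced.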