Skip to content

Commit d58ed54

Browse files
fix: fetch first-event-at from storage (#1130)
current: query server fetches first-event-at from all the live ingestors but, this logic fails when ingestor which ingested the events for a stream is not reachable anymore, query server gets `None` change: fetch the first-event-at from all the stream jsons from storage find the earliest value, and update in server's memory map
1 parent 1c001c8 commit d58ed54

File tree

9 files changed

+225
-34
lines changed

9 files changed

+225
-34
lines changed

src/catalog/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,9 @@ async fn create_manifest(
279279
}
280280
};
281281
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
282-
if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
282+
if let Err(err) =
283+
STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap())
284+
{
283285
error!(
284286
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
285287
stream_name
@@ -330,8 +332,8 @@ pub async fn remove_manifest_from_snapshot(
330332
let manifests = &mut meta.snapshot.manifest_list;
331333
// Filter out items whose manifest_path contains any of the dates_to_delete
332334
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
335+
STREAM_INFO.reset_first_event_at(stream_name)?;
333336
meta.first_event_at = None;
334-
STREAM_INFO.set_first_event_at(stream_name, None)?;
335337
storage.put_snapshot(stream_name, meta.snapshot).await?;
336338
}
337339
match CONFIG.options.mode {
@@ -391,7 +393,7 @@ pub async fn get_first_event(
391393
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
392394
meta.first_event_at = Some(first_event_at.clone());
393395
storage.put_stream_manifest(stream_name, &meta).await?;
394-
STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?;
396+
STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?;
395397
}
396398
}
397399
}

src/event/format/json.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ impl EventFormat for Event {
9494
};
9595

9696
if value_arr
97-
.iter()
98-
.any(|value| fields_mismatch(&schema, value, schema_version))
97+
.iter()
98+
.any(|value| fields_mismatch(&schema, value, schema_version))
9999
{
100100
return Err(anyhow!(
101101
"Could not process this event due to mismatch in datatype"

src/event/format/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,8 @@ pub trait EventFormat: Sized {
112112
time_partition: Option<&String>,
113113
schema_version: SchemaVersion,
114114
) -> Result<(RecordBatch, bool), AnyError> {
115-
let (data, mut schema, is_first) = self.to_data(
116-
storage_schema,
117-
time_partition,
118-
schema_version,
119-
)?;
115+
let (data, mut schema, is_first) =
116+
self.to_data(storage_schema, time_partition, schema_version)?;
120117

121118
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
122119
return Err(anyhow!(

src/handlers/http/logstream.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@ use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, St
2121
use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
2222
use super::ingest::create_stream_if_not_exists;
2323
use super::modal::utils::logstream_utils::{
24-
create_stream_and_schema_from_storage, create_update_stream,
24+
create_stream_and_schema_from_storage, create_update_stream, update_first_event_at,
2525
};
2626
use super::query::update_schema_when_distributed;
2727
use crate::alerts::Alerts;
28-
use crate::catalog::get_first_event;
2928
use crate::event::format::{override_data_type, LogSource};
3029
use crate::handlers::STREAM_TYPE_KEY;
3130
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
@@ -57,7 +56,7 @@ use std::fs;
5756
use std::num::NonZeroU32;
5857
use std::str::FromStr;
5958
use std::sync::Arc;
60-
use tracing::{error, warn};
59+
use tracing::warn;
6160

6261
pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
6362
let stream_name = stream_name.into_inner();
@@ -550,19 +549,19 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
550549
return Err(StreamError::StreamNotFound(stream_name));
551550
}
552551
}
553-
554-
let store = CONFIG.storage().get_object_store();
555-
let dates: Vec<String> = Vec::new();
556-
if let Ok(Some(first_event_at)) = get_first_event(store, &stream_name, dates).await {
557-
if let Err(err) =
558-
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
552+
let storage = CONFIG.storage().get_object_store();
553+
// if first_event_at is not found in memory map, check if it exists in the storage
554+
// if it exists in the storage, update the first_event_at in memory map
555+
let stream_first_event_at =
556+
if let Ok(Some(first_event_at)) = STREAM_INFO.get_first_event(&stream_name) {
557+
Some(first_event_at)
558+
} else if let Ok(Some(first_event_at)) =
559+
storage.get_first_event_from_storage(&stream_name).await
559560
{
560-
error!(
561-
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
562-
stream_name
563-
);
564-
}
565-
}
561+
update_first_event_at(&stream_name, &first_event_at).await
562+
} else {
563+
None
564+
};
566565

567566
let hash_map = STREAM_INFO.read().unwrap();
568567
let stream_meta = &hash_map
@@ -572,7 +571,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
572571
let stream_info: StreamInfo = StreamInfo {
573572
stream_type: stream_meta.stream_type.clone(),
574573
created_at: stream_meta.created_at.clone(),
575-
first_event_at: stream_meta.first_event_at.clone(),
574+
first_event_at: stream_first_event_at,
576575
time_partition: stream_meta.time_partition.clone(),
577576
time_partition_limit: stream_meta
578577
.time_partition_limit
@@ -582,8 +581,6 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
582581
log_source: stream_meta.log_source.clone(),
583582
};
584583

585-
// get the other info from
586-
587584
Ok((web::Json(stream_info), StatusCode::OK))
588585
}
589586

src/handlers/http/modal/ingest_server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ impl IngestServer {
249249
web::put()
250250
.to(ingestor_logstream::put_stream)
251251
.authorize_for_stream(Action::CreateStream),
252-
)
252+
),
253253
)
254254
.service(
255255
// GET "/logstream/{logstream}/info" ==> Get info for given log stream

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::{
3636
storage::{LogStream, ObjectStoreFormat, StreamType},
3737
validator,
3838
};
39+
use tracing::error;
3940

4041
pub async fn create_update_stream(
4142
headers: &HeaderMap,
@@ -508,3 +509,56 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
508509

509510
Ok(true)
510511
}
512+
513+
/// Updates the first-event-at in storage and logstream metadata for the specified stream.
514+
///
515+
/// This function updates the `first-event-at` in both the object store and the stream info metadata.
516+
/// If either update fails, an error is logged, but the function will still return the `first-event-at`.
517+
///
518+
/// # Arguments
519+
///
520+
/// * `stream_name` - The name of the stream to update.
521+
/// * `first_event_at` - The value of first-event-at.
522+
///
523+
/// # Returns
524+
///
525+
/// * `Option<String>` - Returns `Some(String)` with the provided timestamp if the update is successful,
526+
/// or `None` if an error occurs.
527+
///
528+
/// # Errors
529+
///
530+
/// This function logs an error if:
531+
/// * The `first-event-at` cannot be updated in the object store.
532+
/// * The `first-event-at` cannot be updated in the stream info.
533+
///
534+
/// # Examples
535+
///```ignore
536+
/// ```rust
537+
/// use parseable::handlers::http::modal::utils::logstream_utils::update_first_event_at;
538+
/// let result = update_first_event_at("my_stream", "2023-01-01T00:00:00Z").await;
539+
/// match result {
540+
/// Some(timestamp) => println!("first-event-at: {}", timestamp),
541+
/// None => eprintln!("Failed to update first-event-at"),
542+
/// }
543+
/// ```
544+
pub async fn update_first_event_at(stream_name: &str, first_event_at: &str) -> Option<String> {
545+
let storage = CONFIG.storage().get_object_store();
546+
if let Err(err) = storage
547+
.update_first_event_in_stream(stream_name, first_event_at)
548+
.await
549+
{
550+
error!(
551+
"Failed to update first_event_at in storage for stream {:?}: {err:?}",
552+
stream_name
553+
);
554+
}
555+
556+
if let Err(err) = metadata::STREAM_INFO.set_first_event_at(stream_name, first_event_at) {
557+
error!(
558+
"Failed to update first_event_at in stream info for stream {:?}: {err:?}",
559+
stream_name
560+
);
561+
}
562+
563+
Some(first_event_at.to_string())
564+
}

src/metadata.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,46 @@ impl StreamInfo {
212212
pub fn set_first_event_at(
213213
&self,
214214
stream_name: &str,
215-
first_event_at: Option<String>,
215+
first_event_at: &str,
216216
) -> Result<(), MetadataError> {
217217
let mut map = self.write().expect(LOCK_EXPECT);
218218
map.get_mut(stream_name)
219219
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
220220
.map(|metadata| {
221-
metadata.first_event_at = first_event_at;
221+
metadata.first_event_at = Some(first_event_at.to_owned());
222+
})
223+
}
224+
225+
/// Removes the `first_event_at` timestamp for the specified stream from the LogStreamMetadata.
226+
///
227+
/// This function is called during the retention task, when the parquet files along with the manifest files are deleted from the storage.
228+
/// The manifest path is removed from the snapshot in the stream.json
229+
/// and the first_event_at value in the stream.json is removed.
230+
///
231+
/// # Arguments
232+
///
233+
/// * `stream_name` - The name of the stream for which the `first_event_at` timestamp is to be removed.
234+
///
235+
/// # Returns
236+
///
237+
/// * `Result<(), MetadataError>` - Returns `Ok(())` if the `first_event_at` timestamp is successfully removed,
238+
/// or a `MetadataError` if the stream metadata is not found.
239+
///
240+
/// # Examples
241+
/// ```ignore
242+
/// ```rust
243+
/// let result = metadata.remove_first_event_at("my_stream");
244+
/// match result {
245+
/// Ok(()) => println!("first-event-at removed successfully"),
246+
/// Err(e) => eprintln!("Error removing first-event-at from STREAM_INFO: {}", e),
247+
/// }
248+
/// ```
249+
pub fn reset_first_event_at(&self, stream_name: &str) -> Result<(), MetadataError> {
250+
let mut map = self.write().expect(LOCK_EXPECT);
251+
map.get_mut(stream_name)
252+
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
253+
.map(|metadata| {
254+
metadata.first_event_at.take();
222255
})
223256
}
224257

src/storage/object_storage.rs

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use actix_web_prometheus::PrometheusMetrics;
4444
use arrow_schema::Schema;
4545
use async_trait::async_trait;
4646
use bytes::Bytes;
47-
use chrono::Local;
47+
use chrono::{DateTime, Local, Utc};
4848
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
4949
use once_cell::sync::OnceCell;
5050
use relative_path::RelativePath;
@@ -217,6 +217,42 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
217217
Ok(())
218218
}
219219

220+
/// Updates the first event timestamp in the object store for the specified stream.
221+
///
222+
/// This function retrieves the current object-store format for the given stream,
223+
/// updates the `first_event_at` field with the provided timestamp, and then
224+
/// stores the updated format back in the object store.
225+
///
226+
/// # Arguments
227+
///
228+
/// * `stream_name` - The name of the stream to update.
229+
/// * `first_event` - The timestamp of the first event to set.
230+
///
231+
/// # Returns
232+
///
233+
/// * `Result<(), ObjectStorageError>` - Returns `Ok(())` if the update is successful,
234+
/// or an `ObjectStorageError` if an error occurs.
235+
///
236+
/// # Examples
237+
/// ```ignore
238+
/// ```rust
239+
/// let result = object_store.update_first_event_in_stream("my_stream", "2023-01-01T00:00:00Z").await;
240+
/// assert!(result.is_ok());
241+
/// ```
242+
async fn update_first_event_in_stream(
243+
&self,
244+
stream_name: &str,
245+
first_event: &str,
246+
) -> Result<(), ObjectStorageError> {
247+
let mut format = self.get_object_store_format(stream_name).await?;
248+
format.first_event_at = Some(first_event.to_string());
249+
let format_json = to_bytes(&format);
250+
self.put_object(&stream_json_path(stream_name), format_json)
251+
.await?;
252+
253+
Ok(())
254+
}
255+
220256
async fn put_alerts(
221257
&self,
222258
stream_name: &str,
@@ -623,6 +659,78 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
623659
Ok(())
624660
}
625661

662+
async fn get_stream_meta_from_storage(
663+
&self,
664+
stream_name: &str,
665+
) -> Result<Vec<ObjectStoreFormat>, ObjectStorageError> {
666+
let mut stream_metas = vec![];
667+
let stream_meta_bytes = self
668+
.get_objects(
669+
Some(&RelativePathBuf::from_iter([
670+
stream_name,
671+
STREAM_ROOT_DIRECTORY,
672+
])),
673+
Box::new(|file_name| file_name.ends_with("stream.json")),
674+
)
675+
.await;
676+
if let Ok(stream_meta_bytes) = stream_meta_bytes {
677+
for stream_meta in stream_meta_bytes {
678+
let stream_meta_ob = serde_json::from_slice::<ObjectStoreFormat>(&stream_meta)?;
679+
stream_metas.push(stream_meta_ob);
680+
}
681+
}
682+
683+
Ok(stream_metas)
684+
}
685+
686+
/// Retrieves the earliest first-event-at from the storage for the specified stream.
687+
///
688+
/// This function fetches the object-store format from all the stream.json files for the given stream from the storage,
689+
/// extracts the `first_event_at` timestamps, and returns the earliest `first_event_at`.
690+
///
691+
/// # Arguments
692+
///
693+
/// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved.
694+
///
695+
/// # Returns
696+
///
697+
/// * `Result<Option<String>, ObjectStorageError>` - Returns `Ok(Some(String))` with the earliest
698+
/// first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError`
699+
/// if an error occurs.
700+
///
701+
/// # Examples
702+
/// ```ignore
703+
/// ```rust
704+
/// let result = get_first_event_from_storage("my_stream").await;
705+
/// match result {
706+
/// Ok(Some(first_event)) => println!("first-event-at: {}", first_event),
707+
/// Ok(None) => println!("first-event-at not found"),
708+
/// Err(err) => println!("Error: {:?}", err),
709+
/// }
710+
/// ```
711+
async fn get_first_event_from_storage(
712+
&self,
713+
stream_name: &str,
714+
) -> Result<Option<String>, ObjectStorageError> {
715+
let mut all_first_events = vec![];
716+
let stream_metas = self.get_stream_meta_from_storage(stream_name).await;
717+
if let Ok(stream_metas) = stream_metas {
718+
for stream_meta in stream_metas.iter() {
719+
if let Some(first_event) = &stream_meta.first_event_at {
720+
let first_event = DateTime::parse_from_rfc3339(first_event).unwrap();
721+
let first_event = first_event.with_timezone(&Utc);
722+
all_first_events.push(first_event);
723+
}
724+
}
725+
}
726+
727+
if all_first_events.is_empty() {
728+
return Ok(None);
729+
}
730+
let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339();
731+
Ok(Some(first_event_at))
732+
}
733+
626734
// pick a better name
627735
fn get_bucket_name(&self) -> String;
628736
}

src/storage/retention.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,9 @@ mod action {
218218
return;
219219
}
220220
}
221-
if let Ok(first_event_at) = res_remove_manifest {
221+
if let Ok(Some(first_event_at)) = res_remove_manifest {
222222
if let Err(err) =
223-
metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at)
223+
metadata::STREAM_INFO.set_first_event_at(&stream_name, &first_event_at)
224224
{
225225
error!(
226226
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",

0 commit comments

Comments
 (0)