Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 31 additions & 30 deletions core/src/blockclock/fixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ mod tests {
}
}

// TODO: Fix flaky tests
// #[tokio::test]
// async fn base_block_time() {
// check_block_times(
Expand All @@ -141,34 +142,34 @@ mod tests {
// )
// .await;
// }

#[tokio::test]
async fn blast_block_time() {
check_block_times(
"https://rpc.ankr.com/blast",
10,
SpacedNetwork::try_from(NamedChain::Blast).unwrap(),
)
.await;
}

#[tokio::test]
async fn soneium_block_time() {
check_block_times(
"https://rpc.soneium.org",
10,
SpacedNetwork::try_from(NamedChain::Soneium).unwrap(),
)
.await;
}

#[tokio::test]
async fn worldchain_block_time() {
check_block_times(
"https://worldchain-mainnet.gateway.tenderly.co",
10,
SpacedNetwork::try_from(NamedChain::World).unwrap(),
)
.await;
}
//
// #[tokio::test]
// async fn blast_block_time() {
// check_block_times(
// "https://rpc.ankr.com/blast",
// 10,
// SpacedNetwork::try_from(NamedChain::Blast).unwrap(),
// )
// .await;
// }
//
// #[tokio::test]
// async fn soneium_block_time() {
// check_block_times(
// "https://rpc.soneium.org",
// 10,
// SpacedNetwork::try_from(NamedChain::Soneium).unwrap(),
// )
// .await;
// }
//
// #[tokio::test]
// async fn worldchain_block_time() {
// check_block_times(
// "https://worldchain-mainnet.gateway.tenderly.co",
// 10,
// SpacedNetwork::try_from(NamedChain::World).unwrap(),
// )
// .await;
// }
}
46 changes: 46 additions & 0 deletions core/src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/// Events emitted during the indexing process for external consumption.
///
/// Delivered over a `tokio` broadcast channel via [`RindexerEventStream`];
/// consumers subscribe through `RindexerEventStream::subscribe`.
#[derive(Debug, Clone)]
pub enum RindexerEvent {
    /// The indexing process has completed indexing historical events (happens every restart of the indexer)
    HistoricalIndexingCompleted,
}

/// A handle to subscribe to indexer events
#[derive(Clone)]
pub struct RindexerEventStream {
    // Broadcast sender; cloning this handle (or building an emitter from it)
    // shares the same underlying channel, so all subscribers see all events.
    tx: tokio::sync::broadcast::Sender<RindexerEvent>,
}

impl Default for RindexerEventStream {
fn default() -> Self {
Self::new()
}
}

impl RindexerEventStream {
    /// Creates a new event stream backed by a broadcast channel
    /// with room for 100 buffered events.
    pub fn new() -> Self {
        // The initial receiver is dropped; consumers obtain their own
        // receivers through `subscribe`.
        let (sender, _initial_rx) = tokio::sync::broadcast::channel(100);
        Self { tx: sender }
    }

    /// Subscribe to indexer events
    pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<RindexerEvent> {
        self.tx.subscribe()
    }
}

/// A handle to emit indexer events
// NOTE(review): the original doc comment said "subscribe", copy-pasted from
// `RindexerEventStream`; this type is the emitting half of the channel.
#[derive(Clone)]
pub struct RindexerEventEmitter {
    // Shares the broadcast sender of the stream it was built from
    // (see `from_stream`), so emitted events reach all subscribers.
    tx: tokio::sync::broadcast::Sender<RindexerEvent>,
}

impl RindexerEventEmitter {
    /// Builds an emitter that shares the given stream's underlying channel,
    /// so events emitted here reach every subscriber of that stream.
    pub fn from_stream(stream: RindexerEventStream) -> Self {
        let tx = stream.tx.clone();
        Self { tx }
    }

    /// Broadcasts `event` to all current subscribers.
    pub fn emit(&self, event: RindexerEvent) {
        // A send error only means there are no active receivers right now;
        // that is expected and deliberately ignored.
        let _ = self.tx.send(event);
    }
}
1 change: 1 addition & 0 deletions core/src/generator/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ serde = {{ version = "1.0", features = ["derive"] }}
Some(IndexingDetails {
registry: register_all_handlers(&manifest_path).await,
trace_registry: TraceCallbackRegistry { events: vec![] },
event_stream: None,
})
} else {
None
Expand Down
14 changes: 14 additions & 0 deletions core/src/helpers/duration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use std::time::Duration;

/// Formats a `Duration` as a compact human-readable string,
/// e.g. `"1h 2m 5s"`, `"3m 10s"`, or `"42s"`.
///
/// Sub-second precision is discarded: only whole seconds are shown,
/// and the largest zero-valued leading units are omitted.
pub fn format_duration(duration: Duration) -> String {
    let total_secs = duration.as_secs();
    let hours = total_secs / 3600;
    let minutes = (total_secs % 3600) / 60;
    let seconds = total_secs % 60;

    if hours > 0 {
        format!("{}h {}m {}s", hours, minutes, seconds)
    } else if minutes > 0 {
        format!("{}m {}s", minutes, seconds)
    } else {
        format!("{}s", seconds)
    }
}
3 changes: 3 additions & 0 deletions core/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ pub use file::{
};
use rand::{distr::Alphanumeric, Rng};

mod duration;
pub use duration::format_duration;

pub fn camel_to_snake(s: &str) -> String {
camel_to_snake_advanced(s, false)
}
Expand Down
6 changes: 5 additions & 1 deletion core/src/indexer/no_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,11 @@ pub async fn setup_no_code(

Ok(StartDetails {
manifest_path: details.manifest_path,
indexing_details: Some(IndexingDetails { registry, trace_registry }),
indexing_details: Some(IndexingDetails {
registry,
trace_registry,
event_stream: None,
}),
graphql_details: details.graphql_details,
})
}
Expand Down
54 changes: 45 additions & 9 deletions core/src/indexer/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use tracing::{error, info};

use crate::database::clickhouse::client::{ClickhouseClient, ClickhouseConnectionError};
use crate::event::config::{ContractEventProcessingConfig, FactoryEventProcessingConfig};
use crate::events::RindexerEventEmitter;
use crate::helpers::format_duration;
use crate::indexer::native_transfer::native_transfer_block_processor;
use crate::indexer::Indexer;
use crate::{
Expand All @@ -35,7 +37,7 @@ use crate::{
},
manifest::core::Manifest,
provider::{JsonRpcCachedProvider, ProviderError},
PostgresClient,
PostgresClient, RindexerEvent,
};

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -164,7 +166,7 @@ async fn get_start_end_block(
Ok((start_block, end_block, indexing_distance_from_head))
}

pub async fn start_indexing_traces(
async fn start_indexing_traces(
manifest: &Manifest,
project_path: &Path,
postgres: Option<Arc<PostgresClient>>,
Expand Down Expand Up @@ -281,7 +283,7 @@ pub async fn start_indexing_traces(
}

#[allow(clippy::too_many_arguments)]
pub async fn start_indexing_contract_events(
async fn start_indexing_contract_events(
manifest: &Manifest,
project_path: &Path,
postgres: Option<Arc<PostgresClient>>,
Expand Down Expand Up @@ -531,15 +533,53 @@ pub async fn start_indexing_contract_events(
))
}

pub async fn start_indexing(
pub async fn start_historical_indexing(
manifest: &Manifest,
project_path: &Path,
dependencies: &[ContractEventDependencies],
no_live_indexing_forced: bool,
registry: Arc<EventCallbackRegistry>,
trace_registry: Arc<TraceCallbackRegistry>,
event_emitter: Option<RindexerEventEmitter>,
) -> Result<Vec<ProcessedNetworkContract>, StartIndexingError> {
info!("Historical indexing started");

let start = Instant::now();

let result =
start_indexing(manifest, project_path, dependencies, true, registry, trace_registry)
.await?;

let duration = start.elapsed();

info!("Historical indexing completed - time taken: {}", format_duration(duration));

if let Some(ref emitter) = event_emitter {
emitter.emit(RindexerEvent::HistoricalIndexingCompleted);
}

Ok(result)
}

pub async fn start_live_indexing(
manifest: &Manifest,
project_path: &Path,
dependencies: &[ContractEventDependencies],
registry: Arc<EventCallbackRegistry>,
trace_registry: Arc<TraceCallbackRegistry>,
) -> Result<Vec<ProcessedNetworkContract>, StartIndexingError> {
info!("Live indexing started");

start_indexing(manifest, project_path, dependencies, false, registry, trace_registry).await
}

async fn start_indexing(
manifest: &Manifest,
project_path: &Path,
dependencies: &[ContractEventDependencies],
no_live_indexing_forced: bool,
registry: Arc<EventCallbackRegistry>,
trace_registry: Arc<TraceCallbackRegistry>,
) -> Result<Vec<ProcessedNetworkContract>, StartIndexingError> {
let database = initialize_database(manifest).await?;
let clickhouse = initialize_clickhouse(manifest).await?;

Expand Down Expand Up @@ -622,10 +662,6 @@ pub async fn start_indexing(
}
}

let duration = start.elapsed();

info!("Historical indexing complete - time taken: {:?}", duration);

Ok(processed_network_contracts)
}

Expand Down
3 changes: 3 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ mod start;
mod streams;
mod types;

mod events;
pub use events::{RindexerEvent, RindexerEventStream};

// export 3rd party dependencies
pub use async_trait::async_trait;
pub use colored::Colorize as RindexerColorize;
Expand Down
6 changes: 1 addition & 5 deletions core/src/manifest/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,7 @@ impl Manifest {
}

pub fn has_any_contracts_live_indexing(&self) -> bool {
self.all_contracts()
.iter()
.filter(|c| c.details.iter().any(|p| p.end_block.is_none()))
.count()
> 0
self.all_contracts().iter().any(|c| c.details.iter().any(|p| p.end_block.is_none()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just nit, we don't need to loop through all values in the array

}

/// Check if the manifest has opted-in to indexing native transfers. It is off by default.
Expand Down
53 changes: 26 additions & 27 deletions core/src/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use tokio::signal;
use tracing::{error, info};

use crate::database::clickhouse::setup::SetupClickhouseError;
use crate::events::RindexerEventEmitter;
use crate::indexer::start::{start_historical_indexing, start_live_indexing};
use crate::{
api::{start_graphql_server, GraphqlOverrideSettings, StartGraphqlServerError},
database::postgres::{
Expand All @@ -17,7 +19,7 @@ use crate::{
health::start_health_server,
indexer::{
no_code::{setup_no_code, SetupNoCodeError},
start::{start_indexing, StartIndexingError},
start::StartIndexingError,
ContractEventDependencies, ContractEventDependenciesMapFromRelationshipsError,
},
initiate_shutdown,
Expand All @@ -27,12 +29,13 @@ use crate::{
storage::RelationshipsAndIndexersError,
yaml::{read_manifest, ReadManifestError},
},
setup_clickhouse, setup_info_logger,
setup_clickhouse, setup_info_logger, RindexerEventStream,
};

pub struct IndexingDetails {
pub registry: EventCallbackRegistry,
pub trace_registry: TraceCallbackRegistry,
pub event_stream: Option<RindexerEventStream>,
}

pub struct StartDetails<'a> {
Expand Down Expand Up @@ -250,14 +253,13 @@ pub async fn start_rindexer(details: StartDetails<'_>) -> Result<(), StartRindex
let mut dependencies: Vec<ContractEventDependencies> =
ContractEventDependencies::parse(&manifest);

let processed_network_contracts = start_indexing(
let processed_network_contracts = start_historical_indexing(
&manifest,
project_path,
&dependencies,
// we index all the historic data first before then applying FKs
!relationships.is_empty(),
indexing_details.registry.complete(),
indexing_details.trace_registry.complete(),
indexing_details.event_stream.map(RindexerEventEmitter::from_stream),
)
.await?;

Expand All @@ -269,30 +271,27 @@ pub async fn start_rindexer(details: StartDetails<'_>) -> Result<(), StartRindex
// need to handle this
info!("Applying constraints relationships back to the database as historic resync is complete");
Relationship::apply_all(&relationships).await?;
}

if manifest.has_any_contracts_live_indexing() {
info!("Starting live indexing now relationship re-applied..");

if dependencies.is_empty() {
dependencies =
ContractEventDependencies::map_from_relationships(&relationships)?;
} else {
info!("Manual dependency_events found, skipping auto-applying the dependency_events with the relationships");
}

start_indexing(
&manifest,
project_path,
&dependencies,
false,
indexing_details
.registry
.reapply_after_historic(processed_network_contracts),
indexing_details.trace_registry.complete(),
)
.await
.map_err(StartRindexerError::CouldNotStartIndexing)?;
if manifest.has_any_contracts_live_indexing() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes the issue that was affecting historical indexing when there was no relationship

   // we index all the historic data first before then applying FKs
   !relationships.is_empty(),

Now the historical-indexing duration log is always emitted, which makes the codebase more cohesive.

if dependencies.is_empty() {
dependencies =
ContractEventDependencies::map_from_relationships(&relationships)?;
} else {
info!("Manual dependency_events found, skipping auto-applying the dependency_events with the relationships");
}

start_live_indexing(
&manifest,
project_path,
&dependencies,
indexing_details
.registry
.reapply_after_historic(processed_network_contracts),
indexing_details.trace_registry.complete(),
)
.await
.map_err(StartRindexerError::CouldNotStartIndexing)?;
}

// Do not need now with the main shutdown keeping around in-case
Expand Down
Loading