diff --git a/.gitignore b/.gitignore index 8c36419e..696c4e6a 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,7 @@ examples/* !examples/rindexer_factory_indexing !examples/rindexer_rust_playground !examples/rust_clickhouse +!examples/rindexer_sqlite_db !examples/clickhouse_factory_indexing documentation/node_modules documentation/dist diff --git a/Cargo.lock b/Cargo.lock index 3b9b75fe..ccf046c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3778,7 +3778,7 @@ dependencies = [ "enr", "fnv", "futures", - "hashlink", + "hashlink 0.9.1", "hex", "hkdf", "lazy_static", @@ -4133,6 +4133,18 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fast-float2" version = "0.2.3" @@ -4784,6 +4796,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "hdrhistogram" version = "7.5.4" @@ -6061,6 +6082,16 @@ dependencies = [ "libsecp256k1-core", ] +[[package]] +name = "libsqlite3-sys" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91632f3b4fb6bd1d72aa3d78f41ffecfcf2b1a6648d8c241dbe7dbfaf4875e15" +dependencies = [ + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-rs-sys" version = "0.5.2" @@ -7271,7 +7302,7 @@ dependencies = [ "base64 0.22.1", "byteorder", "bytes", - "fallible-iterator", + "fallible-iterator 0.2.0", "hmac", "md-5", "memchr", @@ -7288,7 +7319,7 @@ checksum = "ef4605b7c057056dd35baeb6ac0c0338e4975b1f2bef0f65da953285eb007095" dependencies = [ "bytes", "chrono", - "fallible-iterator", + "fallible-iterator 0.2.0", "postgres-protocol", "serde_core", "serde_json", @@ -10817,7 +10848,7 @@ dependencies = [ [[package]] name = "rindexer" -version = "0.28.1" +version = "0.28.2" dependencies = [ "alloy", "alloy-chains", @@ -10862,6 +10893,7 @@ dependencies = [ "reth-node-api", "reth-node-ethereum", "reth-tracing", + "rusqlite", "rust_decimal", "serde", "serde_json", @@ -10887,7 +10919,7 @@ dependencies = [ [[package]] name = "rindexer_cli" -version = "0.28.1" +version = "0.28.2" dependencies = [ "alloy", "alloy-chains", @@ -11092,6 +11124,20 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48fd7bd8a6377e15ad9d42a8ec25371b94ddc67abe7c8b9127bec79bebaaae18" +[[package]] +name = "rusqlite" +version = "0.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3de23c3319433716cf134eed225fe9986bc24f63bed9be9f20c329029e672dc7" +dependencies = [ + "bitflags 2.10.0", + "fallible-iterator 0.3.0", + "fallible-streaming-iterator", + "hashlink 0.10.0", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rust_clickhouse" version = "0.1.0" @@ -12817,7 +12863,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", - "fallible-iterator", + "fallible-iterator 0.2.0", "futures-channel", "futures-util", "log", diff --git a/Dockerfile b/Dockerfile index 19edf5fd..49cf8f2f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,6 +22,7 @@ RUN apt-get update && apt-get install -y \ ca-certificates \ curl \ git \ + libsqlite3-0 \ && rm -rf /var/lib/apt/lists/* RUN curl -L https://foundry.paradigm.xyz | bash diff --git a/cli/Makefile b/cli/Makefile index 999e0f3f..a21dd05f 100644 --- a/cli/Makefile +++ b/cli/Makefile @@ -6,6 +6,8 @@ new_rust: RUSTFLAGS='-C target-cpu=native' cargo run --release --features jemalloc -- new --path $(CURDIR)/../examples rust start_indexer: RUSTFLAGS='-C target-cpu=native' RUST_BACKTRACE='full' cargo run --release --features jemalloc -- start --path $(CURDIR)/../examples/rindexer_demo_cli indexer +start_sqlite: + RUSTFLAGS='-C target-cpu=native' RUST_BACKTRACE='full' cargo run --release --features jemalloc -- start --path $(CURDIR)/../examples/rindexer_sqlite_db indexer start_all: RUSTFLAGS='-C target-cpu=native' RUST_BACKTRACE='full' cargo run --release --features jemalloc -- start --path $(CURDIR)/../examples/rindexer_demo_cli all start_graphql: diff --git a/cli/src/commands/new.rs b/cli/src/commands/new.rs index ce5c258a..d49260e0 100644 --- a/cli/src/commands/new.rs +++ b/cli/src/commands/new.rs @@ -11,12 +11,12 @@ use alloy::{ primitives::{Address, U64}, rpc::types::ValueOrArray, }; -use rindexer::manifest::config::Config; use rindexer::manifest::contract::ContractEvent; use rindexer::manifest::global::Global; #[cfg(feature = "reth")] use rindexer::manifest::reth::RethConfig; use rindexer::manifest::storage::ClickhouseDetails; +use rindexer::manifest::{config::Config, storage::SqliteDetails}; use rindexer::{ generator::{build::generate_rust_project, generate_docker_file}, manifest::{ @@ -155,7 +155,13 @@ pub fn handle_new_command( let repository = prompt_for_optional_input::("Repository", None); let storage_choice = prompt_for_input_list( "What Storages To Enable? (graphql can only be supported if postgres is enabled)", - &["postgres".to_string(), "clickhouse".to_string(), "csv".to_string(), "none".to_string()], + &[ + "postgres".to_string(), + "clickhouse".to_string(), + "csv".to_string(), + "sqlite".to_string(), + "none".to_string(), + ], None, ); let mut postgres_docker_enable = false; @@ -171,7 +177,7 @@ pub fn handle_new_command( let postgres_enabled = storage_choice == "postgres"; let csv_enabled = storage_choice == "csv"; let clickhouse_enabled = storage_choice == "clickhouse"; - + let sqlite_enabled = storage_choice == "sqlite"; // Handle Reth configuration if enabled let final_reth_config = get_reth_config(reth_config); @@ -284,6 +290,15 @@ pub fn handle_new_command( } else { None }, + sqlite: if sqlite_enabled { + Some(SqliteDetails { + enabled: true, + drop_each_run: None, + disable_create_tables: None, + }) + } else { + None + }, }, graphql: None, }; diff --git a/core/Cargo.toml b/core/Cargo.toml index 9d1045d6..0746e6b6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -47,6 +47,7 @@ redis = { version = "0.32.7", features = ["streams"] } regex = "1.12.2" reqwest = { version = "0.12.24", features = ["json", "gzip"] } rust_decimal = { version = "1.39.0", features = ["db-tokio-postgres"] } +rusqlite = "0.36" serde = "1.0" serde_json = "1.0" serde_yaml = "0.9.34" diff --git a/core/src/blockclock/runlencoder.rs b/core/src/blockclock/runlencoder.rs index 43974d7f..af296b20 100644 --- a/core/src/blockclock/runlencoder.rs +++ b/core/src/blockclock/runlencoder.rs @@ -692,11 +692,11 @@ mod tests { return Ok(()); } - let mut encoder = DeltaEncoder::new(network_id, Some(&rpc_url), &file_path); + let mut encoder = DeltaEncoder::new(network_id, Some(rpc_url), &file_path); encoder.fetch_encode_persist(100).await?; drop(encoder); - let mut reloaded = DeltaEncoder::from_file_inner(network_id, Some(&rpc_url), &file_path)?; + let mut reloaded = DeltaEncoder::from_file_inner(network_id, Some(rpc_url), &file_path)?; reloaded.fetch_encode_persist(100).await?; @@ -739,7 +739,7 @@ mod tests { return Ok(()); } - let reloaded = DeltaEncoder::from_file(network_id, Some(&rpc_url), &base_path)?; + let reloaded = DeltaEncoder::from_file(network_id, Some(rpc_url), &base_path)?; let mut samples = Vec::with_capacity(sample_count); for _ in 1..=sample_count { diff --git a/core/src/database/mod.rs b/core/src/database/mod.rs index a169777f..f854c8aa 100644 --- a/core/src/database/mod.rs +++ b/core/src/database/mod.rs @@ -2,3 +2,4 @@ pub mod clickhouse; pub mod generate; pub mod postgres; pub mod sql_type_wrapper; +pub mod sqlite; diff --git a/core/src/database/sql_type_wrapper.rs b/core/src/database/sql_type_wrapper.rs index 432bf472..55b7f3d2 100644 --- a/core/src/database/sql_type_wrapper.rs +++ b/core/src/database/sql_type_wrapper.rs @@ -341,6 +341,174 @@ impl EthereumSqlTypeWrapper { } } + pub fn to_sqlite_string_value(&self) -> String { + // SQLite will store everything as TEXT, INTEGER, REAL, or BLOB + // For simplicity, we convert most things to text representation + match self { + // Boolean + EthereumSqlTypeWrapper::Bool(value) => if *value { "1" } else { "0" }.to_string(), + EthereumSqlTypeWrapper::VecBool(values) => { + serde_json::to_string(values).unwrap_or_default() + } + + // Integers - store as text to avoid overflow issues + EthereumSqlTypeWrapper::U8(value) => value.to_string(), + EthereumSqlTypeWrapper::I8(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU8(values) => { + serde_json::to_string(values).unwrap_or_default() + } + EthereumSqlTypeWrapper::VecI8(values) => { + serde_json::to_string(values).unwrap_or_default() + } + + EthereumSqlTypeWrapper::U16(value) => value.to_string(), + EthereumSqlTypeWrapper::I16(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU16(values) => { + serde_json::to_string(values).unwrap_or_default() + } + EthereumSqlTypeWrapper::VecI16(values) => { + serde_json::to_string(values).unwrap_or_default() + } + + EthereumSqlTypeWrapper::U32(value) => value.to_string(), + EthereumSqlTypeWrapper::I32(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU32(values) => { + serde_json::to_string(values).unwrap_or_default() + } + EthereumSqlTypeWrapper::VecI32(values) => { + serde_json::to_string(values).unwrap_or_default() + } + + EthereumSqlTypeWrapper::U64(value) + | EthereumSqlTypeWrapper::U64Nullable(value) + | EthereumSqlTypeWrapper::U64BigInt(value) => value.to_string(), + EthereumSqlTypeWrapper::I64(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU64(values) => { + serde_json::to_string(values).unwrap_or_default() + } + EthereumSqlTypeWrapper::VecI64(values) => { + serde_json::to_string(values).unwrap_or_default() + } + + EthereumSqlTypeWrapper::U128(value) => value.to_string(), + EthereumSqlTypeWrapper::I128(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU128(values) => { + serde_json::to_string(&values.iter().map(|v| v.to_string()).collect::>()) + .unwrap_or_default() + } + EthereumSqlTypeWrapper::VecI128(values) => { + serde_json::to_string(&values.iter().map(|v| v.to_string()).collect::>()) + .unwrap_or_default() + } + + EthereumSqlTypeWrapper::U256(value) + | EthereumSqlTypeWrapper::U256Nullable(value) + | EthereumSqlTypeWrapper::U256Numeric(value) + | EthereumSqlTypeWrapper::U256Bytes(value) + | EthereumSqlTypeWrapper::U256BytesNullable(value) => value.to_string(), + EthereumSqlTypeWrapper::U256NumericNullable(value) => { + value.map(|v| v.to_string()).unwrap_or_else(|| "NULL".to_string()) + } + EthereumSqlTypeWrapper::VecU256(values) + | EthereumSqlTypeWrapper::VecU256Bytes(values) + | EthereumSqlTypeWrapper::VecU256Numeric(values) => { + serde_json::to_string(&values.iter().map(|v| v.to_string()).collect::>()) + .unwrap_or_default() + } + + EthereumSqlTypeWrapper::I256(value) + | EthereumSqlTypeWrapper::I256Nullable(value) + | EthereumSqlTypeWrapper::I256Numeric(value) + | EthereumSqlTypeWrapper::I256Bytes(value) + | EthereumSqlTypeWrapper::I256BytesNullable(value) => value.to_string(), + EthereumSqlTypeWrapper::VecI256(values) + | EthereumSqlTypeWrapper::VecI256Bytes(values) => { + serde_json::to_string(&values.iter().map(|v| v.to_string()).collect::>()) + .unwrap_or_default() + } + + EthereumSqlTypeWrapper::U512(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU512(values) => { + serde_json::to_string(&values.iter().map(|v| v.to_string()).collect::>()) + .unwrap_or_default() + } + + // Hashes and bytes - store as hex strings + EthereumSqlTypeWrapper::B128(value) => format!("{:?}", value), + #[allow(deprecated)] + EthereumSqlTypeWrapper::H160(value) => format!("{:?}", value), + EthereumSqlTypeWrapper::B256(value) | EthereumSqlTypeWrapper::B256Bytes(value) => { + format!("{:?}", value) + } + EthereumSqlTypeWrapper::B512(value) => format!("{:?}", value), + EthereumSqlTypeWrapper::VecB128(values) => serde_json::to_string( + &values.iter().map(|v| format!("{:?}", v)).collect::>(), + ) + .unwrap_or_default(), + #[allow(deprecated)] + EthereumSqlTypeWrapper::VecH160(values) => serde_json::to_string( + &values.iter().map(|v| format!("{:?}", v)).collect::>(), + ) + .unwrap_or_default(), + EthereumSqlTypeWrapper::VecB256(values) + | EthereumSqlTypeWrapper::VecB256Bytes(values) => serde_json::to_string( + &values.iter().map(|v| format!("{:?}", v)).collect::>(), + ) + .unwrap_or_default(), + EthereumSqlTypeWrapper::VecB512(values) => serde_json::to_string( + &values.iter().map(|v| format!("{:?}", v)).collect::>(), + ) + .unwrap_or_default(), + + // Address + EthereumSqlTypeWrapper::Address(value) + | EthereumSqlTypeWrapper::AddressNullable(value) + | EthereumSqlTypeWrapper::AddressBytes(value) + | EthereumSqlTypeWrapper::AddressBytesNullable(value) => format!("{:?}", value), + EthereumSqlTypeWrapper::VecAddress(values) + | EthereumSqlTypeWrapper::VecAddressBytes(values) => serde_json::to_string( + &values.iter().map(|v| format!("{:?}", v)).collect::>(), + ) + .unwrap_or_default(), + + // Strings + EthereumSqlTypeWrapper::String(value) + | EthereumSqlTypeWrapper::StringVarchar(value) + | EthereumSqlTypeWrapper::StringChar(value) + | EthereumSqlTypeWrapper::StringNullable(value) + | EthereumSqlTypeWrapper::StringVarcharNullable(value) + | EthereumSqlTypeWrapper::StringCharNullable(value) => value.clone(), + EthereumSqlTypeWrapper::VecString(values) + | EthereumSqlTypeWrapper::VecStringVarchar(values) + | EthereumSqlTypeWrapper::VecStringChar(values) => { + serde_json::to_string(values).unwrap_or_default() + } + + // Bytes + EthereumSqlTypeWrapper::Bytes(value) | EthereumSqlTypeWrapper::BytesNullable(value) => { + format!("0x{}", hex::encode(value)) + } + EthereumSqlTypeWrapper::VecBytes(values) => serde_json::to_string( + &values.iter().map(|v| format!("0x{}", hex::encode(v))).collect::>(), + ) + .unwrap_or_default(), + + // DateTime + EthereumSqlTypeWrapper::DateTime(value) => value.to_rfc3339(), + EthereumSqlTypeWrapper::DateTimeNullable(value) => { + value.as_ref().map(|v| v.to_rfc3339()).unwrap_or_else(|| "NULL".to_string()) + } + + // UUID + EthereumSqlTypeWrapper::Uuid(value) => value.to_string(), + + // JSONB + EthereumSqlTypeWrapper::JSONB(value) => { + serde_json::to_string(value).unwrap_or_default() + } + } + } + pub fn to_clickhouse_value(&self) -> String { match self { // Boolean diff --git a/core/src/database/sqlite/client.rs b/core/src/database/sqlite/client.rs new file mode 100644 index 00000000..bbcd87f0 --- /dev/null +++ b/core/src/database/sqlite/client.rs @@ -0,0 +1,235 @@ +use std::{env, path::PathBuf}; + +use dotenv::dotenv; +use rusqlite::{Connection, ToSql}; +use tracing::{error, info}; + +use crate::database::generate::generate_event_table_columns_names_sql; +use crate::database::sql_type_wrapper::EthereumSqlTypeWrapper; + +pub fn connection_string() -> Result { + dotenv().ok(); + // Default to ./rindexer.db if DATABASE_URL is not set + let connection = env::var("DATABASE_URL").unwrap_or_else(|_| "./rindexer.db".to_string()); + Ok(connection) +} + +#[derive(thiserror::Error, Debug)] +pub enum SqliteConnectionError { + #[error("The database connection string is wrong please check your environment: {0}")] + DatabaseConnectionConfigWrong(#[from] env::VarError), + + #[error("SQLite error: {0}")] + SqliteError(#[from] SqliteError), + + #[error("Can not connect to the database please make sure your connection string is correct")] + CanNotConnectToDatabase, + + #[error("Could not parse connection string make sure it is correctly formatted")] + CouldNotParseConnectionString, +} + +#[derive(thiserror::Error, Debug)] +pub enum SqliteError { + #[error("SQLite error: {0}")] + SqliteError(#[from] rusqlite::Error), + + #[error("Failed to acquire connection lock")] + ConnectionLockError, +} + +#[allow(dead_code)] +#[derive(thiserror::Error, Debug)] +pub enum BulkInsertSqliteError { + #[error("{0}")] + SqliteError(#[from] SqliteError), + + #[error("Could not write data to SQLite: {0}")] + CouldNotWriteDataToSqlite(#[from] rusqlite::Error), +} + +pub struct SqliteClient { + db_path: String, +} + +impl SqliteClient { + pub async fn new() -> Result { + let connection_str = connection_string()?; + + info!("Connecting to SQLite database at: {}", connection_str); + + // Create parent directories if they don't exist + let db_path_clone = connection_str.clone(); + tokio::task::spawn_blocking(move || { + if let Some(parent) = PathBuf::from(&db_path_clone).parent() { + if !parent.exists() { + std::fs::create_dir_all(parent).map_err(|e| { + error!("Failed to create parent directories for SQLite database: {}", e); + SqliteConnectionError::CanNotConnectToDatabase + })?; + } + } + + // TODO: Not sure if we need this with only one writer. + // let conn = Connection::open(&db_path_clone).map_err(|e| { + // error!("Error connecting to SQLite database: {}", e); + // SqliteConnectionError::CanNotConnectToDatabase + // })?; + + // // Enable WAL mode for better concurrent performance + // conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;").map_err( + // |_e| { + // error!("Error setting SQLite pragmas"); + // SqliteConnectionError::CanNotConnectToDatabase + // }, + // )?; + + info!("Successfully connected to SQLite database"); + Ok::<(), SqliteConnectionError>(()) + }) + .await + .map_err(|_| SqliteConnectionError::CanNotConnectToDatabase)??; + + Ok(SqliteClient { db_path: connection_str }) + } + + pub async fn batch_execute(&self, sql: &str) -> Result<(), SqliteError> { + let db_path = self.db_path.clone(); + let sql = sql.to_string(); + + tokio::task::spawn_blocking(move || { + let conn = Connection::open(&db_path)?; + conn.execute_batch(&sql)?; + + // Explicitly close the connection to ensure data is flushed to disk + drop(conn); + + Ok::<(), rusqlite::Error>(()) + }) + .await + .map_err(|_| SqliteError::ConnectionLockError)? + .map_err(SqliteError::SqliteError) + } + + // Helper method for single execute - currently unused but kept for potential future use + #[allow(dead_code)] + pub async fn execute(&self, query: &str, params: Vec) -> Result { + let db_path = self.db_path.clone(); + let query = query.to_string(); + + tokio::task::spawn_blocking(move || { + let conn = Connection::open(&db_path)?; + // Use dynamic dispatch with explicit type annotation to avoid trait object issues + let params_dynamic: Vec> = + params.into_iter().map(|p| Box::new(p) as Box).collect(); + let params_refs: Vec<&dyn ToSql> = params_dynamic.iter().map(|p| p.as_ref()).collect(); + let result = conn.execute(&query, params_refs.as_slice())?; + Ok::(result) + }) + .await + .map_err(|_| SqliteError::ConnectionLockError)? + .map_err(SqliteError::SqliteError) + } + + /// Query a single value from the database + /// Returns the value as a String, automatically converting from various SQLite types + pub async fn query_one_value( + &self, + query: &str, + params: Vec, + ) -> Result { + let db_path = self.db_path.clone(); + let query = query.to_string(); + + tokio::task::spawn_blocking(move || { + let conn = Connection::open(&db_path)?; + let params_dynamic: Vec> = + params.into_iter().map(|p| Box::new(p) as Box).collect(); + let params_refs: Vec<&dyn ToSql> = params_dynamic.iter().map(|p| p.as_ref()).collect(); + + let mut stmt = conn.prepare(&query)?; + let result: String = stmt.query_row(params_refs.as_slice(), |row| { + // Try to get as i64 first (most common for numeric columns in SQLite) + if let Ok(val) = row.get::<_, i64>(0) { + return Ok(val.to_string()); + } + // If that fails, try as String + if let Ok(val) = row.get::<_, String>(0) { + return Ok(val); + } + // If both fail, try as text (covers more cases) + row.get::<_, String>(0) + })?; + + Ok::(result) + }) + .await + .map_err(|_| SqliteError::ConnectionLockError)? + .map_err(SqliteError::SqliteError) + } + + /// Bulk insert method using SQLite transactions for better performance + /// Note: SQLite doesn't have COPY like PostgreSQL, but transactions significantly improve bulk insert performance + pub async fn insert_bulk( + &self, + table_name: &str, + columns: &[String], + bulk_data: &[Vec], + ) -> Result<(), String> { + if bulk_data.is_empty() { + return Ok(()); + } + + let db_path = self.db_path.clone(); + // SQLite doesn't support schema.table notation - replace dots with underscores + let table_name = table_name.replace('.', "_"); + let columns = columns.to_vec(); + + // Convert all data to strings for SQLite (simplest approach) + let string_data: Vec> = bulk_data + .iter() + .map(|row| row.iter().map(|wrapper| wrapper.to_sqlite_string_value()).collect()) + .collect(); + + tokio::task::spawn_blocking(move || { + let conn = Connection::open(&db_path) + .map_err(|e| format!("Failed to open connection: {}", e))?; + + // Start a transaction for bulk insert + conn.execute("BEGIN TRANSACTION", []) + .map_err(|e| format!("Failed to begin transaction: {}", e))?; + + let placeholders = + (1..=columns.len()).map(|i| format!("?{}", i)).collect::>().join(", "); + + let query = format!( + "INSERT INTO {} ({}) VALUES ({})", + table_name, + generate_event_table_columns_names_sql(&columns), + placeholders + ); + + for row in &string_data { + let params: Vec<&dyn ToSql> = row.iter().map(|s| s as &dyn ToSql).collect(); + + conn.execute(&query, params.as_slice()).map_err(|e| { + // Try to rollback on error + let _ = conn.execute("ROLLBACK", []); + format!("Failed to insert row: {}", e) + })?; + } + + conn.execute("COMMIT", []) + .map_err(|e| format!("Failed to commit transaction: {}", e))?; + + // Explicitly close the connection to ensure data is flushed to disk + drop(conn); + + Ok::<(), String>(()) + }) + .await + .map_err(|e| format!("Task join error: {}", e))??; + + Ok(()) + } +} diff --git a/core/src/database/sqlite/generate.rs b/core/src/database/sqlite/generate.rs new file mode 100644 index 00000000..af767671 --- /dev/null +++ b/core/src/database/sqlite/generate.rs @@ -0,0 +1,275 @@ +use crate::abi::{EventInfo, ParamTypeError, ReadAbiError}; +use crate::database::sqlite::client::SqliteError; +use crate::helpers::camel_to_snake; +use crate::indexer::native_transfer::{NATIVE_TRANSFER_ABI, NATIVE_TRANSFER_CONTRACT_NAME}; +use crate::indexer::Indexer; +use crate::manifest::contract::FactoryDetailsYaml; +use crate::types::code::Code; +use crate::ABIItem; +use std::path::Path; +use tracing::{error, info}; + +#[derive(thiserror::Error, Debug)] +pub enum GenerateTablesForIndexerSqlError { + #[error("{0}")] + ReadAbiError(#[from] ReadAbiError), + + #[error("{0}")] + ParamTypeError(#[from] ParamTypeError), + + #[error("failed to execute {0}")] + Sqlite(#[from] SqliteError), +} + +fn sqlite_type_for_solidity(solidity_type: &str) -> &'static str { + // SQLite has only 5 storage classes: NULL, INTEGER, REAL, TEXT, BLOB + // We'll use TEXT for most things for simplicity + if solidity_type.starts_with("uint") || solidity_type.starts_with("int") { + "TEXT" // Store large numbers as text to avoid overflow + } else if solidity_type == "bool" { + "INTEGER" // 0 or 1 + } else { + // includes bytes, address and anything else + "TEXT" + } +} + +/// SQLite reserved identifiers you want to quote. +const RESERVED_IDENTS: &[&str] = &[ + "from", "to", + // add more as needed +]; + +/// Wraps the identifier in quotes if it is a reserved keyword. +fn quote_if_reserved(ident: &str) -> String { + if RESERVED_IDENTS.contains(&ident) { + format!("\"{}\"", ident) + } else { + ident.to_string() + } +} + +fn generate_event_table_sql(abi_inputs: &[EventInfo], table_prefix: &str) -> String { + abi_inputs + .iter() + .map(|event_info| { + let table_name = format!("{}_{}", table_prefix, camel_to_snake(&event_info.name)); + info!("Creating table if not exists: {}", table_name); + + let event_columns = if event_info.inputs.is_empty() { + String::new() + } else { + event_info + .inputs + .iter() + .map(|input| { + let col_name = quote_if_reserved(&camel_to_snake(&input.name)); + let col_type = sqlite_type_for_solidity(&input.type_); + format!("{} {}", col_name, col_type) + }) + .collect::>() + .join(", ") + + "," + }; + + format!( + "CREATE TABLE IF NOT EXISTS {table_name} (\ + rindexer_id INTEGER PRIMARY KEY AUTOINCREMENT, \ + contract_address TEXT NOT NULL, \ + {event_columns} \ + tx_hash TEXT NOT NULL, \ + block_number INTEGER NOT NULL, \ + block_timestamp TEXT, \ + block_hash TEXT NOT NULL, \ + network TEXT NOT NULL, \ + tx_index INTEGER NOT NULL, \ + log_index TEXT NOT NULL\ + );" + ) + }) + .collect::>() + .join("\n") +} + +fn generate_internal_event_table_sql( + abi_inputs: &[EventInfo], + table_prefix: &str, + networks: Vec<&str>, +) -> String { + abi_inputs.iter().map(|event_info| { + let table_name = format!("rindexer_internal_{}_{}", table_prefix, camel_to_snake(&event_info.name)); + + let create_table_query = format!( + r#"CREATE TABLE IF NOT EXISTS {table_name} (network TEXT PRIMARY KEY, last_synced_block INTEGER);"# + ); + + let insert_queries = networks.iter().map(|network| { + format!( + r#"INSERT OR IGNORE INTO {table_name} (network, last_synced_block) VALUES ('{network}', 0);"# + ) + }).collect::>().join("\n"); + + let create_latest_block_query = r#"CREATE TABLE IF NOT EXISTS rindexer_internal_latest_block (network TEXT PRIMARY KEY, block INTEGER);"#.to_string(); + + let latest_block_insert_queries = networks.iter().map(|network| { + format!( + r#"INSERT OR IGNORE INTO rindexer_internal_latest_block (network, block) VALUES ('{network}', 0);"# + ) + }).collect::>().join("\n"); + + format!("{create_table_query}\n{insert_queries}\n{create_latest_block_query}\n{latest_block_insert_queries}") + }).collect::>().join("\n") +} + +fn generate_internal_factory_event_table_sql( + indexer_name: &str, + factories: &[FactoryDetailsYaml], +) -> String { + factories.iter().map(|factory| { + let table_name = format!( + "rindexer_internal_{}_{}_{}_{}", + camel_to_snake(indexer_name), + camel_to_snake(&factory.name), + camel_to_snake(&factory.event_name), + factory.input_names().iter().map(|v| camel_to_snake(v)).collect::>().join("_") + ); + + format!( + r#"CREATE TABLE IF NOT EXISTS {table_name} (factory_address TEXT, factory_deployed_address TEXT, network TEXT, PRIMARY KEY (factory_address, factory_deployed_address, network));"# + ) + }).collect::>().join("\n") +} + +pub fn generate_tables_for_indexer_sql( + project_path: &Path, + indexer: &Indexer, + _disable_event_tables: bool, +) -> Result { + let mut sql = String::new(); + + for contract in &indexer.contracts { + let contract_name = contract.before_modify_name_if_filter_readonly(); + let abi_items = ABIItem::read_abi_items(project_path, contract)?; + let events = ABIItem::extract_event_names_and_signatures_from_abi(abi_items)?; + let table_prefix = + format!("{}_{}", camel_to_snake(&indexer.name), camel_to_snake(&contract_name)); + let networks: Vec<&str> = contract.details.iter().map(|d| d.network.as_str()).collect(); + let factories = contract.details.iter().flat_map(|d| d.factory.clone()).collect::>(); + // Create event tables + sql.push_str(&generate_event_table_sql(&events, &table_prefix)); + + // Create internal tracking tables + sql.push_str(&generate_internal_event_table_sql(&events, &table_prefix, networks)); + + // Create factory tables if needed + if !factories.is_empty() { + sql.push_str(&generate_internal_factory_event_table_sql(&indexer.name, &factories)); + } + } + + if indexer.native_transfers.enabled { + let contract_name = NATIVE_TRANSFER_CONTRACT_NAME.to_string(); + let abi_str = NATIVE_TRANSFER_ABI; + let abi_items: Vec = + serde_json::from_str(abi_str).expect("JSON was not well-formatted"); + let event_names = ABIItem::extract_event_names_and_signatures_from_abi(abi_items)?; + let table_prefix = + format!("{}_{}", camel_to_snake(&indexer.name), camel_to_snake(&contract_name)); + let networks = indexer.clone().native_transfers.networks.unwrap(); + let networks: Vec<&str> = networks.iter().map(|d| d.network.as_str()).collect(); + + sql.push_str(&generate_event_table_sql(&event_names, &table_prefix)); + sql.push_str(&generate_internal_event_table_sql(&event_names, &table_prefix, networks)); + } + + sql.push_str(&format!( + r#" + CREATE TABLE IF NOT EXISTS rindexer_internal_{indexer_name}_last_known_relationship_dropping_sql ( + key INTEGER PRIMARY KEY, + value TEXT NOT NULL + ); + "#, + indexer_name = camel_to_snake(&indexer.name) + )); + + sql.push_str(&format!( + r#" + CREATE TABLE IF NOT EXISTS rindexer_internal_{indexer_name}_last_known_indexes_dropping_sql ( + key INTEGER PRIMARY KEY, + value TEXT NOT NULL + ); + "#, + indexer_name = camel_to_snake(&indexer.name) + )); + + sql.push_str(&format!( + r#" + CREATE TABLE IF NOT EXISTS rindexer_internal_{indexer_name}_last_run_migrations_sql ( + version INTEGER PRIMARY KEY, + migration_applied INTEGER NOT NULL + ); + "#, + indexer_name = camel_to_snake(&indexer.name) + )); + + Ok(Code::new(sql)) +} + +pub fn drop_tables_for_indexer_sql(project_path: &Path, indexer: &Indexer) -> Code { + let mut sql = format!( + "DROP TABLE IF EXISTS rindexer_internal_{}_last_known_indexes_dropping_sql;", + camel_to_snake(&indexer.name) + ); + sql.push_str(&format!( + "DROP TABLE IF EXISTS rindexer_internal_{}_last_known_relationship_dropping_sql;", + camel_to_snake(&indexer.name) + )); + sql.push_str("DROP TABLE IF EXISTS rindexer_internal_latest_block;"); + + for contract in &indexer.contracts { + let contract_name = contract.before_modify_name_if_filter_readonly(); + let table_prefix = + format!("{}_{}", camel_to_snake(&indexer.name), camel_to_snake(&contract_name)); + + let abi_items = ABIItem::read_abi_items(project_path, contract); + if let Ok(abi_items) = abi_items { + for abi_item in abi_items.iter() { + let table_name = format!("{}_{}", table_prefix, camel_to_snake(&abi_item.name)); + sql.push_str(&format!("DROP TABLE IF EXISTS {table_name};")); + + let internal_table_name = format!( + "rindexer_internal_{}_{}", + table_prefix, + camel_to_snake(&abi_item.name) + ); + sql.push_str(&format!("DROP TABLE IF EXISTS {internal_table_name};")); + } + } else { + error!( + "Could not read ABI items for contract moving on clearing the other data up: {}", + contract.name + ); + } + + // Drop factory indexing tables + for factory in contract.details.iter().flat_map(|d| d.factory.as_ref()) { + let factory_table_name = format!( + "rindexer_internal_{}_{}_{}_{}", + camel_to_snake(&indexer.name), + camel_to_snake(&factory.name), + camel_to_snake(&factory.event_name), + factory + .input_names() + .iter() + .map(|v| camel_to_snake(v)) + .collect::>() + .join("_") + ); + sql.push_str(&format!("DROP TABLE IF EXISTS {factory_table_name};")); + } + } + + Code::new(sql) +} + +// Note: generate_event_table_full_name is defined in database/generate.rs and used by no_code.rs diff --git a/core/src/database/sqlite/mod.rs b/core/src/database/sqlite/mod.rs new file mode 100644 index 00000000..5e222248 --- /dev/null +++ b/core/src/database/sqlite/mod.rs @@ -0,0 +1,3 @@ +pub mod client; +pub mod generate; +pub mod setup; diff --git a/core/src/database/sqlite/setup.rs b/core/src/database/sqlite/setup.rs new file mode 100644 index 00000000..e9839842 --- /dev/null +++ b/core/src/database/sqlite/setup.rs @@ -0,0 +1,66 @@ +use std::path::Path; + +use tracing::{debug, info}; + +use crate::database::sqlite::generate::{ + drop_tables_for_indexer_sql, generate_tables_for_indexer_sql, GenerateTablesForIndexerSqlError, +}; +use crate::{ + database::sqlite::client::{SqliteClient, SqliteConnectionError, SqliteError}, + manifest::core::Manifest, +}; + +#[derive(thiserror::Error, Debug)] +pub enum SetupSqliteError { + #[error("{0}")] + SqliteConnection(#[from] SqliteConnectionError), + + #[error("{0}")] + SqliteError(#[from] SqliteError), + + #[error("Error creating tables for indexer: {0}")] + GeneratingTables(#[from] GenerateTablesForIndexerSqlError), +} + +pub async fn setup_sqlite( + project_path: &Path, + manifest: &Manifest, +) -> Result { + info!("Setting up SQLite"); + + let client = SqliteClient::new().await?; + let disable_event_tables = manifest.storage.sqlite_disable_create_tables(); + + if manifest.storage.sqlite_drop_each_run() { + info!( + "`drop_each_run` enabled so dropping all data for {} before starting", + &manifest.name + ); + let sql = drop_tables_for_indexer_sql(project_path, &manifest.to_indexer()); + client.batch_execute(sql.as_str()).await?; + info!("Dropped all data for {}", manifest.name); + } + + if disable_event_tables { + info!("Creating internal rindexer tables for {}", manifest.name); + } else { + info!("Creating tables for {}", manifest.name); + } + + let sql = generate_tables_for_indexer_sql( + project_path, + &manifest.to_indexer(), + disable_event_tables, + )?; + + debug!("{}", sql); + client.batch_execute(sql.as_str()).await?; + + if disable_event_tables { + info!("Created tables for {}", manifest.name); + } else { + info!("Created internal rindexer tables for {}", manifest.name); + } + + Ok(client) +} diff --git a/core/src/event/config.rs b/core/src/event/config.rs index 10ed98c9..e10df99d 100644 --- a/core/src/event/config.rs +++ b/core/src/event/config.rs @@ -10,6 +10,7 @@ use crate::event::factory_event_filter_sync::update_known_factory_deployed_addre use crate::event::rindexer_event_filter::FactoryFilter; use crate::manifest::config::Config; use crate::manifest::contract::EventInputIndexedFilters; +use crate::SqliteClient; use crate::{ event::{ callback_registry::{ @@ -39,6 +40,7 @@ pub struct ContractEventProcessingConfig { pub progress: Arc>, pub postgres: Option>, pub clickhouse: Option>, + pub sqlite: Option>, pub csv_details: Option, pub stream_last_synced_block_file_path: Option, pub index_event_in_order: bool, @@ -115,6 +117,7 @@ pub struct FactoryEventProcessingConfig { pub progress: Arc>, pub postgres: Option>, pub clickhouse: Option>, + pub sqlite: Option>, pub csv_details: Option, pub stream_last_synced_block_file_path: Option, pub index_event_in_order: bool, @@ -296,6 +299,13 @@ impl EventProcessingConfig { } } + pub fn sqlite(&self) -> Option> { + match self { + Self::ContractEventProcessing(config) => config.sqlite.clone(), + Self::FactoryEventProcessing(config) => config.sqlite.clone(), + } + } + pub fn csv_details(&self) -> Option { match self { Self::ContractEventProcessing(config) => config.csv_details.clone(), @@ -348,6 +358,7 @@ pub struct TraceProcessingConfig { pub network: String, pub progress: Arc>, pub postgres: Option>, + pub sqlite: Option>, pub csv_details: Option, pub registry: Arc, pub method: TraceProcessingMethod, diff --git a/core/src/event/factory_event_filter_sync.rs b/core/src/event/factory_event_filter_sync.rs index 53287b63..4d594d48 100644 --- a/core/src/event/factory_event_filter_sync.rs +++ b/core/src/event/factory_event_filter_sync.rs @@ -41,6 +41,9 @@ pub enum UpdateKnownFactoryDeployedAddressesError { #[error("Could not write addresses to clickhouse: {0}")] ClickhouseWrite(#[from] ClickhouseError), + #[error("Could not write addresses to sqlite: {0}")] + SqliteWrite(String), + #[error("Could not parse logs")] LogsParse, } @@ -212,6 +215,41 @@ pub async fn update_known_factory_deployed_addresses( return Ok(()); } + if let Some(sqlite) = &config.sqlite { + let params = GenerateInternalFactoryEventTableNameParams { + indexer_name: config.indexer_name.clone(), + contract_name: config.contract_name.clone(), + event_name: config.event.name.clone(), + input_names: config.input_names().clone(), + }; + let table_name = generate_internal_factory_event_table_name(¶ms); + + sqlite + .insert_bulk( + &format!("rindexer_internal_{table_name}"), + &[ + "factory_address".to_string(), + "factory_deployed_address".to_string(), + "network".to_string(), + ], + &addresses + .clone() + .into_iter() + .map(|item| { + vec![ + EthereumSqlTypeWrapper::Address(item.factory_address), + EthereumSqlTypeWrapper::Address(item.address), + EthereumSqlTypeWrapper::String(config.network_contract.network.clone()), + ] + }) + .collect::>(), + ) + .await + .map_err(UpdateKnownFactoryDeployedAddressesError::SqliteWrite)?; + + return Ok(()); + } + if let Some(csv_details) = &config.csv_details { let full_path = get_full_path(&config.project_path, &csv_details.path)?; diff --git a/core/src/generator/build.rs b/core/src/generator/build.rs index 07f86784..165c8a00 100644 --- a/core/src/generator/build.rs +++ b/core/src/generator/build.rs @@ -13,7 +13,9 @@ use super::{ }; use crate::manifest::contract::Contract; use crate::{ - generator::database_bindings::{generate_clickhouse_code, generate_postgres_code}, + generator::database_bindings::{ + generate_clickhouse_code, generate_postgres_code, generate_sqlite_code, + }, generator::trace_bindings::{ generate_trace_bindings, generate_trace_handlers, trace_abigen_contract_file_name, GenerateTraceBindingsError, GenerateTraceHandlersError, @@ -256,6 +258,14 @@ pub fn generate_rindexer_typings( .map_err(WriteGlobalError::from)?; } + if manifest.storage.sqlite_enabled() { + write_file( + &generate_file_location(&output, "database"), + generate_sqlite_code().as_str(), + ) + .map_err(WriteGlobalError::from)?; + } + write_indexer_events(project_path, &output, manifest.to_indexer(), &manifest.storage)?; create_mod_file(output.as_path(), true)?; diff --git a/core/src/generator/database_bindings.rs b/core/src/generator/database_bindings.rs index fc399a86..4f186353 100644 --- a/core/src/generator/database_bindings.rs +++ b/core/src/generator/database_bindings.rs @@ -43,3 +43,25 @@ pub fn generate_clickhouse_code() -> Code { .to_string(), ) } + +pub fn generate_sqlite_code() -> Code { + Code::new( + r#" + use std::sync::Arc; + use rindexer::SqliteClient; + use tokio::sync::OnceCell; + + static SQLITE_CLIENT: OnceCell> = OnceCell::const_new(); + + pub async fn get_or_init_sqlite_client() -> Arc { + SQLITE_CLIENT + .get_or_init(|| async { + Arc::new(SqliteClient::new().await.expect("Failed to connect to SQLite")) + }) + .await + .clone() + } + "# + .to_string(), + ) +} diff --git a/core/src/generator/events_bindings.rs b/core/src/generator/events_bindings.rs index 2f1dd279..62f3d1e9 100644 --- a/core/src/generator/events_bindings.rs +++ b/core/src/generator/events_bindings.rs @@ -338,6 +338,8 @@ fn generate_event_callback_structs_code( "database: get_or_init_postgres_client().await," } else if storage.clickhouse_enabled() { "database: get_or_init_clickhouse_client().await," + } else if storage.sqlite_enabled() { + "database: get_or_init_sqlite_client().await," } else { "" }, @@ -664,6 +666,8 @@ fn generate_event_bindings_code( "use super::super::super::super::typings::database::get_or_init_postgres_client;" } else if storage.clickhouse_enabled() { "use super::super::super::super::typings::database::get_or_init_clickhouse_client;" + } else if storage.sqlite_enabled() { + "use super::super::super::super::typings::database::get_or_init_sqlite_client;" } else { "" }, @@ -671,6 +675,8 @@ fn generate_event_bindings_code( "PostgresClient," } else if storage.clickhouse_enabled() { "ClickhouseClient," + } else if storage.sqlite_enabled() { + "SqliteClient," } else { "" }, @@ -683,6 +689,8 @@ fn generate_event_bindings_code( "pub database: Arc," } else if storage.clickhouse_enabled() { "pub database: Arc," + } else if storage.sqlite_enabled() { + "pub database: Arc," } else { "" }, diff --git a/core/src/indexer/last_synced.rs b/core/src/indexer/last_synced.rs index a95b371d..da2672b2 100644 --- a/core/src/indexer/last_synced.rs +++ b/core/src/indexer/last_synced.rs @@ -13,6 +13,7 @@ use tracing::{debug, error}; use crate::database::clickhouse::client::ClickhouseClient; use crate::database::postgres::generate::generate_internal_event_table_name_no_shorten; +use crate::SqliteClient; use crate::{ database::{ generate::generate_indexer_contract_schema_name, @@ -77,6 +78,7 @@ pub struct SyncConfig<'a> { pub project_path: &'a Path, pub postgres: &'a Option>, pub clickhouse: &'a Option>, + pub sqlite: &'a Option>, pub csv_details: &'a Option, pub stream_details: &'a Option<&'a StreamsConfig>, pub contract_csv_enabled: bool, @@ -115,7 +117,10 @@ pub async fn get_last_synced_block_number(config: SyncConfig<'_>) -> Option } // Then check streams if no csv or database to find out last synced block - if config.postgres.is_none() && !config.contract_csv_enabled && config.stream_details.is_some() + if config.postgres.is_none() + && config.sqlite.is_none() + && !config.contract_csv_enabled + && config.stream_details.is_some() { let stream_details = config.stream_details.as_ref().unwrap(); @@ -212,6 +217,31 @@ pub async fn get_last_synced_block_number(config: SyncConfig<'_>) -> Option }; } + // Query database for last synced block + if let Some(sqlite) = config.sqlite { + let schema = + generate_indexer_contract_schema_name(config.indexer_name, config.contract_name); + let table_name = generate_internal_event_table_name(&schema, config.event_name); + // SQLite doesn't support schema.table notation - use underscore + let table_name = format!("rindexer_internal_{}", table_name); + let query = format!("SELECT last_synced_block FROM {} WHERE network = ?1", table_name); + + return match sqlite.query_one_value(&query, vec![config.network.to_string()]).await { + Ok(result) => { + let parsed = U64::from_str(&result).expect("Failed to parse last_synced_block"); + if parsed.is_zero() { + None + } else { + Some(parsed) + } + } + Err(e) => { + error!("Error fetching last synced block: {:?}", e); + None + } + }; + } + None } @@ -323,6 +353,24 @@ pub async fn update_progress_and_last_synced_task( if let Err(e) = result { error!("Error updating clickhouse last synced block: {:?}", e); } + } else if let Some(sqlite) = &config.sqlite() { + let schema = + generate_indexer_contract_schema_name(&config.indexer_name(), &config.contract_name()); + let table_name = generate_internal_event_table_name(&schema, &config.event_name()); + let network = &config.network_contract().network; + // SQLite doesn't support schema.table notation - use underscore + let table_name = format!("rindexer_internal_{}", table_name); + let query = format!( + "UPDATE {} SET last_synced_block = {} WHERE network = '{}' AND {} > last_synced_block; + UPDATE rindexer_internal_latest_block SET block = {} WHERE network = '{}' AND {} > block;", + table_name, to_block, network, to_block, latest, network, latest + ); + + let result = sqlite.batch_execute(&query).await; + + if let Err(e) = result { + error!("Error updating sqlite last synced block: {:?}", e); + } } else if let Some(csv_details) = &config.csv_details() { if let Err(e) = update_last_synced_block_number_for_file( &config.contract_name(), @@ -401,6 +449,24 @@ pub async fn evm_trace_update_progress_and_last_synced_task( } } + if let Some(ref sqlite) = config.sqlite { + // Use the native_transfer table for all trace events since they share the same pipeline + let schema = + generate_indexer_contract_schema_name(&config.indexer_name, &config.contract_name); + let table_name = generate_internal_event_table_name(&schema, "native_transfer"); + // SQLite doesn't support schema.table notation - use underscore + let table_name = format!("rindexer_internal_{}", table_name); + let query = format!( + "UPDATE {} SET last_synced_block = {} WHERE network = '{}' AND {} > last_synced_block", + table_name, to_block, config.network, to_block + ); + let result = sqlite.batch_execute(&query).await; + + if let Err(e) = result { + error!("Error updating last synced trace block sqlite: {:?}", e); + } + } + if let Some(csv_details) = &config.csv_details { if let Err(e) = update_last_synced_block_number_for_file( &config.contract_name, diff --git a/core/src/indexer/no_code.rs b/core/src/indexer/no_code.rs index 43208318..42227964 100644 --- a/core/src/indexer/no_code.rs +++ b/core/src/indexer/no_code.rs @@ -20,6 +20,8 @@ use crate::database::generate::generate_event_table_full_name; use crate::database::sql_type_wrapper::{ map_ethereum_wrapper_to_json, map_log_params_to_ethereum_wrapper, EthereumSqlTypeWrapper, }; +use crate::database::sqlite::client::SqliteClient; +use crate::database::sqlite::setup::{setup_sqlite, SetupSqliteError}; use crate::manifest::contract::Contract; use crate::{ abi::{ABIItem, CreateCsvFileForEvent, EventInfo, ParamTypeError, ReadAbiError}, @@ -75,6 +77,9 @@ pub enum SetupNoCodeError { #[error("Could not setup clickhouse: {0}")] SetupClickhouseError(#[from] SetupClickhouseError), + #[error("Could not setup sqlite: {0}")] + SetupSqliteError(#[from] SetupSqliteError), + #[error("You have graphql disabled as well as indexer so nothing can startup")] NothingToStartNoCode, } @@ -104,6 +109,11 @@ pub async fn setup_no_code( clickhouse = Some(Arc::new(setup_clickhouse(project_path, &manifest).await?)); } + let mut sqlite: Option> = None; + if manifest.storage.sqlite_enabled() { + sqlite = Some(Arc::new(setup_sqlite(project_path, &manifest).await?)); + } + if !details.indexing_details.enabled { return Ok(StartDetails { manifest_path: details.manifest_path, @@ -127,6 +137,7 @@ pub async fn setup_no_code( &manifest, postgres.clone(), clickhouse.clone(), + sqlite.clone(), &network_providers, ) .await?; @@ -147,6 +158,7 @@ pub async fn setup_no_code( &mut manifest, postgres, clickhouse, + sqlite, &network_providers, ) .await?; @@ -185,6 +197,7 @@ struct NoCodeCallbackParams { index_event_in_order: bool, csv: Option>, postgres: Option>, + sqlite: Option>, sql_event_table_name: String, sql_column_names: Vec, clickhouse: Option>, @@ -222,10 +235,10 @@ fn no_code_callback(params: Arc) -> EventCallbacks { // TODO // Remove unwrap let (from_block, to_block) = match &results { - CallbackResult::Event(event) => ( - event.first().unwrap().found_in_request.from_block, - event.first().unwrap().found_in_request.to_block, - ), + CallbackResult::Event(event) => { + let first = event.first().ok_or("No events found")?; + (first.found_in_request.from_block, first.found_in_request.to_block) + } CallbackResult::Trace(event) => { // Filter to only NativeTransfer events and get the first one let native_transfer = event @@ -236,19 +249,31 @@ fn no_code_callback(params: Arc) -> EventCallbacks { } TraceResult::Block { .. } => None, }) - .next() - .unwrap(); - (native_transfer.from_block, native_transfer.to_block) + .next(); + + match native_transfer { + Some(transfer) => (transfer.from_block, transfer.to_block), + None => { + debug!( + "{} {}: {} - {}", + params.indexer_name, + params.contract_name, + params.event_info.name, + "NO NATIVE TRANSFER EVENTS (only Block events)".red() + ); + return Ok(()); + } + } } }; let network = match &results { CallbackResult::Event(event) => { - event.first().unwrap().tx_information.network.clone() + event.first().ok_or("No events found")?.tx_information.network.clone() } CallbackResult::Trace(event) => { // Filter to only NativeTransfer events and get the first one - event + let network = event .iter() .filter_map(|result| match result { TraceResult::NativeTransfer { tx_information, .. } => { @@ -256,9 +281,16 @@ fn no_code_callback(params: Arc) -> EventCallbacks { } TraceResult::Block { .. } => None, }) - .next() - .unwrap() - .clone() + .next(); + + match network { + Some(net) => net.clone(), + None => { + // This shouldn't happen as we already checked for NativeTransfer above + // but handle it gracefully just in case + return Ok(()); + } + } } }; @@ -436,7 +468,10 @@ fn no_code_callback(params: Arc) -> EventCallbacks { all_params.iter().map(|param| param.to_type()).collect(); } - if params.postgres.is_some() || params.clickhouse.is_some() { + if params.postgres.is_some() + || params.clickhouse.is_some() + || params.sqlite.is_some() + { sql_bulk_data.push(all_params); } @@ -498,6 +533,25 @@ fn no_code_callback(params: Arc) -> EventCallbacks { } } + if let Some(sqlite) = ¶ms.sqlite { + if !sql_bulk_data.is_empty() { + if let Err(e) = sqlite + .insert_bulk( + ¶ms.sql_event_table_name, + ¶ms.sql_column_names, + &sql_bulk_data, + ) + .await + { + error!( + "{}::{} - Error performing sqlite bulk insert: {}", + params.contract_name, params.event_info.name, e + ); + return Err(e.to_string()); + } + } + } + if let Some(csv) = ¶ms.csv { if !csv_bulk_data.is_empty() { if let Err(e) = csv.append_bulk(csv_bulk_data).await { @@ -655,6 +709,7 @@ pub async fn process_events( manifest: &Manifest, postgres: Option>, clickhouse: Option>, + sqlite: Option>, network_providers: &[CreateNetworkProvider], ) -> Result, ProcessIndexersError> { let mut events: Vec = vec![]; @@ -665,6 +720,7 @@ pub async fn process_events( manifest, postgres.clone(), clickhouse.clone(), + sqlite.clone(), network_providers, &mut contract, ) @@ -681,6 +737,7 @@ async fn process_contract( manifest: &Manifest, postgres: Option>, clickhouse: Option>, + sqlite: Option>, network_providers: &[CreateNetworkProvider], contract: &mut Contract, ) -> Result, ProcessIndexersError> { @@ -774,6 +831,7 @@ async fn process_contract( index_event_in_order, csv, postgres: postgres.clone(), + sqlite: sqlite.clone(), clickhouse: clickhouse.clone(), sql_event_table_name, sql_column_names, @@ -794,6 +852,7 @@ pub async fn process_trace_events( manifest: &mut Manifest, postgres: Option>, clickhouse: Option>, + sqlite: Option>, network_providers: &[CreateNetworkProvider], ) -> Result, ProcessIndexersError> { let mut events: Vec = vec![]; @@ -880,6 +939,7 @@ pub async fn process_trace_events( index_event_in_order: false, csv, postgres: postgres.clone(), + sqlite: sqlite.clone(), clickhouse: clickhouse.clone(), sql_event_table_name, sql_column_names, diff --git a/core/src/indexer/start.rs b/core/src/indexer/start.rs index efdd7a7b..8064d0eb 100644 --- a/core/src/indexer/start.rs +++ b/core/src/indexer/start.rs @@ -62,6 +62,9 @@ pub enum StartIndexingError { #[error("{0}")] ClickhouseConnectionError(#[from] ClickhouseConnectionError), + #[error("{0}")] + SqliteConnectionError(#[from] crate::database::sqlite::client::SqliteConnectionError), + #[error("Could not get block number from provider: {0}")] GetBlockNumberError(#[from] ProviderError), @@ -169,6 +172,7 @@ pub async fn start_indexing_traces( project_path: &Path, postgres: Option>, clickhouse: Option>, + sqlite: Option>, indexer: &Indexer, trace_registry: Arc, ) -> Result>>, StartIndexingError> { @@ -214,6 +218,7 @@ pub async fn start_indexing_traces( project_path, postgres: &postgres, clickhouse: &clickhouse, + sqlite: &sqlite, csv_details: &manifest.storage.csv, contract_csv_enabled: manifest.contract_csv_enabled(&first_event.contract_name), stream_details: &stream_details, @@ -251,6 +256,7 @@ pub async fn start_indexing_traces( network: network_name.clone(), progress: trace_progress_state.clone(), postgres: postgres.clone(), + sqlite: sqlite.clone(), csv_details: None, registry: network_registry, method: network_details.method, @@ -286,6 +292,7 @@ pub async fn start_indexing_contract_events( project_path: &Path, postgres: Option>, clickhouse: Option>, + sqlite: Option>, indexer: &Indexer, registry: Arc, dependencies: &[ContractEventDependencies], @@ -325,7 +332,9 @@ pub async fn start_indexing_contract_events( let project_path = project_path.to_path_buf(); let postgres = postgres.clone(); let clickhouse = clickhouse.clone(); + let sqlite = sqlite.clone(); let manifest_csv_details = manifest.storage.csv.clone(); + let manifest = manifest.clone(); let registry = Arc::clone(®istry); let event_progress_state = Arc::clone(&event_progress_state); let dependencies = dependencies.to_vec(); @@ -335,6 +344,7 @@ pub async fn start_indexing_contract_events( project_path: &project_path, postgres: &postgres, clickhouse: &clickhouse, + sqlite: &sqlite, csv_details: &manifest_csv_details, contract_csv_enabled: manifest.contract_csv_enabled(&event.contract.name), stream_details: &stream_details, @@ -441,6 +451,7 @@ pub async fn start_indexing_contract_events( progress: Arc::clone(&event_progress_state), clickhouse: clickhouse.clone(), postgres: postgres.clone(), + sqlite: sqlite.clone(), config: manifest.config.clone(), csv_details: manifest_csv_details.clone(), // timestamps: timestamp_enabled_for_event @@ -473,6 +484,7 @@ pub async fn start_indexing_contract_events( progress: Arc::clone(&event_progress_state), postgres: postgres.clone(), clickhouse: clickhouse.clone(), + sqlite: sqlite.clone(), csv_details: manifest_csv_details.clone(), config: manifest.config.clone(), // timestamps: timestamp_enabled_for_event @@ -542,6 +554,7 @@ pub async fn start_indexing( let start = Instant::now(); let database = initialize_database(manifest).await?; let clickhouse = initialize_clickhouse(manifest).await?; + let sqlite = initialize_sqlite(manifest).await?; // any events which are non-blocking and can be fired in parallel let mut non_blocking_process_events = Vec::new(); @@ -555,6 +568,7 @@ pub async fn start_indexing( project_path, database.clone(), clickhouse.clone(), + sqlite.clone(), &indexer, trace_registry.clone() ), @@ -563,6 +577,7 @@ pub async fn start_indexing( project_path, database.clone(), clickhouse.clone(), + sqlite.clone(), &indexer, registry.clone(), dependencies, @@ -661,6 +676,22 @@ pub async fn initialize_clickhouse( } } +pub async fn initialize_sqlite( + manifest: &Manifest, +) -> Result>, StartIndexingError> { + if manifest.storage.sqlite_enabled() { + match crate::SqliteClient::new().await { + Ok(sqlite) => Ok(Some(Arc::new(sqlite))), + Err(e) => { + error!("Error connecting to SQLite: {:?}", e); + Err(StartIndexingError::SqliteConnectionError(e)) + } + } + } else { + Ok(None) + } +} + pub fn calculate_safe_block_number( reorg_safe_distance: bool, provider: &Arc, diff --git a/core/src/lib.rs b/core/src/lib.rs index 28beb7d1..21c769ea 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -19,6 +19,7 @@ pub use database::{ client::{PostgresClient, ToSql}, setup::setup_postgres, }, + sqlite::{client::SqliteClient, setup::setup_sqlite}, }; mod simple_file_formatters; diff --git a/core/src/manifest/core.rs b/core/src/manifest/core.rs index 1dd4bb6c..96517263 100644 --- a/core/src/manifest/core.rs +++ b/core/src/manifest/core.rs @@ -314,12 +314,11 @@ mod tests { "#; let manifest: Result = serde_yaml::from_str(yaml); - - assert_eq!(manifest.is_err(), true); + assert!(manifest.is_err()); assert!(manifest .unwrap_err() .to_string() - .contains("cannot specify both `postgres` and `clickhouse` at the same time")); + .contains("cannot specify more than one database option (postgres, clickhouse, sqlite) at the same time")); } #[test] diff --git a/core/src/manifest/storage.rs b/core/src/manifest/storage.rs index 80d8ce75..365b9408 100644 --- a/core/src/manifest/storage.rs +++ b/core/src/manifest/storage.rs @@ -116,6 +116,17 @@ pub struct CsvDetails { pub disable_create_headers: Option, } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct SqliteDetails { + pub enabled: bool, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub drop_each_run: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub disable_create_tables: Option, +} + #[derive(Debug, Serialize, Default, Clone)] pub struct Storage { #[serde(default, skip_serializing_if = "Option::is_none")] @@ -124,6 +135,9 @@ pub struct Storage { #[serde(default, skip_serializing_if = "Option::is_none")] pub clickhouse: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sqlite: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] pub csv: Option, } @@ -140,18 +154,31 @@ impl<'de> Deserialize<'de> for Storage { #[serde(default)] clickhouse: Option, #[serde(default)] + sqlite: Option, + #[serde(default)] csv: Option, } let raw = StorageRaw::deserialize(deserializer)?; - if raw.postgres.is_some() && raw.clickhouse.is_some() { + // Count how many database options are enabled + let db_count = [raw.postgres.is_some(), raw.clickhouse.is_some(), raw.sqlite.is_some()] + .iter() + .filter(|&&x| x) + .count(); + + if db_count > 1 { return Err(Error::custom( - "cannot specify both `postgres` and `clickhouse` at the same time", + "cannot specify more than one database option (postgres, clickhouse, sqlite) at the same time", )); } - Ok(Storage { postgres: raw.postgres, clickhouse: raw.clickhouse, csv: raw.csv }) + Ok(Storage { + postgres: raw.postgres, + clickhouse: raw.clickhouse, + sqlite: raw.sqlite, + csv: raw.csv, + }) } } @@ -239,6 +266,33 @@ impl Storage { self.csv.as_ref().is_some_and(|details| details.disable_create_headers.unwrap_or_default()) } + pub fn sqlite_enabled(&self) -> bool { + match &self.sqlite { + Some(details) => details.enabled, + None => false, + } + } + + pub fn sqlite_disable_create_tables(&self) -> bool { + let enabled = self.sqlite_enabled(); + if !enabled { + return true; + } + + self.sqlite + .as_ref() + .is_some_and(|details| details.disable_create_tables.unwrap_or_default()) + } + + pub fn sqlite_drop_each_run(&self) -> bool { + let enabled = self.sqlite_enabled(); + if !enabled { + return false; + } + + self.sqlite.as_ref().is_some_and(|details| details.drop_each_run.unwrap_or_default()) + } + pub async fn create_relationships_and_indexes( &self, project_path: &Path, diff --git a/examples/rindexer_sqlite_db/abis/SubscriptionModule.abi.json b/examples/rindexer_sqlite_db/abis/SubscriptionModule.abi.json new file mode 100644 index 00000000..74c08830 --- /dev/null +++ b/examples/rindexer_sqlite_db/abis/SubscriptionModule.abi.json @@ -0,0 +1,423 @@ +[ + { + "type": "function", + "name": "HUB", + "inputs": [], + "outputs": [ + { + "name": "", + "type": "address", + "internalType": "address" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "MULTISEND", + "inputs": [], + "outputs": [ + { + "name": "", + "type": "address", + "internalType": "address" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "NAME", + "inputs": [], + "outputs": [ + { + "name": "", + "type": "string", + "internalType": "string" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "VERSION", + "inputs": [], + "outputs": [ + { + "name": "", + "type": "string", + "internalType": "string" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "getSubscription", + "inputs": [ + { + "name": "id", + "type": "bytes32", + "internalType": "bytes32" + } + ], + "outputs": [ + { + "name": "", + "type": "tuple", + "internalType": "struct Subscription", + "components": [ + { + "name": "subscriber", + "type": "address", + "internalType": "address" + }, + { + "name": "recipient", + "type": "address", + "internalType": "address" + }, + { + "name": "amount", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "lastRedeemed", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "frequency", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "category", + "type": "uint8", + "internalType": "enum Category" + } + ] + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "getSubscriptionIds", + "inputs": [ + { + "name": "subscriber", + "type": "address", + "internalType": "address" + } + ], + "outputs": [ + { + "name": "", + "type": "bytes32[]", + "internalType": "bytes32[]" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "isValidOrRedeemable", + "inputs": [ + { + "name": "id", + "type": "bytes32", + "internalType": "bytes32" + } + ], + "outputs": [ + { + "name": "", + "type": "uint256", + "internalType": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "redeem", + "inputs": [ + { + "name": "id", + "type": "bytes32", + "internalType": "bytes32" + }, + { + "name": "data", + "type": "bytes", + "internalType": "bytes" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "subscribe", + "inputs": [ + { + "name": "recipient", + "type": "address", + "internalType": "address" + }, + { + "name": "amount", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "frequency", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "category", + "type": "uint8", + "internalType": "enum Category" + } + ], + "outputs": [ + { + "name": "id", + "type": "bytes32", + "internalType": "bytes32" + } + ], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "unsubscribe", + "inputs": [ + { + "name": "id", + "type": "bytes32", + "internalType": "bytes32" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "unsubscribeMany", + "inputs": [ + { + "name": "_ids", + "type": "bytes32[]", + "internalType": "bytes32[]" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "updateRecipient", + "inputs": [ + { + "name": "id", + "type": "bytes32", + "internalType": "bytes32" + }, + { + "name": "newRecipient", + "type": "address", + "internalType": "address" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "event", + "name": "RecipientUpdated", + "inputs": [ + { + "name": "id", + "type": "bytes32", + "indexed": true, + "internalType": "bytes32" + }, + { + "name": "oldRecipient", + "type": "address", + "indexed": true, + "internalType": "address" + }, + { + "name": "newRecipient", + "type": "address", + "indexed": true, + "internalType": "address" + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "Redeemed", + "inputs": [ + { + "name": "id", + "type": "bytes32", + "indexed": true, + "internalType": "bytes32" + }, + { + "name": "subscriber", + "type": "address", + "indexed": true, + "internalType": "address" + }, + { + "name": "recipient", + "type": "address", + "indexed": true, + "internalType": "address" + }, + { + "name": "nextRedeemAt", + "type": "uint256", + "indexed": false, + "internalType": "uint256" + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "SubscriptionCreated", + "inputs": [ + { + "name": "id", + "type": "bytes32", + "indexed": true, + "internalType": "bytes32" + }, + { + "name": "subscriber", + "type": "address", + "indexed": true, + "internalType": "address" + }, + { + "name": "recipient", + "type": "address", + "indexed": true, + "internalType": "address" + }, + { + "name": "amount", + "type": "uint256", + "indexed": false, + "internalType": "uint256" + }, + { + "name": "frequency", + "type": "uint256", + "indexed": false, + "internalType": "uint256" + }, + { + "name": "category", + "type": "uint8", + "indexed": false, + "internalType": "enum Category" + }, + { + "name": "creationTimestamp", + "type": "uint256", + "indexed": false, + "internalType": "uint256" + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "Unsubscribed", + "inputs": [ + { + "name": "id", + "type": "bytes32", + "indexed": true, + "internalType": "bytes32" + }, + { + "name": "subscriber", + "type": "address", + "indexed": true, + "internalType": "address" + } + ], + "anonymous": false + }, + { + "type": "error", + "name": "ExecutionFailed", + "inputs": [] + }, + { + "type": "error", + "name": "IdentifierExists", + "inputs": [] + }, + { + "type": "error", + "name": "IdentifierNonexistent", + "inputs": [] + }, + { + "type": "error", + "name": "InvalidAmount", + "inputs": [] + }, + { + "type": "error", + "name": "InvalidCategory", + "inputs": [] + }, + { + "type": "error", + "name": "InvalidFrequency", + "inputs": [] + }, + { + "type": "error", + "name": "InvalidRecipient", + "inputs": [] + }, + { + "type": "error", + "name": "InvalidStreamSource", + "inputs": [] + }, + { + "type": "error", + "name": "InvalidSubscriber", + "inputs": [] + }, + { + "type": "error", + "name": "NotRedeemable", + "inputs": [] + }, + { + "type": "error", + "name": "OnlyRecipient", + "inputs": [] + }, + { + "type": "error", + "name": "OnlySubscriber", + "inputs": [] + } + ] \ No newline at end of file diff --git a/examples/rindexer_sqlite_db/rindexer.yaml b/examples/rindexer_sqlite_db/rindexer.yaml new file mode 100644 index 00000000..85c09977 --- /dev/null +++ b/examples/rindexer_sqlite_db/rindexer.yaml @@ -0,0 +1,28 @@ +name: Circles +description: My first rindexer project +repository: https://github.com/joshstevens19/rindexer +project_type: no-code +timestamps: true +networks: +- name: gnosis + chain_id: 100 + rpc: https://rpc.gnosischain.com/ +storage: + sqlite: + enabled: true + drop_each_run: true +native_transfers: + enabled: true +contracts: +- name: Subscriptions + details: + - network: gnosis + address: 0xcEbE4B6d50Ce877A9689ce4516Fe96911e099A78 + start_block: 41149803 + end_block: 41155000 + abi: ./abis/SubscriptionModule.abi.json + include_events: + - SubscriptionCreated + - Unsubscribed + - Redeemed + - RecipientUpdated