use std::time::Duration;

use serde::{Deserialize, Serialize};
use tokio::{
    sync::mpsc::{self, error::TryRecvError, Receiver},
    time::timeout,
};

use clickhouse::{error::Result, sql::Identifier, Client, Row};

const TABLE_NAME: &str = "chrs_inserter";

#[derive(Debug, Row, Serialize, Deserialize)]
struct MyRow {
    no: u32,
}
// Pattern 1: dense streams
// ------------------------
// This pattern is useful when the stream is dense, i.e. has no (or only
// short) pauses between rows, e.g. when reading from a file or another
// database. In other words, this pattern fits ETL-like tasks.
async fn dense(client: &Client, mut rx: Receiver<u32>) -> Result<()> {
    let mut inserter = client
        .inserter(TABLE_NAME)?
        // Limit the number of rows inserted by a single `INSERT` statement.
        // We use a small value (100) for this example only.
        // See the documentation of `with_max_rows` for details.
        .with_max_rows(100)
        // Other limits can be used as well, e.g. a limit on the size in bytes.
        // The first condition reached ends the current `INSERT`.
        .with_max_bytes(1_048_576);

    while let Some(no) = rx.recv().await {
        inserter.write(&MyRow { no })?;
        inserter.commit().await?;
    }

    // Don't forget to flush the remaining rows.
    inserter.end().await?;
    Ok(())
}

// Pattern 2: sparse streams
// -------------------------
// This pattern is useful when the stream is sparse, i.e. has pauses between
// rows, e.g. when streaming real-time events into ClickHouse.
// Some rows arrive one by one with a delay, some in batches.
async fn sparse(client: &Client, mut rx: Receiver<u32>) -> Result<()> {
    let mut inserter = client
        .inserter(TABLE_NAME)?
        // Slice the stream into chunks (one `INSERT` per chunk) by time.
        // See the documentation of `with_period` for details.
        .with_period(Some(Duration::from_millis(100)))
        // If you have a lot of parallel inserters (e.g. on multiple nodes),
        // it's reasonable to add some bias to the period to spread the load.
        .with_period_bias(0.1)
        // We can also use other limits. This is useful when the stream
        // recovers after a long period of inactivity (e.g. a restart of the
        // service or ClickHouse).
        .with_max_rows(500_000);

    loop {
        let no = match rx.try_recv() {
            Ok(event) => event,
            Err(TryRecvError::Empty) => {
                // If no events are available, we should wait for the next
                // one. However, we don't know when it will arrive, so we
                // should wait no longer than the time left in the current
                // period.
                let time_left = inserter.time_left().expect("with_period is set");

                // Note: `rx.recv()` must be cancel safe for your channel.
                // This is true for the popular `tokio`, `futures-channel`,
                // and `flume` channels.
                match timeout(time_left, rx.recv()).await {
                    Ok(Some(event)) => event,
                    // The stream is closed.
                    Ok(None) => break,
                    // Timeout.
                    Err(_) => {
                        // The period is over, so we allow the inserter to end
                        // the current `INSERT` statement. If no `INSERT` is
                        // in progress, this call is a no-op.
                        inserter.commit().await?;
                        continue;
                    }
                }
            }
            Err(TryRecvError::Disconnected) => break,
        };

        inserter.write(&MyRow { no })?;
        inserter.commit().await?;

        // The result of `commit()` contains the number of rows inserted so
        // far. It's useful not only for statistics but also to implement
        // at-least-once delivery by acknowledging inserted rows back to the
        // sender, which should store all unacknowledged events in this case.
        // See the sketch after this function.
    }

    inserter.end().await?;
    Ok(())
}
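
// A minimal sketch of the acknowledgement idea mentioned above. The `ack_tx`
// channel is a hypothetical addition for illustration; everything else
// relies only on `commit()` reporting the quantities of the `INSERT` it has
// just finished (all zeros if none was finished).
#[allow(dead_code)]
async fn commit_with_ack(
    inserter: &mut clickhouse::inserter::Inserter<MyRow>,
    ack_tx: &mpsc::Sender<u64>,
) -> Result<()> {
    let stats = inserter.commit().await?;
    if stats.rows > 0 {
        // An `INSERT` has just been finished: every row written since the
        // previous one is now committed, so the sender may drop that many
        // events from its retry buffer.
        let _ = ack_tx.send(stats.rows).await;
    }
    Ok(())
}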

// Emits `n` events; in sparse mode it also pauses between events to emulate
// a real-time stream.
fn spawn_data_generator(n: u32, sparse: bool) -> Receiver<u32> {
    let (tx, rx) = mpsc::channel(1000);

    tokio::spawn(async move {
        for no in 0..n {
            if sparse {
                let delay_ms = if no % 100 == 0 { 20 } else { 2 };
                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
            }

            tx.send(no).await.unwrap();
        }
    });

    rx
}

// Rows inserted by the same `INSERT` statement share the same default
// `insertion_time`, so grouping by it shows the resulting batches.
async fn fetch_batches(client: &Client) -> Result<Vec<(String, u64)>> {
    client
        .query(
            "SELECT toString(insertion_time), count()
             FROM ?
             GROUP BY insertion_time
             ORDER BY insertion_time",
        )
        .bind(Identifier(TABLE_NAME))
        .fetch_all::<(String, u64)>()
        .await
}

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::default().with_url("http://localhost:8123");

    client
        .query(
            "CREATE OR REPLACE TABLE ? (
                no UInt32,
                insertion_time DateTime64(6) DEFAULT now64(6)
            )
            ENGINE = MergeTree
            ORDER BY no",
        )
        .bind(Identifier(TABLE_NAME))
        .execute()
        .await?;

    println!("Pattern 1: dense streams");
    let rx = spawn_data_generator(1000, false);
    dense(&client, rx).await?;

    // Prints 10 batches of 100 rows each.
    for (insertion_time, count) in fetch_batches(&client).await? {
        println!("{}: {} rows", insertion_time, count);
    }

    client
        .query("TRUNCATE TABLE ?")
        .bind(Identifier(TABLE_NAME))
        .execute()
        .await?;

    println!("\nPattern 2: sparse streams");
    let rx = spawn_data_generator(1000, true);
    sparse(&client, rx).await?;

    // Prints batches every 100±10 ms.
    for (insertion_time, count) in fetch_batches(&client).await? {
        println!("{}: {} rows", insertion_time, count);
    }

    Ok(())
}