diff --git a/config/config.md b/config/config.md
index 92f192e6df6a..46a0aee1a78d 100644
--- a/config/config.md
+++ b/config/config.md
@@ -474,7 +474,7 @@
| `meta_client.metadata_cache_ttl` | String | `10m` | TTL of the metadata cache. |
| `meta_client.metadata_cache_tti` | String | `5m` | -- |
| `wal` | -- | -- | The WAL options. |
-| `wal.provider` | String | `raft_engine` | The provider of the WAL.<br/>- `raft_engine`: the wal is stored in the local file system by raft-engine.<br/>- `kafka`: it's remote wal that data is stored in Kafka. |
+| `wal.provider` | String | `raft_engine` | The provider of the WAL.<br/>- `raft_engine`: the wal is stored in the local file system by raft-engine.<br/>- `kafka`: it's remote wal that data is stored in Kafka.<br/>- `noop`: it's a no-op WAL provider that does not store any WAL data.<br/>**Notes: any unflushed data will be lost when the datanode is shutdown.** |
| `wal.dir` | String | Unset | The directory to store the WAL files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.file_size` | String | `128MB` | The size of the WAL segment file.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a purge.<br/>**It's only used when the provider is `raft_engine`**. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index e283967680d5..82ee07bd8477 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -118,6 +118,7 @@ metadata_cache_tti = "5m"
## The provider of the WAL.
## - `raft_engine`: the wal is stored in the local file system by raft-engine.
## - `kafka`: it's remote wal that data is stored in Kafka.
+## - `noop`: it's a no-op WAL provider that does not store any WAL data.
+## **Notes: any unflushed data will be lost when the datanode is shutdown.**
provider = "raft_engine"
## The directory to store the WAL files.
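For orientation, a hedged sketch of how the new value is consumed on the datanode side: assuming `DatanodeWalConfig` keeps its `serde(tag = "provider")` layout (and that a `toml` dependency is available for the example), a bare `provider = "noop"` selects the new unit variant.

```rust
// Sketch only: assumes DatanodeWalConfig keeps its serde(tag = "provider")
// representation, so `provider = "noop"` maps to the new unit variant.
use common_wal::config::DatanodeWalConfig;

fn parse_noop_wal_section() -> DatanodeWalConfig {
    let wal_section = r#"provider = "noop""#;
    let config: DatanodeWalConfig =
        toml::from_str(wal_section).expect("the noop provider should deserialize");
    assert!(matches!(config, DatanodeWalConfig::Noop));
    config
}
```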
diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs
index 3b9ec93353fa..0b77dec34176 100644
--- a/src/cmd/src/error.rs
+++ b/src/cmd/src/error.rs
@@ -316,6 +316,13 @@ pub enum Error {
location: Location,
source: standalone::error::Error,
},
+
+ #[snafu(display("Invalid WAL provider"))]
+ InvalidWalProvider {
+ #[snafu(implicit)]
+ location: Location,
+ source: common_wal::error::Error,
+ },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -373,6 +380,7 @@ impl ErrorExt for Error {
}
Error::MetaClientInit { source, .. } => source.status_code(),
Error::SchemaNotFound { .. } => StatusCode::DatabaseNotFound,
+ Error::InvalidWalProvider { .. } => StatusCode::InvalidArguments,
}
}
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 2b1cc407ebf1..58602d0a3950 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -476,7 +476,11 @@ impl StartCommand {
.step(10)
.build(),
);
- let kafka_options = opts.wal.clone().into();
+ let kafka_options = opts
+ .wal
+ .clone()
+ .try_into()
+ .context(error::InvalidWalProviderSnafu)?;
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
.await
.context(error::BuildWalOptionsAllocatorSnafu)?;
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index 2cfea614dd4b..7bc26e41b0b1 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -25,6 +25,7 @@ use crate::config::kafka::common::{
};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;
+use crate::error::{Error, UnsupportedWalProviderSnafu};
/// Wal configurations for metasrv.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
@@ -43,6 +44,7 @@ pub enum MetasrvWalConfig {
pub enum DatanodeWalConfig {
RaftEngine(RaftEngineConfig),
Kafka(DatanodeKafkaConfig),
+ Noop,
}
impl Default for DatanodeWalConfig {
@@ -51,11 +53,13 @@ impl Default for DatanodeWalConfig {
}
}
-impl From<DatanodeWalConfig> for MetasrvWalConfig {
- fn from(config: DatanodeWalConfig) -> Self {
+impl TryFrom<DatanodeWalConfig> for MetasrvWalConfig {
+ type Error = Error;
+
+ fn try_from(config: DatanodeWalConfig) -> Result<Self, Self::Error> {
match config {
- DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
- DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
+ DatanodeWalConfig::RaftEngine(_) => Ok(Self::RaftEngine),
+ DatanodeWalConfig::Kafka(config) => Ok(Self::Kafka(MetasrvKafkaConfig {
connection: config.connection,
kafka_topic: config.kafka_topic,
auto_create_topics: config.auto_create_topics,
@@ -67,7 +71,11 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
// This field won't be used in standalone mode
checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
- }),
+ })),
+ DatanodeWalConfig::Noop => UnsupportedWalProviderSnafu {
+ provider: "noop".to_string(),
+ }
+ .fail(),
}
}
}
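The switch from `From` to `TryFrom` means a datanode-only `noop` WAL has no metasrv counterpart. A minimal sketch of the expected behavior, mirroring the `UnsupportedWalProvider` error added below:

```rust
// Minimal sketch: the datanode-only `Noop` variant cannot be mapped to a
// metasrv WAL config, while the existing variants still convert as before.
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};

fn check_conversion() {
    let ok: Result<MetasrvWalConfig, _> = DatanodeWalConfig::default().try_into();
    assert!(ok.is_ok()); // raft_engine still maps to MetasrvWalConfig::RaftEngine

    let err: Result<MetasrvWalConfig, _> = DatanodeWalConfig::Noop.try_into();
    assert!(err.is_err()); // UnsupportedWalProvider { provider: "noop" }
}
```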
diff --git a/src/common/wal/src/error.rs b/src/common/wal/src/error.rs
index d4427c5ba204..316c93ec7edb 100644
--- a/src/common/wal/src/error.rs
+++ b/src/common/wal/src/error.rs
@@ -92,6 +92,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
+
+ #[snafu(display("Unsupported WAL provider: {}", provider))]
+ UnsupportedWalProvider {
+ provider: String,
+ #[snafu(implicit)]
+ location: Location,
+ },
}
pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 5c9e86848f2e..ed8b41f0c7a1 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -34,6 +34,7 @@ use common_wal::config::raft_engine::RaftEngineConfig;
use file_engine::engine::FileRegionEngine;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{GlobalIndexCollector, default_index_file};
+use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
@@ -561,6 +562,27 @@ impl DatanodeBuilder {
self.extension_range_provider_factory.take(),
);
+ builder.try_build().await.context(BuildMitoEngineSnafu)?
+ }
+ DatanodeWalConfig::Noop => {
+ let log_store = Arc::new(NoopLogStore);
+
+ let builder = MitoEngineBuilder::new(
+ &opts.storage.data_home,
+ config,
+ log_store,
+ object_store_manager,
+ schema_metadata_manager,
+ file_ref_manager,
+ partition_expr_fetcher.clone(),
+ plugins,
+ );
+
+ #[cfg(feature = "enterprise")]
+ let builder = builder.with_extension_range_provider_factory(
+ self.extension_range_provider_factory.take(),
+ );
+
builder.try_build().await.context(BuildMitoEngineSnafu)?
}
};
diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs
index 1151654dbe70..ec8207d5eb3d 100644
--- a/src/log-store/src/lib.rs
+++ b/src/log-store/src/lib.rs
@@ -18,5 +18,6 @@
pub mod error;
pub mod kafka;
pub mod metrics;
+pub mod noop;
pub mod raft_engine;
pub mod test_util;
diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs
new file mode 100644
index 000000000000..3263e1d30cf1
--- /dev/null
+++ b/src/log-store/src/noop.rs
@@ -0,0 +1,15 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod log_store;
diff --git a/src/log-store/src/noop/log_store.rs b/src/log-store/src/noop/log_store.rs
new file mode 100644
index 000000000000..f0f056514e8c
--- /dev/null
+++ b/src/log-store/src/noop/log_store.rs
@@ -0,0 +1,116 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use futures::stream;
+use store_api::logstore::entry::{Entry, NaiveEntry};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::{AppendBatchResponse, EntryId, LogStore, SendableEntryStream, WalIndex};
+use store_api::storage::RegionId;
+
+use crate::error::{Error, Result};
+
+#[derive(Debug, Clone, Copy)]
+pub struct NoopLogStore;
+
+#[async_trait::async_trait]
+impl LogStore for NoopLogStore {
+ type Error = Error;
+
+ async fn stop(&self) -> Result<()> {
+ Ok(())
+ }
+
+ async fn append_batch(&self, entries: Vec<Entry>) -> Result<AppendBatchResponse> {
+ let last_entry_ids = entries
+ .iter()
+ .map(|entry| (entry.region_id(), 0))
+ .collect::<HashMap<_, _>>();
+ Ok(AppendBatchResponse { last_entry_ids })
+ }
+
+ async fn read(
+ &self,
+ _provider: &Provider,
+ _entry_id: EntryId,
+ _index: Option<WalIndex>,
+ ) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
+ Ok(Box::pin(stream::empty()))
+ }
+
+ async fn create_namespace(&self, _ns: &Provider) -> Result<()> {
+ Ok(())
+ }
+
+ async fn delete_namespace(&self, _ns: &Provider) -> Result<()> {
+ Ok(())
+ }
+
+ async fn list_namespaces(&self) -> Result<Vec<Provider>> {
+ Ok(vec![])
+ }
+
+ fn entry(
+ &self,
+ data: Vec<u8>,
+ entry_id: EntryId,
+ region_id: RegionId,
+ provider: &Provider,
+ ) -> Result<Entry> {
+ Ok(Entry::Naive(NaiveEntry {
+ provider: provider.clone(),
+ region_id,
+ entry_id,
+ data,
+ }))
+ }
+
+ async fn obsolete(
+ &self,
+ _provider: &Provider,
+ _region_id: RegionId,
+ _entry_id: EntryId,
+ ) -> Result<()> {
+ Ok(())
+ }
+
+ fn latest_entry_id(&self, _provider: &Provider) -> Result<EntryId> {
+ Ok(0)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[tokio::test]
+ async fn test_append_batch() {
+ let log_store = NoopLogStore;
+ let entries = vec![Entry::Naive(NaiveEntry {
+ provider: Provider::noop_provider(),
+ region_id: RegionId::new(1, 1),
+ entry_id: 1,
+ data: vec![1],
+ })];
+
+ let last_entry_ids = log_store
+ .append_batch(entries)
+ .await
+ .unwrap()
+ .last_entry_ids;
+ assert_eq!(last_entry_ids.len(), 1);
+ assert_eq!(last_entry_ids[&(RegionId::new(1, 1))], 0);
+ }
+}
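A hedged usage sketch of the store's contract, consistent with the implementation above: appends report entry id 0 and reads always yield an empty stream, so WAL replay restores nothing after a restart.

```rust
// Sketch only: nothing written to the NoopLogStore can be read back, which is
// why unflushed data is lost across a datanode restart.
use futures::StreamExt;
use log_store::noop::log_store::NoopLogStore;
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;

async fn replay_is_empty(provider: &Provider) {
    let store = NoopLogStore;
    // Reading from entry id 0 with no index: the stream ends immediately.
    let mut stream = store.read(provider, 0, None).await.unwrap();
    assert!(stream.next().await.is_none());
}
```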
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 95aef2776a15..af2fa093b8c5 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -25,6 +25,7 @@ use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
+use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
@@ -367,7 +368,8 @@ impl RegionOpener {
match wal_options {
WalOptions::RaftEngine => {
ensure!(
- TypeId::of::<S>() == TypeId::of::<RaftEngineLogStore>(),
+ TypeId::of::<S>() == TypeId::of::<RaftEngineLogStore>()
+ || TypeId::of::<S>() == TypeId::of::<NoopLogStore>(),
error::IncompatibleWalProviderChangeSnafu {
global: "`kafka`",
region: "`raft_engine`",
@@ -377,7 +379,8 @@ impl RegionOpener {
}
WalOptions::Kafka(options) => {
ensure!(
- TypeId::of::<S>() == TypeId::of::<KafkaLogStore>(),
+ TypeId::of::<S>() == TypeId::of::<KafkaLogStore>()
+ || TypeId::of::<S>() == TypeId::of::<NoopLogStore>(),
error::IncompatibleWalProviderChangeSnafu {
global: "`raft_engine`",
region: "`kafka`",
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 97cb65e4c7ed..5d7fdfa8edfa 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -202,7 +202,7 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
- let kafka_options = opts.wal.clone().into();
+ let kafka_options = opts.wal.clone().try_into().unwrap();
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
.await
.unwrap();
diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs
index c0179fa78127..db5c00efff11 100644
--- a/tests-integration/src/tests.rs
+++ b/tests-integration/src/tests.rs
@@ -13,6 +13,7 @@
// limitations under the License.
mod instance_kafka_wal_test;
+mod instance_noop_wal_test;
mod instance_test;
mod promql_test;
mod reconcile_table;
diff --git a/tests-integration/src/tests/instance_noop_wal_test.rs b/tests-integration/src/tests/instance_noop_wal_test.rs
new file mode 100644
index 000000000000..1bc4870fa8c2
--- /dev/null
+++ b/tests-integration/src/tests/instance_noop_wal_test.rs
@@ -0,0 +1,150 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use client::OutputData;
+use common_recordbatch::util::collect_batches;
+use common_test_util::recordbatch::check_output_stream;
+use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
+
+use crate::cluster::GreptimeDbClusterBuilder;
+use crate::tests::test_util::{
+ MockInstance, MockInstanceBuilder, RebuildableMockInstance, TestContext, execute_sql,
+};
+
+pub(crate) async fn distributed_with_noop_wal() -> TestContext {
+ common_telemetry::init_default_ut_logging();
+ let test_name = uuid::Uuid::new_v4().to_string();
+ let builder = GreptimeDbClusterBuilder::new(&test_name)
+ .await
+ .with_datanode_wal_config(DatanodeWalConfig::Noop)
+ .with_metasrv_wal_config(MetasrvWalConfig::RaftEngine);
+ TestContext::new(MockInstanceBuilder::Distributed(builder)).await
+}
+
+#[tokio::test]
+async fn test_mito_engine() {
+ let mut test_context = distributed_with_noop_wal().await;
+ let frontend = test_context.frontend();
+ let sql = r#"create table demo(
+ host STRING,
+ cpu DOUBLE,
+ memory DOUBLE,
+ ts timestamp,
+ TIME INDEX(ts)
+ )"#;
+
+ let output = execute_sql(&frontend, sql).await.data;
+ assert!(matches!(output, OutputData::AffectedRows(0)));
+
+ let output = execute_sql(
+ &frontend,
+ "insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 1024, 1655276557000)",
+ )
+ .await
+ .data;
+ assert!(matches!(output, OutputData::AffectedRows(1)));
+
+ let output = execute_sql(&frontend, "select * from demo order by ts")
+ .await
+ .data;
+ let expected = r#"+-------+-----+--------+---------------------+
+| host | cpu | memory | ts |
++-------+-----+--------+---------------------+
+| host1 | 1.1 | 1024.0 | 2022-06-15T07:02:37 |
++-------+-----+--------+---------------------+"#;
+ check_output_stream(output, expected).await;
+
+ test_context.rebuild().await;
+ let frontend = test_context.frontend();
+ let output = execute_sql(&frontend, "select * from demo order by ts")
+ .await
+ .data;
+ // Unflushed data should be lost.
+ let expected = r#"++
+++"#;
+ check_output_stream(output, expected).await;
+
+ let output = execute_sql(
+ &frontend,
+ "insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 1024, 1655276557000)",
+ )
+ .await
+ .data;
+ assert!(matches!(output, OutputData::AffectedRows(1)));
+ execute_sql(&frontend, "admin flush_table('demo')").await;
+
+ test_context.rebuild().await;
+ let frontend = test_context.frontend();
+ let output = execute_sql(&frontend, "select * from demo order by ts")
+ .await
+ .data;
+ let expected = r#"+-------+-----+--------+---------------------+
+| host | cpu | memory | ts |
++-------+-----+--------+---------------------+
+| host1 | 1.1 | 1024.0 | 2022-06-15T07:02:37 |
++-------+-----+--------+---------------------+"#;
+ check_output_stream(output, expected).await;
+}
+
+#[tokio::test]
+async fn test_metric_engine() {
+ let mut test_context = distributed_with_noop_wal().await;
+ let frontend = test_context.frontend();
+
+ let sql = r#"CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");"#;
+ let output = execute_sql(&frontend, sql).await.data;
+ assert!(matches!(output, OutputData::AffectedRows(0)));
+
+ let sql = r#"CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");"#;
+ let output = execute_sql(&frontend, sql).await.data;
+ assert!(matches!(output, OutputData::AffectedRows(0)));
+
+ // The logical table should be lost.
+ test_context.rebuild().await;
+ let frontend = test_context.frontend();
+ let output = execute_sql(&frontend, "select * from t1").await;
+ let err = unwrap_err(output.data).await;
+ // Should return a region-not-found error.
+ assert!(err.contains("not found"));
+
+ let sql = r#"CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");"#;
+ let output = execute_sql(&frontend, sql).await.data;
+ assert!(matches!(output, OutputData::AffectedRows(0)));
+
+ execute_sql(
+ &frontend,
+ "INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);",
+ )
+ .await;
+ execute_sql(&frontend, "admin flush_table('phy')").await;
+
+ test_context.rebuild().await;
+ let frontend = test_context.frontend();
+ let output = execute_sql(&frontend, "select * from t2 order by job").await;
+ let expected = r#"+------+-------------------------+-----+
+| job | ts | val |
++------+-------------------------+-----+
+| job1 | 1970-01-01T00:00:00 | 0.0 |
+| job2 | 1970-01-01T00:00:00.001 | 1.0 |
++------+-------------------------+-----+"#;
+ check_output_stream(output.data, expected).await;
+}
+
+async fn unwrap_err(output: OutputData) -> String {
+ let error = match output {
+ OutputData::Stream(stream) => collect_batches(stream).await.unwrap_err(),
+ _ => unreachable!(),
+ };
+ format!("{error:?}")
+}