Skip to content

Commit e5d9bb7

Browse files
committed
feat(clickhouse sink): add DNS auto-resolution for load balancing
1 parent 2e128dc commit e5d9bb7

File tree

5 files changed

+257
-41
lines changed

5 files changed

+257
-41
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The ClickHouse sink now supports DNS auto-resolution for load balancing, allowing automatic discovery and rotation of ClickHouse cluster nodes through DNS lookups. This enables better high availability and load distribution when connecting to ClickHouse clusters with multiple endpoints.
2+
3+
authors: sebinsunny

src/sinks/clickhouse/config.rs

Lines changed: 157 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
//! Configuration for the `Clickhouse` sink.
22
3-
use std::fmt;
4-
3+
use futures_util::TryFutureExt;
54
use http::{Request, StatusCode, Uri};
65
use hyper::Body;
6+
use std::fmt;
77
use vector_lib::codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer};
88

99
use super::{
1010
request_builder::ClickhouseRequestBuilder,
11-
service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
11+
service::{ClickhouseHealthLogic, ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
1212
sink::{ClickhouseSink, PartitionKey},
1313
};
1414
use crate::{
15+
dns,
1516
http::{Auth, HttpClient, MaybeAuth},
1617
sinks::{
1718
prelude::*,
@@ -61,6 +62,13 @@ pub struct ClickhouseConfig {
6162
#[configurable(metadata(docs::examples = "http://localhost:8123"))]
6263
pub endpoint: UriSerde,
6364

65+
/// Automatically resolve hostnames to all available IP addresses.
66+
///
67+
/// When enabled, the hostname in the endpoint will be resolved to all its IP addresses,
68+
/// and Vector will load balance across all resolved IPs.
69+
#[serde(default)]
70+
pub auto_resolve_dns: bool,
71+
6472
/// The table that data is inserted into.
6573
#[configurable(metadata(docs::examples = "mytable"))]
6674
pub table: Template,
@@ -176,36 +184,133 @@ pub struct AsyncInsertSettingsConfig {
176184

177185
impl_generate_config_from_default!(ClickhouseConfig);
178186

179-
#[async_trait::async_trait]
180-
#[typetag::serde(name = "clickhouse")]
181-
impl SinkConfig for ClickhouseConfig {
182-
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
183-
let endpoint = self.endpoint.with_default_parts().uri;
184-
185-
let auth = self.auth.choose_one(&self.endpoint.auth)?;
187+
#[derive(Debug, Clone)]
188+
pub struct ClickhouseCommon {
189+
pub endpoint: Uri,
190+
pub auth: Option<Auth>,
191+
pub tls_settings: TlsSettings,
192+
service_request_builder: ClickhouseServiceRequestBuilder,
193+
}
186194

187-
let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
195+
impl ClickhouseCommon {
196+
pub async fn parse_config(
197+
config: &ClickhouseConfig,
198+
endpoint_str: &str,
199+
) -> crate::Result<Self> {
200+
let endpoint = endpoint_str.parse::<UriSerde>()?;
201+
let endpoint_uri = endpoint.with_default_parts().uri;
188202

189-
let client = HttpClient::new(tls_settings, &cx.proxy)?;
203+
let auth = config.auth.choose_one(&endpoint.auth)?;
204+
let tls_settings = TlsSettings::from_options(config.tls.as_ref())?;
190205

191-
let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
206+
let service_request_builder = ClickhouseServiceRequestBuilder {
192207
auth: auth.clone(),
193-
endpoint: endpoint.clone(),
194-
skip_unknown_fields: self.skip_unknown_fields,
195-
date_time_best_effort: self.date_time_best_effort,
196-
insert_random_shard: self.insert_random_shard,
197-
compression: self.compression,
198-
query_settings: self.query_settings,
208+
endpoint: endpoint_uri.clone(),
209+
skip_unknown_fields: config.skip_unknown_fields,
210+
date_time_best_effort: config.date_time_best_effort,
211+
insert_random_shard: config.insert_random_shard,
212+
compression: config.compression,
213+
query_settings: config.query_settings,
199214
};
200215

201-
let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
202-
HttpService::new(client.clone(), clickhouse_service_request_builder);
216+
Ok(Self {
217+
endpoint: endpoint_uri,
218+
auth,
219+
tls_settings,
220+
service_request_builder,
221+
})
222+
}
223+
224+
pub async fn parse_many(config: &ClickhouseConfig) -> crate::Result<Vec<Self>> {
225+
let endpoint_str = config.endpoint.with_default_parts().uri.to_string();
226+
227+
let all_endpoints = if config.auto_resolve_dns {
228+
Self::resolve_endpoint_to_ips(&endpoint_str).await?
229+
} else {
230+
vec![endpoint_str]
231+
};
232+
233+
if all_endpoints.is_empty() {
234+
return Err("No endpoints available after DNS resolution".into());
235+
}
236+
237+
let mut commons = Vec::new();
238+
for endpoint_str in all_endpoints {
239+
commons.push(Self::parse_config(config, &endpoint_str).await?);
240+
}
241+
Ok(commons)
242+
}
243+
244+
async fn resolve_endpoint_to_ips(endpoint_str: &str) -> crate::Result<Vec<String>> {
245+
let uri: Uri = endpoint_str.parse()?;
246+
247+
let host = uri.host().ok_or("URI must contain a host")?;
248+
249+
// Resolve hostname to all IP addresses
250+
let ips: Vec<_> = dns::Resolver.lookup_ip(host.to_string()).await?.collect();
251+
252+
if ips.is_empty() {
253+
return Err("No IP addresses found for hostname".into());
254+
}
255+
256+
let mut resolved_endpoints = Vec::new();
257+
for ip in ips {
258+
let new_endpoint = uri.to_string().replace(host, &ip.to_string());
259+
resolved_endpoints.push(new_endpoint);
260+
}
261+
262+
Ok(resolved_endpoints)
263+
}
264+
265+
pub(super) const fn get_service_request_builder(&self) -> &ClickhouseServiceRequestBuilder {
266+
&self.service_request_builder
267+
}
268+
269+
pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
270+
let uri = get_healthcheck_uri(&self.endpoint);
271+
let mut request = Request::get(uri).body(Body::empty()).unwrap();
272+
273+
if let Some(auth) = self.auth {
274+
auth.apply(&mut request);
275+
}
276+
277+
let response = client.send(request).await?;
278+
279+
match response.status() {
280+
StatusCode::OK => Ok(()),
281+
status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
282+
}
283+
}
284+
}
285+
286+
#[async_trait::async_trait]
287+
#[typetag::serde(name = "clickhouse")]
288+
impl SinkConfig for ClickhouseConfig {
289+
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
290+
let commons = ClickhouseCommon::parse_many(self).await?;
291+
let common = commons[0].clone();
292+
293+
let client = HttpClient::new(common.tls_settings.clone(), &cx.proxy)?;
203294

204295
let request_limits = self.request.into_settings();
205296

206-
let service = ServiceBuilder::new()
207-
.settings(request_limits, ClickhouseRetryLogic::default())
208-
.service(service);
297+
let services = commons
298+
.iter()
299+
.map(|common| {
300+
let endpoint = common.endpoint.to_string();
301+
let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
302+
HttpService::new(client.clone(), common.get_service_request_builder().clone());
303+
(endpoint, service)
304+
})
305+
.collect::<Vec<_>>();
306+
307+
let service = request_limits.distributed_service(
308+
ClickhouseRetryLogic::default(),
309+
services,
310+
Default::default(),
311+
ClickhouseHealthLogic,
312+
1,
313+
);
209314

210315
let batch_settings = self.batch.into_batcher_settings()?;
211316

@@ -235,7 +340,13 @@ impl SinkConfig for ClickhouseConfig {
235340
request_builder,
236341
);
237342

238-
let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
343+
let healthcheck = futures::future::select_ok(
344+
commons
345+
.into_iter()
346+
.map(move |common| common.healthcheck(client.clone()).boxed()),
347+
)
348+
.map_ok(|((), _)| ())
349+
.boxed();
239350

240351
Ok((VectorSink::from_event_streamsink(sink), healthcheck))
241352
}
@@ -258,22 +369,6 @@ fn get_healthcheck_uri(endpoint: &Uri) -> String {
258369
uri
259370
}
260371

261-
async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
262-
let uri = get_healthcheck_uri(&endpoint);
263-
let mut request = Request::get(uri).body(Body::empty()).unwrap();
264-
265-
if let Some(auth) = auth {
266-
auth.apply(&mut request);
267-
}
268-
269-
let response = client.send(request).await?;
270-
271-
match response.status() {
272-
StatusCode::OK => Ok(()),
273-
status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
274-
}
275-
}
276-
277372
#[cfg(test)]
278373
mod tests {
279374
use super::*;
@@ -298,4 +393,25 @@ mod tests {
298393
"http://localhost:8123/path/?query=SELECT%201"
299394
);
300395
}
396+
397+
#[tokio::test]
398+
async fn test_auto_resolve_dns_enabled() {
399+
let config = ClickhouseConfig {
400+
endpoint: "http://localhost:8123".parse().unwrap(),
401+
auto_resolve_dns: true, // Enabled
402+
table: "test_table".try_into().unwrap(),
403+
..Default::default()
404+
};
405+
406+
let commons = ClickhouseCommon::parse_many(&config).await.unwrap();
407+
assert!(!commons.is_empty());
408+
409+
// All resolved endpoints should be IP addresses, not hostnames
410+
for common in &commons {
411+
let endpoint_str = common.endpoint.to_string();
412+
assert!(!endpoint_str.contains("localhost"));
413+
// Should contain either IPv4 or IPv6 addresses
414+
assert!(endpoint_str.contains("127.0.0.1") || endpoint_str.contains("::1"));
415+
}
416+
}
301417
}

src/sinks/clickhouse/integration_tests.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,65 @@ async fn templated_table() {
392392
}
393393
}
394394

395+
#[tokio::test]
396+
async fn insert_events_unix_timestamps_using_dns_resolution() {
397+
trace_init();
398+
399+
let table = random_table_name();
400+
let host = clickhouse_address();
401+
402+
let mut batch = BatchConfig::default();
403+
batch.max_events = Some(1);
404+
405+
let config = ClickhouseConfig {
406+
endpoint: host.parse().unwrap(),
407+
auto_resolve_dns: true,
408+
table: table.clone().try_into().unwrap(),
409+
compression: Compression::None,
410+
encoding: Transformer::new(None, None, Some(TimestampFormat::Unix)).unwrap(),
411+
batch,
412+
request: TowerRequestConfig {
413+
retry_attempts: 1,
414+
..Default::default()
415+
},
416+
..Default::default()
417+
};
418+
let client = ClickhouseClient::new(host);
419+
client
420+
.create_table(
421+
&table,
422+
"host String, timestamp DateTime('UTC'), message String",
423+
)
424+
.await;
425+
426+
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
427+
428+
let (mut input_event, _receiver) = make_event();
429+
430+
run_and_assert_sink_compliance(sink, stream::once(ready(input_event.clone())), &SINK_TAGS)
431+
.await;
432+
433+
let output = client.select_all(&table).await;
434+
assert_eq!(1, output.rows);
435+
436+
let exp_event = input_event.as_mut_log();
437+
exp_event.insert(
438+
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
439+
format!(
440+
"{}",
441+
exp_event
442+
.get_timestamp()
443+
.unwrap()
444+
.as_timestamp()
445+
.unwrap()
446+
.format("%Y-%m-%d %H:%M:%S")
447+
),
448+
);
449+
450+
let expected = serde_json::to_value(exp_event).unwrap();
451+
assert_eq!(expected, output.data[0]);
452+
}
453+
395454
fn make_event() -> (Event, BatchStatusReceiver) {
396455
let (batch, receiver) = BatchNotifier::new_with_receiver();
397456
let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch);

src/sinks/clickhouse/service.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::{
1717
util::{
1818
http::{HttpRequest, HttpResponse, HttpRetryLogic, HttpServiceRequestBuilder},
1919
retries::RetryAction,
20+
service::HealthLogic,
2021
},
2122
},
2223
};
@@ -328,3 +329,30 @@ mod tests {
328329
.unwrap_err();
329330
}
330331
}
332+
333+
#[derive(Clone)]
334+
pub struct ClickhouseHealthLogic;
335+
336+
impl HealthLogic for ClickhouseHealthLogic {
337+
type Error = crate::Error;
338+
type Response = HttpResponse;
339+
340+
fn is_healthy(&self, response: &Result<Self::Response, Self::Error>) -> Option<bool> {
341+
match response {
342+
Ok(response) => {
343+
let status = response.http_response.status();
344+
if status.is_success() {
345+
Some(true)
346+
} else if status.is_server_error() {
347+
Some(false)
348+
} else {
349+
None
350+
}
351+
}
352+
Err(error) => match error.downcast_ref::<HttpError>() {
353+
Some(HttpError::CallRequest { .. }) => Some(false),
354+
_ => None,
355+
},
356+
}
357+
}
358+
}

website/cue/reference/components/sinks/generated/clickhouse.cue

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,16 @@ generated: components: sinks: clickhouse: configuration: {
204204
}
205205
}
206206
}
207+
auto_resolve_dns: {
208+
description: """
209+
Automatically resolve hostnames to all available IP addresses.
210+
211+
When enabled, the hostname in the endpoint will be resolved to all its IP addresses,
212+
and Vector will load balance across all resolved IPs.
213+
"""
214+
required: false
215+
type: bool: default: false
216+
}
207217
batch: {
208218
description: "Event batching behavior."
209219
required: false

0 commit comments

Comments
 (0)