Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http source): Digest access authentication #22101

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@ pub enum HttpError {
CallRequest { source: hyper::Error },
#[snafu(display("Failed to build HTTP request: {}", source))]
BuildRequest { source: http::Error },
#[snafu(display("Expected 401 with Digest Auth"))]
DigestAuthExpectation,
}

impl HttpError {
pub const fn is_retriable(&self) -> bool {
match self {
HttpError::BuildRequest { .. } | HttpError::MakeProxyConnector { .. } => false,
HttpError::BuildRequest { .. }
| HttpError::MakeProxyConnector { .. }
| HttpError::DigestAuthExpectation => false,
HttpError::CallRequest { .. }
| HttpError::BuildTlsConnector { .. }
| HttpError::MakeHttpsConnector { .. } => true,
Expand Down Expand Up @@ -295,6 +299,20 @@ pub enum Auth {
/// The bearer authentication token.
token: SensitiveString,
},
/// Digest authentication.
///
/// requires a round trip to the server to get the challenge and then send the response
Digest {
/// The digest authentication username.
#[configurable(metadata(docs::examples = "${USERNAME}"))]
#[configurable(metadata(docs::examples = "username"))]
user: String,

/// The digest authentication password.
#[configurable(metadata(docs::examples = "${PASSWORD}"))]
#[configurable(metadata(docs::examples = "password"))]
password: SensitiveString,
},
}

pub trait MaybeAuth: Sized {
Expand Down Expand Up @@ -333,6 +351,10 @@ impl Auth {
Ok(auth) => map.typed_insert(auth),
Err(error) => error!(message = "Invalid bearer token.", token = %token, %error),
},
Auth::Digest { user, password } => {
let auth = Authorization::basic(user.as_str(), password.inner());
map.typed_insert(auth);
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/databend/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ impl SinkConfig for DatabendConfig {
Some(Auth::Bearer { .. }) => {
return Err("Bearer authentication is not supported currently".into());
}
Some(Auth::Digest { .. }) => {
return Err("Digest authentication is not supported currently".into());
}
None => {}
}
if let Some(database) = &self.database {
Expand Down
4 changes: 4 additions & 0 deletions src/sinks/prometheus/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ fn authorized<T: HttpBody>(req: &Request<T>, auth: &Option<Auth>) -> bool {
Auth::Bearer { token } => {
HeaderValue::from_str(format!("Bearer {}", token.inner()).as_str())
}
Auth::Digest { .. } => {
error!("Digest authentication is not supported.");
return false;
}
};

if let Ok(encoded_credentials) = encoded_credentials {
Expand Down
1 change: 1 addition & 0 deletions src/sinks/websocket/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ mod tests {
user: _user,
password: _password,
} => { /* Not needed for tests at the moment */ }
Auth::Digest { .. } => { /* Not needed for tests at the moment */ }
}
}
Ok(res)
Expand Down
104 changes: 99 additions & 5 deletions src/sources/util/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ use bytes::Bytes;
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt};
use http::{response::Parts, Uri};
use hyper::{Body, Request};
use md5::Digest;
use std::time::Duration;
use std::{collections::HashMap, future::ready};
use tokio_stream::wrappers::IntervalStream;
use vector_lib::json_size::JsonSize;
use vector_lib::sensitive_string::SensitiveString;

use crate::{
http::{Auth, HttpClient},
Expand Down Expand Up @@ -136,13 +138,18 @@ pub(crate) async fn call<
// proxy and tls settings.
let client =
HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed");
let headers = inputs.headers.clone();
let content_type = inputs.content_type.clone();
let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval))
.take_until(inputs.shutdown)
.map(move |_| stream::iter(inputs.urls.clone()))
.flatten()
.map(move |url| {
let client = client.clone();
let endpoint = url.to_string();
let uri = url.clone();
let content_type_inner = content_type.clone();
let auth_inner = inputs.auth.clone();

let context_builder = context_builder.clone();
let mut context = context_builder.build(&url);
Expand All @@ -158,25 +165,112 @@ pub(crate) async fn call<
};

// add user specified headers
for (header, values) in &inputs.headers {
for (header, values) in &headers {
for value in values {
builder = builder.header(header, value);
}
}

// set ACCEPT header if not user specified
if !inputs.headers.contains_key(http::header::ACCEPT.as_str()) {
builder = builder.header(http::header::ACCEPT, &inputs.content_type);
if !headers.contains_key(http::header::ACCEPT.as_str()) {
builder = builder.header(http::header::ACCEPT, &content_type_inner);
}

// building an empty request should be infallible
let mut request = builder.body(Body::empty()).expect("error creating request");

if let Some(auth) = &inputs.auth {
let mut is_digest = false;
let mut username = "".to_string();
let mut user_password = SensitiveString::default();
if let Some(auth) = auth_inner {
auth.apply(&mut request);
is_digest = match auth {
Auth::Digest { user, password } => {
username = user.clone();
user_password = password.clone();
true
},
_ => false
};
}

tokio::time::timeout(inputs.timeout, client.send(request))
.then({
let headers_value = headers.clone();
let username_inner = username.clone();
let user_password_inner = user_password.clone();
move |result| async move {
// make another round trip using digest authentication
if !is_digest {
result
} else {
// deduce we have the correct response type: 401 Unauthorized
let response = match result {
Ok(x) => x,
Err(_) => return result,
};
let (status, response_headers) = match response {
Ok(x) => {
let code = x.status();
let x_headers = x.headers().clone();
(code, x_headers)
},
Err(x) => return Ok(Err(x.into()))
};
if status != 401 {
return Ok(Err(crate::http::HttpError::DigestAuthExpectation))
}
let parts = match response_headers.get("www-authenticate") {
Some(header_value) => match header_value.to_str() {
Ok(value) => value,
Err(_) => return Ok(Err(crate::http::HttpError::DigestAuthExpectation)),
},
None => return Ok(Err(crate::http::HttpError::DigestAuthExpectation)),
};
let parts: Vec<&str> = parts.split(",").collect();
let mut realm = "";
let mut nonce = "";
for part in parts {
if part.contains("realm") {
realm = part.split("=").collect::<Vec<&str>>()[1].trim_matches('"');
}
if part.contains("nonce") {
nonce = part.split("=").collect::<Vec<&str>>()[1].trim_matches('"');
}
}
let ha1 = format!("{:x}", md5::Md5::digest(format!("{}:{}:{}", username_inner, realm, user_password_inner.inner())));
let ha2 = format!("{:x}", md5::Md5::digest(format!("GET:{}", uri.path())));
let cnonce = "00000001"; // TODO: use rng for client nonce
let nonce_count = "00000001";
let response_digest = format!("{:x}", md5::Md5::digest(format!("{}:{}:{}:{}:{}:{}", ha1, nonce, nonce_count, cnonce, "auth", ha2)));
let auth_header = format!(
"Digest username=\"{}\", realm=\"{}\", nonce=\"{}\", uri=\"{}\", response=\"{}\", cnonce=\"{}\", nc=\"{}\", qop=\"auth\"",
username_inner,
realm,
nonce,
uri.path(),
response_digest,
cnonce,
nonce_count
);
// make another trip but this time with auth digest impl'd

let mut builder = Request::get(uri);
for (header, values) in &headers_value {
for value in values {
builder = builder.header(header, value);
}
}
if !headers_value.contains_key(http::header::ACCEPT.as_str()) {
builder = builder.header(http::header::ACCEPT, &content_type_inner);
}
builder = builder.header(http::header::AUTHORIZATION, auth_header);

let request = builder.body(Body::empty()).expect("error creating request");
let auth_response = client.send(request).await;
Ok(auth_response)
}
}
})
.then(move |result| async move {
match result {
Ok(Ok(response)) => Ok(response),
Expand Down