Skip to content

Commit

Permalink
enhancement(socket sink): support unix datagram mode (vectordotdev#21762
Browse files Browse the repository at this point in the history
)

* enhancement(socket sink): support unix datagram mode

* 5269_support_unix_datagram_mode_in_socket_sink.enhancement.md: fix linter error

* sinks/util/{udp,unix}.rs: abstract out common logic into sinks/util/datagram.rs

* sinks/util/service/net/unix: use sinks/util/unix/UnixEither and move impls there

* remove problematic feature gates for 'sinks-socket' and 'sinks-statsd'

* use std type and spawn blocking

* basic_unix_datagram_sink: attempt to reduce flakiness

* socket sink: ignore unix_mode on macOS

---------

Co-authored-by: Pavlos Rontidis <[email protected]>
  • Loading branch information
jpovixwm and pront authored Dec 18, 2024
1 parent 1275f1a commit 7c6d0c9
Show file tree
Hide file tree
Showing 10 changed files with 426 additions and 135 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
The `socket` sink now supports the `unix_mode` configuration option that specifies the Unix socket mode to use. Valid values:

- `Stream` (default) - Stream-oriented (`SOCK_STREAM`)
- `Datagram` - Datagram-oriented (`SOCK_DGRAM`)

This option only applies when `mode = "unix"`, and is unavailable on macOS, where `SOCK_STREAM` is always used for Unix sockets.

authors: jpovixwm
2 changes: 0 additions & 2 deletions src/internal_events/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,12 @@ impl<E: std::fmt::Display> InternalEvent for UnixSocketError<'_, E> {
}
}

#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
#[derive(Debug)]
pub struct UnixSocketSendError<'a, E> {
pub(crate) error: &'a E,
pub path: &'a std::path::Path,
}

#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
fn emit(self) {
let reason = "Unix socket send error.";
Expand Down
74 changes: 60 additions & 14 deletions src/sinks/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ impl SinkConfig for SocketSinkConfig {

#[cfg(test)]
mod test {
#[cfg(unix)]
use std::path::PathBuf;
use std::{
future::ready,
net::{SocketAddr, UdpSocket},
};
#[cfg(unix)]
use std::{os::unix::net::UnixDatagram, path::PathBuf};

use futures::stream::StreamExt;
use futures_util::stream;
Expand Down Expand Up @@ -196,14 +196,42 @@ mod test {
crate::test_util::test_generate_config::<SocketSinkConfig>();
}

async fn test_udp(addr: SocketAddr) {
let receiver = UdpSocket::bind(addr).unwrap();
enum DatagramSocket {
Udp(UdpSocket),
#[cfg(unix)]
Unix(UnixDatagram),
}

enum DatagramSocketAddr {
Udp(SocketAddr),
#[cfg(unix)]
Unix(PathBuf),
}

async fn test_datagram(datagram_addr: DatagramSocketAddr) {
let receiver = match &datagram_addr {
DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => {
DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
}
};

let config = SocketSinkConfig {
mode: Mode::Udp(UdpMode {
config: UdpSinkConfig::from_address(addr.to_string()),
encoding: JsonSerializerConfig::default().into(),
}),
mode: match &datagram_addr {
DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
config: UdpSinkConfig::from_address(addr.to_string()),
encoding: JsonSerializerConfig::default().into(),
}),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => Mode::Unix(UnixMode {
config: UnixSinkConfig::new(
path.to_path_buf(),
crate::sinks::util::service::net::UnixMode::Datagram,
),
encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
}),
},
acknowledgements: Default::default(),
};

Expand All @@ -218,9 +246,13 @@ mod test {
.expect("Running sink failed");

let mut buf = [0; 256];
let (size, _src_addr) = receiver
.recv_from(&mut buf)
.expect("Did not receive message");
let size = match &receiver {
DatagramSocket::Udp(sock) => {
sock.recv_from(&mut buf).expect("Did not receive message").0
}
#[cfg(unix)]
DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
};

let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
Expand All @@ -234,14 +266,25 @@ mod test {
async fn udp_ipv4() {
trace_init();

test_udp(next_addr()).await;
test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
}

#[tokio::test]
async fn udp_ipv6() {
trace_init();

test_udp(next_addr_v6()).await;
test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
}

#[cfg(unix)]
#[tokio::test]
async fn unix_datagram() {
trace_init();

test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
"unix_datagram_socket_test",
)))
.await;
}

#[tokio::test]
Expand Down Expand Up @@ -292,7 +335,10 @@ mod test {

let config = SocketSinkConfig {
mode: Mode::Unix(UnixMode {
config: UnixSinkConfig::new(out_path),
config: UnixSinkConfig::new(
out_path,
crate::sinks::util::service::net::UnixMode::Stream,
),
encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
}),
acknowledgements: Default::default(),
Expand Down
106 changes: 106 additions & 0 deletions src/sinks/util/datagram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#[cfg(unix)]
use std::path::PathBuf;

use bytes::BytesMut;
use futures::{stream::BoxStream, StreamExt};
use futures_util::stream::Peekable;
use tokio::net::UdpSocket;
#[cfg(unix)]
use tokio::net::UnixDatagram;
use tokio_util::codec::Encoder;
use vector_lib::internal_event::RegisterInternalEvent;
use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle};
use vector_lib::EstimatedJsonEncodedSizeOf;

use crate::{
codecs::Transformer,
event::{Event, EventStatus, Finalizable},
internal_events::{SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError},
};

#[cfg(unix)]
use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};

pub enum DatagramSocket {
Udp(UdpSocket),
#[cfg(unix)]
Unix(UnixDatagram, PathBuf),
}

pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>(
input: &mut Peekable<BoxStream<'_, Event>>,
mut socket: DatagramSocket,
transformer: &Transformer,
encoder: &mut E,
bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
) {
while let Some(mut event) = input.next().await {
let byte_size = event.estimated_json_encoded_size_of();

transformer.transform(&mut event);

let finalizers = event.take_finalizers();
let mut bytes = BytesMut::new();

// Errors are handled by `Encoder`.
if encoder.encode(event, &mut bytes).is_err() {
continue;
}

match send_datagram(&mut socket, &bytes).await {
Ok(()) => {
emit!(SocketEventsSent {
mode: match socket {
DatagramSocket::Udp(_) => SocketMode::Udp,
#[cfg(unix)]
DatagramSocket::Unix(..) => SocketMode::Unix,
},
count: 1,
byte_size,
});

bytes_sent.emit(ByteSize(bytes.len()));
finalizers.update_status(EventStatus::Delivered);
}
Err(error) => {
match socket {
DatagramSocket::Udp(_) => emit!(SocketSendError {
mode: SocketMode::Udp,
error
}),
#[cfg(unix)]
DatagramSocket::Unix(_, path) => {
emit!(UnixSocketSendError {
path: path.as_path(),
error: &error
})
}
};
finalizers.update_status(EventStatus::Errored);
return;
}
}
}
}

async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> {
let sent = match socket {
DatagramSocket::Udp(udp) => udp.send(buf).await,
#[cfg(unix)]
DatagramSocket::Unix(uds, _) => uds.send(buf).await,
}?;
if sent != buf.len() {
match socket {
DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError {
data_size: buf.len(),
sent,
}),
#[cfg(unix)]
DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError {
data_size: buf.len(),
sent,
}),
}
}
Ok(())
}
3 changes: 2 additions & 1 deletion src/sinks/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod batch;
pub mod buffer;
pub mod builder;
pub mod compressor;
pub mod datagram;
pub mod encoding;
pub mod http;
pub mod metadata;
Expand All @@ -23,7 +24,7 @@ pub mod tcp;
#[cfg(any(test, feature = "test-utils"))]
pub mod test;
pub mod udp;
#[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))]
#[cfg(unix)]
pub mod unix;
pub mod uri;
pub mod zstd;
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/util/service/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
};

#[cfg(unix)]
use std::path::PathBuf;
use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};

use crate::{
internal_events::{
Expand All @@ -33,7 +33,7 @@ pub use self::unix::{UnixConnectorConfig, UnixMode};
use self::tcp::TcpConnector;
use self::udp::UdpConnector;
#[cfg(unix)]
use self::unix::{UnixConnector, UnixEither};
use self::unix::UnixConnector;

use futures_util::{future::BoxFuture, FutureExt};
use snafu::{ResultExt, Snafu};
Expand Down
36 changes: 3 additions & 33 deletions src/sinks/util/service/net/unix.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
use std::{
io,
os::fd::{AsFd, BorrowedFd},
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};

use snafu::ResultExt;
use tokio::{
io::AsyncWriteExt,
net::{UnixDatagram, UnixStream},
};
use tokio::net::{UnixDatagram, UnixStream};

use vector_lib::configurable::configurable_component;

use crate::net;
use crate::{net, sinks::util::unix::UnixEither};

use super::{net_error::*, ConnectorType, NetError, NetworkConnector};

Expand Down Expand Up @@ -74,29 +67,6 @@ impl UnixConnectorConfig {
}
}

pub(super) enum UnixEither {
Datagram(UnixDatagram),
Stream(UnixStream),
}

impl UnixEither {
pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
Self::Datagram(datagram) => datagram.send(buf).await,
Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
}
}
}

impl AsFd for UnixEither {
fn as_fd(&self) -> BorrowedFd<'_> {
match self {
Self::Datagram(datagram) => datagram.as_fd(),
Self::Stream(stream) => stream.as_fd(),
}
}
}

#[derive(Clone)]
pub(super) struct UnixConnector {
path: PathBuf,
Expand Down
Loading

0 comments on commit 7c6d0c9

Please sign in to comment.