Skip to content

Go: Add otel support #3932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fd9e8bc
Go: Add otel support
tjzhang-BQ May 21, 2025
01a9bd8
adding standalone test cases
tjzhang-BQ May 23, 2025
5fe738d
adding command name detection and remaining tests & changelog
tjzhang-BQ May 23, 2025
00876ef
update to use the new package names
tjzhang-BQ May 23, 2025
9242d86
add None check to command getting
tjzhang-BQ May 23, 2025
dd6dffb
add sleep in not renitialize test to wait for span flush
tjzhang-BQ May 24, 2025
63f5e93
single out OpentelemetryTestSuite
tjzhang-BQ May 24, 2025
2c60520
update makefile
tjzhang-BQ May 24, 2025
00fb9db
address comments
tjzhang-BQ May 27, 2025
b38ffb7
add batch support
tjzhang-BQ May 28, 2025
bba0cd7
add cstring free function
tjzhang-BQ May 28, 2025
e3ff82f
address comments on concurrency and memory management
tjzhang-BQ May 28, 2025
77b6016
move otel to internal new package
tjzhang-BQ May 29, 2025
fed117b
update cstring error message
tjzhang-BQ May 29, 2025
589e795
comment addressing part 1
tjzhang-BQ May 30, 2025
9c37e4a
rust linter
tjzhang-BQ May 30, 2025
3a54d30
comment addressing part 2
tjzhang-BQ May 30, 2025
0fbc9bf
adding defer for drop span
tjzhang-BQ May 30, 2025
6103482
change back spanptr typing
tjzhang-BQ May 30, 2025
0ac3fe1
add error value for client creation
tjzhang-BQ May 30, 2025
120f201
revert change
tjzhang-BQ May 30, 2025
9be36de
add pinner for pointer passing of otel sub structs
tjzhang-BQ May 31, 2025
722be4d
rust doc fix
tjzhang-BQ May 31, 2025
887e8d1
addressing lib.rs comments
tjzhang-BQ Jun 2, 2025
471d2d9
adding Optional note to FlushIntervalMs example
tjzhang-BQ Jun 2, 2025
6248364
adding notes and sample% check
tjzhang-BQ Jun 2, 2025
ab10f2f
return error for repeated init() call
tjzhang-BQ Jun 2, 2025
6bd603a
update test case
tjzhang-BQ Jun 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ jobs:
make install-tools build
make -k unit-test integ-test
make -k pubsub-test
make -k opentelemetry-test

- name: Run Example Tests
working-directory: go
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
* Go: SRandMemberCount command added ([#4037](https://github.com/valkey-io/valkey-glide/pull/4037))
* Go: SPopCount command added ([#4026](https://github.com/valkey-io/valkey-glide/pull/4026))
* Java: OTEL fix for script ([#4065](https://github.com/valkey-io/valkey-glide/pull/4065))
* Go: Add Opentelemetry support of creating spans ([#3932](https://github.com/valkey-io/valkey-glide/pull/3932))

#### Breaking Changes

Expand Down
293 changes: 290 additions & 3 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ use glide_core::errors::RequestErrorType;
use glide_core::errors::{self, error_message};
use glide_core::request_type::RequestType;
use glide_core::scripts_container;
use glide_core::{
DEFAULT_FLUSH_SIGNAL_INTERVAL_MS, GlideOpenTelemetry, GlideOpenTelemetryConfigBuilder,
GlideOpenTelemetrySignalsExporter, GlideSpan,
};
use protobuf::Message;
use redis::ObjectType;
use redis::ScanStateRC;
Expand All @@ -24,6 +28,7 @@ use std::ffi::CStr;
use std::future::Future;
use std::slice::from_raw_parts;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::{
ffi::{CString, c_void},
Expand Down Expand Up @@ -1022,6 +1027,7 @@ fn valkey_value_to_command_response(value: Value) -> RedisResult<CommandResponse
/// * `route_bytes` is an optional array of bytes that will be parsed into a Protobuf `Routes` object. The array must be allocated by the caller and subsequently freed by the caller after this function returns.
/// * `route_bytes_len` is the number of bytes in `route_bytes`. It must also not be greater than the max value of a signed pointer-sized integer.
/// * `route_bytes_len` must be 0 if `route_bytes` is null.
/// * `span_ptr` is a valid pointer to [`Arc<GlideSpan>`], a span created by [`create_otel_span`] or `0`. The span must be valid until the command is finished.
/// * This function should only be called should with a `client_adapter_ptr` created by [`create_client`], before [`close_client`] was called with the pointer.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn command(
Expand All @@ -1033,6 +1039,7 @@ pub unsafe extern "C" fn command(
args_len: *const c_ulong,
route_bytes: *const u8,
route_bytes_len: usize,
span_ptr: u64,
) -> *mut CommandResult {
let client_adapter = unsafe {
// we increment the strong count to ensure that the client is not dropped just because we turned it into an Arc.
Expand All @@ -1054,6 +1061,9 @@ pub unsafe extern "C" fn command(
for command_arg in arg_vec {
cmd.arg(command_arg);
}
if span_ptr != 0 {
cmd.set_span(unsafe { get_unsafe_span_from_ptr(Some(span_ptr)) });
}

let route = if !route_bytes.is_null() {
let r_bytes = unsafe { std::slice::from_raw_parts(route_bytes, route_bytes_len) };
Expand Down Expand Up @@ -1579,6 +1589,7 @@ pub unsafe extern "C" fn batch(
batch_ptr: *const BatchInfo,
raise_on_error: bool,
options_ptr: *const BatchOptionsInfo,
span_ptr: u64,
) -> *mut CommandResult {
let client_adapter = unsafe {
// we increment the strong count to ensure that the client is not dropped just because we turned it into an Arc.
Expand All @@ -1588,7 +1599,7 @@ pub unsafe extern "C" fn batch(
let mut client = client_adapter.core.client.clone();

// TODO handle panics
let pipeline = match unsafe { create_pipeline(batch_ptr) } {
let mut pipeline = match unsafe { create_pipeline(batch_ptr) } {
Ok(pipeline) => pipeline,
Err(err) => {
return unsafe {
Expand All @@ -1600,9 +1611,13 @@ pub unsafe extern "C" fn batch(
};
}
};
if span_ptr != 0 {
pipeline.set_pipeline_span(unsafe { get_unsafe_span_from_ptr(Some(span_ptr)) });
}
let child_span = create_child_span(pipeline.span().as_ref(), "send_batch");
let (routing, timeout, pipeline_retry_strategy) = unsafe { get_pipeline_options(options_ptr) };

client_adapter.execute_request(callback_index, async move {
let result = client_adapter.execute_request(callback_index, async move {
if pipeline.is_atomic() {
client
.send_transaction(&pipeline, routing, timeout, raise_on_error)
Expand All @@ -1618,7 +1633,12 @@ pub unsafe extern "C" fn batch(
)
.await
}
})
});

if let Ok(span) = child_span {
span.end();
}
result
}

/// Convert raw C string to a rust string.
Expand Down Expand Up @@ -1755,3 +1775,270 @@ pub(crate) unsafe fn get_pipeline_options(
PipelineRetryStrategy::new(info.retry_server_error, info.retry_connection_error),
)
}

/// Creates an OpenTelemetry span with the given name and returns a pointer to the span as u64.
///
#[unsafe(no_mangle)]
pub extern "C" fn create_otel_span(request_type: RequestType) -> u64 {
let cmd = match request_type.get_command() {
Some(cmd) => cmd,
None => return 0, // Return 0 if no command available
};
let cmd_bytes = match cmd.command() {
Some(bytes) => bytes,
None => return 0, // Return 0 if no command bytes available
};
let command_name = match std::str::from_utf8(cmd_bytes.as_slice()) {
Ok(name) => name,
Err(_) => return 0, // Return 0 if command bytes are not valid UTF-8
};

let span = GlideOpenTelemetry::new_span(command_name);
let arc = Arc::new(span);
let ptr = Arc::into_raw(arc);
ptr as u64
}

/// Creates an OpenTelemetry span with a fixed name "batch" and returns a pointer to the span as u64.
///
#[unsafe(no_mangle)]
pub extern "C" fn create_batch_otel_span() -> u64 {
let command_name = "Batch";

let span = GlideOpenTelemetry::new_span(command_name);
let arc = Arc::new(span);
let ptr = Arc::into_raw(arc);
ptr as u64
}

/// Drops an OpenTelemetry span given its pointer as u64.
///
/// # Safety
/// * `span_ptr` must be a valid pointer to a [`Arc<GlideSpan>`] span created by [`create_otel_span`] or `0`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn drop_otel_span(span_ptr: u64) {
if span_ptr == 0 {
return;
}
unsafe {
Arc::from_raw(span_ptr as *const GlideSpan);
}
}

/// Configuration for OpenTelemetry integration in the Node.js client.
///
/// This struct allows you to configure how telemetry data (traces and metrics) is exported to an OpenTelemetry collector.
/// - `traces`: Optional configuration for exporting trace data. If `None`, trace data will not be exported.
/// - `metrics`: Optional configuration for exporting metrics data. If `None`, metrics data will not be exported.
/// - `flush_interval_ms`: Optional interval in milliseconds between consecutive exports of telemetry data. If `None`, a default value will be used.
///
/// At least one of traces or metrics must be provided.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct OpenTelemetryConfig {
/// Configuration for exporting trace data. Only valid if has_traces is true.
pub traces: *const OpenTelemetryTracesConfig,
/// Configuration for exporting metrics data. Only valid if has_metrics is true.
pub metrics: *const OpenTelemetryMetricsConfig,
/// Whether flush interval is specified
pub has_flush_interval_ms: bool,
/// Interval in milliseconds between consecutive exports of telemetry data. Only valid if has_flush_interval_ms is true.
pub flush_interval_ms: i64,
}

/// Configuration for exporting OpenTelemetry traces.
///
/// - `endpoint`: The endpoint to which trace data will be exported. Expected format:
/// - For gRPC: `grpc://host:port`
/// - For HTTP: `http://host:port` or `https://host:port`
/// - For file exporter: `file:///absolute/path/to/folder/file.json`
/// - `has_sample_percentage`: Whether sample percentage is specified
/// - `sample_percentage`: The percentage of requests to sample and create a span for, used to measure command duration. Only valid if has_sample_percentage is true.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct OpenTelemetryTracesConfig {
/// The endpoint to which trace data will be exported, `null` if not specified.
pub endpoint: *const c_char,
/// Whether sample percentage is specified
pub has_sample_percentage: bool,
/// The percentage of requests to sample and create a span for, used to measure command duration. Only valid if has_sample_percentage is true.
pub sample_percentage: u32,
}

/// Configuration for exporting OpenTelemetry metrics.
///
/// - `endpoint`: The endpoint to which metrics data will be exported. Expected format:
/// - For gRPC: `grpc://host:port`
/// - For HTTP: `http://host:port` or `https://host:port`
/// - For file exporter: `file:///absolute/path/to/folder/file.json`
#[repr(C)]
#[derive(Clone, Debug)]
pub struct OpenTelemetryMetricsConfig {
/// The endpoint to which metrics data will be exported, `null` if not specified.
pub endpoint: *const c_char,
}

/// Initializes OpenTelemetry with the given configuration.
///
/// # Safety
/// * `open_telemetry_config` and its underlying traces and metrics pointers must be valid until the function returns.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn init_open_telemetry(
open_telemetry_config: *const OpenTelemetryConfig,
) -> *const c_char {
// At least one of traces or metrics must be provided
if unsafe { (*open_telemetry_config).traces.is_null() }
&& unsafe { (*open_telemetry_config).metrics.is_null() }
{
let error_msg =
"At least one of traces or metrics must be provided for OpenTelemetry configuration";
return CString::new(error_msg)
.unwrap_or_else(|_| CString::new("Couldn't convert error message to C string").unwrap())
.into_raw();
}

let mut config = GlideOpenTelemetryConfigBuilder::default();

// Initialize open telemetry traces exporter
if !unsafe { (*open_telemetry_config).traces.is_null() } {
let endpoint = unsafe { CStr::from_ptr((*(*open_telemetry_config).traces).endpoint) }
.to_string_lossy()
.to_string();
match GlideOpenTelemetrySignalsExporter::from_str(&endpoint) {
Ok(exporter) => {
let sample_percentage =
if unsafe { (*(*open_telemetry_config).traces).has_sample_percentage } {
Some(unsafe { (*(*open_telemetry_config).traces).sample_percentage })
} else {
None
};
config = config.with_trace_exporter(exporter, sample_percentage);
}
Err(e) => {
let error_msg = format!("Invalid traces exporter configuration: {}", e);
return CString::new(error_msg)
.unwrap_or_else(|_| {
CString::new("Couldn't convert error message to C string").unwrap()
})
.into_raw();
}
}
}

// Initialize open telemetry metrics exporter
if !unsafe { (*open_telemetry_config).metrics.is_null() } {
let endpoint = unsafe { CStr::from_ptr((*(*open_telemetry_config).metrics).endpoint) }
.to_string_lossy()
.to_string();
match GlideOpenTelemetrySignalsExporter::from_str(&endpoint) {
Ok(exporter) => {
config = config.with_metrics_exporter(exporter);
}
Err(e) => {
let error_msg = format!("Invalid metrics exporter configuration: {}", e);
return CString::new(error_msg)
.unwrap_or_else(|_| {
CString::new("Couldn't convert error message to C string").unwrap()
})
.into_raw();
}
}
}

let flush_interval_ms = if unsafe { (*open_telemetry_config).has_flush_interval_ms } {
unsafe { (*open_telemetry_config).flush_interval_ms }
} else {
DEFAULT_FLUSH_SIGNAL_INTERVAL_MS as i64
};

if flush_interval_ms <= 0 {
let error_msg = format!(
"InvalidInput: flushIntervalMs must be a positive integer (got: {})",
flush_interval_ms
);
return CString::new(error_msg)
.unwrap_or_else(|_| CString::new("Couldn't convert error message to C string").unwrap())
.into_raw();
}

config = config.with_flush_interval(std::time::Duration::from_millis(flush_interval_ms as u64));

// Initialize OpenTelemetry synchronously
match glide_core::client::get_or_init_runtime() {
Ok(glide_runtime) => {
match glide_runtime
.runtime
.block_on(async { GlideOpenTelemetry::initialise(config.build()) })
{
Ok(_) => std::ptr::null(), // Success
Err(e) => {
let error_msg = format!("Failed to initialize OpenTelemetry: {}", e);
CString::new(error_msg)
.unwrap_or_else(|_| {
CString::new("Couldn't convert error message to C string").unwrap()
})
.into_raw()
}
}
}
Err(e) => CString::new(e)
.unwrap_or_else(|_| CString::new("Couldn't convert error message to C string").unwrap())
.into_raw(),
}
}

/// Frees a C string.
///
/// # Safety
/// * `s` must be a valid pointer to a C string or `null`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn free_c_string(s: *mut c_char) {
unsafe {
if s.is_null() {
return;
}
drop(CString::from_raw(s));
};
}

/// This function converts a raw pointer to a GlideSpan into a safe Rust reference.
/// It handles the unsafe pointer operations internally, incrementing the reference count
/// to ensure the span remains valid while in use.
///
/// # Safety
///
/// This function is marked as unsafe because it dereferences a raw pointer. The caller
/// must ensure that:
/// * The pointer is valid and points to a properly allocated GlideSpan
/// * The pointer is properly aligned
/// * The data pointed to is not modified while the returned reference is in use
/// * The pointer is not used after the referenced data is dropped
///
/// # Arguments
///
/// * `command_span` - An optional raw pointer (as u64) to a GlideSpan
///
/// # Returns
///
/// * `Some(GlideSpan)` - A cloned GlideSpan if the pointer is valid
/// * `None` - If the pointer is None
unsafe fn get_unsafe_span_from_ptr(command_span: Option<u64>) -> Option<GlideSpan> {
command_span.map(|command_span| unsafe {
Arc::increment_strong_count(command_span as *const GlideSpan);
(*Arc::from_raw(command_span as *const GlideSpan)).clone()
})
}

/// Creates a child span for telemetry if telemetry is enabled
fn create_child_span(span: Option<&GlideSpan>, name: &str) -> Result<GlideSpan, String> {
// Early return if no parent span is provided
let parent_span = span.ok_or_else(|| "No parent span provided".to_string())?;

match parent_span.add_span(name) {
Ok(child_span) => Ok(child_span),
Err(error_msg) => Err(format!(
"Opentelemetry failed to create child span with name `{}`. Error: {:?}",
name, error_msg
)),
}
}
1 change: 1 addition & 0 deletions ffi/tests/ffi_client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ fn execute_command(
args_len_ptr,
route_bytes,
route_len,
0,
)
};
if command_res_ptr.is_null() {
Expand Down
Loading
Loading