@@ -22,6 +22,7 @@ class Uploader
static inline ddog_CancellationToken cancel{ .inner = nullptr };
static inline std::atomic<uint64_t> upload_seq{ 0 };
std::string output_filename;
+uint64_t instance_upload_seq; // Captured sequence number for this instance
ddog_prof_ProfileExporter ddog_exporter{ .inner = nullptr };

bool export_to_file(ddog_prof_EncodedProfile* encoded);
@@ -17,9 +17,11 @@ Datadog::Uploader::Uploader(std::string_view _output_filename, ddog_prof_ProfileExporter _ddog_exporter)
: output_filename{ _output_filename }
, ddog_exporter{ _ddog_exporter }
{
-// Increment the upload sequence number every time we build an uploader.
-// Upoloaders are use-once-and-destroy.
-upload_seq++;
+// Atomically increment and capture the upload sequence number for this instance.
+// This prevents race conditions where multiple threads creating Uploaders concurrently
+// could result in duplicate sequence numbers in filenames.
+// Uploaders are use-once-and-destroy.
+instance_upload_seq = ++upload_seq;
}

bool
@@ -28,6 +30,6 @@ Datadog::Uploader::export_to_file(ddog_prof_EncodedProfile* encoded)
// Write the profile to a file using the following format for filename:
// <output_filename>.<process_id>.<sequence_number>
std::ostringstream oss;
-oss << output_filename << "." << getpid() << "." << upload_seq;
+oss << output_filename << "." << getpid() << "." << instance_upload_seq;
std::string filename = oss.str();
std::ofstream out(filename, std::ios::binary);
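The race and its fix are easiest to see side by side. Below is a minimal, single-threaded Python model of the interleaving described in the test docstring further down; RacyUploader, FixedUploader, and the module-level counter are hypothetical names invented for illustration, not part of the ddtrace API.

import itertools

# Stand-in for the C++ `static inline std::atomic<uint64_t> upload_seq`.
_upload_seq = itertools.count(1)
_last_seen = 0  # value the shared counter most recently reached


class RacyUploader:
    """Old behavior: bump the shared counter on construction, then re-read
    the shared counter when building the filename."""

    def __init__(self):
        global _last_seen
        _last_seen = next(_upload_seq)

    def filename(self, prefix, pid):
        # Reads whatever the counter holds *now*, not the value this
        # instance bumped it to.
        return f"{prefix}.{pid}.{_last_seen}"


class FixedUploader:
    """New behavior: capture the post-increment value in the constructor,
    as `instance_upload_seq = ++upload_seq` does in C++."""

    def __init__(self):
        self._seq = next(_upload_seq)

    def filename(self, prefix, pid):
        return f"{prefix}.{pid}.{self._seq}"


# Replay the interleaving from the test docstring deterministically:
a = RacyUploader()  # counter -> 1
b = RacyUploader()  # counter -> 2
assert a.filename("profile", 123) == b.filename("profile", 123)  # duplicate!

x = FixedUploader()  # captures 3
y = FixedUploader()  # captures 4
assert x.filename("profile", 123) != y.filename("profile", 123)  # unique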
113 changes: 113 additions & 0 deletions tests/profiling_v2/exporter/test_ddup.py
@@ -62,3 +62,116 @@ def test_tags_propagated():
# Profiler could add tags, so check that tags is a superset of config.tags
for k, v in config.tags.items():
assert tags[k] == v


@pytest.mark.skipif(not ddup.is_available, reason="ddup not available")
def test_concurrent_upload_sequence_numbers(tmp_path):
"""Test that concurrent uploads produce unique, sequential file sequence numbers.

This is a regression test for a race condition in the C++ Uploader class where
multiple threads creating Uploader objects concurrently could result in files
with duplicate or non-sequential sequence numbers.

The bug occurred because:
1. Thread A creates Uploader -> upload_seq becomes 1
2. Thread B creates Uploader -> upload_seq becomes 2
3. Thread B exports file -> uses current upload_seq value (2)
4. Thread A exports file -> uses current upload_seq value (2, not 1!)

This resulted in duplicate sequence numbers or missing sequences.

The fix captures the sequence number atomically in the Uploader constructor
(instance_upload_seq = ++upload_seq) so each instance has its own unique number.
"""
import glob
import os
import threading
import time

from ddtrace.profiling.collector.threading import ThreadingLockCollector

test_name = "test_concurrent_upload"
pprof_prefix = str(tmp_path / test_name)
output_filename = pprof_prefix + "." + str(os.getpid())

# Configure ddup
ddup.config(
env="test",
service=test_name,
version="test_version",
output_filename=pprof_prefix,
)
ddup.start()

num_threads = 10
num_uploads_per_thread = 5
barrier = threading.Barrier(num_threads) # Synchronize thread start for maximum contention

def upload_worker():
"""Each thread collects samples and uploads multiple times."""
# Wait for all threads to be ready
barrier.wait()

for _ in range(num_uploads_per_thread):
# Collect some samples (using lock collector to generate profile data)
with ThreadingLockCollector(capture_pct=100):
lock = threading.Lock()
with lock:
time.sleep(0.001) # Small amount of work

# Upload immediately to create race condition
ddup.upload()

# Start threads
threads = []
for _ in range(num_threads):
t = threading.Thread(target=upload_worker)
threads.append(t)
t.start()

# Wait for all threads to complete
for t in threads:
t.join()

# Analyze the created files
files = glob.glob(output_filename + ".*")

# Extract sequence numbers from filenames
# Format: <prefix>.<pid>.<seq>
sequence_numbers = []
for f in files:
seq = int(f.rsplit(".", 1)[-1])
sequence_numbers.append(seq)

sequence_numbers.sort()

print(f"\nCreated {len(files)} files")
print(f"Sequence numbers: {sequence_numbers}")

# Check for issues
expected_count = num_threads * num_uploads_per_thread

# Issue 1: Missing files (duplicates overwrite each other)
assert len(files) == expected_count, (
f"Expected {expected_count} files, but found {len(files)}. "
f"This suggests duplicate sequence numbers caused file overwrites!"
)

# Issue 2: Duplicate sequence numbers
assert len(sequence_numbers) == len(
set(sequence_numbers)
), f"Duplicate sequence numbers found! Sequences: {sequence_numbers}"

# Issue 3: Gaps in sequence numbers
# Sequences should be continuous (no gaps)
for i in range(len(sequence_numbers) - 1):
assert (
sequence_numbers[i + 1] == sequence_numbers[i] + 1
), f"Gap in sequence numbers: {sequence_numbers[i]} -> {sequence_numbers[i + 1]}"

# Cleanup
for f in files:
try:
os.remove(f)
except Exception as e:
print(f"Error removing file {f}: {e}")