Skip to content

Commit a6d6203

Browse files
fix(profiling): fix race condition when generating upload files sequencing
1 parent 548d46a commit a6d6203

File tree

3 files changed

+120
-3
lines changed

3 files changed

+120
-3
lines changed

ddtrace/internal/datadog/profiling/dd_wrapper/include/uploader.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class Uploader
2222
static inline ddog_CancellationToken cancel{ .inner = nullptr };
2323
static inline std::atomic<uint64_t> upload_seq{ 0 };
2424
std::string output_filename;
25+
uint64_t instance_upload_seq; // Captured sequence number for this instance
2526
ddog_prof_ProfileExporter ddog_exporter{ .inner = nullptr };
2627

2728
bool export_to_file(ddog_prof_EncodedProfile* encoded);

ddtrace/internal/datadog/profiling/dd_wrapper/src/uploader.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ Datadog::Uploader::Uploader(std::string_view _output_filename, ddog_prof_Profile
1717
: output_filename{ _output_filename }
1818
, ddog_exporter{ _ddog_exporter }
1919
{
20-
// Increment the upload sequence number every time we build an uploader.
21-
// Upoloaders are use-once-and-destroy.
22-
upload_seq++;
20+
// Atomically increment and capture the upload sequence number for this instance.
21+
// This prevents race conditions where multiple threads creating Uploaders concurrently
22+
// could result in duplicate sequence numbers in filenames.
23+
// Uploaders are use-once-and-destroy.
24+
instance_upload_seq = ++upload_seq;
2325
}
2426

2527
bool
@@ -28,6 +30,7 @@ Datadog::Uploader::export_to_file(ddog_prof_EncodedProfile* encoded)
2830
// Write the profile to a file using the following format for filename:
2931
// <output_filename>.<process_id>.<sequence_number>
3032
std::ostringstream oss;
31-
oss << output_filename << "." << getpid() << "." << upload_seq;
33+
oss << output_filename << "." << getpid() << "." << instance_upload_seq;
3235
std::string filename = oss.str();
3336
std::ofstream out(filename, std::ios::binary);

tests/profiling_v2/exporter/test_ddup.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,116 @@ def test_tags_propagated():
6262
# Profiler could add tags, so check that tags is a superset of config.tags
6363
for k, v in config.tags.items():
6464
assert tags[k] == v
65+
66+
67+
@pytest.mark.skipif(not ddup.is_available, reason="ddup not available")
def test_concurrent_upload_sequence_numbers(tmp_path):
    """Regression test: concurrent uploads must produce unique, gap-free
    sequence numbers in the exported profile filenames.

    Guards against a race in the C++ ``Uploader`` class: when several
    threads constructed ``Uploader`` objects at the same time, each export
    read the *current* value of the shared ``upload_seq`` counter instead
    of the value captured at construction time, e.g.:

    1. Thread A creates Uploader -> upload_seq becomes 1
    2. Thread B creates Uploader -> upload_seq becomes 2
    3. Thread B exports file -> uses current upload_seq value (2)
    4. Thread A exports file -> uses current upload_seq value (2, not 1!)

    The result was duplicate sequence numbers (files overwriting each
    other) and/or gaps. The fix captures the counter atomically in the
    constructor (``instance_upload_seq = ++upload_seq``) so every
    Uploader instance owns a distinct number.
    """
    import glob
    import os
    import threading
    import time

    from ddtrace.profiling.collector.threading import ThreadingLockCollector

    test_name = "test_concurrent_upload"
    pprof_prefix = str(tmp_path / test_name)
    # Exported files are named <prefix>.<pid>.<seq>; pin the pid part here.
    output_filename = pprof_prefix + "." + str(os.getpid())

    # Configure and start the native exporter.
    ddup.config(
        env="test",
        service=test_name,
        version="test_version",
        output_filename=pprof_prefix,
    )
    ddup.start()

    num_threads = 10
    num_uploads_per_thread = 5
    # All workers block on this barrier so their first uploads collide.
    start_gate = threading.Barrier(num_threads)

    def upload_worker():
        """Collect a few lock samples and upload, repeatedly."""
        start_gate.wait()  # maximize contention: everyone starts together
        for _ in range(num_uploads_per_thread):
            # Generate some profile data via the lock collector.
            with ThreadingLockCollector(capture_pct=100):
                guard = threading.Lock()
                with guard:
                    time.sleep(0.001)  # a sliver of "work" while holding the lock
            # Upload right away so exports from different threads overlap.
            ddup.upload()

    workers = [threading.Thread(target=upload_worker) for _ in range(num_threads)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    # Inspect what landed on disk. Filenames end in ".<seq>".
    pprof_files = glob.glob(output_filename + ".*")
    seqs = sorted(int(path.rsplit(".", 1)[-1]) for path in pprof_files)

    print(f"\nCreated {len(pprof_files)} files")
    print(f"Sequence numbers: {seqs}")

    expected_total = num_threads * num_uploads_per_thread

    # Issue 1: missing files — duplicates would overwrite each other.
    assert len(pprof_files) == expected_total, (
        f"Expected {expected_total} files, but found {len(pprof_files)}. "
        f"This suggests duplicate sequence numbers caused file overwrites!"
    )

    # Issue 2: duplicate sequence numbers.
    assert len(seqs) == len(set(seqs)), f"Duplicate sequence numbers found! Sequences: {seqs}"

    # Issue 3: gaps — the sorted sequence must be strictly consecutive.
    for prev, nxt in zip(seqs, seqs[1:]):
        assert nxt == prev + 1, f"Gap in sequence numbers: {prev} -> {nxt}"

    # Best-effort cleanup of the exported files.
    for path in pprof_files:
        try:
            os.remove(path)
        except Exception as e:
            print(f"Error removing file {path}: {e}")

0 commit comments

Comments
 (0)