@@ -62,3 +62,116 @@ def test_tags_propagated():
6262 # Profiler could add tags, so check that tags is a superset of config.tags
6363 for k , v in config .tags .items ():
6464 assert tags [k ] == v
65+
66+
@pytest.mark.skipif(not ddup.is_available, reason="ddup not available")
def test_concurrent_upload_sequence_numbers(tmp_path):
    """Test that concurrent uploads produce unique, sequential file sequence numbers.

    This is a regression test for a race condition in the C++ Uploader class where
    multiple threads creating Uploader objects concurrently could result in files
    with duplicate or non-sequential sequence numbers.

    The bug occurred because:
    1. Thread A creates Uploader -> upload_seq becomes 1
    2. Thread B creates Uploader -> upload_seq becomes 2
    3. Thread B exports file -> uses current upload_seq value (2)
    4. Thread A exports file -> uses current upload_seq value (2, not 1!)

    This resulted in duplicate sequence numbers or missing sequences.

    The fix captures the sequence number atomically in the Uploader constructor
    (instance_upload_seq = ++upload_seq) so each instance has its own unique number.
    """
    import glob
    import os
    import threading
    import time

    from ddtrace.profiling.collector.threading import ThreadingLockCollector

    test_name = "test_concurrent_upload"
    pprof_prefix = str(tmp_path / test_name)
    output_filename = pprof_prefix + "." + str(os.getpid())

    # Configure ddup
    ddup.config(
        env="test",
        service=test_name,
        version="test_version",
        output_filename=pprof_prefix,
    )
    ddup.start()

    num_threads = 10
    num_uploads_per_thread = 5
    barrier = threading.Barrier(num_threads)  # Synchronize thread start for maximum contention

    def upload_worker():
        """Each thread collects samples and uploads multiple times."""
        # Wait for all threads to be ready
        barrier.wait()

        for _ in range(num_uploads_per_thread):
            # Collect some samples (using lock collector to generate profile data)
            with ThreadingLockCollector(capture_pct=100):
                lock = threading.Lock()
                with lock:
                    time.sleep(0.001)  # Small amount of work

            # Upload immediately to create race condition
            ddup.upload()

    # Start threads
    threads = [threading.Thread(target=upload_worker) for _ in range(num_threads)]
    for t in threads:
        t.start()

    # Wait for all threads to complete
    for t in threads:
        t.join()

    # Analyze the created files.
    # Filename format: <prefix>.<pid>.<seq>, so the last dot-separated field
    # is the sequence number assigned by the uploader.
    files = glob.glob(output_filename + ".*")
    sequence_numbers = sorted(int(f.rsplit(".", 1)[-1]) for f in files)

    print(f"\nCreated {len(files)} files")
    print(f"Sequence numbers: {sequence_numbers}")

    expected_count = num_threads * num_uploads_per_thread

    try:
        # Issue 1: Missing files (duplicates overwrite each other)
        assert len(files) == expected_count, (
            f"Expected {expected_count} files, but found {len(files)}. "
            f"This suggests duplicate sequence numbers caused file overwrites!"
        )

        # Issue 2: Duplicate sequence numbers
        assert len(sequence_numbers) == len(set(sequence_numbers)), (
            f"Duplicate sequence numbers found! Sequences: {sequence_numbers}"
        )

        # Issue 3: Gaps in sequence numbers
        # Sequences should be continuous (no gaps)
        for i in range(len(sequence_numbers) - 1):
            assert sequence_numbers[i + 1] == sequence_numbers[i] + 1, (
                f"Gap in sequence numbers: {sequence_numbers[i]} -> {sequence_numbers[i + 1]}"
            )
    finally:
        # Cleanup in a finally block so a failing assertion above does not
        # leave stale pprof files behind; removal is best-effort.
        for f in files:
            try:
                os.remove(f)
            except OSError as e:
                print(f"Error removing file {f}: {e}")