Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ replay_pid*
!dd-java-agent/benchmark/releases/*.jar
**/errors/*.log

# Fuzz testing logs #
fuzz-logs/

# Magic for local JMC build
/vendor/jmc-libs

Expand Down
3 changes: 3 additions & 0 deletions dd-java-agent/agent-debugger/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ dependencies {
implementation libs.dogstatsd
implementation libs.moshi

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation group: 'com.antithesis', name: 'sdk', version: '1.4.5'

testImplementation libs.asm.util
testImplementation libs.bundles.junit5
testImplementation libs.junit.jupiter.params
Expand Down
3 changes: 3 additions & 0 deletions dd-java-agent/agent-profiling/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ dependencies {
api libs.slf4j
api project(':internal-api')

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation group: 'com.antithesis', name: 'sdk', version: '1.4.5'

api project(':dd-java-agent:agent-profiling:profiling-ddprof')
api project(':dd-java-agent:agent-profiling:profiling-uploader')
api project(':dd-java-agent:agent-profiling:profiling-controller')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ dependencies {
api project(':dd-java-agent:agent-profiling:profiling-controller')
api project(':dd-java-agent:agent-profiling:profiling-controller-jfr')

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation group: 'com.antithesis', name: 'sdk', version: '1.4.5'

testImplementation libs.bundles.junit5
testImplementation libs.bundles.mockito
testImplementation files(project(':dd-java-agent:agent-profiling:profiling-controller-jfr').sourceSets.test.output)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ dependencies {
api project(':components:environment')
api project(':dd-java-agent:agent-profiling:profiling-utils')

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation group: 'com.antithesis', name: 'sdk', version: '1.4.5'

testImplementation libs.bundles.junit5
testImplementation libs.guava
testImplementation libs.bundles.mockito
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ private void startProfilingRecording() {
ProfilerFlareLogger.getInstance().log("Shutdown in progress, cannot start profiling");
} else {
ProfilerFlareLogger.getInstance().log("Failed to start profiling", t);

throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
}
}
Expand Down
3 changes: 3 additions & 0 deletions dd-java-agent/agent-profiling/profiling-ddprof/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ dependencies {

implementation libs.slf4j

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation group: 'com.antithesis', name: 'sdk', version: '1.4.5'

testImplementation libs.bundles.jmc
testImplementation libs.bundles.junit5
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* It is currently assumed that this class can be initialised early so that Datadog profiler's
* thread filter captures all tracing activity, which means it must not be modified to depend on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ dependencies {
implementation libs.lz4
implementation libs.aircompressor

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation group: 'com.antithesis', name: 'sdk', version: '1.4.5'

testImplementation project(':dd-java-agent:agent-profiling:profiling-testing')
testImplementation project(':utils:test-utils')
testImplementation libs.bundles.junit5
Expand Down
5 changes: 5 additions & 0 deletions dd-trace-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ dependencies {

implementation group: 'com.google.re2j', name: 're2j', version: '1.7'

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation group: 'com.antithesis', name: 'sdk', version: '1.4.5'

compileOnly group: 'com.github.spotbugs', name: 'spotbugs-annotations', version: '4.2.0'

// We have autoservices defined in test subtree, looks like we need this to be able to properly rebuild this
testAnnotationProcessor libs.autoservice.processor
testCompileOnly libs.autoservice.annotation
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package datadog.trace.common.writer;

import com.antithesis.sdk.Assert;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import datadog.communication.monitor.Monitoring;
import datadog.communication.monitor.Recording;
import datadog.communication.serialization.ByteBufferConsumer;
Expand Down Expand Up @@ -107,14 +110,39 @@ public void accept(int messageCount, ByteBuffer buffer) {
Payload payload = newPayload(messageCount, buffer);
final int sizeInBytes = payload.sizeInBytes();
healthMetrics.onSerialize(sizeInBytes);

// Antithesis: Track all send attempts
ObjectNode sendAttemptDetails = JsonNodeFactory.instance.objectNode();
sendAttemptDetails.put("trace_count", messageCount);
sendAttemptDetails.put("payload_size_bytes", sizeInBytes);
sendAttemptDetails.put("dropped_traces_in_payload", payload.droppedTraces());
sendAttemptDetails.put("dropped_spans_in_payload", payload.droppedSpans());
Assert.sometimes(true, "trace_payloads_being_sent", sendAttemptDetails);

RemoteApi.Response response = api.sendSerializedTraces(payload);
mapper.reset();

if (response.success()) {
// Antithesis: Track successful sends
ObjectNode successDetails = JsonNodeFactory.instance.objectNode();
successDetails.put("decision", "sent_success");
successDetails.put("trace_count", messageCount);
successDetails.put("payload_size_bytes", sizeInBytes);
successDetails.put("http_status", response.status().orElse(-1));
Assert.sometimes(true, "traces_sent_successfully", successDetails);
if (log.isDebugEnabled()) {
log.debug("Successfully sent {} traces to the API", messageCount);
}
healthMetrics.onSend(messageCount, sizeInBytes, response);
} else {
// Antithesis: Track failed sends
ObjectNode failedDetails = JsonNodeFactory.instance.objectNode();
failedDetails.put("decision", "dropped_send_failed");
failedDetails.put("trace_count", messageCount);
failedDetails.put("payload_size_bytes", sizeInBytes);
failedDetails.put("http_status", response.status().orElse(-1));
failedDetails.put("has_exception", response.exception() != null);
Assert.sometimes(true, "traces_failed_to_send", failedDetails);
if (log.isDebugEnabled()) {
log.debug(
"Failed to send {} traces of size {} bytes to the API", messageCount, sizeInBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import static datadog.trace.api.sampling.PrioritySampling.UNSET;
import static java.util.concurrent.TimeUnit.MINUTES;

import com.antithesis.sdk.Assert;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import datadog.trace.core.DDSpan;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.relocate.api.RatelimitedLogger;
Expand Down Expand Up @@ -68,6 +71,11 @@ protected RemoteWriter(
@Override
public void write(final List<DDSpan> trace) {
if (closed) {
// Antithesis: Track traces dropped during shutdown
ObjectNode shutdownDetails = JsonNodeFactory.instance.objectNode();
shutdownDetails.put("decision", "dropped_shutdown");
shutdownDetails.put("span_count", trace.size());
Assert.sometimes(true, "trace_dropped_writer_closed", shutdownDetails);
// We can't add events after shutdown otherwise it will never complete shutting down.
log.debug("Dropped due to shutdown: {}", trace);
handleDroppedTrace(trace);
Expand All @@ -80,17 +88,38 @@ public void write(final List<DDSpan> trace) {
final int samplingPriority = root.samplingPriority();
switch (traceProcessingWorker.publish(root, samplingPriority, trace)) {
case ENQUEUED_FOR_SERIALIZATION:
// Antithesis: Track traces enqueued for sending
ObjectNode enqueuedDetails = JsonNodeFactory.instance.objectNode();
enqueuedDetails.put("decision", "enqueued");
enqueuedDetails.put("trace_id", root.getTraceId().toString());
enqueuedDetails.put("span_count", trace.size());
enqueuedDetails.put("sampling_priority", samplingPriority);
Assert.sometimes(true, "trace_enqueued_for_send", enqueuedDetails);
log.debug("Enqueued for serialization: {}", trace);
healthMetrics.onPublish(trace, samplingPriority);
break;
case ENQUEUED_FOR_SINGLE_SPAN_SAMPLING:
log.debug("Enqueued for single span sampling: {}", trace);
break;
case DROPPED_BY_POLICY:
// Antithesis: Track traces dropped by policy
ObjectNode policyDetails = JsonNodeFactory.instance.objectNode();
policyDetails.put("decision", "dropped_policy");
policyDetails.put("trace_id", root.getTraceId().toString());
policyDetails.put("span_count", trace.size());
policyDetails.put("sampling_priority", samplingPriority);
Assert.sometimes(true, "trace_dropped_by_policy", policyDetails);
log.debug("Dropped by the policy: {}", trace);
handleDroppedTrace(trace);
break;
case DROPPED_BUFFER_OVERFLOW:
// Antithesis: Track traces dropped due to buffer overflow
ObjectNode overflowDetails = JsonNodeFactory.instance.objectNode();
overflowDetails.put("decision", "dropped_buffer_overflow");
overflowDetails.put("trace_id", root.getTraceId().toString());
overflowDetails.put("span_count", trace.size());
overflowDetails.put("sampling_priority", samplingPriority);
Assert.sometimes(true, "trace_dropped_buffer_overflow", overflowDetails);
if (log.isDebugEnabled()) {
log.debug("Dropped due to a buffer overflow: {}", trace);
} else {
Expand Down
18 changes: 18 additions & 0 deletions dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.antithesis.sdk.Assert;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.ExternalAgentLauncher;
import datadog.communication.ddagent.SharedCommunicationObjects;
Expand Down Expand Up @@ -1246,8 +1250,22 @@ void write(final List<DDSpan> trace) {
spanToSample.forceKeep(forceKeep);
boolean published = forceKeep || traceCollector.sample(spanToSample);
if (published) {
// Antithesis: Track traces accepted by sampling
ObjectNode acceptedDetails = JsonNodeFactory.instance.objectNode();
acceptedDetails.put("decision", "accepted");
acceptedDetails.put("trace_id", writtenTrace.get(0).getTraceId().toString());
acceptedDetails.put("span_count", writtenTrace.size());
acceptedDetails.put("sampling_priority", spanToSample.samplingPriority());
Assert.sometimes(true, "trace_accepted_by_sampling", acceptedDetails);
writer.write(writtenTrace);
} else {
// Antithesis: Track traces dropped by sampling
ObjectNode droppedDetails = JsonNodeFactory.instance.objectNode();
droppedDetails.put("decision", "dropped_sampling");
droppedDetails.put("trace_id", writtenTrace.get(0).getTraceId().toString());
droppedDetails.put("span_count", writtenTrace.size());
droppedDetails.put("sampling_priority", spanToSample.samplingPriority());
Assert.sometimes(true, "trace_dropped_by_sampling", droppedDetails);
// with span streaming this won't work - it needs to be changed
// to track an effective sampling rate instead, however, tests
// checking that a hard reference on a continuation prevents
Expand Down
3 changes: 3 additions & 0 deletions remote-config/remote-config-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ dependencies {
implementation(libs.moshi)
implementation(libs.bundles.cafe.crypto)

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation(group = "com.antithesis", name = "sdk", version = "1.4.5")

implementation(project(":internal-api"))

testImplementation(project(":utils:test-utils"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ private boolean initialize() {
new PollerRequestFactory(config, tracerVersion, containerId, entityId, url, moshi);
} catch (Exception e) {
// We can't recover from this, so we'll not try to initialize again.
fatalOnInitialization = true;
log.error("Remote configuration poller initialization failed", e);
fatalOnInitialization = true;
}
return true;
}
Expand Down
3 changes: 3 additions & 0 deletions telemetry/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ dependencies {
implementation(libs.slf4j)

implementation(project(":internal-api"))

// Antithesis SDK for assertions and property testing - bundled in tracer JAR
implementation(group = "com.antithesis", name = "sdk", version = "1.4.5")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: Another concern is how much weight this adds to the jar.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now this PR adds ~20 MB to the final jar, but a lot of that is because it embeds Antithesis, its native FFI wrapper, and various Jackson dependencies multiple times: once in each of the product directories in the agent jar.

If we move it to the shared section in `dd-java-agent/build.gradle` and add the necessary excludes to `gradle/dependencies.gradle`, as we do for other shared dependencies, then the overhead is 3 MB compressed and 8 MB uncompressed.

That still feels too big to have in the general deliverable for something only used for testing purposes.

One option might be to include only the direct dependency in the release (i.e. without Jackson or the FFI wrapper); in other words, just enough to allow the classes to load. We'd then have to look at how to supply the other parts for testing, whether that's via `-Xbootclasspath/a:` or something similar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for your analysis. Right now, the size of the JAR "doesn't matter much" because this is just a PoC, but the data you provided is very important. I will include it in the final evaluation report as a critical consideration if it is decided to adopt the Antithesis API.


compileOnly(project(":dd-java-agent:agent-tooling"))
testImplementation(project(":dd-java-agent:agent-tooling"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) {
try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(okHttpClient, httpRetryPolicy, httpRequest)) {
if (response.code() == 404) {

log.debug("Telemetry endpoint is disabled, dropping {} message.", requestType);
return Result.NOT_FOUND;
}

if (!response.isSuccessful()) {
log.debug(
"Telemetry message {} failed with: {} {}.",
Expand All @@ -109,6 +111,7 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) {
response.message());
return Result.FAILURE;
}

} catch (InterruptedIOException e) {
log.debug("Telemetry message {} sending interrupted: {}.", requestType, e.toString());
return Result.INTERRUPTED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public TelemetryClient.Result sendRequest(TelemetryRequest request) {
// interrupted request is most likely due to telemetry system shutdown,
// we do not want to log errors and reattempt in this case
&& result != TelemetryClient.Result.INTERRUPTED;

if (currentClient == agentClient) {
if (requestFailed) {
reportErrorOnce(currentClient.getUrl(), result);
Expand Down
Loading