Skip to content

Commit

Permalink
Add Guava asynchronous result type support for OpenTelemetry annotati…
Browse files Browse the repository at this point in the history
…ons (#5799)
  • Loading branch information
PerfectSlayer authored Aug 30, 2023
1 parent b2c7d02 commit 360dce2
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 0 deletions.
2 changes: 2 additions & 0 deletions dd-java-agent/instrumentation/guava-10/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ addTestSuiteForDir('latestDepTest', 'test')
dependencies {
compileOnly group: 'com.google.guava', name: 'guava', version: '10.0'

testImplementation project(':dd-java-agent:instrumentation:opentelemetry:opentelemetry-annotations-1.20')
testImplementation group: 'com.google.guava', name: 'guava', version: '16.0'
// ^ first version with com.google.common.reflect.ClassPath.getAllClasses()
testImplementation group: 'io.opentelemetry.instrumentation', name: 'opentelemetry-instrumentation-annotations', version: '1.28.0'

latestDepTestImplementation group: 'com.google.guava', name: 'guava', version: '+'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package datadog.trace.instrumentation.guava10;

import com.google.common.util.concurrent.ListenableFuture;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.AsyncResultDecorator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

public class GuavaAsyncResultSupportExtension
implements AsyncResultDecorator.AsyncResultSupportExtension {
static {
AsyncResultDecorator.registerExtension(new GuavaAsyncResultSupportExtension());
}

/**
* Register the extension as an {@link AsyncResultDecorator.AsyncResultSupportExtension} using
* static class initialization.<br>
* It uses an empty static method call to ensure the class loading and the one-time-only static
* class initialization. This will ensure this extension will only be registered once to the
* {@link AsyncResultDecorator}.
*/
public static void initialize() {}

@Override
public boolean supports(Class<?> result) {
return ListenableFuture.class.isAssignableFrom(result);
}

@Override
public Object apply(Object result, AgentSpan span) {
if (result instanceof ListenableFuture) {
ListenableFuture<?> listenableFuture = (ListenableFuture<?>) result;
if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
listenableFuture.addListener(
() -> {
// Get value to check for execution exception
try {
listenableFuture.get();
} catch (ExecutionException e) {
span.addThrowable(e.getCause());
} catch (CancellationException | InterruptedException e) {
// Ignored
}
span.finish();
},
Runnable::run);
return result;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
Expand Down Expand Up @@ -31,18 +32,34 @@ public String instrumentedType() {
return "com.google.common.util.concurrent.AbstractFuture";
}

@Override
public String[] helperClassNames() {
return new String[] {
this.packageName + ".GuavaAsyncResultSupportExtension",
};
}

@Override
public Map<String, String> contextStore() {
return singletonMap(Runnable.class.getName(), State.class.getName());
}

@Override
public void adviceTransformations(AdviceTransformation transformation) {
transformation.applyAdvice(
isConstructor(), ListenableFutureInstrumentation.class.getName() + "$AbstractFutureAdvice");
transformation.applyAdvice(
named("addListener").and(takesArguments(Runnable.class, Executor.class)),
ListenableFutureInstrumentation.class.getName() + "$AddListenerAdvice");
}

public static class AbstractFutureAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void init() {
GuavaAsyncResultSupportExtension.initialize();
}
}

public static class AddListenerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State addListenerEnter(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import annotatedsample.GuavaTracedMethods
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.bootstrap.instrumentation.api.Tags
import spock.lang.Shared

import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

class GuavaAsyncResultSupportExtensionTest extends AgentTestRunner {
@Override
void configurePreAgent() {
super.configurePreAgent()

injectSysConfig("dd.integration.opentelemetry-annotations-1.20.enabled", "true")
}

@Shared
ExecutorService executor

def setupSpec() {
this.executor = Executors.newSingleThreadExecutor()
}

def cleanupSpec() {
this.executor.shutdownNow()
}

def "test WithSpan annotated async method (ListenableFuture)"() {
setup:
def latch = new CountDownLatch(1)
def listenableFuture = GuavaTracedMethods.traceAsyncListenableFuture(executor, latch)
expect:
TEST_WRITER.size() == 0
when:
latch.countDown()
listenableFuture.get()
then:
assertTraces(1) {
trace(1) {
span {
resourceName "GuavaTracedMethods.traceAsyncListenableFuture"
operationName "GuavaTracedMethods.traceAsyncListenableFuture"
tags {
defaultTags()
"$Tags.COMPONENT" "opentelemetry"
}
}
}
}
}
def "test WithSpan annotated async method (cancelled ListenableFuture)"() {
setup:
def latch = new CountDownLatch(1)
def listenableFuture = GuavaTracedMethods.traceAsyncCancelledListenableFuture(latch)
expect:
TEST_WRITER.size() == 0
when:
listenableFuture.cancel(true)
then:
assertTraces(1) {
trace(1) {
span {
resourceName "GuavaTracedMethods.traceAsyncCancelledListenableFuture"
operationName "GuavaTracedMethods.traceAsyncCancelledListenableFuture"
tags {
defaultTags()
"$Tags.COMPONENT" "opentelemetry"
}
}
}
}
}
def "test WithSpan annotated async method (failing ListenableFuture)"() {
setup:
def latch = new CountDownLatch(1)
def expectedException = new IllegalStateException("Test exception")
def listenableFuture = GuavaTracedMethods.traceAsyncFailingListenableFuture(executor, latch, expectedException)
expect:
TEST_WRITER.size() == 0
when:
latch.countDown()
listenableFuture.get()
then:
thrown(ExecutionException)
assertTraces(1) {
trace(1) {
span {
resourceName "GuavaTracedMethods.traceAsyncFailingListenableFuture"
operationName "GuavaTracedMethods.traceAsyncFailingListenableFuture"
errored true
tags {
defaultTags()
"$Tags.COMPONENT" "opentelemetry"
errorTags(expectedException)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package annotatedsample;

import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

public class GuavaTracedMethods {
@WithSpan
public static ListenableFuture<String> traceAsyncListenableFuture(
ExecutorService executor, CountDownLatch latch) {
TestFuture listenableFuture = TestFuture.ofComplete(latch, "hello");
executor.submit(listenableFuture::start);
return listenableFuture;
}

@WithSpan
public static ListenableFuture<?> traceAsyncCancelledListenableFuture(CountDownLatch latch) {
return TestFuture.ofComplete(latch, "hello");
}

@WithSpan
public static ListenableFuture<?> traceAsyncFailingListenableFuture(
ExecutorService executor, CountDownLatch latch, Throwable exception) {
TestFuture listenableFuture = TestFuture.ofFailing(latch, exception);
executor.submit(listenableFuture::start);
return listenableFuture;
}

private static class TestFuture extends AbstractFuture<String> {
private final CountDownLatch latch;
private final String value;
private final Throwable exception;

private TestFuture(CountDownLatch latch, String value, Throwable exception) {
this.latch = latch;
this.value = value;
this.exception = exception;
}

private void start() {
try {
if (!this.latch.await(5, SECONDS)) {
throw new IllegalStateException("Latch still locked");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (this.exception != null) {
setException(this.exception);
} else {
set(this.value);
}
}

private static TestFuture ofComplete(CountDownLatch latch, String value) {
return new TestFuture(latch, value, null);
}

private static TestFuture ofFailing(CountDownLatch latch, Throwable exception) {
return new TestFuture(latch, null, exception);
}
}
}

0 comments on commit 360dce2

Please sign in to comment.