Skip to content

Move JSON generation to sender thread to improve startup time. #9197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package datadog.trace.bootstrap;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

import datadog.json.JsonWriter;
import datadog.trace.bootstrap.environment.EnvironmentVariables;
import de.thetaphi.forbiddenapis.SuppressForbidden;
import java.io.Closeable;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -51,14 +53,14 @@ public static BootstrapInitializationTelemetry createFromForwarderPath(String fo
*/
public abstract void onError(Throwable t);

public abstract void onError(String reasonCode);

/**
* Indicates an exception that occurred during the bootstrapping process that left initialization
* incomplete. Equivalent to calling {@link #onError(Throwable)} and {@link #markIncomplete()}
*/
public abstract void onFatalError(Throwable t);

public abstract void onError(String reasonCode);

public abstract void markIncomplete();

public abstract void finish();
Expand All @@ -78,10 +80,10 @@ public void onAbort(String reasonCode) {}
public void onError(String reasonCode) {}

@Override
public void onFatalError(Throwable t) {}
public void onError(Throwable t) {}

@Override
public void onError(Throwable t) {}
public void onFatalError(Throwable t) {}

@Override
public void markIncomplete() {}
Expand All @@ -93,38 +95,31 @@ public void finish() {}
public static final class JsonBased extends BootstrapInitializationTelemetry {
private final JsonSender sender;

private final List<String> meta;
private final Map<String, List<String>> points;
private final Telemetry telemetry;

// one way false to true
private volatile boolean incomplete = false;
private volatile boolean error = false;

JsonBased(JsonSender sender) {
this.sender = sender;
this.meta = new ArrayList<>();
this.points = new LinkedHashMap<>();
this.telemetry = new Telemetry();
}

@Override
public void initMetaInfo(String attr, String value) {
synchronized (this.meta) {
this.meta.add(attr);
this.meta.add(value);
}
telemetry.setMetadata(attr, value);
}

@Override
public void onAbort(String reasonCode) {
onPoint("library_entrypoint.abort", "reason:" + reasonCode);
onPoint("library_entrypoint.abort", singletonList("reason:" + reasonCode));
markIncomplete();
setMetaInfo("abort", mapResultClass(reasonCode), reasonCode);
setResultMeta("abort", mapResultClass(reasonCode), reasonCode);
}

@Override
public void onError(Throwable t) {
error = true;
setMetaInfo("error", "internal_error", t.getMessage());
setResultMeta("error", "internal_error", t.getMessage());

List<String> causes = new ArrayList<>();

Expand All @@ -145,6 +140,12 @@ public void onError(Throwable t) {
onPoint("library_entrypoint.error", causes);
}

@Override
public void onError(String reasonCode) {
onPoint("library_entrypoint.error", singletonList("error_type:" + reasonCode));
setResultMeta("error", mapResultClass(reasonCode), reasonCode);
}

private int maxTags() {
String maxTags = EnvironmentVariables.get("DD_TELEMETRY_FORWARDER_MAX_TAGS");

Expand All @@ -165,14 +166,7 @@ public void onFatalError(Throwable t) {
markIncomplete();
}

@Override
public void onError(String reasonCode) {
error = true;
onPoint("library_entrypoint.error", "error_type:" + reasonCode);
setMetaInfo("error", mapResultClass(reasonCode), reasonCode);
}

private void setMetaInfo(String result, String resultClass, String resultReason) {
private void setResultMeta(String result, String resultClass, String resultReason) {
initMetaInfo("result", result);
initMetaInfo("result_class", resultClass);
initMetaInfo("result_reason", resultReason);
Expand All @@ -195,14 +189,8 @@ private String mapResultClass(String reasonCode) {
}
}

private void onPoint(String name, String tag) {
onPoint(name, Collections.singletonList(tag));
}

private void onPoint(String name, List<String> tags) {
synchronized (this.points) {
this.points.put(name, tags);
}
telemetry.addPoint(name, tags);
}

@Override
Expand All @@ -212,51 +200,91 @@ public void markIncomplete() {

@Override
public void finish() {
if (!this.incomplete && !this.error) {
setMetaInfo("success", "success", "Successfully configured ddtrace package");
if (!this.incomplete) {
onPoint("library_entrypoint.complete", emptyList());
}

this.sender.send(telemetry);
}
}

public static class Telemetry {
private final Map<String, String> metadata;
private final Map<String, List<String>> points;

public Telemetry() {
metadata = new LinkedHashMap<>();
points = new LinkedHashMap<>();

setResults("success", "success", "Successfully configured ddtrace package");
}

public void setMetadata(String name, String value) {
synchronized (metadata) {
metadata.put(name, value);
}
}

public void setResults(String result, String resultClass, String resultReason) {
synchronized (metadata) {
metadata.put("result", result);
metadata.put("result_class", resultClass);
metadata.put("result_reason", resultReason);
}
}

public void addPoint(String name, List<String> tags) {
synchronized (points) {
points.put(name, tags);
}
}

@Override
public String toString() {
try (JsonWriter writer = new JsonWriter()) {
writer.beginObject();
writer.name("metadata").beginObject();
synchronized (this.meta) {
for (int i = 0; i + 1 < this.meta.size(); i = i + 2) {
writer.name(this.meta.get(i));
writer.value(this.meta.get(i + 1));
synchronized (metadata) {
for (Map.Entry<String, String> entry : metadata.entrySet()) {
writer.name(entry.getKey());
writer.value(entry.getValue());
}

metadata.clear();
}
writer.endObject();

writer.name("points").beginArray();
synchronized (this.points) {
synchronized (points) {
for (Map.Entry<String, List<String>> entry : points.entrySet()) {
writer.beginObject();
writer.name("name").value(entry.getKey());
writer.name("tags").beginArray();
for (String tag : entry.getValue()) {
writer.value(tag);
if (!entry.getValue().isEmpty()) {
writer.name("tags").beginArray();
for (String tag : entry.getValue()) {
writer.value(tag);
}
writer.endArray();
}
writer.endArray();
writer.endObject();
}
this.points.clear();
}
if (!this.incomplete) {
writer.beginObject().name("name").value("library_entrypoint.complete").endObject();

points.clear();
}
writer.endArray();
writer.endObject();

this.sender.send(writer.toByteArray());
} catch (Throwable t) {
// Since this is the reporting mechanism, there's little recourse here
// Decided to simply ignore - arguably might want to write to stderr
return writer.toString();
}
}
}

/**
* Declare telemetry as {@code Object} to avoid issue with double class loading from different
* classloaders.
*/
public interface JsonSender {
void send(byte[] payload);
void send(Object telemetry);
}

public static final class ForwarderJsonSender implements JsonSender {
Expand All @@ -267,21 +295,21 @@ public static final class ForwarderJsonSender implements JsonSender {
}

@Override
public void send(byte[] payload) {
ForwarderJsonSenderThread t = new ForwarderJsonSenderThread(forwarderPath, payload);
public void send(Object telemetry) {
ForwarderJsonSenderThread t = new ForwarderJsonSenderThread(forwarderPath, telemetry);
t.setDaemon(true);
t.start();
}
}

public static final class ForwarderJsonSenderThread extends Thread {
private final String forwarderPath;
private final byte[] payload;
private final Object telemetry;

public ForwarderJsonSenderThread(String forwarderPath, byte[] payload) {
public ForwarderJsonSenderThread(String forwarderPath, Object telemetry) {
super("dd-forwarder-json-sender");
this.forwarderPath = forwarderPath;
this.payload = payload;
this.telemetry = telemetry;
}

@SuppressForbidden
Expand All @@ -291,6 +319,8 @@ public void run() {

// Run forwarder and mute tracing for subprocesses executed in by dd-java-agent.
try (final Closeable ignored = muteTracing()) {
byte[] payload = telemetry.toString().getBytes();

Process process = builder.start();
try (OutputStream out = process.getOutputStream()) {
out.write(payload);
Expand Down
Loading
Loading