Skip to content

Commit

Permalink
[pinpoint-apm#2584] Refactor AsyncChildTrace
Browse files Browse the repository at this point in the history
 - Remove DefaultTrace dependency
 - Reduce memory allocation
 - Simplify Storage handling
  • Loading branch information
emeroad committed Oct 20, 2017
1 parent 47e894f commit 76d2349
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@


import com.navercorp.pinpoint.bootstrap.context.scope.TraceScope;
import com.navercorp.pinpoint.common.annotations.InterfaceAudience;

/**
* @author emeroad
Expand All @@ -27,17 +28,21 @@ public interface Trace extends StackOperation {
// ----------------------------------------------
// activeTrace related api
// TODO extract interface???
@InterfaceAudience.Private
long getId();

@InterfaceAudience.Private
long getStartTime();

/**
* @deprecated Since 1.7.0 Use {@link #getThreadId()}
* This API will be removed in 1.8.0
*/
@Deprecated
@InterfaceAudience.Private
Thread getBindThread();

@InterfaceAudience.Private
long getThreadId();

//------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,54 @@
import com.navercorp.pinpoint.bootstrap.context.*;
import com.navercorp.pinpoint.bootstrap.context.scope.TraceScope;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.exception.PinpointException;
import com.navercorp.pinpoint.profiler.context.id.TraceRoot;
import com.navercorp.pinpoint.profiler.context.recorder.WrappedSpanEventRecorder;
import com.navercorp.pinpoint.profiler.context.scope.DefaultTraceScopePool;
import com.navercorp.pinpoint.profiler.context.storage.Storage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncChildTrace implements Trace {

private static final int BEGIN_STACKID = 1;
private static final int ASYNC_BEGIN_STACK_ID = 1001;

private final AsyncContextFactory asyncContextFactory;
private static final Logger logger = LoggerFactory.getLogger(AsyncChildTrace.class.getName());
private static final boolean isWarn = logger.isWarnEnabled();

private final boolean sampling;

private final CallStack callStack;

private final Storage storage;

private final TraceRoot traceRoot;
private final DefaultTrace trace;
private final WrappedSpanEventRecorder wrappedSpanEventRecorder;

private final AsyncContextFactory asyncContextFactory;
private final SpanRecorder spanRecorder;

private boolean closed = false;

private final DefaultTraceScopePool scopePool = new DefaultTraceScopePool();

private final int asyncId;
private final short asyncSequence;

public AsyncChildTrace(final AsyncContextFactory asyncContextFactory, final TraceRoot traceRoot, final DefaultTrace trace, final int asyncId, final short asyncSequence) {
this.asyncContextFactory = Assert.requireNonNull(asyncContextFactory, "asyncContextFactory must not be null");
public AsyncChildTrace(final TraceRoot traceRoot, CallStack callStack, Storage storage, AsyncContextFactory asyncContextFactory, boolean sampling,
SpanRecorder spanRecorder, WrappedSpanEventRecorder wrappedSpanEventRecorder, final int asyncId, final short asyncSequence) {

this.traceRoot = Assert.requireNonNull(traceRoot, "traceRoot must not be null");
this.trace = Assert.requireNonNull(trace, "trace must not be null");
this.callStack = Assert.requireNonNull(callStack, "callStack must not be null");
this.storage = Assert.requireNonNull(storage, "storage must not be null");
this.asyncContextFactory = Assert.requireNonNull(asyncContextFactory, "asyncContextFactory must not be null");
this.sampling = sampling;
this.spanRecorder = Assert.requireNonNull(spanRecorder, "spanRecorder must not be null");
this.wrappedSpanEventRecorder = Assert.requireNonNull(wrappedSpanEventRecorder, "wrappedSpanEventRecorder must not be null");
this.asyncId = asyncId;
this.asyncSequence = asyncSequence;

traceBlockBegin(BEGIN_STACKID);
traceBlockBegin(ASYNC_BEGIN_STACK_ID);
}


Expand Down Expand Up @@ -69,9 +95,14 @@ public TraceId getTraceId() {
return this.traceRoot.getTraceId();
}

private SpanEventRecorder wrappedSpanEventRecorder(WrappedSpanEventRecorder wrappedSpanEventRecorder, SpanEvent spanEvent) {
wrappedSpanEventRecorder.setWrapped(spanEvent);
return wrappedSpanEventRecorder;
}

@Override
public boolean canSampled() {
return trace.canSampled();
return sampling;
}

@Override
Expand All @@ -81,28 +112,85 @@ public boolean isRoot() {

@Override
public SpanEventRecorder traceBlockBegin() {
final SpanEventRecorder recorder = trace.traceBlockBegin();
recorder.recordAsyncId(asyncId);
recorder.recordAsyncSequence(asyncSequence);
return recorder;
return traceBlockBegin(DEFAULT_STACKID);
}

public SpanEvent traceBlockBegin0(final int stackId) {
if (closed) {
if (isWarn) {
stackDump("already closed trace");
}
final SpanEvent dummy = newSpanEvent(stackId);
return dummy;
}
// Set properties for the case when stackFrame is not used as part of Span.
final SpanEvent spanEvent = newSpanEvent(stackId);
this.callStack.push(spanEvent);
return spanEvent;
}

private SpanEvent newSpanEvent(int stackId) {
final SpanEvent spanEvent = new SpanEvent(traceRoot);
spanEvent.markStartTime();
spanEvent.setStackId(stackId);
return spanEvent;
}

private void stackDump(String caused) {
PinpointException exception = new PinpointException(caused);
logger.warn("[DefaultTrace] Corrupted call stack found TraceRoot:{}, CallStack:{}", traceRoot, callStack, exception);
}

@Override
public SpanEventRecorder traceBlockBegin(int stackId) {
final SpanEventRecorder recorder = trace.traceBlockBegin(stackId);
recorder.recordAsyncId(asyncId);
recorder.recordAsyncSequence(asyncSequence);
return recorder;
final SpanEvent spanEvent = traceBlockBegin0(stackId);
spanEvent.setAsyncId(asyncId);
spanEvent.setAsyncSequence(asyncSequence);

return wrappedSpanEventRecorder(wrappedSpanEventRecorder, spanEvent);
}

@Override
public void traceBlockEnd() {
trace.traceBlockEnd();
traceBlockEnd(DEFAULT_STACKID);
}

@Override
public void traceBlockEnd(int stackId) {
trace.traceBlockEnd(stackId);
if (closed) {
if (isWarn) {
stackDump("already closed trace");
}
return;
}

final SpanEvent spanEvent = callStack.pop();
if (spanEvent == null) {
if (isWarn) {
stackDump("call stack is empty.");
}
return;
}

if (spanEvent.getStackId() != stackId) {
// stack dump will make debugging easy.
if (isWarn) {
stackDump("not matched stack id. expected=" + stackId + ", current=" + spanEvent.getStackId());
}
}

if (spanEvent.isTimeRecording()) {
spanEvent.markAfterTime();
}
logSpan(spanEvent);
}

private void logSpan(SpanEvent spanEvent) {
this.storage.store(spanEvent);
}

private void logSpan() {
this.storage.flush();
}

@Override
Expand All @@ -112,7 +200,16 @@ public boolean isAsync() {

@Override
public boolean isRootStack() {
return trace.getCallStackFrameId() == BEGIN_STACKID;
return getCallStackFrameId0() == ASYNC_BEGIN_STACK_ID;
}

public int getCallStackFrameId0() {
final SpanEvent spanEvent = callStack.peek();
if (spanEvent == null) {
return ROOT_STACKID;
} else {
return spanEvent.getStackId();
}
}

/**
Expand All @@ -127,46 +224,79 @@ public AsyncTraceId getAsyncTraceId() {

@Override
public boolean isClosed() {
return this.trace.isClosed();
return closed;
}

@Override
public void close() {
traceBlockEnd(BEGIN_STACKID);
trace.close();
traceBlockEnd(ASYNC_BEGIN_STACK_ID);
close0();
}

public void close0() {
if (closed) {
logger.warn("Already closed childTrace");
return;
}
closed = true;

if (!callStack.empty()) {
if (isWarn) {
stackDump("not empty call stack");
}
// skip
} else {
logSpan();
}

this.storage.close();

}


@Override
public SpanRecorder getSpanRecorder() {
return trace.getSpanRecorder();
return spanRecorder;
}

@Override
public SpanEventRecorder currentSpanEventRecorder() {
return trace.currentSpanEventRecorder();
SpanEvent spanEvent = callStack.peek();
if (spanEvent == null) {
if (isWarn) {
stackDump("call stack is empty");
}
// make dummy.
spanEvent = new SpanEvent(traceRoot);
}

return wrappedSpanEventRecorder(this.wrappedSpanEventRecorder, spanEvent);
}

@Override
public int getCallStackFrameId() {
return trace.getCallStackFrameId();
final SpanEvent spanEvent = callStack.peek();
if (spanEvent == null) {
return ROOT_STACKID;
} else {
return spanEvent.getStackId();
}
}

@Override
public TraceScope getScope(String name) {
return trace.getScope(name);
return scopePool.get(name);
}

@Override
public TraceScope addScope(String name) {
return trace.addScope(name);
return scopePool.add(name);
}

@Override
public String toString() {
return "AsyncChildTrace{" +
"traceRoot=" + traceRoot +
", trace=" + trace +
", asyncId=" + asyncId +
", asyncSequence=" + asyncSequence +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,6 @@ public boolean isClosed() {

@Override
public void close() {
final AsyncState asyncState = this.asyncState;
if (asyncState == null) {
return;
}

if (asyncState.await()) {
// flush.
this.trace.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.navercorp.pinpoint.profiler.context.id.ListenableAsyncState;
import com.navercorp.pinpoint.profiler.context.recorder.RecorderFactory;
import com.navercorp.pinpoint.profiler.context.recorder.WrappedSpanEventRecorder;
import com.navercorp.pinpoint.profiler.context.storage.AsyncStorage;
import com.navercorp.pinpoint.profiler.context.storage.Storage;
import com.navercorp.pinpoint.profiler.context.storage.StorageFactory;

Expand Down Expand Up @@ -140,26 +139,28 @@ public Trace newTraceObject() {
@Override
public Trace continueAsyncTraceObject(TraceRoot traceRoot, int asyncId, short asyncSequence) {

final Span span = spanFactory.newSpan(traceRoot);

final Storage storage = storageFactory.createStorage(traceRoot);

final Storage asyncStorage = new AsyncStorage(storage);
final CallStack callStack = callStackFactory.newCallStack(traceRoot);

final boolean samplingEnable = true;
final TraceId traceId = traceRoot.getTraceId();
final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span, traceId.isRoot(), samplingEnable);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder();
final SpanRecorder spanRecorder = newAsyncChildSpanRecorder(traceRoot, samplingEnable);

// TODO AtomicIdGenerator.UNTRACKED_ID
final DefaultTrace trace = new DefaultTrace(span, callStack, asyncStorage, asyncContextFactory, samplingEnable, spanRecorder, wrappedSpanEventRecorder, ActiveTraceHandle.EMPTY_HANDLE);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder();

final Trace asyncTrace = new AsyncChildTrace(asyncContextFactory, traceRoot, trace, asyncId, asyncSequence);
final Trace asyncTrace = new AsyncChildTrace(traceRoot, callStack, storage, asyncContextFactory, samplingEnable, spanRecorder, wrappedSpanEventRecorder, asyncId, asyncSequence);

return asyncTrace;
}

private SpanRecorder newAsyncChildSpanRecorder(TraceRoot traceRoot, boolean samplingEnable) {
// TODO implement SpanRecorder based on TraceRoot
// remove span allocation
final Span span = spanFactory.newSpan(traceRoot);
final TraceId traceId = traceRoot.getTraceId();
return recorderFactory.newSpanRecorder(span, traceId.isRoot(), samplingEnable);
}

// entry point async trace.
@InterfaceAudience.LimitedPrivate("vert.x")
@Override
Expand Down
Loading

0 comments on commit 76d2349

Please sign in to comment.