Skip to content

Commit

Permalink
[pinpoint-apm#3275] Supports TCP transport type for Span and Stat data
Browse files Browse the repository at this point in the history
1. implement profiler side
2. add config
  • Loading branch information
koo-taejin authored and emeroad committed Aug 31, 2017
1 parent 68232ea commit fc19f45
Show file tree
Hide file tree
Showing 37 changed files with 595 additions and 203 deletions.
8 changes: 8 additions & 0 deletions agent/src/main/resources-local/pinpoint.config
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,21 @@ profiler.spandatasender.write.queue.size=5120
#profiler.spandatasender.socket.timeout=3000
profiler.spandatasender.chunk.size=16384
profiler.spandatasender.socket.type=OIO
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.spandatasender.transport.type=UDP

# Capacity of the StatDataSender write queue.
profiler.statdatasender.write.queue.size=5120
#profiler.statdatasender.socket.sendbuffersize=1048576
#profiler.statdatasender.socket.timeout=3000
profiler.statdatasender.chunk.size=16384
profiler.statdatasender.socket.type=OIO
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.statdatasender.transport.type=UDP

# Interval to retry sending agent info. Unit is milliseconds.
profiler.agentInfo.send.retry.interval=300000
Expand Down
8 changes: 8 additions & 0 deletions agent/src/main/resources-release/pinpoint.config
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,21 @@ profiler.spandatasender.write.queue.size=5120
#profiler.spandatasender.socket.timeout=3000
profiler.spandatasender.chunk.size=16384
profiler.spandatasender.socket.type=OIO
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.spandatasender.transport.type=UDP

# Capacity of the StatDataSender write queue.
profiler.statdatasender.write.queue.size=5120
#profiler.statdatasender.socket.sendbuffersize=1048576
#profiler.statdatasender.socket.timeout=3000
profiler.statdatasender.chunk.size=16384
profiler.statdatasender.socket.type=OIO
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.statdatasender.transport.type=UDP

# Interval to retry sending agent info. Unit is milliseconds.
profiler.agentInfo.send.retry.interval=300000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,20 @@ profiler.spandatasender.write.queue.size=5120
#profiler.spandatasender.socket.sendbuffersize=1048576
#profiler.spandatasender.socket.timeout=3000
profiler.spandatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.spandatasender.transport.type=UDP

# Capacity of the StatDataSender write queue.
profiler.statdatasender.write.queue.size=5120
#profiler.statdatasender.socket.sendbuffersize=1048576
#profiler.statdatasender.socket.timeout=3000
profiler.statdatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.statdatasender.transport.type=UDP

# Interval to retry sending agent info. Unit is milliseconds.
profiler.agentInfo.send.retry.interval=300000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,20 @@ profiler.spandatasender.write.queue.size=5120
#profiler.spandatasender.socket.sendbuffersize=1048576
#profiler.spandatasender.socket.timeout=3000
profiler.spandatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.spandatasender.transport.type=UDP

# Capacity of the StatDataSender write queue.
profiler.statdatasender.write.queue.size=5120
#profiler.statdatasender.socket.sendbuffersize=1048576
#profiler.statdatasender.socket.timeout=3000
profiler.statdatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.statdatasender.transport.type=UDP

# Interval to retry sending agent info. Unit is milliseconds.
profiler.agentInfo.send.retry.interval=300000
Expand Down
8 changes: 8 additions & 0 deletions agent/src/test/resources/pinpoint-disabled-plugin-test.config
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,19 @@ profiler.spandatasender.write.queue.size=5120
#profiler.spandatasender.socket.sendbuffersize=1048576
#profiler.spandatasender.socket.timeout=3000
profiler.spandatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.spandatasender.transport.type=UDP

profiler.statdatasender.write.queue.size=5120
#profiler.statdatasender.socket.sendbuffersize=1048576
#profiler.statdatasender.socket.timeout=3000
profiler.statdatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.statdatasender.transport.type=UDP

profiler.agentInfo.send.retry.interval=300000

Expand Down
8 changes: 8 additions & 0 deletions agent/src/test/resources/pinpoint-netty-plugin-test.config
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,19 @@ profiler.spandatasender.write.queue.size=5120
#profiler.spandatasender.socket.sendbuffersize=1048576
#profiler.spandatasender.socket.timeout=3000
profiler.spandatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.spandatasender.transport.type=UDP

profiler.statdatasender.write.queue.size=5120
#profiler.statdatasender.socket.sendbuffersize=1048576
#profiler.statdatasender.socket.timeout=3000
profiler.statdatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.statdatasender.transport.type=UDP

profiler.agentInfo.send.retry.interval=300000

Expand Down
8 changes: 8 additions & 0 deletions agent/src/test/resources/pinpoint-spring-bean-test.config
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,19 @@ profiler.spandatasender.write.queue.size=5120
#profiler.spandatasender.socket.sendbuffersize=1048576
#profiler.spandatasender.socket.timeout=3000
profiler.spandatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.spandatasender.transport.type=UDP

profiler.statdatasender.write.queue.size=5120
#profiler.statdatasender.socket.sendbuffersize=1048576
#profiler.statdatasender.socket.timeout=3000
profiler.statdatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.statdatasender.transport.type=UDP

profiler.agentInfo.send.retry.interval=300000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ public static ProfilerConfig load(String pinpointConfigFileName) throws IOExcept
private int spanDataSenderSocketSendBufferSize = 1024 * 64 * 16;
private int spanDataSenderSocketTimeout = 1000 * 3;
private int spanDataSenderChunkSize = 1024 * 16;
private String spanDataSenderTransportType = "UDP";
private String spanDataSenderSocketType = "OIO";

private int statDataSenderWriteQueueSize = 1024 * 5;
private int statDataSenderSocketSendBufferSize = 1024 * 64 * 16;
private int statDataSenderSocketTimeout = 1000 * 3;
private int statDataSenderChunkSize = 1024 * 16;
private String statDataSenderTransportType = "UDP";
private String statDataSenderSocketType = "OIO";

private boolean tcpDataSenderCommandAcceptEnable = false;
Expand Down Expand Up @@ -235,6 +237,11 @@ public String getStatDataSenderSocketType() {
return statDataSenderSocketType;
}

@Override
public String getStatDataSenderTransportType() {
return statDataSenderTransportType;
}

@Override
public int getSpanDataSenderWriteQueueSize() {
return spanDataSenderWriteQueueSize;
Expand Down Expand Up @@ -305,6 +312,11 @@ public String getSpanDataSenderSocketType() {
return spanDataSenderSocketType;
}

@Override
public String getSpanDataSenderTransportType() {
return spanDataSenderTransportType;
}

@Override
public int getSpanDataSenderChunkSize() {
return spanDataSenderChunkSize;
Expand Down Expand Up @@ -487,12 +499,14 @@ void readPropertyValues() {
this.spanDataSenderSocketTimeout = readInt("profiler.spandatasender.socket.timeout", 1000 * 3);
this.spanDataSenderChunkSize = readInt("profiler.spandatasender.chunk.size", 1024 * 16);
this.spanDataSenderSocketType = readString("profiler.spandatasender.socket.type", "OIO");
this.spanDataSenderTransportType = readString("profiler.spandatasender.transport.type", "UDP");

this.statDataSenderWriteQueueSize = readInt("profiler.statdatasender.write.queue.size", 1024 * 5);
this.statDataSenderSocketSendBufferSize = readInt("profiler.statdatasender.socket.sendbuffersize", 1024 * 64 * 16);
this.statDataSenderSocketTimeout = readInt("profiler.statdatasender.socket.timeout", 1000 * 3);
this.statDataSenderChunkSize = readInt("profiler.statdatasender.chunk.size", 1024 * 16);
this.statDataSenderSocketType = readString("profiler.statdatasender.socket.type", "OIO");
this.statDataSenderTransportType = readString("profiler.statdatasender.transport.type", "UDP");

this.tcpDataSenderCommandAcceptEnable = readBoolean("profiler.tcpdatasender.command.accept.enable", false);
this.tcpDataSenderCommandActiveThreadEnable = readBoolean("profiler.tcpdatasender.command.activethread.enable", false);
Expand Down Expand Up @@ -691,11 +705,13 @@ public String toString() {
sb.append(", spanDataSenderSocketTimeout=").append(spanDataSenderSocketTimeout);
sb.append(", spanDataSenderChunkSize=").append(spanDataSenderChunkSize);
sb.append(", spanDataSenderSocketType='").append(spanDataSenderSocketType).append('\'');
sb.append(", spanDataSenderTransportType='").append(spanDataSenderTransportType).append('\'');
sb.append(", statDataSenderWriteQueueSize=").append(statDataSenderWriteQueueSize);
sb.append(", statDataSenderSocketSendBufferSize=").append(statDataSenderSocketSendBufferSize);
sb.append(", statDataSenderSocketTimeout=").append(statDataSenderSocketTimeout);
sb.append(", statDataSenderChunkSize=").append(statDataSenderChunkSize);
sb.append(", statDataSenderSocketType='").append(statDataSenderSocketType).append('\'');
sb.append(", statDataSenderTransportType='").append(statDataSenderTransportType).append('\'');
sb.append(", tcpDataSenderCommandAcceptEnable=").append(tcpDataSenderCommandAcceptEnable);
sb.append(", tcpDataSenderCommandActiveThreadEnable=").append(tcpDataSenderCommandActiveThreadEnable);
sb.append(", tcpDataSenderCommandActiveThreadCountEnable=").append(tcpDataSenderCommandActiveThreadCountEnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public interface ProfilerConfig {

String getStatDataSenderSocketType();

String getStatDataSenderTransportType();

int getSpanDataSenderWriteQueueSize();

int getSpanDataSenderSocketSendBufferSize();
Expand Down Expand Up @@ -74,6 +76,8 @@ public interface ProfilerConfig {

String getSpanDataSenderSocketType();

String getSpanDataSenderTransportType();

int getSpanDataSenderChunkSize();

int getStatDataSenderChunkSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;
import com.navercorp.pinpoint.profiler.sender.UdpDataSender;
import com.navercorp.pinpoint.rpc.client.DefaultPinpointClientFactory;
import com.navercorp.pinpoint.rpc.client.PinpointClient;
import com.navercorp.pinpoint.rpc.client.PinpointClientFactory;
import com.navercorp.pinpoint.thrift.dto.TResult;
import org.apache.thrift.TBase;
Expand All @@ -31,6 +30,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.util.SocketUtils;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -64,7 +64,8 @@ public void receiverGroupTest1() throws Exception {
udpDataSender = new UdpDataSender("127.0.0.1", mockConfig.getUdpBindPort(), "test", 10, 1000, 1024 * 64 * 100);

pinpointClientFactory = createPinpointClientFactory();
tcpDataSender = new TcpDataSender("127.0.0.1", mockConfig.getTcpBindPort(), pinpointClientFactory);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", mockConfig.getTcpBindPort());
tcpDataSender = new TcpDataSender(address, pinpointClientFactory);

udpDataSender.send(new TResult());

Expand Down Expand Up @@ -104,7 +105,8 @@ public void receiverGroupTest2() throws Exception {
Assert.assertFalse(sendLatch.await(1000, TimeUnit.MILLISECONDS));

pinpointClientFactory = createPinpointClientFactory();
tcpDataSender = new TcpDataSender("127.0.0.1", mockConfig.getTcpBindPort(), pinpointClientFactory);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", mockConfig.getTcpBindPort());
tcpDataSender = new TcpDataSender(address, pinpointClientFactory);

Assert.assertTrue(tcpDataSender.isConnected());

Expand Down Expand Up @@ -141,7 +143,8 @@ public void receiverGroupTest3() throws Exception {
Assert.assertTrue(sendLatch.await(1000, TimeUnit.MILLISECONDS));

pinpointClientFactory = createPinpointClientFactory();
tcpDataSender = new TcpDataSender("127.0.0.1", mockConfig.getTcpBindPort(), pinpointClientFactory);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", mockConfig.getTcpBindPort());
tcpDataSender = new TcpDataSender(address, pinpointClientFactory);

Assert.assertFalse(tcpDataSender.isConnected());
} finally {
Expand Down Expand Up @@ -178,16 +181,6 @@ private void closeDataSender(DataSender dataSender) {
}
}

private void closeClient(PinpointClient client) {
try {
if (client != null) {
client.close();
}
} catch (Exception e) {
// ignore
}
}

private void closeClientFactory(PinpointClientFactory factory) {
try {
if (factory != null) {
Expand Down
8 changes: 8 additions & 0 deletions plugins/google-httpclient/src/test/resources/pinpoint.config
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,19 @@ profiler.spandatasender.write.queue.size=5120
#profiler.spandatasender.socket.sendbuffersize=1048576
#profiler.spandatasender.socket.timeout=3000
profiler.spandatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.spandatasender.transport.type=UDP

profiler.statdatasender.write.queue.size=5120
#profiler.statdatasender.socket.sendbuffersize=1048576
#profiler.statdatasender.socket.timeout=3000
profiler.statdatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.statdatasender.transport.type=UDP

profiler.agentInfo.send.retry.interval=300000

Expand Down
8 changes: 8 additions & 0 deletions plugins/httpclient3/src/test/resources/pinpoint.config
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,19 @@ profiler.spandatasender.write.queue.size=5120
#profiler.spandatasender.socket.sendbuffersize=1048576
#profiler.spandatasender.socket.timeout=3000
profiler.spandatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.spandatasender.transport.type=UDP

profiler.statdatasender.write.queue.size=5120
#profiler.statdatasender.socket.sendbuffersize=1048576
#profiler.statdatasender.socket.timeout=3000
profiler.statdatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.statdatasender.transport.type=UDP

profiler.agentInfo.send.retry.interval=300000

Expand Down
8 changes: 8 additions & 0 deletions plugins/httpclient4/src/test/resources/pinpoint.config
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,19 @@ profiler.spandatasender.write.queue.size=5120
#profiler.spandatasender.socket.sendbuffersize=1048576
#profiler.spandatasender.socket.timeout=3000
profiler.spandatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.spandatasender.transport.type=UDP

profiler.statdatasender.write.queue.size=5120
#profiler.statdatasender.socket.sendbuffersize=1048576
#profiler.statdatasender.socket.timeout=3000
profiler.statdatasender.chunk.size=16384
# Should keep in mind
# 1. Loadbancing : TCP transport load balancing is per connection.(UDP transport loadbalancing is per packet)
# 2. In unexpected situations, UDP has its own protection feature (like packet loss etc.), but tcp does not have such a feature. (We will add protection later)
profiler.statdatasender.transport.type=UDP

profiler.agentInfo.send.retry.interval=300000

Expand Down
Loading

0 comments on commit fc19f45

Please sign in to comment.