Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 75 additions & 44 deletions newrelic-agent/src/main/java/com/newrelic/agent/RPMService.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;

/**
Expand Down Expand Up @@ -95,6 +96,7 @@ public class RPMService extends AbstractService implements IRPMService, Environm
private long connectionTimestamp = 0;
private final AtomicInteger last503Error = new AtomicInteger(0);
private final AtomicInteger retryCount = new AtomicInteger(0);
private final ReentrantLock reentrantLock = new ReentrantLock();

private String rpmLink;
private long lastReportTime;
Expand Down Expand Up @@ -248,47 +250,54 @@ private Map<String, Object> getSettings(boolean sendEnvironmentInfo) {
* Notify RPM that this agent has launched, and obtain the agent run id
*/
@Override
public synchronized void launch() throws Exception {
if (isConnected()) {
return;
}

Map<String, Object> data = doConnect();
Agent.LOG.log(Level.FINER, "Connection response : {0}", data);
List<String> requiredParams = new ArrayList<>(Arrays.asList(COLLECT_ERRORS_KEY, COLLECT_TRACES_KEY, DATA_REPORT_PERIOD_KEY));
if (!data.keySet().containsAll(requiredParams)) {
requiredParams.removeAll(data.keySet());
throw new UnexpectedException(MessageFormat.format("Missing the following connection parameters: {0}", requiredParams));
}
Agent.LOG.log(Level.INFO, "Agent {0} connected to {1}", toString(), getHostString());
public void launch() throws Exception {
reentrantLock.lock();

try {
logCollectorMessages(data);
} catch (Exception ex) {
Agent.LOG.log(Level.FINEST, ex, "Error processing collector connect messages");
}
if (isConnected()) {
return;
}

AgentConfig config = null;
if (connectionConfigListener != null) {
// Merge server-side data with local config before notifying connection listeners
config = connectionConfigListener.connected(this, data);
}
Map<String, Object> data = doConnect();
Agent.LOG.log(Level.FINER, "Connection response : {0}", data);
List<String> requiredParams = new ArrayList<>(Arrays.asList(COLLECT_ERRORS_KEY, COLLECT_TRACES_KEY, DATA_REPORT_PERIOD_KEY));
if (!data.keySet().containsAll(requiredParams)) {
requiredParams.removeAll(data.keySet());
throw new UnexpectedException(MessageFormat.format("Missing the following connection parameters: {0}", requiredParams));
}
Agent.LOG.log(Level.INFO, "Agent {0} connected to {1}", toString(), getHostString());

try {
logCollectorMessages(data);
} catch (Exception ex) {
Agent.LOG.log(Level.FINEST, ex, "Error processing collector connect messages");
}

connectionTimestamp = System.nanoTime();
connected = true;
hasEverConnected = true;
entityGuid = data.get("entity_guid") != null ? data.get("entity_guid").toString() : "";
AgentConfig config = null;
if (connectionConfigListener != null) {
// Merge server-side data with local config before notifying connection listeners
config = connectionConfigListener.connected(this, data);
}

if (connectionListener != null) {
config = config != null ? config : ServiceFactory.getConfigService().getDefaultAgentConfig();
connectionListener.connected(this, config);
}
connectionTimestamp = System.nanoTime();
connected = true;
hasEverConnected = true;
entityGuid = data.get("entity_guid") != null ? data.get("entity_guid").toString() : "";

String agentRunToken = (String) data.get(ConnectionResponse.AGENT_RUN_ID_KEY);
Map<String, String> requestMetadata = (Map<String, String>) data.get(ConnectionResponse.REQUEST_HEADERS);
for (AgentConnectionEstablishedListener listener : agentConnectionEstablishedListeners) {
listener.onEstablished(appName, agentRunToken, requestMetadata);
if (connectionListener != null) {
config = config != null ? config : ServiceFactory.getConfigService().getDefaultAgentConfig();
connectionListener.connected(this, config);
}

String agentRunToken = (String) data.get(ConnectionResponse.AGENT_RUN_ID_KEY);
Map<String, String> requestMetadata = (Map<String, String>) data.get(ConnectionResponse.REQUEST_HEADERS);
for (AgentConnectionEstablishedListener listener : agentConnectionEstablishedListeners) {
listener.onEstablished(appName, agentRunToken, requestMetadata);
}
} finally {
reentrantLock.unlock();
}

}

private Map<String, Object> doConnect() throws Exception {
Expand Down Expand Up @@ -379,15 +388,21 @@ private void disconnect() {
}

@Override
public synchronized void reconnect() {
Agent.LOG.log(Level.INFO, "{0} is reconnecting", getApplicationName());
public void reconnect() {
reentrantLock.lock();
try {
shutdown();
} catch (Exception e) {
// ignore
Agent.LOG.log(Level.INFO, "{0} is reconnecting", getApplicationName());
try {
shutdown();
} catch (Exception e) {
// ignore
} finally {
reconnectAsync();
}
} finally {
reconnectAsync();
reentrantLock.unlock();
}

}

@Override
Expand Down Expand Up @@ -725,8 +740,16 @@ public boolean isMainApp() {
/**
* notify RPM that the agent is shutting down
*/
public synchronized void shutdown() throws Exception {
disconnect();
public void shutdown() throws Exception {
if (reentrantLock.tryLock(5, TimeUnit.SECONDS)) {
try {
disconnect();
} finally {
reentrantLock.unlock();
}
} else {
Agent.LOG.log(Level.WARNING, "Unable to acquire lock for shutdown within timeout - shutdown may already be in progress");
}
}

@Override
Expand All @@ -740,25 +763,33 @@ public void harvestNow() {
//
// Note: even if we are not connected, we don't need to initiate a connection attempt - although there are
// several cases, bottom line is that the Agent should already be attempting to connect.
//
// Update: Replaced the synchronized block with a reentrant lock with timeout. Same thing in the
// shutdown() method, so threads can now fail gracefully rather than deadlocking indefinitely.

final int MAX_WAIT_SECONDS = 10;
final long end = System.currentTimeMillis() + MAX_WAIT_SECONDS * 1000L;
boolean done = false;
Throwable trouble = null;

while (!done && System.currentTimeMillis() < end) {
boolean lockAcquired = false;
try {
synchronized (this) {
if (reentrantLock.tryLock(200, TimeUnit.MILLISECONDS)) {
lockAcquired = true;
if (isConnected()) {
ServiceFactory.getHarvestService().harvestNow();
done = true;
}
}
Thread.sleep(200);
} catch (InterruptedException iex) {
// sleep returned early - ignore it - the process is ending anyway
} catch (Exception ex) {
trouble = ex;
} finally {
if (lockAcquired) {
reentrantLock.unlock();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;

public class CoreServiceImpl extends AbstractService implements CoreService, HealthDataProducer {
private volatile boolean enabled = true;
private final Instrumentation instrumentation;
private volatile InstrumentationProxy instrumentationProxy;
private final List<HealthDataChangeListener> healthDataChangeListeners = new CopyOnWriteArrayList<>();

private final AtomicBoolean shutdownInProgress = new AtomicBoolean(false);

public CoreServiceImpl(Instrumentation instrumentation) {
super(CoreService.class.getName());
Expand Down Expand Up @@ -125,7 +126,7 @@ private void jvmShutdown(long startTime) {
}

if (config.isSendDataOnExit() && ((System.currentTimeMillis() - startTime) >= config.getSendDataOnExitThresholdInMillis())) {
// Grab all RPMService instances (may be multiple with auto_app_naming enabled) and harvest them
// Grab all RPMService instances (maybe multiple with auto_app_naming enabled) and harvest them
List<IRPMService> rpmServices = ServiceFactory.getRPMServiceManager().getRPMServices();
for (IRPMService rpmService : rpmServices) {
rpmService.harvestNow();
Expand All @@ -137,12 +138,15 @@ private void jvmShutdown(long startTime) {
getLogger().fine("Agent JVM shutdown hook: done.");
}

private synchronized void shutdown() {
try {
ServiceFactory.getServiceManager().stop();
getLogger().info("New Relic Agent has shutdown");
} catch (Throwable t) {
Agent.LOG.log(Level.SEVERE, t, "Error shutting down New Relic Agent");
private void shutdown() {
// Prevent multiple shutdown attempts occurring at the same time
if (shutdownInProgress.compareAndSet(false, true)) {
try {
ServiceFactory.getServiceManager().stop();
getLogger().info("New Relic Agent has shutdown");
} catch (Throwable t) {
Agent.LOG.log(Level.SEVERE, t, "Error shutting down New Relic Agent");
}
}
}

Expand Down
148 changes: 148 additions & 0 deletions newrelic-agent/src/test/java/com/newrelic/agent/RPMServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,154 @@ public void forceRestartExceptionWithPut() throws Exception {
doForceRestartException();
}

@Test(timeout = 15000)
public void concurrentHarvestAndShutdown_willNotDeadlock() throws Exception {
final RPMService rmpService = createAndLaunchRPMService();

final AtomicReference<Throwable> harvestError = new AtomicReference<>();
final AtomicReference<Throwable> shutdownError = new AtomicReference<>();
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(2);

// This thread calls harvestNow()
Thread harvestThread = new Thread(() -> {
try {
startLatch.await();
rmpService.harvestNow();
} catch (Throwable t) {
harvestError.set(t);
} finally {
doneLatch.countDown();
}
}, "harvest");

// This one calls shutdown()
Thread shutdownThread = new Thread(() -> {
try {
startLatch.await();
rmpService.shutdown();
} catch (Throwable t) {
shutdownError.set(t);
} finally {
doneLatch.countDown();
}
}, "shutdown");

// Fire off both threads and start them both simultaneously by triggering the startLatch instance
harvestThread.start();
shutdownThread.start();
startLatch.countDown();

// Wait for both to complete (or timeout after 15 seconds)
boolean completed = doneLatch.await(15, TimeUnit.SECONDS);

assertTrue("harvest() and shutdown() should both complete without timeout (no deadlock)", completed);

// Verify no unexpected exceptions (IllegalMonitorStateException should not occur)
if (harvestError.get() != null && !(harvestError.get() instanceof InterruptedException)) {
assertFalse("harvest() should not throw IllegalMonitorStateException",
harvestError.get() instanceof IllegalMonitorStateException);
}
if (shutdownError.get() != null) {
assertFalse("shutdown() should not throw IllegalMonitorStateException",
shutdownError.get() instanceof IllegalMonitorStateException);
}
}

@Test(timeout = 10000)
public void multipleConcurrentShutdownCalls_doNotDeadlock() throws Exception {
final RPMService rpmService = createAndLaunchRPMService();

final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(3);
final AtomicInteger successCount = new AtomicInteger(0);

// 3 threads all try to shutdown() simultaneously
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
startLatch.await();
rpmService.shutdown();
successCount.incrementAndGet();
} catch (Exception e) {
// Expected that some may fail, but should not deadlock
} finally {
doneLatch.countDown();
}
}, "shutdown-" + i).start();
}

startLatch.countDown();
boolean completed = doneLatch.await(10, TimeUnit.SECONDS);

assertTrue("All shutdown() calls should complete without deadlock", completed);
assertTrue("At least one shutdown should succeed", successCount.get() >= 1);
}

@Test(timeout = 15000)
public void reconnect_duringHarvest_shouldNotDeadlock() throws Exception {
final RPMService rpmService = createAndLaunchRPMService();

final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(2);
final AtomicReference<Throwable> error = new AtomicReference<>();

// Simulate harvest cycle
Thread harvestThread = new Thread(() -> {
try {
startLatch.await();
for (int i = 0; i < 5 && !Thread.interrupted(); i++) {
Thread.sleep(100);
}
rpmService.harvestNow();
} catch (Throwable t) {
if (!(t instanceof InterruptedException)) {
error.set(t);
}
} finally {
doneLatch.countDown();
}
}, "harvest");

// Reconnect call
Thread reconnectThread = new Thread(() -> {
try {
startLatch.await();
Thread.sleep(50); // Let harvest start first
rpmService.reconnect();
} catch (Throwable t) {
error.set(t);
} finally {
doneLatch.countDown();
}
}, "reconnect");

harvestThread.start();
reconnectThread.start();

startLatch.countDown();

boolean completed = doneLatch.await(15, TimeUnit.SECONDS);
assertTrue("Operations should complete without deadlock", completed);

if (error.get() != null) {
assertFalse("Should not throw IllegalMonitorStateException",
error.get() instanceof IllegalMonitorStateException);
}

rpmService.shutdown();
}

private RPMService createAndLaunchRPMService() throws Exception {
Map<String, Object> config = createStagingMap(true, false);
createServiceManager(config);

List<String> appNames = singletonList("MyApplication");
RPMService rpmService = new RPMService(appNames, null, null, Collections.emptyList());
rpmService.launch();
return rpmService;
}

private void doForceRestartException() throws Exception {
MockDataSenderFactory dataSenderFactory = new MockDataSenderFactory();
DataSenderFactory.setDataSenderFactory(dataSenderFactory);
Expand Down
Loading