Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@
import com.netflix.evcache.EVCacheException;
import com.netflix.evcache.EVCacheGetOperationListener;
import com.netflix.evcache.EVCacheLatch;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.operation.EVCacheOperationFuture;
import com.netflix.evcache.pool.EVCacheClient;
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.evcache.test.transcoder.Movie;
import com.netflix.evcache.test.transcoder.MovieTranscoder;
import com.netflix.evcache.util.KeyHasher;
import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.patterns.PolledMeter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -76,6 +81,36 @@ public void testEVCache() {
assertNotNull(evCache);
}

@Test(dependsOnMethods = { "testGet" })
public void testLoopCpuWallTimeRatioMetricRegistered() throws Exception {
final Registry registry = EVCacheMetricsFactory.getInstance().getRegistry();
final Map<ServerGroup, List<EVCacheClient>> clientsByServerGroup = manager.getEVCacheClientPool(appName).getAllInstancesByServerGroup();
assertFalse(clientsByServerGroup.isEmpty(), "expected EVCache clients for " + appName);

PolledMeter.update(registry);
for (List<EVCacheClient> clients : clientsByServerGroup.values()) {
for (EVCacheClient client : clients) {
final Id id = EVCacheMetricsFactory.getInstance().getId(EVCacheMetricsFactory.INTERNAL_LOOP_CPU_WALL_TIME_RATIO, client.getTagList());
assertTrue(registry.state().containsKey(id), "expected loop cpuWallTimeRatio meter for client " + client);
}
}

boolean nonZero = false;
for (int attempt = 0; attempt < 10 && !nonZero; attempt++) {
get(0, evCache);
Thread.sleep(1_100);
PolledMeter.update(registry);
for (List<EVCacheClient> clients : clientsByServerGroup.values()) {
for (EVCacheClient client : clients) {
final Id id = EVCacheMetricsFactory.getInstance().getId(EVCacheMetricsFactory.INTERNAL_LOOP_CPU_WALL_TIME_RATIO, client.getTagList());
final Gauge gauge = registry.gauge(id);
nonZero |= gauge.value() > 0.0;
}
}
}
assertTrue(nonZero, "expected loop cpuWallTimeRatio meter to report a non-zero value");
}

@Test(dependsOnMethods = { "testEVCache" })
public void testKeySizeCheck() throws Exception {
final String key = "This is an invalid key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public String getStatusCode(StatusCode sc) {
public static final String INTERNAL_EXECUTOR = "internal.evc.client.executor";
public static final String INTERNAL_EXECUTOR_SCHEDULED = "internal.evc.client.scheduledExecutor";
public static final String INTERNAL_POOL_INIT_ERROR = "internal.evc.client.init.error";
public static final String INTERNAL_LOOP_CPU_WALL_TIME_RATIO = "internal.evc.client.loop.cpuWallTimeRatio";

public static final String INTERNAL_NUM_CHUNK_SIZE = "internal.evc.client.chunking.numOfChunks";
public static final String INTERNAL_CHUNK_DATA_SIZE = "internal.evc.client.chunking.dataSize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import com.netflix.evcache.util.KeyHasher.HashingAlgorithm;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.patterns.PolledMeter;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.PrintWriter;
Expand Down Expand Up @@ -102,6 +105,7 @@
private final Property<Boolean> ignoreTouch;
private List<Tag> tags;
private final Map<String, Counter> counterMap = new ConcurrentHashMap<String, Counter>();
private final Id loopCpuWallTimeRatioId;
private final Property<String> hashingAlgo;
protected final Counter operationsCounter;
private final boolean isDuetClient;
Expand Down Expand Up @@ -133,6 +137,8 @@
tagList.add(new BasicTag(EVCacheMetricsFactory.STAT_NAME, EVCacheMetricsFactory.POOL_OPERATIONS));
operationsCounter = EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_STATS, tagList);

final Registry registry = EVCacheMetricsFactory.getInstance().getRegistry();

this.enableChunking = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName()+ ".chunk.data", Boolean.class).orElseGet(appName + ".chunk.data").orElse(false);
this.chunkSize = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName() + ".chunk.size", Integer.class).orElseGet(appName + ".chunk.size").orElse(1180);
this.writeBlock = EVCacheConfig.getInstance().getPropertyRepository().get(appName + "." + this.serverGroup.getName() + ".write.block.duration", Integer.class).orElseGet(appName + ".write.block.duration").orElse(25);
Expand All @@ -141,10 +147,14 @@
this.ignoreTouch = EVCacheConfig.getInstance().getPropertyRepository().get(appName + "." + this.serverGroup.getName() + ".ignore.touch", Boolean.class).orElseGet(appName + ".ignore.touch").orElse(false);

this.connectionFactory = pool.getEVCacheClientPoolManager().getConnectionFactoryProvider().getConnectionFactory(this);
loopCpuWallTimeRatioId = EVCacheMetricsFactory.getInstance().getId(EVCacheMetricsFactory.INTERNAL_LOOP_CPU_WALL_TIME_RATIO, this.tags);
this.connectionObserver = new EVCacheConnectionObserver(this);
this.ignoreInactiveNodes = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".ignore.inactive.nodes", Boolean.class).orElse(true);

this.evcacheMemcachedClient = new EVCacheMemcachedClient(connectionFactory, memcachedNodesInZone, readTimeout, this);
PolledMeter.using(registry)
.withId(loopCpuWallTimeRatioId)
.monitorValue(this.evcacheMemcachedClient.getLoopProbe(), EVCacheLoopProbe::sampleCpuWallTimeRatio);
this.evcacheMemcachedClient.addObserver(connectionObserver);

this.decodingTranscoder = new EVCacheSerializingTranscoder(Integer.MAX_VALUE);
Expand Down Expand Up @@ -1085,7 +1095,7 @@
if (!ensureWriteQueueSize(node, key, Call.SET)) {
if (log.isInfoEnabled()) log.info("Node : " + node + " is not active. Failing fast and dropping the write event.");
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl && !isInWriteOnly()) ((EVCacheLatchImpl) evcacheLatch).addFuture(defaultFuture);

Check failure on line 1098 in evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java

View workflow job for this annotation

GitHub Actions / CI with Java 8

unknown tag: Deprecated
return defaultFuture;
}

Expand Down Expand Up @@ -1342,6 +1352,11 @@
if(shutdown) return true;

shutdown = true;
try {
PolledMeter.remove(EVCacheMetricsFactory.getInstance().getRegistry(), loopCpuWallTimeRatioId);
} catch(Throwable t) {
log.warn("Exception while removing loop cpuWallTimeRatio meter", t);
}
try {
evcacheMemcachedClient.shutdown(timeout, unit);
} catch(Throwable t) {
Expand Down Expand Up @@ -1769,7 +1784,7 @@
private boolean isDataAvailableForRead(BufferedInputStream bufferedReader, long timeout, TimeUnit unit, Socket socket) throws IOException {
long expiry = System.currentTimeMillis() + unit.toMillis(timeout);
int tryCount = 0;
while(expiry > System.currentTimeMillis()) {

Check failure on line 1787 in evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java

View workflow job for this annotation

GitHub Actions / CI with Java 8

element not closed: B

Check failure on line 1787 in evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java

View workflow job for this annotation

GitHub Actions / CI with Java 8

element not closed: b
if(log.isDebugEnabled()) log.debug("For Socket " + socket + " number of bytes available = " + bufferedReader.available() + " and try number is " + tryCount);
if(bufferedReader.available() > 0) {
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package com.netflix.evcache.pool;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Publishes the loop thread's CPU-time to wall-time ratio from the loop thread itself.
*
* <p>The loop thread periodically publishes an immutable {@code long[]} snapshot
* containing {@code {threadCpuNs, wallNs}}. Spectator's polling thread reads the
* latest snapshot and computes the delta ratio without performing cross-thread
* ThreadMXBean lookups.</p>
*
* <p>The reported value is {@code dCpu/dWall} in [0, 1.05]: the fraction of wall
* time the loop thread was on a CPU. Time parked in {@code selector.select()}
* is correctly excluded, but time the thread was runnable-but-descheduled
* (CPU contention) is also excluded, so this metric is a lower bound on true
* loop demand under CPU pressure.</p>
*/
public final class EVCacheLoopProbe {
private static final Logger log = LoggerFactory.getLogger(EVCacheLoopProbe.class);
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
private static final long PUBLISH_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(1_000);
private static final int CPU_UTILIZATION_WARNING_THRESHOLD = 3;

private final AtomicReference<long[]> snapshot = new AtomicReference<long[]>(new long[] { 0L, 0L });
private final boolean cpuTimeAvailable;

// Loop-thread-private throttle state.
private long nextPublishNs;
private boolean tickFailureLogged;
private boolean negativeCpuTimeLogged;

// PolledMeter-reader-private state.
private long prevCpuNs;
private long prevWallNs;
private int aboveOneSamples;
private boolean aboveOneLogged;

public EVCacheLoopProbe() {
this.cpuTimeAvailable = isCurrentThreadCpuTimeAvailable();
if (!cpuTimeAvailable) {
log.warn("Thread CPU time is not available; EVCache loop cpuWallTimeRatio will report NaN");
}
}

/**
* Publish the current thread's CPU time and wall time at most every second.
*
* <p>This method is intentionally no-throw: it is called from the EVCache IO
* loop in a finally block and must never terminate the loop.</p>
*/
public void tick() {
try {
tickInternal();
} catch (Throwable t) {
if (!tickFailureLogged) {
tickFailureLogged = true;
try {
log.warn("EVCache loop cpuWallTimeRatio probe failed; suppressing future probe errors", t);
} catch (Throwable ignored) {
// Keep the event loop alive even if logging fails.
}
}
}
}

private void tickInternal() {
if (!cpuTimeAvailable) return;

final long now = System.nanoTime();
if (nextPublishNs != 0L && now - nextPublishNs < 0L) return;
nextPublishNs = now + PUBLISH_INTERVAL_NS;

final long cpuNs = THREAD_MX_BEAN.getCurrentThreadCpuTime();
if (cpuNs < 0L) {
if (!negativeCpuTimeLogged) {
negativeCpuTimeLogged = true;
log.warn("Thread CPU time returned a negative value; skipping EVCache loop cpuWallTimeRatio publish");
}
return;
}

snapshot.lazySet(new long[] { cpuNs, now });
}

/**
* Return loop-thread cpu-time / wall-time ratio over the interval since the previous poll.
*/
public double sampleCpuWallTimeRatio() {
if (!cpuTimeAvailable) return Double.NaN;

final long[] s = snapshot.get();
final long cpuNs = s[0];
final long wallNs = s[1];
if (prevWallNs == 0L) {
prevCpuNs = cpuNs;
prevWallNs = wallNs;
return Double.NaN;
}

final long dWall = wallNs - prevWallNs;
if (dWall <= 0L) return 0.0;

final long dCpu = cpuNs - prevCpuNs;
prevCpuNs = cpuNs;
prevWallNs = wallNs;

double ratio = (double) dCpu / (double) dWall;
if (ratio < 0.0) return 0.0;

if (ratio > 1.0) {
aboveOneSamples++;
if (aboveOneSamples >= CPU_UTILIZATION_WARNING_THRESHOLD && !aboveOneLogged) {
aboveOneLogged = true;
log.warn("EVCache loop cpuWallTimeRatio exceeded 1.0 for {} consecutive samples; latest value={}",
CPU_UTILIZATION_WARNING_THRESHOLD, ratio);
}
} else {
aboveOneSamples = 0;
}

return Math.min(ratio, 1.05);
}

private static boolean isCurrentThreadCpuTimeAvailable() {
try {
return THREAD_MX_BEAN.isThreadCpuTimeSupported() && THREAD_MX_BEAN.isThreadCpuTimeEnabled();
} catch (Throwable t) {
log.warn("Unable to determine ThreadMXBean CPU-time capability", t);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import com.netflix.evcache.pool.EVCacheLoopProbe;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.spy.memcached.ops.Operation;

public class EVCacheConnection extends MemcachedConnection {
private static final Logger log = LoggerFactory.getLogger(EVCacheConnection.class);
private EVCacheLoopProbe probe;
private final net.spy.memcached.compat.log.Logger spyLogger;

public EVCacheConnection(String name, int bufSize, ConnectionFactory f,
Expand All @@ -28,6 +31,21 @@ public EVCacheConnection(String name, int bufSize, ConnectionFactory f,
spyLogger = super.getLogger();
}

@Override
public synchronized void start() {
// MemcachedConnection starts the thread from its constructor. Initialize
// the probe before super.start() so Thread.start() safely publishes it
// to run() without requiring a volatile read on the event-loop path.
if (probe == null) {
probe = new EVCacheLoopProbe();
}
super.start();
}

public EVCacheLoopProbe getProbe() {
return probe;
}

@Override
public void shutdown() throws IOException {
try {
Expand Down Expand Up @@ -63,6 +81,8 @@ public void run() {
} catch (Throwable e) {
log.error("SEVERE EVCACHE ISSUE.", e);// This ensures the thread
// doesn't die
} finally {
probe.tick();
}
}
if (log.isDebugEnabled()) log.debug(toString() + " : Shutdown");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.netflix.evcache.operation.EVCacheOperationFuture;
import com.netflix.evcache.pool.EVCacheClient;
import com.netflix.evcache.pool.EVCacheClientUtil;
import com.netflix.evcache.pool.EVCacheLoopProbe;
import com.netflix.evcache.pool.EVCacheValue;
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.spectator.api.BasicTag;
Expand Down Expand Up @@ -123,6 +124,10 @@ public NodeLocator getNodeLocator() {
return this.mconn.getLocator();
}

public EVCacheLoopProbe getLoopProbe() {
return ((EVCacheConnection) this.mconn).getProbe();
}

public MemcachedNode getEVCacheNode(String key) {
return this.mconn.getLocator().getPrimary(key);
}
Expand Down
Loading
Loading