Skip to content

Commit 00163e5

Browse files
committed
FEAT: Add Operation throughput and latency metrics by mbean.
1 parent 132254c commit 00163e5

File tree

9 files changed

+345
-0
lines changed

9 files changed

+345
-0
lines changed

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import net.spy.memcached.compat.SpyObject;
5151
import net.spy.memcached.compat.log.LoggerFactory;
5252
import net.spy.memcached.internal.ReconnDelay;
53+
import net.spy.memcached.metrics.OpLatencyMonitor;
54+
import net.spy.memcached.metrics.OpThroughputMonitor;
5355
import net.spy.memcached.ops.KeyedOperation;
5456
import net.spy.memcached.ops.MultiOperationCallback;
5557
import net.spy.memcached.ops.Operation;
@@ -990,6 +992,7 @@ private void handleReads(MemcachedNode qa)
990992
throw new IllegalStateException("No read operation.");
991993
}
992994
currentOp.readFromBuffer(rbuf);
995+
OpLatencyMonitor.getInstance().recordLatency(currentOp.getStartTime());
993996
if (currentOp.getState() == OperationState.COMPLETE) {
994997
getLogger().debug("Completed read op: %s and giving the next %d bytes",
995998
currentOp, rbuf.remaining());
@@ -1519,6 +1522,7 @@ public String toString() {
15191522
* @param op
15201523
*/
15211524
public static void opTimedOut(Operation op) {
1525+
// Count this single timed-out operation in the throughput monitor
// (addTimeOutedOpCount returns immediately when the monitor is disabled).
OpThroughputMonitor.getInstance().addTimeOutedOpCount(1);
15221526
// Presumably flags the operation as timed out; setTimeout is defined outside this view.
MemcachedConnection.setTimeout(op, true);
15231527
}
15241528

@@ -1528,6 +1532,7 @@ public static void opTimedOut(Operation op) {
15281532
* @param ops
15291533
*/
15301534
public static void opsTimedOut(Collection<Operation> ops) {
1535+
OpThroughputMonitor.getInstance().addTimeOutedOpCount(ops.size());
15311536
Collection<String> timedOutNodes = new HashSet<>();
15321537
for (Operation op : ops) {
15331538
try {
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package net.spy.memcached.metrics;
2+
3+
/**
 * Immutable snapshot of latency statistics (average, minimum, maximum and the
 * 25th/50th/75th percentiles) together with the wall-clock time at which the
 * snapshot was created.
 */
class LatencyMetricsSnapShot {
  /**
   * Shared "no data" placeholder. Its timestamp is fixed at 0 rather than the
   * class-load time, so a cache that compares the snapshot age against a
   * freshness window never mistakes the placeholder for a fresh result.
   */
  private static final LatencyMetricsSnapShot EMPTY =
      new LatencyMetricsSnapShot(0, 0, 0, 0, 0, 0, 0L);

  private final long avgLatency;
  private final long minLatency;
  private final long maxLatency;
  private final long p25Latency;
  private final long p50Latency;
  private final long p75Latency;
  private final long timestamp; // creation time of this snapshot (epoch millis)

  /**
   * Creates a snapshot stamped with the current wall-clock time.
   *
   * @param avg average latency
   * @param min minimum latency
   * @param max maximum latency
   * @param p25 25th percentile latency
   * @param p50 50th percentile latency
   * @param p75 75th percentile latency
   */
  LatencyMetricsSnapShot(long avg, long min, long max, long p25, long p50, long p75) {
    this(avg, min, max, p25, p50, p75, System.currentTimeMillis());
  }

  /** Creates a snapshot with an explicit timestamp; used for the EMPTY placeholder. */
  private LatencyMetricsSnapShot(long avg, long min, long max,
                                 long p25, long p50, long p75, long timestamp) {
    this.avgLatency = avg;
    this.minLatency = min;
    this.maxLatency = max;
    this.p25Latency = p25;
    this.p50Latency = p50;
    this.p75Latency = p75;
    this.timestamp = timestamp;
  }

  /**
   * @return the shared all-zero snapshot (timestamp 0) representing "no samples"
   */
  public static LatencyMetricsSnapShot empty() {
    return EMPTY;
  }

  /** @return the average latency held by this snapshot */
  public long getAvgLatency() {
    return avgLatency;
  }

  /** @return the minimum latency held by this snapshot */
  public long getMinLatency() {
    return minLatency;
  }

  /** @return the maximum latency held by this snapshot */
  public long getMaxLatency() {
    return maxLatency;
  }

  /** @return the 25th percentile latency held by this snapshot */
  public long getP25Latency() {
    return p25Latency;
  }

  /** @return the 50th percentile latency held by this snapshot */
  public long getP50Latency() {
    return p50Latency;
  }

  /** @return the 75th percentile latency held by this snapshot */
  public long getP75Latency() {
    return p75Latency;
  }

  /**
   * @return the creation time in epoch milliseconds, or 0 for the {@link #empty()} placeholder
   */
  public long getTimestamp() {
    return timestamp;
  }
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package net.spy.memcached.metrics;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.concurrent.atomic.AtomicReference;
8+
import java.util.concurrent.atomic.AtomicReferenceArray;
9+
10+
import net.spy.memcached.ArcusMBeanServer;
11+
12+
public final class OpLatencyMonitor implements OpLatencyMonitorMBean {
13+
14+
private static final OpLatencyMonitor INSTANCE = new OpLatencyMonitor();
15+
private static final long CACHE_DURATION = 2000; // 2초 캐시
16+
private static final int WINDOW_SIZE = 10_000;
17+
18+
private final AtomicReferenceArray<Long> latencies = new AtomicReferenceArray<>(WINDOW_SIZE);
19+
private final AtomicInteger currentIndex = new AtomicInteger(0);
20+
private final AtomicInteger count = new AtomicInteger(0);
21+
private final AtomicReference<LatencyMetricsSnapShot> cachedMetrics
22+
= new AtomicReference<>(LatencyMetricsSnapShot.empty());
23+
private final boolean enabled;
24+
25+
private OpLatencyMonitor() {
26+
if (System.getProperty("arcus.mbean", "false").toLowerCase().equals("false")) {
27+
enabled = false;
28+
return;
29+
}
30+
enabled = true;
31+
for (int i = 0; i < WINDOW_SIZE; i++) {
32+
latencies.set(i, 0L);
33+
}
34+
35+
try {
36+
ArcusMBeanServer mbs = ArcusMBeanServer.getInstance();
37+
mbs.registMBean(this, this.getClass().getPackage().getName()
38+
+ ":type=" + this.getClass().getSimpleName());
39+
} catch (Exception e) {
40+
throw new RuntimeException("Failed to register MBean", e);
41+
}
42+
}
43+
44+
public static OpLatencyMonitor getInstance() {
45+
return INSTANCE;
46+
}
47+
48+
public void recordLatency(long startNanos) {
49+
if (!enabled) {
50+
return;
51+
}
52+
long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos);
53+
int index = currentIndex.getAndUpdate(i -> (i + 1) % WINDOW_SIZE);
54+
latencies.lazySet(index, latencyMicros);
55+
56+
if (count.get() < WINDOW_SIZE) {
57+
count.incrementAndGet();
58+
}
59+
}
60+
61+
// 모든 메트릭을 한 번에 계산하고 캐시하는 메서드
62+
private LatencyMetricsSnapShot computeMetrics() {
63+
int currentCount = count.get();
64+
if (currentCount == 0) {
65+
return LatencyMetricsSnapShot.empty();
66+
}
67+
68+
// 현재 데이터를 배열로 복사
69+
List<Long> sortedLatencies = new ArrayList<>(currentCount);
70+
int startIndex = currentIndex.get();
71+
72+
for (int i = 0; i < currentCount; i++) {
73+
int idx = (startIndex - i + WINDOW_SIZE) % WINDOW_SIZE;
74+
long value = latencies.get(idx);
75+
if (value > 0) {
76+
sortedLatencies.add(value);
77+
}
78+
}
79+
80+
if (sortedLatencies.isEmpty()) {
81+
return LatencyMetricsSnapShot.empty();
82+
}
83+
84+
sortedLatencies.sort(Long::compareTo);
85+
86+
// 모든 메트릭을 한 번에 계산
87+
long avg = sortedLatencies.stream().mapToLong(Long::longValue).sum() / sortedLatencies.size();
88+
long min = sortedLatencies.get(0);
89+
long max = sortedLatencies.get(sortedLatencies.size() - 1);
90+
long p25 = sortedLatencies.get((int) Math.ceil((sortedLatencies.size() * 25.0) / 100.0) - 1);
91+
long p50 = sortedLatencies.get((int) Math.ceil((sortedLatencies.size() * 50.0) / 100.0) - 1);
92+
long p75 = sortedLatencies.get((int) Math.ceil((sortedLatencies.size() * 75.0) / 100.0) - 1);
93+
94+
return new LatencyMetricsSnapShot(avg, min, max, p25, p50, p75);
95+
}
96+
97+
// 캐시된 메트릭을 가져오거나 필요시 새로 계산
98+
private LatencyMetricsSnapShot getMetricsSnapshot() {
99+
LatencyMetricsSnapShot current = cachedMetrics.get();
100+
long now = System.currentTimeMillis();
101+
102+
// 캐시가 유효한지 확인
103+
if (now - current.getTimestamp() < CACHE_DURATION) {
104+
return current;
105+
}
106+
107+
// 새로운 메트릭 계산 및 캐시 업데이트
108+
LatencyMetricsSnapShot newMetrics = computeMetrics();
109+
cachedMetrics.set(newMetrics);
110+
return newMetrics;
111+
}
112+
113+
@Override
114+
public long getAverageLatencyMicros() {
115+
return getMetricsSnapshot().getAvgLatency();
116+
}
117+
118+
@Override
119+
public long getMinLatencyMicros() {
120+
return getMetricsSnapshot().getMinLatency();
121+
}
122+
123+
@Override
124+
public long getMaxLatencyMicros() {
125+
return getMetricsSnapshot().getMaxLatency();
126+
}
127+
128+
@Override
129+
public long get25thPercentileLatencyMicros() {
130+
return getMetricsSnapshot().getP25Latency();
131+
}
132+
133+
@Override
134+
public long get50thPercentileLatencyMicros() {
135+
return getMetricsSnapshot().getP50Latency();
136+
}
137+
138+
@Override
139+
public long get75thPercentileLatencyMicros() {
140+
return getMetricsSnapshot().getP75Latency();
141+
}
142+
143+
@Override
144+
public void resetStatistics() {
145+
count.set(0);
146+
currentIndex.set(0);
147+
cachedMetrics.set(LatencyMetricsSnapShot.empty());
148+
}
149+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package net.spy.memcached.metrics;
2+
3+
/**
 * JMX management interface for operation latency statistics.
 * All latency values are reported in microseconds.
 */
public interface OpLatencyMonitorMBean {
4+
/** @return the average of the recorded latencies, in microseconds */
long getAverageLatencyMicros();
5+
6+
/** @return the largest recorded latency, in microseconds */
long getMaxLatencyMicros();
7+
8+
/** @return the smallest recorded latency, in microseconds */
long getMinLatencyMicros();
9+
10+
/** @return the 25th percentile of the recorded latencies, in microseconds */
long get25thPercentileLatencyMicros();
11+
12+
/** @return the 50th percentile (median) of the recorded latencies, in microseconds */
long get50thPercentileLatencyMicros();
13+
14+
/** @return the 75th percentile of the recorded latencies, in microseconds */
long get75thPercentileLatencyMicros();
15+
16+
/** Discards the statistics collected so far. */
void resetStatistics();
17+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package net.spy.memcached.metrics;
2+
3+
import java.util.concurrent.atomic.AtomicLong;
4+
import java.util.concurrent.atomic.LongAdder;
5+
6+
import net.spy.memcached.ArcusMBeanServer;
7+
8+
public final class OpThroughputMonitor implements OpThroughputMonitorMBean {
9+
private static final OpThroughputMonitor INSTANCE = new OpThroughputMonitor();
10+
11+
private final LongAdder completeOps = new LongAdder();
12+
private final LongAdder cancelOps = new LongAdder();
13+
private final LongAdder timeOutOps = new LongAdder();
14+
private final AtomicLong lastResetTime = new AtomicLong(System.currentTimeMillis());
15+
private final boolean enabled;
16+
17+
private OpThroughputMonitor() {
18+
if (System.getProperty("arcus.mbean", "false").toLowerCase().equals("false")) {
19+
enabled = false;
20+
return;
21+
}
22+
enabled = true;
23+
try {
24+
ArcusMBeanServer mbs = ArcusMBeanServer.getInstance();
25+
mbs.registMBean(this, this.getClass().getPackage().getName()
26+
+ ":type=" + this.getClass().getSimpleName());
27+
} catch (Exception e) {
28+
throw new RuntimeException("Failed to register Throughput MBean", e);
29+
}
30+
}
31+
32+
public static OpThroughputMonitor getInstance() {
33+
return INSTANCE;
34+
}
35+
36+
public void addCompletedOpCount() {
37+
if (!enabled) {
38+
return;
39+
}
40+
completeOps.increment();
41+
}
42+
43+
public void addCanceledOpCount() {
44+
if (!enabled) {
45+
return;
46+
}
47+
cancelOps.increment();
48+
}
49+
50+
public void addTimeOutedOpCount(int count) {
51+
if (!enabled) {
52+
return;
53+
}
54+
timeOutOps.add(count);
55+
}
56+
57+
@Override
58+
public long getCompletedOps() {
59+
return getThroughput(completeOps);
60+
}
61+
62+
@Override
63+
public long getCanceledOps() {
64+
return getThroughput(cancelOps);
65+
}
66+
67+
@Override
68+
public long getTimeoutOps() {
69+
return getThroughput(timeOutOps);
70+
}
71+
72+
@Override
73+
public void resetStatistics() {
74+
completeOps.reset();
75+
cancelOps.reset();
76+
timeOutOps.reset();
77+
lastResetTime.set(System.currentTimeMillis());
78+
}
79+
80+
private long getThroughput(LongAdder ops) {
81+
long currentTime = System.currentTimeMillis();
82+
long lastTime = lastResetTime.get();
83+
long countValue = ops.sum();
84+
85+
// 경과 시간 계산 (초 단위)
86+
long elapsedSeconds = (long) ((currentTime - lastTime) / 1000.0);
87+
88+
return countValue / elapsedSeconds;
89+
}
90+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package net.spy.memcached.metrics;
2+
3+
/**
 * JMX management interface for operation throughput statistics.
 * OpThroughputMonitor implements each getter as the counter total divided by
 * the whole seconds elapsed since the last reset.
 */
public interface OpThroughputMonitorMBean {
4+
/** @return the throughput of completed operations */
long getCompletedOps();
5+
6+
/** @return the throughput of canceled operations */
long getCanceledOps();
7+
8+
/** @return the throughput of timed-out operations */
long getTimeoutOps();
9+
10+
/** Clears the counters collected so far. */
void resetStatistics();
11+
}

src/main/java/net/spy/memcached/ops/Operation.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,8 @@ public interface Operation {
134134
/* ENABLE_MIGRATION end */
135135

136136
APIType getAPIType();
137+
138+
void setStartTime(long startTime);
139+
140+
long getStartTime();
137141
}

0 commit comments

Comments
 (0)