Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions evcache-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
api group:"joda-time", name:"joda-time", version:"latest.release"
api group:"javax.annotation", name:"javax.annotation-api", version:"latest.release"
api group:"com.github.fzakaria", name:"ascii85", version:"latest.release"
api group:"com.github.luben", name:"zstd-jni", version:"latest.release"
Comment thread
janewang1680 marked this conversation as resolved.
Outdated

testImplementation group:"org.testng", name:"testng", version:"7.5"
testImplementation group:"com.beust", name:"jcommander", version:"1.72"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean {
this.maxHashLength = propertyRepository.get(appName + ".max.hash.length", Integer.class).orElse(-1);
this.encoderBase = propertyRepository.get(appName + ".hash.encoder", String.class).orElse("base64");
this.autoHashKeys = propertyRepository.get(_appName + ".auto.hash.keys", Boolean.class).orElseGet("evcache.auto.hash.keys").orElse(false);
this.evcacheValueTranscoder = new EVCacheTranscoder();
this.evcacheValueTranscoder = new EVCacheTranscoder(_appName, propertyRepository);
evcacheValueTranscoder.setCompressionThreshold(Integer.MAX_VALUE);

// default max key length is 200, instead of using what is defined in MemcachedClientIF.MAX_KEY_LENGTH (250). This is to accommodate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,36 @@

package com.netflix.evcache;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdInputStream;
import com.netflix.archaius.api.Property;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.Timer;
import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.BaseSerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.transcoders.TranscoderUtils;
import net.spy.memcached.util.StringUtils;

import java.time.Duration;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Transcoder that serializes and compresses objects.
*/
public class EVCacheSerializingTranscoder extends BaseSerializingTranscoder implements
Transcoder<Object> {
Logger logger = LoggerFactory.getLogger(EVCacheSerializingTranscoder.class);
Comment thread
janewang1680 marked this conversation as resolved.
Outdated

// General flags
static final int SERIALIZED = 1;
Expand All @@ -63,10 +68,16 @@ public class EVCacheSerializingTranscoder extends BaseSerializingTranscoder impl
static final int SPECIAL_DOUBLE = (7 << 8);
static final int SPECIAL_BYTEARRAY = (8 << 8);

static final String COMPRESSION = "COMPRESSION_METRIC";
public enum CompressionAlgorithm { GZIP, ZSTD }

public static final int DEFAULT_ZSTD_COMPRESSION_LEVEL = 3;

private static final int ZSTD_MAGIC = 0xFD2FB528;

private final TranscoderUtils tu = new TranscoderUtils(true);
private Timer timer;
private Property<String> compressionAlgorithmProperty;
private Property<Integer> zstdLevelProperty;
Comment thread
janewang1680 marked this conversation as resolved.
protected final String appName;

/**
* Get a serializing transcoder with the default max data size.
Expand All @@ -79,7 +90,23 @@ public EVCacheSerializingTranscoder() {
* Get a serializing transcoder that specifies the max data size.
*/
public EVCacheSerializingTranscoder(int max) {
this(null, max);
}

/**
* Get a serializing transcoder that specifies the owning app name and the max data size.
*/
public EVCacheSerializingTranscoder(String appName, int max) {
super(max);
this.appName = appName;
}

public void setCompressionAlgorithmProperty(Property<String> algorithmProperty) {
this.compressionAlgorithmProperty = algorithmProperty;
}

public void setCompressionLevelProperty(Property<Integer> levelProperty) {
this.zstdLevelProperty = levelProperty;
}

@Override
Expand Down Expand Up @@ -179,31 +206,103 @@ public CachedData encode(Object o) {
}
assert b != null;
if (b.length > compressionThreshold) {
int originalLength = b.length;
byte[] compressed = compress(b);
if (compressed.length < b.length) {
if (compressed.length < originalLength) {
getLogger().trace("Compressed %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
o.getClass().getName(), originalLength, compressed.length);
b = compressed;
flags |= COMPRESSED;
} else {
getLogger().debug("Compression increased the size of %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
o.getClass().getName(), originalLength, compressed.length);
}

long compression_ratio = Math.round((double) compressed.length / b.length * 100);
updateTimerWithCompressionRatio(compression_ratio);
long ratioPerCent = Math.round((double) compressed.length / originalLength * 100);
Comment thread
janewang1680 marked this conversation as resolved.
Outdated
recordCompressionRatio(ratioPerCent);
}
return new CachedData(flags, b, getMaxSize());
}

private void updateTimerWithCompressionRatio(long ratio_percentage) {
if(timer == null) {
final List<Tag> tagList = new ArrayList<Tag>(1);
tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, "gzip"));
timer = EVCacheMetricsFactory.getInstance().getPercentileTimer(EVCacheMetricsFactory.COMPRESSION_RATIO, tagList, Duration.ofMillis(100));
};
@Override
protected byte[] compress(byte[] in) {
if (in == null) throw new NullPointerException("Can't compress null");

CompressionAlgorithm compressionAlgorithm = resolveCompressionAlgorithm();

switch (compressionAlgorithm) {
case ZSTD:
int zstdLevel = zstdLevelProperty.orElse(DEFAULT_ZSTD_COMPRESSION_LEVEL).get();
logger.debug("algorithm: " + compressionAlgorithm + ", level: " + zstdLevel + ", appName: " + appName);
Comment thread
janewang1680 marked this conversation as resolved.
Outdated
return Zstd.compress(in, zstdLevel);
case GZIP:
logger.debug("algorithm: " + compressionAlgorithm + ", appName:" + appName);
return super.compress(in);
default:
throw new IllegalArgumentException("Unsupported compression algorithm: " + compressionAlgorithm);
}
}

@Override
protected byte[] decompress(byte[] in) {
if (in == null || in.length == 0) return in;
if (isZstdCompressed(in)) return decompressZstd(in);
return super.decompress(in);
}

private boolean isZstdCompressed(byte[] data) {
if (data == null || data.length < 4) return false;
int magic = ByteBuffer.wrap(data, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
return magic == ZSTD_MAGIC;
}

timer.record(ratio_percentage, TimeUnit.MILLISECONDS);
private byte[] decompressZstd(byte[] in) {
long originalSize = Zstd.getFrameContentSize(in);
if (originalSize > Integer.MAX_VALUE) {
getLogger().warn("Zstd decompressed size exceeds int range: " + originalSize);
Comment thread
janewang1680 marked this conversation as resolved.
Outdated
return null;
}
if (originalSize > 0) {
// Fast path: frame carries a content-size header (compress() above always does).
return Zstd.decompress(in, (int) originalSize);
}
// Slow path: declared size is 0, unknown (-1), or invalid (-2) — stream-decode and let
// ZstdInputStream surface any frame errors.
ZstdInputStream zis = null;
try {
zis = new ZstdInputStream(new ByteArrayInputStream(in));
return readAll(zis);
} catch (IOException e) {
getLogger().error("Error reading Zstd input stream", e);
return null;
} finally {
try { if (zis != null) zis.close(); } catch (IOException ignored) {}
}
}

private static byte[] readAll(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buf = new byte[8192];
int n;
while ((n = in.read(buf)) != -1) {
out.write(buf, 0, n);
}
return out.toByteArray();
}

private void recordCompressionRatio(long ratioPerCent) {
final List<Tag> tagList = new ArrayList<Tag>(2);
tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, resolveCompressionAlgorithm().name().toLowerCase()));
if (appName != null && !appName.isEmpty()) {
tagList.add(new BasicTag(EVCacheMetricsFactory.CACHE, appName));
}
EVCacheMetricsFactory.getInstance()
.getDistributionSummary(EVCacheMetricsFactory.COMPRESSION_RATIO, tagList)
.record(ratioPerCent);
}

private CompressionAlgorithm resolveCompressionAlgorithm() {
return CompressionAlgorithm.valueOf(
compressionAlgorithmProperty.orElse(CompressionAlgorithm.GZIP.name()).get().toUpperCase());
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,69 @@
package com.netflix.evcache;

import com.netflix.archaius.api.Property;
import com.netflix.archaius.api.PropertyRepository;
import com.netflix.evcache.util.EVCacheConfig;

import net.spy.memcached.CachedData;

public class EVCacheTranscoder extends EVCacheSerializingTranscoder {

public EVCacheTranscoder() {
this(EVCacheConfig.getInstance().getPropertyRepository().get("default.evcache.max.data.size", Integer.class).orElse(20 * 1024 * 1024).get());
this((String) null);
}

public EVCacheTranscoder(String appName) {
this(appName, EVCacheConfig.getInstance().getPropertyRepository());
}

public EVCacheTranscoder(int max) {
this(max, EVCacheConfig.getInstance().getPropertyRepository().get("default.evcache.compression.threshold", Integer.class).orElse(120).get());
this(null, max);
}

public EVCacheTranscoder(String appName, int max) {
this(appName, EVCacheConfig.getInstance().getPropertyRepository(), max);
}

public EVCacheTranscoder(int max, int compressionThreshold) {
super(max);
setCompressionThreshold(compressionThreshold);
this(null, max, compressionThreshold);
}

@Override
public boolean asyncDecode(CachedData d) {
return super.asyncDecode(d);
public EVCacheTranscoder(String appName, int max, int compressionThreshold) {
this(appName, EVCacheConfig.getInstance().getPropertyRepository(), max, compressionThreshold);
}

@Override
public Object decode(CachedData d) {
return super.decode(d);
/**
* Repository-aware constructors. The compression algorithm/level are read dynamically from the
* supplied {@link PropertyRepository}, so callers must pass the repository that is wired to Fast
* Properties (e.g. {@code poolManager.getEVCacheConfig().getPropertyRepository()}) for FP overrides
* to take effect. The no-repository constructors above fall back to {@link EVCacheConfig#getInstance()}.
*/
public EVCacheTranscoder(String appName, PropertyRepository config) {
this(appName, config, config.get("default.evcache.max.data.size", Integer.class).orElse(20 * 1024 * 1024).get());
}

public EVCacheTranscoder(String appName, PropertyRepository config, int max) {
this(appName, config, max, config.get("default.evcache.compression.threshold", Integer.class).orElse(120).get());
}

public EVCacheTranscoder(String appName, PropertyRepository config, int max, int compressionThreshold) {
super(appName, max);
setCompressionThreshold(compressionThreshold);
Property<String> algoProperty = getProperty(config, "evcacheclient.compression.algo", String.class);
setCompressionAlgorithmProperty(algoProperty);
Property<Integer> zstdLevelProperty = getProperty(config, "evcacheclient.compression.zstd.level", Integer.class);
setCompressionLevelProperty(zstdLevelProperty);
}

/**
* Resolves a property preferring the appName-prefixed key (e.g. {@code EVCACHE_VH_ARCHIVE.default.evcache.compression.algo})
* and falling back to the global {@code default.evcache.*} key when no app-specific override exists.
*/
private <T> Property<T> getProperty(PropertyRepository config, String key, Class<T> type) {
if (appName == null || appName.isEmpty()) {
return config.get(key, type);
}
return config.get(appName + "." + key, type).orElseGet(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public BlockingQueue<Operation> createWriteOperationQueue() {
}

public Transcoder<Object> getDefaultTranscoder() {
return new EVCacheTranscoder();
return new EVCacheTranscoder(appName,
client.getPool().getEVCacheClientPoolManager().getEVCacheConfig().getPropertyRepository());
}

public FailureMode getFailureMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public BlockingQueue<Operation> createWriteOperationQueue() {
}

public Transcoder<Object> getDefaultTranscoder() {
return new EVCacheTranscoder();
return new EVCacheTranscoder(appName,
client.getPool().getEVCacheClientPoolManager().getEVCacheConfig().getPropertyRepository());
}

public FailureMode getFailureMode() {
Expand Down
Loading
Loading