From 077ddc8fa5010fa1db4e4b0caa4c367bcfa94d47 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Thu, 7 Sep 2017 21:03:29 +0300 Subject: [PATCH] Separate ValueCompression from GorillaCompressor and allow the usage of different value predictors. Include DFCM as another predictor possibility. (cherry picked from commit 6d0d61a261e38dfdad4950c6867b5ad7232ff99e) --- .../gorilla/GorillaCompressor.java | 90 +++--------------- .../gorilla/GorillaDecompressor.java | 43 +++------ .../yak/ts/compression/gorilla/Predictor.java | 37 ++++++++ .../compression/gorilla/ValueCompressor.java | 92 +++++++++++++++++++ .../gorilla/ValueDecompressor.java | 55 +++++++++++ .../gorilla/predictors/DifferentialFCM.java | 47 ++++++++++ .../predictors/LastValuePredictor.java | 22 +++++ .../gorilla/EncodeGorillaTest.java | 48 ++++++++++ 8 files changed, 326 insertions(+), 108 deletions(-) create mode 100644 src/main/java/fi/iki/yak/ts/compression/gorilla/Predictor.java create mode 100644 src/main/java/fi/iki/yak/ts/compression/gorilla/ValueCompressor.java create mode 100644 src/main/java/fi/iki/yak/ts/compression/gorilla/ValueDecompressor.java create mode 100644 src/main/java/fi/iki/yak/ts/compression/gorilla/predictors/DifferentialFCM.java create mode 100644 src/main/java/fi/iki/yak/ts/compression/gorilla/predictors/LastValuePredictor.java diff --git a/src/main/java/fi/iki/yak/ts/compression/gorilla/GorillaCompressor.java b/src/main/java/fi/iki/yak/ts/compression/gorilla/GorillaCompressor.java index cd470a2..e736414 100644 --- a/src/main/java/fi/iki/yak/ts/compression/gorilla/GorillaCompressor.java +++ b/src/main/java/fi/iki/yak/ts/compression/gorilla/GorillaCompressor.java @@ -1,5 +1,7 @@ package fi.iki.yak.ts.compression.gorilla; +import fi.iki.yak.ts.compression.gorilla.predictors.LastValuePredictor; + /** * Implements a slightly modified version of the time series compression as described in the Facebook's Gorilla * Paper. 
@@ -8,9 +10,6 @@ */ public class GorillaCompressor { - private int storedLeadingZeros = Integer.MAX_VALUE; - private int storedTrailingZeros = 0; - private long storedVal = 0; private long storedTimestamp = 0; private int storedDelta = 0; @@ -24,20 +23,19 @@ public class GorillaCompressor { private BitOutput out; + private ValueCompressor valueCompressor; + public GorillaCompressor(long timestamp, BitOutput output) { + this(timestamp, output, new LastValuePredictor()); + } + + public GorillaCompressor(long timestamp, BitOutput output, Predictor predictor) { blockTimestamp = timestamp; out = output; addHeader(timestamp); + this.valueCompressor = new ValueCompressor(output, predictor); } -// public void compressLongStream(Stream stream) { -// stream.peek(p -> writeFirst(p.getTimestamp(), Double.doubleToRawLongBits(p.getDoubleValue()))).skip(1) -// .forEach(p -> { -// compressTimestamp(p.getTimestamp()); -// compressValue(p.getLongValue()); -// }); -// } - private void addHeader(long timestamp) { out.writeBits(timestamp, 64); } @@ -53,7 +51,7 @@ public void addValue(long timestamp, long value) { writeFirst(timestamp, value); } else { compressTimestamp(timestamp); - compressValue(value); + valueCompressor.compressValue(value); } } @@ -69,16 +67,15 @@ public void addValue(long timestamp, double value) { return; } compressTimestamp(timestamp); - compressValue(Double.doubleToRawLongBits(value)); + valueCompressor.compressValue(Double.doubleToRawLongBits(value)); } private void writeFirst(long timestamp, long value) { storedDelta = (int) (timestamp - blockTimestamp); storedTimestamp = timestamp; - storedVal = value; out.writeBits(storedDelta, FIRST_DELTA_BITS); - out.writeBits(storedVal, 64); + valueCompressor.writeFirst(value); } /** @@ -105,8 +102,6 @@ private void compressTimestamp(long timestamp) { int newDelta = (int) (timestamp - storedTimestamp); int deltaD = newDelta - storedDelta; - // TODO Fluctuating values will cause always 64 bits write (-2, +2 for example), 
zigzag could fix it.. - if(deltaD == 0) { out.skipBit(); } else { @@ -165,65 +160,4 @@ public static int encodeZigZag32(final int n) { } // END: From protobuf - - private void compressValue(long value) { - long xor = storedVal ^ value; - - if(xor == 0) { - // Write 0 - out.skipBit(); - } else { - int leadingZeros = Long.numberOfLeadingZeros(xor); - int trailingZeros = Long.numberOfTrailingZeros(xor); - - out.writeBit(); // Optimize to writeNewLeading / writeExistingLeading? - - if(leadingZeros >= storedLeadingZeros && trailingZeros >= storedTrailingZeros) { - writeExistingLeading(xor); - } else { - writeNewLeading(xor, leadingZeros, trailingZeros); - } - } - - storedVal = value; - } - - /** - * If there at least as many leading zeros and as many trailing zeros as previous value, control bit = 0 (type a) - * store the meaningful XORed value - * - * @param xor XOR between previous value and current - */ - private void writeExistingLeading(long xor) { - out.skipBit(); - - int significantBits = 64 - storedLeadingZeros - storedTrailingZeros; - xor >>>= storedTrailingZeros; - out.writeBits(xor, significantBits); - } - - /** - * store the length of the number of leading zeros in the next 5 bits - * store length of the meaningful XORed value in the next 6 bits, - * store the meaningful bits of the XORed value - * (type b) - * - * @param xor XOR between previous value and current - * @param leadingZeros New leading zeros - * @param trailingZeros New trailing zeros - */ - private void writeNewLeading(long xor, int leadingZeros, int trailingZeros) { - out.writeBit(); - - // Different from version 1.x, use (significantBits - 1) in storage - avoids a branch - int significantBits = 64 - leadingZeros - trailingZeros; - - // Different from original, bits 5 -> 6, avoids a branch, allows storing small longs - out.writeBits(leadingZeros, 6); // Number of leading zeros in the next 6 bits - out.writeBits(significantBits - 1, 6); // Length of meaningful bits in the next 6 bits - 
out.writeBits(xor >>> trailingZeros, significantBits); // Store the meaningful bits of XOR - - storedLeadingZeros = leadingZeros; - storedTrailingZeros = trailingZeros; - } } diff --git a/src/main/java/fi/iki/yak/ts/compression/gorilla/GorillaDecompressor.java b/src/main/java/fi/iki/yak/ts/compression/gorilla/GorillaDecompressor.java index 61b7415..58a9916 100644 --- a/src/main/java/fi/iki/yak/ts/compression/gorilla/GorillaDecompressor.java +++ b/src/main/java/fi/iki/yak/ts/compression/gorilla/GorillaDecompressor.java @@ -1,27 +1,31 @@ package fi.iki.yak.ts.compression.gorilla; +import fi.iki.yak.ts.compression.gorilla.predictors.LastValuePredictor; + /** * Decompresses a compressed stream created by the GorillaCompressor. * * @author Michael Burman */ public class GorillaDecompressor { - - private int storedLeadingZeros = Integer.MAX_VALUE; - private int storedTrailingZeros = 0; - private long storedVal = 0; private long storedTimestamp = 0; private long storedDelta = 0; private long blockTimestamp = 0; - + private long storedVal = 0; private boolean endOfStream = false; private BitInput in; + private ValueDecompressor decompressor; public GorillaDecompressor(BitInput input) { + this(input, new LastValuePredictor()); + } + + public GorillaDecompressor(BitInput input, Predictor predictor) { in = input; readHeader(); + this.decompressor = new ValueDecompressor(input, predictor); } private void readHeader() { @@ -60,7 +64,8 @@ private void first() { endOfStream = true; return; } - storedVal = in.getLong(64); + storedVal = decompressor.readFirst(); +// storedVal = in.getLong(64); storedTimestamp = blockTimestamp + storedDelta; } @@ -72,7 +77,7 @@ private void nextTimestamp() { switch(readInstruction) { case 0x00: storedTimestamp = storedDelta + storedTimestamp; - nextValue(); + storedVal = decompressor.nextValue(); return; case 0x02: deltaDelta = in.getLong(7); @@ -101,29 +106,7 @@ private void nextTimestamp() { storedDelta = storedDelta + deltaDelta; storedTimestamp 
= storedDelta + storedTimestamp; - nextValue(); - } - - private void nextValue() { - int val = in.nextClearBit(2); - - switch(val) { - case 3: - // New leading and trailing zeros - storedLeadingZeros = (int) in.getLong(6); - - byte significantBits = (byte) in.getLong(6); - significantBits++; - - storedTrailingZeros = 64 - significantBits - storedLeadingZeros; - // missing break is intentional, we want to overflow to next one - case 2: - long value = in.getLong(64 - storedLeadingZeros - storedTrailingZeros); - value <<= storedTrailingZeros; - value = storedVal ^ value; - storedVal = value; - break; - } + storedVal = decompressor.nextValue(); } // START: From protobuf diff --git a/src/main/java/fi/iki/yak/ts/compression/gorilla/Predictor.java b/src/main/java/fi/iki/yak/ts/compression/gorilla/Predictor.java new file mode 100644 index 0000000..6dc5bc7 --- /dev/null +++ b/src/main/java/fi/iki/yak/ts/compression/gorilla/Predictor.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package fi.iki.yak.ts.compression.gorilla; + +/** + * @author miburman + */ +public interface Predictor { + + /** + * Feeds the real (observed) value to the predictor so it can adjust its state + * + * @param value Long / bits of Double + */ + void update(long value); + + /** + * Predicts the next value + * + * @return Predicted value + */ + long predict(); +} diff --git a/src/main/java/fi/iki/yak/ts/compression/gorilla/ValueCompressor.java b/src/main/java/fi/iki/yak/ts/compression/gorilla/ValueCompressor.java new file mode 100644 index 0000000..05bf0e1 --- /dev/null +++ b/src/main/java/fi/iki/yak/ts/compression/gorilla/ValueCompressor.java @@ -0,0 +1,92 @@ +package fi.iki.yak.ts.compression.gorilla; + +import fi.iki.yak.ts.compression.gorilla.predictors.LastValuePredictor; + +/** + * ValueCompressor for the Gorilla encoding format. Supply with long representation of the value, + * in case of doubles use Double.doubleToRawLongBits(value) + * + * @author Michael Burman + */ +public class ValueCompressor { + private int storedLeadingZeros = Integer.MAX_VALUE; + private int storedTrailingZeros = 0; + + private Predictor predictor; + private BitOutput out; + + public ValueCompressor(BitOutput out) { + this(out, new LastValuePredictor()); + } + + public ValueCompressor(BitOutput out, Predictor predictor) { + this.out = out; + this.predictor = predictor; + } + + void writeFirst(long value) { + predictor.update(value); + out.writeBits(value, 64); + } + + protected void compressValue(long value) { + // In original Gorilla, Last-Value predictor is used + long diff = predictor.predict() ^ value; + predictor.update(value); + + if(diff == 0) { + // Write 0 + out.skipBit(); + } else { + int leadingZeros = Long.numberOfLeadingZeros(diff); + int trailingZeros = Long.numberOfTrailingZeros(diff); + + out.writeBit(); // Optimize to writeNewLeading / writeExistingLeading? 
+ + if(leadingZeros >= storedLeadingZeros && trailingZeros >= storedTrailingZeros) { + writeExistingLeading(diff); + } else { + writeNewLeading(diff, leadingZeros, trailingZeros); + } + } + } + + /** + * If there are at least as many leading zeros and as many trailing zeros as previous value, control bit = 0 (type a) + * store the meaningful XORed value + * + * @param xor XOR between predicted value and current + */ + private void writeExistingLeading(long xor) { + out.skipBit(); + + int significantBits = 64 - storedLeadingZeros - storedTrailingZeros; + xor >>>= storedTrailingZeros; + out.writeBits(xor, significantBits); + } + + /** + * store the length of the number of leading zeros in the next 6 bits + * store length of the meaningful XORed value, minus one, in the next 6 bits, + * store the meaningful bits of the XORed value + * (type b) + * + * @param xor XOR between predicted value and current + * @param leadingZeros New leading zeros + * @param trailingZeros New trailing zeros + */ + private void writeNewLeading(long xor, int leadingZeros, int trailingZeros) { + out.writeBit(); + + // Different from version 1.x, use (significantBits - 1) in storage - avoids a branch + int significantBits = 64 - leadingZeros - trailingZeros; + + // Different from original, bits 5 -> 6, avoids a branch, allows storing small longs + out.writeBits(leadingZeros, 6); // Number of leading zeros in the next 6 bits + out.writeBits(significantBits - 1, 6); // Length of meaningful bits in the next 6 bits + out.writeBits(xor >>> trailingZeros, significantBits); // Store the meaningful bits of XOR + + storedLeadingZeros = leadingZeros; + storedTrailingZeros = trailingZeros; + } +} diff --git a/src/main/java/fi/iki/yak/ts/compression/gorilla/ValueDecompressor.java b/src/main/java/fi/iki/yak/ts/compression/gorilla/ValueDecompressor.java new file mode 100644 index 0000000..43a08ad --- /dev/null +++ b/src/main/java/fi/iki/yak/ts/compression/gorilla/ValueDecompressor.java @@ -0,0 +1,55 @@ +package 
fi.iki.yak.ts.compression.gorilla; + +import fi.iki.yak.ts.compression.gorilla.predictors.LastValuePredictor; + +/** + * Value decompressor for Gorilla encoded values + * + * @author Michael Burman + */ +public class ValueDecompressor { + private BitInput in; + private Predictor predictor; + + private int storedLeadingZeros = Integer.MAX_VALUE; + private int storedTrailingZeros = 0; + + public ValueDecompressor(BitInput input) { + this(input, new LastValuePredictor()); + } + + public ValueDecompressor(BitInput input, Predictor predictor) { + this.in = input; + this.predictor = predictor; + } + + public long readFirst() { + long value = in.getLong(Long.SIZE); + predictor.update(value); + return value; + } + + public long nextValue() { + int val = in.nextClearBit(2); + long xor = 0; // stays 0 when the control bit signals "equal to the prediction" + switch(val) { + case 3: + // New leading and trailing zeros + storedLeadingZeros = (int) in.getLong(6); + + byte significantBits = (byte) in.getLong(6); + significantBits++; // stored as (length - 1) by the compressor + + storedTrailingZeros = Long.SIZE - significantBits - storedLeadingZeros; + // missing break is intentional, we want to overflow to next one + case 2: + xor = in.getLong(Long.SIZE - storedLeadingZeros - storedTrailingZeros); + xor <<= storedTrailingZeros; + } + // Feed every decoded value to the predictor, also when xor == 0, because + // ValueCompressor.compressValue updates its predictor for every value it writes + long value = predictor.predict() ^ xor; + predictor.update(value); + return value; + } +} diff --git a/src/main/java/fi/iki/yak/ts/compression/gorilla/predictors/DifferentialFCM.java b/src/main/java/fi/iki/yak/ts/compression/gorilla/predictors/DifferentialFCM.java new file mode 100644 index 0000000..bac05f9 --- /dev/null +++ b/src/main/java/fi/iki/yak/ts/compression/gorilla/predictors/DifferentialFCM.java @@ -0,0 +1,47 @@ +package fi.iki.yak.ts.compression.gorilla.predictors; + +import fi.iki.yak.ts.compression.gorilla.Predictor; + +/** + * Differential Finite Context Method (DFCM) is a context based predictor. 
+ * + * @author Michael Burman + */ +public class DifferentialFCM implements Predictor { + + private long lastValue = 0L; + private final long[] table; + private int lastHash = 0; + + private final int mask; + + /** + * Create a new DFCM predictor + * + * @param size Prediction table size, rounded up to the next power of two; must be in [1, 2^30], else IllegalArgumentException + */ + public DifferentialFCM(int size) { + if(size > 0 && size <= (1 << 30)) { + size--; + int leadingZeros = Long.numberOfLeadingZeros(size); + int newSize = 1 << (Long.SIZE - leadingZeros); + + this.table = new long[newSize]; + this.mask = newSize - 1; + } else { + throw new IllegalArgumentException("Size must be between 1 and 2^30"); + } + } + + @Override + public void update(long value) { + table[lastHash] = value - lastValue; // remember the delta seen after the current context + lastHash = (int) (((lastHash << 5) ^ ((value - lastValue) >> 50)) & this.mask); + lastValue = value; + } + + @Override + public long predict() { + return table[lastHash] + lastValue; + } +} diff --git a/src/main/java/fi/iki/yak/ts/compression/gorilla/predictors/LastValuePredictor.java b/src/main/java/fi/iki/yak/ts/compression/gorilla/predictors/LastValuePredictor.java new file mode 100644 index 0000000..5e4452c --- /dev/null +++ b/src/main/java/fi/iki/yak/ts/compression/gorilla/predictors/LastValuePredictor.java @@ -0,0 +1,22 @@ +package fi.iki.yak.ts.compression.gorilla.predictors; + +import fi.iki.yak.ts.compression.gorilla.Predictor; + +/** + * Last-Value predictor, a computational predictor using previous value as a prediction for the next one + * + * @author Michael Burman + */ +public class LastValuePredictor implements Predictor { + private long storedVal = 0; + + public LastValuePredictor() {} + + public void update(long value) { + this.storedVal = value; + } + + public long predict() { + return storedVal; + } +} diff --git a/src/test/java/fi/iki/yak/ts/compression/gorilla/EncodeGorillaTest.java b/src/test/java/fi/iki/yak/ts/compression/gorilla/EncodeGorillaTest.java index a6c8af7..7501fc4 
100644 --- a/src/test/java/fi/iki/yak/ts/compression/gorilla/EncodeGorillaTest.java +++ b/src/test/java/fi/iki/yak/ts/compression/gorilla/EncodeGorillaTest.java @@ -13,6 +13,8 @@ import org.junit.jupiter.api.Test; +import fi.iki.yak.ts.compression.gorilla.predictors.DifferentialFCM; + /** * These are generic tests to test that input matches the output after compression + decompression cycle, using * both the timestamp and value compression. @@ -521,4 +523,50 @@ void testLongEncoding() throws Exception { } assertNull(d.readPair()); } + + /** + * Tests that a series compressed with the DifferentialFCM predictor is restored exactly when the + * decompressor is given an identically sized DifferentialFCM predictor. + */ + @Test + void testDifferentialFCM() throws Exception { + // Many more points than the 1024 table entries, so prediction table slots get overwritten + int amountOfPoints = 100000; + long blockStart = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS) + .toInstant(ZoneOffset.UTC).toEpochMilli(); + LongArrayOutput output = new LongArrayOutput(); + + long now = blockStart + 60; + ByteBuffer bb = ByteBuffer.allocateDirect(amountOfPoints * 2*Long.BYTES); + + for(int i = 0; i < amountOfPoints; i++) { + bb.putLong(now + i*60); + bb.putDouble(i * Math.random()); + } + + GorillaCompressor c = new GorillaCompressor(blockStart, output, new DifferentialFCM(1024)); + + bb.flip(); + + for(int j = 0; j < amountOfPoints; j++) { + c.addValue(bb.getLong(), bb.getDouble()); + } + + c.close(); + + bb.flip(); + + LongArrayInput input = new LongArrayInput(output.getLongArray()); + GorillaDecompressor d = new GorillaDecompressor(input, new DifferentialFCM(1024)); + + for(int i = 0; i < amountOfPoints; i++) { + long tStamp = bb.getLong(); + double val = bb.getDouble(); + Pair pair = d.readPair(); + assertEquals(tStamp, pair.getTimestamp(), "Expected timestamp did not match at point " + i); + assertEquals(val, pair.getDoubleValue()); + } + assertNull(d.readPair()); + } + }