From 76dff331850397eb42864ab21286ce93ddbd22ce Mon Sep 17 00:00:00 2001
From: Panagiotis Liakos
Date: Mon, 28 Mar 2022 23:28:41 +0300
Subject: [PATCH] lossy compression

---
 .../compression/gorilla/Compressor32.java     |   6 +-
 .../gorilla/LossyCompressor32.java            | 176 ++++++++++++++++++
 .../ts/compression/gorilla/EncodeTest.java    |  55 ++++++
 .../compression/gorilla/CompressTest.java     |  95 ++++++++++
 4 files changed, 329 insertions(+), 3 deletions(-)
 create mode 100644 src/main/java/gr/aueb/compression/gorilla/LossyCompressor32.java

diff --git a/src/main/java/gr/aueb/compression/gorilla/Compressor32.java b/src/main/java/gr/aueb/compression/gorilla/Compressor32.java
index 41e96a3..a11bff1 100644
--- a/src/main/java/gr/aueb/compression/gorilla/Compressor32.java
+++ b/src/main/java/gr/aueb/compression/gorilla/Compressor32.java
@@ -127,16 +127,16 @@ private void writeExistingLeading(int xor) {
      */
     private void writeNewLeading(int xor, int leadingZeros, int trailingZeros) {
         out.writeBit();
-        out.writeBits(leadingZeros, 4); // Number of leading zeros in the next 5 bits
+        out.writeBits(leadingZeros, 4); // Number of leading zeros in the next 4 bits
 
         int significantBits = 32 - leadingZeros - trailingZeros;
-        out.writeBits(significantBits, 6); // Length of meaningful bits in the next 6 bits
+        out.writeBits(significantBits, 6); // Length of meaningful bits in the next 5 bits
         out.writeBits(xor >>> trailingZeros, significantBits); // Store the meaningful bits of XOR
 
         storedLeadingZeros = leadingZeros;
         storedTrailingZeros = trailingZeros;
 
-        size += 1 + 4 + 6 + significantBits;
+        size += 1 + 4 + 5 + significantBits;
     }
 
     public int getSize() {
diff --git a/src/main/java/gr/aueb/compression/gorilla/LossyCompressor32.java b/src/main/java/gr/aueb/compression/gorilla/LossyCompressor32.java
new file mode 100644
index 0000000..69ee91c
--- /dev/null
+++ b/src/main/java/gr/aueb/compression/gorilla/LossyCompressor32.java
@@ -0,0 +1,176 @@
+package gr.aueb.compression.gorilla;
+
+import fi.iki.yak.ts.compression.gorilla.BitOutput;
+
+/**
+ * Lossy variant of the 32-bit Gorilla value compressor: mantissa bits that fall below 2^logOfError are
+ * replaced with the previous value's bits before the XOR encoding, so the XOR gets more trailing zeros.
+ *
+ * @author Michael Burman
+ */
+public class LossyCompressor32 {
+
+    private int storedLeadingZeros = Integer.MAX_VALUE;
+    private int storedTrailingZeros = 0;
+    private int storedVal = 0;
+    private boolean first = true;
+    private int size;
+    private int cases[];
+    private float trailingDiff;
+    private float leadingDiff;
+
+
+    private BitOutput out;
+    private int logOfError;
+
+    public LossyCompressor32(BitOutput output, int logOfError) {
+        this.out = output;
+        this.size = 0;
+        this.logOfError = logOfError;
+        int cases[] = {0, 0, 0};
+        this.cases = cases;
+        this.trailingDiff = 0;
+        this.leadingDiff = 0;
+    }
+
+    /**
+     * Adds a new value, given as raw IEEE 754 float bits, to the series. Note, values must be inserted in order.
+     *
+     * @param value raw int bits of the next floating point value in the series,
+     *              as produced by {@link Float#floatToRawIntBits(float)}
+     */
+    public void addValue(int value) {
+        if(first) {
+            writeFirst(value);
+        } else {
+            compressValue(value);
+        }
+    }
+
+    /**
+     * Adds a new float value to the series. Note, values must be inserted in order.
+     *
+     * @param value next floating point value in the series; it is encoded through its raw
+     *              IEEE 754 bit pattern ({@link Float#floatToRawIntBits(float)})
+     */
+    public void addValue(float value) {
+        if(first) {
+            writeFirst(Float.floatToRawIntBits(value));
+        } else {
+            compressValue(Float.floatToRawIntBits(value));
+        }
+    }
+
+    private void writeFirst(int value) {
+        first = false;
+        storedVal = value;
+        out.writeBits(storedVal, 32);
+        size += 32;
+    }
+
+    /**
+     * Closes the block: appends a NaN value, skips one more bit and flushes the BitOutput.
+     */
+    public void close() {
+        addValue(Float.NaN);
+        out.skipBit();
+        out.flush();
+    }
+
+    private void compressValue(int value) {
+        // TODO Fix already compiled into a big method
+        int integerDigits = (value << 1 >>> 24) - 127; // unbiased exponent: sign bit shifted out, bias removed
+        int space = Math.min(23, 23 + this.logOfError - integerDigits); // droppable low bits, mantissa only
+        // Exponent 128 is NaN/Infinity (close() appends NaN): keep those bit patterns untouched
+        if (space > 0 && integerDigits < 128) {
+            value = value >> space << space;
+            value = value | (storedVal & ((1 << space) - 1)); // was 2^space - 1, which is XOR in Java
+        }
+
+        int xor = storedVal ^ value;
+
+        if(xor == 0) {
+            // Write 0
+            cases[0] += 1;
+            out.skipBit();
+            size += 1;
+        } else {
+            int leadingZeros = Integer.numberOfLeadingZeros(xor);
+            int trailingZeros = Integer.numberOfTrailingZeros(xor);
+
+            // Check overflow of leading? Can't be 32!
+            if(leadingZeros >= 16) {
+                leadingZeros = 15;
+            }
+
+            // Store bit '1'
+            out.writeBit();
+            size += 1;
+
+            if(leadingZeros >= storedLeadingZeros && trailingZeros >= storedTrailingZeros) {
+                cases[1] += 1;
+                this.trailingDiff += trailingZeros - storedTrailingZeros;
+                this.leadingDiff += leadingZeros - storedLeadingZeros;
+                writeExistingLeading(xor);
+            } else {
+                cases[2] += 1;
+                writeNewLeading(xor, leadingZeros, trailingZeros);
+            }
+        }
+
+        storedVal = value;
+    }
+
+    /**
+     * If there are at least as many leading zeros and as many trailing zeros as previous value, control bit = 0 (type a)
+     * store the meaningful XORed value
+     *
+     * @param xor XOR between previous value and current
+     */
+    private void writeExistingLeading(int xor) {
+        out.skipBit();
+        int significantBits = 32 - storedLeadingZeros - storedTrailingZeros;
+        out.writeBits(xor >>> storedTrailingZeros, significantBits);
+        size += 1 + significantBits;
+    }
+
+    /**
+     * store the number of leading zeros in the next 4 bits
+     * store length of the meaningful XORed value in the next 6 bits,
+     * store the meaningful bits of the XORed value
+     * (type b)
+     *
+     * @param xor XOR between previous value and current
+     * @param leadingZeros New leading zeros
+     * @param trailingZeros New trailing zeros
+     */
+    private void writeNewLeading(int xor, int leadingZeros, int trailingZeros) {
+        out.writeBit();
+        out.writeBits(leadingZeros, 4); // Number of leading zeros in the next 4 bits
+
+        int significantBits = 32 - leadingZeros - trailingZeros;
+        out.writeBits(significantBits, 6); // Length of meaningful bits in the next 6 bits
+        out.writeBits(xor >>> trailingZeros, significantBits); // Store the meaningful bits of XOR
+
+        storedLeadingZeros = leadingZeros;
+        storedTrailingZeros = trailingZeros;
+
+        size += 1 + 4 + 5 + significantBits; // NOTE(review): 6 length bits are written above but 5 are counted - confirm
+    }
+
+    public int getSize() {
+        return size;
+    }
+
+    public float getLeadingDiff() {
+        return leadingDiff;
+    }
+
+    public float getTrailingDiff() {
+        return trailingDiff;
+    }
+
+    public int[] getCases() {
+        return cases;
+    }
+}
diff --git a/src/test/java/fi/iki/yak/ts/compression/gorilla/EncodeTest.java b/src/test/java/fi/iki/yak/ts/compression/gorilla/EncodeTest.java
index b1d5c59..3a204da 100644
--- a/src/test/java/fi/iki/yak/ts/compression/gorilla/EncodeTest.java
+++ b/src/test/java/fi/iki/yak/ts/compression/gorilla/EncodeTest.java
@@ -13,6 +13,9 @@
 
 import org.junit.jupiter.api.Test;
 
+import gr.aueb.compression.gorilla.Decompressor32;
+import gr.aueb.compression.gorilla.LossyCompressor32;
+
 /**
  * These are generic tests to test that input matches the output after compression + decompression cycle, using
  * both the timestamp and value compression.
@@ -43,6 +46,28 @@ private void comparePairsToCompression(long blockTimestamp, Value[] pairs) {
         assertNull(d.readPair());
     }
 
+    private void comparePairsToCompressionLossy(long blockTimestamp, gr.aueb.compression.gorilla.Value[] pairs) {
+        ByteBufferBitOutput output = new ByteBufferBitOutput();
+        LossyCompressor32 c = new LossyCompressor32(output, -1);
+        Arrays.stream(pairs).forEach(p -> c.addValue(p.getFloatValue()));
+        c.close();
+        System.out.println("Size: " + c.getSize());
+
+        ByteBuffer byteBuffer = output.getByteBuffer();
+        byteBuffer.flip();
+
+        ByteBufferBitInput input = new ByteBufferBitInput(byteBuffer);
+        Decompressor32 d = new Decompressor32(input);
+
+        // Replace with stream once decompressor supports it
+        for(int i = 0; i < pairs.length; i++) {
+            gr.aueb.compression.gorilla.Value pair = d.readValue();
+            assertEquals(pairs[i].getFloatValue(), pair.getFloatValue(), 0.5, "Value did not match");
+        }
+
+        assertNull(d.readValue());
+    }
+
     @Test
     void simpleEncodeAndDecodeTest() throws Exception {
         long now = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS)
@@ -62,6 +87,36 @@ void simpleEncodeAndDecodeTest() throws Exception {
         comparePairsToCompression(now, pairs);
     }
 
+    @Test
+    void simpleLossyEncodeAndDecodeTest() throws Exception {
+        long now = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS)
+                .toInstant(ZoneOffset.UTC).toEpochMilli();
+
+        gr.aueb.compression.gorilla.Value[] pairs = {
+                new gr.aueb.compression.gorilla.Value(Float.floatToRawIntBits(1.0f)),
+                new gr.aueb.compression.gorilla.Value(Float.floatToRawIntBits(-2.0f)),
+                new gr.aueb.compression.gorilla.Value(Float.floatToRawIntBits(-2.5f)),
+                new gr.aueb.compression.gorilla.Value(Float.floatToRawIntBits(65537f)),
+                new gr.aueb.compression.gorilla.Value(Float.floatToRawIntBits(2147483650.0f)),
+                new gr.aueb.compression.gorilla.Value(Float.floatToRawIntBits(-16384f)),
+                new gr.aueb.compression.gorilla.Value(Float.floatToRawIntBits(2.8f)),
+                new gr.aueb.compression.gorilla.Value(Float.floatToRawIntBits(-38.0f)),
+
+                /*
+                Digits: 23 1
+                Digits: 21 1
+                Digits: 7 16
+                Digits: 7 31
+                Digits: 23 14
+                Digits: 0 1
+                Digits: 0 5
+                Digits: 19 128
+                */
+        };
+
+        comparePairsToCompressionLossy(now, pairs);
+    }
+
     @Test
     public void willItBlend() throws Exception {
         long blockTimestamp = 1500400800000L;
diff --git a/src/test/java/gr/aueb/compression/gorilla/CompressTest.java b/src/test/java/gr/aueb/compression/gorilla/CompressTest.java
index 3db08b9..41397ff 100644
--- a/src/test/java/gr/aueb/compression/gorilla/CompressTest.java
+++ b/src/test/java/gr/aueb/compression/gorilla/CompressTest.java
@@ -238,4 +238,99 @@ public void testPrecision32ForBaselWindSpeed() throws IOException {
         }
         System.out.println(String.format("%s - Max precision error: %e, Range: %f, (error/range %%: %e)", filename, maxPrecisionError, (maxValue - minValue), maxPrecisionError / (maxValue - minValue)));
     }
+
+    @Test
+    public void testPrecisionLossy32ForBaselTemp() throws IOException {
+
+        for (int logOfError = -10; logOfError < 10; logOfError++) {
+            String filename = "/basel-temp.csv.gz";
+            TimeseriesFileReader timeseriesFileReader = new TimeseriesFileReader(this.getClass().getResourceAsStream(filename));
+            Collection<Double> values;
+            double maxValue = Double.MIN_VALUE;
+            double minValue = Double.MAX_VALUE;
+            double maxPrecisionError = 0;
+            int totalSize = 0;
+            float totalBlocks = 0;
+            float totalTrailingDiff = 0;
+            int totalCases0 = 0;
+            int totalCases1 = 0;
+            int totalCases2 = 0;
+            while ((values = timeseriesFileReader.nextBlock()) != null) {
+                ByteBufferBitOutput output = new ByteBufferBitOutput();
+                LossyCompressor32 compressor = new LossyCompressor32(output, logOfError);
+                values.forEach(value -> compressor.addValue(value.floatValue()));
+                compressor.close();
+                totalSize += compressor.getSize();
+                totalBlocks += 1;
+                totalTrailingDiff += compressor.getTrailingDiff();
+                totalCases0 += compressor.getCases()[0];
+                totalCases1 += compressor.getCases()[1];
+                totalCases2 += compressor.getCases()[2];
+                ByteBuffer byteBuffer = output.getByteBuffer();
+                byteBuffer.flip();
+                ByteBufferBitInput input = new ByteBufferBitInput(byteBuffer);
+                Decompressor32 d = new Decompressor32(input);
+                for(Double value : values) {
+                    maxValue = value > maxValue ? value : maxValue;
+                    minValue = value < minValue ? value : minValue;
+                    Value pair = d.readValue();
+                    double precisionError = Math.abs(value.doubleValue() - pair.getFloatValue());
+                    maxPrecisionError = (precisionError > maxPrecisionError) ? precisionError : maxPrecisionError;
+                    assertEquals(value.doubleValue(), pair.getFloatValue(), Math.pow(2, logOfError), "Value did not match");
+                }
+                assertNull(d.readValue());
+            }
+            float total = totalCases0 + totalCases1 + totalCases2;
+            System.out.println(String.format("Lossy32 %s - Size : %d, Bits/value: %.2f, error: %f, Range: %.2f, (%.2f%%), Avg. Unexploited Trailing: %.2f, Cases 0: %.2f, 10: %.2f, 11: %.2f",
+                    filename, totalSize, totalSize / (totalBlocks * TimeseriesFileReader.DEFAULT_BLOCK_SIZE), maxPrecisionError, (maxValue - minValue), 100* maxPrecisionError / (maxValue - minValue), totalTrailingDiff / totalCases1, totalCases0 / total, totalCases1 / total, totalCases2 / total));
+        }
+    }
+
+    @Test
+    public void testPrecisionLossy32ForBaselWindSpeed() throws IOException {
+
+        for (int logOfError = -10; logOfError < 10; logOfError++) {
+            String filename = "/basel-wind-speed.csv.gz";
+            TimeseriesFileReader timeseriesFileReader = new TimeseriesFileReader(this.getClass().getResourceAsStream(filename));
+            Collection<Double> values;
+            double maxValue = Double.MIN_VALUE;
+            double minValue = Double.MAX_VALUE;
+            double maxPrecisionError = 0;
+            int totalSize = 0;
+            float totalBlocks = 0;
+            float totalTrailingDiff = 0;
+            int totalCases0 = 0;
+            int totalCases1 = 0;
+            int totalCases2 = 0;
+            while ((values = timeseriesFileReader.nextBlock()) != null) {
+                ByteBufferBitOutput output = new ByteBufferBitOutput();
+                LossyCompressor32 compressor = new LossyCompressor32(output, logOfError);
+                values.forEach(value -> compressor.addValue(value.floatValue()));
+                compressor.close();
+                totalSize += compressor.getSize();
+                totalBlocks += 1;
+                totalTrailingDiff += compressor.getTrailingDiff();
+                totalCases0 += compressor.getCases()[0];
+                totalCases1 += compressor.getCases()[1];
+                totalCases2 += compressor.getCases()[2];
+                ByteBuffer byteBuffer = output.getByteBuffer();
+                byteBuffer.flip();
+                ByteBufferBitInput input = new ByteBufferBitInput(byteBuffer);
+                Decompressor32 d = new Decompressor32(input);
+                for(Double value : values) {
+                    maxValue = value > maxValue ? value : maxValue;
+                    minValue = value < minValue ? value : minValue;
+                    Value pair = d.readValue();
+                    double precisionError = Math.abs(value.doubleValue() - pair.getFloatValue());
+                    maxPrecisionError = (precisionError > maxPrecisionError) ? precisionError : maxPrecisionError;
+                    assertEquals(value.doubleValue(), pair.getFloatValue(), Math.pow(2, logOfError), "Value did not match");
+                }
+                assertNull(d.readValue());
+            }
+            float total = totalCases0 + totalCases1 + totalCases2;
+            System.out.println(String.format("Lossy32 %s - Size : %d, Bits/value: %.2f, error: %f, Range: %.2f, (%.2f%%), Avg. Unexploited Trailing: %.2f, Cases 0: %.2f, 10: %.2f, 11: %.2f",
+                    filename, totalSize, totalSize / (totalBlocks * TimeseriesFileReader.DEFAULT_BLOCK_SIZE), maxPrecisionError, (maxValue - minValue), 100* maxPrecisionError / (maxValue - minValue), totalTrailingDiff / totalCases1, totalCases0 / total, totalCases1 / total, totalCases2 / total));
+        }
+    }
+
 }