Skip to content

Commit

Permalink
lossy compression
Browse files Browse the repository at this point in the history
  • Loading branch information
panagiotisl committed Mar 28, 2022
1 parent af88f10 commit 76dff33
Show file tree
Hide file tree
Showing 4 changed files with 329 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/main/java/gr/aueb/compression/gorilla/Compressor32.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,16 @@ private void writeExistingLeading(int xor) {
*/
private void writeNewLeading(int xor, int leadingZeros, int trailingZeros) {
out.writeBit();
out.writeBits(leadingZeros, 4); // Number of leading zeros in the next 5 bits
out.writeBits(leadingZeros, 4); // Number of leading zeros in the next 4 bits

int significantBits = 32 - leadingZeros - trailingZeros;
out.writeBits(significantBits, 6); // Length of meaningful bits in the next 6 bits
out.writeBits(significantBits, 6); // Length of meaningful bits in the next 5 bits
out.writeBits(xor >>> trailingZeros, significantBits); // Store the meaningful bits of XOR

storedLeadingZeros = leadingZeros;
storedTrailingZeros = trailingZeros;

size += 1 + 4 + 6 + significantBits;
size += 1 + 4 + 5 + significantBits;
}

public int getSize() {
Expand Down
176 changes: 176 additions & 0 deletions src/main/java/gr/aueb/compression/gorilla/LossyCompressor32.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package gr.aueb.compression.gorilla;

import fi.iki.yak.ts.compression.gorilla.BitOutput;

/**
 * Lossy variant of the 32-bit value compressor based on the scheme described in Facebook's
 * Gorilla paper. Value compression is for floating points only.
 *
 * <p>Before a value is XOR-ed with its predecessor, the low-order mantissa bits whose weight is
 * below {@code 2^logOfError} are overwritten with the corresponding bits of the predecessor.
 * Those bits then cancel out in the XOR, which yields more trailing zeros (or an all-zero XOR)
 * at the price of an absolute error smaller than {@code 2^logOfError}.
 *
 * <p>The class also keeps a few statistics (bit count, per-case counters, unexploited
 * leading/trailing zeros) that can be queried through the getters.
 *
 * @author Michael Burman
 */
public class LossyCompressor32 {

    /** Width of the mantissa field of an IEEE 754 single-precision float. */
    private static final int MANTISSA_BITS = 23;
    /** Bias of the exponent field of an IEEE 754 single-precision float. */
    private static final int EXPONENT_BIAS = 127;
    /** Unbiased value of an all-ones exponent field, i.e. NaN or infinity. */
    private static final int SPECIAL_EXPONENT = 128;

    // Integer.MAX_VALUE guarantees that the first XOR-ed value takes the "new leading" path.
    private int storedLeadingZeros = Integer.MAX_VALUE;
    private int storedTrailingZeros = 0;
    private int storedVal = 0;
    private boolean first = true;
    private int size;
    // cases[0]: XOR == 0, cases[1]: previous window reused, cases[2]: new window written.
    private final int[] cases = {0, 0, 0};
    private float trailingDiff;
    private float leadingDiff;

    private final BitOutput out;
    private final int logOfError;

    /**
     * Creates a compressor writing to the given output.
     *
     * @param output     sink that receives the compressed bits
     * @param logOfError base-2 logarithm of the tolerated absolute error; bits weighing less
     *                   than {@code 2^logOfError} may be altered
     */
    public LossyCompressor32(BitOutput output, int logOfError) {
        this.out = output;
        this.size = 0;
        this.logOfError = logOfError;
        this.trailingDiff = 0;
        this.leadingDiff = 0;
    }

    /**
     * Adds a new value, given as raw IEEE 754 single-precision bits, to the series.
     * Note, values must be inserted in order.
     *
     * @param value raw bit pattern of the next floating point value in the series
     */
    public void addValue(int value) {
        if (first) {
            writeFirst(value);
        } else {
            compressValue(value);
        }
    }

    /**
     * Adds a new float value to the series. Note, values must be inserted in order.
     *
     * @param value next floating point value in the series
     */
    public void addValue(float value) {
        addValue(Float.floatToRawIntBits(value));
    }

    /** Stores the first value of the block verbatim (32 bits). */
    private void writeFirst(int value) {
        first = false;
        storedVal = value;
        out.writeBits(storedVal, 32);
        size += 32;
    }

    /**
     * Closes the block: appends NaN as a terminating value, writes one more zero bit and
     * flushes the BitOutput.
     */
    public void close() {
        addValue(Float.NaN);
        out.skipBit();
        out.flush();
    }

    /**
     * Applies the lossy bit substitution to {@code value}, XORs it with the previous value and
     * writes the result using one of the three encodings (zero XOR, reused window, new window).
     *
     * @param value raw bit pattern of the value to compress
     */
    private void compressValue(int value) {
        // TODO Fix already compiled into a big method
        // Unbiased binary exponent: shift out the sign bit, keep the 8 exponent bits.
        int exponent = (value << 1 >>> 24) - EXPONENT_BIAS;
        // Number of low-order bits weighing less than 2^logOfError. It is capped at the
        // mantissa width so that exponent and sign bits are never touched (and so that the
        // shift distance stays below 32, where Java would silently wrap it).
        int space = Math.min(MANTISSA_BITS, MANTISSA_BITS + this.logOfError - exponent);

        // NaN and infinities are left untouched so the terminator written by close() survives.
        if (space > 0 && exponent != SPECIAL_EXPONENT) {
            // (1 << space) - 1 selects the low 'space' bits. The previous expression
            // "2^space - 1" was a bitwise XOR, not a power of two.
            int mask = (1 << space) - 1;
            value = (value & ~mask) | (storedVal & mask);
        }

        int xor = storedVal ^ value;

        if (xor == 0) {
            // Write 0
            cases[0] += 1;
            out.skipBit();
            size += 1;
        } else {
            int leadingZeros = Integer.numberOfLeadingZeros(xor);
            int trailingZeros = Integer.numberOfTrailingZeros(xor);

            // The leading-zero count is stored in 4 bits, so it is capped at 15.
            if (leadingZeros >= 16) {
                leadingZeros = 15;
            }

            // Store bit '1'
            out.writeBit();
            size += 1;

            if (leadingZeros >= storedLeadingZeros && trailingZeros >= storedTrailingZeros) {
                cases[1] += 1;
                // Zeros that the reused window cannot take advantage of.
                this.trailingDiff += trailingZeros - storedTrailingZeros;
                this.leadingDiff += leadingZeros - storedLeadingZeros;
                writeExistingLeading(xor);
            } else {
                // Counted once per value, like the other two cases.
                cases[2] += 1;
                writeNewLeading(xor, leadingZeros, trailingZeros);
            }
        }

        storedVal = value;
    }

    /**
     * If there at least as many leading zeros and as many trailing zeros as previous value,
     * control bit = 0 (type a): store the meaningful XORed value using the previous window.
     *
     * @param xor XOR between previous value and current
     */
    private void writeExistingLeading(int xor) {
        out.skipBit();
        int significantBits = 32 - storedLeadingZeros - storedTrailingZeros;
        out.writeBits(xor >>> storedTrailingZeros, significantBits);
        size += 1 + significantBits;
    }

    /**
     * Control bit = 1 (type b): store the number of leading zeros in the next 4 bits,
     * the length of the meaningful XORed value in the next 6 bits, then the meaningful bits
     * of the XORed value. The new window becomes the stored window.
     *
     * @param xor XOR between previous value and current
     * @param leadingZeros New leading zeros
     * @param trailingZeros New trailing zeros
     */
    private void writeNewLeading(int xor, int leadingZeros, int trailingZeros) {
        out.writeBit();
        out.writeBits(leadingZeros, 4); // Number of leading zeros in the next 4 bits

        int significantBits = 32 - leadingZeros - trailingZeros;
        out.writeBits(significantBits, 6); // Length of meaningful bits in the next 6 bits
        out.writeBits(xor >>> trailingZeros, significantBits); // Store the meaningful bits of XOR

        storedLeadingZeros = leadingZeros;
        storedTrailingZeros = trailingZeros;

        // Must mirror the bits written above: 1 control + 4 leading + 6 length + payload.
        size += 1 + 4 + 6 + significantBits;
    }

    /**
     * @return number of bits accounted for so far by the value encodings
     */
    public int getSize() {
        return size;
    }

    /**
     * @return sum, over all values that reused the stored window, of the leading zeros in
     *         excess of the stored leading-zero count
     */
    public float getLeadingDiff() {
        return leadingDiff;
    }

    /**
     * @return sum, over all values that reused the stored window, of the trailing zeros in
     *         excess of the stored trailing-zero count
     */
    public float getTrailingDiff() {
        return trailingDiff;
    }

    /**
     * @return the live counter array: index 0 counts zero XORs, index 1 values that reused
     *         the stored window, index 2 values that wrote a new window
     */
    public int[] getCases() {
        return cases;
    }
}
55 changes: 55 additions & 0 deletions src/test/java/fi/iki/yak/ts/compression/gorilla/EncodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

import org.junit.jupiter.api.Test;

import gr.aueb.compression.gorilla.Decompressor32;
import gr.aueb.compression.gorilla.LossyCompressor32;

/**
* These are generic tests to test that input matches the output after compression + decompression cycle, using
* both the timestamp and value compression.
Expand Down Expand Up @@ -43,6 +46,28 @@ private void comparePairsToCompression(long blockTimestamp, Value[] pairs) {
assertNull(d.readPair());
}

/**
 * Round-trips the given values through LossyCompressor32 (logOfError = -1) and Decompressor32
 * and checks that every decoded value is within 0.5 of the original and that the decoder
 * returns null afterwards.
 *
 * @param blockTimestamp not used by this helper
 * @param pairs          values to compress and compare against
 */
private void comparePairsToCompressionLossy(long blockTimestamp, gr.aueb.compression.gorilla.Value[] pairs) {
    ByteBufferBitOutput sink = new ByteBufferBitOutput();
    LossyCompressor32 compressor = new LossyCompressor32(sink, -1);
    for (gr.aueb.compression.gorilla.Value original : pairs) {
        compressor.addValue(original.getFloatValue());
    }
    compressor.close();
    System.out.println("Size: " + compressor.getSize());

    // Switch the buffer from writing to reading before handing it to the decoder.
    ByteBuffer compressed = sink.getByteBuffer();
    compressed.flip();
    Decompressor32 decompressor = new Decompressor32(new ByteBufferBitInput(compressed));

    // Replace with stream once decompressor supports it
    for (gr.aueb.compression.gorilla.Value expected : pairs) {
        gr.aueb.compression.gorilla.Value decoded = decompressor.readValue();
        assertEquals(expected.getFloatValue(), decoded.getFloatValue(), 0.5, "Value did not match");
    }

    // Nothing must be left in the stream after the last value.
    assertNull(decompressor.readValue());
}

@Test
void simpleEncodeAndDecodeTest() throws Exception {
long now = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS)
Expand All @@ -62,6 +87,36 @@ void simpleEncodeAndDecodeTest() throws Exception {
comparePairsToCompression(now, pairs);
}

/**
 * Round-trips a small hand-picked series (small, large, negative and fractional floats)
 * through the lossy compressor and checks it against the tolerance applied by
 * comparePairsToCompressionLossy.
 */
@Test
void simpleLossyEncodeAndDecodeTest() throws Exception {
    long blockStart = LocalDateTime.now()
            .truncatedTo(ChronoUnit.HOURS)
            .toInstant(ZoneOffset.UTC)
            .toEpochMilli();

    float[] samples = {
        1.0f,
        -2.0f,
        -2.5f,
        65537f,
        2147483650.0f,
        -16384f,
        2.8f,
        -38.0f,
    };

    // Each Value is built from the raw IEEE 754 bit pattern of the sample.
    gr.aueb.compression.gorilla.Value[] pairs = new gr.aueb.compression.gorilla.Value[samples.length];
    for (int i = 0; i < samples.length; i++) {
        pairs[i] = new gr.aueb.compression.gorilla.Value(Float.floatToRawIntBits(samples[i]));
    }

    comparePairsToCompressionLossy(blockStart, pairs);
}

@Test
public void willItBlend() throws Exception {
long blockTimestamp = 1500400800000L;
Expand Down
95 changes: 95 additions & 0 deletions src/test/java/gr/aueb/compression/gorilla/CompressTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,99 @@ public void testPrecision32ForBaselWindSpeed() throws IOException {
}
System.out.println(String.format("%s - Max precision error: %e, Range: %f, (error/range %%: %e)", filename, maxPrecisionError, (maxValue - minValue), maxPrecisionError / (maxValue - minValue)));
}

/**
 * Compresses the Basel temperature series with LossyCompressor32 for every logOfError in
 * [-10, 10), verifies that each decoded value is within 2^logOfError of the input and prints
 * size, error and case statistics per setting.
 */
@Test
public void testPrecisionLossy32ForBaselTemp() throws IOException {

    for (int logOfError = -10; logOfError < 10; logOfError++) {
        String filename = "/basel-temp.csv.gz";
        TimeseriesFileReader timeseriesFileReader = new TimeseriesFileReader(this.getClass().getResourceAsStream(filename));
        Collection<Double> values;
        // Double.MIN_VALUE is the smallest *positive* double, so it is not a valid seed for a
        // running maximum when the data may be negative; use the infinities instead.
        double maxValue = Double.NEGATIVE_INFINITY;
        double minValue = Double.POSITIVE_INFINITY;
        double maxPrecisionError = 0;
        int totalSize = 0;
        float totalBlocks = 0;
        float totalTrailingDiff = 0;
        int totalCases0 = 0;
        int totalCases1 = 0;
        int totalCases2 = 0;
        while ((values = timeseriesFileReader.nextBlock()) != null) {
            // Compress one block and collect the compressor's statistics.
            ByteBufferBitOutput output = new ByteBufferBitOutput();
            LossyCompressor32 compressor = new LossyCompressor32(output, logOfError);
            values.forEach(value -> compressor.addValue(value.floatValue()));
            compressor.close();
            totalSize += compressor.getSize();
            totalBlocks += 1;
            totalTrailingDiff += compressor.getTrailingDiff();
            totalCases0 += compressor.getCases()[0];
            totalCases1 += compressor.getCases()[1];
            totalCases2 += compressor.getCases()[2];

            // Decode the block again and compare value by value.
            ByteBuffer byteBuffer = output.getByteBuffer();
            byteBuffer.flip();
            ByteBufferBitInput input = new ByteBufferBitInput(byteBuffer);
            Decompressor32 d = new Decompressor32(input);
            for(Double value : values) {
                maxValue = value > maxValue ? value : maxValue;
                minValue = value < minValue ? value : minValue;
                Value pair = d.readValue();
                double precisionError = Math.abs(value.doubleValue() - pair.getFloatValue());
                maxPrecisionError = (precisionError > maxPrecisionError) ? precisionError : maxPrecisionError;
                assertEquals(value.doubleValue(), pair.getFloatValue(), Math.pow(2, logOfError), "Value did not match");
            }
            assertNull(d.readValue());
        }
        // float on purpose: the case shares below must use floating point division.
        float total = totalCases0 + totalCases1 + totalCases2;
        System.out.println(String.format("Lossy32 %s - Size : %d, Bits/value: %.2f, error: %f, Range: %.2f, (%.2f%%), Avg. Unexploited Trailing: %.2f, Cases 0: %.2f, 10: %.2f, 11: %.2f",
                filename, totalSize, totalSize / (totalBlocks * TimeseriesFileReader.DEFAULT_BLOCK_SIZE), maxPrecisionError, (maxValue - minValue), 100* maxPrecisionError / (maxValue - minValue), totalTrailingDiff / totalCases1, totalCases0 / total, totalCases1 / total, totalCases2 / total));
    }
}

/**
 * Compresses the Basel wind speed series with LossyCompressor32 for every logOfError in
 * [-10, 10), verifies that each decoded value is within 2^logOfError of the input and prints
 * size, error and case statistics per setting.
 */
@Test
public void testPrecisionLossy32ForBaselWindSpeed() throws IOException {

    for (int logOfError = -10; logOfError < 10; logOfError++) {
        String filename = "/basel-wind-speed.csv.gz";
        TimeseriesFileReader timeseriesFileReader = new TimeseriesFileReader(this.getClass().getResourceAsStream(filename));
        Collection<Double> values;
        // Double.MIN_VALUE is the smallest *positive* double, so it is not a valid seed for a
        // running maximum when the data may be negative; use the infinities instead.
        double maxValue = Double.NEGATIVE_INFINITY;
        double minValue = Double.POSITIVE_INFINITY;
        double maxPrecisionError = 0;
        int totalSize = 0;
        float totalBlocks = 0;
        float totalTrailingDiff = 0;
        int totalCases0 = 0;
        int totalCases1 = 0;
        int totalCases2 = 0;
        while ((values = timeseriesFileReader.nextBlock()) != null) {
            // Compress one block and collect the compressor's statistics.
            ByteBufferBitOutput output = new ByteBufferBitOutput();
            LossyCompressor32 compressor = new LossyCompressor32(output, logOfError);
            values.forEach(value -> compressor.addValue(value.floatValue()));
            compressor.close();
            totalSize += compressor.getSize();
            totalBlocks += 1;
            totalTrailingDiff += compressor.getTrailingDiff();
            totalCases0 += compressor.getCases()[0];
            totalCases1 += compressor.getCases()[1];
            totalCases2 += compressor.getCases()[2];

            // Decode the block again and compare value by value.
            ByteBuffer byteBuffer = output.getByteBuffer();
            byteBuffer.flip();
            ByteBufferBitInput input = new ByteBufferBitInput(byteBuffer);
            Decompressor32 d = new Decompressor32(input);
            for(Double value : values) {
                maxValue = value > maxValue ? value : maxValue;
                minValue = value < minValue ? value : minValue;
                Value pair = d.readValue();
                double precisionError = Math.abs(value.doubleValue() - pair.getFloatValue());
                maxPrecisionError = (precisionError > maxPrecisionError) ? precisionError : maxPrecisionError;
                assertEquals(value.doubleValue(), pair.getFloatValue(), Math.pow(2, logOfError), "Value did not match");
            }
            assertNull(d.readValue());
        }
        // float on purpose: the case shares below must use floating point division.
        float total = totalCases0 + totalCases1 + totalCases2;
        System.out.println(String.format("Lossy32 %s - Size : %d, Bits/value: %.2f, error: %f, Range: %.2f, (%.2f%%), Avg. Unexploited Trailing: %.2f, Cases 0: %.2f, 10: %.2f, 11: %.2f",
                filename, totalSize, totalSize / (totalBlocks * TimeseriesFileReader.DEFAULT_BLOCK_SIZE), maxPrecisionError, (maxValue - minValue), 100* maxPrecisionError / (maxValue - minValue), totalTrailingDiff / totalCases1, totalCases0 / total, totalCases1 / total, totalCases2 / total));
    }
}

}

0 comments on commit 76dff33

Please sign in to comment.