Skip to content

Commit

Permalink
added 32-bit support
Browse files Browse the repository at this point in the history
  • Loading branch information
panagiotisl committed Mar 12, 2022
1 parent 405543d commit 99eeb0a
Show file tree
Hide file tree
Showing 7 changed files with 743 additions and 137 deletions.
63 changes: 5 additions & 58 deletions src/main/java/fi/iki/yak/ts/compression/gorilla/Compressor.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,15 @@ public class Compressor {
private int storedLeadingZeros = Integer.MAX_VALUE;
private int storedTrailingZeros = 0;
private long storedVal = 0;
private long storedTimestamp = 0;
private long storedDelta = 0;
private boolean first = true;

private long blockTimestamp = 0;

public final static short FIRST_DELTA_BITS = 27;
// public final static short FIRST_DELTA_BITS = 27;

private BitOutput out;

// We should have access to the series?
public Compressor(long timestamp, BitOutput output) {
blockTimestamp = timestamp;
out = output;
addHeader(timestamp);
}

private void addHeader(long timestamp) {
// One byte: length of the first delta
// One byte: precision of timestamps
// out.writeBits(timestamp, 64);
}

/**
Expand All @@ -40,10 +29,9 @@ private void addHeader(long timestamp) {
* @param value next floating point value in the series
*/
public void addValue(long timestamp, long value) {
if(storedTimestamp == 0) {
if(first) {
writeFirst(timestamp, value);
} else {
// compressTimestamp(timestamp);
compressValue(value);
}
}
Expand All @@ -55,69 +43,28 @@ public void addValue(long timestamp, long value) {
* @param value next floating point value in the series
*/
public void addValue(long timestamp, double value) {
if(storedTimestamp == 0) {
if(first) {
writeFirst(timestamp, Double.doubleToRawLongBits(value));
} else {
// compressTimestamp(timestamp);
compressValue(Double.doubleToRawLongBits(value));
}
}

private void writeFirst(long timestamp, long value) {
storedDelta = timestamp - blockTimestamp;
storedTimestamp = timestamp;
first = false;
storedVal = value;

// out.writeBits(storedDelta, FIRST_DELTA_BITS);
out.writeBits(storedVal, 64);
}

/**
* Closes the block and writes the remaining stuff to the BitOutput.
*/
public void close() {
// These are selected to test interoperability and correctness of the solution, this can be read with go-tsz
addValue(0, Double.NaN);
// out.writeBits(0x0F, 4);
// out.writeBits(0xFFFFFFFF, 32);
out.skipBit();
out.flush();
}

/**
* Difference to the original Facebook paper, we store the first delta as 27 bits to allow
* millisecond accuracy for a one day block.
*
* Also, the timestamp delta-delta is not good for millisecond compressions..
*
* @param timestamp epoch
*/
private void compressTimestamp(long timestamp) {
// a) Calculate the delta of delta
long newDelta = (timestamp - storedTimestamp);
long deltaD = newDelta - storedDelta;

// If delta is zero, write single 0 bit
if(deltaD == 0) {
out.skipBit();
} else if(deltaD >= -63 && deltaD <= 64) {
out.writeBits(0x02, 2); // store '10'
out.writeBits(deltaD, 7); // Using 7 bits, store the value..
} else if(deltaD >= -255 && deltaD <= 256) {
out.writeBits(0x06, 3); // store '110'
out.writeBits(deltaD, 9); // Use 9 bits
} else if(deltaD >= -2047 && deltaD <= 2048) {
out.writeBits(0x0E, 4); // store '1110'
out.writeBits(deltaD, 12); // Use 12 bits
} else {
out.writeBits(0x0F, 4); // Store '1111'
out.writeBits(deltaD, 32); // Store delta using 32 bits
}

storedDelta = newDelta;
storedTimestamp = timestamp;
}

private void compressValue(long value) {
// TODO Fix already compiled into a big method
long xor = storedVal ^ value;
Expand Down
130 changes: 130 additions & 0 deletions src/main/java/fi/iki/yak/ts/compression/gorilla/Compressor32.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package fi.iki.yak.ts.compression.gorilla;

/**
* Implements the time series compression as described in the Facebook's Gorilla Paper. Value compression
* is for floating points only.
*
* @author Michael Burman
*/
public class Compressor32 {

private int storedLeadingZeros = Integer.MAX_VALUE;
private int storedTrailingZeros = 0;
private int storedVal = 0;
private boolean first = true;

// public final static short FIRST_DELTA_BITS = 27;

private BitOutput out;

// We should have access to the series?
public Compressor32(long timestamp, BitOutput output) {
out = output;
}

/**
* Adds a new long value to the series. Note, values must be inserted in order.
*
* @param timestamp Timestamp which is inside the allowed time block (default 24 hours with millisecond precision)
* @param value next floating point value in the series
*/
public void addValue(long timestamp, int value) {
if(first) {
writeFirst(timestamp, value);
} else {
compressValue(value);
}
}

/**
* Adds a new double value to the series. Note, values must be inserted in order.
*
* @param timestamp Timestamp which is inside the allowed time block (default 24 hours with millisecond precision)
* @param value next floating point value in the series
*/
public void addValue(long timestamp, float value) {
if(first) {
writeFirst(timestamp, Float.floatToRawIntBits(value));
} else {
compressValue(Float.floatToRawIntBits(value));
}
}

private void writeFirst(long timestamp, int value) {
first = false;
storedVal = value;
out.writeBits(storedVal, 32);
}

/**
* Closes the block and writes the remaining stuff to the BitOutput.
*/
public void close() {
addValue(0, Float.NaN);
out.skipBit();
out.flush();
}

private void compressValue(int value) {
// TODO Fix already compiled into a big method
int xor = storedVal ^ value;

if(xor == 0) {
// Write 0
out.skipBit();
} else {
int leadingZeros = Integer.numberOfLeadingZeros(xor);
int trailingZeros = Integer.numberOfTrailingZeros(xor);

// Check overflow of leading? Can't be 32!
if(leadingZeros >= 32) {
leadingZeros = 31;
}

// Store bit '1'
out.writeBit();

if(leadingZeros >= storedLeadingZeros && trailingZeros >= storedTrailingZeros) {
writeExistingLeading(xor);
} else {
writeNewLeading(xor, leadingZeros, trailingZeros);
}
}

storedVal = value;
}

/**
* If there at least as many leading zeros and as many trailing zeros as previous value, control bit = 0 (type a)
* store the meaningful XORed value
*
* @param xor XOR between previous value and current
*/
private void writeExistingLeading(int xor) {
out.skipBit();
int significantBits = 32 - storedLeadingZeros - storedTrailingZeros;
out.writeBits(xor >>> storedTrailingZeros, significantBits);
}

/**
* store the length of the number of leading zeros in the next 5 bits
* store length of the meaningful XORed value in the next 6 bits,
* store the meaningful bits of the XORed value
* (type b)
*
* @param xor XOR between previous value and current
* @param leadingZeros New leading zeros
* @param trailingZeros New trailing zeros
*/
private void writeNewLeading(int xor, int leadingZeros, int trailingZeros) {
out.writeBit();
out.writeBits(leadingZeros, 5); // Number of leading zeros in the next 5 bits

int significantBits = 32 - leadingZeros - trailingZeros;
out.writeBits(significantBits, 6); // Length of meaningful bits in the next 6 bits
out.writeBits(xor >>> trailingZeros, significantBits); // Store the meaningful bits of XOR

storedLeadingZeros = leadingZeros;
storedTrailingZeros = trailingZeros;
}
}
85 changes: 7 additions & 78 deletions src/main/java/fi/iki/yak/ts/compression/gorilla/Decompressor.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@ public class Decompressor {
private int storedLeadingZeros = Integer.MAX_VALUE;
private int storedTrailingZeros = 0;
private long storedVal = 0;
private long storedTimestamp = 0;
private long storedDelta = 0;

private long blockTimestamp = 0;

private boolean first = true;
private boolean endOfStream = false;

private BitInput in;
Expand All @@ -23,18 +19,9 @@ public class Decompressor {

private final static int NAN_INT = 0x7fc00000;

private final static double NAN_DOUBLE = Double.longBitsToDouble(0x7ff8000000000000L);

private final static float NAN_FLOAT = Float.intBitsToFloat(0x7fc00000);


public Decompressor(BitInput input) {
in = input;
// readHeader();
}

private void readHeader() {
blockTimestamp = in.getLong(64);
}

/**
Expand All @@ -47,81 +34,23 @@ public Pair readPair() {
if(endOfStream) {
return null;
}
return new Pair(storedTimestamp, storedVal);
return new Pair(0, storedVal);
}

private void next() {
if (storedTimestamp == 0) {
// First item to read
// storedDelta = in.getLong(Compressor.FIRST_DELTA_BITS);
// if(storedDelta == (1<<27) - 1) {
// endOfStream = true;
// return;
// }
if (first) {
first = false;
storedVal = in.getLong(64);
if (storedVal == NAN_LONG) {
endOfStream =true;
endOfStream = true;
return;
}
// storedTimestamp = blockTimestamp + storedDelta;
storedTimestamp = 1;

} else {
// nextTimestamp();
nextValue();
}
}

private int bitsToRead() {
int val = in.nextClearBit(4);
int toRead = 0;

switch(val) {
case 0x00:
break;
case 0x02:
toRead = 7; // '10'
break;
case 0x06:
toRead = 9; // '110'
break;
case 0x0e:
toRead = 12;
break;
case 0x0F:
toRead = 32;
break;
}

return toRead;
}

private void nextTimestamp() {
// Next, read timestamp
long deltaDelta = 0;
int toRead = bitsToRead();
if (toRead > 0) {
deltaDelta = in.getLong(toRead);

if(toRead == 32) {
if ((int) deltaDelta == 0xFFFFFFFF) {
// End of stream
endOfStream = true;
return;
}
} else {
// Turn "unsigned" long value back to signed one
if(deltaDelta > (1 << (toRead - 1))) {
deltaDelta -= (1 << toRead);
}
}

deltaDelta = (int) deltaDelta;
}

storedDelta = storedDelta + deltaDelta;
storedTimestamp = storedDelta + storedTimestamp;
nextValue();
}

private void nextValue() {
// Read value
if (in.readBit()) {
Expand Down
Loading

0 comments on commit 99eeb0a

Please sign in to comment.