Skip to content

Commit

Permalink
Separate ValueCompression from GorillaCompressor and allow the usage of
Browse files Browse the repository at this point in the history
different value predictors. Include DFCM as another predictor
possibility.

(cherry picked from commit 6d0d61a)
  • Loading branch information
burmanm committed Sep 7, 2017
1 parent dbfc30d commit 077ddc8
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 108 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package fi.iki.yak.ts.compression.gorilla;

import fi.iki.yak.ts.compression.gorilla.predictors.LastValuePredictor;

/**
* Implements a slightly modified version of the time series compression described in Facebook's Gorilla
* paper.
Expand All @@ -8,9 +10,6 @@
*/
public class GorillaCompressor {

private int storedLeadingZeros = Integer.MAX_VALUE;
private int storedTrailingZeros = 0;
private long storedVal = 0;
private long storedTimestamp = 0;
private int storedDelta = 0;

Expand All @@ -24,20 +23,19 @@ public class GorillaCompressor {

private BitOutput out;

private ValueCompressor valueCompressor;

/**
* Creates a compressor that uses a {@link LastValuePredictor} for value compression.
* Delegates to the three-argument constructor.
*
* @param timestamp Block timestamp, passed on unchanged
* @param output Target of the compressed bits, passed on unchanged
*/
public GorillaCompressor(long timestamp, BitOutput output) {
this(timestamp, output, new LastValuePredictor());
}

/**
* Creates a compressor that writes to the given output and compresses values with the given predictor.
* The block header is written to the output as part of construction.
*
* @param timestamp Block timestamp; stored as blockTimestamp and written as the header
* @param output Target of the compressed bits
* @param predictor Predictor handed to the ValueCompressor
*/
public GorillaCompressor(long timestamp, BitOutput output, Predictor predictor) {
blockTimestamp = timestamp;
out = output;
// The header goes to the output before the value compressor is created
addHeader(timestamp);
// Value compression is delegated; the ValueCompressor receives the same output as this compressor
this.valueCompressor = new ValueCompressor(output, predictor);
}

// public void compressLongStream(Stream<Pair> stream) {
// stream.peek(p -> writeFirst(p.getTimestamp(), Double.doubleToRawLongBits(p.getDoubleValue()))).skip(1)
// .forEach(p -> {
// compressTimestamp(p.getTimestamp());
// compressValue(p.getLongValue());
// });
// }

private void addHeader(long timestamp) {
out.writeBits(timestamp, 64);
}
Expand All @@ -53,7 +51,7 @@ public void addValue(long timestamp, long value) {
writeFirst(timestamp, value);
} else {
compressTimestamp(timestamp);
compressValue(value);
valueCompressor.compressValue(value);
}
}

Expand All @@ -69,16 +67,15 @@ public void addValue(long timestamp, double value) {
return;
}
compressTimestamp(timestamp);
compressValue(Double.doubleToRawLongBits(value));
valueCompressor.compressValue(Double.doubleToRawLongBits(value));
}

private void writeFirst(long timestamp, long value) {
storedDelta = (int) (timestamp - blockTimestamp);
storedTimestamp = timestamp;
storedVal = value;

out.writeBits(storedDelta, FIRST_DELTA_BITS);
out.writeBits(storedVal, 64);
valueCompressor.writeFirst(value);
}

/**
Expand All @@ -105,8 +102,6 @@ private void compressTimestamp(long timestamp) {
int newDelta = (int) (timestamp - storedTimestamp);
int deltaD = newDelta - storedDelta;

// TODO Fluctuating values will cause always 64 bits write (-2, +2 for example), zigzag could fix it..

if(deltaD == 0) {
out.skipBit();
} else {
Expand Down Expand Up @@ -165,65 +160,4 @@ public static int encodeZigZag32(final int n) {
}

// END: From protobuf

private void compressValue(long value) {
long xor = storedVal ^ value;

if(xor == 0) {
// Write 0
out.skipBit();
} else {
int leadingZeros = Long.numberOfLeadingZeros(xor);
int trailingZeros = Long.numberOfTrailingZeros(xor);

out.writeBit(); // Optimize to writeNewLeading / writeExistingLeading?

if(leadingZeros >= storedLeadingZeros && trailingZeros >= storedTrailingZeros) {
writeExistingLeading(xor);
} else {
writeNewLeading(xor, leadingZeros, trailingZeros);
}
}

storedVal = value;
}

/**
* If there at least as many leading zeros and as many trailing zeros as previous value, control bit = 0 (type a)
* store the meaningful XORed value
*
* @param xor XOR between previous value and current
*/
private void writeExistingLeading(long xor) {
out.skipBit();

int significantBits = 64 - storedLeadingZeros - storedTrailingZeros;
xor >>>= storedTrailingZeros;
out.writeBits(xor, significantBits);
}

/**
* store the length of the number of leading zeros in the next 5 bits
* store length of the meaningful XORed value in the next 6 bits,
* store the meaningful bits of the XORed value
* (type b)
*
* @param xor XOR between previous value and current
* @param leadingZeros New leading zeros
* @param trailingZeros New trailing zeros
*/
private void writeNewLeading(long xor, int leadingZeros, int trailingZeros) {
out.writeBit();

// Different from version 1.x, use (significantBits - 1) in storage - avoids a branch
int significantBits = 64 - leadingZeros - trailingZeros;

// Different from original, bits 5 -> 6, avoids a branch, allows storing small longs
out.writeBits(leadingZeros, 6); // Number of leading zeros in the next 6 bits
out.writeBits(significantBits - 1, 6); // Length of meaningful bits in the next 6 bits
out.writeBits(xor >>> trailingZeros, significantBits); // Store the meaningful bits of XOR

storedLeadingZeros = leadingZeros;
storedTrailingZeros = trailingZeros;
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
package fi.iki.yak.ts.compression.gorilla;

import fi.iki.yak.ts.compression.gorilla.predictors.LastValuePredictor;

/**
* Decompresses a compressed stream created by the GorillaCompressor.
*
* @author Michael Burman
*/
public class GorillaDecompressor {

private int storedLeadingZeros = Integer.MAX_VALUE;
private int storedTrailingZeros = 0;
private long storedVal = 0;
private long storedTimestamp = 0;
private long storedDelta = 0;

private long blockTimestamp = 0;

private long storedVal = 0;
private boolean endOfStream = false;

private BitInput in;
private ValueDecompressor decompressor;

/**
* Creates a decompressor that uses a {@link LastValuePredictor} for value decompression.
* Delegates to the two-argument constructor.
*
* @param input Source of the compressed bits, passed on unchanged
*/
public GorillaDecompressor(BitInput input) {
this(input, new LastValuePredictor());
}

/**
* Creates a decompressor that reads from the given input and decompresses values with the given predictor.
* readHeader() is invoked as part of construction.
*
* @param input Source of the compressed bits
* @param predictor Predictor handed to the ValueDecompressor
*        NOTE(review): presumably this has to be the same kind of predictor that was used when
*        compressing the stream - confirm
*/
public GorillaDecompressor(BitInput input, Predictor predictor) {
in = input;
// readHeader() runs before the value decompressor is created
readHeader();
// Value decompression is delegated; the ValueDecompressor receives the same input as this decompressor
this.decompressor = new ValueDecompressor(input, predictor);
}

private void readHeader() {
Expand Down Expand Up @@ -60,7 +64,8 @@ private void first() {
endOfStream = true;
return;
}
storedVal = in.getLong(64);
storedVal = decompressor.readFirst();
// storedVal = in.getLong(64);
storedTimestamp = blockTimestamp + storedDelta;
}

Expand All @@ -72,7 +77,7 @@ private void nextTimestamp() {
switch(readInstruction) {
case 0x00:
storedTimestamp = storedDelta + storedTimestamp;
nextValue();
storedVal = decompressor.nextValue();
return;
case 0x02:
deltaDelta = in.getLong(7);
Expand Down Expand Up @@ -101,29 +106,7 @@ private void nextTimestamp() {
storedDelta = storedDelta + deltaDelta;

storedTimestamp = storedDelta + storedTimestamp;
nextValue();
}

private void nextValue() {
int val = in.nextClearBit(2);

switch(val) {
case 3:
// New leading and trailing zeros
storedLeadingZeros = (int) in.getLong(6);

byte significantBits = (byte) in.getLong(6);
significantBits++;

storedTrailingZeros = 64 - significantBits - storedLeadingZeros;
// missing break is intentional, we want to overflow to next one
case 2:
long value = in.getLong(64 - storedLeadingZeros - storedTrailingZeros);
value <<= storedTrailingZeros;
value = storedVal ^ value;
storedVal = value;
break;
}
storedVal = decompressor.nextValue();
}

// START: From protobuf
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/fi/iki/yak/ts/compression/gorilla/Predictor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fi.iki.yak.ts.compression.gorilla;

/**
* Supplies the predicted value that ValueCompressor XORs with each actual value before writing it.
* For every value after the first, ValueCompressor calls {@link #predict()} and then
* {@link #update(long)} with the actual value; the first value of a stream is passed to
* {@link #update(long)} only.
*
* @author miburman
*/
public interface Predictor {

/**
* Gives the real (actual) value to the predictor.
*
* @param value Long / bits of Double
*/
void update(long value);

/**
* Predicts the next value.
*
* @return Predicted value, in the same long presentation as the values given to {@link #update(long)}
*/
long predict();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package fi.iki.yak.ts.compression.gorilla;

import fi.iki.yak.ts.compression.gorilla.predictors.LastValuePredictor;

import java.util.Objects;

/**
 * ValueCompressor for the Gorilla encoding format. Every value after the first one is XORed with the
 * value returned by the configured {@link Predictor}, and only the meaningful bits of that XOR are
 * written to the output.
 *
 * <p>Supply values in their long presentation; in case of doubles use
 * {@code Double.doubleToRawLongBits(value)}.
 *
 * @author Michael Burman
 */
public class ValueCompressor {
    // Integer.MAX_VALUE can never be reached by a real leading zero count, so the first non-zero XOR
    // always takes the writeNewLeading path and establishes the stored window.
    private int storedLeadingZeros = Integer.MAX_VALUE;
    private int storedTrailingZeros = 0;

    private final Predictor predictor;
    private final BitOutput out;

    /**
     * Creates a value compressor that uses a {@link LastValuePredictor}.
     *
     * @param out Target of the compressed bits
     * @throws NullPointerException if {@code out} is null
     */
    public ValueCompressor(BitOutput out) {
        this(out, new LastValuePredictor());
    }

    /**
     * Creates a value compressor that uses the given predictor.
     *
     * @param out Target of the compressed bits
     * @param predictor Predictor whose prediction is XORed with each value
     * @throws NullPointerException if {@code out} or {@code predictor} is null
     */
    public ValueCompressor(BitOutput out, Predictor predictor) {
        // Fail at construction time instead of on the first write
        this.out = Objects.requireNonNull(out, "out");
        this.predictor = Objects.requireNonNull(predictor, "predictor");
    }

    /**
     * Writes the first value of the stream uncompressed (full 64 bits) and gives it to the predictor.
     *
     * @param value Long / bits of Double
     */
    void writeFirst(long value) {
        predictor.update(value);
        out.writeBits(value, 64);
    }

    /**
     * Compresses a single value against the predictor's current prediction. The predictor is always
     * updated with the real value, also when the prediction was exact.
     *
     * @param value Long / bits of Double
     */
    protected void compressValue(long value) {
        // In original Gorilla, Last-Value predictor is used
        long diff = predictor.predict() ^ value;
        predictor.update(value);

        if (diff == 0) {
            // Prediction was exact: write a single 0 bit
            out.skipBit();
            return;
        }

        int leadingZeros = Long.numberOfLeadingZeros(diff);
        int trailingZeros = Long.numberOfTrailingZeros(diff);

        out.writeBit(); // Optimize to writeNewLeading / writeExistingLeading?

        if (leadingZeros >= storedLeadingZeros && trailingZeros >= storedTrailingZeros) {
            // The meaningful bits fit inside the previously stored window, reuse it
            writeExistingLeading(diff);
        } else {
            writeNewLeading(diff, leadingZeros, trailingZeros);
        }
    }

    /**
     * If there are at least as many leading zeros and as many trailing zeros as in the stored window,
     * control bit = 0 (type a): store the meaningful XORed value using the stored window.
     *
     * @param xor XOR between predicted value and current
     */
    private void writeExistingLeading(long xor) {
        out.skipBit();

        int significantBits = 64 - storedLeadingZeros - storedTrailingZeros;
        out.writeBits(xor >>> storedTrailingZeros, significantBits);
    }

    /**
     * Control bit = 1 (type b):
     * store the number of leading zeros in the next 6 bits,
     * store the length of the meaningful XORed value minus one in the next 6 bits,
     * store the meaningful bits of the XORed value.
     * The new leading / trailing zero counts become the stored window for the following values.
     *
     * @param xor XOR between predicted value and current
     * @param leadingZeros New leading zeros
     * @param trailingZeros New trailing zeros
     */
    private void writeNewLeading(long xor, int leadingZeros, int trailingZeros) {
        out.writeBit();

        // Different from version 1.x, use (significantBits - 1) in storage - avoids a branch
        int significantBits = 64 - leadingZeros - trailingZeros;

        // Different from original, bits 5 -> 6, avoids a branch, allows storing small longs
        out.writeBits(leadingZeros, 6); // Number of leading zeros in the next 6 bits
        out.writeBits(significantBits - 1, 6); // Length of meaningful bits in the next 6 bits
        out.writeBits(xor >>> trailingZeros, significantBits); // Store the meaningful bits of XOR

        storedLeadingZeros = leadingZeros;
        storedTrailingZeros = trailingZeros;
    }
}
Loading

0 comments on commit 077ddc8

Please sign in to comment.