diff --git a/pom.xml b/pom.xml index 13825f7..a9a9a2e 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,11 @@ + + commons-lang + commons-lang + 2.6 + org.junit.jupiter junit-jupiter-engine @@ -134,17 +139,6 @@ false - - org.apache.maven.plugins - maven-javadoc-plugin - - - - jar - - - - org.apache.maven.plugins maven-source-plugin diff --git a/src/main/java/gr/aueb/compression/gorilla/DecompressorPmcMr.java b/src/main/java/gr/aueb/compression/gorilla/DecompressorPmcMr.java new file mode 100644 index 0000000..121b177 --- /dev/null +++ b/src/main/java/gr/aueb/compression/gorilla/DecompressorPmcMr.java @@ -0,0 +1,49 @@ +package gr.aueb.compression.gorilla; + +import java.util.List; + +import gr.aueb.compression.gorilla.PmcMR.Constant; + +public class DecompressorPmcMr { + + private List constants; + private float storedVal = 0f; + private boolean endOfStream = false; + private int currentElement = 0; + private int currentTimestampOffset = 0; + + public DecompressorPmcMr(List constants) { + this.constants = constants; + } + + /** + * Returns the next pair in the time series, if available. + * + * @return Pair if there's next value, null if series is done. 
+ */ + public Float readValue() { + next(); + if(endOfStream) { + return null; + } + return storedVal; + } + + private void next() { + Constant constant = constants.get(currentElement); + if (constant.getFinalTimestamp() >= (constant.getInitialTimestamp() + currentTimestampOffset)) { + storedVal = constant.getValue(); + currentTimestampOffset++; + } else { + currentElement++; + if (currentElement < constants.size()) { + constant = constants.get(currentElement); + storedVal = constant.getValue(); + currentTimestampOffset = 1; + } else { + endOfStream = true; + } + } + } + +} \ No newline at end of file diff --git a/src/main/java/gr/aueb/compression/gorilla/DecompressorSwingFilter.java b/src/main/java/gr/aueb/compression/gorilla/DecompressorSwingFilter.java new file mode 100644 index 0000000..3ba3812 --- /dev/null +++ b/src/main/java/gr/aueb/compression/gorilla/DecompressorSwingFilter.java @@ -0,0 +1,50 @@ +package gr.aueb.compression.gorilla; + +import java.util.List; + +import gr.aueb.compression.gorilla.PmcMR.Constant; +import gr.aueb.compression.gorilla.SwingFilter.SwingSegment; + +public class DecompressorSwingFilter { + + private List swingSegments; + private float storedVal = 0f; + private boolean endOfStream = false; + private int currentElement = 0; + private int currentTimestampOffset = 0; + + public DecompressorSwingFilter(List swingSegments) { + this.swingSegments = swingSegments; + } + + /** + * Returns the next pair in the time series, if available. + * + * @return Pair if there's next value, null if series is done. 
+ */ + public Float readValue() { + next(); + if(endOfStream) { + return null; + } + return storedVal; + } + + private void next() { + SwingSegment swingSegment = swingSegments.get(currentElement); + if (swingSegment.getFinalTimestamp() >= (swingSegment.getInitialTimestamp() + currentTimestampOffset)) { + storedVal = swingSegment.getLine().get(swingSegment.getInitialTimestamp() + currentTimestampOffset); + currentTimestampOffset++; + } else { + currentElement++; + if (currentElement < swingSegments.size()) { + swingSegment = swingSegments.get(currentElement); + storedVal = swingSegment.getLine().get(swingSegment.getInitialTimestamp()); + currentTimestampOffset = 1; + } else { + endOfStream = true; + } + } + } + +} \ No newline at end of file diff --git a/src/main/java/gr/aueb/compression/gorilla/LinearFunction.java b/src/main/java/gr/aueb/compression/gorilla/LinearFunction.java new file mode 100644 index 0000000..f81e16d --- /dev/null +++ b/src/main/java/gr/aueb/compression/gorilla/LinearFunction.java @@ -0,0 +1,32 @@ +/* Copyright 2018 The ModelarDB Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A straight line {@code v = a * t + b} defined by two points {@code (ts, vs)} and
 * {@code (te, ve)}.
 *
 * <p>Coefficients are stored and evaluated in single precision.
 */
public class LinearFunction {

    /** Constructors **/

    /**
     * Builds the line through {@code (ts, vs)} and {@code (te, ve)}.
     *
     * <p>If both timestamps are equal the slope is undefined (the plain formula would divide by
     * zero and yield NaN or infinite coefficients). In that case the line degenerates to the
     * horizontal line {@code v = vs}, so a single-point interval still evaluates to its value.
     *
     * @param ts timestamp of the first point
     * @param vs value of the first point
     * @param te timestamp of the second point
     * @param ve value of the second point
     */
    public LinearFunction(long ts, float vs, long te, float ve) {
        if (te == ts) {
            // Degenerate interval: avoid 0/0 (NaN) and x/0 (infinity).
            this.a = 0f;
            this.b = vs;
        } else {
            this.a = (ve - vs) / (te - ts);
            this.b = vs - a * ts;
        }
    }

    /** Public Methods **/

    /**
     * Evaluates the line at the given timestamp.
     *
     * @param ts timestamp to evaluate at
     * @return {@code a * ts + b}
     */
    public float get(long ts) {
        return this.a * ts + this.b;
    }

    /** Instance Variables **/

    /** Slope ({@code a}) and intercept ({@code b}) of the line. */
    public final float a, b;
}
!= null) { + currentConstant.setFinalTimestamp(point.getTimestamp()); + currentConstant.setValue(max - ((max - min) / 2)); + } else { + if (currentConstant != null) { + constants.add(currentConstant); + } + max = point.getValue(); + min = point.getValue(); + currentConstant = new Constant(); + currentConstant.setInitialTimestamp(point.getTimestamp()); + currentConstant.setFinalTimestamp(point.getTimestamp()); + currentConstant.setValue(point.getValue()); + } + + } + if (currentConstant != null) { + constants.add(currentConstant); + } + + return constants; + } + + public class Constant { + + private long initialTimestamp; + private long finalTimestamp; + private float value; + + public void setFinalTimestamp(long finalTimestamp) { + this.finalTimestamp = finalTimestamp; + } + + public void setInitialTimestamp(long initialTimestamp) { + this.initialTimestamp = initialTimestamp; + } + + public void setValue(float value) { + this.value = value; + } + + public long getFinalTimestamp() { + return finalTimestamp; + } + + public long getInitialTimestamp() { + return initialTimestamp; + } + + public float getValue() { + return value; + } + + @Override + public String toString() { + return String.format("%d-%d: %f", getInitialTimestamp(), getFinalTimestamp(), getValue()); + } + + } + +} diff --git a/src/main/java/gr/aueb/compression/gorilla/Point.java b/src/main/java/gr/aueb/compression/gorilla/Point.java new file mode 100644 index 0000000..0b01187 --- /dev/null +++ b/src/main/java/gr/aueb/compression/gorilla/Point.java @@ -0,0 +1,21 @@ +package gr.aueb.compression.gorilla; + +public class Point { + + private final long timestamp; + private final float value; + + public Point(long timestamp, float value) { + this.timestamp = timestamp; + this.value = value; + } + + public long getTimestamp() { + return timestamp; + } + + public float getValue() { + return value; + } + +} diff --git a/src/main/java/gr/aueb/compression/gorilla/SwingFilter.java 
b/src/main/java/gr/aueb/compression/gorilla/SwingFilter.java new file mode 100644 index 0000000..68b7fd4 --- /dev/null +++ b/src/main/java/gr/aueb/compression/gorilla/SwingFilter.java @@ -0,0 +1,186 @@ +package gr.aueb.compression.gorilla; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class SwingFilter { + + + + public List filter(Collection points, float epsilon) { + + List swingSegments = new ArrayList<>(); + + Point first = null; + LinearFunction uiOld = null; + LinearFunction liOld = null; + Point last = null; + + for (Point point : points) { + last = point; + if (first == null) { + first = point; + } + else { + if (uiOld != null && liOld !=null && (uiOld.get(point.getTimestamp()) < point.getValue() || liOld.get(point.getTimestamp()) > point.getValue())) { + System.out.println("need to start new line"); + LinearFunction line = new LinearFunction(first.getTimestamp(), first.getValue(), point.getTimestamp(), (uiOld.get(point.getTimestamp()) + liOld.get(point.getTimestamp()) / 2)); + swingSegments.add(new SwingSegment(first.getTimestamp(), point.getTimestamp(), line)); + uiOld = null; + liOld = null; + } + + LinearFunction uiNew = new LinearFunction(first.getTimestamp(), first.getValue(), point.getTimestamp(), point.getValue() + epsilon); + LinearFunction liNew = new LinearFunction(first.getTimestamp(), first.getValue(), point.getTimestamp(), point.getValue() - epsilon); + + if (uiOld == null || uiOld.get(point.getTimestamp()) > uiNew.get(point.getTimestamp())) { + uiOld = uiNew; + System.out.println("resetting upper"); + } + if (liOld == null || liOld.get(point.getTimestamp()) < liNew.get(point.getTimestamp())) { + liOld = liNew; + System.out.println("resetting lower"); + } + } + } + + if (uiOld != null && liOld !=null && (uiOld.get(last.getTimestamp()) < last.getValue() || liOld.get(last.getTimestamp()) > last.getValue())) { + System.out.println("need to start new line"); + LinearFunction line = new 
LinearFunction(first.getTimestamp(), first.getValue(), last.getTimestamp(), (uiOld.get(last.getTimestamp()) + liOld.get(last.getTimestamp()) / 2)); + swingSegments.add(new SwingSegment(first.getTimestamp(), last.getTimestamp(), line)); + uiOld = null; + liOld = null; + } + + return swingSegments; + } + + + public class SwingSegment { + + private long initialTimestamp; + private long finalTimestamp; + private LinearFunction line; + + public SwingSegment(long initialTimestamp, long finalTimestamp, LinearFunction line) { + this.initialTimestamp = initialTimestamp; + this.finalTimestamp = finalTimestamp; + this.line = line; + } + + public long getFinalTimestamp() { + return finalTimestamp; + } + + public long getInitialTimestamp() { + return initialTimestamp; + } + + public LinearFunction getLine() { + return line; + } + + @Override + public String toString() { + return String.format("%d-%d: %f", getInitialTimestamp(), getFinalTimestamp(), getLine()); + } + + } + + /** + * + * + * // initialization +1. (t1,X1) = getNext();(t2,X2) = getNext(); +2. Make a recording: (t0’,X0’) = (t1,X1); +3. Start a new filtering interval with ui1 passing through (t1,X1) +and (t2,X2+Vd(i,εi)); and li1 passing through (t1,X1) and (t2,X2- +Vd(i,εi)), for every dimension xi, i∈[1,d]; +4. set k = 1; +//main loop +5. while (true) +6. (tj,Xj) = getNext(); +7. if (tj,Xj) is null or (tj,Xj) is more than εi above uik or below lik +in the xi dimension for any i∈[1,d] //recording mechanism +8. Make a new recording: (tk,Xk), such that tk=tj-1, xik falls +between uik and lik, and xik minimizes Eik, for every +dimension xi, i∈[1,d]; +9. Start a new filtering interval with ui(k+1) passing through +(tk,Xk) and (tj,Xj+Vd(i,εi)); and li(k+1) passing through (tk,xk) +and (tj,Xj-Vd(i,εi)); +10. set k = k+1; +11. if (tj,Xj) is null //end of signal +12. return; +13. else //filtering mechanism +14. for each dimension xi, i∈[1,d] +15. if (tj,Xj) falls more than εi above lik in the xi dimension +16. 
“Swing up” lik such that it passes through (tk,xk) and +(tj,Xj-Vd(i,εi)); +17. if (tj,Xj) falls more than εi below uik in the xi dimension +18. “Swing down” uik such that it passes through (tk,xk) +and (tj,Xj+Vd(i,εi)); + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + lines = [] + line_first_timestamp, line_first_value = None, None + coefficients_up, coefficients_down = None, None + polynomial_up, polynomial_down = None, None + for timestamp, value in ts.items(): + if polynomial_up is not None and polynomial_down is not None: + up_val = value + epsilon + down_val = value - epsilon + up_lim = polynomial_up(timestamp) + down_lim = polynomial_down(timestamp) + + if (not math.isclose(up_val, up_lim) and up_val > up_lim and not math.isclose(down_val, up_lim) and down_val > up_lim) or\ + (not math.isclose(up_val, down_lim) and up_val < down_lim and not math.isclose(down_val, down_lim) and down_val < down_lim): + lines.append([line_first_timestamp, np.polyfit(x=[line_first_timestamp, previous_timestamp], y=[line_first_value, previous_value], deg=1)]) + line_first_timestamp, line_first_value = None, None + coefficients_up, coefficients_down = None, None + polynomial_up, polynomial_down = None, None + + if line_first_timestamp is None and line_first_value is None: + line_first_timestamp, line_first_value = timestamp, value + continue + + coefficients_up_temp = np.polyfit(x=[line_first_timestamp, timestamp], y=[line_first_value, value + epsilon], deg=1) + coefficients_down_temp = np.polyfit(x=[line_first_timestamp, timestamp], y=[line_first_value, value - epsilon], deg=1) + polynomial_up_temp = np.poly1d(coefficients_up_temp) + polynomial_down_temp = np.poly1d(coefficients_down_temp) + + if coefficients_up is None or coefficients_down is None: + coefficients_up = coefficients_up_temp + coefficients_down = coefficients_down_temp + polynomial_up = np.poly1d(coefficients_up) + polynomial_down = np.poly1d(coefficients_down) + if polynomial_up_temp(timestamp) < 
polynomial_up(timestamp): + coefficients_up = coefficients_up_temp + polynomial_up = np.poly1d(coefficients_up) + if polynomial_down_temp(timestamp) > polynomial_down(timestamp): + coefficients_down = coefficients_down_temp + polynomial_down = np.poly1d(coefficients_down) + previous_timestamp = timestamp + previous_value = value + + # Raises a warning if there is one point only, line_first_timestamp == timestamp + lines.append([line_first_timestamp, np.polyfit(x=[line_first_timestamp, timestamp], y=[line_first_value, value], deg=1)]) + + * + * */ + +} diff --git a/src/test/java/gr/aueb/compression/gorilla/CompressTest.java b/src/test/java/gr/aueb/compression/gorilla/CompressTest.java index 41397ff..4eed26a 100644 --- a/src/test/java/gr/aueb/compression/gorilla/CompressTest.java +++ b/src/test/java/gr/aueb/compression/gorilla/CompressTest.java @@ -11,6 +11,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.zip.GZIPInputStream; import org.junit.jupiter.api.Test; @@ -18,6 +19,8 @@ import fi.iki.yak.ts.compression.gorilla.ByteBufferBitInput; import fi.iki.yak.ts.compression.gorilla.ByteBufferBitOutput; import fi.iki.yak.ts.compression.gorilla.Compressor; +import gr.aueb.compression.gorilla.PmcMR.Constant; +import gr.aueb.compression.gorilla.SwingFilter.SwingSegment; /** * These are generic tests to test that input matches the output after compression + decompression cycle, using @@ -332,5 +335,172 @@ public void testPrecisionLossy32ForBaselWindSpeed() throws IOException { filename, totalSize, totalSize / (totalBlocks * TimeseriesFileReader.DEFAULT_BLOCK_SIZE), maxPrecisionError, (maxValue - minValue), 100* maxPrecisionError / (maxValue - minValue), totalTrailingDiff / totalCases1, totalCases0 / total, totalCases1 / total, totalCases2 / total)); } } + + @Test + public void testPmcMRFilterForBaselWindSpeed() throws IOException { + for (int logOfError = -10; logOfError < 10; logOfError++) { + 
String filename = "/basel-wind-speed.csv.gz"; + TimeseriesFileReader timeseriesFileReader = new TimeseriesFileReader(this.getClass().getResourceAsStream(filename)); + Collection values; + double maxValue = Double.MIN_VALUE; + double minValue = Double.MAX_VALUE; + int timestamp = 0; + double maxPrecisionError = 0; + int totalSize = 0; + float totalBlocks = 0; + while ((values = timeseriesFileReader.nextBlock()) != null) { + Collection points = new ArrayList<>(); + for (Double value : values) { + points.add(new Point(timestamp++, value.floatValue())); + } + List constants = new PmcMR().filter(points, ((float) Math.pow(2, logOfError))); + + totalBlocks += 1; + totalSize += constants.size() * 2 * 32; + + DecompressorPmcMr d = new DecompressorPmcMr(constants); + + for(Double value : values) { + maxValue = value > maxValue ? value : maxValue; + minValue = value < minValue ? value : minValue; + Float decompressedValue = d.readValue(); + double precisionError = Math.abs(value.doubleValue() - decompressedValue); + maxPrecisionError = (precisionError > maxPrecisionError) ? 
precisionError : maxPrecisionError; + assertEquals(value.doubleValue(), decompressedValue, Math.pow(2, logOfError), "Value did not match"); + } + } + System.out.println(String.format("PMC-MR %s - Size : %d, Bits/value: %.2f, error: %f, Range: %.2f, (%.2f%%)", + filename, totalSize, totalSize / (totalBlocks * TimeseriesFileReader.DEFAULT_BLOCK_SIZE), maxPrecisionError, (maxValue - minValue), 100* maxPrecisionError / (maxValue - minValue))); + } + + } + + @Test + public void testPmcMRFilterForBaselTemp() throws IOException { + for (int logOfError = -10; logOfError < 10; logOfError++) { + String filename = "/basel-temp.csv.gz"; + TimeseriesFileReader timeseriesFileReader = new TimeseriesFileReader(this.getClass().getResourceAsStream(filename)); + Collection values; + double maxValue = Double.MIN_VALUE; + double minValue = Double.MAX_VALUE; + int timestamp = 0; + double maxPrecisionError = 0; + int totalSize = 0; + float totalBlocks = 0; + while ((values = timeseriesFileReader.nextBlock()) != null) { + Collection points = new ArrayList<>(); + for (Double value : values) { + points.add(new Point(timestamp++, value.floatValue())); + } + List constants = new PmcMR().filter(points, ((float) Math.pow(2, logOfError))); + + totalBlocks += 1; + totalSize += constants.size() * 2 * 32; + + DecompressorPmcMr d = new DecompressorPmcMr(constants); + + for(Double value : values) { + maxValue = value > maxValue ? value : maxValue; + minValue = value < minValue ? value : minValue; + Float decompressedValue = d.readValue(); + double precisionError = Math.abs(value.doubleValue() - decompressedValue); + maxPrecisionError = (precisionError > maxPrecisionError) ? 
precisionError : maxPrecisionError; + assertEquals(value.doubleValue(), decompressedValue, Math.pow(2, logOfError), "Value did not match"); + } + + } + System.out.println(String.format("PMC-MR %s - Size : %d, Bits/value: %.2f, error: %f, Range: %.2f, (%.2f%%)", + filename, totalSize, totalSize / (totalBlocks * TimeseriesFileReader.DEFAULT_BLOCK_SIZE), maxPrecisionError, (maxValue - minValue), 100* maxPrecisionError / (maxValue - minValue))); + } + + } + + @Test + public void testSwingFilterSimple() throws IOException { + for (int logOfError = -1; logOfError < 0; logOfError++) { + Collection values = new ArrayList<>(); + values.add(0.0); + values.add(3.2399998); + values.add(1.08); + values.add(1.1384199); + values.add(3.4152596); + values.add(4.3349743); + double maxValue = Double.MIN_VALUE; + double minValue = Double.MAX_VALUE; + int timestamp = 0; + double maxPrecisionError = 0; + int totalSize = 0; + float totalBlocks = 0; + Collection points = new ArrayList<>(); + for (Double value : values) { + points.add(new Point(timestamp++, value.floatValue())); + } + List lines = new SwingFilter().filter(points, ((float) Math.pow(2, logOfError))); + + totalBlocks += 1; + totalSize += lines.size() * 3 * 32; + + DecompressorSwingFilter d = new DecompressorSwingFilter(lines); + + for(Double value : values) { + maxValue = value > maxValue ? value : maxValue; + minValue = value < minValue ? value : minValue; + Float decompressedValue = d.readValue(); + double precisionError = Math.abs(value.doubleValue() - decompressedValue); + maxPrecisionError = (precisionError > maxPrecisionError) ? 
precisionError : maxPrecisionError; + System.out.println(value.doubleValue() + " " + decompressedValue); + assertEquals(value.doubleValue(), decompressedValue, Math.pow(2, logOfError), "Value did not match"); + + System.out.println(String.format("Lossy32 %s - Size : %d, Bits/value: %.2f", + "simple", totalSize, totalSize / (totalBlocks * TimeseriesFileReader.DEFAULT_BLOCK_SIZE))); + } + } + } + + //@Test + public void testPmcMRfilterSimple() throws IOException { + for (int logOfError = -10; logOfError < 10; logOfError++) { + Collection values = new ArrayList<>(); + values.add(0.0); + values.add(3.2399998); + values.add(1.08); + values.add(1.1384199); + values.add(3.4152596); + values.add(4.3349743); + double maxValue = Double.MIN_VALUE; + double minValue = Double.MAX_VALUE; + int timestamp = 0; + double maxPrecisionError = 0; + int totalSize = 0; + float totalBlocks = 0; + Collection points = new ArrayList<>(); + for (Double value : values) { + points.add(new Point(timestamp++, value.floatValue())); + } + List constants = new PmcMR().filter(points, ((float) Math.pow(2, logOfError))); + + totalBlocks += 1; + totalSize += constants.size() * 2 * 32; + + DecompressorPmcMr d = new DecompressorPmcMr(constants); + + for(Double value : values) { + maxValue = value > maxValue ? value : maxValue; + minValue = value < minValue ? value : minValue; + Float decompressedValue = d.readValue(); + double precisionError = Math.abs(value.doubleValue() - decompressedValue); + maxPrecisionError = (precisionError > maxPrecisionError) ? precisionError : maxPrecisionError; + System.out.println(value.doubleValue() + " " + decompressedValue); + assertEquals(value.doubleValue(), decompressedValue, Math.pow(2, logOfError), "Value did not match"); + } + + System.out.println(String.format("Lossy32 %s - Size : %d, Bits/value: %.2f", + "simple", totalSize, totalSize / (totalBlocks * TimeseriesFileReader.DEFAULT_BLOCK_SIZE))); + } + + + + } + }