Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: PPL command - WMA Trendline #3293

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public <R, C> R accept(AbstractNodeVisitor<R, C> nodeVisitor, C context) {
}

public enum TrendlineType {
SMA
SMA,
WMA
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,20 @@
import com.google.common.collect.ImmutableMap.Builder;
import java.time.Instant;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.data.model.ExprDoubleValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
Expand Down Expand Up @@ -106,16 +110,28 @@ private Map<String, ExprValue> consumeInputTuple(ExprValue inputValue) {

private static TrendlineAccumulator createAccumulator(
Pair<Trendline.TrendlineComputation, ExprCoreType> computation) {
// Add a switch statement based on computation type to choose the accumulator when more
// types of computations are supported.
return new SimpleMovingAverageAccumulator(computation.getKey(), computation.getValue());
return switch (computation.getKey().getComputationType()) {
case SMA -> new SimpleMovingAverageAccumulator(computation.getKey(), computation.getValue());
case WMA -> new WeightedMovingAverageAccumulator(
computation.getKey(), computation.getValue());
};
}
andy-k-improving marked this conversation as resolved.
Show resolved Hide resolved

/** Maintains stateful information for calculating the trendline. */
private interface TrendlineAccumulator {
void accumulate(ExprValue value);
private abstract static class TrendlineAccumulator<C extends Collection<ExprValue>> {
andy-k-improving marked this conversation as resolved.
Show resolved Hide resolved

ExprValue calculate();
protected final LiteralExpression dataPointsNeeded;

protected final C receivedValues;

private TrendlineAccumulator(LiteralExpression dataPointsNeeded, C receivedValues) {
this.dataPointsNeeded = dataPointsNeeded;
this.receivedValues = receivedValues;
}

abstract void accumulate(ExprValue value);

abstract ExprValue calculate();

static ArithmeticEvaluator getEvaluator(ExprCoreType type) {
andy-k-improving marked this conversation as resolved.
Show resolved Hide resolved
switch (type) {
Expand All @@ -133,16 +149,16 @@ static ArithmeticEvaluator getEvaluator(ExprCoreType type) {
}
}

private static class SimpleMovingAverageAccumulator implements TrendlineAccumulator {
private final LiteralExpression dataPointsNeeded;
private final EvictingQueue<ExprValue> receivedValues;
private static class SimpleMovingAverageAccumulator
extends TrendlineAccumulator<Queue<ExprValue>> {
private final ArithmeticEvaluator evaluator;
private Expression runningTotal = null;

public SimpleMovingAverageAccumulator(
Trendline.TrendlineComputation computation, ExprCoreType type) {
dataPointsNeeded = DSL.literal(computation.getNumberOfDataPoints().doubleValue());
receivedValues = EvictingQueue.create(computation.getNumberOfDataPoints());
super(
DSL.literal(computation.getNumberOfDataPoints().doubleValue()),
EvictingQueue.create(computation.getNumberOfDataPoints()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this logic be moved up to the parent class? Seems like it is duplicated in both the SMA and WMA cases, with the exception that the SMA sub-class uses an EvictingQueue while the WMA sub-class uses a LinkedList. Can't they both use the same data structure?

evaluator = TrendlineAccumulator.getEvaluator(type);
}

Expand Down Expand Up @@ -187,6 +203,103 @@ public ExprValue calculate() {
}
}

private static class WeightedMovingAverageAccumulator
extends TrendlineAccumulator<ArrayList<ExprValue>> {
private final WmaTrendlineEvaluator evaluator;

public WeightedMovingAverageAccumulator(
Trendline.TrendlineComputation computation, ExprCoreType type) {
super(
DSL.literal(computation.getNumberOfDataPoints()),
new ArrayList<>(computation.getNumberOfDataPoints()));
this.evaluator = getWmaEvaluator(type);
}

static WmaTrendlineEvaluator getWmaEvaluator(ExprCoreType type) {
return switch (type) {
case DOUBLE -> NumericWmaEvaluator.INSTANCE;
andy-k-improving marked this conversation as resolved.
Show resolved Hide resolved
case DATE, TIMESTAMP -> TimeStampWmaEvaluator.INSTANCE;
case TIME -> TimeWmaEvaluator.INSTANCE;
default -> throw new IllegalArgumentException(
String.format("Invalid type %s used for weighted moving average.", type.typeName()));
};
}

@Override
public void accumulate(ExprValue value) {
receivedValues.add(value);
if (receivedValues.size() > dataPointsNeeded.valueOf().integerValue()) {
receivedValues.removeFirst();
}
}

@Override
public ExprValue calculate() {
if (receivedValues.size() < dataPointsNeeded.valueOf().integerValue()) {
return null;
andy-k-improving marked this conversation as resolved.
Show resolved Hide resolved
} else if (dataPointsNeeded.valueOf().integerValue() == 1) {
return receivedValues.getFirst();
}
return evaluator.evaluate(receivedValues);
}

private static class NumericWmaEvaluator implements WmaTrendlineEvaluator {

private static final NumericWmaEvaluator INSTANCE = new NumericWmaEvaluator();

@Override
public ExprValue evaluate(ArrayList<ExprValue> receivedValues) {
double sum = 0D;
int totalWeight = (receivedValues.size() * (receivedValues.size() + 1)) / 2;
for (int i = 0; i < receivedValues.size(); i++) {
sum += receivedValues.get(i).doubleValue() * ((i + 1D) / totalWeight);
}
return new ExprDoubleValue(sum);
}
}
Copy link
Contributor

@currantw currantw Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to have been quite an explosion of classes here! Some ideas:

  • Do we need to do this with singleton classes? Rather than an Evaluator class with a single evaluate method, would it make sense to instead store a reference to a evaluate method that takes a List of ExprValues and returns the resulting ExprValue? Don't know what is more common, but it seems like a lot to have seven separate Evaluator classes; perhaps seven Evaluator method would be more manageable?
  • As alluded to above, could this take List<ExprValue> instead of an ArrayList?
  • As mentioned in a previous comment, I think LinkedList might be more efficient for this purpose -- but not if we iterator over it like this! Are we able to use an iterator to do so? Then we would get O(1) access for both a LinkedList or an ArrayList. See below for how I think this could look.
  • As mentioned in a previous comment, SMA and WMA are pretty much the same ... only the weights change. Would it be possible to change the evaluate signature so that it takes a List<ExprValue> and a List<Double> of weights? i.e. evaluate(List<ExprValue> values, List<Double> weights)? This would have the added advantage that, in the case of WMA, we wouldn't need to re-calculate the weights every time.
  public ExprValue evaluate(List<ExprValue> values) {

    // Calculate weights
    int n = values.size();
    double denominator = (double) (n * (n + 1)) / 2.0;
    List<Double> weights = IntStream.range(n, 0).mapToDouble(i -> (double) i / denominator).boxed().toList().reversed();

    // Calculate weighted average.
    double average = 0.0;
    
    Iterator<ExprValue> valuesIterator = values.iterator();
    Iterator<Double> weightsIterator = weights.iterator();
    
    while(valuesIterator.hasNext() && weightsIterator.hasNext()) {
      average += valuesIterator.next().doubleValue() * weightsIterator.next();
    }

    return new ExprDoubleValue(average);
  }

Copy link
Contributor Author

@andy-k-improving andy-k-improving Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the point for too many Evaluator, however indeed these are two distinct types of Evaluator, in the case of SMA, Evaluator not just take the new item, but also read the running-total in order to avoid re-computation, and all evaluators under the SMA umbrella take this into consideration especially on the API signature level, ex:

public ExprValue evaluate(Expression runningTotal, LiteralExpression numberOfDataPoints) {
      return DSL.divide(runningTotal, numberOfDataPoints).valueOf();
    }

Also the method calculateFirstTotal is unique to SMA:

@Override
    public Expression calculateFirstTotal(List<ExprValue> dataPoints) {
      Expression total = DSL.literal(0.0D);
      for (ExprValue dataPoint : dataPoints) {
        total = DSL.add(total, DSL.literal(dataPoint.doubleValue()));
      }
      return DSL.literal(total.valueOf().doubleValue());
    }

However in contrast, WMA don't share the same characteristic, which re-computation is required upon every update, also, the concept of running-total is not applicable here.

Regarding the concern of too many evaluators, I have updated to move all WMA related evaludator inside of class WeightedMovingAverageAccumulator in order to further narrow down the scope.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Good explanation for why SMA and WMA accumulators should be separate - thanks for that!

For WMA, I still think there is opportunity to combine some common logic. The weights should only need to be calculated once - can we pass them directly to the Evaluator from the Accumulator? Moreover, as I (tried to) describe in this comment, can we extract the common logic from all the WMA accumulators (related applying the weights to the values), and only have the different parts (mapping each value to a number, mapping the result back to the right ExprValue) split out into the different implementations.

As mentioned elsewhere, I think we should also use an iterator for these loops, so that we can get O(1) access for a linked list or queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above make sense, and I have now using BiFunction to replace original usage of custom interface of wmaEvalulator along with the static class creation.
Also I have moved out the logic of totalWeight calculation out from respective function call, as that is common to all calculation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above make sense, and I have now using BiFunction to replace original usage of custom interface of wmaEvalulator along with the static class creation.

Thanks. This looks good to me.

Also I have moved out the logic of totalWeight calculation out from respective function call, as that is common to all calculation.

I like that you have moved the totalWeight (the denominator) so that it doesn't need to be re-calculated each time. However, I think it is possible to move all the weight calculations out of these functions, and just pass a list containing all the weights to the function (i.e. to store n "complete" weights values as a WeightedMovingAverageAccumulator member).

I also think it is possible to extract a the common logic for determining sum into another helper function: as mentioned in a previous comment, that logic is the same, except for mapping the ExprValue list to longs.

Let me know if you want to discuss either.


private static class TimeStampWmaEvaluator implements WmaTrendlineEvaluator {

private static final TimeStampWmaEvaluator INSTANCE = new TimeStampWmaEvaluator();

@Override
public ExprValue evaluate(ArrayList<ExprValue> receivedValues) {
long sum = 0L;
int totalWeight = (receivedValues.size() * (receivedValues.size() + 1)) / 2;
for (int i = 0; i < receivedValues.size(); i++) {
sum +=
(long)
(receivedValues.get(i).timestampValue().toEpochMilli()
* ((i + 1D) / totalWeight));
}

return ExprValueUtils.timestampValue(Instant.ofEpochMilli((sum)));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At it's heart, most of the logic between NumericWmaEvaluator, TimeWmaEvaluator, and TimeStampWmaEvaluator seems to be common: all the logic for determining the weights and calculating the weights average is the same, the only differences are (a) the type (double vs. long), and (b) how we get the numerical value from the ExprValue, and (c) how we convert the resulting weighted average back into an ExprValue.

Would it be possible to extract all of this common logic to WmaTrendlineEvaluator in a helper method like Long evaluateHelper(Collection<Long> values), and then have each sub-class only (i) convert a collection of ExprValue to a collection of Long, (ii) call evaluateHelper (pick a better name, obviously) to get the weighted average as a Long, and then (iii) convert that Long back into an ExprValue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in another comment, still think there is common logic that can be extracted. Please let me know if you want to discuss.

}

private static class TimeWmaEvaluator implements WmaTrendlineEvaluator {

private static final TimeWmaEvaluator INSTANCE = new TimeWmaEvaluator();

@Override
public ExprValue evaluate(ArrayList<ExprValue> receivedValues) {
long sum = 0L;
int totalWeight = (receivedValues.size() * (receivedValues.size() + 1)) / 2;
for (int i = 0; i < receivedValues.size(); i++) {
sum +=
(long)
(MILLIS.between(LocalTime.MIN, receivedValues.get(i).timeValue())
* ((i + 1D) / totalWeight));
}
return ExprValueUtils.timeValue(LocalTime.MIN.plus(sum, MILLIS));
}
}

private interface WmaTrendlineEvaluator {
ExprValue evaluate(ArrayList<ExprValue> receivedValues);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As alluded to in other comments, I'm not sure we need separate Evaluators for simple and weights moving averages, if we design the interface right.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explanation of this in another comments. Please resolve. ✅

}

private interface ArithmeticEvaluator {
Expression calculateFirstTotal(List<ExprValue> dataPoints);

Expand Down
Loading
Loading