Fold processing stage #66

Open
jroper opened this issue Jul 6, 2018 · 9 comments
@jroper
Contributor

jroper commented Jul 6, 2018

I think a folding processing stage would be good. There are a number of use cases that I've come up with, particularly in IoT scenarios.

Let's say you have an IoT device that sends the status of a door every few seconds, i.e., whether the door is open or closed. You want to convert this status to an action, i.e., a message should only be sent when the door changes from open to closed, or vice versa, so that you can do further processing downstream without overwhelming your system with status messages every few seconds. This is what it might look like:

public enum DoorStatus {
  Open, Closed;
  public DoorAction toAction() {
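    switch (this) {
      case Open: return DoorAction.Opened;
      case Closed: return DoorAction.Closed;
      // Unreachable, but javac requires every path to return or throw
      default: throw new IllegalStateException("Unknown status: " + this);
    }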
  }
}
public enum DoorAction {
  Opened, Closed;
}

@Incoming(provider = WebSocket.class, value = "door-status")
@Outgoing(provider = Kafka.class, value = "door-action")
public ProcessorBuilder<DoorStatus, DoorAction> convertDoorStatusToAction() {
  return ReactiveStreams.<DoorStatus>builder()
    // Fold to keep the last two statuses in an array (newest first);
    // we use an array because Java doesn't have tuples
    .fold(new DoorStatus[0], (prevTwo, next) -> {
      if (prevTwo.length == 0) return new DoorStatus[] { next };
      else return new DoorStatus[] { next, prevTwo[0] };
    })
    // Now convert each pair of consecutive statuses to an optional action
    .flatMapIterable(pair -> {
      // If it's the first one (due to new connection), consider the first one an action
      if (pair.length == 1) return List.of(pair[0].toAction());
      else if (pair[0] == pair[1]) return List.of();
      else return List.of(pair[0].toAction());
    });
}
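
Since fold is only a proposed stage at this point, the same logic can be tried out without any streams library. The following is a minimal sketch under that assumption: `DoorFoldSketch`, its `fold` helper (which emits the state after each element and never the seed) and `actions` are hypothetical names that stand in for the proposed stage, operating on a plain List rather than a stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class DoorFoldSketch {
  enum DoorStatus { Open, Closed }
  enum DoorAction { Opened, Closed }

  // Hypothetical stand-in for the proposed stage: emits the state after
  // each input element, and does not emit the seed itself.
  static <T, S> List<S> fold(List<T> input, S seed, BiFunction<S, ? super T, S> accumulator) {
    List<S> out = new ArrayList<>();
    S state = seed;
    for (T element : input) {
      state = accumulator.apply(state, element);
      out.add(state);
    }
    return out;
  }

  static DoorAction toAction(DoorStatus status) {
    return status == DoorStatus.Open ? DoorAction.Opened : DoorAction.Closed;
  }

  // Converts periodic status reports into one action per actual change.
  static List<DoorAction> actions(List<DoorStatus> statuses) {
    // State is the last two statuses, newest first.
    List<DoorStatus[]> pairs = fold(statuses, new DoorStatus[0],
        (prev, next) -> prev.length == 0
            ? new DoorStatus[] { next }
            : new DoorStatus[] { next, prev[0] });
    List<DoorAction> out = new ArrayList<>();
    for (DoorStatus[] pair : pairs) {
      // The first report always counts as an action; later ones only on change.
      if (pair.length == 1 || pair[0] != pair[1]) {
        out.add(toAction(pair[0]));
      }
    }
    return out;
  }
}
```

Feeding it Closed, Closed, Open, Open, Closed yields the three actions Closed, Opened, Closed: one for the first report and one for each change.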

Another use case might be that you have a temperature sensor, and you want to output a running average. This can be done in a very similar way to above.
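
As a rough illustration of the running average case, the sketch below keeps a count and a sum as the fold state and emits the average after every reading. It works on a plain List instead of a stream, and `RunningAverageSketch`, `Stats` and `runningAverages` are hypothetical names, not part of any proposed API.

```java
import java.util.ArrayList;
import java.util.List;

class RunningAverageSketch {
  // Immutable fold state: how many readings were seen and their sum.
  static final class Stats {
    final long count;
    final double sum;

    Stats(long count, double sum) {
      this.count = count;
      this.sum = sum;
    }

    Stats add(double reading) {
      return new Stats(count + 1, sum + reading);
    }

    double average() {
      return sum / count;
    }
  }

  // Emits one average per reading; the empty initial state is never emitted.
  static List<Double> runningAverages(List<Double> readings) {
    List<Double> out = new ArrayList<>();
    Stats state = new Stats(0, 0.0);
    for (double reading : readings) {
      state = state.add(reading);
      out.add(state.average());
    }
    return out;
  }
}
```

Readings of 20.0, 22.0 and 24.0 produce the averages 20.0, 21.0 and 22.0.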

One of the things that makes this not so nice is that Java doesn't have a concept of tuples. Typically, when you do a fold, you want to fold into a tuple of your state and an event that you want to output, and then a subsequent map just emits the event and drops the state. Without tuple support in Java, you have to either hack it using arrays, as above, or create a new class just to hold your tuple. Of course, there are libraries out there that provide tuples, e.g. Akka provides Pair, though it's still cumbersome without any syntactic sugar for it in the language.

@cescoffier
Contributor

About fold, yes, it's an interesting stage. So no problem for me.

About tuples and pairs, Java does not provide anything, and yes, it's a pain. Implementing a pair is easy. Implementing tuples is a bit more complicated because there is no single type that covers every arity, so you need Tuple3<A,B,C>, Tuple4<A,B,C,D>, Tuple5<...>, and so on.

Now I'm not sure if the stream spec is the right place to define such utility classes but I don't have a strong opinion. For sure these classes are very useful when dealing with streaming / messaging.

@cescoffier
Contributor

Forgot to mention our favorite topic: naming.

The fold operation is named:

  • fold in Akka Stream (scan being a terminal operation in Akka Stream, equivalent to our reduce) - it also provides an async variant where the accumulator returns a CompletionStage
  • scan in Reactor (3 variants, no async version)
  • scan in RX Java (3 variants, no async version)

For the record, fold seems to be the most used word in the mathematical world: https://en.wikipedia.org/wiki/Fold_(higher-order_function)

I would go for fold, with a note in the javadoc that this operation can be called reduce or scan (also a mention to the reduce terminal operation).

About the variant, here are the candidates:

  • fold(BiFunction<T,T,T> accumulator) - use the first item as seed
  • fold(R initialValue, BiFunction<R,? super T,R> accumulator)
  • foldWith(Callable<R> seedSupplier, BiFunction<R,? super T,R> accumulator)
  • foldAsync(R initialValue, BiFunction<R,? super T, CompletionStage<R>> accumulator)
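
To make the foldAsync candidate more concrete, here is a minimal sketch of how an accumulator returning a CompletionStage could be chained so that each step waits for the previous state. It runs over a plain List and collects the emitted states, which is an assumption made purely for illustration; `FoldAsyncSketch` and `runningTotals` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

class FoldAsyncSketch {
  // Applies the accumulator element by element; each call starts only
  // after the stage holding the previous state has completed.
  static <T, R> CompletionStage<List<R>> foldAsync(
      List<T> input, R initialValue,
      BiFunction<R, ? super T, CompletionStage<R>> accumulator) {
    List<R> emitted = new ArrayList<>();
    CompletionStage<R> state = CompletableFuture.completedFuture(initialValue);
    for (T element : input) {
      state = state
          .thenCompose(s -> accumulator.apply(s, element))
          .thenApply(s -> {
            emitted.add(s); // record the state emitted for this element
            return s;
          });
    }
    return state.thenApply(s -> emitted);
  }

  // Example accumulator: a running total computed "asynchronously".
  static CompletionStage<List<Integer>> runningTotals(List<Integer> numbers) {
    return foldAsync(numbers, 0,
        (total, next) -> CompletableFuture.completedFuture(total + next));
  }
}
```

A real stage would also have to handle back pressure and failures of the returned stage, which this sketch leaves out.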

Another detail is whether or not the seed value is emitted in the downstream stream.

@jroper
Contributor Author

jroper commented Jul 9, 2018

Definitely agree that streams is not the right place to define tuples.

With regards to naming - we also must consider, currently we have reduce, collect and friends, which are also folds, however those ones are terminal operations. The fold being proposed here is not a terminal operation, it emits its state on each element. But what if we also want to introduce a fold as a terminal operation? How do we distinguish folds that emit their state on each element from folds that terminate the stream and only emit their state on completion?

Akka Streams does this by making all folds that are terminal defined as constructors of Sink, so you construct your sink first and then pass it to a Flow or Source, while all folds that emit their state on each element are defined on Flow and Source. The problem with this for our API is that that approach would be a significant departure from the JDK8 streams API.

Maybe this is a non-issue, collect and reduce imply that they are terminal operations in their naming, you're not exactly reducing or collecting if you emit the same number of elements, so it's only a problem for the fold name itself. One option could be to use fold as a terminal operation, and scan as an operation that emits its state on each element. Though that is the opposite of Akka Streams, there's a potential for confusion there.

When you mention whether the seed value is emitted in the downstream stream, do you mean whether the next element is separated out from the state and emitted? This is where tuples are useful, with a tuple (assuming language sugar with (T1, T2) syntax) you can easily:

<R, S> PublisherBuilder<R> fold(S initial, BiFunction<S, ? super T, (S, R)> accumulator);

Without a tuple, we'd have to create a type just for it:

class FoldResult<S, R> {
  private final S state;
  private final R result;
  ...
}
<R, S> PublisherBuilder<R> fold(S initial, BiFunction<S, ? super T, FoldResult<S, R>> accumulator);
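
A small sketch of how the FoldResult variant could behave, assuming the elided class body is just a constructor plus the two fields. The fold here runs over a plain List rather than returning a PublisherBuilder, and `FoldResultSketch` and `deltas` are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class FoldResultSketch {
  // Assumed shape of the holder: the next state plus the value to emit.
  static final class FoldResult<S, R> {
    final S state;
    final R result;

    FoldResult(S state, R result) {
      this.state = state;
      this.result = result;
    }
  }

  // Keeps the state internal and emits only the result part of each step.
  static <T, S, R> List<R> fold(
      List<T> input, S initial,
      BiFunction<S, ? super T, FoldResult<S, R>> accumulator) {
    List<R> out = new ArrayList<>();
    S state = initial;
    for (T element : input) {
      FoldResult<S, R> step = accumulator.apply(state, element);
      state = step.state;
      out.add(step.result);
    }
    return out;
  }

  // Example: the state is the previous reading, the emitted value is the change.
  static List<Integer> deltas(List<Integer> readings) {
    return fold(readings, 0,
        (previous, next) -> new FoldResult<>(next, next - previous));
  }
}
```

With readings 3, 5, 4 and an initial state of 0, the emitted changes are 3, 2 and -1; the state itself never reaches the output.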

Another possibility is to do what Collector does, and have a finisher function, except that the finisher gets applied to each element, eg:

<R, S> PublisherBuilder<R> fold(S initial, BiFunction<S, ? super T, S> accumulator, Function<S, R> finisher);

I don't really think that's necessary, since the finisher is no different to doing a map immediately after.
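
The equivalence can be checked with a small sketch. It assumes a fold over a plain List (not the proposed PublisherBuilder API) and uses the hypothetical names `FinisherSketch`, `withFinisher` and `withMap`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

class FinisherSketch {
  // Fold with a finisher applied to every emitted state.
  static <T, S, R> List<R> fold(
      List<T> input, S initial,
      BiFunction<S, ? super T, S> accumulator, Function<S, R> finisher) {
    List<R> out = new ArrayList<>();
    S state = initial;
    for (T element : input) {
      state = accumulator.apply(state, element);
      out.add(finisher.apply(state));
    }
    return out;
  }

  // Fold without a finisher: emits the raw state.
  static <T, S> List<S> fold(
      List<T> input, S initial, BiFunction<S, ? super T, S> accumulator) {
    return fold(input, initial, accumulator, Function.<S>identity());
  }

  static List<String> withFinisher(List<Integer> numbers) {
    return fold(numbers, 0, Integer::sum, total -> "total=" + total);
  }

  // Same result, expressed as a plain fold followed by a map.
  static List<String> withMap(List<Integer> numbers) {
    return fold(numbers, 0, Integer::sum).stream()
        .map(total -> "total=" + total)
        .collect(Collectors.toList());
  }
}
```

Both methods return the same list for any input, which supports the point that a finisher adds nothing a following map cannot do.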

@cescoffier
Contributor

Naming: I believe the fold or scan names are fine. I don't think we need a specific way to indicate it's a terminal operation or an intermediate operation. The signature indicates it. We can extend the Javadoc to make things clearer.

Emission: If you look at the scan method from RX (especially the marble picture), the operator emits the seed as the first value. The second value is computed using the accumulator. My question is whether we want the same behavior and emit the seed value, or wait until we get the first value and call the accumulator.
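
The two emission options can be compared side by side in a small sketch over plain Lists. `SeedEmissionSketch`, `scanEmittingSeed` and `scanSkippingSeed` are hypothetical names chosen for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class SeedEmissionSketch {
  // RX-style behaviour: the seed is emitted first, so n inputs give n + 1 outputs.
  static <T, R> List<R> scanEmittingSeed(
      List<T> input, R seed, BiFunction<R, ? super T, R> accumulator) {
    List<R> out = new ArrayList<>();
    R state = seed;
    out.add(state);
    for (T element : input) {
      state = accumulator.apply(state, element);
      out.add(state);
    }
    return out;
  }

  // Alternative: wait for the first element, so n inputs give n outputs.
  static <T, R> List<R> scanSkippingSeed(
      List<T> input, R seed, BiFunction<R, ? super T, R> accumulator) {
    List<R> out = new ArrayList<>();
    R state = seed;
    for (T element : input) {
      state = accumulator.apply(state, element);
      out.add(state);
    }
    return out;
  }
}
```

Summing 1, 2, 3 from a seed of 0 gives 0, 1, 3, 6 with the first variant and 1, 3, 6 with the second. On an empty input the first variant still emits the seed, while the second emits nothing.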

@OndroMih

What about making it explicit which methods are terminal? E.g. prefixing them with endWith or do, such as endWithReduce() or doReduce()? It would be easy to autocomplete all terminal methods in an IDE and also look them up in the javadocs, as they would be next to each other.

@jroper
Contributor Author

jroper commented Jul 10, 2018

My primary concern with differentiation between terminal and intermediate operations is actually about future proofing. If we add fold today as an intermediate operation, but in future we decide we want a fold terminal operation, what can we do? Likewise, currently we have reduce as a terminal operation, what happens if there's a good reason in future to also have a reduce intermediate operation? Anyway, I'm not sure that it necessarily has to be addressed, just worth having a discussion about.

That RX Java behaviour is very weird. Reactor does the same. I don't know what the use case for that would be. Eg, calculate a running average, initial value is irrelevant. Emit an event on change of value, initial value is irrelevant. Calculate a running total, no one cares about zero, it's uninteresting.

@OndroMih

@cescoffier: I don't think we need a specific way to indicate it's a terminal operation

I have the opposite opinion. For a person like me who doesn't use streams daily, it's very hard to remember the names of terminal operations. I often end up going through all the stream/observable methods just to find something that might do what I want. The only differentiator between terminal and non-terminal operations is the method return type, which makes them very hard to filter out, and neither the IDE nor the Javadocs help there.

@jroper
Contributor Author

jroper commented Jul 10, 2018

Other potential prefixes for terminal stages:

  • to. We already have to(Subscriber) and to(SubscriberBuilder). toReduce? It's not very fluent.
  • run. Probably not a good idea given that we already have CompletionRunner.run, then it would be runReduce(...).run().
  • collect. Collect is actually the only terminal stage we support (at least currently). Everything terminates into a collector. So collect* are all specializations of collect, eg collectReduce.

I'm not too keen on any of the above, thoughts?

@cescoffier
Contributor

As the goal is to be as close as possible to the Java Stream API, using collect* would be the closest, but I'm not sure I like it. We have other terminal stages, such as findFirst and cancel, where it does not make sense.

@jroper jroper removed the streams label Aug 10, 2018
@cescoffier cescoffier added this to the 1.1 milestone Nov 6, 2018
@Emily-Jiang Emily-Jiang modified the milestones: 1.1, 2.0 Jun 24, 2020
4 participants