`@Override public boolean fromStream(final Stream<T> stream) { stream.collect(Collectors.toCollection(() -> distributor)); return true; }` mekaes me think that topic collects the whole input stream.