Skip to content

Improve docs with examples of testing projection handlers #1328

@sean-walsh

Description

@sean-walsh

We came up with a Scala example internally, and I wrote a Java one. This is customer-driven: it wasn't apparent that it was even possible to use the testkit to test a projection whose handler receives a List of envelopes. Here is the Java example I sent to the customer.

package example;

import akka.Done;
import akka.NotUsed;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.persistence.query.Offset;
import akka.projection.javadsl.Handler;
import akka.persistence.query.typed.EventEnvelope;
import akka.projection.ProjectionId;

import akka.projection.r2dbc.javadsl.R2dbcHandler;
import akka.projection.testkit.javadsl.ProjectionTestKit;
import akka.projection.testkit.javadsl.TestProjection;
import akka.projection.testkit.javadsl.TestSourceProvider;
import akka.stream.javadsl.Source;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.Assert.assertEquals;

public class GroupedProjectionTest {
    // Immutable value carrying the id of a single device.
    record DeviceInfo(String deviceId) {}

    // Asynchronous storage abstraction for DeviceInfo values; every operation
    // reports its outcome through a CompletionStage.
    interface DeviceRepository {
        // Looks up the entry for the given id.
        // NOTE(review): presumably completes with an empty Optional when the id is unknown - confirm.
        CompletionStage<Optional<DeviceInfo>> findById(String id);

        // NOTE(review): what is counted for the given id is not visible here - confirm with implementations.
        CompletionStage<Optional<Long>> getCount(String id);

        // Stores the given device info; the stage completes with Done.
        CompletionStage<Done> saveOrUpdate(DeviceInfo deviceInfo);
    }

    // Stubs out the db layer: saved devices are kept in an in-memory map keyed by device id,
    // so a test can inspect what the projection handler wrote.
    static class TestDeviceRepository implements DeviceRepository {
        // Exposed so tests can assert directly on the stored entries.
        public final Map<String, DeviceInfo> deviceInfos = new ConcurrentHashMap<>();

        // Inserts or replaces the entry for deviceInfo.deviceId() and completes immediately.
        @Override
        public CompletionStage<Done> saveOrUpdate(DeviceInfo deviceInfo) {
            deviceInfos.put(deviceInfo.deviceId(), deviceInfo);
            return CompletableFuture.completedFuture(Done.getInstance());
        }

        // Completes with the stored entry for id, or with an empty Optional when none exists.
        @Override
        public CompletionStage<Optional<DeviceInfo>> findById(String id) {
            // ofNullable (not of): a missing id must yield Optional.empty(), not a
            // NullPointerException that fails the returned stage.
            return CompletableFuture.supplyAsync(() -> Optional.ofNullable(deviceInfos.get(id)));
        }

        // Completes with the total number of stored devices; the id argument is ignored by this stub.
        @Override
        public CompletionStage<Optional<Long>> getCount(String id) {
            // size is captured before the async hop, so it reflects the state at call time
            long longValue = deviceInfos.size();
            return CompletableFuture.supplyAsync(() -> Optional.of(longValue));
        }
    }

    // The projection handler for grouped envelopes.
    // Note: it does not use the r2dbc session; it writes through an external repository
    // (e.g. one wired by Spring) instead.
    static class GroupedProjectionHandler
            extends R2dbcHandler<List<EventEnvelope<DeviceEntity.Event>>> {

        // Stored for the caller's benefit; not read by process().
        private final String slice;
        private final DeviceRepository deviceRepository;

        public GroupedProjectionHandler(String slice, DeviceRepository deviceRepository) {
            this.slice = slice;
            this.deviceRepository = deviceRepository;
        }

        // Saves a DeviceInfo for every DeviceRegistered event in the group, in envelope order.
        // The returned stage completes only after all saves have completed, and fails if any
        // save fails. Other event types are reported on stdout and skipped.
        // The session argument is never used.
        @Override
        public CompletionStage<Done> process(
                akka.projection.r2dbc.javadsl.R2dbcSession session, List<EventEnvelope<DeviceEntity.Event>> envelopes) {

            // Start from a completed stage and append one save per registration, so the
            // result reflects the outcome of the writes instead of completing up front.
            CompletionStage<Done> allSaved = CompletableFuture.completedFuture(Done.getInstance());

            for (EventEnvelope<DeviceEntity.Event> envelope : envelopes) {
                DeviceEntity.Event event = envelope.event();

                if (event instanceof DeviceEntity.DeviceRegistered deviceRegistered) {
                    DeviceInfo deviceInfo = new DeviceInfo(deviceRegistered.deviceId);
                    allSaved = allSaved.thenCompose(done -> deviceRepository.saveOrUpdate(deviceInfo));
                } else {
                    System.out.println("Unknown event type: " + event.getClass().getName());
                }
            }

            return allSaved;
        }
    }

    // Actor test kit managed by JUnit as a class rule.
    @ClassRule
    public static final TestKitJunitResource testKit = new TestKitJunitResource();

    // Projection test kit created on the actor system of the test kit above; never reassigned.
    private final ProjectionTestKit projectionTestKit = ProjectionTestKit.create(testKit.system());

    // Runs one group of four DeviceRegistered envelopes through the grouped handler and
    // checks that all four devices end up in the stub repository.
    @Test
    public void MyProjectionHandlerTest() {
        ProjectionId id = ProjectionId.of("device", "devices-1");
        TestDeviceRepository stubRepository = new TestDeviceRepository();

        // a single group holding four registrations with increasing sequence numbers
        List<EventEnvelope<DeviceEntity.Event>> group =
                Arrays.asList(
                        createEnvelope(new DeviceEntity.DeviceRegistered("device1"), 0L),
                        createEnvelope(new DeviceEntity.DeviceRegistered("device2"), 1L),
                        createEnvelope(new DeviceEntity.DeviceRegistered("device3"), 2L),
                        createEnvelope(new DeviceEntity.DeviceRegistered("device4"), 3L));
        Source<List<EventEnvelope<DeviceEntity.Event>>, NotUsed> groups = Source.from(List.of(group));

        // the offset of a group is the offset of its last envelope
        TestSourceProvider<Offset, List<EventEnvelope<DeviceEntity.Event>>> provider =
                TestSourceProvider.create(groups, batch -> batch.getLast().offset());

        TestProjection<Offset, List<EventEnvelope<DeviceEntity.Event>>> testProjection =
                TestProjection.create(
                        id,
                        provider,
                        () -> toAsyncHandler(new GroupedProjectionHandler("devices-0", stubRepository)));

        projectionTestKit.run(
                testProjection,
                () -> assertEquals(4, stubRepository.deviceInfos.size()));
    }

    // Wraps an event in an EventEnvelope for tests: seqNo is used both for the sequence
    // offset and as the sequence number; the remaining envelope fields are fixed values.
    private <T> EventEnvelope<T> createEnvelope(T event, long seqNo) {
        Offset offset = Offset.sequence(seqNo);
        String persistenceId = "persistenceId";
        long timestamp = 0L;
        String entityType = "Device";
        int slice = 1;
        return EventEnvelope.apply(offset, persistenceId, seqNo, event, timestamp, entityType, slice);
    }

    // Adapts the session-based GroupedProjectionHandler to the plain Handler expected by
    // TestProjection. The adapter returns the wrapped handler's own CompletionStage, so the
    // projection sees the real completion (or failure) of the processing.
    private Handler<List<EventEnvelope<DeviceEntity.Event>>> toAsyncHandler(
            GroupedProjectionHandler deviceHandler) {
        return new Handler<List<EventEnvelope<DeviceEntity.Event>>>() {
            @Override
            public CompletionStage<Done> process(List<EventEnvelope<DeviceEntity.Event>> eventEventEnvelopes)
                    throws Exception {
                // session = null is safe.
                // The real handler never uses the session. The connection is provided to the repo
                // by Spring itself
                return deviceHandler.process(null, eventEventEnvelopes);
            }
        };
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions