Skip to content

Commit

Permalink
Merge pull request #125 from zalando-nakadi/backport-features-from-v20
Browse files Browse the repository at this point in the history
Backport features from v20
  • Loading branch information
BGehrels authored May 7, 2019
2 parents 2f52dd4 + 22c01b6 commit f4843bf
Show file tree
Hide file tree
Showing 15 changed files with 221 additions and 49 deletions.
2 changes: 2 additions & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# The two maintainers are code owners for everything.
* @bgehrels @epaul
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,21 @@ The example above uses `com.jayway.jsonpath:json-path:jar:2.2.0` to parse and te

Note that you should disable the scheduled event transmission for the test (e.g. by setting `nakadi-producer.scheduled-transmission-enabled:false`), as that might interfere with the manual transmission and the clearing in the test setup, leading to events from one test showing up in the next test, depending on timing issues.

### Customizing event locks

* **lock-duration**: The selected events are locked before transmission. If the transmission fails the events stay locked
until the lock expires. The default is currently 600 seconds but might change in future releases.

* **lock-duration-buffer**: Since clocks never work exactly synchronous and sending events also takes some time, a safety
buffer is included. During the last x seconds before the expiration of the lock the events are not considered for
transmission. The default is currently 60 seconds but might change in future releases.

```yaml
nakadi-producer:
lock-duration: 600
lock-duration-buffer: 60
```

## Contributing

We welcome contributions. Please have a look at our [contribution guidelines](CONTRIBUTING.md).
Expand Down
16 changes: 7 additions & 9 deletions nakadi-producer-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>4.3.0</version>
<version>4.4.0</version>
</parent>

<artifactId>nakadi-producer-spring-boot-starter</artifactId>
Expand All @@ -22,7 +22,6 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<zalando-swagger-codegen-maven-plugin.version>0.4.24</zalando-swagger-codegen-maven-plugin.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -53,13 +52,13 @@
<dependency>
<groupId>org.zalando</groupId>
<artifactId>tracer-core</artifactId>
<version>0.11.2</version>
<version>0.17.1</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.zalando.stups</groupId>
<artifactId>tokens</artifactId>
<version>0.10.0</version>
<version>0.12.2</version>
<optional>true</optional>
</dependency>
<dependency>
Expand Down Expand Up @@ -98,9 +97,10 @@
<scope>test</scope>
</dependency>
<dependency>
<!-- needed for the optional spring actuator http endpoints -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>test</scope>
<optional>true</optional>
</dependency>
</dependencies>

Expand All @@ -124,7 +124,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
Expand All @@ -137,7 +136,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<configuration>
<additionalparam>-Xdoclint:none</additionalparam>
</configuration>
Expand All @@ -153,7 +151,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<version>1.6</version>
<executions>
<execution>
<id>sign-artifacts</id>
Expand All @@ -167,7 +165,7 @@
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.7</version>
<version>1.6.8</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ static class ManagementEndpointConfiguration {
@Bean
@ConditionalOnMissingBean
public SnapshotEventCreationEndpoint snapshotEventCreationEndpoint(
SnapshotCreationService snapshotCreationService) {
return new SnapshotEventCreationEndpoint(snapshotCreationService);
SnapshotCreationService snapshotCreationService, FlowIdComponent flowIdComponent) {
return new SnapshotEventCreationEndpoint(snapshotCreationService, flowIdComponent);
}

@Bean
Expand Down Expand Up @@ -197,11 +197,16 @@ public EventTransmissionScheduler eventTransmissionScheduler(EventTransmitter ev
return new EventTransmissionScheduler(eventTransmitter, scheduledTransmissionEnabled);
}

@Bean
public EventTransmissionService eventTransmissionService(EventLogRepository eventLogRepository,
NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper) {
return new EventTransmissionService(eventLogRepository, nakadiPublishingClient, objectMapper);
}
@Bean
public EventTransmissionService eventTransmissionService(
EventLogRepository eventLogRepository,
NakadiPublishingClient nakadiPublishingClient,
ObjectMapper objectMapper,
@Value("${nakadi-producer.lock-duration:600}") int lockDuration,
@Value("${nakadi-producer.lock-duration-buffer:60}") int lockDurationBuffer) {
return new EventTransmissionService(
eventLogRepository, nakadiPublishingClient, objectMapper, lockDuration, lockDurationBuffer);
}

@Bean
public FlywayMigrator flywayMigrator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ public class NoopFlowIdComponent implements FlowIdComponent {

@Override
public String getXFlowIdValue() {
log.debug("No bean of class FlowIdComponent was found. Returning null.");
return null;
}

@Override
public void startTraceIfNoneExists() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,21 @@ public String getXFlowIdValue() {
}
return null;
}

@Override
public void startTraceIfNoneExists() {
if (tracer != null) {
try {
tracer.get(X_FLOW_ID).getValue();
} catch (IllegalArgumentException e) {
log.warn("No trace was configured for the name {}. Returning null. " +
"To configure Tracer provide an application property: " +
"tracer.traces.X-Flow-ID=flow-id", X_FLOW_ID);
} catch (IllegalStateException e) {
tracer.start();
}
} else {
log.warn("No bean of class Tracer was found.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@

import lombok.AllArgsConstructor;
import lombok.Getter;
import org.zalando.nakadiproducer.flowid.FlowIdComponent;

@ConfigurationProperties("endpoints.snapshot-event-creation")
public class SnapshotEventCreationEndpoint extends AbstractEndpoint<SnapshotEventCreationEndpoint.SnapshotReport> {
private final SnapshotCreationService snapshotCreationService;
private final FlowIdComponent flowIdComponent;

public SnapshotEventCreationEndpoint(SnapshotCreationService snapshotCreationService) {
public SnapshotEventCreationEndpoint(SnapshotCreationService snapshotCreationService, FlowIdComponent flowIdComponent) {
super("snapshot_event_creation", true, true);
this.snapshotCreationService = snapshotCreationService;
this.flowIdComponent = flowIdComponent;
}

@Override
Expand All @@ -23,6 +26,7 @@ public SnapshotReport invoke() {
}

public void invoke(String eventType, String filter) {
flowIdComponent.startTraceIfNoneExists();
snapshotCreationService.createSnapshotEvents(eventType, filter);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.zalando.nakadiproducer;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.transaction.annotation.Transactional;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
import org.zalando.nakadiproducer.eventlog.impl.EventLog;
import org.zalando.nakadiproducer.transmission.MockNakadiPublishingClient;
import org.zalando.nakadiproducer.transmission.impl.EventTransmissionService;
import org.zalando.nakadiproducer.transmission.impl.EventTransmitter;
import org.zalando.nakadiproducer.util.Fixture;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collection;

import static java.time.temporal.ChronoUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;

@Transactional
@SpringBootTest(properties = {
"nakadi-producer.scheduled-transmission-enabled:false",
"nakadi-producer.lock-duration:300",
"nakadi-producer.lock-duration-buffer:30"})
public class LockTimeoutIT extends BaseMockedExternalCommunicationIT {
private static final String MY_EVENT_TYPE = "myEventType";

@Autowired
private EventLogWriter eventLogWriter;

@Autowired
private EventTransmitter eventTransmitter;

@Autowired
private EventTransmissionService eventTransmissionService;

@Autowired
private MockNakadiPublishingClient nakadiClient;

@Before
@After
public void clearNakadiEvents() {
mockServiceClock(Instant.now());
eventTransmitter.sendEvents();
nakadiClient.clearSentEvents();
}

@Test
public void testLockedUntil() {
eventLogWriter.fireBusinessEvent(MY_EVENT_TYPE, Fixture.mockPayload(1, "code123"));

Instant timeOfInitialLock = Instant.now();
mockServiceClock(timeOfInitialLock);

assertThat(eventTransmissionService.lockSomeEvents().size(), is(1));
assertThat(eventTransmissionService.lockSomeEvents(), empty());

// lock is still valid
mockServiceClock(timeOfInitialLock.plus(300 - 5, SECONDS));
assertThat(eventTransmissionService.lockSomeEvents(), empty());

// lock is expired
mockServiceClock(timeOfInitialLock.plus(300 + 5, SECONDS));
assertThat(eventTransmissionService.lockSomeEvents().size(), is(1));
}

@Test
public void testLockNearlyExpired() {
eventLogWriter.fireBusinessEvent(MY_EVENT_TYPE, Fixture.mockPayload(1, "code456"));
Instant timeOfInitialLock = Instant.now();

Collection<EventLog> lockedEvent = eventTransmissionService.lockSomeEvents();

// event will not be sent, because the event-lock is "nearlyExpired"
mockServiceClock(timeOfInitialLock.plus(300 - 30 + 5, SECONDS));
eventTransmissionService.sendEvents(lockedEvent);
assertThat(nakadiClient.getSentEvents(MY_EVENT_TYPE), empty());
}

private void mockServiceClock(Instant ins) {
eventTransmissionService.overrideClock(Clock.fixed(ins, ZoneId.systemDefault()));
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package org.zalando.nakadiproducer.flowid;

import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.zalando.nakadiproducer.flowid.TracerFlowIdComponent;
import org.zalando.tracer.Tracer;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -21,10 +22,40 @@ public class TracerFlowIdComponentTest {
@Test
public void makeSureItWorks() {
TracerFlowIdComponent flowIdComponent = new TracerFlowIdComponent(tracer);
Mockito.when(tracer.get("X-Flow-ID").getValue()).thenReturn("A_FUNKY_VALUE");
when(tracer.get("X-Flow-ID").getValue()).thenReturn("A_FUNKY_VALUE");

assertThat(flowIdComponent.getXFlowIdKey(), Matchers.equalTo("X-Flow-ID"));
assertThat(flowIdComponent.getXFlowIdValue(), Matchers.equalTo("A_FUNKY_VALUE"));
}

@Test
public void makeSureTraceWillBeStartedIfNoneHasBeenStartedBefore() {
TracerFlowIdComponent flowIdComponent = new TracerFlowIdComponent(tracer);
when(tracer.get("X-Flow-ID").getValue()).thenThrow(new IllegalStateException());

flowIdComponent.startTraceIfNoneExists();

verify(tracer).start();
}

@Test
public void wontFailIfTraceHasNotBeenConfiguredInStartTrace() {
TracerFlowIdComponent flowIdComponent = new TracerFlowIdComponent(tracer);
when(tracer.get("X-Flow-ID")).thenThrow(new IllegalArgumentException());

flowIdComponent.startTraceIfNoneExists();

// then no exception is thrown
}

@Test
public void makeSureTraceWillNotStartedIfOneExists() {
TracerFlowIdComponent flowIdComponent = new TracerFlowIdComponent(tracer);
when(tracer.get("X-Flow-ID").getValue()).thenReturn("A_FUNKY_VALUE");

flowIdComponent.startTraceIfNoneExists();

verify(tracer, never()).start();
}

}
Loading

0 comments on commit f4843bf

Please sign in to comment.