From 4f4c9c4545b12551c98dba54097b9d5eb7bcdad2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pa=C5=ADlo=20Ebermann?= Date: Thu, 2 Aug 2018 18:05:54 +0200 Subject: [PATCH 01/10] Create CODEOWNERS file Zalando's new Github setup needs this file to define who is allowed to approve what. (cherry picked from commit 3aa45795ddcfb3608875d3f86cf093c5f47ceee0) --- CODEOWNERS | 1 + 1 file changed, 1 insertion(+) create mode 100644 CODEOWNERS diff --git a/CODEOWNERS b/CODEOWNERS new file mode 100644 index 00000000..d45fc72b --- /dev/null +++ b/CODEOWNERS @@ -0,0 +1 @@ +* @bgehrels @pebermann From 5d7b4c61f5c8b7f1a5d162bbb633b971e04f565d Mon Sep 17 00:00:00 2001 From: bgehrels Date: Mon, 6 May 2019 15:49:57 +0200 Subject: [PATCH 02/10] Upgrade depdendencies --- nakadi-producer-spring-boot-starter/pom.xml | 16 +++++++--------- nakadi-producer/pom.xml | 18 ++++++++++-------- .../MockNakadiPublishingClient.java | 12 ++++++------ pom.xml | 8 ++++---- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/nakadi-producer-spring-boot-starter/pom.xml b/nakadi-producer-spring-boot-starter/pom.xml index 7966ae8f..851570f8 100644 --- a/nakadi-producer-spring-boot-starter/pom.xml +++ b/nakadi-producer-spring-boot-starter/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 4.3.0 + 4.3.1 nakadi-producer-spring-boot-starter @@ -22,7 +22,6 @@ 1.8 1.8 1.8 - 0.4.24 @@ -53,13 +52,13 @@ org.zalando tracer-core - 0.11.2 + 0.17.1 true org.zalando.stups tokens - 0.10.0 + 0.12.2 true @@ -98,9 +97,10 @@ test + org.springframework.boot spring-boot-starter-web - test + true @@ -124,7 +124,6 @@ org.apache.maven.plugins maven-source-plugin - 2.2.1 attach-sources @@ -137,7 +136,6 @@ org.apache.maven.plugins maven-javadoc-plugin - 2.9.1 -Xdoclint:none @@ -153,7 +151,7 @@ org.apache.maven.plugins maven-gpg-plugin - 1.5 + 1.6 sign-artifacts @@ -167,7 +165,7 @@ org.sonatype.plugins nexus-staging-maven-plugin - 1.6.7 + 1.6.8 true ossrh diff --git a/nakadi-producer/pom.xml b/nakadi-producer/pom.xml index 04ccaa25..9cf2beaf 100644 --- a/nakadi-producer/pom.xml +++ b/nakadi-producer/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 4.3.0 + 4.3.1 nakadi-producer @@ -22,7 +22,7 @@ 1.8 1.8 1.8 - 0.4.24 + 2.27.0 @@ -34,7 +34,7 @@ com.fasterxml.jackson.core jackson-databind - 2.8.11.2 + 2.8.11.3 org.slf4j @@ -43,7 +43,12 @@ org.zalando fahrschein - 0.9.1 + 0.17.0 + + + com.google.guava + guava + 27.1-jre org.projectlombok @@ -57,7 +62,7 @@ javax.interceptor javax.interceptor-api - 1.2 + 1.2.2 provided true @@ -70,7 +75,6 @@ org.mockito mockito-core - 2.8.9 test @@ -104,7 +108,6 @@ org.apache.maven.plugins maven-source-plugin - 2.2.1 attach-sources @@ -117,7 +120,6 @@ org.apache.maven.plugins maven-javadoc-plugin - 2.9.1 -Xdoclint:none diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/MockNakadiPublishingClient.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/MockNakadiPublishingClient.java index 638fd12b..e884504d 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/MockNakadiPublishingClient.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/MockNakadiPublishingClient.java @@ -1,11 +1,9 @@ package org.zalando.nakadiproducer.transmission; import java.util.ArrayList; +import java.util.Collection; import java.util.List; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -14,10 +12,12 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; public class MockNakadiPublishingClient implements NakadiPublishingClient { private final ObjectMapper objectMapper; - private final MultiValueMap sentEvents = new LinkedMultiValueMap<>(); + private final Multimap sentEvents = ArrayListMultimap.create(); public MockNakadiPublishingClient() { this(createDefaultObjectMapper()); @@ -29,12 +29,12 @@ public MockNakadiPublishingClient(ObjectMapper objectMapper) { @Override public synchronized void publish(String eventType, List nakadiEvents) throws Exception { - nakadiEvents.stream().map(this::transformToJson).forEach(e -> sentEvents.add(eventType, e)); + nakadiEvents.stream().map(this::transformToJson).forEach(e -> sentEvents.put(eventType, e)); } public synchronized List getSentEvents(String eventType) { ArrayList events = new ArrayList<>(); - List sentEvents = this.sentEvents.get(eventType); + Collection sentEvents = this.sentEvents.get(eventType); if (sentEvents != null) { events.addAll(sentEvents); } diff --git a/pom.xml b/pom.xml index 56380d33..16b090ba 100644 --- a/pom.xml +++ b/pom.xml @@ -10,12 +10,12 @@ org.springframework.boot spring-boot-starter-parent - 1.5.3.RELEASE + 1.5.20.RELEASE nakadi-producer-reactor org.zalando - 4.3.0 + 4.3.1 pom Nakadi Event Producer Reactor @@ -40,7 +40,7 @@ org.apache.maven.plugins maven-gpg-plugin - 1.5 + 1.6 sign-artifacts @@ -54,7 +54,7 @@ org.sonatype.plugins nexus-staging-maven-plugin - 1.6.7 + 1.6.8 true ossrh From 12911153b13a045797e71e528fbd2c1a9d6d6974 Mon Sep 17 00:00:00 2001 From: bgehrels Date: Wed, 20 Feb 2019 11:28:06 +0100 Subject: [PATCH 03/10] #116: create a new trace if none has been started yet during snapshot creation (cherry picked from commit b5841a34bd52dad7f12cb2966b718b329bc07635) --- .../NakadiProducerAutoConfiguration.java | 4 +-- .../flowid/NoopFlowIdComponent.java | 5 +++- .../flowid/TracerFlowIdComponent.java | 16 +++++++++++ .../impl/SnapshotEventCreationEndpoint.java | 6 ++++- .../flowid/TracerFlowIdComponentTest.java | 27 ++++++++++++++++--- .../flowid/FlowIdComponent.java | 2 ++ 6 files changed, 53 insertions(+), 7 deletions(-) diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java index 29816e52..acca8ec3 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java @@ -116,8 +116,8 @@ static class ManagementEndpointConfiguration { @Bean @ConditionalOnMissingBean public SnapshotEventCreationEndpoint snapshotEventCreationEndpoint( - SnapshotCreationService snapshotCreationService) { - return new SnapshotEventCreationEndpoint(snapshotCreationService); + SnapshotCreationService snapshotCreationService, FlowIdComponent flowIdComponent) { + return new SnapshotEventCreationEndpoint(snapshotCreationService, flowIdComponent); } @Bean diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/NoopFlowIdComponent.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/NoopFlowIdComponent.java index 2617a663..c760861c 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/NoopFlowIdComponent.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/NoopFlowIdComponent.java @@ -7,7 +7,10 @@ public class NoopFlowIdComponent implements FlowIdComponent { @Override public String getXFlowIdValue() { - log.debug("No bean of class FlowIdComponent was found. Returning null."); return null; } + + @Override + public void startTraceIfNoneExists() { + } } diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java index 92cc8745..8b3a552b 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java @@ -37,4 +37,20 @@ public String getXFlowIdValue() { } return null; } + + @Override + public void startTraceIfNoneExists() { + if (tracer != null) { + try { + tracer.get(X_FLOW_ID).getValue(); + } catch (IllegalArgumentException e) { + tracer.start(); + } catch (IllegalStateException e) { + log.warn("Unexpected Error while checking for an existing Trace Id {}. " + + "Please check your tracer configuration: {}", X_FLOW_ID, e.getMessage()); + } + } else { + log.warn("No bean of class Tracer was found. Returning null."); + } + } } diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java index 3545fbf9..82b1811b 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java @@ -7,14 +7,17 @@ import lombok.AllArgsConstructor; import lombok.Getter; +import org.zalando.nakadiproducer.flowid.FlowIdComponent; @ConfigurationProperties("endpoints.snapshot-event-creation") public class SnapshotEventCreationEndpoint extends AbstractEndpoint { private final SnapshotCreationService snapshotCreationService; + private final FlowIdComponent flowIdComponent; - public SnapshotEventCreationEndpoint(SnapshotCreationService snapshotCreationService) { + public SnapshotEventCreationEndpoint(SnapshotCreationService snapshotCreationService, FlowIdComponent flowIdComponent) { super("snapshot_event_creation", true, true); this.snapshotCreationService = snapshotCreationService; + this.flowIdComponent = flowIdComponent; } @Override @@ -23,6 +26,7 @@ public SnapshotReport invoke() { } public void invoke(String eventType, String filter) { + flowIdComponent.startTraceIfNoneExists(); snapshotCreationService.createSnapshotEvents(eventType, filter); } diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java index 8d22f542..a74c4028 100644 --- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java @@ -1,15 +1,16 @@ package org.zalando.nakadiproducer.flowid; import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.hamcrest.Matchers; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Answers; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; -import org.zalando.nakadiproducer.flowid.TracerFlowIdComponent; import org.zalando.tracer.Tracer; @RunWith(MockitoJUnitRunner.class) @@ -21,10 +22,30 @@ public class TracerFlowIdComponentTest { @Test public void makeSureItWorks() { TracerFlowIdComponent flowIdComponent = new TracerFlowIdComponent(tracer); - Mockito.when(tracer.get("X-Flow-ID").getValue()).thenReturn("A_FUNKY_VALUE"); + when(tracer.get("X-Flow-ID").getValue()).thenReturn("A_FUNKY_VALUE"); assertThat(flowIdComponent.getXFlowIdKey(), Matchers.equalTo("X-Flow-ID")); assertThat(flowIdComponent.getXFlowIdValue(), Matchers.equalTo("A_FUNKY_VALUE")); } + @Test + public void makeSureTraceWillBeStartedIfNoneExists() { + TracerFlowIdComponent flowIdComponent = new TracerFlowIdComponent(tracer); + when(tracer.get("X-Flow-ID")).thenThrow(new IllegalArgumentException()); + + flowIdComponent.startTraceIfNoneExists(); + + verify(tracer).start(); + } + + @Test + public void makeSureTraceWillNotStartedIfOneExists() { + TracerFlowIdComponent flowIdComponent = new TracerFlowIdComponent(tracer); + when(tracer.get("X-Flow-ID").getValue()).thenReturn("A_FUNKY_VALUE"); + + flowIdComponent.startTraceIfNoneExists(); + + verify(tracer, never()).start(); + } + } \ No newline at end of file diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/flowid/FlowIdComponent.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/flowid/FlowIdComponent.java index 8e3922dc..3d582b95 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/flowid/FlowIdComponent.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/flowid/FlowIdComponent.java @@ -2,4 +2,6 @@ public interface FlowIdComponent { String getXFlowIdValue(); + + void startTraceIfNoneExists(); } From 11381272ee03710c0279718653d31228f448aae1 Mon Sep 17 00:00:00 2001 From: bgehrels Date: Wed, 20 Feb 2019 17:58:34 +0100 Subject: [PATCH 04/10] #116: fix misleading log message (cherry picked from commit 8843522c593a5c9efb9329ff71e099e2b9782400) --- .../zalando/nakadiproducer/flowid/TracerFlowIdComponent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java index 8b3a552b..eb852f6b 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java @@ -50,7 +50,7 @@ public void startTraceIfNoneExists() { "Please check your tracer configuration: {}", X_FLOW_ID, e.getMessage()); } } else { - log.warn("No bean of class Tracer was found. Returning null."); + log.warn("No bean of class Tracer was found."); } } } From a1963a45cad6e905d27b60779d42e432daaa127b Mon Sep 17 00:00:00 2001 From: bgehrels Date: Tue, 26 Feb 2019 17:30:39 +0100 Subject: [PATCH 05/10] #116: check for the right exception when starting the trace, version bump (cherry picked from commit e72bc253391275ff104fd36378724a642e4f265b) --- .../zalando/nakadiproducer/flowid/TracerFlowIdComponent.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java index eb852f6b..c9a00923 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java @@ -43,11 +43,8 @@ public void startTraceIfNoneExists() { if (tracer != null) { try { tracer.get(X_FLOW_ID).getValue(); - } catch (IllegalArgumentException e) { + } catch (IllegalArgumentException|IllegalStateException e) { tracer.start(); - } catch (IllegalStateException e) { - log.warn("Unexpected Error while checking for an existing Trace Id {}. " + - "Please check your tracer configuration: {}", X_FLOW_ID, e.getMessage()); } } else { log.warn("No bean of class Tracer was found."); From 5bceeb3375f48e00caf457e041b7260406c8232d Mon Sep 17 00:00:00 2001 From: bgehrels Date: Mon, 4 Mar 2019 12:35:29 +0100 Subject: [PATCH 06/10] #116: give a waring if the trace could not be started due to configuration issues (cherry picked from commit aa8c1ee776769be539ff00e63639fe8b2e02b880) --- .../flowid/TracerFlowIdComponent.java | 6 +++++- .../flowid/TracerFlowIdComponentTest.java | 14 ++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java index c9a00923..a2778cd1 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java @@ -43,7 +43,11 @@ public void startTraceIfNoneExists() { if (tracer != null) { try { tracer.get(X_FLOW_ID).getValue(); - } catch (IllegalArgumentException|IllegalStateException e) { + } catch (IllegalArgumentException e) { + log.warn("No trace was configured for the name {}. Returning null. " + + "To configure Tracer provide an application property: " + + "tracer.traces.X-Flow-ID=flow-id", X_FLOW_ID); + } catch (IllegalStateException e) { tracer.start(); } } else { diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java index a74c4028..7a86f0d1 100644 --- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java @@ -29,15 +29,25 @@ public void makeSureItWorks() { } @Test - public void makeSureTraceWillBeStartedIfNoneExists() { + public void makeSureTraceWillBeStartedIfNoneHasBeenStartedBefore() { TracerFlowIdComponent flowIdComponent = new TracerFlowIdComponent(tracer); - when(tracer.get("X-Flow-ID")).thenThrow(new IllegalArgumentException()); + when(tracer.get("X-Flow-ID").getValue()).thenThrow(new IllegalStateException()); flowIdComponent.startTraceIfNoneExists(); verify(tracer).start(); } + @Test + public void wontFailIfTraceHasNotBeenConfiguredInStartTrace() { + TracerFlowIdComponent flowIdComponent = new TracerFlowIdComponent(tracer); + when(tracer.get("X-Flow-ID")).thenThrow(new IllegalArgumentException()); + + flowIdComponent.startTraceIfNoneExists(); + + // then no exception is thrown + } + @Test public void makeSureTraceWillNotStartedIfOneExists() { TracerFlowIdComponent flowIdComponent = new TracerFlowIdComponent(tracer); From 446b6bb5e9a73f56d2dd950e2411bc56e350ee98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pa=C5=ADlo=20Ebermann?= Date: Tue, 16 Apr 2019 11:22:10 +0200 Subject: [PATCH 07/10] fix my user name in codeowners file Not sure how that happened, and was not noticed until now. (cherry picked from commit 7a6c165aded59a2915e431887a972b144d81ae60) --- CODEOWNERS | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CODEOWNERS b/CODEOWNERS index d45fc72b..9e02ed1e 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1 +1,2 @@ -* @bgehrels @pebermann +# The two maintainers are code owners for everything. +* @bgehrels @epaul From c86b9f2ce973a4b030054117051a28ac89826470 Mon Sep 17 00:00:00 2001 From: Florian Brons Date: Wed, 3 Apr 2019 16:24:41 +0200 Subject: [PATCH 08/10] #120 make lock-timeout configurable (cherry picked from commit add7ccfcd40895db4238c5dd51e60938bf52b52f) --- README.md | 15 ++++ .../NakadiProducerAutoConfiguration.java | 15 ++-- .../zalando/nakadiproducer/LockTimeoutIT.java | 89 +++++++++++++++++++ .../impl/EventTransmissionService.java | 23 +++-- 4 files changed, 130 insertions(+), 12 deletions(-) create mode 100644 nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/LockTimeoutIT.java diff --git a/README.md b/README.md index ae5d8cc1..b452d73a 100644 --- a/README.md +++ b/README.md @@ -277,6 +277,21 @@ You may also define a spring bean of type `FlywayCallback` and annotate it with interface provide several hook into the schema management lifecycle that may, for example, be used to `SET ROLE migrator` before and `RESET ROLE` after each migration. +### Customizing event locks + +* **lock-duration**: The selected events are locked before transmission. If the transmission fails the events stay locked +until the lock expires (default: 600 seconds). + +* **lock-duration-buffer**: Since clocks never work exactly synchronous and sending events also takes some time, a safety +buffer is included. During the last x seconds before the expiration of the lock the events are not considered for +transmission (default: 60 seconds). + +```yaml +nakadi-producer: + lock-duration: 600 + lock-duration-buffer: 60 +``` + ### Test support This library provides a mock implementation of its Nakadi client that can be used in integration testing: diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java index acca8ec3..7d259bf2 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java @@ -197,11 +197,16 @@ public EventTransmissionScheduler eventTransmissionScheduler(EventTransmitter ev return new EventTransmissionScheduler(eventTransmitter, scheduledTransmissionEnabled); } - @Bean - public EventTransmissionService eventTransmissionService(EventLogRepository eventLogRepository, - NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper) { - return new EventTransmissionService(eventLogRepository, nakadiPublishingClient, objectMapper); - } + @Bean + public EventTransmissionService eventTransmissionService( + EventLogRepository eventLogRepository, + NakadiPublishingClient nakadiPublishingClient, + ObjectMapper objectMapper, + @Value("${nakadi-producer.lock-duration:600}") int lockDuration, + @Value("${nakadi-producer.lock-duration-buffer:60}") int lockDurationBuffer) { + return new EventTransmissionService( + eventLogRepository, nakadiPublishingClient, objectMapper, lockDuration, lockDurationBuffer); + } @Bean public FlywayMigrator flywayMigrator() { diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/LockTimeoutIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/LockTimeoutIT.java new file mode 100644 index 00000000..aeaf24d9 --- /dev/null +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/LockTimeoutIT.java @@ -0,0 +1,89 @@ +package org.zalando.nakadiproducer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.transaction.annotation.Transactional; +import org.zalando.nakadiproducer.eventlog.EventLogWriter; +import org.zalando.nakadiproducer.eventlog.impl.EventLog; +import org.zalando.nakadiproducer.transmission.MockNakadiPublishingClient; +import org.zalando.nakadiproducer.transmission.impl.EventTransmissionService; +import org.zalando.nakadiproducer.transmission.impl.EventTransmitter; +import org.zalando.nakadiproducer.util.Fixture; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Collection; + +import static java.time.temporal.ChronoUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +@Transactional +@SpringBootTest(properties = { + "nakadi-producer.scheduled-transmission-enabled:false", + "nakadi-producer.lock-duration:300", + "nakadi-producer.lock-duration-buffer:30"}) +public class LockTimeoutIT extends BaseMockedExternalCommunicationIT { + private static final String MY_EVENT_TYPE = "myEventType"; + + @Autowired + private EventLogWriter eventLogWriter; + + @Autowired + private EventTransmitter eventTransmitter; + + @Autowired + private EventTransmissionService eventTransmissionService; + + @Autowired + private MockNakadiPublishingClient nakadiClient; + + @Before + @After + public void clearNakadiEvents() { + mockServiceClock(Instant.now()); + eventTransmitter.sendEvents(); + nakadiClient.clearSentEvents(); + } + + @Test + public void testLockedUntil() { + eventLogWriter.fireBusinessEvent(MY_EVENT_TYPE, Fixture.mockPayload(1, "code123")); + + Instant timeOfInitialLock = Instant.now(); + mockServiceClock(timeOfInitialLock); + + assertThat(eventTransmissionService.lockSomeEvents().size(), is(1)); + assertThat(eventTransmissionService.lockSomeEvents(), empty()); + + // lock is still valid + mockServiceClock(timeOfInitialLock.plus(300 - 5, SECONDS)); + assertThat(eventTransmissionService.lockSomeEvents(), empty()); + + // lock is expired + mockServiceClock(timeOfInitialLock.plus(300 + 5, SECONDS)); + assertThat(eventTransmissionService.lockSomeEvents().size(), is(1)); + } + + @Test + public void testLockNearlyExpired() { + eventLogWriter.fireBusinessEvent(MY_EVENT_TYPE, Fixture.mockPayload(1, "code456")); + Instant timeOfInitialLock = Instant.now(); + + Collection lockedEvent = eventTransmissionService.lockSomeEvents(); + + // event will not be sent, because the event-lock is "nearlyExpired" + mockServiceClock(timeOfInitialLock.plus(300 - 30 + 5, SECONDS)); + eventTransmissionService.sendEvents(lockedEvent); + assertThat(nakadiClient.getSentEvents(MY_EVENT_TYPE), empty()); + } + + private void mockServiceClock(Instant ins) { + eventTransmissionService.overrideClock(Clock.fixed(ins, ZoneId.systemDefault())); + } +} \ No newline at end of file diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java index 64ad7922..240e6334 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java @@ -23,7 +23,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.time.temporal.ChronoUnit.MINUTES; +import static java.time.temporal.ChronoUnit.SECONDS; @Slf4j public class EventTransmissionService { @@ -31,20 +31,29 @@ public class EventTransmissionService { private final EventLogRepository eventLogRepository; private final NakadiPublishingClient nakadiPublishingClient; private final ObjectMapper objectMapper; + private final int lockDuration; + private final int lockDurationBuffer; private Clock clock = Clock.systemDefaultZone(); public EventTransmissionService(EventLogRepository eventLogRepository, NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper) { + this(eventLogRepository, nakadiPublishingClient, objectMapper, 600, 60); + } + + public EventTransmissionService(EventLogRepository eventLogRepository, NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper, + int lockDuration, int lockDurationBuffer) { this.eventLogRepository = eventLogRepository; this.nakadiPublishingClient = nakadiPublishingClient; this.objectMapper = objectMapper; + this.lockDuration = lockDuration; + this.lockDurationBuffer = lockDurationBuffer; } @Transactional public Collection lockSomeEvents() { String lockId = UUID.randomUUID().toString(); - log.debug("Locking events for replication with lockId {}", lockId); - eventLogRepository.lockSomeMessages(lockId, now(), now().plus(10, MINUTES)); + log.debug("Locking events for replication with lockId {} for {} seconds", lockId, lockDuration); + eventLogRepository.lockSomeMessages(lockId, now(), now().plus(lockDuration, SECONDS)); return eventLogRepository.findByLockedByAndLockedUntilGreaterThan(lockId, now()); } @@ -118,10 +127,10 @@ private List collectEids(EventPublishingException e) { } private boolean lockNearlyExpired(EventLog eventLog) { - // since clocks never work exactly synchronous and sending the event also takes some time, we include a minute - // of safety buffer here. This is still not 100% precise, but since we require events to be consumed idempotent, - // sending one event twice wont hurt much. - return now().isAfter(eventLog.getLockedUntil().minus(1, MINUTES)); + // since clocks never work exactly synchronous and sending the event also takes some time, we include a safety + // buffer here. This is still not 100% precise, but since we require events to be consumed idempotent, sending + // one event twice wont hurt much. + return now().isAfter(eventLog.getLockedUntil().minus(lockDurationBuffer, SECONDS)); } private NakadiEvent mapToNakadiEvent(final EventLog event) throws IOException { From 1b1546d3dbdd93d5d9b729c4798df0b7997064ca Mon Sep 17 00:00:00 2001 From: Florian Brons Date: Tue, 16 Apr 2019 09:09:02 +0200 Subject: [PATCH 09/10] #120 make lock-timeout configurable (cherry picked from commit 0b7d7934f759819321274bf9fc4ec93a4ca50b85) --- README.md | 30 +++++++++---------- .../impl/EventTransmissionService.java | 4 --- .../impl/EventTransmissionServiceTest.java | 3 +- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index b452d73a..020256a4 100644 --- a/README.md +++ b/README.md @@ -277,21 +277,6 @@ You may also define a spring bean of type `FlywayCallback` and annotate it with interface provide several hook into the schema management lifecycle that may, for example, be used to `SET ROLE migrator` before and `RESET ROLE` after each migration. -### Customizing event locks - -* **lock-duration**: The selected events are locked before transmission. If the transmission fails the events stay locked -until the lock expires (default: 600 seconds). - -* **lock-duration-buffer**: Since clocks never work exactly synchronous and sending events also takes some time, a safety -buffer is included. During the last x seconds before the expiration of the lock the events are not considered for -transmission (default: 60 seconds). - -```yaml -nakadi-producer: - lock-duration: 600 - lock-duration-buffer: 60 -``` - ### Test support This library provides a mock implementation of its Nakadi client that can be used in integration testing: @@ -332,6 +317,21 @@ The example above uses `com.jayway.jsonpath:json-path:jar:2.2.0` to parse and te Note that you should disable the scheduled event transmission for the test (e.g. by setting `nakadi-producer.scheduled-transmission-enabled:false`), as that might interfere with the manual transmission and the clearing in the test setup, leading to events from one test showing up in the next test, depending on timing issues. +### Customizing event locks + +* **lock-duration**: The selected events are locked before transmission. If the transmission fails the events stay locked +until the lock expires. The default is currently 600 seconds but might change in future releases. + +* **lock-duration-buffer**: Since clocks never work exactly synchronous and sending events also takes some time, a safety +buffer is included. During the last x seconds before the expiration of the lock the events are not considered for +transmission. The default is currently 60 seconds but might change in future releases. + +```yaml +nakadi-producer: + lock-duration: 600 + lock-duration-buffer: 60 +``` + ## Contributing We welcome contributions. Please have a look at our [contribution guidelines](CONTRIBUTING.md). diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java index 240e6334..613f9834 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java @@ -36,10 +36,6 @@ public class EventTransmissionService { private Clock clock = Clock.systemDefaultZone(); - public EventTransmissionService(EventLogRepository eventLogRepository, NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper) { - this(eventLogRepository, nakadiPublishingClient, objectMapper, 600, 60); - } - public EventTransmissionService(EventLogRepository eventLogRepository, NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper, int lockDuration, int lockDurationBuffer) { this.eventLogRepository = eventLogRepository; diff --git a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java index df08c90c..fecfbf67 100644 --- a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java +++ b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import org.zalando.fahrschein.EventPublishingException; import org.zalando.fahrschein.domain.BatchItemResponse; @@ -53,7 +52,7 @@ public void setUp() { repo = mock(EventLogRepository.class); publishingClient = spy(new MockNakadiPublishingClient()); mapper = spy(new ObjectMapper()); - service = new EventTransmissionService(repo, publishingClient, mapper); + service = new EventTransmissionService(repo, publishingClient, mapper, 600, 60); } @Test From 22c01b67736e1a28f10e617f09e1c38cfa632568 Mon Sep 17 00:00:00 2001 From: bgehrels Date: Tue, 7 May 2019 11:31:43 +0200 Subject: [PATCH 10/10] version bump --- nakadi-producer-spring-boot-starter/pom.xml | 2 +- nakadi-producer/pom.xml | 2 +- pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nakadi-producer-spring-boot-starter/pom.xml b/nakadi-producer-spring-boot-starter/pom.xml index 851570f8..4e1de1fe 100644 --- a/nakadi-producer-spring-boot-starter/pom.xml +++ b/nakadi-producer-spring-boot-starter/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 4.3.1 + 4.4.0 nakadi-producer-spring-boot-starter diff --git a/nakadi-producer/pom.xml b/nakadi-producer/pom.xml index 9cf2beaf..73678dc4 100644 --- a/nakadi-producer/pom.xml +++ b/nakadi-producer/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 4.3.1 + 4.4.0 nakadi-producer diff --git a/pom.xml b/pom.xml index 16b090ba..cc54e5d7 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ nakadi-producer-reactor org.zalando - 4.3.1 + 4.4.0 pom Nakadi Event Producer Reactor