Skip to content

Commit

Permalink
Address comment on completeCodec
Browse files Browse the repository at this point in the history
Signed-off-by: Srikanth Govindarajan <[email protected]>
  • Loading branch information
srikanthjg committed Jan 28, 2025
1 parent f86437a commit b00b92c
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ public void setup() {
lambdaRegion = System.getProperty("tests.lambda.processor.region");
functionName = System.getProperty("tests.lambda.processor.functionName");
role = System.getProperty("tests.lambda.processor.sts_role_arn");
lambdaRegion = "us-west-2";
functionName = "lambdaNoReturn";
role = "arn:aws:iam::176893235612:role/osis-s3-opensearch-role";


pluginMetrics = mock(PluginMetrics.class);
pluginSetting = mock(PluginSetting.class);
Expand Down Expand Up @@ -519,7 +515,7 @@ def lambda_handler(event, context):
}
}

// 11) Finally, check that we had at least one retry
// Finally, check that we had at least one retry
// If concurrency=1 is truly enforced, at least some calls should have gotten a 429
// -> triggered CountingRetryCondition
int retryCount = countingRetryCondition.getRetryCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,12 @@ private static List<Buffer> createBufferBatches(Collection<Record<Event>> record
if (ThresholdCheck.checkThresholdExceed(currentBufferPerBatch, maxEvents, maxBytes,
maxCollectionDuration)) {
batchedBuffers.add(currentBufferPerBatch);
currentBufferPerBatch.completeCodec();
currentBufferPerBatch = new InMemoryBuffer(keyName, outputCodecContext);
}
}

if (currentBufferPerBatch.getEventCount() > 0) {
batchedBuffers.add(currentBufferPerBatch);
currentBufferPerBatch.completeCodec();
}
return batchedBuffers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,5 @@ public interface Buffer {
Long getPayloadRequestSize();

Duration stopLatencyWatch();

void completeCodec();

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void addRecord(Record<Event> record) {
eventCount++;
}

public void completeCodec() {
void completeCodec() {
if (eventCount > 0) {
try {
requestCodec.complete(this.byteArrayOutputStream);
Expand Down Expand Up @@ -108,6 +108,8 @@ public InvokeRequest getRequestPayload(String functionName, String invocationTyp
return null;
}

completeCodec();

SdkBytes payload = getPayload();
payloadRequestSize = payload.asByteArray().length;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
<<<<<<< HEAD
=======

import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
>>>>>>> 00f98516c (Add retryCondidition to lambda Client)
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand Down Expand Up @@ -77,10 +66,11 @@
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down

0 comments on commit b00b92c

Please sign in to comment.