Skip to content

Commit 2c8a220

Browse files
committed
Test SDK resilience to callback implementation errors
1 parent 8b00331 commit 2c8a220

File tree

1 file changed

+319
-0
lines changed

1 file changed

+319
-0
lines changed
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
package com.amazonaws.kinesisvideo.common;
2+
3+
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
4+
import com.amazonaws.kinesisvideo.client.KinesisVideoClient;
5+
import com.amazonaws.kinesisvideo.client.KinesisVideoClientConfiguration;
6+
import com.amazonaws.kinesisvideo.common.exception.KinesisVideoException;
7+
import com.amazonaws.kinesisvideo.internal.client.mediasource.MediaSource;
8+
import com.amazonaws.kinesisvideo.java.auth.JavaCredentialsFactory;
9+
import com.amazonaws.kinesisvideo.java.client.KinesisVideoJavaClientFactory;
10+
import com.amazonaws.kinesisvideo.java.mediasource.file.ImageFileMediaSource;
11+
import com.amazonaws.kinesisvideo.java.mediasource.file.ImageFileMediaSourceConfiguration;
12+
import com.amazonaws.kinesisvideo.producer.DeviceInfo;
13+
import com.amazonaws.kinesisvideo.producer.FragmentAckType;
14+
import com.amazonaws.kinesisvideo.producer.KinesisVideoFragmentAck;
15+
import com.amazonaws.kinesisvideo.producer.ProducerException;
16+
import com.amazonaws.kinesisvideo.producer.StorageInfo;
17+
import com.amazonaws.kinesisvideo.producer.Tag;
18+
import com.amazonaws.kinesisvideo.streaming.DefaultStreamCallbacks;
19+
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideo;
20+
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideoClientBuilder;
21+
import com.amazonaws.services.kinesisvideo.model.CreateStreamRequest;
22+
import com.amazonaws.services.kinesisvideo.model.DeleteStreamRequest;
23+
import com.amazonaws.services.kinesisvideo.model.DescribeStreamRequest;
24+
import com.amazonaws.services.kinesisvideo.model.DescribeStreamResult;
25+
import org.apache.logging.log4j.LogManager;
26+
import org.apache.logging.log4j.Logger;
27+
import org.junit.After;
28+
import org.junit.Before;
29+
import org.junit.Rule;
30+
import org.junit.Test;
31+
import org.junit.rules.Timeout;
32+
33+
import javax.annotation.Nonnull;
34+
import java.util.ArrayList;
35+
import java.util.List;
36+
import java.util.Optional;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.ScheduledExecutorService;
39+
import java.util.concurrent.atomic.AtomicBoolean;
40+
41+
import static org.junit.Assert.assertNotEquals;
42+
import static org.junit.Assert.assertTrue;
43+
import static org.junit.Assert.fail;
44+
import static org.junit.Assume.assumeNotNull;
45+
46+
public class StreamCallbacksErrorHandlingIntegTest {
47+
48+
private static final Logger log = LogManager.getLogger(StreamCallbacksErrorHandlingIntegTest.class);
49+
50+
private static final int FPS_25 = 25;
51+
private static final String H264_FILES_DIR = "src/main/resources/data/h264/";
52+
private static final String IMAGE_FILENAME_FORMAT = "frame-%03d.h264";
53+
private static final int START_FILE_INDEX = 1;
54+
private static final int END_FILE_INDEX = 375;
55+
56+
private static final int STREAMING_DURATION_MS = 3000;
57+
private static final int SETUP_TEARDOWN_PADDING_MS = 5000;
58+
59+
private String streamName = null;
60+
61+
@Rule
62+
public Timeout globalTimeout = Timeout.millis(STREAMING_DURATION_MS + SETUP_TEARDOWN_PADDING_MS);
63+
64+
@Before
65+
public void setUp() {
66+
final boolean jniLoaded = ProducerTestBase.isJNILoaded();
67+
if (!jniLoaded) {
68+
fail("JNI library not found.");
69+
}
70+
71+
assumeNotNull("Unable to find credentials!", DefaultAWSCredentialsProviderChain.getInstance().getCredentials());
72+
}
73+
74+
@After
75+
public void tearDown() {
76+
if (this.streamName != null) {
77+
final AmazonKinesisVideo awsSdkKinesisVideoClient = AmazonKinesisVideoClientBuilder.standard().build();
78+
try {
79+
final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(this.streamName);
80+
final DescribeStreamResult describeStreamResult = awsSdkKinesisVideoClient.describeStream(describeStreamRequest);
81+
82+
final DeleteStreamRequest deleteStreamRequest = new DeleteStreamRequest()
83+
.withStreamARN(describeStreamResult.getStreamInfo().getStreamARN())
84+
.withCurrentVersion(describeStreamResult.getStreamInfo().getVersion());
85+
awsSdkKinesisVideoClient.deleteStream(deleteStreamRequest);
86+
} catch (final Exception e) {
87+
log.error("Failed to delete the stream: {}", this.streamName, e);
88+
fail("Failed to delete the stream: " + this.streamName);
89+
}
90+
}
91+
}
92+
93+
/**
94+
* Fault-injection scenario: Throwing a runtime exception in the user-implementation of the fragmentAckReceived
95+
* callback. The SDK should log and ignore application errors and continue.
96+
*/
97+
@Test
98+
public void when_streamCallbacksThrowRuntimeException_thenProducerContinuesNormally()
99+
throws KinesisVideoException, InterruptedException {
100+
final String testName = new Object() {
101+
}.getClass().getEnclosingMethod().getName();
102+
final String testStreamName = Optional.ofNullable(System.getenv("TEST_STREAMS_PREFIX")).orElse("") + "StreamCallbacksErrorHandlingIntegTest" + testName + "_" + System.currentTimeMillis();
103+
this.streamName = testStreamName;
104+
createStream(this.streamName);
105+
106+
final KinesisVideoClientConfiguration configuration = KinesisVideoClientConfiguration.builder()
107+
.withCredentialsProvider(JavaCredentialsFactory
108+
.createKinesisVideoCredentialsProvider(DefaultAWSCredentialsProviderChain
109+
.getInstance()
110+
)
111+
)
112+
.build();
113+
final DeviceInfo deviceInfo = createTestDeviceInfo(testName + "-device");
114+
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
115+
final KinesisVideoClient client = KinesisVideoJavaClientFactory.createKinesisVideoClient(configuration,
116+
deviceInfo, executorService);
117+
118+
assertTrue("Client should be initialized!", client.isInitialized());
119+
120+
final List<KinesisVideoFragmentAck> acksReceived = new ArrayList<>();
121+
final AtomicBoolean streamReadyCalled = new AtomicBoolean(false);
122+
final AtomicBoolean streamClosedCalled = new AtomicBoolean(false);
123+
final MediaSource mediaSource = new ImageFileMediaSource(testStreamName);
124+
mediaSource.configure(new ImageFileMediaSourceConfiguration.Builder()
125+
.fps(FPS_25)
126+
.dir(H264_FILES_DIR)
127+
.filenameFormat(IMAGE_FILENAME_FORMAT)
128+
.startFileIndex(START_FILE_INDEX)
129+
.endFileIndex(END_FILE_INDEX)
130+
.streamCallbacks(new DefaultStreamCallbacks() {
131+
@Override
132+
public void fragmentAckReceived(final long uploadHandle, @Nonnull final KinesisVideoFragmentAck fragmentAck) throws ProducerException {
133+
super.fragmentAckReceived(uploadHandle, fragmentAck);
134+
acksReceived.add(fragmentAck);
135+
136+
assertNotEquals("Received an unexpected ERROR ack: " + fragmentAck,
137+
FragmentAckType.FRAGMENT_ACK_TYPE_ERROR, fragmentAck.getAckType().getIntType());
138+
139+
throw new RuntimeException("Throwing an exception to check error handling in fragmentAckReceived!");
140+
}
141+
142+
@Override
143+
public void streamErrorReport(final long uploadHandle, final long frameTimecode, final long statusCode) throws ProducerException {
144+
super.streamErrorReport(uploadHandle, frameTimecode, statusCode);
145+
146+
fail("Received an unexpected ERROR for the stream: 0x" + Long.toHexString(statusCode));
147+
}
148+
149+
@Override
150+
public void streamReady() throws ProducerException {
151+
super.streamReady();
152+
streamReadyCalled.set(true);
153+
}
154+
155+
@Override
156+
public void streamClosed(final long uploadHandle) throws ProducerException {
157+
super.streamClosed(uploadHandle);
158+
streamClosedCalled.set(true);
159+
}
160+
})
161+
.build());
162+
163+
client.registerMediaSource(mediaSource);
164+
165+
mediaSource.start();
166+
167+
log.info("Started media source");
168+
Thread.sleep(STREAMING_DURATION_MS);
169+
170+
log.info("Stopping media source");
171+
mediaSource.stop();
172+
client.unregisterMediaSource(mediaSource);
173+
mediaSource.free();
174+
175+
final long persistedAcksCount = acksReceived.stream()
176+
.filter(ack -> ack.getAckType().getIntType() == FragmentAckType.FRAGMENT_ACK_TYPE_PERSISTED)
177+
.count();
178+
179+
assertTrue("Didn't receive any PERSISTED ACKs. Received: " + acksReceived, persistedAcksCount > 0);
180+
assertTrue("StreamReady callback wasn't called", streamReadyCalled.get());
181+
assertTrue("StreamClosed callback wasn't called", streamClosedCalled.get());
182+
183+
executorService.shutdownNow();
184+
}
185+
186+
/**
187+
* Fault-injection scenario: Throwing an Error in the user-implementation of the fragmentAckReceived
188+
* callback. The SDK should log and ignore application errors and continue.
189+
*/
190+
@Test
191+
public void when_streamCallbacksThrowError_thenProducerContinuesNormally()
192+
throws KinesisVideoException, InterruptedException {
193+
final String testName = new Object() {
194+
}.getClass().getEnclosingMethod().getName();
195+
final String testStreamName = Optional.ofNullable(System.getenv("TEST_STREAMS_PREFIX")).orElse("") + "StreamCallbacksErrorHandlingIntegTest" + testName + "_" + System.currentTimeMillis();
196+
this.streamName = testStreamName;
197+
createStream(this.streamName);
198+
199+
final KinesisVideoClientConfiguration configuration = KinesisVideoClientConfiguration.builder()
200+
.withCredentialsProvider(JavaCredentialsFactory
201+
.createKinesisVideoCredentialsProvider(DefaultAWSCredentialsProviderChain
202+
.getInstance()
203+
)
204+
)
205+
.build();
206+
final DeviceInfo deviceInfo = createTestDeviceInfo(testName + "-device");
207+
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
208+
final KinesisVideoClient client = KinesisVideoJavaClientFactory.createKinesisVideoClient(configuration,
209+
deviceInfo, executorService);
210+
211+
assertTrue("Client should be initialized!", client.isInitialized());
212+
213+
final List<KinesisVideoFragmentAck> acksReceived = new ArrayList<>();
214+
final AtomicBoolean streamReadyCalled = new AtomicBoolean(false);
215+
final AtomicBoolean streamClosedCalled = new AtomicBoolean(false);
216+
final MediaSource mediaSource = new ImageFileMediaSource(testStreamName);
217+
mediaSource.configure(new ImageFileMediaSourceConfiguration.Builder()
218+
.fps(FPS_25)
219+
.dir(H264_FILES_DIR)
220+
.filenameFormat(IMAGE_FILENAME_FORMAT)
221+
.startFileIndex(START_FILE_INDEX)
222+
.endFileIndex(END_FILE_INDEX)
223+
.streamCallbacks(new DefaultStreamCallbacks() {
224+
@Override
225+
public void fragmentAckReceived(final long uploadHandle, @Nonnull final KinesisVideoFragmentAck fragmentAck) throws ProducerException {
226+
super.fragmentAckReceived(uploadHandle, fragmentAck);
227+
acksReceived.add(fragmentAck);
228+
229+
assertNotEquals("Received an unexpected ERROR ack: " + fragmentAck,
230+
FragmentAckType.FRAGMENT_ACK_TYPE_ERROR, fragmentAck.getAckType().getIntType());
231+
232+
throw new Error("Throwing an exception to check error handling in fragmentAckReceived!");
233+
}
234+
235+
@Override
236+
public void streamErrorReport(final long uploadHandle, final long frameTimecode, final long statusCode) throws ProducerException {
237+
super.streamErrorReport(uploadHandle, frameTimecode, statusCode);
238+
239+
fail("Received an unexpected ERROR for the stream: 0x" + Long.toHexString(statusCode));
240+
}
241+
242+
@Override
243+
public void streamReady() throws ProducerException {
244+
super.streamReady();
245+
streamReadyCalled.set(true);
246+
}
247+
248+
@Override
249+
public void streamClosed(final long uploadHandle) throws ProducerException {
250+
super.streamClosed(uploadHandle);
251+
streamClosedCalled.set(true);
252+
}
253+
})
254+
.build());
255+
256+
client.registerMediaSource(mediaSource);
257+
258+
mediaSource.start();
259+
260+
log.info("Started media source");
261+
Thread.sleep(STREAMING_DURATION_MS);
262+
263+
log.info("Stopping media source");
264+
mediaSource.stop();
265+
client.unregisterMediaSource(mediaSource);
266+
mediaSource.free();
267+
268+
final long persistedAcksCount = acksReceived.stream()
269+
.filter(ack -> ack.getAckType().getIntType() == FragmentAckType.FRAGMENT_ACK_TYPE_PERSISTED)
270+
.count();
271+
272+
assertTrue("Didn't receive any PERSISTED ACKs. Received: " + acksReceived, persistedAcksCount > 0);
273+
assertTrue("StreamReady callback wasn't called", streamReadyCalled.get());
274+
assertTrue("StreamClosed callback wasn't called", streamClosedCalled.get());
275+
276+
executorService.shutdownNow();
277+
}
278+
279+
private void createStream(@Nonnull final String streamName) {
280+
assumeNotNull(streamName, "StreamName cannot be null");
281+
282+
final AmazonKinesisVideo awsSdkKinesisVideoClient = AmazonKinesisVideoClientBuilder.standard().build();
283+
try {
284+
final CreateStreamRequest createStreamRequest = new CreateStreamRequest()
285+
.withStreamName(this.streamName)
286+
.withDataRetentionInHours(2);
287+
awsSdkKinesisVideoClient.createStream(createStreamRequest);
288+
} catch (final Exception e) {
289+
log.error("Failed to create the stream: {}", this.streamName, e);
290+
fail("Failed to create the stream: " + this.streamName);
291+
}
292+
}
293+
294+
@Nonnull
295+
@SuppressWarnings("ConstantConditions")
296+
private DeviceInfo createTestDeviceInfo(@Nonnull final String deviceName) {
297+
assumeNotNull("Device name cannot be null", deviceName);
298+
299+
final int storageInfoVersion = 0;
300+
final StorageInfo.DeviceStorageType storageType = StorageInfo.DeviceStorageType.DEVICE_STORAGE_TYPE_IN_MEM;
301+
final long storageSizeBytes = 1024 * 1024 * 10; // 10 MB
302+
final int spillRatio = 90;
303+
final String rootDirectory = "/tmp";
304+
final StorageInfo storageInfo = new StorageInfo(storageInfoVersion,
305+
storageType,
306+
storageSizeBytes,
307+
spillRatio,
308+
rootDirectory);
309+
310+
final int deviceInfoVersion = 0;
311+
final Tag[] tags = null;
312+
final int numStreams = 1;
313+
return new DeviceInfo(deviceInfoVersion,
314+
deviceName,
315+
storageInfo,
316+
numStreams,
317+
tags);
318+
}
319+
}

0 commit comments

Comments
 (0)