Skip to content

Commit 5f9fc4f

Browse files
committed
Preload frames statically in a new image frame source class
1 parent cc8328c commit 5f9fc4f

File tree

2 files changed

+197
-6
lines changed

2 files changed

+197
-6
lines changed

src/main/java/com/amazonaws/kinesisvideo/java/mediasource/file/ImageFileMediaSource.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class ImageFileMediaSource implements MediaSource {
6060
private ImageFileMediaSourceConfiguration imageFileMediaSourceConfiguration;
6161
private MediaSourceState mediaSourceState;
6262
private MediaSourceSink mediaSourceSink;
63-
private ImageFrameSource imageFrameSource;
63+
private PreloadedSampleImageFrameSource preloadedSampleImageFrameSource;
6464

6565
public ImageFileMediaSource(@Nonnull final String streamName) {
6666
this(streamName, new CompletableFuture<>());
@@ -133,15 +133,15 @@ public void configure(final MediaSourceConfiguration configuration) {
133133
@Override
134134
public void start() throws KinesisVideoException {
135135
mediaSourceState = MediaSourceState.RUNNING;
136-
imageFrameSource = new ImageFrameSource(imageFileMediaSourceConfiguration);
137-
imageFrameSource.onStreamDataAvailable(new DefaultOnStreamDataAvailable(mediaSourceSink));
138-
imageFrameSource.start();
136+
preloadedSampleImageFrameSource = new PreloadedSampleImageFrameSource(imageFileMediaSourceConfiguration);
137+
preloadedSampleImageFrameSource.onStreamDataAvailable(new DefaultOnStreamDataAvailable(mediaSourceSink));
138+
preloadedSampleImageFrameSource.start();
139139
}
140140

141141
@Override
142142
public void stop() throws KinesisVideoException {
143-
if (imageFrameSource != null) {
144-
imageFrameSource.stop();
143+
if (preloadedSampleImageFrameSource != null) {
144+
preloadedSampleImageFrameSource.stop();
145145
}
146146

147147
try {
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package com.amazonaws.kinesisvideo.java.mediasource.file;
2+
3+
import com.amazonaws.kinesisvideo.common.exception.KinesisVideoException;
4+
import com.amazonaws.kinesisvideo.internal.mediasource.OnStreamDataAvailable;
5+
6+
import com.amazonaws.kinesisvideo.producer.KinesisVideoFrame;
7+
import org.apache.commons.logging.Log;
8+
import org.apache.commons.logging.LogFactory;
9+
10+
import javax.annotation.concurrent.NotThreadSafe;
11+
import java.io.IOException;
12+
import java.nio.ByteBuffer;
13+
import java.nio.file.Files;
14+
import java.nio.file.Path;
15+
import java.nio.file.Paths;
16+
import java.time.Duration;
17+
import java.util.ArrayList;
18+
import java.util.List;
19+
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.Executors;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import static com.amazonaws.kinesisvideo.producer.FrameFlags.FRAME_FLAG_KEY_FRAME;
25+
import static com.amazonaws.kinesisvideo.producer.FrameFlags.FRAME_FLAG_NONE;
26+
import static com.amazonaws.kinesisvideo.producer.Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
27+
import static com.amazonaws.kinesisvideo.producer.Time.NANOS_IN_A_MILLISECOND;
28+
29+
/**
30+
* Frame source backed by local image files to be loaded into static memory.
31+
*/
32+
@NotThreadSafe
33+
public class PreloadedSampleImageFrameSource {
34+
public static final int METADATA_INTERVAL = 8;
35+
private static final long FRAME_DURATION_20_MS = 20L;
36+
private final ExecutorService executor = Executors.newFixedThreadPool(1);
37+
private final int fps;
38+
private final ImageFileMediaSourceConfiguration configuration;
39+
40+
private OnStreamDataAvailable mkvDataAvailableCallback;
41+
private final AtomicBoolean isRunning = new AtomicBoolean(false);
42+
private int frameCounter;
43+
private final Log log = LogFactory.getLog(PreloadedSampleImageFrameSource.class);
44+
private final String metadataName = "ImageLoop";
45+
private int metadataCount = 0;
46+
private long currentFrameTimestampMs;
47+
private final long executorShutdownTimeoutSeconds = 5L;
48+
49+
private static final int FPS_25 = 25;
50+
private static final String IMAGE_DIR = "src/main/resources/data/h264/";
51+
private static final String IMAGE_FILENAME_FORMAT = "frame-%03d.h264";
52+
private static final int START_FILE_INDEX = 1;
53+
private static final int END_FILE_INDEX = 375;
54+
55+
private static final List<ByteBuffer> preloadedFrames;
56+
57+
// Preload image frames into memory.
58+
static {
59+
preloadedFrames = new ArrayList<>();
60+
try {
61+
final ImageFileMediaSourceConfiguration defaultConfig = new ImageFileMediaSourceConfiguration.Builder()
62+
.fps(FPS_25)
63+
.dir(IMAGE_DIR)
64+
.filenameFormat(IMAGE_FILENAME_FORMAT)
65+
.startFileIndex(START_FILE_INDEX)
66+
.endFileIndex(END_FILE_INDEX)
67+
.allowStreamCreation(false)
68+
.build();
69+
70+
for (int i = defaultConfig.getStartFileIndex(); i <= defaultConfig.getEndFileIndex(); i++) {
71+
String filename = String.format(defaultConfig.getFilenameFormat(), i);
72+
Path path = Paths.get(defaultConfig.getDir() + filename);
73+
byte[] data = Files.readAllBytes(path);
74+
preloadedFrames.add(ByteBuffer.wrap(data));
75+
}
76+
} catch (IOException e) {
77+
throw new RuntimeException("Failed to preload image frames", e);
78+
}
79+
}
80+
81+
82+
public PreloadedSampleImageFrameSource(final ImageFileMediaSourceConfiguration configuration) {
83+
this.configuration = configuration;
84+
this.fps = configuration.getFps();
85+
this.currentFrameTimestampMs = configuration.getStartTimeMs();
86+
}
87+
88+
public void start() {
89+
if (isRunning.get()) {
90+
throw new IllegalStateException("Frame source is already running");
91+
}
92+
93+
isRunning.set(true);
94+
startFrameGenerator();
95+
}
96+
97+
public void stop() {
98+
isRunning.set(false);
99+
stopFrameGenerator();
100+
}
101+
102+
public void onStreamDataAvailable(final OnStreamDataAvailable onMkvDataAvailable) {
103+
this.mkvDataAvailableCallback = onMkvDataAvailable;
104+
}
105+
106+
private void startFrameGenerator() {
107+
executor.execute(new Runnable() {
108+
@Override
109+
public void run() {
110+
try {
111+
generateFrameAndNotifyListener();
112+
} catch (final KinesisVideoException e) {
113+
log.error("Failed to keep generating frames with Exception", e);
114+
}
115+
}
116+
});
117+
}
118+
119+
private void generateFrameAndNotifyListener() throws KinesisVideoException {
120+
final double frameDurationMs = (double) Duration.ofSeconds(1L).toMillis() / fps;
121+
long nextFrameTimeNs = System.nanoTime(); // to prevent time drift
122+
123+
while (isRunning.get()) {
124+
if (mkvDataAvailableCallback != null) {
125+
mkvDataAvailableCallback.onFrameDataAvailable(createKinesisVideoFrameFromImage(frameCounter, currentFrameTimestampMs));
126+
if (isMetadataReady()) {
127+
mkvDataAvailableCallback.onFragmentMetadataAvailable(metadataName + metadataCount,
128+
Integer.toString(metadataCount++), false);
129+
}
130+
}
131+
132+
frameCounter++;
133+
currentFrameTimestampMs = configuration.getStartTimeMs() + Math.round(frameCounter * frameDurationMs);
134+
nextFrameTimeNs += (long)(frameDurationMs * NANOS_IN_A_MILLISECOND);
135+
136+
long sleepTimeMs = (nextFrameTimeNs - System.nanoTime()) / NANOS_IN_A_MILLISECOND; // Convert to Ms
137+
if (sleepTimeMs > 0) {
138+
try {
139+
Thread.sleep(sleepTimeMs);
140+
} catch (final InterruptedException e) {
141+
log.error("Frame interval wait interrupted by Exception ", e);
142+
}
143+
}
144+
}
145+
}
146+
147+
private boolean isMetadataReady() {
148+
return frameCounter % METADATA_INTERVAL == 0;
149+
}
150+
151+
private KinesisVideoFrame createKinesisVideoFrameFromImage(final long index, final long timestampMs) {
152+
final int flags = isKeyFrame() ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE;
153+
int preloadIndex = (int) (index % preloadedFrames.size());
154+
ByteBuffer frameData = preloadedFrames.get(preloadIndex).duplicate(); // duplicate() used so each instance can track its own position.
155+
156+
return new KinesisVideoFrame(
157+
frameCounter,
158+
flags,
159+
timestampMs * HUNDREDS_OF_NANOS_IN_A_MILLISECOND,
160+
timestampMs * HUNDREDS_OF_NANOS_IN_A_MILLISECOND,
161+
FRAME_DURATION_20_MS * HUNDREDS_OF_NANOS_IN_A_MILLISECOND,
162+
frameData);
163+
}
164+
165+
private boolean isKeyFrame() {
166+
return frameCounter % configuration.getFps() == 0;
167+
}
168+
169+
170+
private void stopFrameGenerator() {
171+
executor.shutdown();
172+
try {
173+
if (!executor.awaitTermination(this.executorShutdownTimeoutSeconds, TimeUnit.SECONDS)) {
174+
log.warn("Executor did not terminate in time. Forcing shutdown.");
175+
final List<Runnable> droppedTasks = executor.shutdownNow();
176+
log.warn("Number of dropped tasks: " + droppedTasks.size());
177+
for (final Runnable task : droppedTasks) {
178+
log.warn("Dropped task of type: " + task.getClass().getName());
179+
}
180+
}
181+
} catch (final InterruptedException e) {
182+
log.error("Executor shutdown interrupted with Exception ", e);
183+
final List<Runnable> droppedTasks = executor.shutdownNow();
184+
log.warn("Number of dropped tasks: " + droppedTasks.size());
185+
for (final Runnable task : droppedTasks) {
186+
log.warn("Dropped task of type: " + task.getClass().getName());
187+
}
188+
Thread.currentThread().interrupt();
189+
}
190+
}
191+
}

0 commit comments

Comments
 (0)