diff --git a/MutedStreamReplicator/.classpath b/MutedStreamReplicator/.classpath new file mode 100644 index 00000000..24107ee7 --- /dev/null +++ b/MutedStreamReplicator/.classpath @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/MutedStreamReplicator/.gitignore b/MutedStreamReplicator/.gitignore new file mode 100644 index 00000000..bd50e057 --- /dev/null +++ b/MutedStreamReplicator/.gitignore @@ -0,0 +1,8 @@ +/target/ +./target/* +/.classpath +/.project +/.settings/ +/src/main/java/io/antmedia/plugin/*.js +/log/ +.idea \ No newline at end of file diff --git a/MutedStreamReplicator/.project b/MutedStreamReplicator/.project new file mode 100644 index 00000000..3d653174 --- /dev/null +++ b/MutedStreamReplicator/.project @@ -0,0 +1,34 @@ + + + MutedStreamReplicator + + + + + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.m2e.core.maven2Builder + + + + + + org.eclipse.jdt.core.javanature + org.eclipse.m2e.core.maven2Nature + + + + 1773393856443 + + 30 + + org.eclipse.core.resources.regexFilterMatcher + node_modules|\.git|__CREATED_BY_JAVA_LANGUAGE_SERVER__ + + + + diff --git a/MutedStreamReplicator/README.md b/MutedStreamReplicator/README.md new file mode 100644 index 00000000..879ae837 --- /dev/null +++ b/MutedStreamReplicator/README.md @@ -0,0 +1,65 @@ +# MutedStreamReplicator + +`MutedStreamReplicator` creates a muted companion stream for each published source stream by republishing it to the same application with the `-muted` suffix. + +Example: + +- Source stream: `liveStream1` +- Muted replica stream: `liveStream1-muted` + +The plugin listens for stream lifecycle events, starts a local RTMP endpoint for the muted replica, disables audio and video ingest on the replica stream itself, and then mirrors muxer output from the original enterprise transcoding pipeline into the replica pipeline. + +## Requirements + +- Ant Media Server Enterprise +- Maven +- Java version compatible with your Ant Media Server build + +This plugin depends on `EncoderAdaptor`, so it is intended for transcoding-enabled flows. + +## Build + +```sh +mvn clean install -Dgpg.skip=true +``` + +## Install + +Copy the generated plugin jar into your Ant Media Server plugins directory: + +```sh +cp target/MutedStreamReplicator.jar /usr/local/antmedia/plugins/ +``` + +Restart Ant Media Server after copying the jar: + +```sh +sudo service antmedia restart +``` + +## How It Works + +1. When a source stream starts, the plugin asks Ant Media Server to publish a local RTMP endpoint named `-muted`. +2. When that muted replica stream starts, the plugin disables direct ingest on the replica adaptor. +3. The plugin attaches lightweight receiver muxers to the source stream and matching rendition encoders. +4. Those receivers forward packets into the target stream's muxers, producing a muted copy of the source stream. + +## Notes + +- The muted replica stream id suffix is `-muted`. +- The plugin only manages lifecycle for streams it starts through this suffix convention. +- If the source or target adaptor is not an `EncoderAdaptor`, the plugin logs a warning and skips wiring. + +## Logs + +Useful logs can be watched with: + +```sh +tail -f /usr/local/antmedia/log/ant-media-server.log +``` + +Typical messages include: + +- `Started muted replica stream: -muted` +- `Muted replica stream started: -muted` +- `Failed to start muted replica stream ...` diff --git a/MutedStreamReplicator/mvn-settings.xml b/MutedStreamReplicator/mvn-settings.xml new file mode 100644 index 00000000..3365e684 --- /dev/null +++ b/MutedStreamReplicator/mvn-settings.xml @@ -0,0 +1,23 @@ + + + + ossrh + ${env.CI_DEPLOY_USERNAME} + ${env.CI_DEPLOY_PASSWORD} + + + + + + ossrh + + true + + + gpg + ${env.GPG_KEY_NAME} + ${env.GPG_PASSPHRASE} + + + + \ No newline at end of file diff --git a/MutedStreamReplicator/pom.xml b/MutedStreamReplicator/pom.xml new file mode 100644 index 00000000..ddfb567d --- /dev/null +++ b/MutedStreamReplicator/pom.xml @@ -0,0 +1,271 @@ + + + io.antmedia + parent + 3.0.0-SNAPSHOT + + 4.0.0 + io.antmedia.plugin + MutedStreamReplicator + jar + MutedStreamReplicator + http://maven.apache.org + + false + UTF-8 + + + + burak + burak@antmedia.io + + + + + Apache 2 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + true + + central + Central Repository + https://repo.maven.apache.org/maven2 + + + Central Portal Snapshots + central-portal-snapshots + https://central.sonatype.com/repository/maven-snapshots/ + + false + + + true + + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + ossrh + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + MutedStreamReplicator + src/test/java + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrh + https://oss.sonatype.org/ + true + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar-no-fork + + + + + + maven-jar-plugin + + + org.apache.felix + maven-bundle-plugin + true + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + + + + + src/main/resources + + + src/main/java + + **/*.xml + + + + + + src/test/resources + + + src/main/webapp + + **/*.xml + **/*.properties + + + + + + + org.apache.tomcat.embed + tomcat-embed-websocket + ${tomcat.version} + provided + + + io.antmedia + ant-media-server + ${project.parent.version} + provided + + + io.antmedia.enterprise + ant-media-enterprise + ${project.parent.version} + provided + + + org.slf4j + slf4j-api + provided + + + org.apache.mina + mina-core + ${mina.version} + bundle + provided + + + org.springframework + spring-beans + ${spring.version} + provided + + + org.springframework + spring-context-support + provided + + + org.springframework + spring-test + ${spring.version} + provided + + + org.springframework + spring-context + provided + + + org.springframework + spring-core + provided + + + org.springframework + spring-expression + ${spring.version} + provided + + + org.springframework + spring-aop + ${spring.version} + provided + + + org.springframework + spring-web + provided + + + org.glassfish.jersey.media + jersey-media-json-jackson + ${jersey.version} + provided + + + org.glassfish.jersey.ext + jersey-spring6 + ${jersey.version} + provided + + + junit + junit + test + + + org.mockito + mockito-core + 3.12.4 + test + + + org.awaitility + awaitility + ${awaitility.version} + test + + + com.google.code.gson + gson + ${gson.version} + test + + + org.bytedeco + ffmpeg-platform + ${javacpp.ffmpeg.version} + provided + + + Muted Stream Replicator for Ant Media Server + + Ant Media + http://antmedia.io + + diff --git a/MutedStreamReplicator/redeploy.sh b/MutedStreamReplicator/redeploy.sh new file mode 100755 index 00000000..526cff37 --- /dev/null +++ b/MutedStreamReplicator/redeploy.sh @@ -0,0 +1,27 @@ +#!/bin/sh +AMS_DIR=~/softwares/ant-media-server +ENTERPRISE_DIR=~/antmedia/Ant-Media-Enterprise + +mvn -f "$ENTERPRISE_DIR/pom.xml" clean install -Dmaven.javadoc.skip=true -Dmaven.test.skip=true -Dgpg.skip=true -Djarsigner.skip=true +OUT=$? + +if [ $OUT -ne 0 ]; then + exit $OUT +fi + +mvn clean install -Dmaven.javadoc.skip=true -Dmaven.test.skip=true -Dgpg.skip=true +OUT=$? + +if [ $OUT -ne 0 ]; then + exit $OUT +fi + +rm -r $AMS_DIR/plugins/MutedStreamReplicator* +cp target/MutedStreamReplicator.jar $AMS_DIR/plugins/ + +OUT=$? + +if [ $OUT -ne 0 ]; then + exit $OUT +fi +#./start-debug.sh diff --git a/MutedStreamReplicator/src/main/java/io/antmedia/plugin/MutedStreamReplicatorPlugin.java b/MutedStreamReplicator/src/main/java/io/antmedia/plugin/MutedStreamReplicatorPlugin.java new file mode 100644 index 00000000..eb318e4c --- /dev/null +++ b/MutedStreamReplicator/src/main/java/io/antmedia/plugin/MutedStreamReplicatorPlugin.java @@ -0,0 +1,311 @@ +package io.antmedia.plugin; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.nimbusds.jose.shaded.gson.Gson; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +import io.antmedia.AntMediaApplicationAdapter; +import io.antmedia.datastore.db.types.Broadcast; +import io.antmedia.datastore.db.types.Endpoint; +import io.antmedia.muxer.IAntMediaStreamHandler; +import io.antmedia.muxer.MuxAdaptor; +import io.antmedia.plugin.api.IStreamListener; +import io.antmedia.plugin.mutedstreamreplicator.MutedStreamManager; +import io.antmedia.rest.model.Result; + +@Component(value="plugin.muted-stream-replicator") +public class MutedStreamReplicatorPlugin implements ApplicationContextAware, IStreamListener{ + + public static final String PLUGIN_BEAN_NAME = "plugin.muted-stream-replicator"; + /** @deprecated Use settings instead. Kept for backwards compatibility. */ + @Deprecated + public static final String DEFAULT_MUTED_SUFFIX = "-muted"; + + private static final Logger logger = LoggerFactory.getLogger(MutedStreamReplicatorPlugin.class); + private static final Gson gson = new Gson(); + + private ApplicationContext applicationContext; + private AntMediaApplicationAdapter appAdapter; + private final Map mutedStreamManagers = new ConcurrentHashMap<>(); + + @Override + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + this.appAdapter = (AntMediaApplicationAdapter) applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME); + + // Run cleanup before adding the listener. + // There might be hanging endpoints left behind in case of crash + if (loadSettings().isCleanupOrphanedReplicasOnStartup()) { + try { + int removed = cleanupOrphanedReplicaEndpoints(); + if (removed > 0) { + logger.info("Removed {} orphan replica endpoint(s) on startup", removed); + } + } catch (Exception e) { + logger.error("Orphan replica endpoint cleanup failed: {}", e.getMessage(), e); + } + } + + this.appAdapter.addStreamListener(this); + } + + public MuxAdaptor getMuxAdaptor(String streamId) { + IAntMediaStreamHandler application = getApplication(); + return application != null ? application.getMuxAdaptor(streamId) : null; + } + + public IAntMediaStreamHandler getApplication() { + if (appAdapter != null) { + return appAdapter; + } + return applicationContext != null + ? (IAntMediaStreamHandler) applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME) + : null; + } + + @Override + public void streamStarted(Broadcast broadcast) { + if (broadcast == null || StringUtils.isBlank(broadcast.getStreamId())) { + logger.warn("Ignoring stream start event because broadcast or stream id is missing"); + return; + } + + String streamId = broadcast.getStreamId(); + if (isMutedReplicaStream(streamId)) { + handleMutedReplicaStarted(streamId); + } + else { + handleSourceStreamStarted(streamId); + } + } + + private void handleMutedReplicaStarted(String mutedReplicaStreamId) { + if (stripAbrSuffix(mutedReplicaStreamId) != null) { + return; + } + + logger.info("Muted replica stream started: {}", mutedReplicaStreamId); + + String sourceStreamId = getSourceStreamId(mutedReplicaStreamId); + MuxAdaptor targetAdaptor = getMuxAdaptor(mutedReplicaStreamId); + MuxAdaptor sourceAdaptor = getMuxAdaptor(sourceStreamId); + + if (!(sourceAdaptor instanceof io.antmedia.enterprise.adaptive.EncoderAdaptor) + || !(targetAdaptor instanceof io.antmedia.enterprise.adaptive.EncoderAdaptor)) { + logger.warn("Cannot wire muted replica stream {} because source adaptor ({}) or target adaptor ({}) is not an EncoderAdaptor", + mutedReplicaStreamId, getAdaptorName(sourceAdaptor), getAdaptorName(targetAdaptor)); + return; + } + + MutedStreamManager previousManager = mutedStreamManagers.remove(mutedReplicaStreamId); + if (previousManager != null) { + previousManager.stop(); + } + + MutedStreamManager mutedStreamManager = new MutedStreamManager( + (io.antmedia.enterprise.adaptive.EncoderAdaptor) sourceAdaptor, + (io.antmedia.enterprise.adaptive.EncoderAdaptor) targetAdaptor); + if (mutedStreamManager.start()) { + mutedStreamManagers.put(mutedReplicaStreamId, mutedStreamManager); + } + else { + logger.warn("Muted replica manager could not be started for {}", mutedReplicaStreamId); + } + } + + public MutedStreamReplicatorSettings loadSettings() { + Object raw = getApplication().getAppSettings().getCustomSetting(PLUGIN_BEAN_NAME); + if (raw != null) { + try { + return gson.fromJson(raw.toString(), MutedStreamReplicatorSettings.class); + } catch (Exception e) { + logger.error("Invalid MutedStreamReplicator settings, using defaults: {}", e.getMessage()); + } + } + return new MutedStreamReplicatorSettings(); + } + + private void handleSourceStreamStarted(String sourceStreamId) { + MuxAdaptor sourceAdaptor = getMuxAdaptor(sourceStreamId); + if (sourceAdaptor == null) { + logger.warn("Cannot start muted replica for {} because source mux adaptor is not available yet", sourceStreamId); + return; + } + + String mutedReplicaStreamId = getMutedStreamId(sourceStreamId); + String replicaEndpointUrl = getReplicaRtmpUrl(mutedReplicaStreamId); + + IAntMediaStreamHandler app = getApplication(); + if (app.getAppSettings().isAcceptOnlyStreamsInDataStore() && app.getDataStore().get(mutedReplicaStreamId) == null) { + Broadcast b = AntMediaApplicationAdapter.createZombiBroadcast( + mutedReplicaStreamId, mutedReplicaStreamId, + IAntMediaStreamHandler.BROADCAST_STATUS_CREATED, + "MutedStreamReplicatorPlugin", + "", "{}", ""); + b.setType(AntMediaApplicationAdapter.LIVE_STREAM); + + app.getDataStore().save(b); + } + + Result result = sourceAdaptor.startEndpointStreaming(replicaEndpointUrl, 0); + + if (result != null && result.isSuccess()) { + logger.info("Started muted replica stream: {}", mutedReplicaStreamId); + Endpoint replicaEndpoint = new Endpoint(); + replicaEndpoint.setEndpointUrl(replicaEndpointUrl); + replicaEndpoint.setEndpointServiceId("muted-" + replicaEndpointUrl.hashCode()); + app.getDataStore().addEndpoint(sourceStreamId, replicaEndpoint); + } + else { + String reason = result != null ? result.getMessage() : "No result returned from startEndpointStreaming"; + logger.error("Failed to start muted replica stream {}. Reason: {}", mutedReplicaStreamId, reason); + } + } + + private String getMutedStreamId(String sourceStreamId) { + MutedStreamReplicatorSettings settings = loadSettings(); + return settings.getMutedStreamPrefix() + sourceStreamId + settings.getMutedStreamSuffix(); + } + + private boolean isMutedReplicaStream(String streamId) { + MutedStreamReplicatorSettings settings = loadSettings(); + String prefix = settings.getMutedStreamPrefix(); + String suffix = settings.getMutedStreamSuffix(); + if (matchesMutedPattern(streamId, prefix, suffix)) { + return true; + } + + // Also catch ABR renditions of a muted stream (EX: "streamId-muted_480p1000kbps") + String base = stripAbrSuffix(streamId); + return base != null && matchesMutedPattern(base, prefix, suffix); + } + + private static boolean matchesMutedPattern(String streamId, String prefix, String suffix) { + boolean matchesPrefix = StringUtils.isBlank(prefix) || streamId.startsWith(prefix); + boolean matchesSuffix = StringUtils.isBlank(suffix) || streamId.endsWith(suffix); + return matchesPrefix && matchesSuffix && streamId.length() > prefix.length() + suffix.length(); + } + + /** Returns the stream ID without an ABR rendition suffix like "_480p1000kbps", or null if none. */ + private static String stripAbrSuffix(String streamId) { + int idx = streamId.lastIndexOf('_'); + if (idx < 0) return null; + String tail = streamId.substring(idx + 1); + return tail.matches("\\d+p\\d+kbps") ? streamId.substring(0, idx) : null; + } + + private String getSourceStreamId(String mutedReplicaStreamId) { + MutedStreamReplicatorSettings settings = loadSettings(); + String result = mutedReplicaStreamId; + if (!StringUtils.isBlank(settings.getMutedStreamPrefix())) { + result = result.substring(settings.getMutedStreamPrefix().length()); + } + if (!StringUtils.isBlank(settings.getMutedStreamSuffix())) { + result = result.substring(0, result.length() - settings.getMutedStreamSuffix().length()); + } + return result; + } + + private String getAdaptorName(MuxAdaptor muxAdaptor) { + return muxAdaptor != null ? muxAdaptor.getClass().getSimpleName() : "null"; + } + + private String getReplicaRtmpUrl(String streamId) { + return getLocalReplicaUrlPrefix() + streamId; + } + + private String getLocalReplicaUrlPrefix() { + AntMediaApplicationAdapter application = (AntMediaApplicationAdapter) getApplication(); + return "rtmp://127.0.0.1:" + application.getServerSettings().getRtmpPort() + "/" + + application.getScope().getName() + "/"; + } + + // Drops replica endpoints left behind by a previous crash that skipped streamFinished. + private int cleanupOrphanedReplicaEndpoints() { + IAntMediaStreamHandler app = getApplication(); + if (app == null || app.getDataStore() == null) { + return 0; + } + String prefix = getLocalReplicaUrlPrefix(); + + int removed = 0; + final int pageSize = 100; + int offset = 0; + List page; + do { + page = app.getDataStore().getBroadcastList(offset, pageSize, null, null, null, null); + if (page == null) { + break; + } + for (Broadcast broadcast : page) { + if (broadcast == null || broadcast.getStreamId() == null) { + continue; + } + List endpoints = broadcast.getEndPointList(); + if (endpoints == null || endpoints.isEmpty()) { + continue; + } + // Snapshot the list — removeEndpoint mutates it. + for (Endpoint endpoint : new ArrayList<>(endpoints)) { + String url = endpoint != null ? endpoint.getEndpointUrl() : null; + if (url == null || !url.startsWith(prefix)) { + continue; + } + if (!isMutedReplicaStream(url.substring(prefix.length()))) { + continue; + } + if (app.getDataStore().removeEndpoint(broadcast.getStreamId(), endpoint, true)) { + removed++; + logger.info("Cleanup removed orphan replica endpoint {} from broadcast {}", url, broadcast.getStreamId()); + } + } + } + offset += pageSize; + } while (page.size() == pageSize); + + return removed; + } + + @Override + public void streamFinished(Broadcast broadcast) { + if (broadcast == null || StringUtils.isBlank(broadcast.getStreamId())) { + return; + } + + String streamId = broadcast.getStreamId(); + + MutedStreamManager mutedStreamManager = mutedStreamManagers.remove(streamId); + if (mutedStreamManager != null) { + mutedStreamManager.stop(); + } + + if (!isMutedReplicaStream(streamId)) { + String mutedStreamId = getMutedStreamId(streamId); + IAntMediaStreamHandler app = getApplication(); + if (!loadSettings().isKeepMutedStreamsAfterEnd() && app.getDataStore().get(mutedStreamId) != null) { + app.getDataStore().delete(mutedStreamId); + logger.info("Deleted muted replica broadcast {} after source stream {} ended", mutedStreamId, streamId); + } + Endpoint replicaEndpoint = new Endpoint(); + replicaEndpoint.setEndpointUrl(getReplicaRtmpUrl(mutedStreamId)); + app.getDataStore().removeEndpoint(streamId, replicaEndpoint, true); + } + } + + @Override + public void joinedTheRoom(String roomId, String streamId) { + } + + @Override + public void leftTheRoom(String roomId, String streamId) { + } +} diff --git a/MutedStreamReplicator/src/main/java/io/antmedia/plugin/MutedStreamReplicatorSettings.java b/MutedStreamReplicator/src/main/java/io/antmedia/plugin/MutedStreamReplicatorSettings.java new file mode 100644 index 00000000..afef4b8d --- /dev/null +++ b/MutedStreamReplicator/src/main/java/io/antmedia/plugin/MutedStreamReplicatorSettings.java @@ -0,0 +1,31 @@ +package io.antmedia.plugin; + +public class MutedStreamReplicatorSettings { + + /** Suffix appended to the source stream ID to form the muted replica stream ID. */ + private String mutedStreamSuffix = "-muted"; + + /** Prefix prepended to the source stream ID to form the muted replica stream ID. */ + private String mutedStreamPrefix = ""; + + /** + * If true, the muted replica broadcast record is kept in the datastore after the source stream ends. + * If false (default), the muted replica broadcast is deleted when the source stream ends. + */ + private boolean keepMutedStreamsAfterEnd = false; + + /** If true, removes replica endpoints left in the datastore by a previous crash on startup. */ + private boolean cleanupOrphanedReplicasOnStartup = true; + + public String getMutedStreamSuffix() { return mutedStreamSuffix; } + public void setMutedStreamSuffix(String v) { this.mutedStreamSuffix = v; } + + public String getMutedStreamPrefix() { return mutedStreamPrefix; } + public void setMutedStreamPrefix(String v) { this.mutedStreamPrefix = v; } + + public boolean isKeepMutedStreamsAfterEnd() { return keepMutedStreamsAfterEnd; } + public void setKeepMutedStreamsAfterEnd(boolean v) { this.keepMutedStreamsAfterEnd = v; } + + public boolean isCleanupOrphanedReplicasOnStartup() { return cleanupOrphanedReplicasOnStartup; } + public void setCleanupOrphanedReplicasOnStartup(boolean v) { this.cleanupOrphanedReplicasOnStartup = v; } +} diff --git a/MutedStreamReplicator/src/main/java/io/antmedia/plugin/mutedstreamreplicator/MutedPacketReceiver.java b/MutedStreamReplicator/src/main/java/io/antmedia/plugin/mutedstreamreplicator/MutedPacketReceiver.java new file mode 100644 index 00000000..3cce3c02 --- /dev/null +++ b/MutedStreamReplicator/src/main/java/io/antmedia/plugin/mutedstreamreplicator/MutedPacketReceiver.java @@ -0,0 +1,140 @@ +package io.antmedia.plugin.mutedstreamreplicator; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.bytedeco.ffmpeg.avcodec.AVCodec; +import org.bytedeco.ffmpeg.avcodec.AVCodecContext; +import org.bytedeco.ffmpeg.avcodec.AVCodecParameters; +import org.bytedeco.ffmpeg.avcodec.AVPacket; +import org.bytedeco.ffmpeg.avformat.AVFormatContext; +import org.bytedeco.ffmpeg.avformat.AVStream; +import org.bytedeco.ffmpeg.avutil.AVRational; + +import static org.bytedeco.ffmpeg.global.avcodec.AV_PKT_FLAG_KEY; +import static org.bytedeco.ffmpeg.global.avutil.AVMEDIA_TYPE_VIDEO; +import org.red5.server.api.scope.IScope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.antmedia.muxer.Muxer; + +public class MutedPacketReceiver extends Muxer { + private static final Logger logger = LoggerFactory.getLogger(MutedPacketReceiver.class); + + private final List targetMuxerList; + private volatile boolean firstKeyframeSeen = false; + + public MutedPacketReceiver(List targetMuxerList) { + super(null); + this.targetMuxerList = targetMuxerList != null ? targetMuxerList : Collections.emptyList(); + } + + @Override + public void init(IScope scope, String name, int resolution, boolean overrideIfExist, String subFolder, int bitrate) { + } + + @Override + public synchronized boolean addStream(AVCodecParameters codecParameters, AVRational timebase, int streamIndex) { + return true; + } + + @Override + public synchronized boolean addStream(AVCodec codec, AVCodecContext codecContext, int streamIndex) { + return true; + } + + @Override + public synchronized void writePacket(AVPacket pkt, AVStream stream) { + if (stream.codecpar().codec_type() != AVMEDIA_TYPE_VIDEO) { + return; + } + boolean isKey = (pkt.flags() & AV_PKT_FLAG_KEY) != 0; + if (!firstKeyframeSeen) { + if (isKey) { + firstKeyframeSeen = true; + } else { + return; + } + } + for (Muxer muxer : getTargetMuxerSnapshot()) { + muxer.writePacket(pkt, stream); + } + } + + @Override + public synchronized void writePacket(AVPacket pkt, AVCodecContext codecContext) { + if (codecContext.codec_type() != AVMEDIA_TYPE_VIDEO) { + return; + } + boolean isKey = (pkt.flags() & AV_PKT_FLAG_KEY) != 0; + if (!firstKeyframeSeen) { + if (isKey) { + firstKeyframeSeen = true; + // On first keyframe, codecContext is fully initialized. Update each target + // muxer's inputTimeBaseMap (for correct timestamp rescaling) and videoExtradata + // (for correct SPS/PPS prepending) to match the source encoder. + for (Muxer muxer : getTargetMuxerSnapshot()) { + muxer.contextChanged(codecContext, 0); + } + } else { + return; + } + } + // Force video stream index to 0. Muxers sometime register video at index 0, audio at 1. + // Source encoder (esp. WebRTC) may produce video packets with streamIdx=1, which would + // be treated as audio by downstream muxers, causing errors. + pkt.stream_index(0); + for (Muxer muxer : getTargetMuxerSnapshot()) { + muxer.writePacket(pkt, codecContext); + } + } + + @Override + public synchronized void writeAudioBuffer(ByteBuffer audioFrame, int streamIndex, long timestamp) { + } + + @Override + public synchronized void writeVideoBuffer(ByteBuffer encodedVideoFrame, long dts, int frameRotation, + int streamIndex, boolean isKeyFrame, long firstFrameTimeStamp, long pts) { + for (Muxer muxer : getTargetMuxerSnapshot()) { + muxer.writeVideoBuffer(encodedVideoFrame, dts, frameRotation, streamIndex, isKeyFrame, firstFrameTimeStamp, pts); + } + } + + @Override + public synchronized void writeTrailer() { + isRunning.set(false); + } + + @Override + public synchronized boolean prepareIO() { + isRunning.set(true); + return true; + } + + @Override + public boolean isCodecSupported(int codecId) { + return true; + } + + @Override + public AVFormatContext getOutputFormatContext() { + return null; + } + + /** + * Copy the current target list so packet fan-out does not hold the encoder lock while writing. + */ + private List getTargetMuxerSnapshot() { + synchronized (targetMuxerList) { + if (targetMuxerList.isEmpty()) { + logger.debug("Muted packet receiver has no target muxers for stream {}", streamId); + return Collections.emptyList(); + } + return new ArrayList<>(targetMuxerList); + } + } +} diff --git a/MutedStreamReplicator/src/main/java/io/antmedia/plugin/mutedstreamreplicator/MutedStreamManager.java b/MutedStreamReplicator/src/main/java/io/antmedia/plugin/mutedstreamreplicator/MutedStreamManager.java new file mode 100644 index 00000000..0e6bc6f5 --- /dev/null +++ b/MutedStreamReplicator/src/main/java/io/antmedia/plugin/mutedstreamreplicator/MutedStreamManager.java @@ -0,0 +1,194 @@ +package io.antmedia.plugin.mutedstreamreplicator; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.antmedia.enterprise.adaptive.EncoderAdaptor; +import io.antmedia.enterprise.adaptive.StreamAdaptor; +import io.antmedia.enterprise.adaptive.base.VideoEncoder; +import io.antmedia.enterprise.preview.PreviewEncoder; +import io.antmedia.muxer.Muxer; +import io.vertx.core.Vertx; + +public class MutedStreamManager { + private static final long TARGET_BUFFER_CLEAR_PERIOD_MS = 30; + private static final Logger logger = LoggerFactory.getLogger(MutedStreamManager.class); + + private final EncoderAdaptor sourceAdaptor; + private final EncoderAdaptor targetAdaptor; + private final List attachedReceivers = new ArrayList<>(); + private long targetBufferCleanerTimerId = -1; + + public MutedStreamManager(EncoderAdaptor sourceAdaptor, EncoderAdaptor targetAdaptor) { + this.sourceAdaptor = sourceAdaptor; + this.targetAdaptor = targetAdaptor; + } + + public boolean start() { + stop(); + if (sourceAdaptor == null || targetAdaptor == null) { + logger.warn("Cannot start muted stream manager because source or target adaptor is null"); + return false; + } + + targetAdaptor.setEnableAudio(false); + + startTargetBufferCleaner(); + + int attachedCount = 0; + attachedCount += attachDirectMuxer(); + attachedCount += attachRenditionMuxers(); + + if (attachedCount == 0) { + logger.warn("No muted packet receiver could be attached for source stream {} and target stream {}", + sourceAdaptor.getStreamId(), targetAdaptor.getStreamId()); + return false; + } + + return true; + } + + /** + * Direct muxers on the target stream are the best-effort path for formats that do not depend on renditions. + */ + private int attachDirectMuxer() { + List targetMuxers = targetAdaptor.getMuxerList(); + if (targetMuxers == null || targetMuxers.isEmpty()) { + logger.info("Target stream {} has no direct muxers to mirror", targetAdaptor.getStreamId()); + return 0; + } + + MutedPacketReceiver receiver = new MutedPacketReceiver(targetMuxers); + if (!sourceAdaptor.addMuxer(receiver)) { + logger.warn("Could not attach direct muted packet receiver from {} to {}", + sourceAdaptor.getStreamId(), targetAdaptor.getStreamId()); + return 0; + } + attachedReceivers.add(AttachedReceiver.forMuxAdaptor(sourceAdaptor, receiver)); + return 1; + } + + private int attachRenditionMuxers() { + Map> targetEncodersByHeight = getTargetEncodersByHeight(); + if (targetEncodersByHeight.isEmpty()) { + logger.info("Target stream {} has no rendition encoders to mirror", targetAdaptor.getStreamId()); + return 0; + } + + int attachedCount = 0; + for (StreamAdaptor sourceStreamAdaptor : sourceAdaptor.getStreamAdaptorList()) { + for (VideoEncoder sourceVideoEncoder : sourceStreamAdaptor.getVideoEncoderList()) { + // PreviewEncoder causes problems, skipped. + if (sourceVideoEncoder instanceof PreviewEncoder) { + continue; + } + + List matchingTargetEncoders = targetEncodersByHeight.get(sourceVideoEncoder.getResolutionHeight()); + if (matchingTargetEncoders == null || matchingTargetEncoders.isEmpty()) { + continue; + } + + for (VideoEncoder targetVideoEncoder : matchingTargetEncoders) { + List targetMuxers = targetVideoEncoder.getMuxerList(); + if (targetMuxers == null || targetMuxers.isEmpty()) { + continue; + } + + // Remove preview muxer for muted streams, since it causes some weird log spam. + List filteredMuxers = targetMuxers.stream() + .filter(m -> !(m instanceof io.antmedia.enterprise.preview.PreviewMuxer)) + .collect(java.util.stream.Collectors.toList()); + if (filteredMuxers.isEmpty()) { + continue; + } + + logger.info("Attaching muted rendition receiver for source stream {} at {}p to {} target muxers: {}", + sourceAdaptor.getStreamId(), sourceVideoEncoder.getResolutionHeight(), + filteredMuxers.size(), + filteredMuxers.stream().map(m -> m.getClass().getSimpleName() + + "[idx=" + m.getRegisteredStreamIndexList() + "]") + .collect(java.util.stream.Collectors.joining(", "))); + + MutedPacketReceiver receiver = new MutedPacketReceiver(filteredMuxers); + sourceVideoEncoder.addMuxer(receiver); + attachedReceivers.add(AttachedReceiver.forVideoEncoder(sourceVideoEncoder, receiver)); + attachedCount++; + } + } + } + return attachedCount; + } + + private Map> getTargetEncodersByHeight() { + Map> targetEncodersByHeight = new LinkedHashMap<>(); + for (StreamAdaptor targetStreamAdaptor : targetAdaptor.getStreamAdaptorList()) { + for (VideoEncoder targetVideoEncoder : targetStreamAdaptor.getVideoEncoderList()) { + if (targetVideoEncoder instanceof PreviewEncoder) { + continue; + } + targetEncodersByHeight + .computeIfAbsent(targetVideoEncoder.getResolutionHeight(), ignored -> new ArrayList<>()) + .add(targetVideoEncoder); + } + } + return targetEncodersByHeight; + } + + public void stop() { + stopTargetBufferCleaner(); + for (AttachedReceiver attachedReceiver : attachedReceivers) { + attachedReceiver.detach(); + } + attachedReceivers.clear(); + } + + //this is a trick to prevent feeding the encoder adaptor with packets + private void startTargetBufferCleaner() { + stopTargetBufferCleaner(); + + Vertx vertx = targetAdaptor.getVertx(); + if (vertx == null) { + logger.warn("Vertx is not available for muted target stream {}", targetAdaptor.getStreamId()); + return; + } + + targetAdaptor.setBufferTimeMs(TARGET_BUFFER_CLEAR_PERIOD_MS); + targetBufferCleanerTimerId = vertx.setPeriodic(TARGET_BUFFER_CLEAR_PERIOD_MS, id -> targetAdaptor.getBufferQueue().clear()); + } + + private void stopTargetBufferCleaner() { + Vertx vertx = targetAdaptor != null ? targetAdaptor.getVertx() : null; + if (targetBufferCleanerTimerId != -1 && vertx != null) { + vertx.cancelTimer(targetBufferCleanerTimerId); + } + targetBufferCleanerTimerId = -1; + } + + private static class AttachedReceiver { + private final MutedPacketReceiver receiver; + private final Runnable removeAction; + + private AttachedReceiver(MutedPacketReceiver receiver, Runnable removeAction) { + this.receiver = receiver; + this.removeAction = removeAction; + } + + public static AttachedReceiver forMuxAdaptor(EncoderAdaptor adaptor, MutedPacketReceiver receiver) { + return new AttachedReceiver(receiver, () -> adaptor.removeMuxer(receiver)); + } + + public static AttachedReceiver forVideoEncoder(VideoEncoder encoder, MutedPacketReceiver receiver) { + return new AttachedReceiver(receiver, () -> encoder.removeMuxer(receiver)); + } + + public void detach() { + removeAction.run(); + receiver.writeTrailer(); + } + } +} diff --git a/MutedStreamReplicator/src/test/java/io/antmedia/plugin/MutedStreamReplicatorPluginUnitTest.java b/MutedStreamReplicator/src/test/java/io/antmedia/plugin/MutedStreamReplicatorPluginUnitTest.java new file mode 100644 index 00000000..dc60880a --- /dev/null +++ b/MutedStreamReplicator/src/test/java/io/antmedia/plugin/MutedStreamReplicatorPluginUnitTest.java @@ -0,0 +1,192 @@ +package io.antmedia.plugin; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; + +import org.junit.Test; +import org.red5.server.api.scope.IScope; +import org.springframework.context.ApplicationContext; + +import io.antmedia.AntMediaApplicationAdapter; +import io.antmedia.AppSettings; +import io.antmedia.datastore.db.DataStore; +import io.antmedia.datastore.db.types.Broadcast; +import io.antmedia.enterprise.adaptive.EncoderAdaptor; +import io.antmedia.rest.model.Result; +import io.antmedia.settings.ServerSettings; +import io.antmedia.muxer.MuxAdaptor; +import io.antmedia.muxer.Muxer; +import io.vertx.core.Vertx; + +public class MutedStreamReplicatorPluginUnitTest { + + @Test + public void testSetApplicationContextRegistersStreamListener() { + ApplicationContext context = mock(ApplicationContext.class); + AntMediaApplicationAdapter appAdapter = mock(AntMediaApplicationAdapter.class); + MutedStreamReplicatorPlugin plugin = new MutedStreamReplicatorPlugin(); + + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(appAdapter); + + plugin.setApplicationContext(context); + + verify(appAdapter).addStreamListener(plugin); + } + + @Test + public void testGetMuxAdaptorReturnsNullWhenApplicationIsMissing() { + MutedStreamReplicatorPlugin plugin = new MutedStreamReplicatorPlugin(); + assertTrue(plugin.getMuxAdaptor("stream1") == null); + } + + @Test + public void testSourceStreamStartedStartsReplicaEndpoint() throws Exception { + MutedStreamReplicatorPlugin plugin = spy(new MutedStreamReplicatorPlugin()); + EncoderAdaptor sourceAdaptor = mock(EncoderAdaptor.class); + AntMediaApplicationAdapter appAdapter = mock(AntMediaApplicationAdapter.class); + AppSettings appSettings = mock(AppSettings.class); + ServerSettings serverSettings = mock(ServerSettings.class); + IScope scope = mock(IScope.class); + Broadcast broadcast = new Broadcast(); + broadcast.setStreamId("stream1"); + + when(serverSettings.getRtmpPort()).thenReturn(1935); + when(scope.getName()).thenReturn("LiveApp"); + when(appAdapter.getServerSettings()).thenReturn(serverSettings); + when(appAdapter.getScope()).thenReturn(scope); + when(appAdapter.getAppSettings()).thenReturn(appSettings); + when(appSettings.getCustomSetting(any())).thenReturn(null); + when(sourceAdaptor.startEndpointStreaming(any(String.class), eq(0))).thenReturn(new Result(true)); + setField(plugin, "appAdapter", appAdapter); + doReturn(sourceAdaptor).when(plugin).getMuxAdaptor("stream1"); + doReturn(appAdapter).when(plugin).getApplication(); + + plugin.streamStarted(broadcast); + + verify(sourceAdaptor).startEndpointStreaming("rtmp://127.0.0.1:1935/LiveApp/stream1-muted", 0); + } + + @Test + public void testMutedReplicaStartedCreatesManagerAndStopsOnFinish() throws Exception { + MutedStreamReplicatorPlugin plugin = spy(new MutedStreamReplicatorPlugin()); + EncoderAdaptor sourceAdaptor = mock(EncoderAdaptor.class); + EncoderAdaptor targetAdaptor = mock(EncoderAdaptor.class); + AntMediaApplicationAdapter appAdapter = mock(AntMediaApplicationAdapter.class); + AppSettings appSettings = mock(AppSettings.class); + Vertx vertx = mock(Vertx.class); + Broadcast broadcast = new Broadcast(); + Broadcast finished = new Broadcast(); + broadcast.setStreamId("stream1-muted"); + finished.setStreamId("stream1-muted"); + + when(appAdapter.getAppSettings()).thenReturn(appSettings); + when(appSettings.getCustomSetting(any())).thenReturn(null); + when(sourceAdaptor.addMuxer(any())).thenReturn(true); + when(targetAdaptor.getMuxerList()).thenReturn(Arrays.asList(mock(Muxer.class))); + when(sourceAdaptor.getStreamAdaptorList()).thenReturn(new LinkedList<>()); + when(targetAdaptor.getStreamAdaptorList()).thenReturn(new LinkedList<>()); + when(sourceAdaptor.getStreamId()).thenReturn("stream1"); + when(targetAdaptor.getStreamId()).thenReturn("stream1-muted"); + when(targetAdaptor.getVertx()).thenReturn(vertx); + when(vertx.setPeriodic(any(Long.class), any())).thenReturn(11L); + doReturn(sourceAdaptor).when(plugin).getMuxAdaptor("stream1"); + doReturn(targetAdaptor).when(plugin).getMuxAdaptor("stream1-muted"); + doReturn(appAdapter).when(plugin).getApplication(); + + plugin.streamStarted(broadcast); + + verify(targetAdaptor).setBufferTimeMs(500); + verify(vertx).setPeriodic(any(Long.class), any()); + assertEquals(1, getManagers(plugin).size()); + + plugin.streamFinished(finished); + + verify(vertx).cancelTimer(11L); + verify(sourceAdaptor).removeMuxer(any()); + assertEquals(0, getManagers(plugin).size()); + } + + @Test + public void testMutedReplicaStartedIgnoresNonEncoderAdaptors() throws Exception { + MutedStreamReplicatorPlugin plugin = spy(new MutedStreamReplicatorPlugin()); + AntMediaApplicationAdapter appAdapter = mock(AntMediaApplicationAdapter.class); + AppSettings appSettings = mock(AppSettings.class); + Broadcast broadcast = new Broadcast(); + broadcast.setStreamId("stream1-muted"); + + when(appAdapter.getAppSettings()).thenReturn(appSettings); + when(appSettings.getCustomSetting(any())).thenReturn(null); + doReturn(mock(MuxAdaptor.class)).when(plugin).getMuxAdaptor("stream1"); + doReturn(mock(MuxAdaptor.class)).when(plugin).getMuxAdaptor("stream1-muted"); + doReturn(appAdapter).when(plugin).getApplication(); + + plugin.streamStarted(broadcast); + + assertEquals(0, getManagers(plugin).size()); + } + + @Test + public void testInvalidBroadcastsAreIgnored() throws Exception { + MutedStreamReplicatorPlugin plugin = spy(new MutedStreamReplicatorPlugin()); + Broadcast broadcast = new Broadcast(); + broadcast.setStreamId(" "); + + plugin.streamStarted((Broadcast) null); + plugin.streamStarted(broadcast); + plugin.streamFinished((Broadcast) null); + plugin.streamFinished(broadcast); + + verify(plugin, never()).getMuxAdaptor(any()); + } + + @SuppressWarnings("unchecked") + private Map getManagers(MutedStreamReplicatorPlugin plugin) { + try { + Field field = MutedStreamReplicatorPlugin.class.getDeclaredField("mutedStreamManagers"); + field.setAccessible(true); + return (Map) field.get(plugin); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void setField(Object target, String fieldName, Object value) { + try { + Field field = findField(target.getClass(), fieldName); + field.setAccessible(true); + field.set(target, value); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Field findField(Class type, String fieldName) throws NoSuchFieldException { + Class current = type; + while (current != null) { + try { + return current.getDeclaredField(fieldName); + } + catch (NoSuchFieldException e) { + current = current.getSuperclass(); + } + } + throw new NoSuchFieldException(fieldName); + } +} diff --git a/MutedStreamReplicator/src/test/java/io/antmedia/plugin/integration/MutedStreamReplicatorIntegrationTest.java b/MutedStreamReplicator/src/test/java/io/antmedia/plugin/integration/MutedStreamReplicatorIntegrationTest.java new file mode 100644 index 00000000..5750e5d4 --- /dev/null +++ b/MutedStreamReplicator/src/test/java/io/antmedia/plugin/integration/MutedStreamReplicatorIntegrationTest.java @@ -0,0 +1,304 @@ +package io.antmedia.plugin.integration; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.CookieManager; +import java.net.CookiePolicy; +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.URI; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +import org.awaitility.core.ThrowingRunnable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.gson.Gson; + +import io.antmedia.AppSettings; +import io.antmedia.EncoderSettings; +import io.antmedia.datastore.db.types.Broadcast; +import io.antmedia.datastore.db.types.User; +import io.antmedia.rest.model.Result; + +public class MutedStreamReplicatorIntegrationTest { + + private static final Gson GSON = new Gson(); + private static final String APP_NAME = "LiveApp"; + private static final String TEST_USER_EMAIL = "test@antmedia.io"; + private static final String TEST_USER_PASSWORD_HASH = "05a671c66aefea124cc08b76ea6d30bb"; + private static final String MUTED_SUFFIX = "-muted"; + private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(10); + + private final CookieManager cookieManager = new CookieManager(null, CookiePolicy.ACCEPT_ALL); + private HttpClient httpClient; + private Process ffmpegProcess; + private AppSettings originalSettings; + private String streamId; + private String mutedStreamId; + + @Before + public void before() { + httpClient = HttpClient.newBuilder() + .connectTimeout(REQUEST_TIMEOUT) + .cookieHandler(cookieManager) + .followRedirects(HttpClient.Redirect.NORMAL) + .build(); + } + + @After + public void after() throws Exception { + stopProcess(ffmpegProcess); + deleteBroadcastIfExists(mutedStreamId); + deleteBroadcastIfExists(streamId); + restoreAppSettings(); + } + + @Test + public void testMutedReplicaIsGeneratedForAdaptiveRtmpStream() throws Exception { + authenticateConsole(); + + originalSettings = getAppSettings(); + AppSettings updatedSettings = cloneSettings(originalSettings); + updatedSettings.setEncoderSettings(Arrays.asList( + new EncoderSettings(240, 300_000, 64_000, true), + new EncoderSettings(360, 500_000, 64_000, true))); + assertTrue(setAppSettings(updatedSettings).isSuccess()); + + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + List encoderSettings = getAppSettings().getEncoderSettings(); + assertNotNull(encoderSettings); + assertEquals(2, encoderSettings.size()); + }); + + streamId = "muted-replicator-" + System.currentTimeMillis(); + mutedStreamId = streamId + MUTED_SUFFIX; + + ffmpegProcess = startPublishingStream(streamId); + + await().atMost(45, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> isBroadcasting(streamId)); + await().atMost(45, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> isBroadcasting(mutedStreamId)); + + String originalPlaylist = streamPlaylistUrl(streamId); + String originalAdaptivePlaylist = adaptivePlaylistUrl(streamId); + String mutedPlaylist = streamPlaylistUrl(mutedStreamId); + String mutedAdaptivePlaylist = adaptivePlaylistUrl(mutedStreamId); + + await().atMost(60, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> urlAvailable(originalPlaylist)); + await().atMost(60, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> urlAvailable(originalAdaptivePlaylist)); + await().atMost(60, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> urlAvailable(mutedPlaylist)); + await().atMost(60, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> urlAvailable(mutedAdaptivePlaylist)); + + assertTrue("Original stream should contain video and audio", probeStream(originalPlaylist, true, true)); + assertTrue("Muted stream should be probeable and contain video", probeStream(mutedPlaylist, true, false)); + assertTrue("Original adaptive playlist should be probeable", probeStream(originalAdaptivePlaylist, true, false)); + assertTrue("Muted adaptive playlist should be probeable", probeStream(mutedAdaptivePlaylist, true, false)); + } + + private void authenticateConsole() throws Exception { + Result firstLoginStatus = get(ROOTServiceUrl() + "/first-login-status", Result.class); + if (firstLoginStatus.isSuccess()) { + User user = new User(); + user.setEmail(TEST_USER_EMAIL); + user.setPassword(TEST_USER_PASSWORD_HASH); + assertTrue(post(ROOTServiceUrl() + "/users/initial", user, Result.class).isSuccess()); + } + + User user = new User(); + user.setEmail(TEST_USER_EMAIL); + user.setPassword(TEST_USER_PASSWORD_HASH); + assertTrue(post(ROOTServiceUrl() + "/users/authenticate", user, Result.class).isSuccess()); + } + + private AppSettings getAppSettings() throws Exception { + return get(ROOTServiceUrl() + "/applications/settings/" + APP_NAME, AppSettings.class); + } + + private Result setAppSettings(AppSettings appSettings) throws Exception { + return post(ROOTServiceUrl() + "/applications/settings/" + APP_NAME, appSettings, Result.class); + } + + private AppSettings cloneSettings(AppSettings appSettings) { + return GSON.fromJson(GSON.toJson(appSettings), AppSettings.class); + } + + private Process startPublishingStream(String publishedStreamId) throws IOException { + String ffmpegPath = executablePath("ffmpeg"); + String command = ffmpegPath + + " -stream_loop -1 -re -i " + quote(resolveSampleFile().getAbsolutePath()) + + " -codec copy -f flv rtmp://127.0.0.1/" + APP_NAME + "/" + publishedStreamId; + return new ProcessBuilder("bash", "-lc", command) + .redirectErrorStream(true) + .start(); + } + + private boolean isBroadcasting(String targetStreamId) throws Exception { + Broadcast broadcast = getBroadcast(targetStreamId); + return broadcast != null && "broadcasting".equalsIgnoreCase(broadcast.getStatus()); + } + + private Broadcast getBroadcast(String targetStreamId) throws Exception { + HttpResponse response = send(requestBuilder(appRestUrl() + "/broadcasts/" + targetStreamId).GET().build()); + if (response.statusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + return null; + } + assertEquals(HttpURLConnection.HTTP_OK, response.statusCode()); + return GSON.fromJson(response.body(), Broadcast.class); + } + + private void deleteBroadcastIfExists(String targetStreamId) throws Exception { + if (targetStreamId == null) { + return; + } + + HttpResponse response = send(requestBuilder(appRestUrl() + "/broadcasts/" + targetStreamId).DELETE().build()); + assertTrue("Delete should return 200 or 404 for " + targetStreamId, + response.statusCode() == HttpURLConnection.HTTP_OK || response.statusCode() == HttpURLConnection.HTTP_NOT_FOUND); + } + + private String streamPlaylistUrl(String targetStreamId) { + return "http://127.0.0.1:5080/" + APP_NAME + "/streams/" + targetStreamId + ".m3u8"; + } + + private String adaptivePlaylistUrl(String targetStreamId) { + return "http://127.0.0.1:5080/" + APP_NAME + "/streams/" + targetStreamId + "_adaptive.m3u8"; + } + + private boolean urlAvailable(String url) throws Exception { + HttpResponse response = send(requestBuilder(url) + .method("HEAD", HttpRequest.BodyPublishers.noBody()) + .build()); + return response.statusCode() == HttpURLConnection.HTTP_OK; + } + + private boolean probeStream(String url, boolean requireVideo, boolean requireAudio) throws Exception { + Process process = new ProcessBuilder( + executablePath("ffprobe"), + "-v", "error", + "-show_entries", "stream=codec_type", + "-of", "default=noprint_wrappers=1:nokey=1", + url) + .redirectErrorStream(true) + .start(); + + String output = readAll(process.getInputStream()); + assertEquals("ffprobe should exit successfully for " + url + ". Output: " + output, 0, process.waitFor()); + + boolean hasVideo = output.contains("video"); + boolean hasAudio = output.contains("audio"); + + return (!requireVideo || hasVideo) && (!requireAudio || hasAudio); + } + + private T get(String url, Class responseType) throws Exception { + HttpResponse response = send(requestBuilder(url).GET().build()); + assertEquals(HttpURLConnection.HTTP_OK, response.statusCode()); + return GSON.fromJson(response.body(), responseType); + } + + private T post(String url, Object body, Class responseType) throws Exception { + HttpRequest request = requestBuilder(url) + .POST(HttpRequest.BodyPublishers.ofString(GSON.toJson(body))) + .build(); + HttpResponse response = send(request); + assertEquals("Unexpected status code for " + url + " body: " + response.body(), HttpURLConnection.HTTP_OK, response.statusCode()); + return GSON.fromJson(response.body(), responseType); + } + + private HttpRequest.Builder requestBuilder(String url) { + return HttpRequest.newBuilder(URI.create(url)) + .timeout(REQUEST_TIMEOUT) + .header("Content-Type", "application/json"); + } + + private HttpResponse send(HttpRequest request) throws Exception { + return httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + } + + private void restoreAppSettings() throws Exception { + if (originalSettings == null) { + return; + } + + await().atMost(20, TimeUnit.SECONDS).untilAsserted(new ThrowingRunnable() { + @Override + public void run() throws Throwable { + assertTrue(setAppSettings(originalSettings).isSuccess()); + } + }); + } + + private File resolveSampleFile() { + List candidates = Arrays.asList( + new File("../../Ant-Media-Server/src/test/resources/test.flv"), + new File("../Ant-Media-Server/src/test/resources/test.flv"), + new File("src/test/resources/test.flv")); + for (File candidate : candidates) { + if (candidate.exists()) { + return candidate; + } + } + throw new IllegalStateException("Cannot find test.flv sample file for ffmpeg publish test"); + } + + private String ROOTServiceUrl() throws Exception { + return "http://" + InetAddress.getLocalHost().getHostAddress() + ":5080/rest/v2"; + } + + private String appRestUrl() { + return "http://127.0.0.1:5080/" + APP_NAME + "/rest/v2"; + } + + private String executablePath(String executable) { + String osName = System.getProperty("os.name", "").toLowerCase(Locale.ENGLISH); + if (osName.startsWith("mac os x") || osName.startsWith("darwin")) { + return "/usr/local/bin/" + executable; + } + return executable; + } + + private void stopProcess(Process process) throws InterruptedException { + if (process == null) { + return; + } + process.destroy(); + if (!process.waitFor(5, TimeUnit.SECONDS)) { + process.destroyForcibly(); + process.waitFor(5, TimeUnit.SECONDS); + } + } + + private String readAll(InputStream inputStream) throws IOException { + StringBuilder builder = new StringBuilder(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + builder.append(line).append('\n'); + } + } + return builder.toString(); + } + + private String quote(String value) { + return "'" + value.replace("'", "'\"'\"'") + "'"; + } +} diff --git a/MutedStreamReplicator/src/test/java/io/antmedia/plugin/mutedstreamreplicator/MutedPacketReceiverUnitTest.java b/MutedStreamReplicator/src/test/java/io/antmedia/plugin/mutedstreamreplicator/MutedPacketReceiverUnitTest.java new file mode 100644 index 00000000..69d7d3c1 --- /dev/null +++ b/MutedStreamReplicator/src/test/java/io/antmedia/plugin/mutedstreamreplicator/MutedPacketReceiverUnitTest.java @@ -0,0 +1,86 @@ +package io.antmedia.plugin.mutedstreamreplicator; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; + +import org.bytedeco.ffmpeg.avcodec.AVCodecContext; +import org.bytedeco.ffmpeg.avcodec.AVPacket; +import org.bytedeco.ffmpeg.avformat.AVStream; +import org.bytedeco.ffmpeg.avcodec.AVCodecParameters; +import org.bytedeco.ffmpeg.avutil.AVRational; +import org.junit.Test; + +import io.antmedia.muxer.Muxer; + +public class MutedPacketReceiverUnitTest { + + @Test + public void testPrepareIOAndWriteTrailerToggleRunningState() { + MutedPacketReceiver receiver = new MutedPacketReceiver(Collections.emptyList()); + + assertTrue(receiver.prepareIO()); + assertTrue(receiver.getIsRunning().get()); + + receiver.writeTrailer(); + + assertFalse(receiver.getIsRunning().get()); + } + + @Test + public void testWritePacketAndVideoBufferFanOutToAllTargetMuxers() { + Muxer muxer1 = mock(Muxer.class); + Muxer muxer2 = mock(Muxer.class); + MutedPacketReceiver receiver = new MutedPacketReceiver(Arrays.asList(muxer1, muxer2)); + AVPacket packet = new AVPacket(); + AVStream stream = mock(AVStream.class); + AVCodecContext codecContext = mock(AVCodecContext.class); + ByteBuffer videoBuffer = ByteBuffer.allocate(8); + + receiver.writePacket(packet, stream); + receiver.writePacket(packet, codecContext); + receiver.writeVideoBuffer(videoBuffer, 10L, 0, 1, true, 20L, 30L); + + verify(muxer1).writePacket(packet, stream); + verify(muxer2).writePacket(packet, stream); + verify(muxer1).writePacket(packet, codecContext); + verify(muxer2).writePacket(packet, codecContext); + verify(muxer1).writeVideoBuffer(videoBuffer, 10L, 0, 1, true, 20L, 30L); + verify(muxer2).writeVideoBuffer(videoBuffer, 10L, 0, 1, true, 20L, 30L); + } + + @Test + public void testNullOrEmptyTargetsAreHandledGracefully() { + MutedPacketReceiver receiver = new MutedPacketReceiver(null); + AVPacket packet = new AVPacket(); + AVStream stream = mock(AVStream.class); + AVCodecContext codecContext = mock(AVCodecContext.class); + + receiver.writePacket(packet, stream); + receiver.writePacket(packet, codecContext); + receiver.writeVideoBuffer(ByteBuffer.allocate(4), 1L, 0, 0, false, 1L, 1L); + receiver.writeAudioBuffer(ByteBuffer.allocate(4), 0, 1L); + + assertTrue(receiver.isCodecSupported(42)); + assertNull(receiver.getOutputFormatContext()); + assertTrue(receiver.addStream(mock(AVCodecParameters.class), new AVRational(), 0)); + assertTrue(receiver.addStream((org.bytedeco.ffmpeg.avcodec.AVCodec) null, codecContext, 0)); + } + + @Test + public void testSnapshotPreventsUnexpectedWritesWhenTargetListIsEmpty() { + Muxer muxer = mock(Muxer.class); + MutedPacketReceiver receiver = new MutedPacketReceiver(Collections.emptyList()); + + receiver.writePacket(new AVPacket(), mock(AVStream.class)); + + verify(muxer, never()).writePacket(org.mockito.ArgumentMatchers.any(AVPacket.class), org.mockito.ArgumentMatchers.any(AVStream.class)); + } +} diff --git a/MutedStreamReplicator/src/test/java/io/antmedia/plugin/mutedstreamreplicator/MutedStreamManagerUnitTest.java b/MutedStreamReplicator/src/test/java/io/antmedia/plugin/mutedstreamreplicator/MutedStreamManagerUnitTest.java new file mode 100644 index 00000000..6b544343 --- /dev/null +++ b/MutedStreamReplicator/src/test/java/io/antmedia/plugin/mutedstreamreplicator/MutedStreamManagerUnitTest.java @@ -0,0 +1,101 @@ +package io.antmedia.plugin.mutedstreamreplicator; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.Queue; + +import org.junit.Test; + +import io.antmedia.enterprise.adaptive.EncoderAdaptor; +import io.antmedia.enterprise.adaptive.StreamAdaptor; +import io.antmedia.enterprise.adaptive.base.VideoEncoder; +import io.antmedia.muxer.Muxer; + +public class MutedStreamManagerUnitTest { + + @Test + public void testStartReturnsFalseWhenAdaptorsAreNull() { + assertFalse(new MutedStreamManager(null, null).start()); + } + + @Test + public void testStartAttachesDirectReceiverAndStopDetachesIt() { + EncoderAdaptor sourceAdaptor = mock(EncoderAdaptor.class); + EncoderAdaptor targetAdaptor = mock(EncoderAdaptor.class); + Muxer targetMuxer = mock(Muxer.class); + Queue emptyAdaptors = new LinkedList<>(); + + when(sourceAdaptor.addMuxer(any(MutedPacketReceiver.class))).thenReturn(true); + when(targetAdaptor.getMuxerList()).thenReturn(Arrays.asList(targetMuxer)); + when(sourceAdaptor.getStreamAdaptorList()).thenReturn(emptyAdaptors); + when(targetAdaptor.getStreamAdaptorList()).thenReturn(emptyAdaptors); + + MutedStreamManager manager = new MutedStreamManager(sourceAdaptor, targetAdaptor); + + assertTrue(manager.start()); + + verify(sourceAdaptor).addMuxer(any(MutedPacketReceiver.class)); + + manager.stop(); + + verify(sourceAdaptor).removeMuxer(any(MutedPacketReceiver.class)); + } + + @Test + public void testStartAttachesMatchingRenditionReceiverAndStopDetachesIt() { + EncoderAdaptor sourceAdaptor = mock(EncoderAdaptor.class); + EncoderAdaptor targetAdaptor = mock(EncoderAdaptor.class); + StreamAdaptor sourceStreamAdaptor = mock(StreamAdaptor.class); + StreamAdaptor targetStreamAdaptor = mock(StreamAdaptor.class); + VideoEncoder sourceVideoEncoder = mock(VideoEncoder.class); + VideoEncoder targetVideoEncoder = mock(VideoEncoder.class); + Muxer targetMuxer = mock(Muxer.class); + Queue sourceAdaptors = new LinkedList<>(); + Queue targetAdaptors = new LinkedList<>(); + + sourceAdaptors.add(sourceStreamAdaptor); + targetAdaptors.add(targetStreamAdaptor); + + when(targetAdaptor.getMuxerList()).thenReturn(Collections.emptyList()); + when(sourceAdaptor.getStreamAdaptorList()).thenReturn(sourceAdaptors); + when(targetAdaptor.getStreamAdaptorList()).thenReturn(targetAdaptors); + when(sourceStreamAdaptor.getVideoEncoderList()).thenReturn(Arrays.asList(sourceVideoEncoder)); + when(targetStreamAdaptor.getVideoEncoderList()).thenReturn(Arrays.asList(targetVideoEncoder)); + when(sourceVideoEncoder.getResolutionHeight()).thenReturn(360); + when(targetVideoEncoder.getResolutionHeight()).thenReturn(360); + when(targetVideoEncoder.getMuxerList()).thenReturn(Arrays.asList(targetMuxer)); + + MutedStreamManager manager = new MutedStreamManager(sourceAdaptor, targetAdaptor); + + assertTrue(manager.start()); + + verify(sourceVideoEncoder).addMuxer(any(MutedPacketReceiver.class)); + + manager.stop(); + + verify(sourceVideoEncoder).removeMuxer(any(MutedPacketReceiver.class)); + verify(sourceAdaptor, never()).removeMuxer(any(MutedPacketReceiver.class)); + } + + @Test + public void testStartReturnsFalseWhenNoReceiversCanBeAttached() { + EncoderAdaptor sourceAdaptor = mock(EncoderAdaptor.class); + EncoderAdaptor targetAdaptor = mock(EncoderAdaptor.class); + + when(targetAdaptor.getMuxerList()).thenReturn(Collections.emptyList()); + when(sourceAdaptor.getStreamAdaptorList()).thenReturn(new LinkedList<>()); + when(targetAdaptor.getStreamAdaptorList()).thenReturn(new LinkedList<>()); + + assertFalse(new MutedStreamManager(sourceAdaptor, targetAdaptor).start()); + verify(sourceAdaptor, never()).addMuxer(any(MutedPacketReceiver.class)); + } +}