Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RTMP timestamp overflow after 24 days of non-stop streaming #6559

Merged
merged 4 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions src/main/java/io/antmedia/muxer/MuxAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ public class MuxAdaptor implements IRecordingListener, IEndpointStatusListener {
*/
private Deque<PacketTime> packetTimeList = new ConcurrentLinkedDeque<>();

private long lastDTS = -1;
private int overflowCount = 0;

public boolean addID3Data(String data) {
for (Muxer muxer : muxerList) {
if(muxer instanceof HLSMuxer) {
Expand Down Expand Up @@ -294,7 +297,7 @@ public PacketTime(long packetTimeMs, long systemTimeMs) {
private BytePointer audioExtraDataPointer;
private BytePointer videoExtraDataPointer;
private AtomicLong endpointStatusUpdaterTimer = new AtomicLong(-1l);
private ConcurrentHashMap<String, String> endpointStatusUpdateMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, String> endpointStatusUpdateMap = new ConcurrentHashMap<>();

protected PacketFeeder packetFeeder;

Expand Down Expand Up @@ -1060,6 +1063,32 @@ public DataStore getDataStore() {
return dataStore;
}


public long correctPacketDtsOverflow(long packetDts) {
/*
* Continuous RTMP streaming for approximately 24 days can cause the DTS values to overflow
* and reset to 0 once they reach the maximum value for a signed integer.
* This method handles the overflow by continuing to increment the DTS values as if they hadn't reset,
* ensuring that the timestamps remain consistent and do not start over from 0.
* If this correction is not applied, errors occur when writing to the HLS muxer, leading to a halt in .ts generation.
*/


if (lastDTS > packetDts) {

if (lastDTS > (packetDts + (long) overflowCount * Integer.MAX_VALUE)) {
overflowCount++;
}

packetDts = packetDts + (long) overflowCount * Integer.MAX_VALUE;
}

lastDTS = packetDts;

return packetDts;

}

/**
* This is the entrance points for the packet coming from the RTMP stream.
* It's directly used in EncoderAdaptor in Enterprise
Expand All @@ -1070,7 +1099,9 @@ public DataStore getDataStore() {
*/
public void writeStreamPacket(IStreamPacket packet)
{
long dts = Integer.toUnsignedLong(packet.getTimestamp());
//RTMPProtocolDecoder overflows after 24 days(Integer.MAX_Value) of continuous streaming and it starts from zero again.
//According to the protocol it should overflow after 49 days. Anyway, we fix the overflow here
long dts = correctPacketDtsOverflow(packet.getTimestamp());

if (packet.getDataType() == Constants.TYPE_VIDEO_DATA)
{
Expand Down Expand Up @@ -2707,6 +2738,19 @@ public void setWidth(int width) {
this.width = width;
}

public long getLastDTS() {
return lastDTS;
}

public int getOverflowCount() {
return overflowCount;
}

public PacketFeeder getPacketFeeder() {
return packetFeeder;
}


public void setTotalByteReceived(long totalByteReceived) {
this.totalByteReceived = totalByteReceived;
}
Expand Down
204 changes: 194 additions & 10 deletions src/test/java/io/antmedia/test/MuxerUnitTest.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
package io.antmedia.test;


import static org.bytedeco.ffmpeg.global.avcodec.*;
import static org.bytedeco.ffmpeg.global.avcodec.AV_CODEC_ID_AAC;
import static org.bytedeco.ffmpeg.global.avcodec.AV_CODEC_ID_AC3;
import static org.bytedeco.ffmpeg.global.avcodec.AV_CODEC_ID_H264;
import static org.bytedeco.ffmpeg.global.avcodec.AV_CODEC_ID_H265;
import static org.bytedeco.ffmpeg.global.avcodec.AV_CODEC_ID_HCA;
import static org.bytedeco.ffmpeg.global.avcodec.AV_CODEC_ID_HEVC;
import static org.bytedeco.ffmpeg.global.avcodec.AV_CODEC_ID_MP3;
import static org.bytedeco.ffmpeg.global.avcodec.AV_CODEC_ID_NONE;
import static org.bytedeco.ffmpeg.global.avcodec.AV_CODEC_ID_VP8;
import static org.bytedeco.ffmpeg.global.avcodec.AV_PKT_FLAG_KEY;
import static org.bytedeco.ffmpeg.global.avcodec.av_init_packet;
import static org.bytedeco.ffmpeg.global.avcodec.av_packet_alloc;
import static org.bytedeco.ffmpeg.global.avcodec.av_packet_free;
import static org.bytedeco.ffmpeg.global.avcodec.av_packet_unref;
import static org.bytedeco.ffmpeg.global.avformat.AVFMT_NOFILE;
import static org.bytedeco.ffmpeg.global.avformat.av_read_frame;
import static org.bytedeco.ffmpeg.global.avformat.av_stream_get_side_data;
import static org.bytedeco.ffmpeg.global.avformat.*;
import static org.bytedeco.ffmpeg.global.avformat.avformat_alloc_output_context2;
import static org.bytedeco.ffmpeg.global.avformat.avformat_close_input;
import static org.bytedeco.ffmpeg.global.avformat.avformat_find_stream_info;
import static org.bytedeco.ffmpeg.global.avformat.avformat_free_context;
import static org.bytedeco.ffmpeg.global.avformat.*;
import static org.bytedeco.ffmpeg.global.avformat.avformat_open_input;
import static org.bytedeco.ffmpeg.global.avutil.AVMEDIA_TYPE_ATTACHMENT;
import static org.bytedeco.ffmpeg.global.avutil.AVMEDIA_TYPE_AUDIO;
import static org.bytedeco.ffmpeg.global.avutil.AVMEDIA_TYPE_DATA;
Expand Down Expand Up @@ -60,7 +73,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.antmedia.EncoderSettings;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.RandomUtils;
import org.apache.mina.core.buffer.IoBuffer;
Expand Down Expand Up @@ -104,7 +116,6 @@
import org.red5.io.flv.impl.FLVReader;
import org.red5.io.flv.impl.Tag;
import org.red5.io.object.DataTypes;
import org.red5.server.api.IContext;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.stream.IStreamCapableConnection;
import org.red5.server.api.stream.IStreamPacket;
Expand All @@ -124,14 +135,15 @@
import org.red5.server.stream.VideoCodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.util.ReflectionTestUtils;

import io.antmedia.AntMediaApplicationAdapter;
import io.antmedia.AppSettings;
import io.antmedia.EncoderSettings;
import io.antmedia.RecordType;
import io.antmedia.datastore.db.DataStore;
import io.antmedia.datastore.db.DataStoreFactory;
Expand All @@ -143,11 +155,11 @@
import io.antmedia.integration.AppFunctionalV2Test;
import io.antmedia.integration.MuxingTest;
import io.antmedia.muxer.HLSMuxer;
import io.antmedia.muxer.RecordMuxer;
import io.antmedia.muxer.IAntMediaStreamHandler;
import io.antmedia.muxer.Mp4Muxer;
import io.antmedia.muxer.MuxAdaptor;
import io.antmedia.muxer.Muxer;
import io.antmedia.muxer.RecordMuxer;
import io.antmedia.muxer.RtmpMuxer;
import io.antmedia.muxer.WebMMuxer;
import io.antmedia.muxer.parser.AACConfigParser;
Expand All @@ -164,7 +176,6 @@
import io.antmedia.test.utils.VideoInfo;
import io.antmedia.test.utils.VideoProber;
import io.vertx.core.Vertx;
import org.springframework.test.util.ReflectionTestUtils;

@ContextConfiguration(locations = {"test.xml"})
//@ContextConfiguration(classes = {AppConfig.class})
Expand Down Expand Up @@ -5486,8 +5497,6 @@ public void testSetSEIData() {

}



}

@Test
Expand All @@ -5496,4 +5505,179 @@ public void testRecordingWithRecordingSubfolder() {
testMp4Muxing("record" + RandomUtils.nextInt(0, 10000));
}

@Test
public void testRtmpDtsOverflow() {

if (appScope == null) {
appScope = (WebScope) applicationContext.getBean("web.scope");
logger.debug("Application / web scope: {}", appScope);
assertTrue(appScope.getDepth() == 1);
}

ClientBroadcastStream clientBroadcastStream = mock(ClientBroadcastStream.class);
MuxAdaptor muxAdaptor = Mockito.spy(MuxAdaptor.initializeMuxAdaptor(clientBroadcastStream, null, false, appScope));
PacketFeeder packetFeeder = new PacketFeeder("test");
muxAdaptor.setPacketFeeder(packetFeeder);

muxAdaptor.setVideoStreamIndex(0);
muxAdaptor.setAudioStreamIndex(1);

HLSMuxer hlsMuxer = mock(HLSMuxer.class);
muxAdaptor.addMuxer(hlsMuxer);

muxAdaptor.setEnableAudio(true);
muxAdaptor.setEnableVideo(true);

ByteBuffer byteBuffer = mock(ByteBuffer.class);
IoBuffer ioBuffer = mock(IoBuffer.class);
when(ioBuffer.limit()).thenReturn(1024);
when(ioBuffer.buf()).thenReturn(byteBuffer);
when(byteBuffer.position(2)).thenReturn(ByteBuffer.allocateDirect(3));
when(byteBuffer.position(5)).thenReturn(ByteBuffer.allocateDirect(3));


when(ioBuffer.position(0)).thenReturn(ioBuffer);
when(ioBuffer.position(2)).thenReturn(ioBuffer);
when(ioBuffer.position(3)).thenReturn(ioBuffer);

ByteBuffer directByteBuffer = ByteBuffer.allocateDirect(1024-2);
directByteBuffer.put(ioBuffer.buf().position(2));
directByteBuffer.position(0);

ByteBuffer directByteBufferVideo = ByteBuffer.allocateDirect(1024-5);
directByteBufferVideo.put(ioBuffer.buf().position(2));
directByteBufferVideo.position(3);

//audio packets
IStreamPacket audioPacket1 = mock(IStreamPacket.class);
when(audioPacket1.getDataType()).thenReturn(Constants.TYPE_AUDIO_DATA);
when(audioPacket1.getTimestamp()).thenReturn(2147483584);
when(audioPacket1.getData()).thenReturn(ioBuffer);

IStreamPacket audioPacket2 = mock(IStreamPacket.class);
when(audioPacket2.getDataType()).thenReturn(Constants.TYPE_AUDIO_DATA);
when(audioPacket2.getTimestamp()).thenReturn(2147483604);
when(audioPacket2.getData()).thenReturn(ioBuffer);

IStreamPacket audioPacket3 = mock(IStreamPacket.class);
when(audioPacket3.getDataType()).thenReturn(Constants.TYPE_AUDIO_DATA);
when(audioPacket3.getTimestamp()).thenReturn(2147483627);
when(audioPacket3.getData()).thenReturn(ioBuffer);

IStreamPacket audioPacket4 = mock(IStreamPacket.class);
when(audioPacket4.getDataType()).thenReturn(Constants.TYPE_AUDIO_DATA);
when(audioPacket4.getTimestamp()).thenReturn(2147483628);
when(audioPacket4.getData()).thenReturn(ioBuffer);

IStreamPacket audioPacketOverflowed = mock(IStreamPacket.class);
when(audioPacketOverflowed.getDataType()).thenReturn(Constants.TYPE_AUDIO_DATA);
when(audioPacketOverflowed.getTimestamp()).thenReturn(24);
when(audioPacketOverflowed.getData()).thenReturn(ioBuffer);

//video packets
IStreamPacket videoPacket1 = mock(CachedEvent.class);
when(videoPacket1.getDataType()).thenReturn(Constants.TYPE_VIDEO_DATA);
when(videoPacket1.getTimestamp()).thenReturn(2147483579);
when(videoPacket1.getData()).thenReturn(ioBuffer);

IStreamPacket videoPacket2 = mock(CachedEvent.class);
when(videoPacket2.getDataType()).thenReturn(Constants.TYPE_VIDEO_DATA);
when(videoPacket2.getTimestamp()).thenReturn(2147483613);
when(videoPacket2.getData()).thenReturn(ioBuffer);

IStreamPacket videoPacket3 = mock(CachedEvent.class);
when(videoPacket3.getDataType()).thenReturn(Constants.TYPE_VIDEO_DATA);
when(videoPacket3.getTimestamp()).thenReturn(2147483646);
when(videoPacket3.getData()).thenReturn(ioBuffer);

IStreamPacket videoPacket4 = mock(CachedEvent.class);
when(videoPacket4.getDataType()).thenReturn(Constants.TYPE_VIDEO_DATA);
when(videoPacket4.getTimestamp()).thenReturn(2147483647);
when(videoPacket4.getData()).thenReturn(ioBuffer);

IStreamPacket videoPacketOverflowed = mock(CachedEvent.class);

when(videoPacketOverflowed.getDataType()).thenReturn(Constants.TYPE_VIDEO_DATA);
when(videoPacketOverflowed.getTimestamp()).thenReturn(65);
when(videoPacketOverflowed.getData()).thenReturn(ioBuffer);


muxAdaptor.writeStreamPacket(audioPacket1);
int overFlowCount = muxAdaptor.getOverflowCount();
assertEquals(0, overFlowCount);
long lastAudioDts = muxAdaptor.getLastDTS();
assertEquals(lastAudioDts, audioPacket1.getTimestamp());

muxAdaptor.writeStreamPacket(videoPacket1);
overFlowCount = muxAdaptor.getOverflowCount();
assertEquals(1, overFlowCount);
long lastVideoDts = muxAdaptor.getLastDTS();
assertEquals(lastVideoDts, videoPacket1.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

muxAdaptor.writeStreamPacket(audioPacket2);
overFlowCount = muxAdaptor.getOverflowCount();
assertEquals(1, overFlowCount);
lastAudioDts = muxAdaptor.getLastDTS();
assertEquals(lastAudioDts, audioPacket2.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

muxAdaptor.writeStreamPacket(videoPacket2);
overFlowCount = muxAdaptor.getOverflowCount();
assertEquals(1, overFlowCount);
lastVideoDts = muxAdaptor.getLastDTS();
assertEquals(lastVideoDts, videoPacket2.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);


verify(hlsMuxer,times(1)).writeAudioBuffer(directByteBuffer,1, audioPacket2.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE );
verify(hlsMuxer,times(1)).writeVideoBuffer(directByteBufferVideo, videoPacket2.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE, 0, 0, false, 0, videoPacket2.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

muxAdaptor.writeStreamPacket(audioPacket3);
overFlowCount = muxAdaptor.getOverflowCount();
assertEquals(1, overFlowCount);
lastAudioDts = muxAdaptor.getLastDTS();
assertEquals(lastAudioDts, audioPacket3.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

muxAdaptor.writeStreamPacket(videoPacket3);
overFlowCount = muxAdaptor.getOverflowCount();
assertEquals(1, overFlowCount);
lastVideoDts = muxAdaptor.getLastDTS();
assertEquals(lastVideoDts, videoPacket3.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

verify(hlsMuxer,times(1)).writeAudioBuffer(directByteBuffer,1, audioPacket3.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

directByteBufferVideo.position(0);
verify(hlsMuxer,times(1)).writeVideoBuffer(directByteBufferVideo, videoPacket3.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE, 0, 0, false, 0,
videoPacket3.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

muxAdaptor.writeStreamPacket(audioPacket4);
overFlowCount = muxAdaptor.getOverflowCount();
assertEquals(2, overFlowCount);
lastAudioDts = muxAdaptor.getLastDTS();
assertEquals(lastAudioDts, audioPacket4.getTimestamp()+(long) overFlowCount * Integer.MAX_VALUE);

muxAdaptor.writeStreamPacket(videoPacket4);
overFlowCount = muxAdaptor.getOverflowCount();
assertEquals(2, overFlowCount);
lastVideoDts = muxAdaptor.getLastDTS();
assertEquals(lastVideoDts, videoPacket4.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

verify(hlsMuxer,times(1)).writeAudioBuffer(directByteBuffer,1, audioPacket4.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);
verify(hlsMuxer,times(1)).writeVideoBuffer(directByteBufferVideo, videoPacket4.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE, 0, 0, false, 0,
videoPacket4.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

muxAdaptor.writeStreamPacket(audioPacketOverflowed);
overFlowCount = muxAdaptor.getOverflowCount();
assertEquals(3, overFlowCount);
lastAudioDts = muxAdaptor.getLastDTS();
assertEquals(lastAudioDts, audioPacketOverflowed.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

muxAdaptor.writeStreamPacket(videoPacketOverflowed);
overFlowCount = muxAdaptor.getOverflowCount();
assertEquals(3, overFlowCount);
lastVideoDts = muxAdaptor.getLastDTS();
assertEquals(lastVideoDts, videoPacketOverflowed.getTimestamp() + (long) overFlowCount * Integer.MAX_VALUE);

verify(hlsMuxer,times(1)).writeAudioBuffer(directByteBuffer,1, lastAudioDts);
verify(hlsMuxer,times(1)).writeVideoBuffer(directByteBufferVideo, lastVideoDts, 0, 0, false, 0, lastVideoDts);
}

}
Loading