Skip to content

Commit

Permalink
Implement RTMP buffer on the session
Browse files Browse the repository at this point in the history
  • Loading branch information
geekgonecrazy committed Jun 23, 2023
1 parent 29832ee commit 5c771cd
Show file tree
Hide file tree
Showing 6 changed files with 1,474 additions and 783 deletions.
2 changes: 1 addition & 1 deletion controllers/myStreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func GetMyStreamerHandler(c echo.Context) error {

session, _ := sessions.GetSession(streamKey)

if session != nil && session.Active {
if session != nil && session.Running {
myStreamer.Live = true
}

Expand Down
69 changes: 13 additions & 56 deletions rtmpConnectionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log"
"os"
"strings"
"time"

"github.com/geekgonecrazy/prismplus/sessions"
"github.com/geekgonecrazy/prismplus/streamers"
Expand Down Expand Up @@ -41,77 +40,35 @@ func rtmpConnectionHandler(conn *rtmp.Conn) {
os.Exit(1)
}

// stash headers for replay on new destinations
session.SetHeaders(streams)

// If a delay is specified then we need to inspect first packet and set the buffer size
if session.Delay > 0 {
packet, err := conn.ReadPacket()
if err != nil {
fmt.Println("can't read packet:", err)
}

delaySeconds := time.Duration(session.Delay) * time.Second

packetsNeeded := delaySeconds / packet.CompositionTime
packet, err := conn.ReadPacket()
if err != nil {
fmt.Println("can't read packet:", err)
}

session.SetBufferSize(int(packetsNeeded))
session.SetBufferSize(packet)

session.RelayPacket(packet)
}
// stash headers for replay on new destinations
session.SetHeaders(streams)

go session.ForwardPackets()
go session.Run()

log.Println("RTMP connection now active for session", key)

for _, destination := range session.Destinations {
if err := destination.RTMP.WriteHeader(streams); err != nil {
fmt.Println("can't write header to destination stream:", err)
// os.Exit(1)
}
go destination.RTMP.Loop()
}

lastTime := time.Now()
for {
if session.End {
fmt.Printf("Ending session %s\n", key)
break
}

packet, err := conn.ReadPacket()
if err != nil {
fmt.Println("can't read packet:", err)
break
}

session.RelayPacket(packet)

if time.Since(lastTime) > time.Second {
fmt.Println("Duration:", packet.Time)
lastTime = time.Now()
}
}

session.ChangeState(false) // Mark inactive
session.StreamDisconnected()
log.Println("Not processing any more. RTMP relaying stopped")

for _, destination := range session.Destinations {
err := destination.RTMP.Disconnect()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}

if session.End {
fmt.Printf("Session %s ended\n", key)
// Make sure we are closed
if err := conn.Close(); err != nil {
log.Println(err)
}

if err := sessions.DeleteSession(key); err != nil {
log.Println(err)
}
// Make sure we are closed
if err := conn.Close(); err != nil {
log.Println(err)
}
}
203 changes: 166 additions & 37 deletions sessions/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"strings"
"sync"
"time"

"github.com/geekgonecrazy/prismplus/models"
"github.com/geekgonecrazy/prismplus/rtmp"
Expand All @@ -17,19 +18,35 @@ var (
ErrNotFound = errors.New("not found")
)

type StreamState uint8

const (
StreamCreated StreamState = iota
StreamPacketReceived
StreamBuffering
StreamStreaming
StreamDisconnected
)

type Session struct {
StreamerID int `json:"streamerId"`
Key string `json:"key"`
Destinations map[int]*Destination `json:"destinations"`
Delay int `json:"delay"`
NextDestinationID int `json:"nextDestinationId"`
Active bool `json:"active"`
End bool `json:"end"`
StreamHeaders []av.CodecData `json:"streamHeaders"`
buffer chan av.Packet
desiredBufferSize int
bufferFilled bool
_lock sync.Mutex // Might need if we allow modify
StreamerID int `json:"streamerId"`
Key string `json:"key"`
Destinations map[int]*Destination `json:"destinations"`
Delay int `json:"delay"`
NextDestinationID int `json:"nextDestinationId"`
Running bool `json:"running"`
streamHeaders []av.CodecData `json:"streamHeaders"`
streamStatus StreamState
buffer chan av.Packet
incomingDuration time.Duration
previousIncomingDuration time.Duration
bufferedDuration time.Duration
outgoingDuration time.Duration
bufferLength time.Duration
lastPacketTime time.Time //lastPacketTime is currently only used to help with verbose logging, don't use for any logic
discrepancySize time.Duration
stop chan bool
_lock sync.Mutex // Might need if we allow modify
}

type Destination struct {
Expand Down Expand Up @@ -62,8 +79,8 @@ func (s *Session) AddDestination(destinationPayload models.Destination) error {
RTMP: conn,
}

if s.Active {
if err := conn.WriteHeader(s.StreamHeaders); err != nil {
if s.Running {
if err := conn.WriteHeader(s.streamHeaders); err != nil {
fmt.Println("can't write header:", err)
// os.Exit(1)
}
Expand Down Expand Up @@ -106,51 +123,159 @@ func (s *Session) RemoveDestination(id int) error {
return nil
}

func (s *Session) ChangeState(active bool) {
s.Active = active
}

func (s *Session) SetHeaders(streams []av.CodecData) {
s.StreamHeaders = streams
s.streamHeaders = streams
}

func (s *Session) ForwardPackets() {
s.ChangeState(true)
func (s *Session) Run() {
// Prevent another routine from running
if s.Running {
return
}

// Wait until buffer filled before we start flushing
// TODO: Sessions once ended actually shouldn't be removed
// Reset in case the session is reused
s.lastPacketTime = time.Now()
s.bufferedDuration = time.Duration(0)
s.outgoingDuration = time.Duration(0)
s.incomingDuration = time.Duration(0)

s.Running = true
s.streamStatus = StreamBuffering
log.Println("Session switched to buffering")

// Loop waiting for buffering to finish
for {
if !s.bufferFilled && len(s.buffer) < s.desiredBufferSize {
s.bufferFilled = true
continue
if s.streamStatus == StreamDisconnected {
log.Println("Stopping buffer due to disconnect")
break
}

break
if time.Since(s.lastPacketTime) > time.Second {
log.Printf("Buffered packets: %s/%s Buffered Packets: %d", s.bufferedDuration, s.bufferLength, len(s.buffer))
s.lastPacketTime = time.Now()
}

if s.incomingDuration >= s.bufferLength {
log.Println("Session switched to streaming", len(s.buffer))
s.streamStatus = StreamStreaming
break
}
}

for p := range s.buffer {
if s.streamStatus == StreamStreaming {
log.Println("Finished Buffering")

log.Println("Connecting to destinations")
for _, destination := range s.Destinations {
destination.RTMP.WritePacket(p)
if err := destination.RTMP.WriteHeader(s.streamHeaders); err != nil {
fmt.Println("can't write header to destination stream:", err)
// os.Exit(1)
}

go destination.RTMP.Loop()
}

log.Println("Beginning to stream to destinations")

streamedTime := time.Now()

Loop:
for {

select {
case <-s.stop:
break Loop
case packet := <-s.buffer:
// Use the timing in the packet - the time streamed to figure out when next packet should go out
time.Sleep(packet.Time - s.outgoingDuration)

// Verbose logging just to be able to see the state of the buffer
if time.Since(streamedTime) > time.Second {
log.Printf("Outgoing Packet Time: %s (idx %d); Incoming Packet Time: %s; Buffered up to Time: %s; Buffered Packets: %d", packet.Time, packet.Idx, s.incomingDuration, s.bufferedDuration, len(s.buffer))

streamedTime = time.Now()
}

// Write packet out to all destinations
for _, destination := range s.Destinations {
destination.RTMP.WritePacket(packet)
}

// Update with the time marker that has been already streamed
s.outgoingDuration = packet.Time
s.lastPacketTime = time.Now()
}

if s.streamStatus == StreamDisconnected && len(s.buffer) == 0 {
log.Println("Stream is disconnected and buffer is empty. Sending Stop Signal")
s.stop <- true
}
}
}

log.Println("Disconnecting from destinations")
for _, destination := range s.Destinations {
err := destination.RTMP.Disconnect()
if err != nil {
fmt.Println(err)
}
}

s.Running = false

log.Println("Attempt to self destruct session")
if err := DeleteSession(s.Key); err != nil {
log.Println(err)
}
}

func (s *Session) SetBufferSize(size int) error {
if s.Active {
log.Println("Can't set buffer size while session is active")
func (s *Session) SetBufferSize(packet av.Packet) error {
if s.Running && s.streamStatus != StreamCreated {
log.Println("Can't set Packet size for buffer while session is running")
s.RelayPacket(packet)
return nil
}

s.buffer = make(chan av.Packet, size)
packetsNeeded := s.bufferLength / packet.CompositionTime
size := int(packetsNeeded) * 4

log.Println("Packet size:", packet.CompositionTime)

log.Println("Setting buffer size to:", size)

// Need to figure out how to do this properly again with math
s.buffer = make(chan av.Packet, 100000)

s.RelayPacket(packet)

return nil
}

func (s *Session) RelayPacket(p av.Packet) {
packetDuration := p.Time

if s.discrepancySize+packetDuration < s.previousIncomingDuration {
s.discrepancySize = s.bufferedDuration
log.Println("Discrepancy detected correcting", s.discrepancySize)
}

s.incomingDuration = packetDuration

p.Time = s.discrepancySize + packetDuration

s.bufferedDuration = p.Time
s.previousIncomingDuration = p.Time

s.buffer <- p
}

func (s *Session) StreamDisconnected() {
s.streamStatus = StreamDisconnected
}

func (s *Session) EndSession() {
s.End = true
s.stop <- true
}

func InitializeSessionStore() {
Expand All @@ -165,17 +290,21 @@ func CreateSession(sessionPayload models.SessionPayload) error {
}

if existingSession != nil {
return errors.New("Already Exists")
return errors.New("session already Exists")
}

bufferLength := time.Second * time.Duration(sessionPayload.Delay)

session := &Session{
buffer: make(chan av.Packet, 2),
Delay: sessionPayload.Delay,
StreamerID: sessionPayload.StreamerID,
Key: sessionPayload.Key,
Destinations: map[int]*Destination{},
NextDestinationID: 0,
Active: false,
End: false,
Running: false,
buffer: make(chan av.Packet, 1),
stop: make(chan bool, 1),
bufferLength: bufferLength,
}

_sessions[sessionPayload.Key] = session
Expand Down
Loading

0 comments on commit 5c771cd

Please sign in to comment.