Skip to content

Commit

Permalink
start using buffer between incoming and destination
Browse files Browse the repository at this point in the history
  • Loading branch information
geekgonecrazy committed Jun 21, 2023
1 parent 74873de commit 29832ee
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 7 deletions.
27 changes: 21 additions & 6 deletions rtmpConnectionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,27 @@ func rtmpConnectionHandler(conn *rtmp.Conn) {
os.Exit(1)
}

// Mark session as active and stash headers for replay on new destinations
session.ChangeState(true) // Mark active
// stash headers for replay on new destinations
session.SetHeaders(streams)

// If a delay is specified then we need to inspect first packet and set the buffer size
if session.Delay > 0 {
packet, err := conn.ReadPacket()
if err != nil {
fmt.Println("can't read packet:", err)
}

delaySeconds := time.Duration(session.Delay) * time.Second

packetsNeeded := delaySeconds / packet.CompositionTime

session.SetBufferSize(int(packetsNeeded))

session.RelayPacket(packet)
}

go session.ForwardPackets()

log.Println("RTMP connection now active for session", key)

for _, destination := range session.Destinations {
Expand All @@ -68,14 +85,12 @@ func rtmpConnectionHandler(conn *rtmp.Conn) {
break
}

session.RelayPacket(packet)

if time.Since(lastTime) > time.Second {
fmt.Println("Duration:", packet.Time)
lastTime = time.Now()
}

for _, destination := range session.Destinations {
destination.RTMP.WritePacket(packet)
}
}

session.ChangeState(false) // Mark inactive
Expand Down
41 changes: 40 additions & 1 deletion sessions/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ type Session struct {
Active bool `json:"active"`
End bool `json:"end"`
StreamHeaders []av.CodecData `json:"streamHeaders"`
_lock sync.Mutex // Might need if we allow modify
buffer chan av.Packet
desiredBufferSize int
bufferFilled bool
_lock sync.Mutex // Might need if we allow modify
}

type Destination struct {
Expand Down Expand Up @@ -111,6 +114,41 @@ func (s *Session) SetHeaders(streams []av.CodecData) {
s.StreamHeaders = streams
}

func (s *Session) ForwardPackets() {
s.ChangeState(true)

// Wait until buffer filled before we start flushing
for {
if !s.bufferFilled && len(s.buffer) < s.desiredBufferSize {
s.bufferFilled = true
continue
}

break
}

for p := range s.buffer {
for _, destination := range s.Destinations {
destination.RTMP.WritePacket(p)
}
}
}

func (s *Session) SetBufferSize(size int) error {
if s.Active {
log.Println("Can't set buffer size while session is active")
return nil
}

s.buffer = make(chan av.Packet, size)

return nil
}

func (s *Session) RelayPacket(p av.Packet) {
s.buffer <- p
}

func (s *Session) EndSession() {
s.End = true
}
Expand All @@ -131,6 +169,7 @@ func CreateSession(sessionPayload models.SessionPayload) error {
}

session := &Session{
buffer: make(chan av.Packet, 2),
StreamerID: sessionPayload.StreamerID,
Key: sessionPayload.Key,
Destinations: map[int]*Destination{},
Expand Down

0 comments on commit 29832ee

Please sign in to comment.