Fix messaging hang by output

This commit is contained in:
Alexandre Iooss 2020-10-18 15:13:13 +02:00
parent 09a3422d06
commit 37d944621b
No known key found for this signature in database
GPG Key ID: 6C79278F3FCDCC02
3 changed files with 19 additions and 23 deletions

View File

@ -16,7 +16,7 @@ type Stream struct {
// Count clients for statistics // Count clients for statistics
nbClients int nbClients int
// Mutex to lock this ressource // Mutex to lock outputs map
lock sync.Mutex lock sync.Mutex
} }
@ -33,28 +33,28 @@ func New() *Stream {
func (s *Stream) run(broadcast <-chan []byte) { func (s *Stream) run(broadcast <-chan []byte) {
for msg := range broadcast { for msg := range broadcast {
func() { s.lock.Lock()
s.lock.Lock() for output := range s.outputs {
defer s.lock.Unlock() select {
for output := range s.outputs { case output <- msg:
select { default:
case output <- msg: // If full, do a ring buffer
default: // Check that output is not of size zero
// If full, do a ring buffer if len(output) > 1 {
<-output <-output
output <- msg
} }
} }
}() }
s.lock.Unlock()
} }
// Incoming chan has been closed, close all outputs // Incoming chan has been closed, close all outputs
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
for ch := range s.outputs { for ch := range s.outputs {
delete(s.outputs, ch) delete(s.outputs, ch)
close(ch) close(ch)
} }
s.lock.Unlock()
} }
// Close the incoming chan, this will also delete all outputs // Close the incoming chan, this will also delete all outputs
@ -63,7 +63,6 @@ func (s *Stream) Close() {
} }
// Register a new output on a stream. // Register a new output on a stream.
// If hidden in true, then do not count this client.
func (s *Stream) Register(output chan []byte) { func (s *Stream) Register(output chan []byte) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()

View File

@ -21,12 +21,12 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream,
st := stream.New() st := stream.New()
streams[name] = st streams[name] = st
// Create a new buffer
// UDP packet cannot be larger than MTU (1500)
buff := make([]byte, 1500)
// Read RTP packets forever and send them to the WebRTC Client // Read RTP packets forever and send them to the WebRTC Client
for { for {
// Create a new buffer
// UDP packet cannot be larger than MTU (1500)
buff := make([]byte, 1500)
// 5s timeout // 5s timeout
n, err := socket.Read(buff, 5000) n, err := socket.Read(buff, 5000)
if err != nil { if err != nil {
@ -41,11 +41,8 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream,
} }
// Send raw data to other streams // Send raw data to other streams
// Copy data in another buffer to ensure that the data would not be overwritten buff = buff[:n]
// FIXME: might be unnecessary st.Broadcast <- buff
data := make([]byte, n)
copy(data, buff[:n])
st.Broadcast <- data
} }
// Close stream // Close stream

View File

@ -63,7 +63,7 @@ func Init(streams map[string]*stream.Stream, cfg *Options) {
// Convert video to ANSI text // Convert video to ANSI text
func transcode(input, output *stream.Stream, cfg *Options) { func transcode(input, output *stream.Stream, cfg *Options) {
// Start ffmpeg to transcode video to rawvideo // Start ffmpeg to transcode video to rawvideo
videoInput := make(chan []byte) videoInput := make(chan []byte, 1024)
input.Register(videoInput) input.Register(videoInput)
ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg) ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg)
if err != nil { if err != nil {