From 37d944621bc551b87156412fe5640b0eb05931d9 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sun, 18 Oct 2020 15:13:13 +0200 Subject: [PATCH] Fix messaging hang by output --- stream/messaging.go | 25 ++++++++++++------------- stream/srt/handler.go | 15 ++++++--------- transcoder/text/text.go | 2 +- 3 files changed, 19 insertions(+), 23 deletions(-) diff --git a/stream/messaging.go b/stream/messaging.go index 0798d84..4c2c0e7 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -16,7 +16,7 @@ type Stream struct { // Count clients for statistics nbClients int - // Mutex to lock this ressource + // Mutex to lock outputs map lock sync.Mutex } @@ -33,28 +33,28 @@ func New() *Stream { func (s *Stream) run(broadcast <-chan []byte) { for msg := range broadcast { - func() { - s.lock.Lock() - defer s.lock.Unlock() - for output := range s.outputs { - select { - case output <- msg: - default: - // If full, do a ring buffer + s.lock.Lock() + for output := range s.outputs { + select { + case output <- msg: + default: + // If full, do a ring buffer + // Check that output is not of size zero + if len(output) > 1 { <-output - output <- msg } } - }() + } + s.lock.Unlock() } // Incoming chan has been closed, close all outputs s.lock.Lock() - defer s.lock.Unlock() for ch := range s.outputs { delete(s.outputs, ch) close(ch) } + s.lock.Unlock() } // Close the incoming chan, this will also delete all outputs @@ -63,7 +63,6 @@ func (s *Stream) Close() { } // Register a new output on a stream. -// If hidden in true, then do not count this client. func (s *Stream) Register(output chan []byte) { s.lock.Lock() defer s.lock.Unlock() diff --git a/stream/srt/handler.go b/stream/srt/handler.go index df0e324..6dbaa43 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -21,12 +21,12 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, st := stream.New() streams[name] = st - // Create a new buffer - // UDP packet cannot be larger than MTU (1500) - buff := make([]byte, 1500) - // Read RTP packets forever and send them to the WebRTC Client for { + // Create a new buffer + // UDP packet cannot be larger than MTU (1500) + buff := make([]byte, 1500) + // 5s timeout n, err := socket.Read(buff, 5000) if err != nil { @@ -41,11 +41,8 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, } // Send raw data to other streams - // Copy data in another buffer to ensure that the data would not be overwritten - // FIXME: might be unnecessary - data := make([]byte, n) - copy(data, buff[:n]) - st.Broadcast <- data + buff = buff[:n] + st.Broadcast <- buff } // Close stream diff --git a/transcoder/text/text.go b/transcoder/text/text.go index 538d0e6..44114f4 100644 --- a/transcoder/text/text.go +++ b/transcoder/text/text.go @@ -63,7 +63,7 @@ func Init(streams map[string]*stream.Stream, cfg *Options) { // Convert video to ANSI text func transcode(input, output *stream.Stream, cfg *Options) { // Start ffmpeg to transcode video to rawvideo - videoInput := make(chan []byte) + videoInput := make(chan []byte, 1024) input.Register(videoInput) ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg) if err != nil {