From 8883c878bf7fbe36dd685e6f23cb24d3e2da6975 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Wed, 30 Sep 2020 15:28:19 +0200 Subject: [PATCH] Better error handling in stream forwarder --- main.go | 7 ++---- stream/forwarding/forwarding.go | 42 +++++++++++++++++++-------------- stream/srt/srt.go | 12 +++++++--- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/main.go b/main.go index fc76592..eef49c5 100644 --- a/main.go +++ b/main.go @@ -96,11 +96,8 @@ func main() { go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) go webrtc.Serve(remoteSdpChan, localSdpChan, &cfg.WebRTC) - // Init stream forwarding - err = forwarding.New(&cfg.Forwarding) - if err != nil { - log.Fatalln("Failed to init stream forwarding:", err) - } + // Configure stream forwarding + forwarding.New(cfg.Forwarding) // Wait for routines select {} diff --git a/stream/forwarding/forwarding.go b/stream/forwarding/forwarding.go index a9012ec..bc0b2ad 100644 --- a/stream/forwarding/forwarding.go +++ b/stream/forwarding/forwarding.go @@ -12,56 +12,59 @@ import ( type Options map[string][]string var ( - options Options + cfg Options ffmpegInstances = make(map[string]*exec.Cmd) ffmpegInputStreams = make(map[string]*io.WriteCloser) ) // New Load configuration -func New(cfg *Options) error { - options = *cfg - return nil +func New(c Options) { + cfg = c + log.Printf("Stream forwarding initialized") } // RegisterStream Declare a new open stream and create ffmpeg instances -func RegisterStream(name string) { - if len(options[name]) == 0 { - return +func RegisterStream(name string) error { + streams, exist := cfg[name] + if !exist || len(streams) == 0 { + // Nothing to do, not configured + return nil } + // Launch FFMPEG instance params := []string{"-re", "-i", "pipe:0"} - for _, stream := range options[name] { + for _, stream := range streams { params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency", "-c", "copy", stream) } - // Launch FFMPEG instance ffmpeg := exec.Command("ffmpeg", params...) // Open pipes input, err := ffmpeg.StdinPipe() if err != nil { - panic(err) + return err } output, err := ffmpeg.StdoutPipe() if err != nil { - panic(err) + return err } errOutput, err := ffmpeg.StderrPipe() if err != nil { - panic(err) + return err } ffmpegInstances[name] = ffmpeg ffmpegInputStreams[name] = &input + // Start FFMpeg if err := ffmpeg.Start(); err != nil { - panic(err) + return err } // Log ffmpeg output go func() { scanner := bufio.NewScanner(output) for scanner.Scan() { - log.Println("[FFMPEG " + name + "] " + scanner.Text()) + log.Printf("[FFMPEG %s] %s", name, scanner.Text()) } }() @@ -69,12 +72,14 @@ func RegisterStream(name string) { go func() { scanner := bufio.NewScanner(errOutput) for scanner.Scan() { - log.Println("[FFMPEG ERROR " + name + "] " + scanner.Text()) + log.Printf("[FFMPEG ERR %s] %s", name, scanner.Text()) } }() + + return nil } -// SendPacket When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key +// SendPacket forward data to all FFMpeg instances related to the stream name func SendPacket(name string, data []byte) { stdin := ffmpegInputStreams[name] _, err := (*stdin).Write(data) @@ -84,11 +89,12 @@ func SendPacket(name string, data []byte) { } // CloseConnection When the stream is ended, close FFMPEG instances -func CloseConnection(name string) { +func CloseConnection(name string) error { ffmpeg := ffmpegInstances[name] if err := ffmpeg.Process.Kill(); err != nil { - panic(err) + return err } delete(ffmpegInstances, name) delete(ffmpegInputStreams, name) + return nil } diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 0f15472..a584d1f 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -57,21 +57,23 @@ func Serve(cfg *Options) { buff := make([]byte, 2048) // Setup stream forwarding - forwarding.RegisterStream("demo") // FIXME Replace with real stream key + // FIXME: demo should be replaced by stream name + if err := forwarding.RegisterStream("demo"); err != nil { + log.Println("Error occurred during forward stream init:", err) + break + } // Read RTP packets forever and send them to the WebRTC Client for { n, err := s.Read(buff, 10000) if err != nil { log.Println("Error occured while reading SRT socket:", err) - forwarding.CloseConnection("demo") break } if n == 0 { // End of stream log.Printf("Received no bytes, stopping stream.") - forwarding.CloseConnection("demo") break } @@ -84,5 +86,9 @@ func Serve(cfg *Options) { // See https://github.com/ebml-go/webm/blob/master/reader.go //err := videoTrack.WriteSample(media.Sample{Data: data, Samples: uint32(sampleCount)}) } + + if err := forwarding.CloseConnection("demo"); err != nil { + log.Printf("Failed to close forward stream: %s", err) + } } }