From c317d91b8d26380ff0ea5667704d4f65070af02e Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Mon, 19 Oct 2020 19:28:30 +0200 Subject: [PATCH] Update package forwarding with Quality structure --- main.go | 4 +-- stream/forwarding/forwarding.go | 51 +++++++++++++++++----------- stream/forwarding/forwarding_test.go | 4 +-- 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/main.go b/main.go index 87d0c41..b45cc22 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,7 @@ import ( "gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/internal/config" "gitlab.crans.org/nounous/ghostream/internal/monitoring" - "gitlab.crans.org/nounous/ghostream/stream" + "gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream/forwarding" "gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/telnet" @@ -46,7 +46,7 @@ func main() { localSdpChan := make(chan webrtc.SessionDescription) // Init streams messaging - streams := make(map[string]*stream.Stream) + streams := messaging.New() // Start routines go transcoder.Init(streams, &cfg.Transcoder) diff --git a/stream/forwarding/forwarding.go b/stream/forwarding/forwarding.go index 93c7cfa..f4fd157 100644 --- a/stream/forwarding/forwarding.go +++ b/stream/forwarding/forwarding.go @@ -5,9 +5,8 @@ import ( "bufio" "log" "os/exec" - "time" - "gitlab.crans.org/nounous/ghostream/stream" + "gitlab.crans.org/nounous/ghostream/messaging" ) // Options to configure the stream forwarding. @@ -15,37 +14,49 @@ import ( type Options map[string][]string // Serve handles incoming packets from SRT and forward them to other external services -func Serve(streams map[string]*stream.Stream, cfg Options) { +func Serve(streams *messaging.Streams, cfg Options) { if len(cfg) < 1 { // No forwarding, ignore return } + // Subscribe to new stream event + event := make(chan string, 8) + streams.Subscribe(event) log.Printf("Stream forwarding initialized") - for { - for name, st := range streams { - fwdCfg, ok := cfg[name] - if !ok { - // Not configured - continue - } - // Start forwarding - log.Printf("Starting forwarding for '%s'", name) - go forward(st, fwdCfg) + // For each new stream + for name := range event { + streamCfg, ok := cfg[name] + if !ok { + // Not configured + continue } - // Regulary pull stream list, - // it may be better to tweak the messaging system - // to get an event on a new stream. - time.Sleep(time.Second) + // Get stream + stream, err := streams.Get(name) + if err != nil { + log.Printf("Failed to get stream '%s'", name) + } + + // Get specific quality + // FIXME: make it possible to forward other qualities + qualityName := "source" + quality, err := stream.GetQuality(qualityName) + if err != nil { + log.Printf("Failed to get quality '%s'", qualityName) + } + + // Start forwarding + log.Printf("Starting forwarding for '%s' quality '%s'", name, qualityName) + go forward(quality, streamCfg) } } // Start a FFMPEG instance and redirect stream output to forwarded streams -func forward(st *stream.Stream, fwdCfg []string) { +func forward(q *messaging.Quality, fwdCfg []string) { output := make(chan []byte, 1024) - st.Register(output) + q.Register(output) // Launch FFMPEG instance params := []string{"-hide_banner", "-loglevel", "error", "-re", "-i", "pipe:0"} @@ -77,7 +88,7 @@ func forward(st *stream.Stream, fwdCfg []string) { _ = input.Close() _ = errOutput.Close() _ = ffmpeg.Process.Kill() - st.Unregister(output) + q.Unregister(output) }() // Log standard error output diff --git a/stream/forwarding/forwarding_test.go b/stream/forwarding/forwarding_test.go index 1f6ea6a..97d92c3 100644 --- a/stream/forwarding/forwarding_test.go +++ b/stream/forwarding/forwarding_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "gitlab.crans.org/nounous/ghostream/stream" + "gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream/srt" ) @@ -35,7 +35,7 @@ func TestForwardStream(t *testing.T) { cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} // Register forwarding stream list - streams := make(map[string]*stream.Stream) + streams := messaging.New() go Serve(streams, cfg) // Serve SRT Server without authentification backend