From 34200afaed94ca7bd88d8b9c603223c7e7a6084b Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Mon, 19 Oct 2020 19:48:44 +0200 Subject: [PATCH] Update package webrtc with Quality structure --- stream/webrtc/ingest.go | 57 ++++++++++++++++-------------------- stream/webrtc/webrtc.go | 4 +-- stream/webrtc/webrtc_test.go | 4 +-- 3 files changed, 29 insertions(+), 36 deletions(-) diff --git a/stream/webrtc/ingest.go b/stream/webrtc/ingest.go index ccede0d..77a7bc1 100644 --- a/stream/webrtc/ingest.go +++ b/stream/webrtc/ingest.go @@ -6,50 +6,43 @@ import ( "log" "net" "os/exec" - "strings" - "time" "github.com/pion/rtp" "github.com/pion/webrtc/v3" - "gitlab.crans.org/nounous/ghostream/stream" + "gitlab.crans.org/nounous/ghostream/messaging" ) -var ( - activeStream map[string]struct{} -) +func autoIngest(streams *messaging.Streams) { + // Subscribe to new stream event + event := make(chan string, 8) + streams.Subscribe(event) -func autoIngest(streams map[string]*stream.Stream) { - // Regulary check existing streams - activeStream = make(map[string]struct{}) - for { - for name, st := range streams { - if strings.Contains(name, "@") { - // Not a source stream, pass - continue - } - - if _, ok := activeStream[name]; ok { - // Stream is already ingested - continue - } - - // Start ingestion - log.Printf("Starting webrtc for '%s'", name) - go ingest(name, st) + // For each new stream + for name := range event { + // Get stream + stream, err := streams.Get(name) + if err != nil { + log.Printf("Failed to get stream '%s'", name) } - // Regulary pull stream list, - // it may be better to tweak the messaging system - // to get an event on a new stream. - time.Sleep(time.Second) + // Get specific quality + // FIXME: make it possible to forward other qualities + qualityName := "source" + quality, err := stream.GetQuality(qualityName) + if err != nil { + log.Printf("Failed to get quality '%s'", qualityName) + } + + // Start forwarding + log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName) + go ingest(name, quality) } } -func ingest(name string, input *stream.Stream) { +func ingest(name string, q *messaging.Quality) { // Register to get stream videoInput := make(chan []byte, 1024) - input.Register(videoInput) - activeStream[name] = struct{}{} + q.Register(videoInput) // Open a UDP Listener for RTP Packets on port 5004 videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) @@ -146,7 +139,7 @@ func ingest(name string, input *stream.Stream) { if err = audioListener.Close(); err != nil { log.Printf("Faited to close UDP listener: %s", err) } - delete(activeStream, name) + q.Unregister(videoInput) } func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) { diff --git a/stream/webrtc/webrtc.go b/stream/webrtc/webrtc.go index 37390dd..ddaceb3 100644 --- a/stream/webrtc/webrtc.go +++ b/stream/webrtc/webrtc.go @@ -8,7 +8,7 @@ import ( "github.com/pion/webrtc/v3" "gitlab.crans.org/nounous/ghostream/internal/monitoring" - "gitlab.crans.org/nounous/ghostream/stream" + "gitlab.crans.org/nounous/ghostream/messaging" ) // Options holds web package configuration @@ -182,7 +182,7 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa } // Serve WebRTC media streaming server -func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct { +func Serve(streams *messaging.Streams, remoteSdpChan chan struct { StreamID string RemoteDescription webrtc.SessionDescription }, localSdpChan chan webrtc.SessionDescription, cfg *Options) { diff --git a/stream/webrtc/webrtc_test.go b/stream/webrtc/webrtc_test.go index ee34ca5..8383d70 100644 --- a/stream/webrtc/webrtc_test.go +++ b/stream/webrtc/webrtc_test.go @@ -5,12 +5,12 @@ import ( "testing" "github.com/pion/webrtc/v3" - "gitlab.crans.org/nounous/ghostream/stream" + "gitlab.crans.org/nounous/ghostream/messaging" ) func TestServe(t *testing.T) { // Init streams messaging and WebRTC server - streams := make(map[string]*stream.Stream) + streams := messaging.New() remoteSdpChan := make(chan struct { StreamID string RemoteDescription webrtc.SessionDescription