From 698b83fe6fe56a414a241fc8cbb610a9fbb115cd Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Sun, 18 Oct 2020 22:07:11 +0200 Subject: [PATCH] WebRTC is registering to the audio-transcoded stream --- stream/webrtc/ingest.go | 193 ++++++++++++++++++++++---------------- transcoder/audio/audio.go | 36 ++++--- 2 files changed, 135 insertions(+), 94 deletions(-) diff --git a/stream/webrtc/ingest.go b/stream/webrtc/ingest.go index 4bdc4b2..cf11ce0 100644 --- a/stream/webrtc/ingest.go +++ b/stream/webrtc/ingest.go @@ -2,15 +2,12 @@ package webrtc import ( - "bufio" + "github.com/pion/rtp" + "github.com/pion/webrtc/v3" "log" - "net" - "os/exec" "strings" "time" - "github.com/pion/rtp" - "github.com/pion/webrtc/v3" "gitlab.crans.org/nounous/ghostream/stream" ) @@ -35,7 +32,9 @@ func autoIngest(streams map[string]*stream.Stream) { // Start ingestion log.Printf("Starting webrtc for '%s'", name) - go ingest(name, st) + // FIXME Ensure that the audio stream exist, but that's poop code + time.Sleep(time.Second) + go ingest(name, st, streams[name+"@audio"]) } // Regulary pull stream list, @@ -45,76 +44,19 @@ func autoIngest(streams map[string]*stream.Stream) { } } -func ingest(name string, input *stream.Stream) { +func ingest(name string, input *stream.Stream, audio *stream.Stream) { // Register to get stream videoInput := make(chan []byte, 1024) input.Register(videoInput) + audioInput := make(chan []byte, 1024) + audio.Register(audioInput) activeStream[name] = struct{}{} - // TODO Register to all substreams and make RTP packets. Don't transcode in this package. - - // Open a UDP Listener for RTP Packets on port 5004 - videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) - if err != nil { - log.Printf("Faited to open UDP listener %s", err) - return - } - audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5005}) - if err != nil { - log.Printf("Faited to open UDP listener %s", err) - return - } - - // Start ffmpag to convert videoInput to video and audio UDP - ffmpeg, err := startFFmpeg(videoInput) - if err != nil { - log.Printf("Error while starting ffmpeg: %s", err) - return - } - - // Receive video + // Receive audio data go func() { - inboundRTPPacket := make([]byte, 1500) // UDP MTU for { - n, _, err := videoListener.ReadFromUDP(inboundRTPPacket) - if err != nil { - log.Printf("Failed to read from UDP: %s", err) - break - } packet := &rtp.Packet{} - if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { - log.Printf("Failed to unmarshal RTP srtPacket: %s", err) - continue - } - - if videoTracks[name] == nil { - videoTracks[name] = make([]*webrtc.Track, 0) - } - - // Write RTP srtPacket to all video tracks - // Adapt payload and SSRC to match destination - for _, videoTrack := range videoTracks[name] { - packet.Header.PayloadType = videoTrack.PayloadType() - packet.Header.SSRC = videoTrack.SSRC() - if writeErr := videoTrack.WriteRTP(packet); writeErr != nil { - log.Printf("Failed to write to video track: %s", err) - continue - } - } - } - }() - - // Receive audio - go func() { - inboundRTPPacket := make([]byte, 1500) // UDP MTU - for { - n, _, err := audioListener.ReadFromUDP(inboundRTPPacket) - if err != nil { - log.Printf("Failed to read from UDP: %s", err) - break - } - packet := &rtp.Packet{} - if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { + if err := packet.Unmarshal(<-audioInput); err != nil { log.Printf("Failed to unmarshal RTP srtPacket: %s", err) continue } @@ -129,29 +71,116 @@ func ingest(name string, input *stream.Stream) { packet.Header.PayloadType = audioTrack.PayloadType() packet.Header.SSRC = audioTrack.SSRC() if writeErr := audioTrack.WriteRTP(packet); writeErr != nil { - log.Printf("Failed to write to audio track: %s", err) + log.Printf("Failed to write to audio track: %s", writeErr) continue } } } }() - // Wait for stopped ffmpeg - if err = ffmpeg.Wait(); err != nil { - log.Printf("Faited to wait for ffmpeg: %s", err) - } + select {} - // Close UDP listeners - if err = videoListener.Close(); err != nil { - log.Printf("Faited to close UDP listener: %s", err) - } - if err = audioListener.Close(); err != nil { - log.Printf("Faited to close UDP listener: %s", err) - } + // TODO Register to all substreams and make RTP packets. Don't transcode in this package. + + /* // Open a UDP Listener for RTP Packets on port 5004 + videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) + if err != nil { + log.Printf("Faited to open UDP listener %s", err) + return + } + audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5005}) + if err != nil { + log.Printf("Faited to open UDP listener %s", err) + return + } + + // Start ffmpag to convert videoInput to video and audio UDP + ffmpeg, err := startFFmpeg(videoInput) + if err != nil { + log.Printf("Error while starting ffmpeg: %s", err) + return + } + + // Receive video + go func() { + inboundRTPPacket := make([]byte, 1500) // UDP MTU + for { + n, _, err := videoListener.ReadFromUDP(inboundRTPPacket) + if err != nil { + log.Printf("Failed to read from UDP: %s", err) + break + } + packet := &rtp.Packet{} + if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { + log.Printf("Failed to unmarshal RTP srtPacket: %s", err) + continue + } + + if videoTracks[name] == nil { + videoTracks[name] = make([]*webrtc.Track, 0) + } + + // Write RTP srtPacket to all video tracks + // Adapt payload and SSRC to match destination + for _, videoTrack := range videoTracks[name] { + packet.Header.PayloadType = videoTrack.PayloadType() + packet.Header.SSRC = videoTrack.SSRC() + if writeErr := videoTrack.WriteRTP(packet); writeErr != nil { + log.Printf("Failed to write to video track: %s", err) + continue + } + } + } + }() + + // Receive audio + go func() { + inboundRTPPacket := make([]byte, 1500) // UDP MTU + for { + n, _, err := audioListener.ReadFromUDP(inboundRTPPacket) + if err != nil { + log.Printf("Failed to read from UDP: %s", err) + break + } + packet := &rtp.Packet{} + if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { + log.Printf("Failed to unmarshal RTP srtPacket: %s", err) + continue + } + + if audioTracks[name] == nil { + audioTracks[name] = make([]*webrtc.Track, 0) + } + + // Write RTP srtPacket to all audio tracks + // Adapt payload and SSRC to match destination + for _, audioTrack := range audioTracks[name] { + packet.Header.PayloadType = audioTrack.PayloadType() + packet.Header.SSRC = audioTrack.SSRC() + if writeErr := audioTrack.WriteRTP(packet); writeErr != nil { + log.Printf("Failed to write to audio track: %s", err) + continue + } + } + } + }() + + // Wait for stopped ffmpeg + if err = ffmpeg.Wait(); err != nil { + log.Printf("Faited to wait for ffmpeg: %s", err) + } + + // Close UDP listeners + if err = videoListener.Close(); err != nil { + log.Printf("Faited to close UDP listener: %s", err) + } + if err = audioListener.Close(); err != nil { + log.Printf("Faited to close UDP listener: %s", err) + }*/ delete(activeStream, name) } -func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) { +/* func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) { ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", "-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality "-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1", @@ -192,4 +221,4 @@ func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) { // Start process err = ffmpeg.Start() return ffmpeg, err -} +} */ diff --git a/transcoder/audio/audio.go b/transcoder/audio/audio.go index 8886177..108eec8 100644 --- a/transcoder/audio/audio.go +++ b/transcoder/audio/audio.go @@ -4,8 +4,9 @@ package audio import ( "bufio" "fmt" - "io" "log" + "math/rand" + "net" "os/exec" "strings" "time" @@ -84,20 +85,37 @@ func transcode(input, output *stream.Stream, cfg *Options) { } // Stop transcode - ffmpeg.Process.Kill() + _ = ffmpeg.Process.Kill() + _ = rawvideo.Close() } // Start a ffmpeg instance to convert stream into audio -func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) { +func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *net.UDPConn, error) { // TODO in a future release: remove FFMPEG dependency and transcode directly using the libopus API // FIXME It seems impossible to get a RTP Packet from standard output. // We need to find a clean solution, without waiting on UDP listeners. + // FIXME We should also not build RTP packets here. + + port := 0 + var udpListener *net.UDPConn + var err error + for { + port = rand.Intn(65535) + udpListener, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: port}) + if err != nil { + if strings.Contains(fmt.Sprintf("%s", err), "address already in use") { + continue + } + return nil, nil, err + } + break + } bitrate := fmt.Sprintf("%dk", cfg.Bitrate) // Use copy audio codec, assume for now that libopus is used by the streamer ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", - "-vn", "-c:a", "copy", "-b:a", bitrate, "-f", "rtp", "pipe:1"} + "-vn", "-c:a", "copy", "-b:a", bitrate, "-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", port)} ffmpeg := exec.Command("ffmpeg", ffmpegArgs...) // Handle errors output @@ -112,12 +130,6 @@ func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, err } }() - // Handle audio output - output, err := ffmpeg.StdoutPipe() - if err != nil { - return nil, nil, err - } - // Handle stream input input, err := ffmpeg.StdinPipe() if err != nil { @@ -125,11 +137,11 @@ func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, err } go func() { for data := range in { - input.Write(data) + _, _ = input.Write(data) } }() // Start process err = ffmpeg.Start() - return ffmpeg, &output, err + return ffmpeg, udpListener, err }