From 20776d897c14ce2cdda5dd717295bad1179dba6c Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Sun, 18 Oct 2020 21:46:36 +0200 Subject: [PATCH] Create audio transcoder --- internal/config/config.go | 5 ++ stream/webrtc/ingest.go | 2 + transcoder/audio/audio.go | 135 +++++++++++++++++++++++++++++++++ transcoder/audio/audio_test.go | 1 + transcoder/transcoder.go | 5 +- 5 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 transcoder/audio/audio.go create mode 100644 transcoder/audio/audio_test.go diff --git a/internal/config/config.go b/internal/config/config.go index 2a94292..a6259ec 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,6 +2,7 @@ package config import ( + "gitlab.crans.org/nounous/ghostream/transcoder/audio" "net" "github.com/sherifabdlnaby/configuro" @@ -59,6 +60,10 @@ func New() *Config { ListenAddress: ":8023", }, Transcoder: transcoder.Options{ + Audio: audio.Options{ + Enabled: true, + Bitrate: 160, + }, Text: text.Options{ Enabled: false, Width: 80, diff --git a/stream/webrtc/ingest.go b/stream/webrtc/ingest.go index ccede0d..4bdc4b2 100644 --- a/stream/webrtc/ingest.go +++ b/stream/webrtc/ingest.go @@ -51,6 +51,8 @@ func ingest(name string, input *stream.Stream) { input.Register(videoInput) activeStream[name] = struct{}{} + // TODO Register to all substreams and make RTP packets. Don't transcode in this package. + // Open a UDP Listener for RTP Packets on port 5004 videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) if err != nil { diff --git a/transcoder/audio/audio.go b/transcoder/audio/audio.go new file mode 100644 index 0000000..8886177 --- /dev/null +++ b/transcoder/audio/audio.go @@ -0,0 +1,135 @@ +// Package audio transcode a stream to filter the audio +package audio + +import ( + "bufio" + "fmt" + "io" + "log" + "os/exec" + "strings" + "time" + + "gitlab.crans.org/nounous/ghostream/stream" +) + +// Options holds audio package configuration +type Options struct { + Enabled bool + Bitrate int +} + +// Init text transcoder +func Init(streams map[string]*stream.Stream, cfg *Options) { + if !cfg.Enabled { + // Audio transcode is not enabled, ignore + return + } + + // Regulary check existing streams + for { + for sourceName, sourceStream := range streams { + if strings.Contains(sourceName, "@") { + // Not a source stream, pass + continue + } + + // Check that the transcoded stream does not already exist + name := sourceName + "@audio" + _, ok := streams[name] + if ok { + // Stream is already transcoded + continue + } + + // Start conversion + log.Printf("Starting audio transcode '%s'", name) + st := stream.New() + streams[name] = st + + go transcode(sourceStream, st, cfg) + } + + // Regulary pull stream list, + // it may be better to tweak the messaging system + // to get an event on a new stream. + time.Sleep(time.Second) + } +} + +// Extract audio from stream +func transcode(input, output *stream.Stream, cfg *Options) { + // Start ffmpeg to transcode video to audio + videoInput := make(chan []byte, 1024) + input.Register(videoInput) + ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg) + if err != nil { + log.Printf("Error while starting ffmpeg: %s", err) + return + } + + dataBuff := make([]byte, 1316) // UDP MTU + for { + n, err := (*rawvideo).Read(dataBuff) + if err != nil { + log.Printf("An error occurred while reading input: %s", err) + break + } + if n == 0 { + // Stream is finished + break + } + + output.Broadcast <- dataBuff[:n] + } + + // Stop transcode + ffmpeg.Process.Kill() +} + +// Start a ffmpeg instance to convert stream into audio +func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) { + // TODO in a future release: remove FFMPEG dependency and transcode directly using the libopus API + + // FIXME It seems impossible to get a RTP Packet from standard output. + // We need to find a clean solution, without waiting on UDP listeners. + + bitrate := fmt.Sprintf("%dk", cfg.Bitrate) + // Use copy audio codec, assume for now that libopus is used by the streamer + ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", + "-vn", "-c:a", "copy", "-b:a", bitrate, "-f", "rtp", "pipe:1"} + ffmpeg := exec.Command("ffmpeg", ffmpegArgs...) + + // Handle errors output + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + return nil, nil, err + } + go func() { + scanner := bufio.NewScanner(errOutput) + for scanner.Scan() { + log.Printf("[AUDIO FFMPEG %s] %s", "demo", scanner.Text()) + } + }() + + // Handle audio output + output, err := ffmpeg.StdoutPipe() + if err != nil { + return nil, nil, err + } + + // Handle stream input + input, err := ffmpeg.StdinPipe() + if err != nil { + return nil, nil, err + } + go func() { + for data := range in { + input.Write(data) + } + }() + + // Start process + err = ffmpeg.Start() + return ffmpeg, &output, err +} diff --git a/transcoder/audio/audio_test.go b/transcoder/audio/audio_test.go new file mode 100644 index 0000000..fc8208e --- /dev/null +++ b/transcoder/audio/audio_test.go @@ -0,0 +1 @@ +package audio diff --git a/transcoder/transcoder.go b/transcoder/transcoder.go index bf84170..4e996c0 100644 --- a/transcoder/transcoder.go +++ b/transcoder/transcoder.go @@ -3,15 +3,18 @@ package transcoder import ( "gitlab.crans.org/nounous/ghostream/stream" + "gitlab.crans.org/nounous/ghostream/transcoder/audio" "gitlab.crans.org/nounous/ghostream/transcoder/text" ) // Options holds text package configuration type Options struct { - Text text.Options + Text text.Options + Audio audio.Options } // Init all transcoders func Init(streams map[string]*stream.Stream, cfg *Options) { go text.Init(streams, &cfg.Text) + go audio.Init(streams, &cfg.Audio) }