1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2024-12-22 10:22:19 +00:00

Create audio transcoder

This commit is contained in:
Yohann D'ANELLO 2020-10-18 21:46:36 +02:00
parent b5aac93c97
commit 20776d897c
5 changed files with 147 additions and 1 deletions

View File

@ -2,6 +2,7 @@
package config
import (
"gitlab.crans.org/nounous/ghostream/transcoder/audio"
"net"
"github.com/sherifabdlnaby/configuro"
@ -59,6 +60,10 @@ func New() *Config {
ListenAddress: ":8023",
},
Transcoder: transcoder.Options{
Audio: audio.Options{
Enabled: true,
Bitrate: 160,
},
Text: text.Options{
Enabled: false,
Width: 80,

View File

@ -51,6 +51,8 @@ func ingest(name string, input *stream.Stream) {
input.Register(videoInput)
activeStream[name] = struct{}{}
// TODO Register to all substreams and make RTP packets. Don't transcode in this package.
// Open a UDP Listener for RTP Packets on port 5004
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
if err != nil {

135
transcoder/audio/audio.go Normal file
View File

@ -0,0 +1,135 @@
// Package audio transcode a stream to filter the audio
package audio
import (
"bufio"
"fmt"
"io"
"log"
"os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds audio package configuration
type Options struct {
Enabled bool
Bitrate int
}
// Init text transcoder
func Init(streams map[string]*stream.Stream, cfg *Options) {
if !cfg.Enabled {
// Audio transcode is not enabled, ignore
return
}
// Regulary check existing streams
for {
for sourceName, sourceStream := range streams {
if strings.Contains(sourceName, "@") {
// Not a source stream, pass
continue
}
// Check that the transcoded stream does not already exist
name := sourceName + "@audio"
_, ok := streams[name]
if ok {
// Stream is already transcoded
continue
}
// Start conversion
log.Printf("Starting audio transcode '%s'", name)
st := stream.New()
streams[name] = st
go transcode(sourceStream, st, cfg)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
// Extract audio from stream
func transcode(input, output *stream.Stream, cfg *Options) {
// Start ffmpeg to transcode video to audio
videoInput := make(chan []byte, 1024)
input.Register(videoInput)
ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg)
if err != nil {
log.Printf("Error while starting ffmpeg: %s", err)
return
}
dataBuff := make([]byte, 1316) // UDP MTU
for {
n, err := (*rawvideo).Read(dataBuff)
if err != nil {
log.Printf("An error occurred while reading input: %s", err)
break
}
if n == 0 {
// Stream is finished
break
}
output.Broadcast <- dataBuff[:n]
}
// Stop transcode
ffmpeg.Process.Kill()
}
// Start a ffmpeg instance to convert stream into audio
func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) {
// TODO in a future release: remove FFMPEG dependency and transcode directly using the libopus API
// FIXME It seems impossible to get a RTP Packet from standard output.
// We need to find a clean solution, without waiting on UDP listeners.
bitrate := fmt.Sprintf("%dk", cfg.Bitrate)
// Use copy audio codec, assume for now that libopus is used by the streamer
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
"-vn", "-c:a", "copy", "-b:a", bitrate, "-f", "rtp", "pipe:1"}
ffmpeg := exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output
errOutput, err := ffmpeg.StderrPipe()
if err != nil {
return nil, nil, err
}
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[AUDIO FFMPEG %s] %s", "demo", scanner.Text())
}
}()
// Handle audio output
output, err := ffmpeg.StdoutPipe()
if err != nil {
return nil, nil, err
}
// Handle stream input
input, err := ffmpeg.StdinPipe()
if err != nil {
return nil, nil, err
}
go func() {
for data := range in {
input.Write(data)
}
}()
// Start process
err = ffmpeg.Start()
return ffmpeg, &output, err
}

View File

@ -0,0 +1 @@
package audio

View File

@ -3,15 +3,18 @@ package transcoder
import (
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/transcoder/audio"
"gitlab.crans.org/nounous/ghostream/transcoder/text"
)
// Options holds text package configuration
type Options struct {
Text text.Options
Text text.Options
Audio audio.Options
}
// Init all transcoders
func Init(streams map[string]*stream.Stream, cfg *Options) {
go text.Init(streams, &cfg.Text)
go audio.Init(streams, &cfg.Audio)
}