// Package audio transcode a stream to filter the audio package audio import ( "bufio" "fmt" "log" "math/rand" "net" "os/exec" "strings" "time" "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds audio package configuration type Options struct { Enabled bool Bitrate int } // Init text transcoder func Init(streams map[string]*stream.Stream, cfg *Options) { if !cfg.Enabled { // Audio transcode is not enabled, ignore return } // Regulary check existing streams for { for sourceName, sourceStream := range streams { if strings.Contains(sourceName, "@") { // Not a source stream, pass continue } // Check that the transcoded stream does not already exist name := sourceName + "@audio" _, ok := streams[name] if ok { // Stream is already transcoded continue } // Start conversion log.Printf("Starting audio transcode '%s'", name) st := stream.New() streams[name] = st go transcode(sourceStream, st, cfg) } // Regulary pull stream list, // it may be better to tweak the messaging system // to get an event on a new stream. time.Sleep(time.Second) } } // Extract audio from stream func transcode(input, output *stream.Stream, cfg *Options) { // Start ffmpeg to transcode video to audio videoInput := make(chan []byte, 1024) input.Register(videoInput) ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg) if err != nil { log.Printf("Error while starting ffmpeg: %s", err) return } dataBuff := make([]byte, 1316) // UDP MTU for { n, err := (*rawvideo).Read(dataBuff) if err != nil { log.Printf("An error occurred while reading input: %s", err) break } if n == 0 { // Stream is finished break } output.Broadcast <- dataBuff[:n] } // Stop transcode _ = ffmpeg.Process.Kill() _ = rawvideo.Close() } // Start a ffmpeg instance to convert stream into audio func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *net.UDPConn, error) { // TODO in a future release: remove FFMPEG dependency and transcode directly using the libopus API // FIXME It seems impossible to get a RTP Packet from standard output. // We need to find a clean solution, without waiting on UDP listeners. // FIXME We should also not build RTP packets here. port := 0 var udpListener *net.UDPConn var err error for { port = rand.Intn(65535) udpListener, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: port}) if err != nil { if strings.Contains(fmt.Sprintf("%s", err), "address already in use") { continue } return nil, nil, err } break } bitrate := fmt.Sprintf("%dk", cfg.Bitrate) // Use copy audio codec, assume for now that libopus is used by the streamer ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", "-vn", "-c:a", "copy", "-b:a", bitrate, "-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", port)} ffmpeg := exec.Command("ffmpeg", ffmpegArgs...) // Handle errors output errOutput, err := ffmpeg.StderrPipe() if err != nil { return nil, nil, err } go func() { scanner := bufio.NewScanner(errOutput) for scanner.Scan() { log.Printf("[AUDIO FFMPEG %s] %s", "demo", scanner.Text()) } }() // Handle stream input input, err := ffmpeg.StdinPipe() if err != nil { return nil, nil, err } go func() { for data := range in { _, _ = input.Write(data) } }() // Start process err = ffmpeg.Start() return ffmpeg, udpListener, err }