diff --git a/docs/ghostream.example.yml b/docs/ghostream.example.yml index 1df72ae..7130496 100644 --- a/docs/ghostream.example.yml +++ b/docs/ghostream.example.yml @@ -46,3 +46,9 @@ webrtc: # STUN servers, you should host your own Coturn instance STUNServers: - stun:stun.l.google.com:19302 + +# Configuration for the multicast feature +multicast: + outputs: + # demo: + # - rtmp://localhost:1925 diff --git a/main.go b/main.go index 5f8f1d3..e1c9342 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( + "gitlab.crans.org/nounous/ghostream/stream/multicast" "log" "strings" @@ -57,6 +58,7 @@ func loadConfiguration() { viper.SetDefault("WebRTC.MinPortUDP", 10000) viper.SetDefault("WebRTC.MaxPortUDP", 10005) viper.SetDefault("WebRTC.STUNServers", []string{"stun:stun.l.google.com:19302"}) + viper.SetDefault("Multicast.Outputs", make(map[string][]string)) // Copy STUN configuration to clients viper.Set("Web.STUNServers", viper.Get("WebRTC.STUNServers")) @@ -68,6 +70,7 @@ func main() { cfg := struct { Auth auth.Options Monitoring monitoring.Options + Multicast multicast.Options Srt srt.Options Web web.Options WebRTC webrtc.Options @@ -93,6 +96,12 @@ func main() { go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) go webrtc.Serve(remoteSdpChan, localSdpChan, &cfg.WebRTC) + // Init multicast + err = multicast.New(&cfg.Multicast) + if err != nil { + log.Fatalln("Failed to load multicast app:", err) + } + // Wait for routines select {} } diff --git a/stream/multicast/muticast.go b/stream/multicast/muticast.go index c74eb91..e948e32 100644 --- a/stream/multicast/muticast.go +++ b/stream/multicast/muticast.go @@ -2,33 +2,55 @@ package multicast import ( "bufio" - "fmt" "io" "log" "os/exec" ) +type Options struct { + Outputs map[string][]string +} + var ( + options Options ffmpegInstances = make(map[string][]*exec.Cmd) - ffmpegInputStreams = make(map[string][]io.WriteCloser) + ffmpegInputStreams = make(map[string][]*io.WriteCloser) ) -// Declare a new open stream and create ffmpeg instances +// New Load configuration +func New(cfg *Options) error { + options = *cfg + return nil +} + +// RegisterStream Declare a new open stream and create ffmpeg instances func RegisterStream(streamKey string) { ffmpegInstances[streamKey] = []*exec.Cmd{} - ffmpegInputStreams[streamKey] = []io.WriteCloser{} + ffmpegInputStreams[streamKey] = []*io.WriteCloser{} // TODO Export the list of multicasts - for _, stream := range []string{fmt.Sprintf("rtmp://live.twitch.tv/app/%s", "TWITCH_STREAM_KEY")} { + for _, stream := range options.Outputs[streamKey] { // Launch FFMPEG instance // TODO Set optimal parameters ffmpeg := exec.Command("ffmpeg", "-re", "-i", "pipe:0", "-f", "flv", "-c:v", "libx264", "-preset", "veryfast", "-maxrate", "3000k", "-bufsize", "6000k", "-pix_fmt", "yuv420p", "-g", "50", "-c:a", "aac", "-b:a", "160k", "-ac", "2", "-ar", "44100", stream) + + // Open pipes + input, err := ffmpeg.StdinPipe() + if err != nil { + panic(err) + } + output, err := ffmpeg.StdoutPipe() + if err != nil { + panic(err) + } + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + panic(err) + } ffmpegInstances[streamKey] = append(ffmpegInstances[streamKey], ffmpeg) - input, _ := ffmpeg.StdinPipe() - ffmpegInputStreams[streamKey] = append(ffmpegInputStreams[streamKey], input) - output, _ := ffmpeg.StdoutPipe() + ffmpegInputStreams[streamKey] = append(ffmpegInputStreams[streamKey], &input) if err := ffmpeg.Start(); err != nil { panic(err) @@ -38,24 +60,31 @@ func RegisterStream(streamKey string) { go func() { scanner := bufio.NewScanner(output) for scanner.Scan() { - log.Println(scanner.Text()) + log.Println("[FFMPEG " + streamKey + "] " + scanner.Text()) + } + }() + // Log also error output + go func() { + scanner := bufio.NewScanner(errOutput) + for scanner.Scan() { + log.Println("[FFMPEG ERROR " + streamKey + "] " + scanner.Text()) } }() } } -// When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key +// SendPacket When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key func SendPacket(streamKey string, data []byte) { for _, stdin := range ffmpegInputStreams[streamKey] { - _, err := stdin.Write(data) + _, err := (*stdin).Write(data) if err != nil { - panic(err) + log.Println("Error while sending a packet to external streaming server for key "+streamKey, err) } } } -// When the stream is ended, close FFMPEG instances +// CloseConnection When the stream is ended, close FFMPEG instances func CloseConnection(streamKey string) { for _, ffmpeg := range ffmpegInstances[streamKey] { if err := ffmpeg.Process.Kill(); err != nil { diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 6289e07..67445a0 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -53,16 +53,18 @@ func Serve(cfg *Options) { buff := make([]byte, 2048) n, err := s.Read(buff, 10000) - multicast.SendPacket("demo", buff[:n]) if err != nil { log.Println("Error occurred while reading SRT socket:", err) break } if n == 0 { // End of stream + multicast.CloseConnection("demo") break } + multicast.SendPacket("demo", buff[:n]) + // Unmarshal the incoming packet packet := &rtp.Packet{} if err = packet.Unmarshal(buff[:n]); err != nil { @@ -81,10 +83,11 @@ func Serve(cfg *Options) { break } + multicast.SendPacket("demo", buff[:n]) + log.Printf("Received %d bytes", n) packet := &rtp.Packet{} - multicast.SendPacket("demo", buff[:n]) if err := packet.Unmarshal(buff[:n]); err != nil { panic(err) }