diff --git a/docs/ghostream.example.yml b/docs/ghostream.example.yml index 1df72ae..7130496 100644 --- a/docs/ghostream.example.yml +++ b/docs/ghostream.example.yml @@ -46,3 +46,9 @@ webrtc: # STUN servers, you should host your own Coturn instance STUNServers: - stun:stun.l.google.com:19302 + +# Configuration for the multicast feature +multicast: + outputs: + # demo: + # - rtmp://localhost:1925 diff --git a/main.go b/main.go index 5f8f1d3..e1c9342 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( + "gitlab.crans.org/nounous/ghostream/stream/multicast" "log" "strings" @@ -57,6 +58,7 @@ func loadConfiguration() { viper.SetDefault("WebRTC.MinPortUDP", 10000) viper.SetDefault("WebRTC.MaxPortUDP", 10005) viper.SetDefault("WebRTC.STUNServers", []string{"stun:stun.l.google.com:19302"}) + viper.SetDefault("Multicast.Outputs", make(map[string][]string)) // Copy STUN configuration to clients viper.Set("Web.STUNServers", viper.Get("WebRTC.STUNServers")) @@ -68,6 +70,7 @@ func main() { cfg := struct { Auth auth.Options Monitoring monitoring.Options + Multicast multicast.Options Srt srt.Options Web web.Options WebRTC webrtc.Options @@ -93,6 +96,12 @@ func main() { go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) go webrtc.Serve(remoteSdpChan, localSdpChan, &cfg.WebRTC) + // Init multicast + err = multicast.New(&cfg.Multicast) + if err != nil { + log.Fatalln("Failed to load multicast app:", err) + } + // Wait for routines select {} } diff --git a/main_test.go b/main_test.go index 06ab7d0..7650bd7 100644 --- a/main_test.go +++ b/main_test.go @@ -1 +1,28 @@ package main + +import ( + "github.com/spf13/viper" + "gitlab.crans.org/nounous/ghostream/auth" + "gitlab.crans.org/nounous/ghostream/internal/monitoring" + "gitlab.crans.org/nounous/ghostream/stream/multicast" + "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream/webrtc" + "gitlab.crans.org/nounous/ghostream/web" + "testing" +) + +// TestLoadConfiguration tests the configuration file loading and the init of some parameters +func TestLoadConfiguration(t *testing.T) { + loadConfiguration() + cfg := struct { + Auth auth.Options + Monitoring monitoring.Options + Multicast multicast.Options + Srt srt.Options + Web web.Options + WebRTC webrtc.Options + }{} + if err := viper.Unmarshal(&cfg); err != nil { + t.Fatal("Failed to load settings", err) + } +} diff --git a/stream/multicast/multicast_test.go b/stream/multicast/multicast_test.go new file mode 100644 index 0000000..aa01d64 --- /dev/null +++ b/stream/multicast/multicast_test.go @@ -0,0 +1 @@ +package multicast diff --git a/stream/multicast/muticast.go b/stream/multicast/muticast.go new file mode 100644 index 0000000..5b1d96c --- /dev/null +++ b/stream/multicast/muticast.go @@ -0,0 +1,95 @@ +package multicast + +import ( + "bufio" + "io" + "log" + "os/exec" +) + +// Options to configure the multicast: +//for each stream key, we can have several additional stream URL where the main stream is redirected to +type Options struct { + Outputs map[string][]string +} + +var ( + options Options + ffmpegInstances = make(map[string]*exec.Cmd) + ffmpegInputStreams = make(map[string]*io.WriteCloser) +) + +// New Load configuration +func New(cfg *Options) error { + options = *cfg + return nil +} + +// RegisterStream Declare a new open stream and create ffmpeg instances +func RegisterStream(streamKey string) { + if len(options.Outputs[streamKey]) == 0 { + return + } + + params := []string{"-re", "-i", "pipe:0"} + for _, stream := range options.Outputs[streamKey] { + params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency", + "-c", "copy", stream) + } + // Launch FFMPEG instance + ffmpeg := exec.Command("ffmpeg", params...) + + // Open pipes + input, err := ffmpeg.StdinPipe() + if err != nil { + panic(err) + } + output, err := ffmpeg.StdoutPipe() + if err != nil { + panic(err) + } + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + panic(err) + } + ffmpegInstances[streamKey] = ffmpeg + ffmpegInputStreams[streamKey] = &input + + if err := ffmpeg.Start(); err != nil { + panic(err) + } + + // Log ffmpeg output + go func() { + scanner := bufio.NewScanner(output) + for scanner.Scan() { + log.Println("[FFMPEG " + streamKey + "] " + scanner.Text()) + } + }() + // Log also error output + go func() { + scanner := bufio.NewScanner(errOutput) + for scanner.Scan() { + log.Println("[FFMPEG ERROR " + streamKey + "] " + scanner.Text()) + } + }() +} + +// SendPacket When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key +func SendPacket(streamKey string, data []byte) { + stdin := ffmpegInputStreams[streamKey] + _, err := (*stdin).Write(data) + if err != nil { + log.Println("Error while sending a packet to external streaming server for key "+streamKey, err) + } +} + +// CloseConnection When the stream is ended, close FFMPEG instances +func CloseConnection(streamKey string) { + ffmpeg := ffmpegInstances[streamKey] + if err := ffmpeg.Process.Kill(); err != nil { + panic(err) + } + delete(ffmpegInstances, streamKey) + delete(ffmpegInputStreams, streamKey) +} diff --git a/stream/srt/srt.go b/stream/srt/srt.go index ffb98be..d333aa9 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -1,6 +1,7 @@ package srt import ( + "gitlab.crans.org/nounous/ghostream/stream/multicast" "log" "net" "strconv" @@ -56,26 +57,35 @@ func Serve(cfg *Options) { // Create a new buffer buff := make([]byte, 2048) + // Setup linked multicasts + multicast.RegisterStream("demo") // FIXME Replace with real stream key + // Read RTP packets forever and send them to the WebRTC Client for { n, err := s.Read(buff, 10000) if err != nil { log.Println("Error occured while reading SRT socket:", err) + multicast.CloseConnection("demo") break } if n == 0 { // End of stream log.Printf("Received no bytes, stopping stream.") + multicast.CloseConnection("demo") break } log.Printf("Received %d bytes", n) + // Send raw packet to other streams + multicast.SendPacket("demo", buff[:n]) + // Unmarshal incoming packet packet := &rtp.Packet{} if err := packet.Unmarshal(buff[:n]); err != nil { log.Println("Error occured while unmarshaling SRT:", err) + multicast.CloseConnection("demo") break }