diff --git a/stream/multicast/muticast.go b/stream/multicast/muticast.go new file mode 100644 index 0000000..c74eb91 --- /dev/null +++ b/stream/multicast/muticast.go @@ -0,0 +1,67 @@ +package multicast + +import ( + "bufio" + "fmt" + "io" + "log" + "os/exec" +) + +var ( + ffmpegInstances = make(map[string][]*exec.Cmd) + ffmpegInputStreams = make(map[string][]io.WriteCloser) +) + +// Declare a new open stream and create ffmpeg instances +func RegisterStream(streamKey string) { + ffmpegInstances[streamKey] = []*exec.Cmd{} + ffmpegInputStreams[streamKey] = []io.WriteCloser{} + + // TODO Export the list of multicasts + for _, stream := range []string{fmt.Sprintf("rtmp://live.twitch.tv/app/%s", "TWITCH_STREAM_KEY")} { + // Launch FFMPEG instance + // TODO Set optimal parameters + ffmpeg := exec.Command("ffmpeg", "-re", "-i", "pipe:0", "-f", "flv", "-c:v", "libx264", "-preset", + "veryfast", "-maxrate", "3000k", "-bufsize", "6000k", "-pix_fmt", "yuv420p", "-g", "50", "-c:a", "aac", + "-b:a", "160k", "-ac", "2", "-ar", "44100", stream) + ffmpegInstances[streamKey] = append(ffmpegInstances[streamKey], ffmpeg) + input, _ := ffmpeg.StdinPipe() + ffmpegInputStreams[streamKey] = append(ffmpegInputStreams[streamKey], input) + output, _ := ffmpeg.StdoutPipe() + + if err := ffmpeg.Start(); err != nil { + panic(err) + } + + // Log ffmpeg output + go func() { + scanner := bufio.NewScanner(output) + for scanner.Scan() { + log.Println(scanner.Text()) + } + }() + } +} + +// When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key +func SendPacket(streamKey string, data []byte) { + for _, stdin := range ffmpegInputStreams[streamKey] { + _, err := stdin.Write(data) + if err != nil { + panic(err) + } + } + +} + +// When the stream is ended, close FFMPEG instances +func CloseConnection(streamKey string) { + for _, ffmpeg := range ffmpegInstances[streamKey] { + if err := ffmpeg.Process.Kill(); err != nil { + panic(err) + } + } + delete(ffmpegInstances, streamKey) + delete(ffmpegInputStreams, streamKey) +} diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 95157f1..6289e07 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -1,11 +1,9 @@ package srt import ( - "bufio" - "fmt" + "gitlab.crans.org/nounous/ghostream/stream/multicast" "log" "net" - "os/exec" "strconv" "github.com/haivision/srtgo" @@ -51,14 +49,11 @@ func Serve(cfg *Options) { continue } - // Launch ffmpeg to stream on other RTMP servers - ffmpeg := exec.Command("ffmpeg", "-re", "-i", "pipe:0", "-f", "flv", "-c:v", "libx264", "-preset", "veryfast", "-maxrate", "3000k", "-bufsize", "6000k", "-pix_fmt", "yuv420p", "-g", "50", "-c:a", "aac", "-b:a", "160k", "-ac", "2", "-ar", "44100", fmt.Sprintf("rtmp://live.twitch.tv/app/%s", "TWITCH_STREAM_KEY")) //nolint - ffmpegIn, _ := ffmpeg.StdinPipe() - ffmpegOut, _ := ffmpeg.StderrPipe() + multicast.RegisterStream("demo") // FIXME Replace with real stream key buff := make([]byte, 2048) n, err := s.Read(buff, 10000) - ffmpegIn.Write(buff[:n]) + multicast.SendPacket("demo", buff[:n]) if err != nil { log.Println("Error occurred while reading SRT socket:", err) break @@ -77,30 +72,19 @@ func Serve(cfg *Options) { // videoTrack, err := peerConnection.NewTrack(payloadType, packet.SSRC, "video", "pion") - if err := ffmpeg.Start(); err != nil { - panic(err) - } - - // Log ffmpeg output - go func() { - scanner := bufio.NewScanner(ffmpegOut) - for scanner.Scan() { - log.Println(scanner.Text()) - } - }() - // Read RTP packets forever and send them to the WebRTC Client for { n, err := s.Read(buff, 10000) if err != nil { log.Println("Error occured while reading SRT socket:", err) + multicast.CloseConnection("demo") break } log.Printf("Received %d bytes", n) packet := &rtp.Packet{} - ffmpegIn.Write(buff[:n]) + multicast.SendPacket("demo", buff[:n]) if err := packet.Unmarshal(buff[:n]); err != nil { panic(err) }