mirror of
				https://gitlab.crans.org/nounous/ghostream.git
				synced 2025-11-04 07:42:10 +01:00 
			
		
		
		
	Use external configuration to setup multicasts
This commit is contained in:
		@@ -46,3 +46,9 @@ webrtc:
 | 
			
		||||
  # STUN servers, you should host your own Coturn instance
 | 
			
		||||
  STUNServers:
 | 
			
		||||
    - stun:stun.l.google.com:19302
 | 
			
		||||
 | 
			
		||||
# Configuration for the multicast feature
 | 
			
		||||
multicast:
 | 
			
		||||
  outputs:
 | 
			
		||||
  # demo:
 | 
			
		||||
  #   - rtmp://localhost:1925
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										9
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										9
									
								
								main.go
									
									
									
									
									
								
							@@ -3,6 +3,7 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"gitlab.crans.org/nounous/ghostream/stream/multicast"
 | 
			
		||||
	"log"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
@@ -57,6 +58,7 @@ func loadConfiguration() {
 | 
			
		||||
	viper.SetDefault("WebRTC.MinPortUDP", 10000)
 | 
			
		||||
	viper.SetDefault("WebRTC.MaxPortUDP", 10005)
 | 
			
		||||
	viper.SetDefault("WebRTC.STUNServers", []string{"stun:stun.l.google.com:19302"})
 | 
			
		||||
	viper.SetDefault("Multicast.Outputs", make(map[string][]string))
 | 
			
		||||
 | 
			
		||||
	// Copy STUN configuration to clients
 | 
			
		||||
	viper.Set("Web.STUNServers", viper.Get("WebRTC.STUNServers"))
 | 
			
		||||
@@ -68,6 +70,7 @@ func main() {
 | 
			
		||||
	cfg := struct {
 | 
			
		||||
		Auth       auth.Options
 | 
			
		||||
		Monitoring monitoring.Options
 | 
			
		||||
		Multicast  multicast.Options
 | 
			
		||||
		Srt        srt.Options
 | 
			
		||||
		Web        web.Options
 | 
			
		||||
		WebRTC     webrtc.Options
 | 
			
		||||
@@ -93,6 +96,12 @@ func main() {
 | 
			
		||||
	go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web)
 | 
			
		||||
	go webrtc.Serve(remoteSdpChan, localSdpChan, &cfg.WebRTC)
 | 
			
		||||
 | 
			
		||||
	// Init multicast
 | 
			
		||||
	err = multicast.New(&cfg.Multicast)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalln("Failed to load multicast app:", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Wait for routines
 | 
			
		||||
	select {}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -2,33 +2,55 @@ package multicast
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os/exec"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Options struct {
 | 
			
		||||
	Outputs map[string][]string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	options            Options
 | 
			
		||||
	ffmpegInstances    = make(map[string][]*exec.Cmd)
 | 
			
		||||
	ffmpegInputStreams = make(map[string][]io.WriteCloser)
 | 
			
		||||
	ffmpegInputStreams = make(map[string][]*io.WriteCloser)
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Declare a new open stream and create ffmpeg instances
 | 
			
		||||
// New Load configuration
 | 
			
		||||
func New(cfg *Options) error {
 | 
			
		||||
	options = *cfg
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RegisterStream Declare a new open stream and create ffmpeg instances
 | 
			
		||||
func RegisterStream(streamKey string) {
 | 
			
		||||
	ffmpegInstances[streamKey] = []*exec.Cmd{}
 | 
			
		||||
	ffmpegInputStreams[streamKey] = []io.WriteCloser{}
 | 
			
		||||
	ffmpegInputStreams[streamKey] = []*io.WriteCloser{}
 | 
			
		||||
 | 
			
		||||
	// TODO Export the list of multicasts
 | 
			
		||||
	for _, stream := range []string{fmt.Sprintf("rtmp://live.twitch.tv/app/%s", "TWITCH_STREAM_KEY")} {
 | 
			
		||||
	for _, stream := range options.Outputs[streamKey] {
 | 
			
		||||
		// Launch FFMPEG instance
 | 
			
		||||
		// TODO Set optimal parameters
 | 
			
		||||
		ffmpeg := exec.Command("ffmpeg", "-re", "-i", "pipe:0", "-f", "flv", "-c:v", "libx264", "-preset",
 | 
			
		||||
			"veryfast", "-maxrate", "3000k", "-bufsize", "6000k", "-pix_fmt", "yuv420p", "-g", "50", "-c:a", "aac",
 | 
			
		||||
			"-b:a", "160k", "-ac", "2", "-ar", "44100", stream)
 | 
			
		||||
 | 
			
		||||
		// Open pipes
 | 
			
		||||
		input, err := ffmpeg.StdinPipe()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
		output, err := ffmpeg.StdoutPipe()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
		errOutput, err := ffmpeg.StderrPipe()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
		ffmpegInstances[streamKey] = append(ffmpegInstances[streamKey], ffmpeg)
 | 
			
		||||
		input, _ := ffmpeg.StdinPipe()
 | 
			
		||||
		ffmpegInputStreams[streamKey] = append(ffmpegInputStreams[streamKey], input)
 | 
			
		||||
		output, _ := ffmpeg.StdoutPipe()
 | 
			
		||||
		ffmpegInputStreams[streamKey] = append(ffmpegInputStreams[streamKey], &input)
 | 
			
		||||
 | 
			
		||||
		if err := ffmpeg.Start(); err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
@@ -38,24 +60,31 @@ func RegisterStream(streamKey string) {
 | 
			
		||||
		go func() {
 | 
			
		||||
			scanner := bufio.NewScanner(output)
 | 
			
		||||
			for scanner.Scan() {
 | 
			
		||||
				log.Println(scanner.Text())
 | 
			
		||||
				log.Println("[FFMPEG " + streamKey + "] " + scanner.Text())
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
		// Log also error output
 | 
			
		||||
		go func() {
 | 
			
		||||
			scanner := bufio.NewScanner(errOutput)
 | 
			
		||||
			for scanner.Scan() {
 | 
			
		||||
				log.Println("[FFMPEG ERROR " + streamKey + "] " + scanner.Text())
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key
 | 
			
		||||
// SendPacket When a SRT packet is received, transmit it to all FFMPEG instances related to the stream key
 | 
			
		||||
func SendPacket(streamKey string, data []byte) {
 | 
			
		||||
	for _, stdin := range ffmpegInputStreams[streamKey] {
 | 
			
		||||
		_, err := stdin.Write(data)
 | 
			
		||||
		_, err := (*stdin).Write(data)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
			log.Println("Error while sending a packet to external streaming server for key "+streamKey, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// When the stream is ended, close FFMPEG instances
 | 
			
		||||
// CloseConnection When the stream is ended, close FFMPEG instances
 | 
			
		||||
func CloseConnection(streamKey string) {
 | 
			
		||||
	for _, ffmpeg := range ffmpegInstances[streamKey] {
 | 
			
		||||
		if err := ffmpeg.Process.Kill(); err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -53,16 +53,18 @@ func Serve(cfg *Options) {
 | 
			
		||||
 | 
			
		||||
		buff := make([]byte, 2048)
 | 
			
		||||
		n, err := s.Read(buff, 10000)
 | 
			
		||||
		multicast.SendPacket("demo", buff[:n])
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Println("Error occurred while reading SRT socket:", err)
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		if n == 0 {
 | 
			
		||||
			// End of stream
 | 
			
		||||
			multicast.CloseConnection("demo")
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		multicast.SendPacket("demo", buff[:n])
 | 
			
		||||
 | 
			
		||||
		// Unmarshal the incoming packet
 | 
			
		||||
		packet := &rtp.Packet{}
 | 
			
		||||
		if err = packet.Unmarshal(buff[:n]); err != nil {
 | 
			
		||||
@@ -81,10 +83,11 @@ func Serve(cfg *Options) {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			multicast.SendPacket("demo", buff[:n])
 | 
			
		||||
 | 
			
		||||
			log.Printf("Received %d bytes", n)
 | 
			
		||||
 | 
			
		||||
			packet := &rtp.Packet{}
 | 
			
		||||
			multicast.SendPacket("demo", buff[:n])
 | 
			
		||||
			if err := packet.Unmarshal(buff[:n]); err != nil {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user