mirror of
				https://gitlab.crans.org/nounous/ghostream.git
				synced 2025-11-04 07:42:10 +01:00 
			
		
		
		
	Make webrtc and forwarding work with new messaging
This commit is contained in:
		@@ -2,12 +2,10 @@
 | 
				
			|||||||
package forwarding
 | 
					package forwarding
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"bufio"
 | 
					 | 
				
			||||||
	"io"
 | 
					 | 
				
			||||||
	"log"
 | 
						"log"
 | 
				
			||||||
	"os/exec"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"gitlab.crans.org/nounous/ghostream/stream/srt"
 | 
						"gitlab.crans.org/nounous/ghostream/stream"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Options to configure the stream forwarding.
 | 
					// Options to configure the stream forwarding.
 | 
				
			||||||
@@ -15,21 +13,46 @@ import (
 | 
				
			|||||||
type Options map[string][]string
 | 
					type Options map[string][]string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Serve handles incoming packets from SRT and forward them to other external services
 | 
					// Serve handles incoming packets from SRT and forward them to other external services
 | 
				
			||||||
func Serve(inputChannel chan srt.Packet, cfg Options) {
 | 
					func Serve(streams map[string]*stream.Stream, cfg Options) {
 | 
				
			||||||
	if len(cfg) < 1 {
 | 
						if len(cfg) < 1 {
 | 
				
			||||||
		// No forwarding, ignore
 | 
							// No forwarding, ignore
 | 
				
			||||||
		for {
 | 
							return
 | 
				
			||||||
			<-inputChannel // Clear input channel
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	log.Printf("Stream forwarding initialized")
 | 
						log.Printf("Stream forwarding initialized")
 | 
				
			||||||
	ffmpegInstances := make(map[string]*exec.Cmd)
 | 
						for {
 | 
				
			||||||
 | 
							for name, st := range streams {
 | 
				
			||||||
 | 
								fwdCfg, ok := cfg[name]
 | 
				
			||||||
 | 
								if !ok {
 | 
				
			||||||
 | 
									// Not configured
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Start forwarding
 | 
				
			||||||
 | 
								log.Printf("Starting forwarding for '%s'", name)
 | 
				
			||||||
 | 
								go forward(st, fwdCfg)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Regulary pull stream list,
 | 
				
			||||||
 | 
							// it may be better to tweak the messaging system
 | 
				
			||||||
 | 
							// to get an event on a new stream.
 | 
				
			||||||
 | 
							time.Sleep(time.Second)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func forward(st *stream.Stream, fwdCfg []string) {
 | 
				
			||||||
 | 
						// FIXME
 | 
				
			||||||
 | 
						/*ffmpegInstances := make(map[string]*exec.Cmd)
 | 
				
			||||||
	ffmpegInputStreams := make(map[string]*io.WriteCloser)
 | 
						ffmpegInputStreams := make(map[string]*io.WriteCloser)
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		var err error = nil
 | 
							var err error = nil
 | 
				
			||||||
		// Wait for packets
 | 
							// Wait for packets
 | 
				
			||||||
		packet := <-inputChannel
 | 
							// FIXME packet := <-inputChannel
 | 
				
			||||||
 | 
							packet := srt.Packet{
 | 
				
			||||||
 | 
								Data:       []byte{},
 | 
				
			||||||
 | 
								PacketType: "nothing",
 | 
				
			||||||
 | 
								StreamName: "demo",
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		switch packet.PacketType {
 | 
							switch packet.PacketType {
 | 
				
			||||||
		case "register":
 | 
							case "register":
 | 
				
			||||||
			err = registerStream(packet.StreamName, ffmpegInstances, ffmpegInputStreams, cfg)
 | 
								err = registerStream(packet.StreamName, ffmpegInstances, ffmpegInputStreams, cfg)
 | 
				
			||||||
@@ -47,9 +70,10 @@ func Serve(inputChannel chan srt.Packet, cfg Options) {
 | 
				
			|||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			log.Printf("Error occurred while receiving SRT packet of type %s: %s", packet.PacketType, err)
 | 
								log.Printf("Error occurred while receiving SRT packet of type %s: %s", packet.PacketType, err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}*/
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
// registerStream creates ffmpeg instance associated with newly created stream
 | 
					// registerStream creates ffmpeg instance associated with newly created stream
 | 
				
			||||||
func registerStream(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams map[string]*io.WriteCloser, cfg Options) error {
 | 
					func registerStream(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams map[string]*io.WriteCloser, cfg Options) error {
 | 
				
			||||||
	streams, exist := cfg[name]
 | 
						streams, exist := cfg[name]
 | 
				
			||||||
@@ -119,3 +143,4 @@ func close(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams
 | 
				
			|||||||
	delete(ffmpegInputStreams, name)
 | 
						delete(ffmpegInputStreams, name)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,6 +6,7 @@ import (
 | 
				
			|||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"gitlab.crans.org/nounous/ghostream/stream"
 | 
				
			||||||
	"gitlab.crans.org/nounous/ghostream/stream/srt"
 | 
						"gitlab.crans.org/nounous/ghostream/stream/srt"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -30,16 +31,15 @@ func TestForwardStream(t *testing.T) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	forwardingList := make(map[string][]string)
 | 
						cfg := make(map[string][]string)
 | 
				
			||||||
	forwardingList["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
 | 
						cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	forwardingChannel := make(chan srt.Packet)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Register forwarding stream list
 | 
						// Register forwarding stream list
 | 
				
			||||||
	go Serve(forwardingChannel, forwardingList)
 | 
						streams := make(map[string]*stream.Stream)
 | 
				
			||||||
 | 
						go Serve(streams, cfg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Serve SRT Server without authentification backend
 | 
						// Serve SRT Server without authentification backend
 | 
				
			||||||
	go srt.Serve(&srt.Options{Enabled: true, ListenAddress: ":9712", MaxClients: 2}, nil, forwardingChannel, nil)
 | 
						go srt.Serve(streams, nil, &srt.Options{Enabled: true, ListenAddress: ":9712", MaxClients: 2})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error",
 | 
						ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error",
 | 
				
			||||||
		"-re", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10",
 | 
							"-re", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10",
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -3,61 +3,53 @@ package webrtc
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"bufio"
 | 
						"bufio"
 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"io"
 | 
					 | 
				
			||||||
	"log"
 | 
						"log"
 | 
				
			||||||
	"net"
 | 
						"net"
 | 
				
			||||||
	"os/exec"
 | 
						"os/exec"
 | 
				
			||||||
 | 
						"strings"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/pion/rtp"
 | 
						"github.com/pion/rtp"
 | 
				
			||||||
	"github.com/pion/webrtc/v3"
 | 
						"github.com/pion/webrtc/v3"
 | 
				
			||||||
	"gitlab.crans.org/nounous/ghostream/stream/srt"
 | 
						"gitlab.crans.org/nounous/ghostream/stream"
 | 
				
			||||||
	"gitlab.crans.org/nounous/ghostream/stream/telnet"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var (
 | 
					var (
 | 
				
			||||||
	ffmpeg      = make(map[string]*exec.Cmd)
 | 
						activeStream map[string]struct{}
 | 
				
			||||||
	ffmpegInput = make(map[string]io.WriteCloser)
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func ingestFrom(inputChannel chan srt.Packet) {
 | 
					func autoIngest(streams map[string]*stream.Stream) {
 | 
				
			||||||
	// FIXME Clean code
 | 
						// Regulary check existing streams
 | 
				
			||||||
 | 
						activeStream = make(map[string]struct{})
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		var err error = nil
 | 
							for name, st := range streams {
 | 
				
			||||||
		srtPacket := <-inputChannel
 | 
								if strings.Contains(name, "@") {
 | 
				
			||||||
		switch srtPacket.PacketType {
 | 
									// Not a source stream, pass
 | 
				
			||||||
		case "register":
 | 
									continue
 | 
				
			||||||
			go registerStream(&srtPacket)
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		case "sendData":
 | 
					 | 
				
			||||||
			if _, ok := ffmpegInput[srtPacket.StreamName]; !ok {
 | 
					 | 
				
			||||||
				break
 | 
					 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			// FIXME send to stream srtPacket.StreamName
 | 
					
 | 
				
			||||||
			if _, err := ffmpegInput[srtPacket.StreamName].Write(srtPacket.Data); err != nil {
 | 
								if _, ok := activeStream[name]; ok {
 | 
				
			||||||
				log.Printf("Failed to write data to ffmpeg input: %s", err)
 | 
									// Stream is already ingested
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			break
 | 
					
 | 
				
			||||||
		case "close":
 | 
								// Start ingestion
 | 
				
			||||||
			log.Printf("WebRTC CloseConnection %s", srtPacket.StreamName)
 | 
								log.Printf("Starting webrtc for '%s'", name)
 | 
				
			||||||
			_ = ffmpeg[srtPacket.StreamName].Process.Kill()
 | 
								go ingest(name, st)
 | 
				
			||||||
			_ = ffmpegInput[srtPacket.StreamName].Close()
 | 
					 | 
				
			||||||
			delete(ffmpeg, srtPacket.StreamName)
 | 
					 | 
				
			||||||
			delete(ffmpegInput, srtPacket.StreamName)
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		default:
 | 
					 | 
				
			||||||
			log.Println("Unknown SRT srtPacket type:", srtPacket.PacketType)
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			log.Printf("Error occurred while receiving SRT srtPacket of type %s: %s", srtPacket.PacketType, err)
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Regulary pull stream list,
 | 
				
			||||||
 | 
							// it may be better to tweak the messaging system
 | 
				
			||||||
 | 
							// to get an event on a new stream.
 | 
				
			||||||
 | 
							time.Sleep(time.Second)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func registerStream(srtPacket *srt.Packet) {
 | 
					func ingest(name string, input *stream.Stream) {
 | 
				
			||||||
	log.Printf("WebRTC RegisterStream %s", srtPacket.StreamName)
 | 
						// Register to get stream
 | 
				
			||||||
 | 
						videoInput := make(chan []byte, 1024)
 | 
				
			||||||
 | 
						input.Register(videoInput)
 | 
				
			||||||
 | 
						activeStream[name] = struct{}{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Open a UDP Listener for RTP Packets on port 5004
 | 
						// Open a UDP Listener for RTP Packets on port 5004
 | 
				
			||||||
	videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
 | 
						videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
 | 
				
			||||||
@@ -70,55 +62,12 @@ func registerStream(srtPacket *srt.Packet) {
 | 
				
			|||||||
		log.Printf("Faited to open UDP listener %s", err)
 | 
							log.Printf("Faited to open UDP listener %s", err)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// FIXME Close UDP listeners at the end of the stream, not the end of the routine
 | 
					 | 
				
			||||||
	/*	defer func() {
 | 
					 | 
				
			||||||
		if err = videoListener.Close(); err != nil {
 | 
					 | 
				
			||||||
			log.Printf("Faited to close UDP listener %s", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if err = audioListener.Close(); err != nil {
 | 
					 | 
				
			||||||
			log.Printf("Faited to close UDP listener %s", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}() */
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
 | 
						// Start ffmpag to convert videoInput to video and audio UDP
 | 
				
			||||||
		"-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality
 | 
						ffmpeg, err := startFFmpeg(videoInput)
 | 
				
			||||||
		"-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1",
 | 
					 | 
				
			||||||
		"-auto-alt-ref", "1",
 | 
					 | 
				
			||||||
		"-f", "rtp", "rtp://127.0.0.1:5004",
 | 
					 | 
				
			||||||
		"-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1",
 | 
					 | 
				
			||||||
		"-f", "rtp", "rtp://127.0.0.1:5005"}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Export stream to ascii art
 | 
					 | 
				
			||||||
	if telnet.Cfg.Enabled {
 | 
					 | 
				
			||||||
		bitrate := fmt.Sprintf("%dk", telnet.Cfg.Width*telnet.Cfg.Height/telnet.Cfg.Delay)
 | 
					 | 
				
			||||||
		ffmpegArgs = append(ffmpegArgs,
 | 
					 | 
				
			||||||
			"-an", "-vf", fmt.Sprintf("scale=%dx%d", telnet.Cfg.Width, telnet.Cfg.Height),
 | 
					 | 
				
			||||||
			"-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate, "-q", "42", "-pix_fmt", "gray", "-f", "rawvideo", "pipe:1")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	ffmpeg[srtPacket.StreamName] = exec.Command("ffmpeg", ffmpegArgs...)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	input, err := ffmpeg[srtPacket.StreamName].StdinPipe()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		panic(err)
 | 
							log.Printf("Error while starting ffmpeg: %s", err)
 | 
				
			||||||
	}
 | 
							return
 | 
				
			||||||
	ffmpegInput[srtPacket.StreamName] = input
 | 
					 | 
				
			||||||
	errOutput, err := ffmpeg[srtPacket.StreamName].StderrPipe()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		panic(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Receive raw video output and convert it to ASCII art, then forward it TCP
 | 
					 | 
				
			||||||
	if telnet.Cfg.Enabled {
 | 
					 | 
				
			||||||
		output, err := ffmpeg[srtPacket.StreamName].StdoutPipe()
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			panic(err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		go telnet.StartASCIIArtStream(srtPacket.StreamName, output)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err := ffmpeg[srtPacket.StreamName].Start(); err != nil {
 | 
					 | 
				
			||||||
		panic(err)
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Receive video
 | 
						// Receive video
 | 
				
			||||||
@@ -128,7 +77,7 @@ func registerStream(srtPacket *srt.Packet) {
 | 
				
			|||||||
			n, _, err := videoListener.ReadFromUDP(inboundRTPPacket)
 | 
								n, _, err := videoListener.ReadFromUDP(inboundRTPPacket)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				log.Printf("Failed to read from UDP: %s", err)
 | 
									log.Printf("Failed to read from UDP: %s", err)
 | 
				
			||||||
				continue
 | 
									break
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			packet := &rtp.Packet{}
 | 
								packet := &rtp.Packet{}
 | 
				
			||||||
			if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
 | 
								if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
 | 
				
			||||||
@@ -136,13 +85,13 @@ func registerStream(srtPacket *srt.Packet) {
 | 
				
			|||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if videoTracks[srtPacket.StreamName] == nil {
 | 
								if videoTracks[name] == nil {
 | 
				
			||||||
				videoTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0)
 | 
									videoTracks[name] = make([]*webrtc.Track, 0)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// Write RTP srtPacket to all video tracks
 | 
								// Write RTP srtPacket to all video tracks
 | 
				
			||||||
			// Adapt payload and SSRC to match destination
 | 
								// Adapt payload and SSRC to match destination
 | 
				
			||||||
			for _, videoTrack := range videoTracks[srtPacket.StreamName] {
 | 
								for _, videoTrack := range videoTracks[name] {
 | 
				
			||||||
				packet.Header.PayloadType = videoTrack.PayloadType()
 | 
									packet.Header.PayloadType = videoTrack.PayloadType()
 | 
				
			||||||
				packet.Header.SSRC = videoTrack.SSRC()
 | 
									packet.Header.SSRC = videoTrack.SSRC()
 | 
				
			||||||
				if writeErr := videoTrack.WriteRTP(packet); writeErr != nil {
 | 
									if writeErr := videoTrack.WriteRTP(packet); writeErr != nil {
 | 
				
			||||||
@@ -160,7 +109,7 @@ func registerStream(srtPacket *srt.Packet) {
 | 
				
			|||||||
			n, _, err := audioListener.ReadFromUDP(inboundRTPPacket)
 | 
								n, _, err := audioListener.ReadFromUDP(inboundRTPPacket)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				log.Printf("Failed to read from UDP: %s", err)
 | 
									log.Printf("Failed to read from UDP: %s", err)
 | 
				
			||||||
				continue
 | 
									break
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			packet := &rtp.Packet{}
 | 
								packet := &rtp.Packet{}
 | 
				
			||||||
			if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
 | 
								if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
 | 
				
			||||||
@@ -168,13 +117,13 @@ func registerStream(srtPacket *srt.Packet) {
 | 
				
			|||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if audioTracks[srtPacket.StreamName] == nil {
 | 
								if audioTracks[name] == nil {
 | 
				
			||||||
				audioTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0)
 | 
									audioTracks[name] = make([]*webrtc.Track, 0)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// Write RTP srtPacket to all audio tracks
 | 
								// Write RTP srtPacket to all audio tracks
 | 
				
			||||||
			// Adapt payload and SSRC to match destination
 | 
								// Adapt payload and SSRC to match destination
 | 
				
			||||||
			for _, audioTrack := range audioTracks[srtPacket.StreamName] {
 | 
								for _, audioTrack := range audioTracks[name] {
 | 
				
			||||||
				packet.Header.PayloadType = audioTrack.PayloadType()
 | 
									packet.Header.PayloadType = audioTrack.PayloadType()
 | 
				
			||||||
				packet.Header.SSRC = audioTrack.SSRC()
 | 
									packet.Header.SSRC = audioTrack.SSRC()
 | 
				
			||||||
				if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
 | 
									if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
 | 
				
			||||||
@@ -185,10 +134,60 @@ func registerStream(srtPacket *srt.Packet) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Wait for stopped ffmpeg
 | 
				
			||||||
 | 
						if err = ffmpeg.Wait(); err != nil {
 | 
				
			||||||
 | 
							log.Printf("Faited to wait for ffmpeg: %s", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Close UDP listeners
 | 
				
			||||||
 | 
						if err = videoListener.Close(); err != nil {
 | 
				
			||||||
 | 
							log.Printf("Faited to close UDP listener: %s", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err = audioListener.Close(); err != nil {
 | 
				
			||||||
 | 
							log.Printf("Faited to close UDP listener: %s", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						delete(activeStream, name)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) {
 | 
				
			||||||
 | 
						ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
 | 
				
			||||||
 | 
							"-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality
 | 
				
			||||||
 | 
							"-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1",
 | 
				
			||||||
 | 
							"-auto-alt-ref", "1",
 | 
				
			||||||
 | 
							"-f", "rtp", "rtp://127.0.0.1:5004",
 | 
				
			||||||
 | 
							"-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1",
 | 
				
			||||||
 | 
							"-f", "rtp", "rtp://127.0.0.1:5005"}
 | 
				
			||||||
 | 
						ffmpeg = exec.Command("ffmpeg", ffmpegArgs...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Handle errors output
 | 
				
			||||||
 | 
						errOutput, err := ffmpeg.StderrPipe()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		scanner := bufio.NewScanner(errOutput)
 | 
							scanner := bufio.NewScanner(errOutput)
 | 
				
			||||||
		for scanner.Scan() {
 | 
							for scanner.Scan() {
 | 
				
			||||||
			log.Printf("[WEBRTC FFMPEG %s] %s", "demo", scanner.Text())
 | 
								log.Printf("[WEBRTC FFMPEG %s] %s", "demo", scanner.Text())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Handle stream input
 | 
				
			||||||
 | 
						input, err := ffmpeg.StdinPipe()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							for data := range in {
 | 
				
			||||||
 | 
								if _, err := input.Write(data); err != nil {
 | 
				
			||||||
 | 
									log.Printf("Failed to write data to ffmpeg input: %s", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// End of stream
 | 
				
			||||||
 | 
							ffmpeg.Process.Kill()
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Start process
 | 
				
			||||||
 | 
						err = ffmpeg.Start()
 | 
				
			||||||
 | 
						return ffmpeg, err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -8,7 +8,7 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	"github.com/pion/webrtc/v3"
 | 
						"github.com/pion/webrtc/v3"
 | 
				
			||||||
	"gitlab.crans.org/nounous/ghostream/internal/monitoring"
 | 
						"gitlab.crans.org/nounous/ghostream/internal/monitoring"
 | 
				
			||||||
	"gitlab.crans.org/nounous/ghostream/stream/srt"
 | 
						"gitlab.crans.org/nounous/ghostream/stream"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Options holds web package configuration
 | 
					// Options holds web package configuration
 | 
				
			||||||
@@ -182,12 +182,12 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Serve WebRTC media streaming server
 | 
					// Serve WebRTC media streaming server
 | 
				
			||||||
func Serve(remoteSdpChan chan struct {
 | 
					func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct {
 | 
				
			||||||
	StreamID          string
 | 
						StreamID          string
 | 
				
			||||||
	RemoteDescription webrtc.SessionDescription
 | 
						RemoteDescription webrtc.SessionDescription
 | 
				
			||||||
}, localSdpChan chan webrtc.SessionDescription, inputChannel chan srt.Packet, cfg *Options) {
 | 
					}, localSdpChan chan webrtc.SessionDescription, cfg *Options) {
 | 
				
			||||||
	if !cfg.Enabled {
 | 
						if !cfg.Enabled {
 | 
				
			||||||
		// SRT is not enabled, ignore
 | 
							// WebRTC is not enabled, ignore
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -197,8 +197,8 @@ func Serve(remoteSdpChan chan struct {
 | 
				
			|||||||
	videoTracks = make(map[string][]*webrtc.Track)
 | 
						videoTracks = make(map[string][]*webrtc.Track)
 | 
				
			||||||
	audioTracks = make(map[string][]*webrtc.Track)
 | 
						audioTracks = make(map[string][]*webrtc.Track)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Ingest data from SRT
 | 
						// Ingest data
 | 
				
			||||||
	go ingestFrom(inputChannel)
 | 
						go autoIngest(streams)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Handle new connections
 | 
						// Handle new connections
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -5,24 +5,24 @@ import (
 | 
				
			|||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/pion/webrtc/v3"
 | 
						"github.com/pion/webrtc/v3"
 | 
				
			||||||
	"gitlab.crans.org/nounous/ghostream/stream/srt"
 | 
						"gitlab.crans.org/nounous/ghostream/stream"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestServe(t *testing.T) {
 | 
					func TestServe(t *testing.T) {
 | 
				
			||||||
	// Serve WebRTC server
 | 
						// Init streams messaging and WebRTC server
 | 
				
			||||||
 | 
						streams := make(map[string]*stream.Stream)
 | 
				
			||||||
	remoteSdpChan := make(chan struct {
 | 
						remoteSdpChan := make(chan struct {
 | 
				
			||||||
		StreamID          string
 | 
							StreamID          string
 | 
				
			||||||
		RemoteDescription webrtc.SessionDescription
 | 
							RemoteDescription webrtc.SessionDescription
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	localSdpChan := make(chan webrtc.SessionDescription)
 | 
						localSdpChan := make(chan webrtc.SessionDescription)
 | 
				
			||||||
	webrtcChannel := make(chan srt.Packet, 64)
 | 
					 | 
				
			||||||
	cfg := Options{
 | 
						cfg := Options{
 | 
				
			||||||
		Enabled:     true,
 | 
							Enabled:     true,
 | 
				
			||||||
		MinPortUDP:  10000,
 | 
							MinPortUDP:  10000,
 | 
				
			||||||
		MaxPortUDP:  10005,
 | 
							MaxPortUDP:  10005,
 | 
				
			||||||
		STUNServers: []string{"stun:stun.l.google.com:19302"},
 | 
							STUNServers: []string{"stun:stun.l.google.com:19302"},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	go Serve(remoteSdpChan, localSdpChan, webrtcChannel, &cfg)
 | 
						go Serve(streams, remoteSdpChan, localSdpChan, &cfg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// New client connection
 | 
						// New client connection
 | 
				
			||||||
	mediaEngine := webrtc.MediaEngine{}
 | 
						mediaEngine := webrtc.MediaEngine{}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -153,5 +153,5 @@ func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, err
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Start process
 | 
						// Start process
 | 
				
			||||||
	err = ffmpeg.Start()
 | 
						err = ffmpeg.Start()
 | 
				
			||||||
	return ffmpeg, &output, nil
 | 
						return ffmpeg, &output, err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user