mirror of
				https://gitlab.crans.org/nounous/ghostream.git
				synced 2025-11-04 07:42:10 +01:00 
			
		
		
		
	Use channels to send SRT packets to forwarded streams
This commit is contained in:
		
							
								
								
									
										7
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								main.go
									
									
									
									
									
								
							@@ -90,14 +90,17 @@ func main() {
 | 
				
			|||||||
	remoteSdpChan := make(chan webrtc.SessionDescription)
 | 
						remoteSdpChan := make(chan webrtc.SessionDescription)
 | 
				
			||||||
	localSdpChan := make(chan webrtc.SessionDescription)
 | 
						localSdpChan := make(chan webrtc.SessionDescription)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// SRT channel, to propagate forwarding
 | 
				
			||||||
 | 
						forwardingChannel := make(chan srt.Packet)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Start stream, web and monitoring server
 | 
						// Start stream, web and monitoring server
 | 
				
			||||||
	go monitoring.Serve(&cfg.Monitoring)
 | 
						go monitoring.Serve(&cfg.Monitoring)
 | 
				
			||||||
	go srt.Serve(&cfg.Srt)
 | 
						go srt.Serve(&cfg.Srt, forwardingChannel)
 | 
				
			||||||
	go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web)
 | 
						go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web)
 | 
				
			||||||
	go webrtc.Serve(remoteSdpChan, localSdpChan, &cfg.WebRTC)
 | 
						go webrtc.Serve(remoteSdpChan, localSdpChan, &cfg.WebRTC)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Configure stream forwarding
 | 
						// Configure stream forwarding
 | 
				
			||||||
	forwarding.New(cfg.Forwarding)
 | 
						forwarding.New(cfg.Forwarding, forwardingChannel)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Wait for routines
 | 
						// Wait for routines
 | 
				
			||||||
	select {}
 | 
						select {}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2,6 +2,7 @@ package forwarding
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"bufio"
 | 
						"bufio"
 | 
				
			||||||
 | 
						"gitlab.crans.org/nounous/ghostream/stream/srt"
 | 
				
			||||||
	"io"
 | 
						"io"
 | 
				
			||||||
	"log"
 | 
						"log"
 | 
				
			||||||
	"os/exec"
 | 
						"os/exec"
 | 
				
			||||||
@@ -13,16 +14,43 @@ type Options map[string][]string
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
var (
 | 
					var (
 | 
				
			||||||
	cfg                Options
 | 
						cfg                Options
 | 
				
			||||||
 | 
						forwardingChannel  chan srt.Packet
 | 
				
			||||||
	ffmpegInstances    = make(map[string]*exec.Cmd)
 | 
						ffmpegInstances    = make(map[string]*exec.Cmd)
 | 
				
			||||||
	ffmpegInputStreams = make(map[string]*io.WriteCloser)
 | 
						ffmpegInputStreams = make(map[string]*io.WriteCloser)
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// New Load configuration
 | 
					// New Load configuration and initialize SRT channel
 | 
				
			||||||
func New(c Options) {
 | 
					func New(c Options, channel chan srt.Packet) {
 | 
				
			||||||
	cfg = c
 | 
						cfg = c
 | 
				
			||||||
 | 
						forwardingChannel = channel
 | 
				
			||||||
 | 
						go waitForPackets()
 | 
				
			||||||
	log.Printf("Stream forwarding initialized")
 | 
						log.Printf("Stream forwarding initialized")
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func waitForPackets() {
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							var err error = nil
 | 
				
			||||||
 | 
							packet := <-forwardingChannel
 | 
				
			||||||
 | 
							switch packet.PacketType {
 | 
				
			||||||
 | 
							case "register":
 | 
				
			||||||
 | 
								err = RegisterStream(packet.StreamName)
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							case "sendData":
 | 
				
			||||||
 | 
								err = SendPacket(packet.StreamName, packet.Data)
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							case "close":
 | 
				
			||||||
 | 
								err = CloseConnection(packet.StreamName)
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
								log.Println("Unknown SRT packet type:", packet.PacketType)
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								log.Printf("Error occured while receiving SRT packet of type %s: %s", packet.PacketType, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// RegisterStream Declare a new open stream and create ffmpeg instances
 | 
					// RegisterStream Declare a new open stream and create ffmpeg instances
 | 
				
			||||||
func RegisterStream(name string) error {
 | 
					func RegisterStream(name string) error {
 | 
				
			||||||
	streams, exist := cfg[name]
 | 
						streams, exist := cfg[name]
 | 
				
			||||||
@@ -80,16 +108,14 @@ func RegisterStream(name string) error {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// SendPacket forward data to all FFMpeg instances related to the stream name
 | 
					// SendPacket forward data to all FFMpeg instances related to the stream name
 | 
				
			||||||
func SendPacket(name string, data []byte) {
 | 
					func SendPacket(name string, data []byte) error {
 | 
				
			||||||
	stdin := ffmpegInputStreams[name]
 | 
						stdin := ffmpegInputStreams[name]
 | 
				
			||||||
	if stdin == nil {
 | 
						if stdin == nil {
 | 
				
			||||||
		// Don't need to forward stream
 | 
							// Don't need to forward stream
 | 
				
			||||||
		return
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	_, err := (*stdin).Write(data)
 | 
						_, err := (*stdin).Write(data)
 | 
				
			||||||
	if err != nil {
 | 
						return err
 | 
				
			||||||
		log.Printf("Error while sending a packet to external streaming server for key %s: %s", name, err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// CloseConnection When the stream is ended, close FFMPEG instances
 | 
					// CloseConnection When the stream is ended, close FFMPEG instances
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -52,11 +52,14 @@ func TestForwardStream(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	forwardingList := make(map[string][]string)
 | 
						forwardingList := make(map[string][]string)
 | 
				
			||||||
	forwardingList["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
 | 
						forwardingList["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						forwardingChannel = make(chan srt.Packet)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Register forwarding stream list
 | 
						// Register forwarding stream list
 | 
				
			||||||
	New(forwardingList)
 | 
						New(forwardingList, forwardingChannel)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Serve HTTP Server
 | 
						// Serve HTTP Server
 | 
				
			||||||
	go srt.Serve(&srt.Options{ListenAddress: ":9711", MaxClients: 2})
 | 
						go srt.Serve(&srt.Options{ListenAddress: ":9711", MaxClients: 2}, forwardingChannel)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ffmpeg := exec.Command("ffmpeg",
 | 
						ffmpeg := exec.Command("ffmpeg",
 | 
				
			||||||
		"-i", "http://ftp.crans.org/events/Blender%20OpenMovies/big_buck_bunny_480p_stereo.ogg",
 | 
							"-i", "http://ftp.crans.org/events/Blender%20OpenMovies/big_buck_bunny_480p_stereo.ogg",
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -14,6 +14,12 @@ type Options struct {
 | 
				
			|||||||
	MaxClients    int
 | 
						MaxClients    int
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type Packet struct {
 | 
				
			||||||
 | 
						Data       []byte
 | 
				
			||||||
 | 
						PacketType string
 | 
				
			||||||
 | 
						StreamName string
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Split host and port from listen address
 | 
					// Split host and port from listen address
 | 
				
			||||||
func splitHostPort(hostport string) (string, uint16) {
 | 
					func splitHostPort(hostport string) (string, uint16) {
 | 
				
			||||||
	host, portS, err := net.SplitHostPort(hostport)
 | 
						host, portS, err := net.SplitHostPort(hostport)
 | 
				
			||||||
@@ -31,7 +37,7 @@ func splitHostPort(hostport string) (string, uint16) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Serve SRT server
 | 
					// Serve SRT server
 | 
				
			||||||
func Serve(cfg *Options) {
 | 
					func Serve(cfg *Options, forwardingChannel chan Packet) {
 | 
				
			||||||
	options := make(map[string]string)
 | 
						options := make(map[string]string)
 | 
				
			||||||
	options["transtype"] = "live"
 | 
						options["transtype"] = "live"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -59,10 +65,7 @@ func Serve(cfg *Options) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
		// Setup stream forwarding
 | 
							// Setup stream forwarding
 | 
				
			||||||
		// FIXME: demo should be replaced by stream name
 | 
							// FIXME: demo should be replaced by stream name
 | 
				
			||||||
		/*	if err := forwarding.RegisterStream("demo"); err != nil {
 | 
							forwardingChannel <- Packet{StreamName: "demo", PacketType: "register", Data: nil}
 | 
				
			||||||
			log.Println("Error occurred during forward stream init:", err)
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		} */
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// Read RTP packets forever and send them to the WebRTC Client
 | 
							// Read RTP packets forever and send them to the WebRTC Client
 | 
				
			||||||
		for {
 | 
							for {
 | 
				
			||||||
@@ -81,15 +84,13 @@ func Serve(cfg *Options) {
 | 
				
			|||||||
			// log.Printf("Received %d bytes", n)
 | 
								// log.Printf("Received %d bytes", n)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// Send raw packet to other streams
 | 
								// Send raw packet to other streams
 | 
				
			||||||
			// forwarding.SendPacket("demo", buff[:n])
 | 
								forwardingChannel <- Packet{StreamName: "demo", PacketType: "sendData", Data: buff[:n]}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// TODO: Send to WebRTC
 | 
								// TODO: Send to WebRTC
 | 
				
			||||||
			// See https://github.com/ebml-go/webm/blob/master/reader.go
 | 
								// See https://github.com/ebml-go/webm/blob/master/reader.go
 | 
				
			||||||
			//err := videoTrack.WriteSample(media.Sample{Data: data, Samples: uint32(sampleCount)})
 | 
								//err := videoTrack.WriteSample(media.Sample{Data: data, Samples: uint32(sampleCount)})
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		/*	if err := forwarding.CloseConnection("demo"); err != nil {
 | 
							forwardingChannel <- Packet{StreamName: "demo", PacketType: "close", Data: nil}
 | 
				
			||||||
			log.Printf("Failed to close forward stream: %s", err)
 | 
					 | 
				
			||||||
		} */
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -36,7 +36,7 @@ func TestServeSRT(t *testing.T) {
 | 
				
			|||||||
		t.Skip("WARNING: FFMPEG is not installed. Skipping stream test")
 | 
							t.Skip("WARNING: FFMPEG is not installed. Skipping stream test")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go Serve(&Options{ListenAddress: ":9711", MaxClients: 2})
 | 
						go Serve(&Options{ListenAddress: ":9711", MaxClients: 2}, nil)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ffmpeg := exec.Command("ffmpeg",
 | 
						ffmpeg := exec.Command("ffmpeg",
 | 
				
			||||||
		"-i", "http://ftp.crans.org/events/Blender%20OpenMovies/big_buck_bunny_480p_stereo.ogg",
 | 
							"-i", "http://ftp.crans.org/events/Blender%20OpenMovies/big_buck_bunny_480p_stereo.ogg",
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user