mirror of
				https://gitlab.crans.org/nounous/ghostream.git
				synced 2025-10-31 21:24:29 +01:00 
			
		
		
		
	Remove cfg and forwardingChannel globals in forwarding package
This commit is contained in:
		
							
								
								
									
										2
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								main.go
									
									
									
									
									
								
							| @@ -109,7 +109,7 @@ func main() { | ||||
| 	webrtcChannel := make(chan srt.Packet, 65536) | ||||
|  | ||||
| 	// Start stream, web and monitoring server, and stream forwarding | ||||
| 	go forwarding.Serve(cfg.Forwarding, forwardingChannel) | ||||
| 	go forwarding.Serve(forwardingChannel, cfg.Forwarding) | ||||
| 	go monitoring.Serve(&cfg.Monitoring) | ||||
| 	go srt.Serve(&cfg.Srt, authBackend, forwardingChannel, webrtcChannel) | ||||
| 	go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) | ||||
|   | ||||
| @@ -2,10 +2,11 @@ package forwarding | ||||
|  | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"gitlab.crans.org/nounous/ghostream/stream/srt" | ||||
| 	"io" | ||||
| 	"log" | ||||
| 	"os/exec" | ||||
|  | ||||
| 	"gitlab.crans.org/nounous/ghostream/stream/srt" | ||||
| ) | ||||
|  | ||||
| // Options to configure the stream forwarding. | ||||
| @@ -13,27 +14,20 @@ import ( | ||||
| type Options map[string][]string | ||||
|  | ||||
| var ( | ||||
| 	cfg                Options | ||||
| 	forwardingChannel  chan srt.Packet | ||||
| 	ffmpegInstances    = make(map[string]*exec.Cmd) | ||||
| 	ffmpegInputStreams = make(map[string]*io.WriteCloser) | ||||
| ) | ||||
|  | ||||
| // Serve Load configuration and initialize SRT channel | ||||
| func Serve(c Options, channel chan srt.Packet) { | ||||
| 	cfg = c | ||||
| 	forwardingChannel = channel | ||||
| // Serve handles incoming packets from SRT and forward them to other external services | ||||
| func Serve(inputChannel chan srt.Packet, cfg Options) { | ||||
| 	log.Printf("Stream forwarding initialized") | ||||
| 	waitForPackets() | ||||
| } | ||||
|  | ||||
| func waitForPackets() { | ||||
| 	for { | ||||
| 		var err error = nil | ||||
| 		packet := <-forwardingChannel | ||||
| 		// Wait for packets | ||||
| 		packet := <-inputChannel | ||||
| 		switch packet.PacketType { | ||||
| 		case "register": | ||||
| 			err = RegisterStream(packet.StreamName) | ||||
| 			err = RegisterStream(packet.StreamName, cfg) | ||||
| 			break | ||||
| 		case "sendData": | ||||
| 			err = SendPacket(packet.StreamName, packet.Data) | ||||
| @@ -52,7 +46,7 @@ func waitForPackets() { | ||||
| } | ||||
|  | ||||
| // RegisterStream Declare a new open stream and create ffmpeg instances | ||||
| func RegisterStream(name string) error { | ||||
| func RegisterStream(name string, cfg Options) error { | ||||
| 	streams, exist := cfg[name] | ||||
| 	if !exist || len(streams) == 0 { | ||||
| 		// Nothing to do, not configured | ||||
|   | ||||
| @@ -2,15 +2,17 @@ package forwarding | ||||
|  | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"gitlab.crans.org/nounous/ghostream/stream/srt" | ||||
| 	"log" | ||||
| 	"os/exec" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"gitlab.crans.org/nounous/ghostream/stream/srt" | ||||
| ) | ||||
|  | ||||
| // TestServeSRT Serve a SRT server, stream content during 5 seconds and ensure that it is well received | ||||
| func TestForwardStream(t *testing.T) { | ||||
| 	// Check that ffmpeg is installed | ||||
| 	which := exec.Command("which", "ffmpeg") | ||||
| 	if err := which.Start(); err != nil { | ||||
| 		t.Fatal("Error while checking if ffmpeg got installed:", err) | ||||
| @@ -24,6 +26,7 @@ func TestForwardStream(t *testing.T) { | ||||
| 		t.Skip("WARNING: FFMPEG is not installed. Skipping stream test") | ||||
| 	} | ||||
|  | ||||
| 	// Start virtual RTMP server with ffmpeg | ||||
| 	forwardedFfmpeg := exec.Command("ffmpeg", "-y", // allow overwrite /dev/null | ||||
| 		"-listen", "1", "-i", "rtmp://127.0.0.1:1936/live/app", "-f", "null", "-c", "copy", "/dev/null") | ||||
| 	forwardingOutput, err := forwardedFfmpeg.StdoutPipe() | ||||
| @@ -31,7 +34,6 @@ func TestForwardStream(t *testing.T) { | ||||
| 	if err != nil { | ||||
| 		t.Fatal("Error while querying ffmpeg forwardingOutput:", err) | ||||
| 	} | ||||
|  | ||||
| 	if err := forwardedFfmpeg.Start(); err != nil { | ||||
| 		t.Fatal("Error while starting forwarding stream ffmpeg instance:", err) | ||||
| 	} | ||||
| @@ -53,13 +55,13 @@ func TestForwardStream(t *testing.T) { | ||||
| 	forwardingList := make(map[string][]string) | ||||
| 	forwardingList["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} | ||||
|  | ||||
| 	forwardingChannel = make(chan srt.Packet) | ||||
| 	forwardingChannel := make(chan srt.Packet) | ||||
|  | ||||
| 	// Register forwarding stream list | ||||
| 	go Serve(forwardingList, forwardingChannel) | ||||
| 	go Serve(forwardingChannel, forwardingList) | ||||
|  | ||||
| 	// Serve HTTP Server | ||||
| 	go srt.Serve(&srt.Options{ListenAddress: ":9712", MaxClients: 2}, nil, forwardingChannel) | ||||
| 	// Serve SRT Server without authentification backend | ||||
| 	go srt.Serve(&srt.Options{ListenAddress: ":9712", MaxClients: 2}, nil, forwardingChannel, nil) | ||||
|  | ||||
| 	ffmpeg := exec.Command("ffmpeg", | ||||
| 		"-re", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10", | ||||
|   | ||||
| @@ -36,7 +36,7 @@ func TestServeSRT(t *testing.T) { | ||||
| 		t.Skip("WARNING: FFMPEG is not installed. Skipping stream test") | ||||
| 	} | ||||
|  | ||||
| 	go Serve(&Options{ListenAddress: ":9711", MaxClients: 2}, nil, nil) | ||||
| 	go Serve(&Options{ListenAddress: ":9711", MaxClients: 2}, nil, nil, nil) | ||||
|  | ||||
| 	ffmpeg := exec.Command("ffmpeg", | ||||
| 		"-i", "http://ftp.crans.org/events/Blender%20OpenMovies/big_buck_bunny_480p_stereo.ogg", | ||||
|   | ||||
| @@ -281,7 +281,6 @@ func Serve(remoteSdpChan, localSdpChan chan webrtc.SessionDescription, inputChan | ||||
| 	for { | ||||
| 		// Wait for incoming session description | ||||
| 		// then send the local description to browser | ||||
| 		offer := <-remoteSdpChan | ||||
| 		localSdpChan <- newPeerHandler(offer, cfg) | ||||
| 		localSdpChan <- newPeerHandler(<-remoteSdpChan, cfg) | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user