diff --git a/main.go b/main.go index 3c13dd3..32a1fda 100644 --- a/main.go +++ b/main.go @@ -104,15 +104,16 @@ func main() { remoteSdpChan := make(chan webrtc.SessionDescription) localSdpChan := make(chan webrtc.SessionDescription) - // SRT channel for forwarding + // SRT channel for forwarding and webrtc forwardingChannel := make(chan srt.Packet, 65536) + webrtcChannel := make(chan srt.Packet, 65536) // Start stream, web and monitoring server, and stream forwarding go forwarding.Serve(cfg.Forwarding, forwardingChannel) go monitoring.Serve(&cfg.Monitoring) - go srt.Serve(&cfg.Srt, authBackend, forwardingChannel) + go srt.Serve(&cfg.Srt, authBackend, forwardingChannel, webrtcChannel) go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) - go webrtc.Serve(remoteSdpChan, localSdpChan, &cfg.WebRTC) + go webrtc.Serve(remoteSdpChan, localSdpChan, webrtcChannel, &cfg.WebRTC) // Wait for routines select {} diff --git a/stream/srt/handler.go b/stream/srt/handler.go index 134ab1f..c0da7eb 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -6,7 +6,7 @@ import ( "github.com/haivision/srtgo" ) -func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels *[]chan Packet, forwardingChannel chan Packet) { +func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels *[]chan Packet, forwardingChannel, webrtcChannel chan Packet) { log.Printf("New SRT streamer for stream %s", name) // Create a new buffer @@ -15,6 +15,7 @@ func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels *[]chan // Setup stream forwarding forwardingChannel <- Packet{StreamName: name, PacketType: "register", Data: nil} + webrtcChannel <- Packet{StreamName: name, PacketType: "register", Data: nil} // Read RTP packets forever and send them to the WebRTC Client for { @@ -36,12 +37,14 @@ func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels *[]chan data := make([]byte, n) copy(data, buff[:n]) forwardingChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} + webrtcChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} for _, dataChannel := range *clientDataChannels { dataChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} } } forwardingChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} + webrtcChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} } func handleViewer(s *srtgo.SrtSocket, name string, dataChannel chan Packet, dataChannels *[]chan Packet) { diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 431ed8b..2850788 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -43,7 +43,7 @@ func splitHostPort(hostport string) (string, uint16) { } // Serve SRT server -func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel chan Packet) { +func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChannel chan Packet) { // Start SRT in listening mode log.Printf("SRT server listening on %s", cfg.ListenAddress) host, port := splitHostPort(cfg.ListenAddress) @@ -82,7 +82,7 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel chan Packet } } - go handleStreamer(s, name, &clientDataChannels, forwardingChannel) + go handleStreamer(s, name, &clientDataChannels, forwardingChannel, webrtcChannel) } else { // password was not provided so it is a viewer name := split[0] diff --git a/stream/webrtc/webrtc.go b/stream/webrtc/webrtc.go index 08a9e88..69a4a45 100644 --- a/stream/webrtc/webrtc.go +++ b/stream/webrtc/webrtc.go @@ -13,6 +13,7 @@ import ( "github.com/pion/webrtc/v3/pkg/media/ivfreader" "github.com/pion/webrtc/v3/pkg/media/oggreader" "gitlab.crans.org/nounous/ghostream/internal/monitoring" + "gitlab.crans.org/nounous/ghostream/stream/srt" ) // Options holds web package configuration @@ -242,10 +243,37 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa panic(fmt.Sprintf("Remote peer does not support %s", codecName)) } +func waitForPackets(inputChannel chan srt.Packet) { + for { + var err error = nil + packet := <-inputChannel + switch packet.PacketType { + case "register": + log.Printf("WebRTC RegisterStream %s", packet.StreamName) + break + case "sendData": + log.Printf("WebRTC SendPacket %s", packet.StreamName) + // packet.Data + break + case "close": + log.Printf("WebRTC CloseConnection %s", packet.StreamName) + break + default: + log.Println("Unknown SRT packet type:", packet.PacketType) + break + } + if err != nil { + log.Printf("Error occured while receiving SRT packet of type %s: %s", packet.PacketType, err) + } + } +} + // Serve WebRTC media streaming server -func Serve(remoteSdpChan, localSdpChan chan webrtc.SessionDescription, cfg *Options) { +func Serve(remoteSdpChan, localSdpChan chan webrtc.SessionDescription, inputChannel chan srt.Packet, cfg *Options) { log.Printf("WebRTC server using UDP from %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP) + // FIXME: use data from inputChannel + go waitForPackets(inputChannel) go playVideo() go playAudio()