From 8498829e08c7ab22651c06c88bd73c267b45e5ef Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Fri, 2 Oct 2020 23:35:01 +0200 Subject: [PATCH] Support stream listening --- stream/srt/srt.go | 44 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 9729164..ae50ea3 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -72,21 +72,35 @@ func Serve(cfg *Options, backend auth.Backend, forwarding chan Packet) { // FIXME: See srtgo.SocketOptions and value, err := s.GetSockOptString to get parameters // http://ffmpeg.org/ffmpeg-protocols.html#srt + // FIXME: Get the stream type + streamStarted := false + // FIXME Better structure + clientDataChannels := make([]chan Packet, cfg.MaxClients) + listeners := 0 + for { // Wait for new connection s, err := sck.Accept() if err != nil { - log.Println("Error occurred while accepting request:", err) - break // FIXME: should not break here + // log.Println("Error occurred while accepting request:", err) + continue // break // FIXME: should not break here } - go acceptSocket(s) + if !streamStarted { + go acceptCallerSocket(s, clientDataChannels, &listeners) + streamStarted = true + } else { + dataChannel := make(chan Packet) + clientDataChannels[listeners] = dataChannel + listeners += 1 + go acceptListeningSocket(s, dataChannel) + } } sck.Close() } -func acceptSocket(s *srtgo.SrtSocket) { +func acceptCallerSocket(s *srtgo.SrtSocket, clientDataChannels []chan Packet, listeners *int) { streamName, err := authenticateSocket(s) if err != nil { log.Println("Authentication failure:", err) @@ -122,6 +136,9 @@ func acceptSocket(s *srtgo.SrtSocket) { data := make([]byte, n) copy(data, buff[:n]) forwardingChannel <- Packet{StreamName: streamName, PacketType: "sendData", Data: data} + for i := 0; i < *listeners; i += 1 { + clientDataChannels[i] <- Packet{StreamName: streamName, PacketType: "sendData", Data: data} + } // TODO: Send to WebRTC // See https://github.com/ebml-go/webm/blob/master/reader.go @@ -131,6 +148,25 @@ func acceptSocket(s *srtgo.SrtSocket) { forwardingChannel <- Packet{StreamName: streamName, PacketType: "close", Data: nil} } +func acceptListeningSocket(s *srtgo.SrtSocket, dataChannel chan Packet) { + streamName, err := s.GetSockOptString(C.SRTO_STREAMID) + if err != nil { + panic(err) + } + log.Printf("New listener for stream %s", streamName) + + for { + packet := <-dataChannel + if packet.PacketType == "sendData" { + _, err := s.Write(packet.Data, 10000) + if err != nil { + s.Close() + break + } + } + } +} + func authenticateSocket(s *srtgo.SrtSocket) (string, error) { streamID, err := s.GetSockOptString(C.SRTO_STREAMID) if err != nil {