diff --git a/main.go b/main.go index 163154a..ed2681e 100644 --- a/main.go +++ b/main.go @@ -103,7 +103,7 @@ func main() { remoteSdpChan := make(chan webrtc.SessionDescription) localSdpChan := make(chan webrtc.SessionDescription) - // SRT channel, to propagate forwarding + // SRT channel for forwarding forwardingChannel := make(chan srt.Packet, 65536) // Start stream, web and monitoring server, and stream forwarding diff --git a/stream/srt/handler.go b/stream/srt/handler.go new file mode 100644 index 0000000..17e9bd7 --- /dev/null +++ b/stream/srt/handler.go @@ -0,0 +1,67 @@ +package srt + +import ( + "log" + + "github.com/haivision/srtgo" +) + +func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels *[]chan Packet, forwardingChannel chan Packet) { + log.Printf("New SRT streamer for stream %s", name) + + // Create a new buffer + buff := make([]byte, 2048) + + // Setup stream forwarding + forwardingChannel <- Packet{StreamName: name, PacketType: "register", Data: nil} + + // Read RTP packets forever and send them to the WebRTC Client + for { + // UDP packet cannot be larger than MTU (1500) + n, err := s.Read(buff, 1500) + if err != nil { + log.Println("Error occured while reading SRT socket:", err) + break + } + + if n == 0 { + // End of stream + log.Printf("Received no bytes, stopping stream.") + break + } + + // Send raw packet to other streams + // Copy data in another buffer to ensure that the data would not be overwritten + data := make([]byte, n) + copy(data, buff[:n]) + forwardingChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} + for _, dataChannel := range *clientDataChannels { + dataChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} + } + } + + forwardingChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} +} + +func handleViewer(s *srtgo.SrtSocket, name string, dataChannel chan Packet, dataChannels *[]chan Packet) { + // FIXME Should not pass all dataChannels to one viewer + + log.Printf("New SRT viewer for stream %s", name) + + // Receive packets from channel and send them + for { + packet := <-dataChannel + if packet.PacketType == "sendData" { + _, err := s.Write(packet.Data, 10000) + if err != nil { + s.Close() + for i, channel := range *dataChannels { + if channel == dataChannel { + *dataChannels = append((*dataChannels)[:i], (*dataChannels)[i+1:]...) + } + } + return + } + } + } +} diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 36b5569..431ed8b 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -94,67 +94,3 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel chan Packet } } } - -func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels *[]chan Packet, forwardingChannel chan Packet) { - log.Printf("New SRT streamer for stream %s", name) - - // Create a new buffer - buff := make([]byte, 2048) - - // Setup stream forwarding - forwardingChannel <- Packet{StreamName: name, PacketType: "register", Data: nil} - - // Read RTP packets forever and send them to the WebRTC Client - for { - n, err := s.Read(buff, 10000) - if err != nil { - log.Println("Error occured while reading SRT socket:", err) - break - } - - if n == 0 { - // End of stream - log.Printf("Received no bytes, stopping stream.") - break - } - // log.Printf("Received %d bytes", n) - - // Send raw packet to other streams - // Copy data in another buffer to ensure that the data would not be overwritten - data := make([]byte, n) - copy(data, buff[:n]) - forwardingChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - for _, dataChannel := range *clientDataChannels { - dataChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - } - - // TODO: Send to WebRTC - // See https://github.com/ebml-go/webm/blob/master/reader.go - //err := videoTrack.WriteSample(media.Sample{Data: data, Samples: uint32(sampleCount)}) - } - - forwardingChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} -} - -func handleViewer(s *srtgo.SrtSocket, name string, dataChannel chan Packet, dataChannels *[]chan Packet) { - // FIXME Should not pass all dataChannels to one viewer - - log.Printf("New SRT viewer for stream %s", name) - - // Receive packets from channel and send them - for { - packet := <-dataChannel - if packet.PacketType == "sendData" { - _, err := s.Write(packet.Data, 10000) - if err != nil { - s.Close() - for i, channel := range *dataChannels { - if channel == dataChannel { - *dataChannels = append((*dataChannels)[:i], (*dataChannels)[i+1:]...) - } - } - return - } - } - } -}