Support stream listening

This commit is contained in:
Yohann D'ANELLO 2020-10-02 23:35:01 +02:00
parent 087d6eee3b
commit 8498829e08
1 changed files with 40 additions and 4 deletions

View File

@ -72,21 +72,35 @@ func Serve(cfg *Options, backend auth.Backend, forwarding chan Packet) {
// FIXME: See srtgo.SocketOptions and value, err := s.GetSockOptString to get parameters
// http://ffmpeg.org/ffmpeg-protocols.html#srt
// FIXME: Get the stream type
streamStarted := false
// FIXME Better structure
clientDataChannels := make([]chan Packet, cfg.MaxClients)
listeners := 0
for {
// Wait for new connection
s, err := sck.Accept()
if err != nil {
log.Println("Error occurred while accepting request:", err)
break // FIXME: should not break here
// log.Println("Error occurred while accepting request:", err)
continue // break // FIXME: should not break here
}
go acceptSocket(s)
if !streamStarted {
go acceptCallerSocket(s, clientDataChannels, &listeners)
streamStarted = true
} else {
dataChannel := make(chan Packet)
clientDataChannels[listeners] = dataChannel
listeners += 1
go acceptListeningSocket(s, dataChannel)
}
}
sck.Close()
}
func acceptSocket(s *srtgo.SrtSocket) {
func acceptCallerSocket(s *srtgo.SrtSocket, clientDataChannels []chan Packet, listeners *int) {
streamName, err := authenticateSocket(s)
if err != nil {
log.Println("Authentication failure:", err)
@ -122,6 +136,9 @@ func acceptSocket(s *srtgo.SrtSocket) {
data := make([]byte, n)
copy(data, buff[:n])
forwardingChannel <- Packet{StreamName: streamName, PacketType: "sendData", Data: data}
for i := 0; i < *listeners; i += 1 {
clientDataChannels[i] <- Packet{StreamName: streamName, PacketType: "sendData", Data: data}
}
// TODO: Send to WebRTC
// See https://github.com/ebml-go/webm/blob/master/reader.go
@ -131,6 +148,25 @@ func acceptSocket(s *srtgo.SrtSocket) {
forwardingChannel <- Packet{StreamName: streamName, PacketType: "close", Data: nil}
}
func acceptListeningSocket(s *srtgo.SrtSocket, dataChannel chan Packet) {
streamName, err := s.GetSockOptString(C.SRTO_STREAMID)
if err != nil {
panic(err)
}
log.Printf("New listener for stream %s", streamName)
for {
packet := <-dataChannel
if packet.PacketType == "sendData" {
_, err := s.Write(packet.Data, 10000)
if err != nil {
s.Close()
break
}
}
}
}
func authenticateSocket(s *srtgo.SrtSocket) (string, error) {
streamID, err := s.GetSockOptString(C.SRTO_STREAMID)
if err != nil {