1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2024-12-22 19:42:20 +00:00

Use messaging system in SRT package

This commit is contained in:
Alexandre Iooss 2020-10-17 12:26:24 +02:00
parent e0911ab050
commit 68d4ad8aee
No known key found for this signature in database
GPG Key ID: 6C79278F3FCDCC02
4 changed files with 64 additions and 68 deletions

View File

@ -9,7 +9,7 @@ type Stream struct {
Broadcast chan<- []byte Broadcast chan<- []byte
// Use a map to be able to delete an item // Use a map to be able to delete an item
outputs map[chan<- []byte]struct{} outputs map[chan []byte]struct{}
// Mutex to lock this ressource // Mutex to lock this ressource
lock sync.Mutex lock sync.Mutex
@ -20,7 +20,7 @@ func New() *Stream {
s := &Stream{} s := &Stream{}
broadcast := make(chan []byte, 64) broadcast := make(chan []byte, 64)
s.Broadcast = broadcast s.Broadcast = broadcast
s.outputs = make(map[chan<- []byte]struct{}) s.outputs = make(map[chan []byte]struct{})
go s.run(broadcast) go s.run(broadcast)
return s return s
} }
@ -34,9 +34,9 @@ func (s *Stream) run(broadcast <-chan []byte) {
select { select {
case output <- msg: case output <- msg:
default: default:
// Remove output if failed // If full, do a ring buffer
delete(s.outputs, output) <-output
close(output) output <- msg
} }
} }
}() }()
@ -57,14 +57,14 @@ func (s *Stream) Close() {
} }
// Register a new output on a stream // Register a new output on a stream
func (s *Stream) Register(output chan<- []byte) { func (s *Stream) Register(output chan []byte) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
s.outputs[output] = struct{}{} s.outputs[output] = struct{}{}
} }
// Unregister removes an output // Unregister removes an output
func (s *Stream) Unregister(output chan<- []byte) { func (s *Stream) Unregister(output chan []byte) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()

View File

@ -5,23 +5,30 @@ import (
"log" "log"
"github.com/haivision/srtgo" "github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/stream"
) )
func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels map[string][]chan Packet, forwardingChannel, webrtcChannel chan Packet) { func handleStreamer(socket *srtgo.SrtSocket, streams map[string]stream.Stream, name string) {
// Check stream does not exist
if _, ok := streams[name]; ok {
log.Print("Stream already exists, refusing new streamer")
socket.Close()
return
}
// Create stream
log.Printf("New SRT streamer for stream %s", name) log.Printf("New SRT streamer for stream %s", name)
st := *stream.New()
streams[name] = st
// Create a new buffer // Create a new buffer
// UDP packet cannot be larger than MTU (1500) // UDP packet cannot be larger than MTU (1500)
buff := make([]byte, 1500) buff := make([]byte, 1500)
// Setup stream forwarding
forwardingChannel <- Packet{StreamName: name, PacketType: "register", Data: nil}
webrtcChannel <- Packet{StreamName: name, PacketType: "register", Data: nil}
// Read RTP packets forever and send them to the WebRTC Client // Read RTP packets forever and send them to the WebRTC Client
for { for {
// 5s timeout // 5s timeout
n, err := s.Read(buff, 5000) n, err := socket.Read(buff, 5000)
if err != nil { if err != nil {
log.Println("Error occurred while reading SRT socket:", err) log.Println("Error occurred while reading SRT socket:", err)
break break
@ -33,40 +40,50 @@ func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels map[stri
break break
} }
// Send raw packet to other streams // Send raw data to other streams
// Copy data in another buffer to ensure that the data would not be overwritten // Copy data in another buffer to ensure that the data would not be overwritten
// FIXME: might be unnecessary
data := make([]byte, n) data := make([]byte, n)
copy(data, buff[:n]) copy(data, buff[:n])
forwardingChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} st.Broadcast <- data
webrtcChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data}
for _, dataChannel := range clientDataChannels[name] {
dataChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data}
}
} }
forwardingChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} // Close stream
webrtcChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} st.Close()
socket.Close()
delete(streams, name)
} }
func handleViewer(s *srtgo.SrtSocket, name string, dataChannel chan Packet, dataChannels map[string][]chan Packet) { func handleViewer(s *srtgo.SrtSocket, streams map[string]stream.Stream, name string) {
// FIXME Should not pass all dataChannels to one viewer
log.Printf("New SRT viewer for stream %s", name) log.Printf("New SRT viewer for stream %s", name)
// Receive packets from channel and send them // Get requested stream
for { st, ok := streams[name]
packet := <-dataChannel if !ok {
if packet.PacketType == "sendData" { log.Println("Stream does not exist, refusing new viewer")
_, err := s.Write(packet.Data, 10000)
if err != nil {
s.Close()
for i, channel := range dataChannels[name] {
if channel == dataChannel {
dataChannels[name] = append(dataChannels[name][:i], dataChannels[name][i+1:]...)
}
}
return return
} }
// Register new output
c := make(chan []byte, 128)
st.Register(c)
// Receive data and send them
for {
data := <-c
if len(data) < 1 {
log.Print("Remove SRT viewer because of end of stream")
break
}
_, err := s.Write(data, 1000)
if err != nil {
log.Printf("Remove SRT viewer because of sending error, %s", err)
break
} }
} }
// Close output
st.Unregister(c)
s.Close()
} }

View File

@ -12,10 +12,7 @@ import (
"github.com/haivision/srtgo" "github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/auth"
) "gitlab.crans.org/nounous/ghostream/stream"
var (
clientDataChannels map[string][]chan Packet
) )
// Options holds web package configuration // Options holds web package configuration
@ -25,13 +22,6 @@ type Options struct {
MaxClients int MaxClients int
} }
// Packet contains the necessary data to broadcast events like stream creating, packet receiving or stream closing.
type Packet struct {
Data []byte
PacketType string
StreamName string
}
// Split host and port from listen address // Split host and port from listen address
func splitHostPort(hostport string) (string, uint16, error) { func splitHostPort(hostport string) (string, uint16, error) {
host, portS, err := net.SplitHostPort(hostport) host, portS, err := net.SplitHostPort(hostport)
@ -48,13 +38,8 @@ func splitHostPort(hostport string) (string, uint16, error) {
return host, uint16(port64), nil return host, uint16(port64), nil
} }
// GetNumberConnectedSessions get the number of currently connected clients
func GetNumberConnectedSessions(streamID string) int {
return len(clientDataChannels[streamID])
}
// Serve SRT server // Serve SRT server
func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChannel chan Packet) { func Serve(streams map[string]stream.Stream, authBackend auth.Backend, cfg *Options) {
if !cfg.Enabled { if !cfg.Enabled {
// SRT is not enabled, ignore // SRT is not enabled, ignore
return return
@ -75,8 +60,6 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan
log.Fatal("Unable to listen for SRT clients:", err) log.Fatal("Unable to listen for SRT clients:", err)
} }
clientDataChannels = make(map[string][]chan Packet)
for { for {
// Wait for new connection // Wait for new connection
s, err := sck.Accept() s, err := sck.Accept()
@ -94,10 +77,6 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan
} }
split := strings.Split(streamID, ":") split := strings.Split(streamID, ":")
if clientDataChannels[streamID] == nil {
clientDataChannels[streamID] = make([]chan Packet, 0, cfg.MaxClients)
}
if len(split) > 1 { if len(split) > 1 {
// password was provided so it is a streamer // password was provided so it is a streamer
name, password := split[0], split[1] name, password := split[0], split[1]
@ -110,15 +89,13 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan
} }
} }
go handleStreamer(s, name, clientDataChannels, forwardingChannel, webrtcChannel) go handleStreamer(s, streams, name)
} else { } else {
// password was not provided so it is a viewer // password was not provided so it is a viewer
name := split[0] name := split[0]
dataChannel := make(chan Packet, 4096) // Send stream
clientDataChannels[streamID] = append(clientDataChannels[streamID], dataChannel) go handleViewer(s, streams, name)
go handleViewer(s, name, dataChannel, clientDataChannels)
} }
} }
} }

View File

@ -5,6 +5,8 @@ import (
"os/exec" "os/exec"
"testing" "testing"
"time" "time"
"gitlab.crans.org/nounous/ghostream/stream"
) )
// TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à // TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à
@ -55,7 +57,9 @@ func TestServeSRT(t *testing.T) {
t.Skip("WARNING: FFMPEG is not installed. Skipping stream test") t.Skip("WARNING: FFMPEG is not installed. Skipping stream test")
} }
go Serve(&Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}, nil, nil, nil) // Init streams messaging and SRT server
streams := make(map[string]stream.Stream)
go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2})
ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error",
"-f", "lavfi", "-i", "testsrc=size=640x480:rate=10", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10",
@ -78,6 +82,4 @@ func TestServeSRT(t *testing.T) {
}() }()
time.Sleep(5 * time.Second) // Delay is in nanoseconds, here 5s time.Sleep(5 * time.Second) // Delay is in nanoseconds, here 5s
// TODO Kill SRT server
} }