diff --git a/stream/messaging.go b/stream/messaging.go index a53c427..ea1501c 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -11,6 +11,9 @@ type Stream struct { // Use a map to be able to delete an item outputs map[chan []byte]struct{} + // Count clients for statistics + nbClients int + // Mutex to lock this ressource lock sync.Mutex } @@ -21,6 +24,7 @@ func New() *Stream { broadcast := make(chan []byte, 64) s.Broadcast = broadcast s.outputs = make(map[chan []byte]struct{}) + s.nbClients = 0 go s.run(broadcast) return s } @@ -56,15 +60,20 @@ func (s *Stream) Close() { close(s.Broadcast) } -// Register a new output on a stream -func (s *Stream) Register(output chan []byte) { +// Register a new output on a stream. +// If hidden in true, then do not count this client. +func (s *Stream) Register(output chan []byte, hidden bool) { s.lock.Lock() defer s.lock.Unlock() s.outputs[output] = struct{}{} + if !hidden { + s.nbClients++ + } } -// Unregister removes an output -func (s *Stream) Unregister(output chan []byte) { +// Unregister removes an output. +// If hidden in true, then do not count this client. +func (s *Stream) Unregister(output chan []byte, hidden bool) { s.lock.Lock() defer s.lock.Unlock() @@ -73,10 +82,13 @@ func (s *Stream) Unregister(output chan []byte) { if ok { delete(s.outputs, output) close(output) + if !hidden { + s.nbClients-- + } } } -// Count number of outputs +// Count number of clients func (s *Stream) Count() int { - return len(s.outputs) + return s.nbClients } diff --git a/stream/srt/handler.go b/stream/srt/handler.go index da6528e..8ea3d98 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -8,7 +8,7 @@ import ( "gitlab.crans.org/nounous/ghostream/stream" ) -func handleStreamer(socket *srtgo.SrtSocket, streams map[string]stream.Stream, name string) { +func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) { // Check stream does not exist if _, ok := streams[name]; ok { log.Print("Stream already exists, refusing new streamer") @@ -18,7 +18,7 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]stream.Stream, n // Create stream log.Printf("New SRT streamer for stream %s", name) - st := *stream.New() + st := stream.New() streams[name] = st // Create a new buffer @@ -54,7 +54,7 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]stream.Stream, n delete(streams, name) } -func handleViewer(s *srtgo.SrtSocket, streams map[string]stream.Stream, name string) { +func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) { log.Printf("New SRT viewer for stream %s", name) // Get requested stream @@ -66,16 +66,16 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]stream.Stream, name str // Register new output c := make(chan []byte, 128) - st.Register(c) + st.Register(c, false) // Receive data and send them - for { - data := <-c + for data := range c { if len(data) < 1 { log.Print("Remove SRT viewer because of end of stream") break } + // Send data _, err := s.Write(data, 1000) if err != nil { log.Printf("Remove SRT viewer because of sending error, %s", err) @@ -84,6 +84,6 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]stream.Stream, name str } // Close output - st.Unregister(c) + st.Unregister(c, false) s.Close() } diff --git a/stream/srt/srt.go b/stream/srt/srt.go index dad5f4e..6efe7a2 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -39,7 +39,7 @@ func splitHostPort(hostport string) (string, uint16, error) { } // Serve SRT server -func Serve(streams map[string]stream.Stream, authBackend auth.Backend, cfg *Options) { +func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Options) { if !cfg.Enabled { // SRT is not enabled, ignore return diff --git a/stream/srt/srt_test.go b/stream/srt/srt_test.go index 015537d..2ac869c 100644 --- a/stream/srt/srt_test.go +++ b/stream/srt/srt_test.go @@ -58,7 +58,7 @@ func TestServeSRT(t *testing.T) { } // Init streams messaging and SRT server - streams := make(map[string]stream.Stream) + streams := make(map[string]*stream.Stream) go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}) ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", diff --git a/web/web.go b/web/web.go index a6c6d2f..1f10a66 100644 --- a/web/web.go +++ b/web/web.go @@ -44,7 +44,7 @@ var ( templates *template.Template // Streams to get statistics - streams map[string]stream.Stream + streams map[string]*stream.Stream ) // Load templates with pkger @@ -78,7 +78,7 @@ func loadTemplates() error { } // Serve HTTP server -func Serve(s map[string]stream.Stream, rSdpChan chan struct { +func Serve(s map[string]*stream.Stream, rSdpChan chan struct { StreamID string RemoteDescription webrtc.SessionDescription }, lSdpChan chan webrtc.SessionDescription, c *Options) { diff --git a/web/web_test.go b/web/web_test.go index 485b932..54378f3 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -11,7 +11,7 @@ import ( // TestHTTPServe tries to serve a real HTTP server and load some pages func TestHTTPServe(t *testing.T) { // Init streams messaging - streams := make(map[string]stream.Stream) + streams := make(map[string]*stream.Stream) // Create a disabled web server go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"})