From 73e6be1274cb46fc68de0a3f24ecba8b449b530d Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 10:02:38 +0200 Subject: [PATCH 01/15] Add Stream messaging struct --- stream/messaging.go | 77 ++++++++++++++++++++++++++++++++++++++++ stream/messaging_test.go | 30 ++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 stream/messaging.go create mode 100644 stream/messaging_test.go diff --git a/stream/messaging.go b/stream/messaging.go new file mode 100644 index 0000000..c22e6b0 --- /dev/null +++ b/stream/messaging.go @@ -0,0 +1,77 @@ +// Package stream defines a structure to communication between inputs and outputs +package stream + +import "sync" + +// Stream makes packages able to subscribe to an incoming stream +type Stream struct { + // Incoming data come from this channel + Broadcast chan<- interface{} + + // Use a map to be able to delete an item + outputs map[chan<- interface{}]struct{} + + // Mutex to lock this ressource + lock sync.Mutex +} + +// New creates a new stream. +func New() *Stream { + s := &Stream{} + broadcast := make(chan interface{}, 64) + s.Broadcast = broadcast + s.outputs = make(map[chan<- interface{}]struct{}) + go s.run(broadcast) + return s +} + +func (s *Stream) run(broadcast <-chan interface{}) { + for msg := range broadcast { + func() { + s.lock.Lock() + defer s.lock.Unlock() + for output := range s.outputs { + select { + case output <- msg: + default: + // Remove output if failed + delete(s.outputs, output) + close(output) + } + } + }() + } + + // Incoming chan has been closed, close all outputs + s.lock.Lock() + defer s.lock.Unlock() + for ch := range s.outputs { + delete(s.outputs, ch) + close(ch) + } +} + +// Close the incoming chan, this will also delete all outputs +func (s *Stream) Close() { + close(s.Broadcast) +} + +// Register a new output on a stream +func (s *Stream) Register(output chan<- interface{}) { + s.lock.Lock() + defer s.lock.Unlock() + s.outputs[output] = struct{}{} +} + +// Unregister removes an output +func (s *Stream) Unregister(output chan<- interface{}) { + s.lock.Lock() + defer s.lock.Unlock() + + // Make sure we did not already close this output + _, ok := s.outputs[output] + if ok { + delete(s.outputs, output) + close(output) + } +} diff --git a/stream/messaging_test.go b/stream/messaging_test.go new file mode 100644 index 0000000..178ce75 --- /dev/null +++ b/stream/messaging_test.go @@ -0,0 +1,30 @@ +package stream + +import ( + "testing" +) + +func TestWithoutOutputs(t *testing.T) { + stream := New() + defer stream.Close() + stream.Broadcast <- "hello world" +} + +func TestWithOneOutput(t *testing.T) { + stream := New() + defer stream.Close() + + // Register one output + output := make(chan interface{}, 64) + stream.Register(output) + + // Try to pass one message + stream.Broadcast <- "hello world" + msg := <-output + if m, ok := msg.(string); !ok || m != "hello world" { + t.Error("Message has wrong type or content") + } + + // Unregister + stream.Unregister(output) +} From e0911ab0505804565fd11f4a26283e3b046d1b90 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 10:21:40 +0200 Subject: [PATCH 02/15] Use []byte for stream data --- stream/messaging.go | 14 +++++++------- stream/messaging_test.go | 10 +++++----- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/stream/messaging.go b/stream/messaging.go index c22e6b0..71e6b6a 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -6,10 +6,10 @@ import "sync" // Stream makes packages able to subscribe to an incoming stream type Stream struct { // Incoming data come from this channel - Broadcast chan<- interface{} + Broadcast chan<- []byte // Use a map to be able to delete an item - outputs map[chan<- interface{}]struct{} + outputs map[chan<- []byte]struct{} // Mutex to lock this ressource lock sync.Mutex @@ -18,14 +18,14 @@ type Stream struct { // New creates a new stream. func New() *Stream { s := &Stream{} - broadcast := make(chan interface{}, 64) + broadcast := make(chan []byte, 64) s.Broadcast = broadcast - s.outputs = make(map[chan<- interface{}]struct{}) + s.outputs = make(map[chan<- []byte]struct{}) go s.run(broadcast) return s } -func (s *Stream) run(broadcast <-chan interface{}) { +func (s *Stream) run(broadcast <-chan []byte) { for msg := range broadcast { func() { s.lock.Lock() @@ -57,14 +57,14 @@ func (s *Stream) Close() { } // Register a new output on a stream -func (s *Stream) Register(output chan<- interface{}) { +func (s *Stream) Register(output chan<- []byte) { s.lock.Lock() defer s.lock.Unlock() s.outputs[output] = struct{}{} } // Unregister removes an output -func (s *Stream) Unregister(output chan<- interface{}) { +func (s *Stream) Unregister(output chan<- []byte) { s.lock.Lock() defer s.lock.Unlock() diff --git a/stream/messaging_test.go b/stream/messaging_test.go index 178ce75..b2845e2 100644 --- a/stream/messaging_test.go +++ b/stream/messaging_test.go @@ -7,7 +7,7 @@ import ( func TestWithoutOutputs(t *testing.T) { stream := New() defer stream.Close() - stream.Broadcast <- "hello world" + stream.Broadcast <- []byte("hello world") } func TestWithOneOutput(t *testing.T) { @@ -15,14 +15,14 @@ func TestWithOneOutput(t *testing.T) { defer stream.Close() // Register one output - output := make(chan interface{}, 64) + output := make(chan []byte, 64) stream.Register(output) // Try to pass one message - stream.Broadcast <- "hello world" + stream.Broadcast <- []byte("hello world") msg := <-output - if m, ok := msg.(string); !ok || m != "hello world" { - t.Error("Message has wrong type or content") + if string(msg) != "hello world" { + t.Errorf("Message has wrong content: %s != hello world", msg) } // Unregister From 68d4ad8aeea1f940492065fe98a50fb70b5879d7 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 12:26:24 +0200 Subject: [PATCH 03/15] Use messaging system in SRT package --- stream/messaging.go | 14 ++++---- stream/srt/handler.go | 77 ++++++++++++++++++++++++++---------------- stream/srt/srt.go | 33 +++--------------- stream/srt/srt_test.go | 8 +++-- 4 files changed, 64 insertions(+), 68 deletions(-) diff --git a/stream/messaging.go b/stream/messaging.go index 71e6b6a..05d55ad 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -9,7 +9,7 @@ type Stream struct { Broadcast chan<- []byte // Use a map to be able to delete an item - outputs map[chan<- []byte]struct{} + outputs map[chan []byte]struct{} // Mutex to lock this ressource lock sync.Mutex @@ -20,7 +20,7 @@ func New() *Stream { s := &Stream{} broadcast := make(chan []byte, 64) s.Broadcast = broadcast - s.outputs = make(map[chan<- []byte]struct{}) + s.outputs = make(map[chan []byte]struct{}) go s.run(broadcast) return s } @@ -34,9 +34,9 @@ func (s *Stream) run(broadcast <-chan []byte) { select { case output <- msg: default: - // Remove output if failed - delete(s.outputs, output) - close(output) + // If full, do a ring buffer + <-output + output <- msg } } }() @@ -57,14 +57,14 @@ func (s *Stream) Close() { } // Register a new output on a stream -func (s *Stream) Register(output chan<- []byte) { +func (s *Stream) Register(output chan []byte) { s.lock.Lock() defer s.lock.Unlock() s.outputs[output] = struct{}{} } // Unregister removes an output -func (s *Stream) Unregister(output chan<- []byte) { +func (s *Stream) Unregister(output chan []byte) { s.lock.Lock() defer s.lock.Unlock() diff --git a/stream/srt/handler.go b/stream/srt/handler.go index c40036e..da6528e 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -5,23 +5,30 @@ import ( "log" "github.com/haivision/srtgo" + "gitlab.crans.org/nounous/ghostream/stream" ) -func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels map[string][]chan Packet, forwardingChannel, webrtcChannel chan Packet) { +func handleStreamer(socket *srtgo.SrtSocket, streams map[string]stream.Stream, name string) { + // Check stream does not exist + if _, ok := streams[name]; ok { + log.Print("Stream already exists, refusing new streamer") + socket.Close() + return + } + + // Create stream log.Printf("New SRT streamer for stream %s", name) + st := *stream.New() + streams[name] = st // Create a new buffer // UDP packet cannot be larger than MTU (1500) buff := make([]byte, 1500) - // Setup stream forwarding - forwardingChannel <- Packet{StreamName: name, PacketType: "register", Data: nil} - webrtcChannel <- Packet{StreamName: name, PacketType: "register", Data: nil} - // Read RTP packets forever and send them to the WebRTC Client for { // 5s timeout - n, err := s.Read(buff, 5000) + n, err := socket.Read(buff, 5000) if err != nil { log.Println("Error occurred while reading SRT socket:", err) break @@ -33,40 +40,50 @@ func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels map[stri break } - // Send raw packet to other streams + // Send raw data to other streams // Copy data in another buffer to ensure that the data would not be overwritten + // FIXME: might be unnecessary data := make([]byte, n) copy(data, buff[:n]) - forwardingChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - webrtcChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - for _, dataChannel := range clientDataChannels[name] { - dataChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - } + st.Broadcast <- data } - forwardingChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} - webrtcChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} + // Close stream + st.Close() + socket.Close() + delete(streams, name) } -func handleViewer(s *srtgo.SrtSocket, name string, dataChannel chan Packet, dataChannels map[string][]chan Packet) { - // FIXME Should not pass all dataChannels to one viewer - +func handleViewer(s *srtgo.SrtSocket, streams map[string]stream.Stream, name string) { log.Printf("New SRT viewer for stream %s", name) - // Receive packets from channel and send them + // Get requested stream + st, ok := streams[name] + if !ok { + log.Println("Stream does not exist, refusing new viewer") + return + } + + // Register new output + c := make(chan []byte, 128) + st.Register(c) + + // Receive data and send them for { - packet := <-dataChannel - if packet.PacketType == "sendData" { - _, err := s.Write(packet.Data, 10000) - if err != nil { - s.Close() - for i, channel := range dataChannels[name] { - if channel == dataChannel { - dataChannels[name] = append(dataChannels[name][:i], dataChannels[name][i+1:]...) - } - } - return - } + data := <-c + if len(data) < 1 { + log.Print("Remove SRT viewer because of end of stream") + break + } + + _, err := s.Write(data, 1000) + if err != nil { + log.Printf("Remove SRT viewer because of sending error, %s", err) + break } } + + // Close output + st.Unregister(c) + s.Close() } diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 916441a..dad5f4e 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -12,10 +12,7 @@ import ( "github.com/haivision/srtgo" "gitlab.crans.org/nounous/ghostream/auth" -) - -var ( - clientDataChannels map[string][]chan Packet + "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds web package configuration @@ -25,13 +22,6 @@ type Options struct { MaxClients int } -// Packet contains the necessary data to broadcast events like stream creating, packet receiving or stream closing. -type Packet struct { - Data []byte - PacketType string - StreamName string -} - // Split host and port from listen address func splitHostPort(hostport string) (string, uint16, error) { host, portS, err := net.SplitHostPort(hostport) @@ -48,13 +38,8 @@ func splitHostPort(hostport string) (string, uint16, error) { return host, uint16(port64), nil } -// GetNumberConnectedSessions get the number of currently connected clients -func GetNumberConnectedSessions(streamID string) int { - return len(clientDataChannels[streamID]) -} - // Serve SRT server -func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChannel chan Packet) { +func Serve(streams map[string]stream.Stream, authBackend auth.Backend, cfg *Options) { if !cfg.Enabled { // SRT is not enabled, ignore return @@ -75,8 +60,6 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan log.Fatal("Unable to listen for SRT clients:", err) } - clientDataChannels = make(map[string][]chan Packet) - for { // Wait for new connection s, err := sck.Accept() @@ -94,10 +77,6 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan } split := strings.Split(streamID, ":") - if clientDataChannels[streamID] == nil { - clientDataChannels[streamID] = make([]chan Packet, 0, cfg.MaxClients) - } - if len(split) > 1 { // password was provided so it is a streamer name, password := split[0], split[1] @@ -110,15 +89,13 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan } } - go handleStreamer(s, name, clientDataChannels, forwardingChannel, webrtcChannel) + go handleStreamer(s, streams, name) } else { // password was not provided so it is a viewer name := split[0] - dataChannel := make(chan Packet, 4096) - clientDataChannels[streamID] = append(clientDataChannels[streamID], dataChannel) - - go handleViewer(s, name, dataChannel, clientDataChannels) + // Send stream + go handleViewer(s, streams, name) } } } diff --git a/stream/srt/srt_test.go b/stream/srt/srt_test.go index 37f44f5..015537d 100644 --- a/stream/srt/srt_test.go +++ b/stream/srt/srt_test.go @@ -5,6 +5,8 @@ import ( "os/exec" "testing" "time" + + "gitlab.crans.org/nounous/ghostream/stream" ) // TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à @@ -55,7 +57,9 @@ func TestServeSRT(t *testing.T) { t.Skip("WARNING: FFMPEG is not installed. Skipping stream test") } - go Serve(&Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}, nil, nil, nil) + // Init streams messaging and SRT server + streams := make(map[string]stream.Stream) + go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}) ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10", @@ -78,6 +82,4 @@ func TestServeSRT(t *testing.T) { }() time.Sleep(5 * time.Second) // Delay is in nanoseconds, here 5s - - // TODO Kill SRT server } From b8ee60ce9f57a85f1df38fb61b4912d9d126df4a Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 12:38:18 +0200 Subject: [PATCH 04/15] Use messaging in web package --- web/handler.go | 23 +++++++++++++---------- web/web.go | 7 ++++++- web/web_test.go | 12 +++++++++--- 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/web/handler.go b/web/handler.go index d23f816..28844b0 100644 --- a/web/handler.go +++ b/web/handler.go @@ -13,14 +13,12 @@ import ( "github.com/markbates/pkger" "gitlab.crans.org/nounous/ghostream/internal/monitoring" - "gitlab.crans.org/nounous/ghostream/stream/srt" - "gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/webrtc" ) var ( // Precompile regex - validPath = regexp.MustCompile("^/[a-z0-9_-]*$") + validPath = regexp.MustCompile("^/[a-z0-9@_-]*$") ) // Handle WebRTC session description exchange via POST @@ -152,14 +150,19 @@ func staticHandler() http.Handler { } func statisticsHandler(w http.ResponseWriter, r *http.Request) { - // Display connected users stats, from WebRTC or streaming directly from a video player - streamID := strings.Replace(r.URL.Path[7:], "/", "", -1) + name := strings.Replace(r.URL.Path[7:], "/", "", -1) + userCount := 0 + + // Get requested stream + stream, ok := streams[name] + if ok { + // Get number of output channels + userCount = stream.Count() + } + + // Display connected users statistics enc := json.NewEncoder(w) - err := enc.Encode(struct { - ConnectedViewers int - }{webrtc.GetNumberConnectedSessions(streamID) + - srt.GetNumberConnectedSessions(streamID) + - telnet.GetNumberConnectedSessions(streamID)}) + err := enc.Encode(struct{ ConnectedViewers int }{userCount}) if err != nil { http.Error(w, "Failed to generate JSON.", http.StatusInternalServerError) log.Printf("Failed to generate JSON: %s", err) diff --git a/web/web.go b/web/web.go index 929053b..a6c6d2f 100644 --- a/web/web.go +++ b/web/web.go @@ -11,6 +11,7 @@ import ( "github.com/markbates/pkger" "github.com/pion/webrtc/v3" + "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds web package configuration @@ -41,6 +42,9 @@ var ( // Preload templates templates *template.Template + + // Streams to get statistics + streams map[string]stream.Stream ) // Load templates with pkger @@ -74,10 +78,11 @@ func loadTemplates() error { } // Serve HTTP server -func Serve(rSdpChan chan struct { +func Serve(s map[string]stream.Stream, rSdpChan chan struct { StreamID string RemoteDescription webrtc.SessionDescription }, lSdpChan chan webrtc.SessionDescription, c *Options) { + streams = s remoteSdpChan = rSdpChan localSdpChan = lSdpChan cfg = c diff --git a/web/web_test.go b/web/web_test.go index 5d302f6..485b932 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -4,11 +4,17 @@ import ( "net/http" "testing" "time" + + "gitlab.crans.org/nounous/ghostream/stream" ) // TestHTTPServe tries to serve a real HTTP server and load some pages func TestHTTPServe(t *testing.T) { - go Serve(nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"}) + // Init streams messaging + streams := make(map[string]stream.Stream) + + // Create a disabled web server + go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"}) // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early time.Sleep(500 * time.Millisecond) @@ -20,7 +26,7 @@ func TestHTTPServe(t *testing.T) { } // Now let's really start the web server - go Serve(nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"}) + go Serve(streams, nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"}) // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early time.Sleep(500 * time.Millisecond) @@ -52,7 +58,7 @@ func TestHTTPServe(t *testing.T) { t.Errorf("Viewer page returned %v != %v on GET", resp.StatusCode, http.StatusOK) } - // Test viewer statistic endpoint + // Test viewer statistics endpoint resp, err = http.Get("http://localhost:8081/_stats/demo/") if err != nil { t.Errorf("Error while getting /_stats: %s", err) From 5b85eed646ca118aa3d61437f2a9904e1a9eeac3 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 13:03:49 +0200 Subject: [PATCH 05/15] Add Count method to stream --- stream/messaging.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/stream/messaging.go b/stream/messaging.go index 05d55ad..a53c427 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -75,3 +75,8 @@ func (s *Stream) Unregister(output chan []byte) { close(output) } } + +// Count number of outputs +func (s *Stream) Count() int { + return len(s.outputs) +} From 5b8c73057bc1bf476e1e37c8376a1a2f235f4cd0 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 13:43:16 +0200 Subject: [PATCH 06/15] Use reference to Stream --- stream/messaging.go | 24 ++++++++++++++++++------ stream/srt/handler.go | 14 +++++++------- stream/srt/srt.go | 2 +- stream/srt/srt_test.go | 2 +- web/web.go | 4 ++-- web/web_test.go | 2 +- 6 files changed, 30 insertions(+), 18 deletions(-) diff --git a/stream/messaging.go b/stream/messaging.go index a53c427..ea1501c 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -11,6 +11,9 @@ type Stream struct { // Use a map to be able to delete an item outputs map[chan []byte]struct{} + // Count clients for statistics + nbClients int + // Mutex to lock this ressource lock sync.Mutex } @@ -21,6 +24,7 @@ func New() *Stream { broadcast := make(chan []byte, 64) s.Broadcast = broadcast s.outputs = make(map[chan []byte]struct{}) + s.nbClients = 0 go s.run(broadcast) return s } @@ -56,15 +60,20 @@ func (s *Stream) Close() { close(s.Broadcast) } -// Register a new output on a stream -func (s *Stream) Register(output chan []byte) { +// Register a new output on a stream. +// If hidden in true, then do not count this client. +func (s *Stream) Register(output chan []byte, hidden bool) { s.lock.Lock() defer s.lock.Unlock() s.outputs[output] = struct{}{} + if !hidden { + s.nbClients++ + } } -// Unregister removes an output -func (s *Stream) Unregister(output chan []byte) { +// Unregister removes an output. +// If hidden in true, then do not count this client. +func (s *Stream) Unregister(output chan []byte, hidden bool) { s.lock.Lock() defer s.lock.Unlock() @@ -73,10 +82,13 @@ func (s *Stream) Unregister(output chan []byte) { if ok { delete(s.outputs, output) close(output) + if !hidden { + s.nbClients-- + } } } -// Count number of outputs +// Count number of clients func (s *Stream) Count() int { - return len(s.outputs) + return s.nbClients } diff --git a/stream/srt/handler.go b/stream/srt/handler.go index da6528e..8ea3d98 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -8,7 +8,7 @@ import ( "gitlab.crans.org/nounous/ghostream/stream" ) -func handleStreamer(socket *srtgo.SrtSocket, streams map[string]stream.Stream, name string) { +func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) { // Check stream does not exist if _, ok := streams[name]; ok { log.Print("Stream already exists, refusing new streamer") @@ -18,7 +18,7 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]stream.Stream, n // Create stream log.Printf("New SRT streamer for stream %s", name) - st := *stream.New() + st := stream.New() streams[name] = st // Create a new buffer @@ -54,7 +54,7 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]stream.Stream, n delete(streams, name) } -func handleViewer(s *srtgo.SrtSocket, streams map[string]stream.Stream, name string) { +func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) { log.Printf("New SRT viewer for stream %s", name) // Get requested stream @@ -66,16 +66,16 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]stream.Stream, name str // Register new output c := make(chan []byte, 128) - st.Register(c) + st.Register(c, false) // Receive data and send them - for { - data := <-c + for data := range c { if len(data) < 1 { log.Print("Remove SRT viewer because of end of stream") break } + // Send data _, err := s.Write(data, 1000) if err != nil { log.Printf("Remove SRT viewer because of sending error, %s", err) @@ -84,6 +84,6 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]stream.Stream, name str } // Close output - st.Unregister(c) + st.Unregister(c, false) s.Close() } diff --git a/stream/srt/srt.go b/stream/srt/srt.go index dad5f4e..6efe7a2 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -39,7 +39,7 @@ func splitHostPort(hostport string) (string, uint16, error) { } // Serve SRT server -func Serve(streams map[string]stream.Stream, authBackend auth.Backend, cfg *Options) { +func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Options) { if !cfg.Enabled { // SRT is not enabled, ignore return diff --git a/stream/srt/srt_test.go b/stream/srt/srt_test.go index 015537d..2ac869c 100644 --- a/stream/srt/srt_test.go +++ b/stream/srt/srt_test.go @@ -58,7 +58,7 @@ func TestServeSRT(t *testing.T) { } // Init streams messaging and SRT server - streams := make(map[string]stream.Stream) + streams := make(map[string]*stream.Stream) go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}) ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", diff --git a/web/web.go b/web/web.go index a6c6d2f..1f10a66 100644 --- a/web/web.go +++ b/web/web.go @@ -44,7 +44,7 @@ var ( templates *template.Template // Streams to get statistics - streams map[string]stream.Stream + streams map[string]*stream.Stream ) // Load templates with pkger @@ -78,7 +78,7 @@ func loadTemplates() error { } // Serve HTTP server -func Serve(s map[string]stream.Stream, rSdpChan chan struct { +func Serve(s map[string]*stream.Stream, rSdpChan chan struct { StreamID string RemoteDescription webrtc.SessionDescription }, lSdpChan chan webrtc.SessionDescription, c *Options) { diff --git a/web/web_test.go b/web/web_test.go index 485b932..54378f3 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -11,7 +11,7 @@ import ( // TestHTTPServe tries to serve a real HTTP server and load some pages func TestHTTPServe(t *testing.T) { // Init streams messaging - streams := make(map[string]stream.Stream) + streams := make(map[string]*stream.Stream) // Create a disabled web server go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"}) From 70798ce1dfbab60ab645cf6f848c7e47c24724ec Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 13:45:52 +0200 Subject: [PATCH 07/15] Test client count --- stream/messaging_test.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/stream/messaging_test.go b/stream/messaging_test.go index b2845e2..f7141b2 100644 --- a/stream/messaging_test.go +++ b/stream/messaging_test.go @@ -16,7 +16,7 @@ func TestWithOneOutput(t *testing.T) { // Register one output output := make(chan []byte, 64) - stream.Register(output) + stream.Register(output, false) // Try to pass one message stream.Broadcast <- []byte("hello world") @@ -25,6 +25,16 @@ func TestWithOneOutput(t *testing.T) { t.Errorf("Message has wrong content: %s != hello world", msg) } + // Check client count + if count := stream.Count(); count != 1 { + t.Errorf("Client counter returned %d, expected 1", count) + } + // Unregister - stream.Unregister(output) + stream.Unregister(output, false) + + // Check client count + if count := stream.Count(); count != 0 { + t.Errorf("Client counter returned %d, expected 0", count) + } } From f0990a630d8bbfa4f65cd151b92d3bb848ea1d5b Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 16:17:19 +0200 Subject: [PATCH 08/15] Make client count independant of outputs --- stream/messaging.go | 24 ++++++++++++++---------- stream/messaging_test.go | 10 ++++++---- stream/srt/handler.go | 6 ++++-- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/stream/messaging.go b/stream/messaging.go index ea1501c..4876d63 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -62,18 +62,15 @@ func (s *Stream) Close() { // Register a new output on a stream. // If hidden in true, then do not count this client. -func (s *Stream) Register(output chan []byte, hidden bool) { +func (s *Stream) Register(output chan []byte) { s.lock.Lock() defer s.lock.Unlock() s.outputs[output] = struct{}{} - if !hidden { - s.nbClients++ - } } // Unregister removes an output. // If hidden in true, then do not count this client. -func (s *Stream) Unregister(output chan []byte, hidden bool) { +func (s *Stream) Unregister(output chan []byte) { s.lock.Lock() defer s.lock.Unlock() @@ -82,13 +79,20 @@ func (s *Stream) Unregister(output chan []byte, hidden bool) { if ok { delete(s.outputs, output) close(output) - if !hidden { - s.nbClients-- - } } } -// Count number of clients -func (s *Stream) Count() int { +// ClientCount returns the number of clients +func (s *Stream) ClientCount() int { return s.nbClients } + +// IncrementClientCount increments the number of clients +func (s *Stream) IncrementClientCount() { + s.nbClients++ +} + +// DecrementClientCount decrements the number of clients +func (s *Stream) DecrementClientCount() { + s.nbClients-- +} diff --git a/stream/messaging_test.go b/stream/messaging_test.go index f7141b2..49e17a0 100644 --- a/stream/messaging_test.go +++ b/stream/messaging_test.go @@ -16,7 +16,8 @@ func TestWithOneOutput(t *testing.T) { // Register one output output := make(chan []byte, 64) - stream.Register(output, false) + stream.Register(output) + stream.IncrementClientCount() // Try to pass one message stream.Broadcast <- []byte("hello world") @@ -26,15 +27,16 @@ func TestWithOneOutput(t *testing.T) { } // Check client count - if count := stream.Count(); count != 1 { + if count := stream.ClientCount(); count != 1 { t.Errorf("Client counter returned %d, expected 1", count) } // Unregister - stream.Unregister(output, false) + stream.Unregister(output) + stream.DecrementClientCount() // Check client count - if count := stream.Count(); count != 0 { + if count := stream.ClientCount(); count != 0 { t.Errorf("Client counter returned %d, expected 0", count) } } diff --git a/stream/srt/handler.go b/stream/srt/handler.go index 8ea3d98..9d521dc 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -66,7 +66,8 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name st // Register new output c := make(chan []byte, 128) - st.Register(c, false) + st.Register(c) + st.IncrementClientCount() // Receive data and send them for data := range c { @@ -84,6 +85,7 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name st } // Close output - st.Unregister(c, false) + st.Unregister(c) + st.DecrementClientCount() s.Close() } From 88dfc22d8190835b8a2c851795653ea9de395e63 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 18:12:25 +0200 Subject: [PATCH 09/15] Restructure telnet package --- stream/telnet/convert.go | 103 ++++++++++++++++++++ stream/telnet/handler.go | 82 ++++++++++++++++ stream/telnet/telnet.go | 183 +++++++---------------------------- stream/telnet/telnet_test.go | 33 +++---- web/handler.go | 2 +- 5 files changed, 238 insertions(+), 165 deletions(-) create mode 100644 stream/telnet/convert.go create mode 100644 stream/telnet/handler.go diff --git a/stream/telnet/convert.go b/stream/telnet/convert.go new file mode 100644 index 0000000..0e0683f --- /dev/null +++ b/stream/telnet/convert.go @@ -0,0 +1,103 @@ +package telnet + +import ( + "bufio" + "bytes" + "fmt" + "io" + "log" + "os/exec" + + "gitlab.crans.org/nounous/ghostream/stream" +) + +// Convert rawvideo to ANSI text +func streamToTextStream(stream *stream.Stream, text *[]byte, cfg *Options) { + // Start ffmpeg + video := make(chan []byte) + stream.Register(video) + _, rawvideo, err := startFFmpeg(video, cfg) + if err != nil { + log.Printf("Error while starting ffmpeg: %s", err) + } + + pixelBuff := make([]byte, cfg.Width*cfg.Height) + textBuff := bytes.Buffer{} + for { + n, err := (*rawvideo).Read(pixelBuff) + if err != nil { + log.Printf("An error occurred while reading input: %s", err) + break + } + if n == 0 { + // Stream is finished + break + } + + // Header + textBuff.Reset() + textBuff.Grow((40*cfg.Width+6)*cfg.Height + 47) + for i := 0; i < 42; i++ { + textBuff.WriteByte('\n') + } + + // Convert image to ASCII + for i, pixel := range pixelBuff { + if i%cfg.Width == 0 { + // New line + textBuff.WriteString("\033[49m\n") + } + + // Print two times the character to make a square + text := fmt.Sprintf("\033[48;2;%d;%d;%dm ", pixel, pixel, pixel) + textBuff.WriteString(text) + textBuff.WriteString(text) + } + textBuff.WriteString("\033[49m") + + *text = textBuff.Bytes() + } +} + +// Start a ffmpeg instance to convert stream into rawvideo +func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) { + bitrate := fmt.Sprintf("%dk", cfg.Width*cfg.Height/cfg.Delay) + ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", + "-an", "-vf", fmt.Sprintf("scale=%dx%d", cfg.Width, cfg.Height), + "-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate, + "-q", "42", "-pix_fmt", "gray", "-f", "rawvideo", "pipe:1"} + ffmpeg := exec.Command("ffmpeg", ffmpegArgs...) + + // Handle errors output + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + return nil, nil, err + } + go func() { + scanner := bufio.NewScanner(errOutput) + for scanner.Scan() { + log.Printf("[TELNET FFMPEG %s] %s", "demo", scanner.Text()) + } + }() + + // Handle text output + output, err := ffmpeg.StdoutPipe() + if err != nil { + return nil, nil, err + } + + // Handle stream input + input, err := ffmpeg.StdinPipe() + if err != nil { + return nil, nil, err + } + go func() { + for data := range in { + input.Write(data) + } + }() + + // Start process + err = ffmpeg.Start() + return ffmpeg, &output, nil +} diff --git a/stream/telnet/handler.go b/stream/telnet/handler.go new file mode 100644 index 0000000..e5edd32 --- /dev/null +++ b/stream/telnet/handler.go @@ -0,0 +1,82 @@ +package telnet + +import ( + "log" + "net" + "strings" + "time" + + "gitlab.crans.org/nounous/ghostream/stream" +) + +func handleViewer(s net.Conn, streams map[string]*stream.Stream, textStreams map[string]*[]byte, cfg *Options) { + // Prompt user about stream name + if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil { + log.Printf("Error while writing to TCP socket: %s", err) + s.Close() + return + } + buff := make([]byte, 255) + n, err := s.Read(buff) + if err != nil { + log.Printf("Error while requesting stream ID to telnet client: %s", err) + s.Close() + return + } + name := strings.TrimSpace(string(buff[:n])) + if len(name) < 1 { + // Too short, exit + s.Close() + return + } + + // Wait a bit + time.Sleep(time.Second) + + // Get requested stream + st, ok := streams[name] + if !ok { + log.Println("Stream does not exist, kicking new Telnet viewer") + if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil { + log.Printf("Error while writing to TCP socket: %s", err) + } + s.Close() + return + } + + // Register new client + log.Printf("New Telnet viewer for stream %s", name) + st.IncrementClientCount() + + // Hide terminal cursor + if _, err = s.Write([]byte("\033[?25l")); err != nil { + log.Printf("Error while writing to TCP socket: %s", err) + s.Close() + return + } + + // Send stream + for { + text, ok := textStreams[name] + if !ok { + log.Println("Stream is not converted to text, kicking Telnet viewer") + if _, err := s.Write([]byte("This stream cannot be opened.\n")); err != nil { + log.Printf("Error while writing to TCP socket: %s", err) + } + break + } + + // Send text to client + n, err := s.Write(*text) + if err != nil || n == 0 { + log.Printf("Error while sending TCP data: %s", err) + break + } + + time.Sleep(time.Duration(cfg.Delay) * time.Millisecond) + } + + // Close connection + s.Close() + st.DecrementClientCount() +} diff --git a/stream/telnet/telnet.go b/stream/telnet/telnet.go index a9a5693..bb51482 100644 --- a/stream/telnet/telnet.go +++ b/stream/telnet/telnet.go @@ -2,20 +2,11 @@ package telnet import ( - "fmt" - "io" "log" "net" - "strings" "time" -) -var ( - // Cfg contains the different options of the telnet package, see below - // TODO Config should not be exported - Cfg *Options - currentMessage map[string]*string - clientCount map[string]int + "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds telnet package configuration @@ -27,152 +18,52 @@ type Options struct { Delay int } -// Serve starts the telnet server and listen to clients -func Serve(config *Options) { - Cfg = config - - if !config.Enabled { +// Serve Telnet server +func Serve(streams map[string]*stream.Stream, cfg *Options) { + if !cfg.Enabled { + // Telnet is not enabled, ignore return } - currentMessage = make(map[string]*string) - clientCount = make(map[string]int) + // Start conversion routine + textStreams := make(map[string]*[]byte) + go autoStartConversion(streams, textStreams, cfg) - listener, err := net.Listen("tcp", config.ListenAddress) + // Start TCP server + listener, err := net.Listen("tcp", cfg.ListenAddress) if err != nil { - log.Printf("Error while listening to the address %s: %s", config.ListenAddress, err) - return + log.Fatalf("Error while listening to the address %s: %s", cfg.ListenAddress, err) } + log.Printf("Telnet server listening on %s", cfg.ListenAddress) - go func() { - for { - s, err := listener.Accept() - if err != nil { - log.Printf("Error while accepting TCP socket: %s", s) + // Handle each new client + for { + s, err := listener.Accept() + if err != nil { + log.Printf("Error while accepting TCP socket: %s", s) + continue + } + + go handleViewer(s, streams, textStreams, cfg) + } +} + +// Convertion routine listen to existing stream and start text conversions +func autoStartConversion(streams map[string]*stream.Stream, textStreams map[string]*[]byte, cfg *Options) { + for { + for name, stream := range streams { + textStream, ok := textStreams[name] + if ok { + // Everything is fine continue } - go func(s net.Conn) { - streamID := "" - // Request for stream ID - for { - _, err = s.Write([]byte("[GHOSTREAM]\nEnter stream ID: ")) - if err != nil { - log.Println("Error while requesting stream ID to telnet client") - _ = s.Close() - return - } - buff := make([]byte, 255) - n, err := s.Read(buff) - if err != nil { - log.Println("Error while requesting stream ID to telnet client") - _ = s.Close() - return - } - - // Avoid bruteforce - time.Sleep(3 * time.Second) - - streamID = string(buff[:n]) - streamID = strings.Replace(streamID, "\r", "", -1) - streamID = strings.Replace(streamID, "\n", "", -1) - - if len(streamID) > 0 { - if strings.ToLower(streamID) == "exit" { - _, _ = s.Write([]byte("Goodbye!\n")) - _ = s.Close() - return - } - if _, ok := currentMessage[streamID]; !ok { - _, err = s.Write([]byte("Unknown stream ID.\n")) - if err != nil { - log.Println("Error while requesting stream ID to telnet client") - _ = s.Close() - return - } - continue - } - break - } - } - - clientCount[streamID]++ - - // Hide terminal cursor - _, _ = s.Write([]byte("\033[?25l")) - - for { - n, err := s.Write([]byte(*currentMessage[streamID])) - if err != nil { - log.Printf("Error while sending TCP data: %s", err) - _ = s.Close() - clientCount[streamID]-- - break - } - if n == 0 { - _ = s.Close() - clientCount[streamID]-- - break - } - time.Sleep(time.Duration(config.Delay) * time.Millisecond) - } - }(s) + // Start conversion + log.Print("Starting text conversion of %s", name) + textStream = &[]byte{} + textStreams[name] = textStream + go streamToTextStream(stream, textStream, cfg) } - }() - - log.Println("Telnet server initialized") -} - -// GetNumberConnectedSessions returns the numbers of clients that are viewing the stream through a telnet shell -func GetNumberConnectedSessions(streamID string) int { - if Cfg == nil || !Cfg.Enabled { - return 0 - } - return clientCount[streamID] -} - -// StartASCIIArtStream send all packets received by ffmpeg as ASCII Art to telnet clients -func StartASCIIArtStream(streamID string, reader io.ReadCloser) { - if !Cfg.Enabled { - _ = reader.Close() - return - } - - currentMessage[streamID] = new(string) - pixelBuff := make([]byte, Cfg.Width*Cfg.Height) - textBuff := strings.Builder{} - for { - n, err := reader.Read(pixelBuff) - if err != nil { - log.Printf("An error occurred while reading input: %s", err) - break - } - if n == 0 { - // Stream is finished - break - } - - // Header - textBuff.Reset() - textBuff.Grow((40*Cfg.Width+6)*Cfg.Height + 47) - for i := 0; i < 42; i++ { - textBuff.WriteByte('\n') - } - - // Convert image to ASCII - for i, pixel := range pixelBuff { - if i%Cfg.Width == 0 { - // New line - textBuff.WriteString("\033[49m\n") - } - - // Print two times the character to make a square - text := fmt.Sprintf("\033[48;2;%d;%d;%dm ", pixel, pixel, pixel) - textBuff.WriteString(text) - textBuff.WriteString(text) - } - textBuff.WriteString("\033[49m") - - *(currentMessage[streamID]) = textBuff.String() + time.Sleep(time.Second) } } diff --git a/stream/telnet/telnet_test.go b/stream/telnet/telnet_test.go index 5174157..7f87542 100644 --- a/stream/telnet/telnet_test.go +++ b/stream/telnet/telnet_test.go @@ -1,41 +1,38 @@ package telnet import ( - "bytes" - "io/ioutil" - "math/rand" - "net" "testing" - "time" + + "gitlab.crans.org/nounous/ghostream/stream" ) // TestTelnetOutput creates a TCP client that connects to the server and get one image. func TestTelnetOutput(t *testing.T) { // Try to start Telnet server while it is disabled - Serve(&Options{Enabled: false}) - StartASCIIArtStream("demo", ioutil.NopCloser(bytes.NewReader([]byte{}))) - if GetNumberConnectedSessions("demo") != 0 { - t.Fatalf("Mysteriously found %d connected clients", GetNumberConnectedSessions("demo")) - } + streams := make(map[string]*stream.Stream) + go Serve(streams, &Options{Enabled: false}) + + // FIXME test connect // Enable and start Telnet server - Serve(&Options{ + cfg := Options{ Enabled: true, ListenAddress: "127.0.0.1:8023", Width: 80, Height: 45, Delay: 50, - }) + } + go Serve(streams, &cfg) + + // FIXME test connect // Generate a random image, that should be given by FFMPEG - sampleImage := make([]byte, Cfg.Width*Cfg.Height) + /*sampleImage := make([]byte, cfg.Width*cfg.Height) rand.Read(sampleImage) reader := ioutil.NopCloser(bytes.NewBuffer(sampleImage)) - // Send the image to the server - StartASCIIArtStream("demo", reader) // Connect to the Telnet server - client, err := net.Dial("tcp", Cfg.ListenAddress) + client, err := net.Dial("tcp", cfg.ListenAddress) if err != nil { t.Fatalf("Error while connecting to the TCP server: %s", err) } @@ -46,7 +43,7 @@ func TestTelnetOutput(t *testing.T) { t.Fatalf("Error while closing TCP connection: %s", err) } - client, err = net.Dial("tcp", Cfg.ListenAddress) + client, err = net.Dial("tcp", cfg.ListenAddress) if err != nil { t.Fatalf("Error while connecting to the TCP server: %s", err) } @@ -110,5 +107,5 @@ func TestTelnetOutput(t *testing.T) { time.Sleep(time.Second) if GetNumberConnectedSessions("demo") != 0 { t.Fatalf("Expected no telnet client, found %d", GetNumberConnectedSessions("demo")) - } + }*/ } diff --git a/web/handler.go b/web/handler.go index 28844b0..886e6e4 100644 --- a/web/handler.go +++ b/web/handler.go @@ -157,7 +157,7 @@ func statisticsHandler(w http.ResponseWriter, r *http.Request) { stream, ok := streams[name] if ok { // Get number of output channels - userCount = stream.Count() + userCount = stream.ClientCount() } // Display connected users statistics From 1469bd8759db45814475cd6a1b8e707ff9cc4cd7 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 18:22:06 +0200 Subject: [PATCH 10/15] Define streams in main.go --- main.go | 16 ++++++++-------- stream/telnet/telnet.go | 3 ++- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/main.go b/main.go index e82d055..957b702 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/internal/config" "gitlab.crans.org/nounous/ghostream/internal/monitoring" + "gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/stream/forwarding" "gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/telnet" @@ -43,17 +44,16 @@ func main() { }) localSdpChan := make(chan webrtc.SessionDescription) - // SRT channel for forwarding and webrtc - forwardingChannel := make(chan srt.Packet, 64) - webrtcChannel := make(chan srt.Packet, 64) + // Init streams messaging + streams := make(map[string]*stream.Stream) // Start routines - go forwarding.Serve(forwardingChannel, cfg.Forwarding) + go forwarding.Serve(streams, cfg.Forwarding) go monitoring.Serve(&cfg.Monitoring) - go srt.Serve(&cfg.Srt, authBackend, forwardingChannel, webrtcChannel) - go telnet.Serve(&cfg.Telnet) - go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) - go webrtc.Serve(remoteSdpChan, localSdpChan, webrtcChannel, &cfg.WebRTC) + go srt.Serve(streams, authBackend, &cfg.Srt) + go telnet.Serve(streams, &cfg.Telnet) + go web.Serve(streams, remoteSdpChan, localSdpChan, &cfg.Web) + go webrtc.Serve(streams, remoteSdpChan, localSdpChan, &cfg.WebRTC) // Wait for routines select {} diff --git a/stream/telnet/telnet.go b/stream/telnet/telnet.go index bb51482..3a4ca54 100644 --- a/stream/telnet/telnet.go +++ b/stream/telnet/telnet.go @@ -59,7 +59,8 @@ func autoStartConversion(streams map[string]*stream.Stream, textStreams map[stri } // Start conversion - log.Print("Starting text conversion of %s", name) + log.Printf("Starting text conversion of %s", name) + // FIXME that is not how to use a pointer textStream = &[]byte{} textStreams[name] = textStream go streamToTextStream(stream, textStream, cfg) From 23b6eeaa05e5729a34e4bf2e65a5667aac69cbbe Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 19:40:37 +0200 Subject: [PATCH 11/15] Add transcoder package with text transcoder --- internal/config/config.go | 13 +++- main.go | 4 +- stream/telnet/handler.go | 35 ++++----- stream/telnet/telnet.go | 31 +------- stream/telnet/telnet_test.go | 3 - .../convert.go => transcoder/text/text.go | 76 ++++++++++++++++--- transcoder/text/text_test.go | 1 + transcoder/transcoder.go | 16 ++++ 8 files changed, 112 insertions(+), 67 deletions(-) rename stream/telnet/convert.go => transcoder/text/text.go (57%) create mode 100644 transcoder/text/text_test.go create mode 100644 transcoder/transcoder.go diff --git a/internal/config/config.go b/internal/config/config.go index 09edec7..dbefc50 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -13,6 +13,8 @@ import ( "gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/webrtc" + "gitlab.crans.org/nounous/ghostream/transcoder" + "gitlab.crans.org/nounous/ghostream/transcoder/text" "gitlab.crans.org/nounous/ghostream/web" ) @@ -54,9 +56,14 @@ func New() *Config { Telnet: telnet.Options{ Enabled: false, ListenAddress: ":8023", - Width: 80, - Height: 45, - Delay: 50, + }, + Transcoder: transcoder.Options{ + Text: text.Options{ + Enabled: false, + Width: 80, + Height: 45, + Framerate: 20, + }, }, Web: web.Options{ Enabled: true, diff --git a/main.go b/main.go index 957b702..781b8dc 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/webrtc" + "gitlab.crans.org/nounous/ghostream/transcoder" "gitlab.crans.org/nounous/ghostream/web" ) @@ -48,7 +49,8 @@ func main() { streams := make(map[string]*stream.Stream) // Start routines - go forwarding.Serve(streams, cfg.Forwarding) + go transcoder.Init(streams, &cfg.Transcoder) + go forwarding.Serve(streams, &cfg.Forwarding) go monitoring.Serve(&cfg.Monitoring) go srt.Serve(streams, authBackend, &cfg.Srt) go telnet.Serve(streams, &cfg.Telnet) diff --git a/stream/telnet/handler.go b/stream/telnet/handler.go index e5edd32..92ecc95 100644 --- a/stream/telnet/handler.go +++ b/stream/telnet/handler.go @@ -9,7 +9,7 @@ import ( "gitlab.crans.org/nounous/ghostream/stream" ) -func handleViewer(s net.Conn, streams map[string]*stream.Stream, textStreams map[string]*[]byte, cfg *Options) { +func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) { // Prompt user about stream name if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil { log.Printf("Error while writing to TCP socket: %s", err) @@ -23,7 +23,7 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, textStreams map s.Close() return } - name := strings.TrimSpace(string(buff[:n])) + name := strings.TrimSpace(string(buff[:n])) + "@text" if len(name) < 1 { // Too short, exit s.Close() @@ -45,7 +45,9 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, textStreams map } // Register new client - log.Printf("New Telnet viewer for stream %s", name) + log.Printf("New Telnet viewer for stream '%s'", name) + c := make(chan []byte, 128) + st.Register(c) st.IncrementClientCount() // Hide terminal cursor @@ -55,28 +57,23 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, textStreams map return } - // Send stream - for { - text, ok := textStreams[name] - if !ok { - log.Println("Stream is not converted to text, kicking Telnet viewer") - if _, err := s.Write([]byte("This stream cannot be opened.\n")); err != nil { - log.Printf("Error while writing to TCP socket: %s", err) - } + // Receive data and send them + for data := range c { + if len(data) < 1 { + log.Print("Remove Telnet viewer because of end of stream") break } - // Send text to client - n, err := s.Write(*text) - if err != nil || n == 0 { - log.Printf("Error while sending TCP data: %s", err) + // Send data + _, err := s.Write(data) + if err != nil { + log.Printf("Remove Telnet viewer because of sending error, %s", err) break } - - time.Sleep(time.Duration(cfg.Delay) * time.Millisecond) } - // Close connection - s.Close() + // Close output + st.Unregister(c) st.DecrementClientCount() + s.Close() } diff --git a/stream/telnet/telnet.go b/stream/telnet/telnet.go index 3a4ca54..5bf1636 100644 --- a/stream/telnet/telnet.go +++ b/stream/telnet/telnet.go @@ -4,7 +4,6 @@ package telnet import ( "log" "net" - "time" "gitlab.crans.org/nounous/ghostream/stream" ) @@ -13,9 +12,6 @@ import ( type Options struct { Enabled bool ListenAddress string - Width int - Height int - Delay int } // Serve Telnet server @@ -25,10 +21,6 @@ func Serve(streams map[string]*stream.Stream, cfg *Options) { return } - // Start conversion routine - textStreams := make(map[string]*[]byte) - go autoStartConversion(streams, textStreams, cfg) - // Start TCP server listener, err := net.Listen("tcp", cfg.ListenAddress) if err != nil { @@ -44,27 +36,6 @@ func Serve(streams map[string]*stream.Stream, cfg *Options) { continue } - go handleViewer(s, streams, textStreams, cfg) - } -} - -// Convertion routine listen to existing stream and start text conversions -func autoStartConversion(streams map[string]*stream.Stream, textStreams map[string]*[]byte, cfg *Options) { - for { - for name, stream := range streams { - textStream, ok := textStreams[name] - if ok { - // Everything is fine - continue - } - - // Start conversion - log.Printf("Starting text conversion of %s", name) - // FIXME that is not how to use a pointer - textStream = &[]byte{} - textStreams[name] = textStream - go streamToTextStream(stream, textStream, cfg) - } - time.Sleep(time.Second) + go handleViewer(s, streams, cfg) } } diff --git a/stream/telnet/telnet_test.go b/stream/telnet/telnet_test.go index 7f87542..76b3d3b 100644 --- a/stream/telnet/telnet_test.go +++ b/stream/telnet/telnet_test.go @@ -18,9 +18,6 @@ func TestTelnetOutput(t *testing.T) { cfg := Options{ Enabled: true, ListenAddress: "127.0.0.1:8023", - Width: 80, - Height: 45, - Delay: 50, } go Serve(streams, &cfg) diff --git a/stream/telnet/convert.go b/transcoder/text/text.go similarity index 57% rename from stream/telnet/convert.go rename to transcoder/text/text.go index 0e0683f..538d0e6 100644 --- a/stream/telnet/convert.go +++ b/transcoder/text/text.go @@ -1,4 +1,5 @@ -package telnet +// Package text transcode a video to text +package text import ( "bufio" @@ -7,20 +8,70 @@ import ( "io" "log" "os/exec" + "strings" + "time" "gitlab.crans.org/nounous/ghostream/stream" ) -// Convert rawvideo to ANSI text -func streamToTextStream(stream *stream.Stream, text *[]byte, cfg *Options) { - // Start ffmpeg - video := make(chan []byte) - stream.Register(video) - _, rawvideo, err := startFFmpeg(video, cfg) - if err != nil { - log.Printf("Error while starting ffmpeg: %s", err) +// Options holds text package configuration +type Options struct { + Enabled bool + Width int + Height int + Framerate int +} + +// Init text transcoder +func Init(streams map[string]*stream.Stream, cfg *Options) { + if !cfg.Enabled { + // Text transcode is not enabled, ignore + return } + // Regulary check existing streams + for { + for sourceName, sourceStream := range streams { + if strings.Contains(sourceName, "@") { + // Not a source stream, pass + continue + } + + // Check that the transcoded stream does not already exist + name := sourceName + "@text" + _, ok := streams[name] + if ok { + // Stream is already transcoded + continue + } + + // Start conversion + log.Printf("Starting text transcode '%s'", name) + st := stream.New() + streams[name] = st + + go transcode(sourceStream, st, cfg) + } + + // Regulary pull stream list, + // it may be better to tweak the messaging system + // to get an event on a new stream. + time.Sleep(time.Second) + } +} + +// Convert video to ANSI text +func transcode(input, output *stream.Stream, cfg *Options) { + // Start ffmpeg to transcode video to rawvideo + videoInput := make(chan []byte) + input.Register(videoInput) + ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg) + if err != nil { + log.Printf("Error while starting ffmpeg: %s", err) + return + } + + // Transcode rawvideo to ANSI text pixelBuff := make([]byte, cfg.Width*cfg.Height) textBuff := bytes.Buffer{} for { @@ -55,13 +106,16 @@ func streamToTextStream(stream *stream.Stream, text *[]byte, cfg *Options) { } textBuff.WriteString("\033[49m") - *text = textBuff.Bytes() + output.Broadcast <- textBuff.Bytes() } + + // Stop transcode + ffmpeg.Process.Kill() } // Start a ffmpeg instance to convert stream into rawvideo func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) { - bitrate := fmt.Sprintf("%dk", cfg.Width*cfg.Height/cfg.Delay) + bitrate := fmt.Sprintf("%dk", cfg.Width*cfg.Height*cfg.Framerate) ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", "-an", "-vf", fmt.Sprintf("scale=%dx%d", cfg.Width, cfg.Height), "-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate, diff --git a/transcoder/text/text_test.go b/transcoder/text/text_test.go new file mode 100644 index 0000000..4c96c5e --- /dev/null +++ b/transcoder/text/text_test.go @@ -0,0 +1 @@ +package text diff --git a/transcoder/transcoder.go b/transcoder/transcoder.go new file mode 100644 index 0000000..5fda7ef --- /dev/null +++ b/transcoder/transcoder.go @@ -0,0 +1,16 @@ +package transcoder + +import ( + "gitlab.crans.org/nounous/ghostream/stream" + "gitlab.crans.org/nounous/ghostream/transcoder/text" +) + +// Options holds text package configuration +type Options struct { + Text text.Options +} + +// Init all transcoders +func Init(streams map[string]*stream.Stream, cfg *Options) { + go text.Init(streams, &cfg.Text) +} From 09a3422d061642b8b6c7b399bc47247e7f50483f Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sun, 18 Oct 2020 11:06:54 +0200 Subject: [PATCH 12/15] Configure and test transcoder --- docs/ghostream.example.yml | 26 +++++++++++++++++--------- internal/config/config.go | 1 + main.go | 2 +- stream/messaging.go | 6 ++++-- stream/srt/handler.go | 2 +- stream/srt/srt.go | 3 +++ transcoder/transcoder_test.go | 1 + 7 files changed, 28 insertions(+), 13 deletions(-) create mode 100644 transcoder/transcoder_test.go diff --git a/docs/ghostream.example.yml b/docs/ghostream.example.yml index aa7624e..0a0ce36 100644 --- a/docs/ghostream.example.yml +++ b/docs/ghostream.example.yml @@ -82,16 +82,24 @@ telnet: # #listenAddress: :8023 - # Size is in characters. It is recommended to keep a 16x9 format. - # - #width: 80 - #height: 45 +## Transcoders configuration ## +transcoder: + text: + # By default the text transcoder is disabled. + # You need to enable it to use telnet output. + # + #enabled: false - # Time in milliseconds between two images. - # By default 50 ms, so 20 FPS. - # Displaying text takes time. - # - #delay: 50 + # Size is in characters. It is recommended to keep a 16x9 format. + # + #width: 80 + #height: 45 + + # Time in milliseconds between two images. + # By default 50 ms, so 20 FPS. + # Displaying text takes time. + # + #delay: 50 ## Web server ## # The web server serves a WebRTC player. diff --git a/internal/config/config.go b/internal/config/config.go index dbefc50..2a94292 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -25,6 +25,7 @@ type Config struct { Monitoring monitoring.Options Srt srt.Options Telnet telnet.Options + Transcoder transcoder.Options Web web.Options WebRTC webrtc.Options } diff --git a/main.go b/main.go index 781b8dc..87d0c41 100644 --- a/main.go +++ b/main.go @@ -50,7 +50,7 @@ func main() { // Start routines go transcoder.Init(streams, &cfg.Transcoder) - go forwarding.Serve(streams, &cfg.Forwarding) + go forwarding.Serve(streams, cfg.Forwarding) go monitoring.Serve(&cfg.Monitoring) go srt.Serve(streams, authBackend, &cfg.Srt) go telnet.Serve(streams, &cfg.Telnet) diff --git a/stream/messaging.go b/stream/messaging.go index 4876d63..0798d84 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -1,7 +1,9 @@ // Package stream defines a structure to communication between inputs and outputs package stream -import "sync" +import ( + "sync" +) // Stream makes packages able to subscribe to an incoming stream type Stream struct { @@ -21,7 +23,7 @@ type Stream struct { // New creates a new stream. func New() *Stream { s := &Stream{} - broadcast := make(chan []byte, 64) + broadcast := make(chan []byte, 1024) s.Broadcast = broadcast s.outputs = make(map[chan []byte]struct{}) s.nbClients = 0 diff --git a/stream/srt/handler.go b/stream/srt/handler.go index 9d521dc..df0e324 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -65,7 +65,7 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name st } // Register new output - c := make(chan []byte, 128) + c := make(chan []byte, 1024) st.Register(c) st.IncrementClientCount() diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 6efe7a2..81e766c 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -69,6 +69,9 @@ func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Opt continue } + // FIXME: Flush socket + // Without this, the SRT buffer might get full before reading it + // streamid can be "name:password" for streamer or "name" for viewer streamID, err := s.GetSockOptString(C.SRTO_STREAMID) if err != nil { diff --git a/transcoder/transcoder_test.go b/transcoder/transcoder_test.go new file mode 100644 index 0000000..5493916 --- /dev/null +++ b/transcoder/transcoder_test.go @@ -0,0 +1 @@ +package transcoder From 37d944621bc551b87156412fe5640b0eb05931d9 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sun, 18 Oct 2020 15:13:13 +0200 Subject: [PATCH 13/15] Fix messaging hang by output --- stream/messaging.go | 25 ++++++++++++------------- stream/srt/handler.go | 15 ++++++--------- transcoder/text/text.go | 2 +- 3 files changed, 19 insertions(+), 23 deletions(-) diff --git a/stream/messaging.go b/stream/messaging.go index 0798d84..4c2c0e7 100644 --- a/stream/messaging.go +++ b/stream/messaging.go @@ -16,7 +16,7 @@ type Stream struct { // Count clients for statistics nbClients int - // Mutex to lock this ressource + // Mutex to lock outputs map lock sync.Mutex } @@ -33,28 +33,28 @@ func New() *Stream { func (s *Stream) run(broadcast <-chan []byte) { for msg := range broadcast { - func() { - s.lock.Lock() - defer s.lock.Unlock() - for output := range s.outputs { - select { - case output <- msg: - default: - // If full, do a ring buffer + s.lock.Lock() + for output := range s.outputs { + select { + case output <- msg: + default: + // If full, do a ring buffer + // Check that output is not of size zero + if len(output) > 1 { <-output - output <- msg } } - }() + } + s.lock.Unlock() } // Incoming chan has been closed, close all outputs s.lock.Lock() - defer s.lock.Unlock() for ch := range s.outputs { delete(s.outputs, ch) close(ch) } + s.lock.Unlock() } // Close the incoming chan, this will also delete all outputs @@ -63,7 +63,6 @@ func (s *Stream) Close() { } // Register a new output on a stream. -// If hidden in true, then do not count this client. func (s *Stream) Register(output chan []byte) { s.lock.Lock() defer s.lock.Unlock() diff --git a/stream/srt/handler.go b/stream/srt/handler.go index df0e324..6dbaa43 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -21,12 +21,12 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, st := stream.New() streams[name] = st - // Create a new buffer - // UDP packet cannot be larger than MTU (1500) - buff := make([]byte, 1500) - // Read RTP packets forever and send them to the WebRTC Client for { + // Create a new buffer + // UDP packet cannot be larger than MTU (1500) + buff := make([]byte, 1500) + // 5s timeout n, err := socket.Read(buff, 5000) if err != nil { @@ -41,11 +41,8 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, } // Send raw data to other streams - // Copy data in another buffer to ensure that the data would not be overwritten - // FIXME: might be unnecessary - data := make([]byte, n) - copy(data, buff[:n]) - st.Broadcast <- data + buff = buff[:n] + st.Broadcast <- buff } // Close stream diff --git a/transcoder/text/text.go b/transcoder/text/text.go index 538d0e6..44114f4 100644 --- a/transcoder/text/text.go +++ b/transcoder/text/text.go @@ -63,7 +63,7 @@ func Init(streams map[string]*stream.Stream, cfg *Options) { // Convert video to ANSI text func transcode(input, output *stream.Stream, cfg *Options) { // Start ffmpeg to transcode video to rawvideo - videoInput := make(chan []byte) + videoInput := make(chan []byte, 1024) input.Register(videoInput) ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg) if err != nil { From b9da2ab3a7f421bbb36a0874cdbbf93a053ceaad Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sun, 18 Oct 2020 16:05:28 +0200 Subject: [PATCH 14/15] Make webrtc and forwarding work with new messaging --- stream/forwarding/forwarding.go | 47 +++++-- stream/forwarding/forwarding_test.go | 12 +- stream/webrtc/ingest.go | 183 +++++++++++++-------------- stream/webrtc/webrtc.go | 12 +- stream/webrtc/webrtc_test.go | 8 +- transcoder/text/text.go | 2 +- 6 files changed, 144 insertions(+), 120 deletions(-) diff --git a/stream/forwarding/forwarding.go b/stream/forwarding/forwarding.go index fbcd687..d8fcf88 100644 --- a/stream/forwarding/forwarding.go +++ b/stream/forwarding/forwarding.go @@ -2,12 +2,10 @@ package forwarding import ( - "bufio" - "io" "log" - "os/exec" + "time" - "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream" ) // Options to configure the stream forwarding. @@ -15,21 +13,46 @@ import ( type Options map[string][]string // Serve handles incoming packets from SRT and forward them to other external services -func Serve(inputChannel chan srt.Packet, cfg Options) { +func Serve(streams map[string]*stream.Stream, cfg Options) { if len(cfg) < 1 { // No forwarding, ignore - for { - <-inputChannel // Clear input channel - } + return } log.Printf("Stream forwarding initialized") - ffmpegInstances := make(map[string]*exec.Cmd) + for { + for name, st := range streams { + fwdCfg, ok := cfg[name] + if !ok { + // Not configured + continue + } + + // Start forwarding + log.Printf("Starting forwarding for '%s'", name) + go forward(st, fwdCfg) + } + + // Regulary pull stream list, + // it may be better to tweak the messaging system + // to get an event on a new stream. + time.Sleep(time.Second) + } +} + +func forward(st *stream.Stream, fwdCfg []string) { + // FIXME + /*ffmpegInstances := make(map[string]*exec.Cmd) ffmpegInputStreams := make(map[string]*io.WriteCloser) for { var err error = nil // Wait for packets - packet := <-inputChannel + // FIXME packet := <-inputChannel + packet := srt.Packet{ + Data: []byte{}, + PacketType: "nothing", + StreamName: "demo", + } switch packet.PacketType { case "register": err = registerStream(packet.StreamName, ffmpegInstances, ffmpegInputStreams, cfg) @@ -47,9 +70,10 @@ func Serve(inputChannel chan srt.Packet, cfg Options) { if err != nil { log.Printf("Error occurred while receiving SRT packet of type %s: %s", packet.PacketType, err) } - } + }*/ } +/* // registerStream creates ffmpeg instance associated with newly created stream func registerStream(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams map[string]*io.WriteCloser, cfg Options) error { streams, exist := cfg[name] @@ -119,3 +143,4 @@ func close(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams delete(ffmpegInputStreams, name) return nil } +*/ diff --git a/stream/forwarding/forwarding_test.go b/stream/forwarding/forwarding_test.go index bc9cb5e..1f6ea6a 100644 --- a/stream/forwarding/forwarding_test.go +++ b/stream/forwarding/forwarding_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/stream/srt" ) @@ -30,16 +31,15 @@ func TestForwardStream(t *testing.T) { } }() - forwardingList := make(map[string][]string) - forwardingList["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} - - forwardingChannel := make(chan srt.Packet) + cfg := make(map[string][]string) + cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} // Register forwarding stream list - go Serve(forwardingChannel, forwardingList) + streams := make(map[string]*stream.Stream) + go Serve(streams, cfg) // Serve SRT Server without authentification backend - go srt.Serve(&srt.Options{Enabled: true, ListenAddress: ":9712", MaxClients: 2}, nil, forwardingChannel, nil) + go srt.Serve(streams, nil, &srt.Options{Enabled: true, ListenAddress: ":9712", MaxClients: 2}) ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", "-re", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10", diff --git a/stream/webrtc/ingest.go b/stream/webrtc/ingest.go index 66a4b0a..ccede0d 100644 --- a/stream/webrtc/ingest.go +++ b/stream/webrtc/ingest.go @@ -3,61 +3,53 @@ package webrtc import ( "bufio" - "fmt" - "io" "log" "net" "os/exec" + "strings" + "time" "github.com/pion/rtp" "github.com/pion/webrtc/v3" - "gitlab.crans.org/nounous/ghostream/stream/srt" - "gitlab.crans.org/nounous/ghostream/stream/telnet" + "gitlab.crans.org/nounous/ghostream/stream" ) var ( - ffmpeg = make(map[string]*exec.Cmd) - ffmpegInput = make(map[string]io.WriteCloser) + activeStream map[string]struct{} ) -func ingestFrom(inputChannel chan srt.Packet) { - // FIXME Clean code - +func autoIngest(streams map[string]*stream.Stream) { + // Regulary check existing streams + activeStream = make(map[string]struct{}) for { - var err error = nil - srtPacket := <-inputChannel - switch srtPacket.PacketType { - case "register": - go registerStream(&srtPacket) - break - case "sendData": - if _, ok := ffmpegInput[srtPacket.StreamName]; !ok { - break + for name, st := range streams { + if strings.Contains(name, "@") { + // Not a source stream, pass + continue } - // FIXME send to stream srtPacket.StreamName - if _, err := ffmpegInput[srtPacket.StreamName].Write(srtPacket.Data); err != nil { - log.Printf("Failed to write data to ffmpeg input: %s", err) + + if _, ok := activeStream[name]; ok { + // Stream is already ingested + continue } - break - case "close": - log.Printf("WebRTC CloseConnection %s", srtPacket.StreamName) - _ = ffmpeg[srtPacket.StreamName].Process.Kill() - _ = ffmpegInput[srtPacket.StreamName].Close() - delete(ffmpeg, srtPacket.StreamName) - delete(ffmpegInput, srtPacket.StreamName) - break - default: - log.Println("Unknown SRT srtPacket type:", srtPacket.PacketType) - break - } - if err != nil { - log.Printf("Error occurred while receiving SRT srtPacket of type %s: %s", srtPacket.PacketType, err) + + // Start ingestion + log.Printf("Starting webrtc for '%s'", name) + go ingest(name, st) } + + // Regulary pull stream list, + // it may be better to tweak the messaging system + // to get an event on a new stream. + time.Sleep(time.Second) } } -func registerStream(srtPacket *srt.Packet) { - log.Printf("WebRTC RegisterStream %s", srtPacket.StreamName) +func ingest(name string, input *stream.Stream) { + // Register to get stream + videoInput := make(chan []byte, 1024) + input.Register(videoInput) + activeStream[name] = struct{}{} // Open a UDP Listener for RTP Packets on port 5004 videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) @@ -70,55 +62,12 @@ func registerStream(srtPacket *srt.Packet) { log.Printf("Faited to open UDP listener %s", err) return } - // FIXME Close UDP listeners at the end of the stream, not the end of the routine - /* defer func() { - if err = videoListener.Close(); err != nil { - log.Printf("Faited to close UDP listener %s", err) - } - if err = audioListener.Close(); err != nil { - log.Printf("Faited to close UDP listener %s", err) - } - }() */ - ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", - "-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality - "-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1", - "-auto-alt-ref", "1", - "-f", "rtp", "rtp://127.0.0.1:5004", - "-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1", - "-f", "rtp", "rtp://127.0.0.1:5005"} - - // Export stream to ascii art - if telnet.Cfg.Enabled { - bitrate := fmt.Sprintf("%dk", telnet.Cfg.Width*telnet.Cfg.Height/telnet.Cfg.Delay) - ffmpegArgs = append(ffmpegArgs, - "-an", "-vf", fmt.Sprintf("scale=%dx%d", telnet.Cfg.Width, telnet.Cfg.Height), - "-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate, "-q", "42", "-pix_fmt", "gray", "-f", "rawvideo", "pipe:1") - } - - ffmpeg[srtPacket.StreamName] = exec.Command("ffmpeg", ffmpegArgs...) - - input, err := ffmpeg[srtPacket.StreamName].StdinPipe() + // Start ffmpag to convert videoInput to video and audio UDP + ffmpeg, err := startFFmpeg(videoInput) if err != nil { - panic(err) - } - ffmpegInput[srtPacket.StreamName] = input - errOutput, err := ffmpeg[srtPacket.StreamName].StderrPipe() - if err != nil { - panic(err) - } - - // Receive raw video output and convert it to ASCII art, then forward it TCP - if telnet.Cfg.Enabled { - output, err := ffmpeg[srtPacket.StreamName].StdoutPipe() - if err != nil { - panic(err) - } - go telnet.StartASCIIArtStream(srtPacket.StreamName, output) - } - - if err := ffmpeg[srtPacket.StreamName].Start(); err != nil { - panic(err) + log.Printf("Error while starting ffmpeg: %s", err) + return } // Receive video @@ -128,7 +77,7 @@ func registerStream(srtPacket *srt.Packet) { n, _, err := videoListener.ReadFromUDP(inboundRTPPacket) if err != nil { log.Printf("Failed to read from UDP: %s", err) - continue + break } packet := &rtp.Packet{} if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { @@ -136,13 +85,13 @@ func registerStream(srtPacket *srt.Packet) { continue } - if videoTracks[srtPacket.StreamName] == nil { - videoTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0) + if videoTracks[name] == nil { + videoTracks[name] = make([]*webrtc.Track, 0) } // Write RTP srtPacket to all video tracks // Adapt payload and SSRC to match destination - for _, videoTrack := range videoTracks[srtPacket.StreamName] { + for _, videoTrack := range videoTracks[name] { packet.Header.PayloadType = videoTrack.PayloadType() packet.Header.SSRC = videoTrack.SSRC() if writeErr := videoTrack.WriteRTP(packet); writeErr != nil { @@ -160,7 +109,7 @@ func registerStream(srtPacket *srt.Packet) { n, _, err := audioListener.ReadFromUDP(inboundRTPPacket) if err != nil { log.Printf("Failed to read from UDP: %s", err) - continue + break } packet := &rtp.Packet{} if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { @@ -168,13 +117,13 @@ func registerStream(srtPacket *srt.Packet) { continue } - if audioTracks[srtPacket.StreamName] == nil { - audioTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0) + if audioTracks[name] == nil { + audioTracks[name] = make([]*webrtc.Track, 0) } // Write RTP srtPacket to all audio tracks // Adapt payload and SSRC to match destination - for _, audioTrack := range audioTracks[srtPacket.StreamName] { + for _, audioTrack := range audioTracks[name] { packet.Header.PayloadType = audioTrack.PayloadType() packet.Header.SSRC = audioTrack.SSRC() if writeErr := audioTrack.WriteRTP(packet); writeErr != nil { @@ -185,10 +134,60 @@ func registerStream(srtPacket *srt.Packet) { } }() + // Wait for stopped ffmpeg + if err = ffmpeg.Wait(); err != nil { + log.Printf("Faited to wait for ffmpeg: %s", err) + } + + // Close UDP listeners + if err = videoListener.Close(); err != nil { + log.Printf("Faited to close UDP listener: %s", err) + } + if err = audioListener.Close(); err != nil { + log.Printf("Faited to close UDP listener: %s", err) + } + delete(activeStream, name) +} + +func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) { + ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", + "-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality + "-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1", + "-auto-alt-ref", "1", + "-f", "rtp", "rtp://127.0.0.1:5004", + "-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1", + "-f", "rtp", "rtp://127.0.0.1:5005"} + ffmpeg = exec.Command("ffmpeg", ffmpegArgs...) + + // Handle errors output + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + return nil, err + } go func() { scanner := bufio.NewScanner(errOutput) for scanner.Scan() { log.Printf("[WEBRTC FFMPEG %s] %s", "demo", scanner.Text()) } }() + + // Handle stream input + input, err := ffmpeg.StdinPipe() + if err != nil { + return nil, err + } + go func() { + for data := range in { + if _, err := input.Write(data); err != nil { + log.Printf("Failed to write data to ffmpeg input: %s", err) + } + } + + // End of stream + ffmpeg.Process.Kill() + }() + + // Start process + err = ffmpeg.Start() + return ffmpeg, err } diff --git a/stream/webrtc/webrtc.go b/stream/webrtc/webrtc.go index 365ebc3..37390dd 100644 --- a/stream/webrtc/webrtc.go +++ b/stream/webrtc/webrtc.go @@ -8,7 +8,7 @@ import ( "github.com/pion/webrtc/v3" "gitlab.crans.org/nounous/ghostream/internal/monitoring" - "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds web package configuration @@ -182,12 +182,12 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa } // Serve WebRTC media streaming server -func Serve(remoteSdpChan chan struct { +func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct { StreamID string RemoteDescription webrtc.SessionDescription -}, localSdpChan chan webrtc.SessionDescription, inputChannel chan srt.Packet, cfg *Options) { +}, localSdpChan chan webrtc.SessionDescription, cfg *Options) { if !cfg.Enabled { - // SRT is not enabled, ignore + // WebRTC is not enabled, ignore return } @@ -197,8 +197,8 @@ func Serve(remoteSdpChan chan struct { videoTracks = make(map[string][]*webrtc.Track) audioTracks = make(map[string][]*webrtc.Track) - // Ingest data from SRT - go ingestFrom(inputChannel) + // Ingest data + go autoIngest(streams) // Handle new connections for { diff --git a/stream/webrtc/webrtc_test.go b/stream/webrtc/webrtc_test.go index 19f4b34..ee34ca5 100644 --- a/stream/webrtc/webrtc_test.go +++ b/stream/webrtc/webrtc_test.go @@ -5,24 +5,24 @@ import ( "testing" "github.com/pion/webrtc/v3" - "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream" ) func TestServe(t *testing.T) { - // Serve WebRTC server + // Init streams messaging and WebRTC server + streams := make(map[string]*stream.Stream) remoteSdpChan := make(chan struct { StreamID string RemoteDescription webrtc.SessionDescription }) localSdpChan := make(chan webrtc.SessionDescription) - webrtcChannel := make(chan srt.Packet, 64) cfg := Options{ Enabled: true, MinPortUDP: 10000, MaxPortUDP: 10005, STUNServers: []string{"stun:stun.l.google.com:19302"}, } - go Serve(remoteSdpChan, localSdpChan, webrtcChannel, &cfg) + go Serve(streams, remoteSdpChan, localSdpChan, &cfg) // New client connection mediaEngine := webrtc.MediaEngine{} diff --git a/transcoder/text/text.go b/transcoder/text/text.go index 44114f4..02a0550 100644 --- a/transcoder/text/text.go +++ b/transcoder/text/text.go @@ -153,5 +153,5 @@ func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, err // Start process err = ffmpeg.Start() - return ffmpeg, &output, nil + return ffmpeg, &output, err } From c42ca78faccd3a5d94bb1e5c65514052bde82311 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sun, 18 Oct 2020 16:14:08 +0200 Subject: [PATCH 15/15] Add package comment on transcoder --- stream/telnet/handler.go | 79 ---------------------------------------- stream/telnet/telnet.go | 73 ++++++++++++++++++++++++++++++++++++- transcoder/transcoder.go | 1 + 3 files changed, 73 insertions(+), 80 deletions(-) delete mode 100644 stream/telnet/handler.go diff --git a/stream/telnet/handler.go b/stream/telnet/handler.go deleted file mode 100644 index 92ecc95..0000000 --- a/stream/telnet/handler.go +++ /dev/null @@ -1,79 +0,0 @@ -package telnet - -import ( - "log" - "net" - "strings" - "time" - - "gitlab.crans.org/nounous/ghostream/stream" -) - -func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) { - // Prompt user about stream name - if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil { - log.Printf("Error while writing to TCP socket: %s", err) - s.Close() - return - } - buff := make([]byte, 255) - n, err := s.Read(buff) - if err != nil { - log.Printf("Error while requesting stream ID to telnet client: %s", err) - s.Close() - return - } - name := strings.TrimSpace(string(buff[:n])) + "@text" - if len(name) < 1 { - // Too short, exit - s.Close() - return - } - - // Wait a bit - time.Sleep(time.Second) - - // Get requested stream - st, ok := streams[name] - if !ok { - log.Println("Stream does not exist, kicking new Telnet viewer") - if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil { - log.Printf("Error while writing to TCP socket: %s", err) - } - s.Close() - return - } - - // Register new client - log.Printf("New Telnet viewer for stream '%s'", name) - c := make(chan []byte, 128) - st.Register(c) - st.IncrementClientCount() - - // Hide terminal cursor - if _, err = s.Write([]byte("\033[?25l")); err != nil { - log.Printf("Error while writing to TCP socket: %s", err) - s.Close() - return - } - - // Receive data and send them - for data := range c { - if len(data) < 1 { - log.Print("Remove Telnet viewer because of end of stream") - break - } - - // Send data - _, err := s.Write(data) - if err != nil { - log.Printf("Remove Telnet viewer because of sending error, %s", err) - break - } - } - - // Close output - st.Unregister(c) - st.DecrementClientCount() - s.Close() -} diff --git a/stream/telnet/telnet.go b/stream/telnet/telnet.go index 5bf1636..5090895 100644 --- a/stream/telnet/telnet.go +++ b/stream/telnet/telnet.go @@ -1,9 +1,11 @@ -// Package telnet provides some fancy tools, like an ASCII-art stream. +// Package telnet expose text version of stream. package telnet import ( "log" "net" + "strings" + "time" "gitlab.crans.org/nounous/ghostream/stream" ) @@ -39,3 +41,72 @@ func Serve(streams map[string]*stream.Stream, cfg *Options) { go handleViewer(s, streams, cfg) } } + +func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) { + // Prompt user about stream name + if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil { + log.Printf("Error while writing to TCP socket: %s", err) + s.Close() + return + } + buff := make([]byte, 255) + n, err := s.Read(buff) + if err != nil { + log.Printf("Error while requesting stream ID to telnet client: %s", err) + s.Close() + return + } + name := strings.TrimSpace(string(buff[:n])) + "@text" + if len(name) < 1 { + // Too short, exit + s.Close() + return + } + + // Wait a bit + time.Sleep(time.Second) + + // Get requested stream + st, ok := streams[name] + if !ok { + log.Println("Stream does not exist, kicking new Telnet viewer") + if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil { + log.Printf("Error while writing to TCP socket: %s", err) + } + s.Close() + return + } + + // Register new client + log.Printf("New Telnet viewer for stream '%s'", name) + c := make(chan []byte, 128) + st.Register(c) + st.IncrementClientCount() + + // Hide terminal cursor + if _, err = s.Write([]byte("\033[?25l")); err != nil { + log.Printf("Error while writing to TCP socket: %s", err) + s.Close() + return + } + + // Receive data and send them + for data := range c { + if len(data) < 1 { + log.Print("Remove Telnet viewer because of end of stream") + break + } + + // Send data + _, err := s.Write(data) + if err != nil { + log.Printf("Remove Telnet viewer because of sending error, %s", err) + break + } + } + + // Close output + st.Unregister(c) + st.DecrementClientCount() + s.Close() +} diff --git a/transcoder/transcoder.go b/transcoder/transcoder.go index 5fda7ef..bf84170 100644 --- a/transcoder/transcoder.go +++ b/transcoder/transcoder.go @@ -1,3 +1,4 @@ +// Package transcoder manages transcoders package transcoder import (