From 23b6eeaa05e5729a34e4bf2e65a5667aac69cbbe Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Sat, 17 Oct 2020 19:40:37 +0200 Subject: [PATCH] Add transcoder package with text transcoder --- internal/config/config.go | 13 +++- main.go | 4 +- stream/telnet/handler.go | 35 ++++----- stream/telnet/telnet.go | 31 +------- stream/telnet/telnet_test.go | 3 - .../convert.go => transcoder/text/text.go | 76 ++++++++++++++++--- transcoder/text/text_test.go | 1 + transcoder/transcoder.go | 16 ++++ 8 files changed, 112 insertions(+), 67 deletions(-) rename stream/telnet/convert.go => transcoder/text/text.go (57%) create mode 100644 transcoder/text/text_test.go create mode 100644 transcoder/transcoder.go diff --git a/internal/config/config.go b/internal/config/config.go index 09edec7..dbefc50 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -13,6 +13,8 @@ import ( "gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/webrtc" + "gitlab.crans.org/nounous/ghostream/transcoder" + "gitlab.crans.org/nounous/ghostream/transcoder/text" "gitlab.crans.org/nounous/ghostream/web" ) @@ -54,9 +56,14 @@ func New() *Config { Telnet: telnet.Options{ Enabled: false, ListenAddress: ":8023", - Width: 80, - Height: 45, - Delay: 50, + }, + Transcoder: transcoder.Options{ + Text: text.Options{ + Enabled: false, + Width: 80, + Height: 45, + Framerate: 20, + }, }, Web: web.Options{ Enabled: true, diff --git a/main.go b/main.go index 957b702..781b8dc 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/webrtc" + "gitlab.crans.org/nounous/ghostream/transcoder" "gitlab.crans.org/nounous/ghostream/web" ) @@ -48,7 +49,8 @@ func main() { streams := make(map[string]*stream.Stream) // Start routines - go forwarding.Serve(streams, cfg.Forwarding) + go transcoder.Init(streams, &cfg.Transcoder) + go forwarding.Serve(streams, &cfg.Forwarding) go monitoring.Serve(&cfg.Monitoring) go srt.Serve(streams, authBackend, &cfg.Srt) go telnet.Serve(streams, &cfg.Telnet) diff --git a/stream/telnet/handler.go b/stream/telnet/handler.go index e5edd32..92ecc95 100644 --- a/stream/telnet/handler.go +++ b/stream/telnet/handler.go @@ -9,7 +9,7 @@ import ( "gitlab.crans.org/nounous/ghostream/stream" ) -func handleViewer(s net.Conn, streams map[string]*stream.Stream, textStreams map[string]*[]byte, cfg *Options) { +func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) { // Prompt user about stream name if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil { log.Printf("Error while writing to TCP socket: %s", err) @@ -23,7 +23,7 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, textStreams map s.Close() return } - name := strings.TrimSpace(string(buff[:n])) + name := strings.TrimSpace(string(buff[:n])) + "@text" if len(name) < 1 { // Too short, exit s.Close() @@ -45,7 +45,9 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, textStreams map } // Register new client - log.Printf("New Telnet viewer for stream %s", name) + log.Printf("New Telnet viewer for stream '%s'", name) + c := make(chan []byte, 128) + st.Register(c) st.IncrementClientCount() // Hide terminal cursor @@ -55,28 +57,23 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, textStreams map return } - // Send stream - for { - text, ok := textStreams[name] - if !ok { - log.Println("Stream is not converted to text, kicking Telnet viewer") - if _, err := s.Write([]byte("This stream cannot be opened.\n")); err != nil { - log.Printf("Error while writing to TCP socket: %s", err) - } + // Receive data and send them + for data := range c { + if len(data) < 1 { + log.Print("Remove Telnet viewer because of end of stream") break } - // Send text to client - n, err := s.Write(*text) - if err != nil || n == 0 { - log.Printf("Error while sending TCP data: %s", err) + // Send data + _, err := s.Write(data) + if err != nil { + log.Printf("Remove Telnet viewer because of sending error, %s", err) break } - - time.Sleep(time.Duration(cfg.Delay) * time.Millisecond) } - // Close connection - s.Close() + // Close output + st.Unregister(c) st.DecrementClientCount() + s.Close() } diff --git a/stream/telnet/telnet.go b/stream/telnet/telnet.go index 3a4ca54..5bf1636 100644 --- a/stream/telnet/telnet.go +++ b/stream/telnet/telnet.go @@ -4,7 +4,6 @@ package telnet import ( "log" "net" - "time" "gitlab.crans.org/nounous/ghostream/stream" ) @@ -13,9 +12,6 @@ import ( type Options struct { Enabled bool ListenAddress string - Width int - Height int - Delay int } // Serve Telnet server @@ -25,10 +21,6 @@ func Serve(streams map[string]*stream.Stream, cfg *Options) { return } - // Start conversion routine - textStreams := make(map[string]*[]byte) - go autoStartConversion(streams, textStreams, cfg) - // Start TCP server listener, err := net.Listen("tcp", cfg.ListenAddress) if err != nil { @@ -44,27 +36,6 @@ func Serve(streams map[string]*stream.Stream, cfg *Options) { continue } - go handleViewer(s, streams, textStreams, cfg) - } -} - -// Convertion routine listen to existing stream and start text conversions -func autoStartConversion(streams map[string]*stream.Stream, textStreams map[string]*[]byte, cfg *Options) { - for { - for name, stream := range streams { - textStream, ok := textStreams[name] - if ok { - // Everything is fine - continue - } - - // Start conversion - log.Printf("Starting text conversion of %s", name) - // FIXME that is not how to use a pointer - textStream = &[]byte{} - textStreams[name] = textStream - go streamToTextStream(stream, textStream, cfg) - } - time.Sleep(time.Second) + go handleViewer(s, streams, cfg) } } diff --git a/stream/telnet/telnet_test.go b/stream/telnet/telnet_test.go index 7f87542..76b3d3b 100644 --- a/stream/telnet/telnet_test.go +++ b/stream/telnet/telnet_test.go @@ -18,9 +18,6 @@ func TestTelnetOutput(t *testing.T) { cfg := Options{ Enabled: true, ListenAddress: "127.0.0.1:8023", - Width: 80, - Height: 45, - Delay: 50, } go Serve(streams, &cfg) diff --git a/stream/telnet/convert.go b/transcoder/text/text.go similarity index 57% rename from stream/telnet/convert.go rename to transcoder/text/text.go index 0e0683f..538d0e6 100644 --- a/stream/telnet/convert.go +++ b/transcoder/text/text.go @@ -1,4 +1,5 @@ -package telnet +// Package text transcode a video to text +package text import ( "bufio" @@ -7,20 +8,70 @@ import ( "io" "log" "os/exec" + "strings" + "time" "gitlab.crans.org/nounous/ghostream/stream" ) -// Convert rawvideo to ANSI text -func streamToTextStream(stream *stream.Stream, text *[]byte, cfg *Options) { - // Start ffmpeg - video := make(chan []byte) - stream.Register(video) - _, rawvideo, err := startFFmpeg(video, cfg) - if err != nil { - log.Printf("Error while starting ffmpeg: %s", err) +// Options holds text package configuration +type Options struct { + Enabled bool + Width int + Height int + Framerate int +} + +// Init text transcoder +func Init(streams map[string]*stream.Stream, cfg *Options) { + if !cfg.Enabled { + // Text transcode is not enabled, ignore + return } + // Regulary check existing streams + for { + for sourceName, sourceStream := range streams { + if strings.Contains(sourceName, "@") { + // Not a source stream, pass + continue + } + + // Check that the transcoded stream does not already exist + name := sourceName + "@text" + _, ok := streams[name] + if ok { + // Stream is already transcoded + continue + } + + // Start conversion + log.Printf("Starting text transcode '%s'", name) + st := stream.New() + streams[name] = st + + go transcode(sourceStream, st, cfg) + } + + // Regulary pull stream list, + // it may be better to tweak the messaging system + // to get an event on a new stream. + time.Sleep(time.Second) + } +} + +// Convert video to ANSI text +func transcode(input, output *stream.Stream, cfg *Options) { + // Start ffmpeg to transcode video to rawvideo + videoInput := make(chan []byte) + input.Register(videoInput) + ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg) + if err != nil { + log.Printf("Error while starting ffmpeg: %s", err) + return + } + + // Transcode rawvideo to ANSI text pixelBuff := make([]byte, cfg.Width*cfg.Height) textBuff := bytes.Buffer{} for { @@ -55,13 +106,16 @@ func streamToTextStream(stream *stream.Stream, text *[]byte, cfg *Options) { } textBuff.WriteString("\033[49m") - *text = textBuff.Bytes() + output.Broadcast <- textBuff.Bytes() } + + // Stop transcode + ffmpeg.Process.Kill() } // Start a ffmpeg instance to convert stream into rawvideo func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) { - bitrate := fmt.Sprintf("%dk", cfg.Width*cfg.Height/cfg.Delay) + bitrate := fmt.Sprintf("%dk", cfg.Width*cfg.Height*cfg.Framerate) ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", "-an", "-vf", fmt.Sprintf("scale=%dx%d", cfg.Width, cfg.Height), "-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate, diff --git a/transcoder/text/text_test.go b/transcoder/text/text_test.go new file mode 100644 index 0000000..4c96c5e --- /dev/null +++ b/transcoder/text/text_test.go @@ -0,0 +1 @@ +package text diff --git a/transcoder/transcoder.go b/transcoder/transcoder.go new file mode 100644 index 0000000..5fda7ef --- /dev/null +++ b/transcoder/transcoder.go @@ -0,0 +1,16 @@ +package transcoder + +import ( + "gitlab.crans.org/nounous/ghostream/stream" + "gitlab.crans.org/nounous/ghostream/transcoder/text" +) + +// Options holds text package configuration +type Options struct { + Text text.Options +} + +// Init all transcoders +func Init(streams map[string]*stream.Stream, cfg *Options) { + go text.Init(streams, &cfg.Text) +}