diff --git a/docs/ghostream.example.yml b/docs/ghostream.example.yml index aa7624e..0a0ce36 100644 --- a/docs/ghostream.example.yml +++ b/docs/ghostream.example.yml @@ -82,16 +82,24 @@ telnet: # #listenAddress: :8023 - # Size is in characters. It is recommended to keep a 16x9 format. - # - #width: 80 - #height: 45 +## Transcoders configuration ## +transcoder: + text: + # By default the text transcoder is disabled. + # You need to enable it to use telnet output. + # + #enabled: false - # Time in milliseconds between two images. - # By default 50 ms, so 20 FPS. - # Displaying text takes time. - # - #delay: 50 + # Size is in characters. It is recommended to keep a 16x9 format. + # + #width: 80 + #height: 45 + + # Time in milliseconds between two images. + # By default 50 ms, so 20 FPS. + # Displaying text takes time. + # + #delay: 50 ## Web server ## # The web server serves a WebRTC player. diff --git a/internal/config/config.go b/internal/config/config.go index 09edec7..2a94292 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -13,6 +13,8 @@ import ( "gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/webrtc" + "gitlab.crans.org/nounous/ghostream/transcoder" + "gitlab.crans.org/nounous/ghostream/transcoder/text" "gitlab.crans.org/nounous/ghostream/web" ) @@ -23,6 +25,7 @@ type Config struct { Monitoring monitoring.Options Srt srt.Options Telnet telnet.Options + Transcoder transcoder.Options Web web.Options WebRTC webrtc.Options } @@ -54,9 +57,14 @@ func New() *Config { Telnet: telnet.Options{ Enabled: false, ListenAddress: ":8023", - Width: 80, - Height: 45, - Delay: 50, + }, + Transcoder: transcoder.Options{ + Text: text.Options{ + Enabled: false, + Width: 80, + Height: 45, + Framerate: 20, + }, }, Web: web.Options{ Enabled: true, diff --git a/main.go b/main.go index e82d055..87d0c41 100644 --- a/main.go +++ b/main.go @@ -10,10 +10,12 @@ import ( "gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/internal/config" "gitlab.crans.org/nounous/ghostream/internal/monitoring" + "gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/stream/forwarding" "gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/webrtc" + "gitlab.crans.org/nounous/ghostream/transcoder" "gitlab.crans.org/nounous/ghostream/web" ) @@ -43,17 +45,17 @@ func main() { }) localSdpChan := make(chan webrtc.SessionDescription) - // SRT channel for forwarding and webrtc - forwardingChannel := make(chan srt.Packet, 64) - webrtcChannel := make(chan srt.Packet, 64) + // Init streams messaging + streams := make(map[string]*stream.Stream) // Start routines - go forwarding.Serve(forwardingChannel, cfg.Forwarding) + go transcoder.Init(streams, &cfg.Transcoder) + go forwarding.Serve(streams, cfg.Forwarding) go monitoring.Serve(&cfg.Monitoring) - go srt.Serve(&cfg.Srt, authBackend, forwardingChannel, webrtcChannel) - go telnet.Serve(&cfg.Telnet) - go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) - go webrtc.Serve(remoteSdpChan, localSdpChan, webrtcChannel, &cfg.WebRTC) + go srt.Serve(streams, authBackend, &cfg.Srt) + go telnet.Serve(streams, &cfg.Telnet) + go web.Serve(streams, remoteSdpChan, localSdpChan, &cfg.Web) + go webrtc.Serve(streams, remoteSdpChan, localSdpChan, &cfg.WebRTC) // Wait for routines select {} diff --git a/stream/forwarding/forwarding.go b/stream/forwarding/forwarding.go index fbcd687..d8fcf88 100644 --- a/stream/forwarding/forwarding.go +++ b/stream/forwarding/forwarding.go @@ -2,12 +2,10 @@ package forwarding import ( - "bufio" - "io" "log" - "os/exec" + "time" - "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream" ) // Options to configure the stream forwarding. @@ -15,21 +13,46 @@ import ( type Options map[string][]string // Serve handles incoming packets from SRT and forward them to other external services -func Serve(inputChannel chan srt.Packet, cfg Options) { +func Serve(streams map[string]*stream.Stream, cfg Options) { if len(cfg) < 1 { // No forwarding, ignore - for { - <-inputChannel // Clear input channel - } + return } log.Printf("Stream forwarding initialized") - ffmpegInstances := make(map[string]*exec.Cmd) + for { + for name, st := range streams { + fwdCfg, ok := cfg[name] + if !ok { + // Not configured + continue + } + + // Start forwarding + log.Printf("Starting forwarding for '%s'", name) + go forward(st, fwdCfg) + } + + // Regulary pull stream list, + // it may be better to tweak the messaging system + // to get an event on a new stream. + time.Sleep(time.Second) + } +} + +func forward(st *stream.Stream, fwdCfg []string) { + // FIXME + /*ffmpegInstances := make(map[string]*exec.Cmd) ffmpegInputStreams := make(map[string]*io.WriteCloser) for { var err error = nil // Wait for packets - packet := <-inputChannel + // FIXME packet := <-inputChannel + packet := srt.Packet{ + Data: []byte{}, + PacketType: "nothing", + StreamName: "demo", + } switch packet.PacketType { case "register": err = registerStream(packet.StreamName, ffmpegInstances, ffmpegInputStreams, cfg) @@ -47,9 +70,10 @@ func Serve(inputChannel chan srt.Packet, cfg Options) { if err != nil { log.Printf("Error occurred while receiving SRT packet of type %s: %s", packet.PacketType, err) } - } + }*/ } +/* // registerStream creates ffmpeg instance associated with newly created stream func registerStream(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams map[string]*io.WriteCloser, cfg Options) error { streams, exist := cfg[name] @@ -119,3 +143,4 @@ func close(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams delete(ffmpegInputStreams, name) return nil } +*/ diff --git a/stream/forwarding/forwarding_test.go b/stream/forwarding/forwarding_test.go index bc9cb5e..1f6ea6a 100644 --- a/stream/forwarding/forwarding_test.go +++ b/stream/forwarding/forwarding_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/stream/srt" ) @@ -30,16 +31,15 @@ func TestForwardStream(t *testing.T) { } }() - forwardingList := make(map[string][]string) - forwardingList["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} - - forwardingChannel := make(chan srt.Packet) + cfg := make(map[string][]string) + cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} // Register forwarding stream list - go Serve(forwardingChannel, forwardingList) + streams := make(map[string]*stream.Stream) + go Serve(streams, cfg) // Serve SRT Server without authentification backend - go srt.Serve(&srt.Options{Enabled: true, ListenAddress: ":9712", MaxClients: 2}, nil, forwardingChannel, nil) + go srt.Serve(streams, nil, &srt.Options{Enabled: true, ListenAddress: ":9712", MaxClients: 2}) ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", "-re", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10", diff --git a/stream/messaging.go b/stream/messaging.go new file mode 100644 index 0000000..4c2c0e7 --- /dev/null +++ b/stream/messaging.go @@ -0,0 +1,99 @@ +// Package stream defines a structure to communication between inputs and outputs +package stream + +import ( + "sync" +) + +// Stream makes packages able to subscribe to an incoming stream +type Stream struct { + // Incoming data come from this channel + Broadcast chan<- []byte + + // Use a map to be able to delete an item + outputs map[chan []byte]struct{} + + // Count clients for statistics + nbClients int + + // Mutex to lock outputs map + lock sync.Mutex +} + +// New creates a new stream. +func New() *Stream { + s := &Stream{} + broadcast := make(chan []byte, 1024) + s.Broadcast = broadcast + s.outputs = make(map[chan []byte]struct{}) + s.nbClients = 0 + go s.run(broadcast) + return s +} + +func (s *Stream) run(broadcast <-chan []byte) { + for msg := range broadcast { + s.lock.Lock() + for output := range s.outputs { + select { + case output <- msg: + default: + // If full, do a ring buffer + // Check that output is not of size zero + if len(output) > 1 { + <-output + } + } + } + s.lock.Unlock() + } + + // Incoming chan has been closed, close all outputs + s.lock.Lock() + for ch := range s.outputs { + delete(s.outputs, ch) + close(ch) + } + s.lock.Unlock() +} + +// Close the incoming chan, this will also delete all outputs +func (s *Stream) Close() { + close(s.Broadcast) +} + +// Register a new output on a stream. +func (s *Stream) Register(output chan []byte) { + s.lock.Lock() + defer s.lock.Unlock() + s.outputs[output] = struct{}{} +} + +// Unregister removes an output. +// If hidden in true, then do not count this client. +func (s *Stream) Unregister(output chan []byte) { + s.lock.Lock() + defer s.lock.Unlock() + + // Make sure we did not already close this output + _, ok := s.outputs[output] + if ok { + delete(s.outputs, output) + close(output) + } +} + +// ClientCount returns the number of clients +func (s *Stream) ClientCount() int { + return s.nbClients +} + +// IncrementClientCount increments the number of clients +func (s *Stream) IncrementClientCount() { + s.nbClients++ +} + +// DecrementClientCount decrements the number of clients +func (s *Stream) DecrementClientCount() { + s.nbClients-- +} diff --git a/stream/messaging_test.go b/stream/messaging_test.go new file mode 100644 index 0000000..49e17a0 --- /dev/null +++ b/stream/messaging_test.go @@ -0,0 +1,42 @@ +package stream + +import ( + "testing" +) + +func TestWithoutOutputs(t *testing.T) { + stream := New() + defer stream.Close() + stream.Broadcast <- []byte("hello world") +} + +func TestWithOneOutput(t *testing.T) { + stream := New() + defer stream.Close() + + // Register one output + output := make(chan []byte, 64) + stream.Register(output) + stream.IncrementClientCount() + + // Try to pass one message + stream.Broadcast <- []byte("hello world") + msg := <-output + if string(msg) != "hello world" { + t.Errorf("Message has wrong content: %s != hello world", msg) + } + + // Check client count + if count := stream.ClientCount(); count != 1 { + t.Errorf("Client counter returned %d, expected 1", count) + } + + // Unregister + stream.Unregister(output) + stream.DecrementClientCount() + + // Check client count + if count := stream.ClientCount(); count != 0 { + t.Errorf("Client counter returned %d, expected 0", count) + } +} diff --git a/stream/srt/handler.go b/stream/srt/handler.go index c40036e..6dbaa43 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -5,23 +5,30 @@ import ( "log" "github.com/haivision/srtgo" + "gitlab.crans.org/nounous/ghostream/stream" ) -func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels map[string][]chan Packet, forwardingChannel, webrtcChannel chan Packet) { +func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) { + // Check stream does not exist + if _, ok := streams[name]; ok { + log.Print("Stream already exists, refusing new streamer") + socket.Close() + return + } + + // Create stream log.Printf("New SRT streamer for stream %s", name) - - // Create a new buffer - // UDP packet cannot be larger than MTU (1500) - buff := make([]byte, 1500) - - // Setup stream forwarding - forwardingChannel <- Packet{StreamName: name, PacketType: "register", Data: nil} - webrtcChannel <- Packet{StreamName: name, PacketType: "register", Data: nil} + st := stream.New() + streams[name] = st // Read RTP packets forever and send them to the WebRTC Client for { + // Create a new buffer + // UDP packet cannot be larger than MTU (1500) + buff := make([]byte, 1500) + // 5s timeout - n, err := s.Read(buff, 5000) + n, err := socket.Read(buff, 5000) if err != nil { log.Println("Error occurred while reading SRT socket:", err) break @@ -33,40 +40,49 @@ func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels map[stri break } - // Send raw packet to other streams - // Copy data in another buffer to ensure that the data would not be overwritten - data := make([]byte, n) - copy(data, buff[:n]) - forwardingChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - webrtcChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - for _, dataChannel := range clientDataChannels[name] { - dataChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data} - } + // Send raw data to other streams + buff = buff[:n] + st.Broadcast <- buff } - forwardingChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} - webrtcChannel <- Packet{StreamName: name, PacketType: "close", Data: nil} + // Close stream + st.Close() + socket.Close() + delete(streams, name) } -func handleViewer(s *srtgo.SrtSocket, name string, dataChannel chan Packet, dataChannels map[string][]chan Packet) { - // FIXME Should not pass all dataChannels to one viewer - +func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) { log.Printf("New SRT viewer for stream %s", name) - // Receive packets from channel and send them - for { - packet := <-dataChannel - if packet.PacketType == "sendData" { - _, err := s.Write(packet.Data, 10000) - if err != nil { - s.Close() - for i, channel := range dataChannels[name] { - if channel == dataChannel { - dataChannels[name] = append(dataChannels[name][:i], dataChannels[name][i+1:]...) - } - } - return - } + // Get requested stream + st, ok := streams[name] + if !ok { + log.Println("Stream does not exist, refusing new viewer") + return + } + + // Register new output + c := make(chan []byte, 1024) + st.Register(c) + st.IncrementClientCount() + + // Receive data and send them + for data := range c { + if len(data) < 1 { + log.Print("Remove SRT viewer because of end of stream") + break + } + + // Send data + _, err := s.Write(data, 1000) + if err != nil { + log.Printf("Remove SRT viewer because of sending error, %s", err) + break } } + + // Close output + st.Unregister(c) + st.DecrementClientCount() + s.Close() } diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 916441a..81e766c 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -12,10 +12,7 @@ import ( "github.com/haivision/srtgo" "gitlab.crans.org/nounous/ghostream/auth" -) - -var ( - clientDataChannels map[string][]chan Packet + "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds web package configuration @@ -25,13 +22,6 @@ type Options struct { MaxClients int } -// Packet contains the necessary data to broadcast events like stream creating, packet receiving or stream closing. -type Packet struct { - Data []byte - PacketType string - StreamName string -} - // Split host and port from listen address func splitHostPort(hostport string) (string, uint16, error) { host, portS, err := net.SplitHostPort(hostport) @@ -48,13 +38,8 @@ func splitHostPort(hostport string) (string, uint16, error) { return host, uint16(port64), nil } -// GetNumberConnectedSessions get the number of currently connected clients -func GetNumberConnectedSessions(streamID string) int { - return len(clientDataChannels[streamID]) -} - // Serve SRT server -func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChannel chan Packet) { +func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Options) { if !cfg.Enabled { // SRT is not enabled, ignore return @@ -75,8 +60,6 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan log.Fatal("Unable to listen for SRT clients:", err) } - clientDataChannels = make(map[string][]chan Packet) - for { // Wait for new connection s, err := sck.Accept() @@ -86,6 +69,9 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan continue } + // FIXME: Flush socket + // Without this, the SRT buffer might get full before reading it + // streamid can be "name:password" for streamer or "name" for viewer streamID, err := s.GetSockOptString(C.SRTO_STREAMID) if err != nil { @@ -94,10 +80,6 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan } split := strings.Split(streamID, ":") - if clientDataChannels[streamID] == nil { - clientDataChannels[streamID] = make([]chan Packet, 0, cfg.MaxClients) - } - if len(split) > 1 { // password was provided so it is a streamer name, password := split[0], split[1] @@ -110,15 +92,13 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan } } - go handleStreamer(s, name, clientDataChannels, forwardingChannel, webrtcChannel) + go handleStreamer(s, streams, name) } else { // password was not provided so it is a viewer name := split[0] - dataChannel := make(chan Packet, 4096) - clientDataChannels[streamID] = append(clientDataChannels[streamID], dataChannel) - - go handleViewer(s, name, dataChannel, clientDataChannels) + // Send stream + go handleViewer(s, streams, name) } } } diff --git a/stream/srt/srt_test.go b/stream/srt/srt_test.go index 37f44f5..2ac869c 100644 --- a/stream/srt/srt_test.go +++ b/stream/srt/srt_test.go @@ -5,6 +5,8 @@ import ( "os/exec" "testing" "time" + + "gitlab.crans.org/nounous/ghostream/stream" ) // TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à @@ -55,7 +57,9 @@ func TestServeSRT(t *testing.T) { t.Skip("WARNING: FFMPEG is not installed. Skipping stream test") } - go Serve(&Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}, nil, nil, nil) + // Init streams messaging and SRT server + streams := make(map[string]*stream.Stream) + go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}) ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10", @@ -78,6 +82,4 @@ func TestServeSRT(t *testing.T) { }() time.Sleep(5 * time.Second) // Delay is in nanoseconds, here 5s - - // TODO Kill SRT server } diff --git a/stream/telnet/telnet.go b/stream/telnet/telnet.go index a9a5693..5090895 100644 --- a/stream/telnet/telnet.go +++ b/stream/telnet/telnet.go @@ -1,178 +1,112 @@ -// Package telnet provides some fancy tools, like an ASCII-art stream. +// Package telnet expose text version of stream. package telnet import ( - "fmt" - "io" "log" "net" "strings" "time" -) -var ( - // Cfg contains the different options of the telnet package, see below - // TODO Config should not be exported - Cfg *Options - currentMessage map[string]*string - clientCount map[string]int + "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds telnet package configuration type Options struct { Enabled bool ListenAddress string - Width int - Height int - Delay int } -// Serve starts the telnet server and listen to clients -func Serve(config *Options) { - Cfg = config - - if !config.Enabled { +// Serve Telnet server +func Serve(streams map[string]*stream.Stream, cfg *Options) { + if !cfg.Enabled { + // Telnet is not enabled, ignore return } - currentMessage = make(map[string]*string) - clientCount = make(map[string]int) - - listener, err := net.Listen("tcp", config.ListenAddress) + // Start TCP server + listener, err := net.Listen("tcp", cfg.ListenAddress) if err != nil { - log.Printf("Error while listening to the address %s: %s", config.ListenAddress, err) - return + log.Fatalf("Error while listening to the address %s: %s", cfg.ListenAddress, err) } + log.Printf("Telnet server listening on %s", cfg.ListenAddress) - go func() { - for { - s, err := listener.Accept() - if err != nil { - log.Printf("Error while accepting TCP socket: %s", s) - continue - } - - go func(s net.Conn) { - streamID := "" - // Request for stream ID - for { - _, err = s.Write([]byte("[GHOSTREAM]\nEnter stream ID: ")) - if err != nil { - log.Println("Error while requesting stream ID to telnet client") - _ = s.Close() - return - } - buff := make([]byte, 255) - n, err := s.Read(buff) - if err != nil { - log.Println("Error while requesting stream ID to telnet client") - _ = s.Close() - return - } - - // Avoid bruteforce - time.Sleep(3 * time.Second) - - streamID = string(buff[:n]) - streamID = strings.Replace(streamID, "\r", "", -1) - streamID = strings.Replace(streamID, "\n", "", -1) - - if len(streamID) > 0 { - if strings.ToLower(streamID) == "exit" { - _, _ = s.Write([]byte("Goodbye!\n")) - _ = s.Close() - return - } - if _, ok := currentMessage[streamID]; !ok { - _, err = s.Write([]byte("Unknown stream ID.\n")) - if err != nil { - log.Println("Error while requesting stream ID to telnet client") - _ = s.Close() - return - } - continue - } - break - } - } - - clientCount[streamID]++ - - // Hide terminal cursor - _, _ = s.Write([]byte("\033[?25l")) - - for { - n, err := s.Write([]byte(*currentMessage[streamID])) - if err != nil { - log.Printf("Error while sending TCP data: %s", err) - _ = s.Close() - clientCount[streamID]-- - break - } - if n == 0 { - _ = s.Close() - clientCount[streamID]-- - break - } - time.Sleep(time.Duration(config.Delay) * time.Millisecond) - } - }(s) - } - }() - - log.Println("Telnet server initialized") -} - -// GetNumberConnectedSessions returns the numbers of clients that are viewing the stream through a telnet shell -func GetNumberConnectedSessions(streamID string) int { - if Cfg == nil || !Cfg.Enabled { - return 0 - } - return clientCount[streamID] -} - -// StartASCIIArtStream send all packets received by ffmpeg as ASCII Art to telnet clients -func StartASCIIArtStream(streamID string, reader io.ReadCloser) { - if !Cfg.Enabled { - _ = reader.Close() - return - } - - currentMessage[streamID] = new(string) - pixelBuff := make([]byte, Cfg.Width*Cfg.Height) - textBuff := strings.Builder{} + // Handle each new client for { - n, err := reader.Read(pixelBuff) + s, err := listener.Accept() if err != nil { - log.Printf("An error occurred while reading input: %s", err) - break - } - if n == 0 { - // Stream is finished - break + log.Printf("Error while accepting TCP socket: %s", s) + continue } - // Header - textBuff.Reset() - textBuff.Grow((40*Cfg.Width+6)*Cfg.Height + 47) - for i := 0; i < 42; i++ { - textBuff.WriteByte('\n') - } - - // Convert image to ASCII - for i, pixel := range pixelBuff { - if i%Cfg.Width == 0 { - // New line - textBuff.WriteString("\033[49m\n") - } - - // Print two times the character to make a square - text := fmt.Sprintf("\033[48;2;%d;%d;%dm ", pixel, pixel, pixel) - textBuff.WriteString(text) - textBuff.WriteString(text) - } - textBuff.WriteString("\033[49m") - - *(currentMessage[streamID]) = textBuff.String() + go handleViewer(s, streams, cfg) } } + +func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) { + // Prompt user about stream name + if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil { + log.Printf("Error while writing to TCP socket: %s", err) + s.Close() + return + } + buff := make([]byte, 255) + n, err := s.Read(buff) + if err != nil { + log.Printf("Error while requesting stream ID to telnet client: %s", err) + s.Close() + return + } + name := strings.TrimSpace(string(buff[:n])) + "@text" + if len(name) < 1 { + // Too short, exit + s.Close() + return + } + + // Wait a bit + time.Sleep(time.Second) + + // Get requested stream + st, ok := streams[name] + if !ok { + log.Println("Stream does not exist, kicking new Telnet viewer") + if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil { + log.Printf("Error while writing to TCP socket: %s", err) + } + s.Close() + return + } + + // Register new client + log.Printf("New Telnet viewer for stream '%s'", name) + c := make(chan []byte, 128) + st.Register(c) + st.IncrementClientCount() + + // Hide terminal cursor + if _, err = s.Write([]byte("\033[?25l")); err != nil { + log.Printf("Error while writing to TCP socket: %s", err) + s.Close() + return + } + + // Receive data and send them + for data := range c { + if len(data) < 1 { + log.Print("Remove Telnet viewer because of end of stream") + break + } + + // Send data + _, err := s.Write(data) + if err != nil { + log.Printf("Remove Telnet viewer because of sending error, %s", err) + break + } + } + + // Close output + st.Unregister(c) + st.DecrementClientCount() + s.Close() +} diff --git a/stream/telnet/telnet_test.go b/stream/telnet/telnet_test.go index 5174157..76b3d3b 100644 --- a/stream/telnet/telnet_test.go +++ b/stream/telnet/telnet_test.go @@ -1,41 +1,35 @@ package telnet import ( - "bytes" - "io/ioutil" - "math/rand" - "net" "testing" - "time" + + "gitlab.crans.org/nounous/ghostream/stream" ) // TestTelnetOutput creates a TCP client that connects to the server and get one image. func TestTelnetOutput(t *testing.T) { // Try to start Telnet server while it is disabled - Serve(&Options{Enabled: false}) - StartASCIIArtStream("demo", ioutil.NopCloser(bytes.NewReader([]byte{}))) - if GetNumberConnectedSessions("demo") != 0 { - t.Fatalf("Mysteriously found %d connected clients", GetNumberConnectedSessions("demo")) - } + streams := make(map[string]*stream.Stream) + go Serve(streams, &Options{Enabled: false}) + + // FIXME test connect // Enable and start Telnet server - Serve(&Options{ + cfg := Options{ Enabled: true, ListenAddress: "127.0.0.1:8023", - Width: 80, - Height: 45, - Delay: 50, - }) + } + go Serve(streams, &cfg) + + // FIXME test connect // Generate a random image, that should be given by FFMPEG - sampleImage := make([]byte, Cfg.Width*Cfg.Height) + /*sampleImage := make([]byte, cfg.Width*cfg.Height) rand.Read(sampleImage) reader := ioutil.NopCloser(bytes.NewBuffer(sampleImage)) - // Send the image to the server - StartASCIIArtStream("demo", reader) // Connect to the Telnet server - client, err := net.Dial("tcp", Cfg.ListenAddress) + client, err := net.Dial("tcp", cfg.ListenAddress) if err != nil { t.Fatalf("Error while connecting to the TCP server: %s", err) } @@ -46,7 +40,7 @@ func TestTelnetOutput(t *testing.T) { t.Fatalf("Error while closing TCP connection: %s", err) } - client, err = net.Dial("tcp", Cfg.ListenAddress) + client, err = net.Dial("tcp", cfg.ListenAddress) if err != nil { t.Fatalf("Error while connecting to the TCP server: %s", err) } @@ -110,5 +104,5 @@ func TestTelnetOutput(t *testing.T) { time.Sleep(time.Second) if GetNumberConnectedSessions("demo") != 0 { t.Fatalf("Expected no telnet client, found %d", GetNumberConnectedSessions("demo")) - } + }*/ } diff --git a/stream/webrtc/ingest.go b/stream/webrtc/ingest.go index 66a4b0a..ccede0d 100644 --- a/stream/webrtc/ingest.go +++ b/stream/webrtc/ingest.go @@ -3,61 +3,53 @@ package webrtc import ( "bufio" - "fmt" - "io" "log" "net" "os/exec" + "strings" + "time" "github.com/pion/rtp" "github.com/pion/webrtc/v3" - "gitlab.crans.org/nounous/ghostream/stream/srt" - "gitlab.crans.org/nounous/ghostream/stream/telnet" + "gitlab.crans.org/nounous/ghostream/stream" ) var ( - ffmpeg = make(map[string]*exec.Cmd) - ffmpegInput = make(map[string]io.WriteCloser) + activeStream map[string]struct{} ) -func ingestFrom(inputChannel chan srt.Packet) { - // FIXME Clean code - +func autoIngest(streams map[string]*stream.Stream) { + // Regulary check existing streams + activeStream = make(map[string]struct{}) for { - var err error = nil - srtPacket := <-inputChannel - switch srtPacket.PacketType { - case "register": - go registerStream(&srtPacket) - break - case "sendData": - if _, ok := ffmpegInput[srtPacket.StreamName]; !ok { - break + for name, st := range streams { + if strings.Contains(name, "@") { + // Not a source stream, pass + continue } - // FIXME send to stream srtPacket.StreamName - if _, err := ffmpegInput[srtPacket.StreamName].Write(srtPacket.Data); err != nil { - log.Printf("Failed to write data to ffmpeg input: %s", err) + + if _, ok := activeStream[name]; ok { + // Stream is already ingested + continue } - break - case "close": - log.Printf("WebRTC CloseConnection %s", srtPacket.StreamName) - _ = ffmpeg[srtPacket.StreamName].Process.Kill() - _ = ffmpegInput[srtPacket.StreamName].Close() - delete(ffmpeg, srtPacket.StreamName) - delete(ffmpegInput, srtPacket.StreamName) - break - default: - log.Println("Unknown SRT srtPacket type:", srtPacket.PacketType) - break - } - if err != nil { - log.Printf("Error occurred while receiving SRT srtPacket of type %s: %s", srtPacket.PacketType, err) + + // Start ingestion + log.Printf("Starting webrtc for '%s'", name) + go ingest(name, st) } + + // Regulary pull stream list, + // it may be better to tweak the messaging system + // to get an event on a new stream. + time.Sleep(time.Second) } } -func registerStream(srtPacket *srt.Packet) { - log.Printf("WebRTC RegisterStream %s", srtPacket.StreamName) +func ingest(name string, input *stream.Stream) { + // Register to get stream + videoInput := make(chan []byte, 1024) + input.Register(videoInput) + activeStream[name] = struct{}{} // Open a UDP Listener for RTP Packets on port 5004 videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) @@ -70,55 +62,12 @@ func registerStream(srtPacket *srt.Packet) { log.Printf("Faited to open UDP listener %s", err) return } - // FIXME Close UDP listeners at the end of the stream, not the end of the routine - /* defer func() { - if err = videoListener.Close(); err != nil { - log.Printf("Faited to close UDP listener %s", err) - } - if err = audioListener.Close(); err != nil { - log.Printf("Faited to close UDP listener %s", err) - } - }() */ - ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", - "-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality - "-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1", - "-auto-alt-ref", "1", - "-f", "rtp", "rtp://127.0.0.1:5004", - "-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1", - "-f", "rtp", "rtp://127.0.0.1:5005"} - - // Export stream to ascii art - if telnet.Cfg.Enabled { - bitrate := fmt.Sprintf("%dk", telnet.Cfg.Width*telnet.Cfg.Height/telnet.Cfg.Delay) - ffmpegArgs = append(ffmpegArgs, - "-an", "-vf", fmt.Sprintf("scale=%dx%d", telnet.Cfg.Width, telnet.Cfg.Height), - "-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate, "-q", "42", "-pix_fmt", "gray", "-f", "rawvideo", "pipe:1") - } - - ffmpeg[srtPacket.StreamName] = exec.Command("ffmpeg", ffmpegArgs...) - - input, err := ffmpeg[srtPacket.StreamName].StdinPipe() + // Start ffmpag to convert videoInput to video and audio UDP + ffmpeg, err := startFFmpeg(videoInput) if err != nil { - panic(err) - } - ffmpegInput[srtPacket.StreamName] = input - errOutput, err := ffmpeg[srtPacket.StreamName].StderrPipe() - if err != nil { - panic(err) - } - - // Receive raw video output and convert it to ASCII art, then forward it TCP - if telnet.Cfg.Enabled { - output, err := ffmpeg[srtPacket.StreamName].StdoutPipe() - if err != nil { - panic(err) - } - go telnet.StartASCIIArtStream(srtPacket.StreamName, output) - } - - if err := ffmpeg[srtPacket.StreamName].Start(); err != nil { - panic(err) + log.Printf("Error while starting ffmpeg: %s", err) + return } // Receive video @@ -128,7 +77,7 @@ func registerStream(srtPacket *srt.Packet) { n, _, err := videoListener.ReadFromUDP(inboundRTPPacket) if err != nil { log.Printf("Failed to read from UDP: %s", err) - continue + break } packet := &rtp.Packet{} if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { @@ -136,13 +85,13 @@ func registerStream(srtPacket *srt.Packet) { continue } - if videoTracks[srtPacket.StreamName] == nil { - videoTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0) + if videoTracks[name] == nil { + videoTracks[name] = make([]*webrtc.Track, 0) } // Write RTP srtPacket to all video tracks // Adapt payload and SSRC to match destination - for _, videoTrack := range videoTracks[srtPacket.StreamName] { + for _, videoTrack := range videoTracks[name] { packet.Header.PayloadType = videoTrack.PayloadType() packet.Header.SSRC = videoTrack.SSRC() if writeErr := videoTrack.WriteRTP(packet); writeErr != nil { @@ -160,7 +109,7 @@ func registerStream(srtPacket *srt.Packet) { n, _, err := audioListener.ReadFromUDP(inboundRTPPacket) if err != nil { log.Printf("Failed to read from UDP: %s", err) - continue + break } packet := &rtp.Packet{} if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { @@ -168,13 +117,13 @@ func registerStream(srtPacket *srt.Packet) { continue } - if audioTracks[srtPacket.StreamName] == nil { - audioTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0) + if audioTracks[name] == nil { + audioTracks[name] = make([]*webrtc.Track, 0) } // Write RTP srtPacket to all audio tracks // Adapt payload and SSRC to match destination - for _, audioTrack := range audioTracks[srtPacket.StreamName] { + for _, audioTrack := range audioTracks[name] { packet.Header.PayloadType = audioTrack.PayloadType() packet.Header.SSRC = audioTrack.SSRC() if writeErr := audioTrack.WriteRTP(packet); writeErr != nil { @@ -185,10 +134,60 @@ func registerStream(srtPacket *srt.Packet) { } }() + // Wait for stopped ffmpeg + if err = ffmpeg.Wait(); err != nil { + log.Printf("Faited to wait for ffmpeg: %s", err) + } + + // Close UDP listeners + if err = videoListener.Close(); err != nil { + log.Printf("Faited to close UDP listener: %s", err) + } + if err = audioListener.Close(); err != nil { + log.Printf("Faited to close UDP listener: %s", err) + } + delete(activeStream, name) +} + +func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) { + ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", + "-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality + "-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1", + "-auto-alt-ref", "1", + "-f", "rtp", "rtp://127.0.0.1:5004", + "-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1", + "-f", "rtp", "rtp://127.0.0.1:5005"} + ffmpeg = exec.Command("ffmpeg", ffmpegArgs...) + + // Handle errors output + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + return nil, err + } go func() { scanner := bufio.NewScanner(errOutput) for scanner.Scan() { log.Printf("[WEBRTC FFMPEG %s] %s", "demo", scanner.Text()) } }() + + // Handle stream input + input, err := ffmpeg.StdinPipe() + if err != nil { + return nil, err + } + go func() { + for data := range in { + if _, err := input.Write(data); err != nil { + log.Printf("Failed to write data to ffmpeg input: %s", err) + } + } + + // End of stream + ffmpeg.Process.Kill() + }() + + // Start process + err = ffmpeg.Start() + return ffmpeg, err } diff --git a/stream/webrtc/webrtc.go b/stream/webrtc/webrtc.go index 365ebc3..37390dd 100644 --- a/stream/webrtc/webrtc.go +++ b/stream/webrtc/webrtc.go @@ -8,7 +8,7 @@ import ( "github.com/pion/webrtc/v3" "gitlab.crans.org/nounous/ghostream/internal/monitoring" - "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds web package configuration @@ -182,12 +182,12 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa } // Serve WebRTC media streaming server -func Serve(remoteSdpChan chan struct { +func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct { StreamID string RemoteDescription webrtc.SessionDescription -}, localSdpChan chan webrtc.SessionDescription, inputChannel chan srt.Packet, cfg *Options) { +}, localSdpChan chan webrtc.SessionDescription, cfg *Options) { if !cfg.Enabled { - // SRT is not enabled, ignore + // WebRTC is not enabled, ignore return } @@ -197,8 +197,8 @@ func Serve(remoteSdpChan chan struct { videoTracks = make(map[string][]*webrtc.Track) audioTracks = make(map[string][]*webrtc.Track) - // Ingest data from SRT - go ingestFrom(inputChannel) + // Ingest data + go autoIngest(streams) // Handle new connections for { diff --git a/stream/webrtc/webrtc_test.go b/stream/webrtc/webrtc_test.go index 19f4b34..ee34ca5 100644 --- a/stream/webrtc/webrtc_test.go +++ b/stream/webrtc/webrtc_test.go @@ -5,24 +5,24 @@ import ( "testing" "github.com/pion/webrtc/v3" - "gitlab.crans.org/nounous/ghostream/stream/srt" + "gitlab.crans.org/nounous/ghostream/stream" ) func TestServe(t *testing.T) { - // Serve WebRTC server + // Init streams messaging and WebRTC server + streams := make(map[string]*stream.Stream) remoteSdpChan := make(chan struct { StreamID string RemoteDescription webrtc.SessionDescription }) localSdpChan := make(chan webrtc.SessionDescription) - webrtcChannel := make(chan srt.Packet, 64) cfg := Options{ Enabled: true, MinPortUDP: 10000, MaxPortUDP: 10005, STUNServers: []string{"stun:stun.l.google.com:19302"}, } - go Serve(remoteSdpChan, localSdpChan, webrtcChannel, &cfg) + go Serve(streams, remoteSdpChan, localSdpChan, &cfg) // New client connection mediaEngine := webrtc.MediaEngine{} diff --git a/transcoder/text/text.go b/transcoder/text/text.go new file mode 100644 index 0000000..02a0550 --- /dev/null +++ b/transcoder/text/text.go @@ -0,0 +1,157 @@ +// Package text transcode a video to text +package text + +import ( + "bufio" + "bytes" + "fmt" + "io" + "log" + "os/exec" + "strings" + "time" + + "gitlab.crans.org/nounous/ghostream/stream" +) + +// Options holds text package configuration +type Options struct { + Enabled bool + Width int + Height int + Framerate int +} + +// Init text transcoder +func Init(streams map[string]*stream.Stream, cfg *Options) { + if !cfg.Enabled { + // Text transcode is not enabled, ignore + return + } + + // Regulary check existing streams + for { + for sourceName, sourceStream := range streams { + if strings.Contains(sourceName, "@") { + // Not a source stream, pass + continue + } + + // Check that the transcoded stream does not already exist + name := sourceName + "@text" + _, ok := streams[name] + if ok { + // Stream is already transcoded + continue + } + + // Start conversion + log.Printf("Starting text transcode '%s'", name) + st := stream.New() + streams[name] = st + + go transcode(sourceStream, st, cfg) + } + + // Regulary pull stream list, + // it may be better to tweak the messaging system + // to get an event on a new stream. + time.Sleep(time.Second) + } +} + +// Convert video to ANSI text +func transcode(input, output *stream.Stream, cfg *Options) { + // Start ffmpeg to transcode video to rawvideo + videoInput := make(chan []byte, 1024) + input.Register(videoInput) + ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg) + if err != nil { + log.Printf("Error while starting ffmpeg: %s", err) + return + } + + // Transcode rawvideo to ANSI text + pixelBuff := make([]byte, cfg.Width*cfg.Height) + textBuff := bytes.Buffer{} + for { + n, err := (*rawvideo).Read(pixelBuff) + if err != nil { + log.Printf("An error occurred while reading input: %s", err) + break + } + if n == 0 { + // Stream is finished + break + } + + // Header + textBuff.Reset() + textBuff.Grow((40*cfg.Width+6)*cfg.Height + 47) + for i := 0; i < 42; i++ { + textBuff.WriteByte('\n') + } + + // Convert image to ASCII + for i, pixel := range pixelBuff { + if i%cfg.Width == 0 { + // New line + textBuff.WriteString("\033[49m\n") + } + + // Print two times the character to make a square + text := fmt.Sprintf("\033[48;2;%d;%d;%dm ", pixel, pixel, pixel) + textBuff.WriteString(text) + textBuff.WriteString(text) + } + textBuff.WriteString("\033[49m") + + output.Broadcast <- textBuff.Bytes() + } + + // Stop transcode + ffmpeg.Process.Kill() +} + +// Start a ffmpeg instance to convert stream into rawvideo +func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) { + bitrate := fmt.Sprintf("%dk", cfg.Width*cfg.Height*cfg.Framerate) + ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", + "-an", "-vf", fmt.Sprintf("scale=%dx%d", cfg.Width, cfg.Height), + "-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate, + "-q", "42", "-pix_fmt", "gray", "-f", "rawvideo", "pipe:1"} + ffmpeg := exec.Command("ffmpeg", ffmpegArgs...) + + // Handle errors output + errOutput, err := ffmpeg.StderrPipe() + if err != nil { + return nil, nil, err + } + go func() { + scanner := bufio.NewScanner(errOutput) + for scanner.Scan() { + log.Printf("[TELNET FFMPEG %s] %s", "demo", scanner.Text()) + } + }() + + // Handle text output + output, err := ffmpeg.StdoutPipe() + if err != nil { + return nil, nil, err + } + + // Handle stream input + input, err := ffmpeg.StdinPipe() + if err != nil { + return nil, nil, err + } + go func() { + for data := range in { + input.Write(data) + } + }() + + // Start process + err = ffmpeg.Start() + return ffmpeg, &output, err +} diff --git a/transcoder/text/text_test.go b/transcoder/text/text_test.go new file mode 100644 index 0000000..4c96c5e --- /dev/null +++ b/transcoder/text/text_test.go @@ -0,0 +1 @@ +package text diff --git a/transcoder/transcoder.go b/transcoder/transcoder.go new file mode 100644 index 0000000..bf84170 --- /dev/null +++ b/transcoder/transcoder.go @@ -0,0 +1,17 @@ +// Package transcoder manages transcoders +package transcoder + +import ( + "gitlab.crans.org/nounous/ghostream/stream" + "gitlab.crans.org/nounous/ghostream/transcoder/text" +) + +// Options holds text package configuration +type Options struct { + Text text.Options +} + +// Init all transcoders +func Init(streams map[string]*stream.Stream, cfg *Options) { + go text.Init(streams, &cfg.Text) +} diff --git a/transcoder/transcoder_test.go b/transcoder/transcoder_test.go new file mode 100644 index 0000000..5493916 --- /dev/null +++ b/transcoder/transcoder_test.go @@ -0,0 +1 @@ +package transcoder diff --git a/web/handler.go b/web/handler.go index d23f816..886e6e4 100644 --- a/web/handler.go +++ b/web/handler.go @@ -13,14 +13,12 @@ import ( "github.com/markbates/pkger" "gitlab.crans.org/nounous/ghostream/internal/monitoring" - "gitlab.crans.org/nounous/ghostream/stream/srt" - "gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/webrtc" ) var ( // Precompile regex - validPath = regexp.MustCompile("^/[a-z0-9_-]*$") + validPath = regexp.MustCompile("^/[a-z0-9@_-]*$") ) // Handle WebRTC session description exchange via POST @@ -152,14 +150,19 @@ func staticHandler() http.Handler { } func statisticsHandler(w http.ResponseWriter, r *http.Request) { - // Display connected users stats, from WebRTC or streaming directly from a video player - streamID := strings.Replace(r.URL.Path[7:], "/", "", -1) + name := strings.Replace(r.URL.Path[7:], "/", "", -1) + userCount := 0 + + // Get requested stream + stream, ok := streams[name] + if ok { + // Get number of output channels + userCount = stream.ClientCount() + } + + // Display connected users statistics enc := json.NewEncoder(w) - err := enc.Encode(struct { - ConnectedViewers int - }{webrtc.GetNumberConnectedSessions(streamID) + - srt.GetNumberConnectedSessions(streamID) + - telnet.GetNumberConnectedSessions(streamID)}) + err := enc.Encode(struct{ ConnectedViewers int }{userCount}) if err != nil { http.Error(w, "Failed to generate JSON.", http.StatusInternalServerError) log.Printf("Failed to generate JSON: %s", err) diff --git a/web/web.go b/web/web.go index 929053b..1f10a66 100644 --- a/web/web.go +++ b/web/web.go @@ -11,6 +11,7 @@ import ( "github.com/markbates/pkger" "github.com/pion/webrtc/v3" + "gitlab.crans.org/nounous/ghostream/stream" ) // Options holds web package configuration @@ -41,6 +42,9 @@ var ( // Preload templates templates *template.Template + + // Streams to get statistics + streams map[string]*stream.Stream ) // Load templates with pkger @@ -74,10 +78,11 @@ func loadTemplates() error { } // Serve HTTP server -func Serve(rSdpChan chan struct { +func Serve(s map[string]*stream.Stream, rSdpChan chan struct { StreamID string RemoteDescription webrtc.SessionDescription }, lSdpChan chan webrtc.SessionDescription, c *Options) { + streams = s remoteSdpChan = rSdpChan localSdpChan = lSdpChan cfg = c diff --git a/web/web_test.go b/web/web_test.go index 5d302f6..54378f3 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -4,11 +4,17 @@ import ( "net/http" "testing" "time" + + "gitlab.crans.org/nounous/ghostream/stream" ) // TestHTTPServe tries to serve a real HTTP server and load some pages func TestHTTPServe(t *testing.T) { - go Serve(nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"}) + // Init streams messaging + streams := make(map[string]*stream.Stream) + + // Create a disabled web server + go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"}) // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early time.Sleep(500 * time.Millisecond) @@ -20,7 +26,7 @@ func TestHTTPServe(t *testing.T) { } // Now let's really start the web server - go Serve(nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"}) + go Serve(streams, nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"}) // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early time.Sleep(500 * time.Millisecond) @@ -52,7 +58,7 @@ func TestHTTPServe(t *testing.T) { t.Errorf("Viewer page returned %v != %v on GET", resp.StatusCode, http.StatusOK) } - // Test viewer statistic endpoint + // Test viewer statistics endpoint resp, err = http.Get("http://localhost:8081/_stats/demo/") if err != nil { t.Errorf("Error while getting /_stats: %s", err)