Merge branch 'messaging' into 'dev'

Messaging

See merge request nounous/ghostream!6
This commit is contained in:
erdnaxe 2020-10-18 16:17:01 +02:00
commit e51885aedc
22 changed files with 692 additions and 393 deletions

View File

@ -82,16 +82,24 @@ telnet:
#
#listenAddress: :8023
# Size is in characters. It is recommended to keep a 16x9 format.
#
#width: 80
#height: 45
## Transcoders configuration ##
transcoder:
text:
# By default the text transcoder is disabled.
# You need to enable it to use telnet output.
#
#enabled: false
# Time in milliseconds between two images.
# By default 50 ms, so 20 FPS.
# Displaying text takes time.
#
#delay: 50
# Size is in characters. It is recommended to keep a 16x9 format.
#
#width: 80
#height: 45
# Time in milliseconds between two images.
# By default 50 ms, so 20 FPS.
# Displaying text takes time.
#
#delay: 50
## Web server ##
# The web server serves a WebRTC player.

View File

@ -13,6 +13,8 @@ import (
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
"gitlab.crans.org/nounous/ghostream/transcoder"
"gitlab.crans.org/nounous/ghostream/transcoder/text"
"gitlab.crans.org/nounous/ghostream/web"
)
@ -23,6 +25,7 @@ type Config struct {
Monitoring monitoring.Options
Srt srt.Options
Telnet telnet.Options
Transcoder transcoder.Options
Web web.Options
WebRTC webrtc.Options
}
@ -54,9 +57,14 @@ func New() *Config {
Telnet: telnet.Options{
Enabled: false,
ListenAddress: ":8023",
Width: 80,
Height: 45,
Delay: 50,
},
Transcoder: transcoder.Options{
Text: text.Options{
Enabled: false,
Width: 80,
Height: 45,
Framerate: 20,
},
},
Web: web.Options{
Enabled: true,

18
main.go
View File

@ -10,10 +10,12 @@ import (
"gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/internal/config"
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/stream/forwarding"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
"gitlab.crans.org/nounous/ghostream/transcoder"
"gitlab.crans.org/nounous/ghostream/web"
)
@ -43,17 +45,17 @@ func main() {
})
localSdpChan := make(chan webrtc.SessionDescription)
// SRT channel for forwarding and webrtc
forwardingChannel := make(chan srt.Packet, 64)
webrtcChannel := make(chan srt.Packet, 64)
// Init streams messaging
streams := make(map[string]*stream.Stream)
// Start routines
go forwarding.Serve(forwardingChannel, cfg.Forwarding)
go transcoder.Init(streams, &cfg.Transcoder)
go forwarding.Serve(streams, cfg.Forwarding)
go monitoring.Serve(&cfg.Monitoring)
go srt.Serve(&cfg.Srt, authBackend, forwardingChannel, webrtcChannel)
go telnet.Serve(&cfg.Telnet)
go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web)
go webrtc.Serve(remoteSdpChan, localSdpChan, webrtcChannel, &cfg.WebRTC)
go srt.Serve(streams, authBackend, &cfg.Srt)
go telnet.Serve(streams, &cfg.Telnet)
go web.Serve(streams, remoteSdpChan, localSdpChan, &cfg.Web)
go webrtc.Serve(streams, remoteSdpChan, localSdpChan, &cfg.WebRTC)
// Wait for routines
select {}

View File

@ -2,12 +2,10 @@
package forwarding
import (
"bufio"
"io"
"log"
"os/exec"
"time"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options to configure the stream forwarding.
@ -15,21 +13,46 @@ import (
type Options map[string][]string
// Serve handles incoming packets from SRT and forward them to other external services
func Serve(inputChannel chan srt.Packet, cfg Options) {
func Serve(streams map[string]*stream.Stream, cfg Options) {
if len(cfg) < 1 {
// No forwarding, ignore
for {
<-inputChannel // Clear input channel
}
return
}
log.Printf("Stream forwarding initialized")
ffmpegInstances := make(map[string]*exec.Cmd)
for {
for name, st := range streams {
fwdCfg, ok := cfg[name]
if !ok {
// Not configured
continue
}
// Start forwarding
log.Printf("Starting forwarding for '%s'", name)
go forward(st, fwdCfg)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
func forward(st *stream.Stream, fwdCfg []string) {
// FIXME
/*ffmpegInstances := make(map[string]*exec.Cmd)
ffmpegInputStreams := make(map[string]*io.WriteCloser)
for {
var err error = nil
// Wait for packets
packet := <-inputChannel
// FIXME packet := <-inputChannel
packet := srt.Packet{
Data: []byte{},
PacketType: "nothing",
StreamName: "demo",
}
switch packet.PacketType {
case "register":
err = registerStream(packet.StreamName, ffmpegInstances, ffmpegInputStreams, cfg)
@ -47,9 +70,10 @@ func Serve(inputChannel chan srt.Packet, cfg Options) {
if err != nil {
log.Printf("Error occurred while receiving SRT packet of type %s: %s", packet.PacketType, err)
}
}
}*/
}
/*
// registerStream creates ffmpeg instance associated with newly created stream
func registerStream(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams map[string]*io.WriteCloser, cfg Options) error {
streams, exist := cfg[name]
@ -119,3 +143,4 @@ func close(name string, ffmpegInstances map[string]*exec.Cmd, ffmpegInputStreams
delete(ffmpegInputStreams, name)
return nil
}
*/

View File

@ -6,6 +6,7 @@ import (
"testing"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/stream/srt"
)
@ -30,16 +31,15 @@ func TestForwardStream(t *testing.T) {
}
}()
forwardingList := make(map[string][]string)
forwardingList["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
forwardingChannel := make(chan srt.Packet)
cfg := make(map[string][]string)
cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
// Register forwarding stream list
go Serve(forwardingChannel, forwardingList)
streams := make(map[string]*stream.Stream)
go Serve(streams, cfg)
// Serve SRT Server without authentification backend
go srt.Serve(&srt.Options{Enabled: true, ListenAddress: ":9712", MaxClients: 2}, nil, forwardingChannel, nil)
go srt.Serve(streams, nil, &srt.Options{Enabled: true, ListenAddress: ":9712", MaxClients: 2})
ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error",
"-re", "-f", "lavfi", "-i", "testsrc=size=640x480:rate=10",

99
stream/messaging.go Normal file
View File

@ -0,0 +1,99 @@
// Package stream defines a structure to communication between inputs and outputs
package stream
import (
"sync"
)
// Stream makes packages able to subscribe to an incoming stream
type Stream struct {
// Incoming data come from this channel
Broadcast chan<- []byte
// Use a map to be able to delete an item
outputs map[chan []byte]struct{}
// Count clients for statistics
nbClients int
// Mutex to lock outputs map
lock sync.Mutex
}
// New creates a new stream.
func New() *Stream {
s := &Stream{}
broadcast := make(chan []byte, 1024)
s.Broadcast = broadcast
s.outputs = make(map[chan []byte]struct{})
s.nbClients = 0
go s.run(broadcast)
return s
}
func (s *Stream) run(broadcast <-chan []byte) {
for msg := range broadcast {
s.lock.Lock()
for output := range s.outputs {
select {
case output <- msg:
default:
// If full, do a ring buffer
// Check that output is not of size zero
if len(output) > 1 {
<-output
}
}
}
s.lock.Unlock()
}
// Incoming chan has been closed, close all outputs
s.lock.Lock()
for ch := range s.outputs {
delete(s.outputs, ch)
close(ch)
}
s.lock.Unlock()
}
// Close the incoming chan, this will also delete all outputs
func (s *Stream) Close() {
close(s.Broadcast)
}
// Register a new output on a stream.
func (s *Stream) Register(output chan []byte) {
s.lock.Lock()
defer s.lock.Unlock()
s.outputs[output] = struct{}{}
}
// Unregister removes an output.
// If hidden in true, then do not count this client.
func (s *Stream) Unregister(output chan []byte) {
s.lock.Lock()
defer s.lock.Unlock()
// Make sure we did not already close this output
_, ok := s.outputs[output]
if ok {
delete(s.outputs, output)
close(output)
}
}
// ClientCount returns the number of clients
func (s *Stream) ClientCount() int {
return s.nbClients
}
// IncrementClientCount increments the number of clients
func (s *Stream) IncrementClientCount() {
s.nbClients++
}
// DecrementClientCount decrements the number of clients
func (s *Stream) DecrementClientCount() {
s.nbClients--
}

42
stream/messaging_test.go Normal file
View File

@ -0,0 +1,42 @@
package stream
import (
"testing"
)
func TestWithoutOutputs(t *testing.T) {
stream := New()
defer stream.Close()
stream.Broadcast <- []byte("hello world")
}
func TestWithOneOutput(t *testing.T) {
stream := New()
defer stream.Close()
// Register one output
output := make(chan []byte, 64)
stream.Register(output)
stream.IncrementClientCount()
// Try to pass one message
stream.Broadcast <- []byte("hello world")
msg := <-output
if string(msg) != "hello world" {
t.Errorf("Message has wrong content: %s != hello world", msg)
}
// Check client count
if count := stream.ClientCount(); count != 1 {
t.Errorf("Client counter returned %d, expected 1", count)
}
// Unregister
stream.Unregister(output)
stream.DecrementClientCount()
// Check client count
if count := stream.ClientCount(); count != 0 {
t.Errorf("Client counter returned %d, expected 0", count)
}
}

View File

@ -5,23 +5,30 @@ import (
"log"
"github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/stream"
)
func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels map[string][]chan Packet, forwardingChannel, webrtcChannel chan Packet) {
func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
// Check stream does not exist
if _, ok := streams[name]; ok {
log.Print("Stream already exists, refusing new streamer")
socket.Close()
return
}
// Create stream
log.Printf("New SRT streamer for stream %s", name)
// Create a new buffer
// UDP packet cannot be larger than MTU (1500)
buff := make([]byte, 1500)
// Setup stream forwarding
forwardingChannel <- Packet{StreamName: name, PacketType: "register", Data: nil}
webrtcChannel <- Packet{StreamName: name, PacketType: "register", Data: nil}
st := stream.New()
streams[name] = st
// Read RTP packets forever and send them to the WebRTC Client
for {
// Create a new buffer
// UDP packet cannot be larger than MTU (1500)
buff := make([]byte, 1500)
// 5s timeout
n, err := s.Read(buff, 5000)
n, err := socket.Read(buff, 5000)
if err != nil {
log.Println("Error occurred while reading SRT socket:", err)
break
@ -33,40 +40,49 @@ func handleStreamer(s *srtgo.SrtSocket, name string, clientDataChannels map[stri
break
}
// Send raw packet to other streams
// Copy data in another buffer to ensure that the data would not be overwritten
data := make([]byte, n)
copy(data, buff[:n])
forwardingChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data}
webrtcChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data}
for _, dataChannel := range clientDataChannels[name] {
dataChannel <- Packet{StreamName: name, PacketType: "sendData", Data: data}
}
// Send raw data to other streams
buff = buff[:n]
st.Broadcast <- buff
}
forwardingChannel <- Packet{StreamName: name, PacketType: "close", Data: nil}
webrtcChannel <- Packet{StreamName: name, PacketType: "close", Data: nil}
// Close stream
st.Close()
socket.Close()
delete(streams, name)
}
func handleViewer(s *srtgo.SrtSocket, name string, dataChannel chan Packet, dataChannels map[string][]chan Packet) {
// FIXME Should not pass all dataChannels to one viewer
func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
log.Printf("New SRT viewer for stream %s", name)
// Receive packets from channel and send them
for {
packet := <-dataChannel
if packet.PacketType == "sendData" {
_, err := s.Write(packet.Data, 10000)
if err != nil {
s.Close()
for i, channel := range dataChannels[name] {
if channel == dataChannel {
dataChannels[name] = append(dataChannels[name][:i], dataChannels[name][i+1:]...)
}
}
return
}
// Get requested stream
st, ok := streams[name]
if !ok {
log.Println("Stream does not exist, refusing new viewer")
return
}
// Register new output
c := make(chan []byte, 1024)
st.Register(c)
st.IncrementClientCount()
// Receive data and send them
for data := range c {
if len(data) < 1 {
log.Print("Remove SRT viewer because of end of stream")
break
}
// Send data
_, err := s.Write(data, 1000)
if err != nil {
log.Printf("Remove SRT viewer because of sending error, %s", err)
break
}
}
// Close output
st.Unregister(c)
st.DecrementClientCount()
s.Close()
}

View File

@ -12,10 +12,7 @@ import (
"github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/auth"
)
var (
clientDataChannels map[string][]chan Packet
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds web package configuration
@ -25,13 +22,6 @@ type Options struct {
MaxClients int
}
// Packet contains the necessary data to broadcast events like stream creating, packet receiving or stream closing.
type Packet struct {
Data []byte
PacketType string
StreamName string
}
// Split host and port from listen address
func splitHostPort(hostport string) (string, uint16, error) {
host, portS, err := net.SplitHostPort(hostport)
@ -48,13 +38,8 @@ func splitHostPort(hostport string) (string, uint16, error) {
return host, uint16(port64), nil
}
// GetNumberConnectedSessions get the number of currently connected clients
func GetNumberConnectedSessions(streamID string) int {
return len(clientDataChannels[streamID])
}
// Serve SRT server
func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChannel chan Packet) {
func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Options) {
if !cfg.Enabled {
// SRT is not enabled, ignore
return
@ -75,8 +60,6 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan
log.Fatal("Unable to listen for SRT clients:", err)
}
clientDataChannels = make(map[string][]chan Packet)
for {
// Wait for new connection
s, err := sck.Accept()
@ -86,6 +69,9 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan
continue
}
// FIXME: Flush socket
// Without this, the SRT buffer might get full before reading it
// streamid can be "name:password" for streamer or "name" for viewer
streamID, err := s.GetSockOptString(C.SRTO_STREAMID)
if err != nil {
@ -94,10 +80,6 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan
}
split := strings.Split(streamID, ":")
if clientDataChannels[streamID] == nil {
clientDataChannels[streamID] = make([]chan Packet, 0, cfg.MaxClients)
}
if len(split) > 1 {
// password was provided so it is a streamer
name, password := split[0], split[1]
@ -110,15 +92,13 @@ func Serve(cfg *Options, authBackend auth.Backend, forwardingChannel, webrtcChan
}
}
go handleStreamer(s, name, clientDataChannels, forwardingChannel, webrtcChannel)
go handleStreamer(s, streams, name)
} else {
// password was not provided so it is a viewer
name := split[0]
dataChannel := make(chan Packet, 4096)
clientDataChannels[streamID] = append(clientDataChannels[streamID], dataChannel)
go handleViewer(s, name, dataChannel, clientDataChannels)
// Send stream
go handleViewer(s, streams, name)
}
}
}

View File

@ -5,6 +5,8 @@ import (
"os/exec"
"testing"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
)
// TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à
@ -55,7 +57,9 @@ func TestServeSRT(t *testing.T) {
t.Skip("WARNING: FFMPEG is not installed. Skipping stream test")
}
go Serve(&Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}, nil, nil, nil)
// Init streams messaging and SRT server
streams := make(map[string]*stream.Stream)
go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2})
ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error",
"-f", "lavfi", "-i", "testsrc=size=640x480:rate=10",
@ -78,6 +82,4 @@ func TestServeSRT(t *testing.T) {
}()
time.Sleep(5 * time.Second) // Delay is in nanoseconds, here 5s
// TODO Kill SRT server
}

View File

@ -1,178 +1,112 @@
// Package telnet provides some fancy tools, like an ASCII-art stream.
// Package telnet expose text version of stream.
package telnet
import (
"fmt"
"io"
"log"
"net"
"strings"
"time"
)
var (
// Cfg contains the different options of the telnet package, see below
// TODO Config should not be exported
Cfg *Options
currentMessage map[string]*string
clientCount map[string]int
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds telnet package configuration
type Options struct {
Enabled bool
ListenAddress string
Width int
Height int
Delay int
}
// Serve starts the telnet server and listen to clients
func Serve(config *Options) {
Cfg = config
if !config.Enabled {
// Serve Telnet server
func Serve(streams map[string]*stream.Stream, cfg *Options) {
if !cfg.Enabled {
// Telnet is not enabled, ignore
return
}
currentMessage = make(map[string]*string)
clientCount = make(map[string]int)
listener, err := net.Listen("tcp", config.ListenAddress)
// Start TCP server
listener, err := net.Listen("tcp", cfg.ListenAddress)
if err != nil {
log.Printf("Error while listening to the address %s: %s", config.ListenAddress, err)
return
log.Fatalf("Error while listening to the address %s: %s", cfg.ListenAddress, err)
}
log.Printf("Telnet server listening on %s", cfg.ListenAddress)
go func() {
for {
s, err := listener.Accept()
if err != nil {
log.Printf("Error while accepting TCP socket: %s", s)
continue
}
go func(s net.Conn) {
streamID := ""
// Request for stream ID
for {
_, err = s.Write([]byte("[GHOSTREAM]\nEnter stream ID: "))
if err != nil {
log.Println("Error while requesting stream ID to telnet client")
_ = s.Close()
return
}
buff := make([]byte, 255)
n, err := s.Read(buff)
if err != nil {
log.Println("Error while requesting stream ID to telnet client")
_ = s.Close()
return
}
// Avoid bruteforce
time.Sleep(3 * time.Second)
streamID = string(buff[:n])
streamID = strings.Replace(streamID, "\r", "", -1)
streamID = strings.Replace(streamID, "\n", "", -1)
if len(streamID) > 0 {
if strings.ToLower(streamID) == "exit" {
_, _ = s.Write([]byte("Goodbye!\n"))
_ = s.Close()
return
}
if _, ok := currentMessage[streamID]; !ok {
_, err = s.Write([]byte("Unknown stream ID.\n"))
if err != nil {
log.Println("Error while requesting stream ID to telnet client")
_ = s.Close()
return
}
continue
}
break
}
}
clientCount[streamID]++
// Hide terminal cursor
_, _ = s.Write([]byte("\033[?25l"))
for {
n, err := s.Write([]byte(*currentMessage[streamID]))
if err != nil {
log.Printf("Error while sending TCP data: %s", err)
_ = s.Close()
clientCount[streamID]--
break
}
if n == 0 {
_ = s.Close()
clientCount[streamID]--
break
}
time.Sleep(time.Duration(config.Delay) * time.Millisecond)
}
}(s)
}
}()
log.Println("Telnet server initialized")
}
// GetNumberConnectedSessions returns the numbers of clients that are viewing the stream through a telnet shell
func GetNumberConnectedSessions(streamID string) int {
if Cfg == nil || !Cfg.Enabled {
return 0
}
return clientCount[streamID]
}
// StartASCIIArtStream send all packets received by ffmpeg as ASCII Art to telnet clients
func StartASCIIArtStream(streamID string, reader io.ReadCloser) {
if !Cfg.Enabled {
_ = reader.Close()
return
}
currentMessage[streamID] = new(string)
pixelBuff := make([]byte, Cfg.Width*Cfg.Height)
textBuff := strings.Builder{}
// Handle each new client
for {
n, err := reader.Read(pixelBuff)
s, err := listener.Accept()
if err != nil {
log.Printf("An error occurred while reading input: %s", err)
break
}
if n == 0 {
// Stream is finished
break
log.Printf("Error while accepting TCP socket: %s", s)
continue
}
// Header
textBuff.Reset()
textBuff.Grow((40*Cfg.Width+6)*Cfg.Height + 47)
for i := 0; i < 42; i++ {
textBuff.WriteByte('\n')
}
// Convert image to ASCII
for i, pixel := range pixelBuff {
if i%Cfg.Width == 0 {
// New line
textBuff.WriteString("\033[49m\n")
}
// Print two times the character to make a square
text := fmt.Sprintf("\033[48;2;%d;%d;%dm ", pixel, pixel, pixel)
textBuff.WriteString(text)
textBuff.WriteString(text)
}
textBuff.WriteString("\033[49m")
*(currentMessage[streamID]) = textBuff.String()
go handleViewer(s, streams, cfg)
}
}
func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
// Prompt user about stream name
if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
s.Close()
return
}
buff := make([]byte, 255)
n, err := s.Read(buff)
if err != nil {
log.Printf("Error while requesting stream ID to telnet client: %s", err)
s.Close()
return
}
name := strings.TrimSpace(string(buff[:n])) + "@text"
if len(name) < 1 {
// Too short, exit
s.Close()
return
}
// Wait a bit
time.Sleep(time.Second)
// Get requested stream
st, ok := streams[name]
if !ok {
log.Println("Stream does not exist, kicking new Telnet viewer")
if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
}
s.Close()
return
}
// Register new client
log.Printf("New Telnet viewer for stream '%s'", name)
c := make(chan []byte, 128)
st.Register(c)
st.IncrementClientCount()
// Hide terminal cursor
if _, err = s.Write([]byte("\033[?25l")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
s.Close()
return
}
// Receive data and send them
for data := range c {
if len(data) < 1 {
log.Print("Remove Telnet viewer because of end of stream")
break
}
// Send data
_, err := s.Write(data)
if err != nil {
log.Printf("Remove Telnet viewer because of sending error, %s", err)
break
}
}
// Close output
st.Unregister(c)
st.DecrementClientCount()
s.Close()
}

View File

@ -1,41 +1,35 @@
package telnet
import (
"bytes"
"io/ioutil"
"math/rand"
"net"
"testing"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
)
// TestTelnetOutput creates a TCP client that connects to the server and get one image.
func TestTelnetOutput(t *testing.T) {
// Try to start Telnet server while it is disabled
Serve(&Options{Enabled: false})
StartASCIIArtStream("demo", ioutil.NopCloser(bytes.NewReader([]byte{})))
if GetNumberConnectedSessions("demo") != 0 {
t.Fatalf("Mysteriously found %d connected clients", GetNumberConnectedSessions("demo"))
}
streams := make(map[string]*stream.Stream)
go Serve(streams, &Options{Enabled: false})
// FIXME test connect
// Enable and start Telnet server
Serve(&Options{
cfg := Options{
Enabled: true,
ListenAddress: "127.0.0.1:8023",
Width: 80,
Height: 45,
Delay: 50,
})
}
go Serve(streams, &cfg)
// FIXME test connect
// Generate a random image, that should be given by FFMPEG
sampleImage := make([]byte, Cfg.Width*Cfg.Height)
/*sampleImage := make([]byte, cfg.Width*cfg.Height)
rand.Read(sampleImage)
reader := ioutil.NopCloser(bytes.NewBuffer(sampleImage))
// Send the image to the server
StartASCIIArtStream("demo", reader)
// Connect to the Telnet server
client, err := net.Dial("tcp", Cfg.ListenAddress)
client, err := net.Dial("tcp", cfg.ListenAddress)
if err != nil {
t.Fatalf("Error while connecting to the TCP server: %s", err)
}
@ -46,7 +40,7 @@ func TestTelnetOutput(t *testing.T) {
t.Fatalf("Error while closing TCP connection: %s", err)
}
client, err = net.Dial("tcp", Cfg.ListenAddress)
client, err = net.Dial("tcp", cfg.ListenAddress)
if err != nil {
t.Fatalf("Error while connecting to the TCP server: %s", err)
}
@ -110,5 +104,5 @@ func TestTelnetOutput(t *testing.T) {
time.Sleep(time.Second)
if GetNumberConnectedSessions("demo") != 0 {
t.Fatalf("Expected no telnet client, found %d", GetNumberConnectedSessions("demo"))
}
}*/
}

View File

@ -3,61 +3,53 @@ package webrtc
import (
"bufio"
"fmt"
"io"
"log"
"net"
"os/exec"
"strings"
"time"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet"
"gitlab.crans.org/nounous/ghostream/stream"
)
var (
ffmpeg = make(map[string]*exec.Cmd)
ffmpegInput = make(map[string]io.WriteCloser)
activeStream map[string]struct{}
)
func ingestFrom(inputChannel chan srt.Packet) {
// FIXME Clean code
func autoIngest(streams map[string]*stream.Stream) {
// Regulary check existing streams
activeStream = make(map[string]struct{})
for {
var err error = nil
srtPacket := <-inputChannel
switch srtPacket.PacketType {
case "register":
go registerStream(&srtPacket)
break
case "sendData":
if _, ok := ffmpegInput[srtPacket.StreamName]; !ok {
break
for name, st := range streams {
if strings.Contains(name, "@") {
// Not a source stream, pass
continue
}
// FIXME send to stream srtPacket.StreamName
if _, err := ffmpegInput[srtPacket.StreamName].Write(srtPacket.Data); err != nil {
log.Printf("Failed to write data to ffmpeg input: %s", err)
if _, ok := activeStream[name]; ok {
// Stream is already ingested
continue
}
break
case "close":
log.Printf("WebRTC CloseConnection %s", srtPacket.StreamName)
_ = ffmpeg[srtPacket.StreamName].Process.Kill()
_ = ffmpegInput[srtPacket.StreamName].Close()
delete(ffmpeg, srtPacket.StreamName)
delete(ffmpegInput, srtPacket.StreamName)
break
default:
log.Println("Unknown SRT srtPacket type:", srtPacket.PacketType)
break
}
if err != nil {
log.Printf("Error occurred while receiving SRT srtPacket of type %s: %s", srtPacket.PacketType, err)
// Start ingestion
log.Printf("Starting webrtc for '%s'", name)
go ingest(name, st)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
func registerStream(srtPacket *srt.Packet) {
log.Printf("WebRTC RegisterStream %s", srtPacket.StreamName)
func ingest(name string, input *stream.Stream) {
// Register to get stream
videoInput := make(chan []byte, 1024)
input.Register(videoInput)
activeStream[name] = struct{}{}
// Open a UDP Listener for RTP Packets on port 5004
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
@ -70,55 +62,12 @@ func registerStream(srtPacket *srt.Packet) {
log.Printf("Faited to open UDP listener %s", err)
return
}
// FIXME Close UDP listeners at the end of the stream, not the end of the routine
/* defer func() {
if err = videoListener.Close(); err != nil {
log.Printf("Faited to close UDP listener %s", err)
}
if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener %s", err)
}
}() */
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
"-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality
"-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1",
"-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5004",
"-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5005"}
// Export stream to ascii art
if telnet.Cfg.Enabled {
bitrate := fmt.Sprintf("%dk", telnet.Cfg.Width*telnet.Cfg.Height/telnet.Cfg.Delay)
ffmpegArgs = append(ffmpegArgs,
"-an", "-vf", fmt.Sprintf("scale=%dx%d", telnet.Cfg.Width, telnet.Cfg.Height),
"-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate, "-q", "42", "-pix_fmt", "gray", "-f", "rawvideo", "pipe:1")
}
ffmpeg[srtPacket.StreamName] = exec.Command("ffmpeg", ffmpegArgs...)
input, err := ffmpeg[srtPacket.StreamName].StdinPipe()
// Start ffmpag to convert videoInput to video and audio UDP
ffmpeg, err := startFFmpeg(videoInput)
if err != nil {
panic(err)
}
ffmpegInput[srtPacket.StreamName] = input
errOutput, err := ffmpeg[srtPacket.StreamName].StderrPipe()
if err != nil {
panic(err)
}
// Receive raw video output and convert it to ASCII art, then forward it TCP
if telnet.Cfg.Enabled {
output, err := ffmpeg[srtPacket.StreamName].StdoutPipe()
if err != nil {
panic(err)
}
go telnet.StartASCIIArtStream(srtPacket.StreamName, output)
}
if err := ffmpeg[srtPacket.StreamName].Start(); err != nil {
panic(err)
log.Printf("Error while starting ffmpeg: %s", err)
return
}
// Receive video
@ -128,7 +77,7 @@ func registerStream(srtPacket *srt.Packet) {
n, _, err := videoListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
continue
break
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
@ -136,13 +85,13 @@ func registerStream(srtPacket *srt.Packet) {
continue
}
if videoTracks[srtPacket.StreamName] == nil {
videoTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0)
if videoTracks[name] == nil {
videoTracks[name] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all video tracks
// Adapt payload and SSRC to match destination
for _, videoTrack := range videoTracks[srtPacket.StreamName] {
for _, videoTrack := range videoTracks[name] {
packet.Header.PayloadType = videoTrack.PayloadType()
packet.Header.SSRC = videoTrack.SSRC()
if writeErr := videoTrack.WriteRTP(packet); writeErr != nil {
@ -160,7 +109,7 @@ func registerStream(srtPacket *srt.Packet) {
n, _, err := audioListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
continue
break
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
@ -168,13 +117,13 @@ func registerStream(srtPacket *srt.Packet) {
continue
}
if audioTracks[srtPacket.StreamName] == nil {
audioTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0)
if audioTracks[name] == nil {
audioTracks[name] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all audio tracks
// Adapt payload and SSRC to match destination
for _, audioTrack := range audioTracks[srtPacket.StreamName] {
for _, audioTrack := range audioTracks[name] {
packet.Header.PayloadType = audioTrack.PayloadType()
packet.Header.SSRC = audioTrack.SSRC()
if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
@ -185,10 +134,60 @@ func registerStream(srtPacket *srt.Packet) {
}
}()
// Wait for stopped ffmpeg
if err = ffmpeg.Wait(); err != nil {
log.Printf("Faited to wait for ffmpeg: %s", err)
}
// Close UDP listeners
if err = videoListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err)
}
if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err)
}
delete(activeStream, name)
}
func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) {
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
"-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality
"-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1",
"-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5004",
"-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5005"}
ffmpeg = exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output
errOutput, err := ffmpeg.StderrPipe()
if err != nil {
return nil, err
}
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[WEBRTC FFMPEG %s] %s", "demo", scanner.Text())
}
}()
// Handle stream input
input, err := ffmpeg.StdinPipe()
if err != nil {
return nil, err
}
go func() {
for data := range in {
if _, err := input.Write(data); err != nil {
log.Printf("Failed to write data to ffmpeg input: %s", err)
}
}
// End of stream
ffmpeg.Process.Kill()
}()
// Start process
err = ffmpeg.Start()
return ffmpeg, err
}

View File

@ -8,7 +8,7 @@ import (
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds web package configuration
@ -182,12 +182,12 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa
}
// Serve WebRTC media streaming server
func Serve(remoteSdpChan chan struct {
func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, localSdpChan chan webrtc.SessionDescription, inputChannel chan srt.Packet, cfg *Options) {
}, localSdpChan chan webrtc.SessionDescription, cfg *Options) {
if !cfg.Enabled {
// SRT is not enabled, ignore
// WebRTC is not enabled, ignore
return
}
@ -197,8 +197,8 @@ func Serve(remoteSdpChan chan struct {
videoTracks = make(map[string][]*webrtc.Track)
audioTracks = make(map[string][]*webrtc.Track)
// Ingest data from SRT
go ingestFrom(inputChannel)
// Ingest data
go autoIngest(streams)
// Handle new connections
for {

View File

@ -5,24 +5,24 @@ import (
"testing"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream"
)
func TestServe(t *testing.T) {
// Serve WebRTC server
// Init streams messaging and WebRTC server
streams := make(map[string]*stream.Stream)
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
webrtcChannel := make(chan srt.Packet, 64)
cfg := Options{
Enabled: true,
MinPortUDP: 10000,
MaxPortUDP: 10005,
STUNServers: []string{"stun:stun.l.google.com:19302"},
}
go Serve(remoteSdpChan, localSdpChan, webrtcChannel, &cfg)
go Serve(streams, remoteSdpChan, localSdpChan, &cfg)
// New client connection
mediaEngine := webrtc.MediaEngine{}

157
transcoder/text/text.go Normal file
View File

@ -0,0 +1,157 @@
// Package text transcode a video to text
package text
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds text package configuration
type Options struct {
Enabled bool
Width int
Height int
Framerate int
}
// Init text transcoder
func Init(streams map[string]*stream.Stream, cfg *Options) {
if !cfg.Enabled {
// Text transcode is not enabled, ignore
return
}
// Regulary check existing streams
for {
for sourceName, sourceStream := range streams {
if strings.Contains(sourceName, "@") {
// Not a source stream, pass
continue
}
// Check that the transcoded stream does not already exist
name := sourceName + "@text"
_, ok := streams[name]
if ok {
// Stream is already transcoded
continue
}
// Start conversion
log.Printf("Starting text transcode '%s'", name)
st := stream.New()
streams[name] = st
go transcode(sourceStream, st, cfg)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
// Convert video to ANSI text
func transcode(input, output *stream.Stream, cfg *Options) {
// Start ffmpeg to transcode video to rawvideo
videoInput := make(chan []byte, 1024)
input.Register(videoInput)
ffmpeg, rawvideo, err := startFFmpeg(videoInput, cfg)
if err != nil {
log.Printf("Error while starting ffmpeg: %s", err)
return
}
// Transcode rawvideo to ANSI text
pixelBuff := make([]byte, cfg.Width*cfg.Height)
textBuff := bytes.Buffer{}
for {
n, err := (*rawvideo).Read(pixelBuff)
if err != nil {
log.Printf("An error occurred while reading input: %s", err)
break
}
if n == 0 {
// Stream is finished
break
}
// Header
textBuff.Reset()
textBuff.Grow((40*cfg.Width+6)*cfg.Height + 47)
for i := 0; i < 42; i++ {
textBuff.WriteByte('\n')
}
// Convert image to ASCII
for i, pixel := range pixelBuff {
if i%cfg.Width == 0 {
// New line
textBuff.WriteString("\033[49m\n")
}
// Print two times the character to make a square
text := fmt.Sprintf("\033[48;2;%d;%d;%dm ", pixel, pixel, pixel)
textBuff.WriteString(text)
textBuff.WriteString(text)
}
textBuff.WriteString("\033[49m")
output.Broadcast <- textBuff.Bytes()
}
// Stop transcode
ffmpeg.Process.Kill()
}
// Start a ffmpeg instance to convert stream into rawvideo
func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) {
bitrate := fmt.Sprintf("%dk", cfg.Width*cfg.Height*cfg.Framerate)
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
"-an", "-vf", fmt.Sprintf("scale=%dx%d", cfg.Width, cfg.Height),
"-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate,
"-q", "42", "-pix_fmt", "gray", "-f", "rawvideo", "pipe:1"}
ffmpeg := exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output
errOutput, err := ffmpeg.StderrPipe()
if err != nil {
return nil, nil, err
}
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[TELNET FFMPEG %s] %s", "demo", scanner.Text())
}
}()
// Handle text output
output, err := ffmpeg.StdoutPipe()
if err != nil {
return nil, nil, err
}
// Handle stream input
input, err := ffmpeg.StdinPipe()
if err != nil {
return nil, nil, err
}
go func() {
for data := range in {
input.Write(data)
}
}()
// Start process
err = ffmpeg.Start()
return ffmpeg, &output, err
}

View File

@ -0,0 +1 @@
package text

17
transcoder/transcoder.go Normal file
View File

@ -0,0 +1,17 @@
// Package transcoder manages transcoders
package transcoder
import (
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/transcoder/text"
)
// Options holds text package configuration
type Options struct {
Text text.Options
}
// Init all transcoders
func Init(streams map[string]*stream.Stream, cfg *Options) {
go text.Init(streams, &cfg.Text)
}

View File

@ -0,0 +1 @@
package transcoder

View File

@ -13,14 +13,12 @@ import (
"github.com/markbates/pkger"
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
)
var (
// Precompile regex
validPath = regexp.MustCompile("^/[a-z0-9_-]*$")
validPath = regexp.MustCompile("^/[a-z0-9@_-]*$")
)
// Handle WebRTC session description exchange via POST
@ -152,14 +150,19 @@ func staticHandler() http.Handler {
}
func statisticsHandler(w http.ResponseWriter, r *http.Request) {
// Display connected users stats, from WebRTC or streaming directly from a video player
streamID := strings.Replace(r.URL.Path[7:], "/", "", -1)
name := strings.Replace(r.URL.Path[7:], "/", "", -1)
userCount := 0
// Get requested stream
stream, ok := streams[name]
if ok {
// Get number of output channels
userCount = stream.ClientCount()
}
// Display connected users statistics
enc := json.NewEncoder(w)
err := enc.Encode(struct {
ConnectedViewers int
}{webrtc.GetNumberConnectedSessions(streamID) +
srt.GetNumberConnectedSessions(streamID) +
telnet.GetNumberConnectedSessions(streamID)})
err := enc.Encode(struct{ ConnectedViewers int }{userCount})
if err != nil {
http.Error(w, "Failed to generate JSON.", http.StatusInternalServerError)
log.Printf("Failed to generate JSON: %s", err)

View File

@ -11,6 +11,7 @@ import (
"github.com/markbates/pkger"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds web package configuration
@ -41,6 +42,9 @@ var (
// Preload templates
templates *template.Template
// Streams to get statistics
streams map[string]*stream.Stream
)
// Load templates with pkger
@ -74,10 +78,11 @@ func loadTemplates() error {
}
// Serve HTTP server
func Serve(rSdpChan chan struct {
func Serve(s map[string]*stream.Stream, rSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, lSdpChan chan webrtc.SessionDescription, c *Options) {
streams = s
remoteSdpChan = rSdpChan
localSdpChan = lSdpChan
cfg = c

View File

@ -4,11 +4,17 @@ import (
"net/http"
"testing"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
)
// TestHTTPServe tries to serve a real HTTP server and load some pages
func TestHTTPServe(t *testing.T) {
go Serve(nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"})
// Init streams messaging
streams := make(map[string]*stream.Stream)
// Create a disabled web server
go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond)
@ -20,7 +26,7 @@ func TestHTTPServe(t *testing.T) {
}
// Now let's really start the web server
go Serve(nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"})
go Serve(streams, nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond)
@ -52,7 +58,7 @@ func TestHTTPServe(t *testing.T) {
t.Errorf("Viewer page returned %v != %v on GET", resp.StatusCode, http.StatusOK)
}
// Test viewer statistic endpoint
// Test viewer statistics endpoint
resp, err = http.Get("http://localhost:8081/_stats/demo/")
if err != nil {
t.Errorf("Error while getting /_stats: %s", err)