Merge branch 'asciiart' into 'dev'

Asciiart and performance

Closes #16

See merge request nounous/ghostream!4
This commit is contained in:
erdnaxe 2020-10-13 18:48:00 +02:00
commit d6750f219a
12 changed files with 392 additions and 154 deletions

View File

@ -12,6 +12,6 @@ FROM alpine:3.12
RUN apk add --no-cache -X https://dl-cdn.alpinelinux.org/alpine/edge/community/ ffmpeg libsrt
COPY --from=build_base /code/out/ghostream /app/ghostream
WORKDIR /app
# 9710 for SRT, 8080 for Web, 2112 for monitoring and 10000-10005 (UDP) for WebRTC
EXPOSE 9710/udp 8080 2112 10000-10005/udp
# 2112 for monitoring, 8023 for Telnet, 8080 for Web, 9710 for SRT, 10000-10005 (UDP) for WebRTC
EXPOSE 2112 8023 8080 9710/udp 10000-10005/udp
CMD ["/app/ghostream"]

View File

@ -73,6 +73,27 @@ srt:
# Max number of active SRT connections
#maxClients: 64
## Telnet server ##
# The telnet server receive the stream and emit the stream as ASCII-art.
telnet:
# By default, this easter egg is disabled.
# You must enable it to use it.
#
#enabled: false
#listenAddress: :8023
# Size is in characters. It is recommended to keep a 16x9 format.
#
#width: 80
#height: 45
# Time in milliseconds that we should sleep between two images.
# By default, 20 FPS. Displaying text takes time...
#
#delay: 50
## Web server ##
# The web server serves a WebRTC player.
web:
@ -103,11 +124,14 @@ web:
#
#name: Ghostream
# Use the domain name as the stream name
# e.g., on http://example.com:8080/ the stream served will be "example.com"
# Use the domain name as the stream name for some hosts
# e.g., on http://stream.example.com:8080/, if the domain stream.example.com is mapped to "example",
# the stream served will be "example".
# This implies that your domain will be able to serve only one stream.
# Dots in the domain name must be remplaced by dashes to avoid yaml issues.
#
#oneStreamPerDomain: false
#mapDomainToStream:
# stream-example-com: example
# Stream player poster
# Shown when stream is loading or inactive.

View File

@ -11,6 +11,7 @@ import (
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream/forwarding"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
"gitlab.crans.org/nounous/ghostream/web"
)
@ -21,6 +22,7 @@ type Config struct {
Forwarding forwarding.Options
Monitoring monitoring.Options
Srt srt.Options
Telnet telnet.Options
Web web.Options
WebRTC webrtc.Options
}
@ -49,13 +51,20 @@ func New() *Config {
ListenAddress: ":9710",
MaxClients: 64,
},
Telnet: telnet.Options{
Enabled: false,
ListenAddress: ":8023",
Width: 80,
Height: 45,
Delay: 50,
},
Web: web.Options{
Enabled: true,
Favicon: "/static/img/favicon.svg",
Hostname: "localhost",
ListenAddress: ":8080",
Name: "Ghostream",
OneStreamPerDomain: false,
MapDomainToStream: make(map[string]string),
PlayerPoster: "/static/img/no_stream.svg",
ViewersCounterRefreshPeriod: 20000,
},

View File

@ -12,6 +12,7 @@ import (
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream/forwarding"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
"gitlab.crans.org/nounous/ghostream/web"
)
@ -50,6 +51,7 @@ func main() {
go forwarding.Serve(forwardingChannel, cfg.Forwarding)
go monitoring.Serve(&cfg.Monitoring)
go srt.Serve(&cfg.Srt, authBackend, forwardingChannel, webrtcChannel)
go telnet.Serve(&cfg.Telnet)
go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web)
go webrtc.Serve(remoteSdpChan, localSdpChan, webrtcChannel, &cfg.WebRTC)

View File

@ -18,7 +18,9 @@ type Options map[string][]string
func Serve(inputChannel chan srt.Packet, cfg Options) {
if len(cfg) < 1 {
// No forwarding, ignore
return
for {
<-inputChannel // Clear input channel
}
}
log.Printf("Stream forwarding initialized")

161
stream/telnet/telnet.go Normal file
View File

@ -0,0 +1,161 @@
// Package telnet provides some fancy tools, like an ASCII-art stream.
package telnet
import (
"io"
"log"
"net"
"strings"
"time"
)
var (
// Cfg contains the different options of the telnet package, see below
// TODO Config should not be exported
Cfg *Options
currentMessage map[string]*string
clientCount map[string]int
)
// Options holds telnet package configuration
type Options struct {
Enabled bool
ListenAddress string
Width int
Height int
Delay int
}
// Serve starts the telnet server and listen to clients
func Serve(config *Options) {
Cfg = config
if !Cfg.Enabled {
return
}
currentMessage = make(map[string]*string)
clientCount = make(map[string]int)
listener, err := net.Listen("tcp", Cfg.ListenAddress)
if err != nil {
log.Printf("Error while listening to the address %s: %s", Cfg.ListenAddress, err)
return
}
go func() {
for {
s, err := listener.Accept()
if err != nil {
log.Printf("Error while accepting TCP socket: %s", s)
continue
}
go func(s net.Conn) {
streamID := ""
// Request for stream ID
for {
_, _ = s.Write([]byte("[GHOSTREAM]\n"))
_, err = s.Write([]byte("Enter stream ID: "))
if err != nil {
log.Println("Error while requesting stream ID to telnet client")
_ = s.Close()
return
}
buff := make([]byte, 255)
n, err := s.Read(buff)
if err != nil {
log.Println("Error while requesting stream ID to telnet client")
_ = s.Close()
return
}
// Avoid bruteforce
time.Sleep(3 * time.Second)
streamID = string(buff[:n])
streamID = strings.Replace(streamID, "\r", "", -1)
streamID = strings.Replace(streamID, "\n", "", -1)
if len(streamID) > 0 {
if strings.ToLower(streamID) == "exit" {
_, _ = s.Write([]byte("Goodbye!\n"))
_ = s.Close()
return
}
if _, ok := currentMessage[streamID]; !ok {
_, err = s.Write([]byte("Unknown stream ID.\n"))
if err != nil {
log.Println("Error while requesting stream ID to telnet client")
_ = s.Close()
return
}
continue
}
break
}
}
clientCount[streamID]++
for {
n, err := s.Write([]byte(*currentMessage[streamID]))
if err != nil {
log.Printf("Error while sending TCP data: %s", err)
_ = s.Close()
clientCount[streamID]--
break
}
if n == 0 {
_ = s.Close()
clientCount[streamID]--
break
}
time.Sleep(time.Duration(Cfg.Delay) * time.Millisecond)
}
}(s)
}
}()
log.Println("Telnet server initialized")
}
// GetNumberConnectedSessions returns the numbers of clients that are viewing the stream through a telnet shell
func GetNumberConnectedSessions(streamID string) int {
if Cfg == nil || !Cfg.Enabled {
return 0
}
return clientCount[streamID]
}
func asciiChar(pixel byte) string {
asciiChars := []string{"@", "#", "$", "%", "?", "*", "+", ";", ":", ",", ".", " "}
return asciiChars[(255-pixel)/22]
}
// StartASCIIArtStream send all packets received by ffmpeg as ASCII Art to telnet clients
func StartASCIIArtStream(streamID string, reader io.ReadCloser) {
if !Cfg.Enabled {
_ = reader.Close()
return
}
currentMessage[streamID] = new(string)
buff := make([]byte, Cfg.Width*Cfg.Height)
header := "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n"
for {
n, _ := reader.Read(buff)
if n == 0 {
break
}
imageStr := ""
for j := 0; j < Cfg.Height; j++ {
for i := 0; i < Cfg.Width; i++ {
pixel := buff[Cfg.Width*j+i]
imageStr += asciiChar(pixel) + asciiChar(pixel)
}
imageStr += "\n"
}
*(currentMessage[streamID]) = header + imageStr
}
}

View File

@ -3,149 +3,47 @@ package webrtc
import (
"bufio"
"fmt"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet"
"io"
"log"
"net"
"os/exec"
)
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream/srt"
var (
ffmpeg = make(map[string]*exec.Cmd)
ffmpegInput = make(map[string]io.WriteCloser)
)
func ingestFrom(inputChannel chan srt.Packet) {
// FIXME Clean code
var ffmpeg *exec.Cmd
var ffmpegInput io.WriteCloser
for {
var err error = nil
srtPacket := <-inputChannel
switch srtPacket.PacketType {
case "register":
log.Printf("WebRTC RegisterStream %s", srtPacket.StreamName)
// Open a UDP Listener for RTP Packets on port 5004
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5005})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
defer func() {
if err = videoListener.Close(); err != nil {
log.Printf("Faited to close UDP listener %s", err)
}
if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener %s", err)
}
}()
ffmpeg = exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", "-re", "-i", "pipe:0",
"-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality
"-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1",
"-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5004",
"-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5005")
input, err := ffmpeg.StdinPipe()
if err != nil {
panic(err)
}
ffmpegInput = input
errOutput, err := ffmpeg.StderrPipe()
if err != nil {
panic(err)
}
if err := ffmpeg.Start(); err != nil {
panic(err)
}
// Receive video
go func() {
for {
inboundRTPPacket := make([]byte, 1500) // UDP MTU
n, _, err := videoListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
continue
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue
}
if videoTracks[srtPacket.StreamName] == nil {
videoTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all video tracks
// Adapt payload and SSRC to match destination
for _, videoTrack := range videoTracks[srtPacket.StreamName] {
packet.Header.PayloadType = videoTrack.PayloadType()
packet.Header.SSRC = videoTrack.SSRC()
if writeErr := videoTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to video track: %s", err)
continue
}
}
}
}()
// Receive audio
go func() {
for {
inboundRTPPacket := make([]byte, 1500) // UDP MTU
n, _, err := audioListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
continue
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue
}
if audioTracks[srtPacket.StreamName] == nil {
audioTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all audio tracks
// Adapt payload and SSRC to match destination
for _, audioTrack := range audioTracks[srtPacket.StreamName] {
packet.Header.PayloadType = audioTrack.PayloadType()
packet.Header.SSRC = audioTrack.SSRC()
if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to audio track: %s", err)
continue
}
}
}
}()
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[WEBRTC FFMPEG %s] %s", "demo", scanner.Text())
}
}()
go registerStream(&srtPacket)
break
case "sendData":
if _, ok := ffmpegInput[srtPacket.StreamName]; !ok {
break
}
// FIXME send to stream srtPacket.StreamName
if _, err := ffmpegInput.Write(srtPacket.Data); err != nil {
if _, err := ffmpegInput[srtPacket.StreamName].Write(srtPacket.Data); err != nil {
log.Printf("Failed to write data to ffmpeg input: %s", err)
}
break
case "close":
log.Printf("WebRTC CloseConnection %s", srtPacket.StreamName)
_ = ffmpeg[srtPacket.StreamName].Process.Kill()
_ = ffmpegInput[srtPacket.StreamName].Close()
delete(ffmpeg, srtPacket.StreamName)
delete(ffmpegInput, srtPacket.StreamName)
break
default:
log.Println("Unknown SRT srtPacket type:", srtPacket.PacketType)
@ -156,3 +54,140 @@ func ingestFrom(inputChannel chan srt.Packet) {
}
}
}
func registerStream(srtPacket *srt.Packet) {
log.Printf("WebRTC RegisterStream %s", srtPacket.StreamName)
// Open a UDP Listener for RTP Packets on port 5004
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5005})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
// FIXME Close UDP listeners at the end of the stream, not the end of the routine
/* defer func() {
if err = videoListener.Close(); err != nil {
log.Printf("Faited to close UDP listener %s", err)
}
if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener %s", err)
}
}() */
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
"-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality
"-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1",
"-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5004",
"-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5005"}
// Export stream to ascii art
if telnet.Cfg.Enabled {
bitrate := fmt.Sprintf("%dk", telnet.Cfg.Width*telnet.Cfg.Height/telnet.Cfg.Delay)
ffmpegArgs = append(ffmpegArgs,
"-an", "-vf", fmt.Sprintf("scale=%dx%d", telnet.Cfg.Width, telnet.Cfg.Height),
"-b:v", bitrate, "-minrate", bitrate, "-maxrate", bitrate, "-bufsize", bitrate, "-q", "42", "-pix_fmt", "gray", "-f", "rawvideo", "pipe:1")
}
ffmpeg[srtPacket.StreamName] = exec.Command("ffmpeg", ffmpegArgs...)
input, err := ffmpeg[srtPacket.StreamName].StdinPipe()
if err != nil {
panic(err)
}
ffmpegInput[srtPacket.StreamName] = input
errOutput, err := ffmpeg[srtPacket.StreamName].StderrPipe()
if err != nil {
panic(err)
}
// Receive raw video output and convert it to ASCII art, then forward it TCP
if telnet.Cfg.Enabled {
output, err := ffmpeg[srtPacket.StreamName].StdoutPipe()
if err != nil {
panic(err)
}
go telnet.StartASCIIArtStream(srtPacket.StreamName, output)
}
if err := ffmpeg[srtPacket.StreamName].Start(); err != nil {
panic(err)
}
// Receive video
go func() {
inboundRTPPacket := make([]byte, 1500) // UDP MTU
for {
n, _, err := videoListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
continue
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue
}
if videoTracks[srtPacket.StreamName] == nil {
videoTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all video tracks
// Adapt payload and SSRC to match destination
for _, videoTrack := range videoTracks[srtPacket.StreamName] {
packet.Header.PayloadType = videoTrack.PayloadType()
packet.Header.SSRC = videoTrack.SSRC()
if writeErr := videoTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to video track: %s", err)
continue
}
}
}
}()
// Receive audio
go func() {
inboundRTPPacket := make([]byte, 1500) // UDP MTU
for {
n, _, err := audioListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
continue
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue
}
if audioTracks[srtPacket.StreamName] == nil {
audioTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all audio tracks
// Adapt payload and SSRC to match destination
for _, audioTrack := range audioTracks[srtPacket.StreamName] {
packet.Header.PayloadType = audioTrack.PayloadType()
packet.Header.SSRC = audioTrack.SSRC()
if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to audio track: %s", err)
continue
}
}
}
}()
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[WEBRTC FFMPEG %s] %s", "demo", scanner.Text())
}
}()
}

View File

@ -13,6 +13,7 @@ import (
"github.com/markbates/pkger"
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
)
@ -23,17 +24,18 @@ func viewerPostHandler(w http.ResponseWriter, r *http.Request) {
// Get stream ID from URL, or from domain name
path := r.URL.Path[1:]
if cfg.OneStreamPerDomain {
host := r.Host
if strings.Contains(host, ":") {
realHost, _, err := net.SplitHostPort(r.Host)
if err != nil {
log.Printf("Failed to split host and port from %s", r.Host)
return
}
host = realHost
host := r.Host
if strings.Contains(host, ":") {
realHost, _, err := net.SplitHostPort(r.Host)
if err != nil {
log.Printf("Failed to split host and port from %s", r.Host)
return
}
path = host
host = realHost
}
host = strings.Replace(host, ".", "-", -1)
if streamID, ok := cfg.MapDomainToStream[host]; ok {
path = streamID
}
// Decode client description
@ -72,20 +74,21 @@ func viewerPostHandler(w http.ResponseWriter, r *http.Request) {
func viewerGetHandler(w http.ResponseWriter, r *http.Request) {
// Get stream ID from URL, or from domain name
path := r.URL.Path[1:]
if cfg.OneStreamPerDomain {
host := r.Host
if strings.Contains(host, ":") {
realHost, _, err := net.SplitHostPort(r.Host)
if err != nil {
log.Printf("Failed to split host and port from %s", r.Host)
return
}
host = realHost
host := r.Host
if strings.Contains(host, ":") {
realHost, _, err := net.SplitHostPort(r.Host)
if err != nil {
log.Printf("Failed to split host and port from %s", r.Host)
return
}
host = realHost
}
host = strings.Replace(host, ".", "-", -1)
if streamID, ok := cfg.MapDomainToStream[host]; ok {
if path == "about" {
path = ""
} else {
path = host
path = streamID
}
}
@ -144,7 +147,9 @@ func statisticsHandler(w http.ResponseWriter, r *http.Request) {
enc := json.NewEncoder(w)
err := enc.Encode(struct {
ConnectedViewers int
}{webrtc.GetNumberConnectedSessions(streamID) + srt.GetNumberConnectedSessions(streamID)})
}{webrtc.GetNumberConnectedSessions(streamID) +
srt.GetNumberConnectedSessions(streamID) +
telnet.GetNumberConnectedSessions(streamID)})
if err != nil {
http.Error(w, "Failed to generate JSON.", http.StatusInternalServerError)
log.Printf("Failed to generate JSON: %s", err)

View File

@ -1,12 +1,12 @@
// Refresh viewer count by pulling metric from server
function refreshViewersCounter(period) {
function refreshViewersCounter(streamID, period) {
// Distinguish oneDomainPerStream mode
fetch("/_stats/" + (location.pathname === "/" ? location.host : location.pathname.substring(1)))
fetch("/_stats/" + streamID)
.then(response => response.json())
.then((data) => document.getElementById("connected-people").innerText = data.ConnectedViewers)
.catch(console.log)
setTimeout(() => {
refreshViewersCounter(period)
refreshViewersCounter(streamID, period)
}, period)
}

View File

@ -4,7 +4,7 @@
<head>
<meta charset="UTF-8">
<title>{{if .Path}}{{if not .Cfg.OneStreamPerDomain}}{{.Path}} - {{end}}{{end}}{{.Cfg.Name}}</title>
<title>{{if .Path}}{{.Path}} - {{end}}{{.Cfg.Name}}</title>
<link rel="stylesheet" href="static/css/style.css">
<link rel="stylesheet" href="static/css/player.css">
{{if .Cfg.CustomCSS}}<link rel="stylesheet" href="{{.Cfg.CustomCSS}}">{{end}}

View File

@ -50,7 +50,7 @@
// Wait a bit before pulling viewers counter for the first time
setTimeout(() => {
refreshViewersCounter({{.Cfg.ViewersCounterRefreshPeriod}})
refreshViewersCounter("{{.Path}}", {{.Cfg.ViewersCounterRefreshPeriod}})
}, 1000)
</script>
{{end}}

View File

@ -22,7 +22,7 @@ type Options struct {
Hostname string
ListenAddress string
Name string
OneStreamPerDomain bool
MapDomainToStream map[string]string
PlayerPoster string
SRTServerPort string
STUNServers []string