1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2025-10-24 05:53:03 +02:00

3 Commits

Author SHA1 Message Date
Yohann D'ANELLO
fa281e6b31 Try to mux into RTP with FFMPEG bindings 2020-10-19 10:18:45 +02:00
Yohann D'ANELLO
698b83fe6f WebRTC is registering to the audio-transcoded stream 2020-10-18 22:07:11 +02:00
Yohann D'ANELLO
20776d897c Create audio transcoder 2020-10-18 21:46:36 +02:00
64 changed files with 961 additions and 1664 deletions

6
.gitignore vendored
View File

@@ -17,9 +17,3 @@ pkged.go
# Profiler and test files
*.prof
*.test
# Javascript tools
.eslintrc.js
node_modules
package.json
package-lock.json

View File

@@ -2,18 +2,8 @@ stages:
- test
- quality-assurance
.go-cache:
variables:
GOPATH: $CI_PROJECT_DIR/.go
before_script:
- mkdir -p .go
cache:
paths:
- .go/pkg/mod/
unit_tests:
image: golang:1.15-alpine
extends: .go-cache
stage: test
before_script:
- apk add --no-cache -X http://dl-cdn.alpinelinux.org/alpine/edge/community build-base ffmpeg gcc libsrt-dev
@@ -28,7 +18,6 @@ unit_tests:
linters:
image: golang:1.15-alpine
extends: .go-cache
stage: quality-assurance
script:
- go get -u golang.org/x/lint/golint

View File

@@ -12,6 +12,6 @@ FROM alpine:3.12
RUN apk add --no-cache -X https://dl-cdn.alpinelinux.org/alpine/edge/community/ ffmpeg libsrt
COPY --from=build_base /code/out/ghostream /app/ghostream
WORKDIR /app
# 2112 for monitoring, 8023 for Telnet, 8080 for Web, 9710 for SRT, 10000-11000 (UDP) for WebRTC
EXPOSE 2112 8023 8080 9710/udp 10000-11000/udp
# 2112 for monitoring, 8023 for Telnet, 8080 for Web, 9710 for SRT, 10000-10005 (UDP) for WebRTC
EXPOSE 2112 8023 8080 9710/udp 10000-10005/udp
CMD ["/app/ghostream"]

View File

@@ -6,7 +6,7 @@ import (
func TestBasicLogin(t *testing.T) {
basicCredentials := make(map[string]string)
basicCredentials["demo"] = "$2b$10$xuU7XFwmRX2CMgdSaA8rM.4Y8.BtRNzhUedwN0G8tCegDRNUERTCS"
basicCredentials["demo"] = "$2b$15$LRnG3eIHFlYIguTxZOLH7eHwbQC/vqjnLq6nDFiHSUDKIU.f5/1H6"
// Test good credentials
backend, _ := New(&Options{Credentials: basicCredentials})

View File

@@ -3,14 +3,12 @@ package ldap
import (
"github.com/go-ldap/ldap/v3"
"log"
)
// Options holds package configuration
type Options struct {
Aliases map[string]string
URI string
UserDn string
URI string
UserDn string
}
// LDAP authentification backend
@@ -22,12 +20,6 @@ type LDAP struct {
// Login tries to bind to LDAP
// Returns (true, nil) if success
func (a LDAP) Login(username string, password string) (bool, error) {
// Resolve stream alias if necessary
for aliasFor, ok := a.Cfg.Aliases[username]; ok; aliasFor, ok = a.Cfg.Aliases[username] {
log.Printf("[LDAP] Use stream alias %s for username %s", username, aliasFor)
username = aliasFor
}
// Try to bind as user
bindDn := "cn=" + username + "," + a.Cfg.UserDn
err := a.Conn.Bind(bindDn, password)

View File

@@ -1,118 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<Server version="7">
<Name>OvenMediaEngine</Name>
<Type>origin</Type>
<IP>*</IP>
<Bind>
<Providers>
<RTMP>
<Port>1915</Port>
</RTMP>
</Providers>
<Publishers>
<WebRTC>
<Signalling>
<Port>3333</Port>
</Signalling>
<IceCandidates>
<IceCandidate>*:10006-10010/udp</IceCandidate>
</IceCandidates>
</WebRTC>
<HLS>
<Port>80</Port>
</HLS>
<DASH>
<Port>80</Port>
</DASH>
</Publishers>
</Bind>
<VirtualHosts>
<VirtualHost>
<Name>default</Name>
<Domain>
<Names>
<Name>*</Name>
</Names>
</Domain>
<Applications>
<Application>
<Name>play</Name>
<Type>live</Type>
<Encodes>
<Encode>
<Name>opus_only</Name>
<Audio>
<Codec>opus</Codec>
<Bitrate>128000</Bitrate>
<Samplerate>48000</Samplerate>
<Channel>2</Channel>
</Audio>
<Video>
<Bypass>true</Bypass>
</Video>
</Encode>
<Encode>
<Name>bypass</Name>
<Audio>
<Bypass>true</Bypass>
</Audio>
<Video>
<Bypass>true</Bypass>
</Video>
</Encode>
</Encodes>
<Streams>
<Stream>
<Name>${OriginStreamName}</Name>
<Profiles>
<Profile>opus_only</Profile>
</Profiles>
</Stream>
<Stream>
<Name>${OriginStreamName}_bypass</Name>
<Profiles>
<Profile>bypass</Profile>
</Profiles>
</Stream>
</Streams>
<Providers>
<RTMP>
<BlockDuplicateStreamName>true</BlockDuplicateStreamName>
</RTMP>
</Providers>
<Publishers>
<ThreadCount>2</ThreadCount>
<WebRTC>
<Timeout>30000</Timeout>
</WebRTC>
<HLS>
<SegmentDuration>2</SegmentDuration>
<SegmentCount>2</SegmentCount>
<CrossDomain>
<Url>*</Url>
</CrossDomain>
</HLS>
<DASH>
<SegmentDuration>2</SegmentDuration>
<SegmentCount>2</SegmentCount>
<CrossDomain>
<Url>*</Url>
</CrossDomain>
</DASH>
<LLDASH>
<SegmentDuration>2</SegmentDuration>
<CrossDomain>
<Url>*</Url>
</CrossDomain>
</LLDASH>
</Publishers>
</Application>
</Applications>
</VirtualHost>
</VirtualHosts>
</Server>

View File

@@ -26,10 +26,11 @@ services:
- "--certificatesResolvers.mytlschallenge.acme.httpChallenge.entryPoint=web"
ghostream:
build: https://gitlab.crans.org/nounous/ghostream.git
build: ..
restart: always
ports:
- 9710:9710/udp
- 10000-10005:10000-10005/udp
volumes:
- ./ghostream_data:/etc/ghostream:ro
labels:
@@ -39,30 +40,3 @@ services:
- "traefik.http.routers.ghostream.tls.certresolver=mytlschallenge"
- "traefik.http.routers.ghostream.service=ghostream"
- "traefik.http.services.ghostream.loadbalancer.server.port=8080"
ovenmediaengine:
image: airensoft/ovenmediaengine:0.10.8
restart: always
ports:
# WebRTC ICE
- 10006-10010:10006-10010/udp
volumes:
- ./ovenmediaengine_data/conf/Server-docker.xml:/opt/ovenmediaengine/bin/origin_conf/Server.xml:ro
labels:
- "traefik.http.middlewares.sslheader.headers.customrequestheaders.X-Forwarded-Proto=https"
- "traefik.http.routers.ovenmediaengine.rule=Host(`stream.example.com`) && PathPrefix(`/play/`)"
- "traefik.http.routers.ovenmediaengine.priority=101"
- "traefik.http.routers.ovenmediaengine.entrypoints=websecure"
- "traefik.http.routers.ovenmediaengine.tls.certresolver=mytlschallenge"
- "traefik.http.services.ovenmediaengine.loadbalancer.server.port=3333"
- "traefik.http.routers.ovenmediaengine.service=ovenmediaengine"
- "traefik.http.routers.ovenmediaengine.middlewares=sslheader"
- "traefik.http.routers.ovenmediaengine-hls.rule=Host(`stream.example.com`) && Path(`/play/{app_name:.*}/{filename:.*}.{ext:(m3u8|mpd|ts)}`)"
- "traefik.http.routers.ovenmediaengine-hls.priority=102"
- "traefik.http.routers.ovenmediaengine-hls.entrypoints=websecure"
- "traefik.http.routers.ovenmediaengine-hls.tls.certresolver=mytlschallenge"
- "traefik.http.services.ovenmediaengine-hls.loadbalancer.server.port=80"
- "traefik.http.routers.ovenmediaengine-hls.service=ovenmediaengine-hls"
- "traefik.http.routers.ovenmediaengine-hls.middlewares=sslheader"

View File

@@ -22,38 +22,29 @@ auth:
# Basic backend configuration
# To generate bcrypt hashed password from Python, use:
# python3 -c 'import bcrypt; print(bcrypt.hashpw(b"PASSWORD", bcrypt.gensalt(rounds=12)).decode("ascii"))'
# python3 -c 'import bcrypt; print(bcrypt.hashpw(b"PASSWORD", bcrypt.gensalt(rounds=15)).decode("ascii"))'
#
#basic:
# credentials:
# # Demo user with password "demo"
# demo: $2b$10$xuU7XFwmRX2CMgdSaA8rM.4Y8.BtRNzhUedwN0G8tCegDRNUERTCS
# demo: $2b$15$LRnG3eIHFlYIguTxZOLH7eHwbQC/vqjnLq6nDFiHSUDKIU.f5/1H6
# LDAP backend configuration
#
#ldap:
# uri: ldap://127.0.0.1:389
# userdn: cn=users,dc=example,dc=com
#
# # You can define aliases, to stream on stream.example.com/example with the credentials of the demo account.
# aliases:
# example: demo
#
## Stream forwarding ##
# Forward an incoming stream to other servers
# The URL can be anything FFMpeg can accept as an stream output
# If a file is specified, the name may contains %Y, %m, %d, %H, %M or %S
# that will be replaced by the current date information.
forwarding:
# By default nothing is forwarded.
#
# This example forwards a stream named "demo" to Twitch and YouTube,
# and save the record in a timestamped-file,
#demo:
# - rtmp://live-cdg.twitch.tv/app/STREAM_KEY
# - rtmp://a.rtmp.youtube.com/live2/STREAM_KEY
# - /home/ghostream/lives/%name/live-%Y-%m-%d-%H-%M-%S.flv
## Prometheus monitoring ##
# Expose a monitoring endpoint for Prometheus
@@ -66,19 +57,6 @@ monitoring:
# To limit access to only localhost, use 127.0.0.1:2112
#listenAddress: :2112
## OvenMediaEngine ##
# Send the stream data to OvenMediaEngine to handle properly the web client
ome:
# If you disable OME module, the laggy webrtc client will be used.
#
#enabled: true
#
# The URL where OME listens RTMP, without the prefix.
#url: ovenmediaengine:1915
#
# The OME app where OME is waiting for the data of Ghostream.
#app: play
## SRT server ##
# The SRT server receive incoming stream and can also serve video to clients.
srt:
@@ -178,29 +156,17 @@ web:
#
#widgetURL: ""
# IMPORTANT, CHANGE THIS
# You need to declare which entity you are and to specify an address to claim some content.
legalMentionsEntity: "l'association Crans"
legalMentionsAddress: "61 Avenue du Président Wilson, 94235 Cachan Cedex, France"
legalMentionsFullAddress:
- Association Cr@ns - ENS Paris-Saclay
- Notification de Contenus Illicites
- 4, avenue des Sciences
- 91190 Gif-sur-Yvette
- France
legalMentionsEmail: "bureau[at]crans.org"
## WebRTC server ##
webrtc:
# If you disable webrtc module, the web client won't be able to play streams.
#
#enabled: false
#enabled: true
# UDP port range used to stream
# This range must be opened in your firewall.
#
#minPortUDP: 10000
#maxPortUDP: 11000
#maxPortUDP: 10005
# STUN servers, you should host your own Coturn instance
#

11
go.mod
View File

@@ -3,14 +3,13 @@ module gitlab.crans.org/nounous/ghostream
go 1.13
require (
github.com/3d0c/gmf v0.0.0-20200614092945-e58d8d5a6035
github.com/go-ldap/ldap/v3 v3.2.3
github.com/gorilla/websocket v1.4.0
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a
github.com/markbates/pkger v0.17.1
github.com/pion/rtp v1.6.1
github.com/pion/webrtc/v3 v3.0.0-beta.10
github.com/pkg/profile v1.5.0
github.com/pion/rtp v1.6.0
github.com/pion/webrtc/v3 v3.0.0-beta.5
github.com/prometheus/client_golang v1.7.1
github.com/sherifabdlnaby/configuro v0.0.2
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
)

50
go.sum
View File

@@ -7,6 +7,8 @@ dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBr
dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4=
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/3d0c/gmf v0.0.0-20200614092945-e58d8d5a6035 h1:QZb1aMKxiYdGGieyIDmXuw9I9YcGWGViTrpQ6vcZX7Q=
github.com/3d0c/gmf v0.0.0-20200614092945-e58d8d5a6035/go.mod h1:0QMRcUq2JsDECeAq7bj4h79k7XbhtTsrPUQf6G7qfPs=
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c h1:/IBSNwUN8+eKzUzbJPqhK839ygXJ82sde8x3ogr6R28=
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
@@ -113,15 +115,14 @@ github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk
github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a h1:54abJQezjMoiP+xMQ3ZQbcDXFjqytAYm/n0EVqrYeXg=
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a/go.mod h1:7izzTiCO3zc9ZIVTFMjxUiYL+kgryFP9rl3bsweqdmc=
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a h1:JliMkv/mAqM5+QzG6Hkw1XcVl1crU8yIQGnhppMv7s0=
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a/go.mod h1:yVZ4oACfcnUAcxrh+0b6IuIWfkHLK3IAQ99tuuhRx54=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
@@ -192,30 +193,29 @@ github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pion/datachannel v1.4.21 h1:3ZvhNyfmxsAqltQrApLPQMhSFNA+aT87RqyCq4OXmf0=
github.com/pion/datachannel v1.4.21/go.mod h1:oiNyP4gHx2DIwRzX/MFyH0Rz/Gz05OgBlayAI2hAWjg=
github.com/pion/dtls/v2 v2.0.3 h1:3qQ0s4+TXD00rsllL8g8KQcxAs+Y/Z6oz618RXX6p14=
github.com/pion/dtls/v2 v2.0.3/go.mod h1:TUjyL8bf8LH95h81Xj7kATmzMRt29F/4lxpIPj2Xe4Y=
github.com/pion/ice/v2 v2.0.9 h1:oHbiN6Q9tgb8Gfu3I4cbr5mHRE1uqiuFABQ8CbWjIyk=
github.com/pion/ice/v2 v2.0.9/go.mod h1:NK+o39ynb+N1YSj9fPgWs3vjVcrsWw0KCr/311MqVq8=
github.com/pion/dtls/v2 v2.0.2 h1:FHCHTiM182Y8e15aFTiORroiATUI16ryHiQh8AIOJ1E=
github.com/pion/dtls/v2 v2.0.2/go.mod h1:27PEO3MDdaCfo21heT59/vsdmZc0zMt9wQPcSlLu/1I=
github.com/pion/ice/v2 v2.0.6 h1:7Jf3AX6VIjgO2tGRyT0RGGxkDYOF4m5I5DQzf34IN1Y=
github.com/pion/ice/v2 v2.0.6/go.mod h1:xOXvVRlQC/B7FPJeJYKY6IepFRAKb3t1un1K9boYaaQ=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.4 h1:O4vvVqr4DGX63vzmO6Fw9vpy3lfztVWHGCQfyw0ZLSY=
github.com/pion/mdns v0.0.4/go.mod h1:R1sL0p50l42S5lJs91oNdUL58nm0QHrhxnSegr++qC0=
github.com/pion/quic v0.1.4 h1:bNz9sCJjlM3GqMdq7Fne57FiWfdyiJ++yHVbuqeoD3Y=
github.com/pion/quic v0.1.4/go.mod h1:dBhNvkLoQqRwfi6h3Vqj3IcPLgiW7rkZxBbRdp7Vzvk=
github.com/pion/randutil v0.0.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM=
github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk=
github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA=
github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I=
github.com/pion/rtp v1.6.0 h1:4Ssnl/T5W2LzxHj9ssYpGVEQh3YYhQFNVmSWO88MMwk=
github.com/pion/rtp v1.6.0/go.mod h1:QgfogHsMBVE/RFNno467U/KBqfUywEH+HK+0rtnwsdI=
github.com/pion/sctp v1.7.10 h1:o3p3/hZB5Cx12RMGyWmItevJtZ6o2cpuxaw6GOS4x+8=
github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0=
github.com/pion/sctp v1.7.11 h1:UCnj7MsobLKLuP/Hh+JMiI/6W5Bs/VF45lWKgHFjSIE=
github.com/pion/sctp v1.7.11/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0=
github.com/pion/sdp/v3 v3.0.2 h1:UNnSPVaMM+Pdu/mR9UvAyyo6zkdYbKeuOooCwZvTl/g=
github.com/pion/sdp/v3 v3.0.2/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk=
github.com/pion/srtp v1.5.2 h1:25DmvH+fqKZDqvX64vTwnycVwL9ooJxHF/gkX16bDBY=
github.com/pion/srtp v1.5.2/go.mod h1:NiBff/MSxUwMUwx/fRNyD/xGE+dVvf8BOCeXhjCXZ9U=
github.com/pion/sdp/v3 v3.0.1 h1:we4OyeTT6s+6sxrAKy/LVywskx1dzUQlh4xu3b+iAs0=
github.com/pion/sdp/v3 v3.0.1/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk=
github.com/pion/srtp v1.5.1 h1:9Q3jAfslYZBt+C69SI/ZcONJh9049JUHZWYRRf5KEKw=
github.com/pion/srtp v1.5.1/go.mod h1:B+QgX5xPeQTNc1CJStJPHzOlHK66ViMDWTT0HZTCkcA=
github.com/pion/stun v0.3.5 h1:uLUCBCkQby4S1cf6CGuR9QrVOKcvUwFeemaC865QHDg=
github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA=
github.com/pion/transport v0.8.10/go.mod h1:tBmha/UCjpum5hqTWhfAEs3CO4/tHSg0MYRhSzR+CZ8=
@@ -226,14 +226,12 @@ github.com/pion/turn/v2 v2.0.4 h1:oDguhEv2L/4rxwbL9clGLgtzQPjtuZwCdoM7Te8vQVk=
github.com/pion/turn/v2 v2.0.4/go.mod h1:1812p4DcGVbYVBTiraUmP50XoKye++AMkbfp+N27mog=
github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI=
github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths=
github.com/pion/webrtc/v3 v3.0.0-beta.10 h1:1aBn9jv/oe4v2Uf47HutWIjg2i2ZP/O7HqpgKPqSuhE=
github.com/pion/webrtc/v3 v3.0.0-beta.10/go.mod h1:GlriYYHJ5KkNsCunm3oFDPql4TDTrrNoI9iSWWSnafA=
github.com/pion/webrtc/v3 v3.0.0-beta.5 h1:A1hApeo77Kp2sHWwd53rIHZZm5W9JXEn//5jHj9+fdc=
github.com/pion/webrtc/v3 v3.0.0-beta.5/go.mod h1:HjQaNPq5SdNazGlwiim6/lWBkZD97mPPyDttr7byNA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.5.0 h1:042Buzk+NhDI+DeSAA62RwJL8VAuZUMQZUjCsRz1Mug=
github.com/pkg/profile v1.5.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -352,8 +350,6 @@ golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9 h1:vEg9joUBmeBcK9iSJftGNf
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -389,8 +385,6 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c h1:dk0ukUIHmGHqASjP0iue2261isepFCC6XRCSd1nHgDw=
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c/go.mod h1:iQL9McJNjoIa5mjH6nYTCTZXUN6RP+XW3eib7Ya3XcI=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -423,10 +417,6 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c h1:38q6VNPWR010vN82/SB121GujZNIfAUb4YttE2rhGuc=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -456,8 +446,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=

View File

@@ -2,7 +2,7 @@
package config
import (
"gitlab.crans.org/nounous/ghostream/stream/ovenmediaengine"
"gitlab.crans.org/nounous/ghostream/transcoder/audio"
"net"
"github.com/sherifabdlnaby/configuro"
@@ -24,7 +24,6 @@ type Config struct {
Auth auth.Options
Forwarding forwarding.Options
Monitoring monitoring.Options
OME ovenmediaengine.Options
Srt srt.Options
Telnet telnet.Options
Transcoder transcoder.Options
@@ -42,9 +41,8 @@ func New() *Config {
Credentials: make(map[string]string),
},
LDAP: ldap.Options{
Aliases: make(map[string]string),
URI: "ldap://127.0.0.1:389",
UserDn: "cn=users,dc=example,dc=com",
URI: "ldap://127.0.0.1:389",
UserDn: "cn=users,dc=example,dc=com",
},
},
Forwarding: make(map[string][]string),
@@ -52,11 +50,6 @@ func New() *Config {
Enabled: true,
ListenAddress: ":2112",
},
OME: ovenmediaengine.Options{
Enabled: true,
URL: "ovenmediaengine:1915",
App: "play",
},
Srt: srt.Options{
Enabled: true,
ListenAddress: ":9710",
@@ -67,6 +60,10 @@ func New() *Config {
ListenAddress: ":8023",
},
Transcoder: transcoder.Options{
Audio: audio.Options{
Enabled: true,
Bitrate: 160,
},
Text: text.Options{
Enabled: false,
Width: 80,
@@ -83,15 +80,10 @@ func New() *Config {
MapDomainToStream: make(map[string]string),
PlayerPoster: "/static/img/no_stream.svg",
ViewersCounterRefreshPeriod: 20000,
LegalMentionsEntity: "l'association Crans",
LegalMentionsAddress: "61 Avenue du Président Wilson, 94235 Cachan Cedex, France",
LegalMentionsFullAddress: []string{"Association Cr@ns - ENS Paris-Saclay",
"Notification de Contenus Illicites", "4, avenue des Sciences", "91190 Gif-sur-Yvette", "France"},
LegalMentionsEmail: "bureau[at]crans.org",
},
WebRTC: webrtc.Options{
Enabled: false,
MaxPortUDP: 11000,
Enabled: true,
MaxPortUDP: 10005,
MinPortUDP: 10000,
STUNServers: []string{"stun:stun.l.google.com:19302"},
},
@@ -130,7 +122,7 @@ func Load() (*Config, error) {
// If no credentials register, add demo account with password "demo"
if len(cfg.Auth.Basic.Credentials) < 1 {
cfg.Auth.Basic.Credentials["demo"] = "$2b$10$xuU7XFwmRX2CMgdSaA8rM.4Y8.BtRNzhUedwN0G8tCegDRNUERTCS"
cfg.Auth.Basic.Credentials["demo"] = "$2b$15$LRnG3eIHFlYIguTxZOLH7eHwbQC/vqjnLq6nDFiHSUDKIU.f5/1H6"
}
return cfg, nil

21
main.go
View File

@@ -5,14 +5,12 @@
package main
import (
"gitlab.crans.org/nounous/ghostream/stream/ovenmediaengine"
"log"
"github.com/pkg/profile"
"gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/internal/config"
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/stream/forwarding"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet"
@@ -22,9 +20,6 @@ import (
)
func main() {
// TODO Don't always profile if not needed
defer profile.Start().Stop()
// Configure logger
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
@@ -43,18 +38,24 @@ func main() {
defer authBackend.Close()
}
// WebRTC session description channels
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
// Init streams messaging
streams := messaging.New()
streams := make(map[string]*stream.Stream)
// Start routines
go transcoder.Init(streams, &cfg.Transcoder)
go forwarding.Serve(streams, cfg.Forwarding)
go monitoring.Serve(&cfg.Monitoring)
go ovenmediaengine.Serve(streams, &cfg.OME)
go srt.Serve(streams, authBackend, &cfg.Srt)
go telnet.Serve(streams, &cfg.Telnet)
go web.Serve(streams, &cfg.Web, &cfg.OME)
go webrtc.Serve(streams, &cfg.WebRTC)
go web.Serve(streams, remoteSdpChan, localSdpChan, &cfg.Web)
go webrtc.Serve(streams, remoteSdpChan, localSdpChan, &cfg.WebRTC)
// Wait for routines
select {}

View File

@@ -1,89 +0,0 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"sync"
"github.com/pion/webrtc/v3"
)
// Quality holds a specific stream quality.
// It makes packages able to subscribe to an incoming stream.
type Quality struct {
// Incoming data come from this channel
Broadcast chan<- []byte
// Incoming data will be outputted to all those outputs.
// Use a map to be able to delete an item.
outputs map[chan []byte]struct{}
// Mutex to lock outputs map
lockOutputs sync.Mutex
// WebRTC session descriptor exchange.
// When new client connects, a SDP arrives on WebRtcRemoteSdp,
// then webrtc package answers on WebRtcLocalSdp.
WebRtcLocalSdp chan webrtc.SessionDescription
WebRtcRemoteSdp chan webrtc.SessionDescription
}
func newQuality() (q *Quality) {
q = &Quality{}
broadcast := make(chan []byte, 1024)
q.Broadcast = broadcast
q.outputs = make(map[chan []byte]struct{})
q.WebRtcLocalSdp = make(chan webrtc.SessionDescription, 1)
q.WebRtcRemoteSdp = make(chan webrtc.SessionDescription, 1)
go q.run(broadcast)
return q
}
func (q *Quality) run(broadcast <-chan []byte) {
for msg := range broadcast {
q.lockOutputs.Lock()
for output := range q.outputs {
select {
case output <- msg:
default:
// If full, do a ring buffer
// Check that output is not of size zero
if len(output) > 1 {
<-output
}
}
}
q.lockOutputs.Unlock()
}
// Incoming chan has been closed, close all outputs
q.lockOutputs.Lock()
for ch := range q.outputs {
delete(q.outputs, ch)
close(ch)
}
q.lockOutputs.Unlock()
}
// Close the incoming chan, this will also delete all outputs.
func (q *Quality) Close() {
close(q.Broadcast)
}
// Register a new output on a stream.
func (q *Quality) Register(output chan []byte) {
q.lockOutputs.Lock()
q.outputs[output] = struct{}{}
q.lockOutputs.Unlock()
}
// Unregister removes an output.
func (q *Quality) Unregister(output chan []byte) {
// Make sure we did not already close this output
q.lockOutputs.Lock()
_, ok := q.outputs[output]
if ok {
delete(q.outputs, output)
close(output)
}
defer q.lockOutputs.Unlock()
}

View File

@@ -1,84 +0,0 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"errors"
"sync"
)
// Stream makes packages able to subscribe to an incoming stream
type Stream struct {
// Different qualities of this stream
qualities map[string]*Quality
// Mutex to lock outputs map
lockQualities sync.Mutex
// Count clients for statistics
nbClients int
}
func newStream() (s *Stream) {
s = &Stream{}
s.qualities = make(map[string]*Quality)
s.nbClients = 0
return s
}
// Close stream.
func (s *Stream) Close() {
for quality := range s.qualities {
s.DeleteQuality(quality)
}
}
// CreateQuality creates a new quality associated with this stream.
func (s *Stream) CreateQuality(name string) (quality *Quality, err error) {
// If quality already exist, fail
if _, ok := s.qualities[name]; ok {
return nil, errors.New("quality already exists")
}
s.lockQualities.Lock()
quality = newQuality()
s.qualities[name] = quality
s.lockQualities.Unlock()
return quality, nil
}
// DeleteQuality removes a stream quality.
func (s *Stream) DeleteQuality(name string) {
// Make sure we did not already close this output
s.lockQualities.Lock()
if _, ok := s.qualities[name]; ok {
s.qualities[name].Close()
delete(s.qualities, name)
}
s.lockQualities.Unlock()
}
// GetQuality gets a specific stream quality.
func (s *Stream) GetQuality(name string) (quality *Quality, err error) {
s.lockQualities.Lock()
quality, ok := s.qualities[name]
s.lockQualities.Unlock()
if !ok {
return nil, errors.New("quality does not exist")
}
return quality, nil
}
// ClientCount returns the number of clients.
func (s *Stream) ClientCount() int {
return s.nbClients
}
// IncrementClientCount increments the number of clients.
func (s *Stream) IncrementClientCount() {
s.nbClients++
}
// DecrementClientCount decrements the number of clients.
func (s *Stream) DecrementClientCount() {
s.nbClients--
}

View File

@@ -1,98 +0,0 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"errors"
"log"
"sync"
)
// Streams hold all application streams.
type Streams struct {
// Associate each stream name to the stream
streams map[string]*Stream
// Mutex to lock streams
lockStreams sync.Mutex
// Subscribers get notified when a new stream is created
// Use a map to be able to delete a subscriber
eventSubscribers map[chan string]struct{}
// Mutex to lock eventSubscribers
lockSubscribers sync.Mutex
}
// New creates a new stream list.
func New() (l *Streams) {
l = &Streams{}
l.streams = make(map[string]*Stream)
l.eventSubscribers = make(map[chan string]struct{})
return l
}
// Subscribe to get notified on new stream.
func (l *Streams) Subscribe(output chan string) {
l.lockSubscribers.Lock()
l.eventSubscribers[output] = struct{}{}
l.lockSubscribers.Unlock()
}
// Unsubscribe to no longer get notified on new stream.
func (l *Streams) Unsubscribe(output chan string) {
// Make sure we did not already delete this subscriber
l.lockSubscribers.Lock()
if _, ok := l.eventSubscribers[output]; ok {
delete(l.eventSubscribers, output)
}
l.lockSubscribers.Unlock()
}
// Create a new stream.
func (l *Streams) Create(name string) (s *Stream, err error) {
// If stream already exist, fail
if _, ok := l.streams[name]; ok {
return nil, errors.New("stream already exists")
}
// Create stream
s = newStream()
l.lockStreams.Lock()
l.streams[name] = s
l.lockStreams.Unlock()
// Notify
l.lockSubscribers.Lock()
for sub := range l.eventSubscribers {
select {
case sub <- name:
default:
log.Printf("Failed to announce stream '%s' to subscriber", name)
}
}
l.lockSubscribers.Unlock()
return s, nil
}
// Get a stream.
func (l *Streams) Get(name string) (s *Stream, err error) {
// If stream does exist, return it
l.lockStreams.Lock()
s, ok := l.streams[name]
l.lockStreams.Unlock()
if !ok {
return nil, errors.New("stream does not exist")
}
return s, nil
}
// Delete a stream.
func (l *Streams) Delete(name string) {
// Make sure we did not already delete this stream
l.lockStreams.Lock()
if _, ok := l.streams[name]; ok {
l.streams[name].Close()
delete(l.streams, name)
}
l.lockStreams.Unlock()
}

View File

@@ -1,55 +0,0 @@
package messaging
import "testing"
func TestWithOneStream(t *testing.T) {
streams := New()
// Subscribe to new streams
event := make(chan string, 8)
streams.Subscribe(event)
// Create a stream
stream, err := streams.Create("demo")
if err != nil {
t.Errorf("Failed to create stream")
}
// Check that we receive the creation event
e := <-event
if e != "demo" {
t.Errorf("Message has wrong content: %s != demo", e)
}
// Create a quality
quality, err := stream.CreateQuality("source")
if err != nil {
t.Errorf("Failed to create quality")
}
// Register one output
output := make(chan []byte, 64)
quality.Register(output)
stream.IncrementClientCount()
// Try to pass one message
quality.Broadcast <- []byte("hello world")
msg := <-output
if string(msg) != "hello world" {
t.Errorf("Message has wrong content: %s != hello world", msg)
}
// Check client count
if count := stream.ClientCount(); count != 1 {
t.Errorf("Client counter returned %d, expected 1", count)
}
// Unregister
quality.Unregister(output)
stream.DecrementClientCount()
// Check client count
if count := stream.ClientCount(); count != 0 {
t.Errorf("Client counter returned %d, expected 0", count)
}
}

View File

@@ -3,13 +3,11 @@ package forwarding
import (
"bufio"
"fmt"
"log"
"os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options to configure the stream forwarding.
@@ -17,66 +15,43 @@ import (
type Options map[string][]string
// Serve handles incoming packets from SRT and forward them to other external services
func Serve(streams *messaging.Streams, cfg Options) {
func Serve(streams map[string]*stream.Stream, cfg Options) {
if len(cfg) < 1 {
// No forwarding, ignore
return
}
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
log.Printf("Stream forwarding initialized")
for {
for name, st := range streams {
fwdCfg, ok := cfg[name]
if !ok {
// Not configured
continue
}
// For each new stream
for name := range event {
streamCfg, ok := cfg[name]
if !ok {
// Not configured
continue
// Start forwarding
log.Printf("Starting forwarding for '%s'", name)
go forward(st, fwdCfg)
}
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
return
}
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting forwarding for '%s' quality '%s'", name, qualityName)
go forward(name, quality, streamCfg)
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
// Start a FFMPEG instance and redirect stream output to forwarded streams
func forward(streamName string, q *messaging.Quality, fwdCfg []string) {
func forward(st *stream.Stream, fwdCfg []string) {
output := make(chan []byte, 1024)
q.Register(output)
st.Register(output)
// Launch FFMPEG instance
params := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0"}
params := []string{"-hide_banner", "-loglevel", "error", "-re", "-i", "pipe:0"}
for _, url := range fwdCfg {
// If the url should be date-formatted, replace special characters with the current time information
now := time.Now()
formattedURL := strings.ReplaceAll(url, "%Y", fmt.Sprintf("%04d", now.Year()))
formattedURL = strings.ReplaceAll(formattedURL, "%m", fmt.Sprintf("%02d", now.Month()))
formattedURL = strings.ReplaceAll(formattedURL, "%d", fmt.Sprintf("%02d", now.Day()))
formattedURL = strings.ReplaceAll(formattedURL, "%H", fmt.Sprintf("%02d", now.Hour()))
formattedURL = strings.ReplaceAll(formattedURL, "%M", fmt.Sprintf("%02d", now.Minute()))
formattedURL = strings.ReplaceAll(formattedURL, "%S", fmt.Sprintf("%02d", now.Second()))
formattedURL = strings.ReplaceAll(formattedURL, "%name", streamName)
params = append(params, "-f", "flv",
"-c:v", "copy", "-c:a", "aac", "-b:a", "160k", "-ar", "44100", formattedURL)
params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency",
"-c", "copy", url)
}
ffmpeg := exec.Command("ffmpeg", params...)
@@ -102,14 +77,14 @@ func forward(streamName string, q *messaging.Quality, fwdCfg []string) {
_ = input.Close()
_ = errOutput.Close()
_ = ffmpeg.Process.Kill()
q.Unregister(output)
st.Unregister(output)
}()
// Log standard error output
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[FORWARDING FFMPEG %s] %s", streamName, scanner.Text())
log.Printf("[FORWARDING FFMPEG] %s", scanner.Text())
}
}()

View File

@@ -6,7 +6,7 @@ import (
"testing"
"time"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/stream/srt"
)
@@ -35,7 +35,7 @@ func TestForwardStream(t *testing.T) {
cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
// Register forwarding stream list
streams := messaging.New()
streams := make(map[string]*stream.Stream)
go Serve(streams, cfg)
// Serve SRT Server without authentification backend

99
stream/messaging.go Normal file
View File

@@ -0,0 +1,99 @@
// Package stream defines a structure to communication between inputs and outputs
package stream
import (
"sync"
)
// Stream makes packages able to subscribe to an incoming stream
type Stream struct {
// Incoming data come from this channel
Broadcast chan<- []byte
// Use a map to be able to delete an item
outputs map[chan []byte]struct{}
// Count clients for statistics
nbClients int
// Mutex to lock outputs map
lock sync.Mutex
}
// New creates a new stream.
func New() *Stream {
s := &Stream{}
broadcast := make(chan []byte, 1024)
s.Broadcast = broadcast
s.outputs = make(map[chan []byte]struct{})
s.nbClients = 0
go s.run(broadcast)
return s
}
func (s *Stream) run(broadcast <-chan []byte) {
for msg := range broadcast {
s.lock.Lock()
for output := range s.outputs {
select {
case output <- msg:
default:
// If full, do a ring buffer
// Check that output is not of size zero
if len(output) > 1 {
<-output
}
}
}
s.lock.Unlock()
}
// Incoming chan has been closed, close all outputs
s.lock.Lock()
for ch := range s.outputs {
delete(s.outputs, ch)
close(ch)
}
s.lock.Unlock()
}
// Close the incoming chan, this will also delete all outputs
func (s *Stream) Close() {
close(s.Broadcast)
}
// Register a new output on a stream.
func (s *Stream) Register(output chan []byte) {
s.lock.Lock()
defer s.lock.Unlock()
s.outputs[output] = struct{}{}
}
// Unregister removes an output.
// If hidden in true, then do not count this client.
func (s *Stream) Unregister(output chan []byte) {
s.lock.Lock()
defer s.lock.Unlock()
// Make sure we did not already close this output
_, ok := s.outputs[output]
if ok {
delete(s.outputs, output)
close(output)
}
}
// ClientCount returns the number of clients
func (s *Stream) ClientCount() int {
return s.nbClients
}
// IncrementClientCount increments the number of clients
func (s *Stream) IncrementClientCount() {
s.nbClients++
}
// DecrementClientCount decrements the number of clients
func (s *Stream) DecrementClientCount() {
s.nbClients--
}

42
stream/messaging_test.go Normal file
View File

@@ -0,0 +1,42 @@
package stream
import (
"testing"
)
func TestWithoutOutputs(t *testing.T) {
stream := New()
defer stream.Close()
stream.Broadcast <- []byte("hello world")
}
func TestWithOneOutput(t *testing.T) {
stream := New()
defer stream.Close()
// Register one output
output := make(chan []byte, 64)
stream.Register(output)
stream.IncrementClientCount()
// Try to pass one message
stream.Broadcast <- []byte("hello world")
msg := <-output
if string(msg) != "hello world" {
t.Errorf("Message has wrong content: %s != hello world", msg)
}
// Check client count
if count := stream.ClientCount(); count != 1 {
t.Errorf("Client counter returned %d, expected 1", count)
}
// Unregister
stream.Unregister(output)
stream.DecrementClientCount()
// Check client count
if count := stream.ClientCount(); count != 0 {
t.Errorf("Client counter returned %d, expected 0", count)
}
}

View File

@@ -1,112 +0,0 @@
// Package ovenmediaengine provides the forwarding to an ovenmediaengine server to handle the web client
package ovenmediaengine
import (
"bufio"
"fmt"
"log"
"os/exec"
"gitlab.crans.org/nounous/ghostream/messaging"
)
// Options holds ovenmediaengine package configuration
type Options struct {
Enabled bool
URL string
App string
}
var (
cfg *Options
)
// Serve handles incoming packets from SRT and forward them to OME
func Serve(streams *messaging.Streams, c *Options) {
cfg = c
if !c.Enabled {
return
}
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
log.Printf("Stream forwarding to OME initialized")
// For each new stream
for name := range event {
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
return
}
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting forwarding to OME for '%s'", name)
go forward(name, quality)
}
}
// Start a FFMPEG instance and redirect stream output to OME
func forward(name string, q *messaging.Quality) {
output := make(chan []byte, 1024)
q.Register(output)
// TODO When a new OME version got released with SRT support, directly forward SRT packets, without using unwanted RTMP transport
// Launch FFMPEG instance
params := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", "-f", "flv", "-c:v", "copy",
"-c:a", "aac", "-b:a", "160k", "-ar", "44100",
fmt.Sprintf("rtmp://%s/%s/%s", cfg.URL, cfg.App, name)}
ffmpeg := exec.Command("ffmpeg", params...)
// Open pipes
input, err := ffmpeg.StdinPipe()
if err != nil {
log.Printf("Error while opening forwarding ffmpeg input pipe: %s", err)
return
}
errOutput, err := ffmpeg.StderrPipe()
if err != nil {
log.Printf("Error while opening forwarding ffmpeg output pipe: %s", err)
return
}
// Start FFMpeg
if err := ffmpeg.Start(); err != nil {
log.Printf("Error while starting forwarding ffmpeg instance: %s", err)
}
// Kill FFMPEG when stream is ended
defer func() {
_ = input.Close()
_ = errOutput.Close()
_ = ffmpeg.Process.Kill()
q.Unregister(output)
}()
// Log standard error output
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[FORWARDING OME FFMPEG %s] %s", name, scanner.Text())
}
}()
// Read stream output and redirect immediately to ffmpeg
for data := range output {
_, err := input.Write(data)
if err != nil {
log.Printf("Error while writing to forwarded stream: %s", err)
break
}
}
}

View File

@@ -5,26 +5,21 @@ import (
"log"
"github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
)
func handleStreamer(socket *srtgo.SrtSocket, streams *messaging.Streams, name string) {
// Create stream
stream, err := streams.Create(name)
if err != nil {
log.Printf("Error on stream creating: %s", err)
func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
// Check stream does not exist
if _, ok := streams[name]; ok {
log.Print("Stream already exists, refusing new streamer")
socket.Close()
return
}
// Create source quality
q, err := stream.CreateQuality("source")
if err != nil {
log.Printf("Error on quality creating: %s", err)
socket.Close()
return
}
log.Printf("New SRT streamer for stream '%s' quality 'source'", name)
// Create stream
log.Printf("New SRT streamer for stream %s", name)
st := stream.New()
streams[name] = st
// Read RTP packets forever and send them to the WebRTC Client
for {
@@ -47,38 +42,29 @@ func handleStreamer(socket *srtgo.SrtSocket, streams *messaging.Streams, name st
// Send raw data to other streams
buff = buff[:n]
q.Broadcast <- buff
st.Broadcast <- buff
}
// Close stream
streams.Delete(name)
st.Close()
socket.Close()
delete(streams, name)
}
func handleViewer(socket *srtgo.SrtSocket, streams *messaging.Streams, name string) {
// Get requested stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream: %s", err)
socket.Close()
return
}
func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
log.Printf("New SRT viewer for stream %s", name)
// Get requested quality
// FIXME: make qualities available
qualityName := "source"
q, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality: %s", err)
socket.Close()
// Get requested stream
st, ok := streams[name]
if !ok {
log.Println("Stream does not exist, refusing new viewer")
return
}
log.Printf("New SRT viewer for stream %s quality %s", name, qualityName)
// Register new output
c := make(chan []byte, 1024)
q.Register(c)
stream.IncrementClientCount()
st.Register(c)
st.IncrementClientCount()
// Receive data and send them
for data := range c {
@@ -88,7 +74,7 @@ func handleViewer(socket *srtgo.SrtSocket, streams *messaging.Streams, name stri
}
// Send data
_, err := socket.Write(data, 1000)
_, err := s.Write(data, 1000)
if err != nil {
log.Printf("Remove SRT viewer because of sending error, %s", err)
break
@@ -96,7 +82,7 @@ func handleViewer(socket *srtgo.SrtSocket, streams *messaging.Streams, name stri
}
// Close output
q.Unregister(c)
stream.DecrementClientCount()
socket.Close()
st.Unregister(c)
st.DecrementClientCount()
s.Close()
}

View File

@@ -1,6 +1,9 @@
// Package srt serves a SRT server
package srt
// #include <srt/srt.h>
import "C"
import (
"log"
"net"
@@ -9,7 +12,7 @@ import (
"github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds web package configuration
@@ -36,7 +39,7 @@ func splitHostPort(hostport string) (string, uint16, error) {
}
// Serve SRT server
func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) {
func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Options) {
if !cfg.Enabled {
// SRT is not enabled, ignore
return
@@ -59,7 +62,7 @@ func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) {
for {
// Wait for new connection
s, _, err := sck.Accept()
s, err := sck.Accept()
if err != nil {
// Something wrong happened
log.Println(err)
@@ -70,7 +73,7 @@ func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) {
// Without this, the SRT buffer might get full before reading it
// streamid can be "name:password" for streamer or "name" for viewer
streamID, err := s.GetSockOptString(srtgo.SRTO_STREAMID)
streamID, err := s.GetSockOptString(C.SRTO_STREAMID)
if err != nil {
log.Print("Failed to get socket streamid")
continue

View File

@@ -6,7 +6,7 @@ import (
"testing"
"time"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
)
// TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à
@@ -58,7 +58,7 @@ func TestServeSRT(t *testing.T) {
}
// Init streams messaging and SRT server
streams := messaging.New()
streams := make(map[string]*stream.Stream)
go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2})
ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error",

View File

@@ -7,7 +7,7 @@ import (
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds telnet package configuration
@@ -17,7 +17,7 @@ type Options struct {
}
// Serve Telnet server
func Serve(streams *messaging.Streams, cfg *Options) {
func Serve(streams map[string]*stream.Stream, cfg *Options) {
if !cfg.Enabled {
// Telnet is not enabled, ignore
return
@@ -32,17 +32,17 @@ func Serve(streams *messaging.Streams, cfg *Options) {
// Handle each new client
for {
socket, err := listener.Accept()
s, err := listener.Accept()
if err != nil {
log.Printf("Error while accepting TCP socket: %s", err)
log.Printf("Error while accepting TCP socket: %s", s)
continue
}
go handleViewer(socket, streams, cfg)
go handleViewer(s, streams, cfg)
}
}
func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
// Prompt user about stream name
if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
@@ -56,7 +56,7 @@ func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
s.Close()
return
}
name := strings.TrimSpace(string(buff[:n]))
name := strings.TrimSpace(string(buff[:n])) + "@text"
if len(name) < 1 {
// Too short, exit
s.Close()
@@ -67,9 +67,9 @@ func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
time.Sleep(time.Second)
// Get requested stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Kicking new Telnet viewer: %s", err)
st, ok := streams[name]
if !ok {
log.Println("Stream does not exist, kicking new Telnet viewer")
if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
}
@@ -77,23 +77,11 @@ func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
return
}
// Get requested quality
qualityName := "text"
q, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Kicking new Telnet viewer: %s", err)
if _, err := s.Write([]byte("This stream is not converted to text.\n")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
}
s.Close()
return
}
log.Printf("New Telnet viewer for stream %s quality %s", name, qualityName)
// Register new client
log.Printf("New Telnet viewer for stream '%s'", name)
c := make(chan []byte, 128)
q.Register(c)
stream.IncrementClientCount()
st.Register(c)
st.IncrementClientCount()
// Hide terminal cursor
if _, err = s.Write([]byte("\033[?25l")); err != nil {
@@ -118,7 +106,7 @@ func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
}
// Close output
q.Unregister(c)
stream.DecrementClientCount()
st.Unregister(c)
st.DecrementClientCount()
s.Close()
}

View File

@@ -3,13 +3,13 @@ package telnet
import (
"testing"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
)
// TestTelnetOutput creates a TCP client that connects to the server and get one image.
func TestTelnetOutput(t *testing.T) {
// Try to start Telnet server while it is disabled
streams := messaging.New()
streams := make(map[string]*stream.Stream)
go Serve(streams, &Options{Enabled: false})
// FIXME test connect

View File

@@ -2,88 +2,114 @@
package webrtc
import (
"bufio"
"fmt"
"log"
"math/rand"
"net"
"os/exec"
"github.com/3d0c/gmf"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
"log"
"net"
"strings"
"time"
)
func ingest(name string, q *messaging.Quality) {
// Register to get stream
videoInput := make(chan []byte, 1024)
q.Register(videoInput)
var (
activeStream map[string]struct{}
)
// FIXME Mux into RTP without having multiple UDP listeners
firstPort := int(rand.Int31n(63535)) + 2000
// Open UDP listeners for RTP Packets
audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort + 1})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
// Start ffmpag to convert videoInput to video and audio UDP
ffmpeg, err := startFFmpeg(videoInput, firstPort)
if err != nil {
log.Printf("Error while starting ffmpeg: %s", err)
return
}
// Receive video
go func() {
inboundRTPPacket := make([]byte, 1500) // UDP MTU
for {
n, _, err := videoListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
break
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
func autoIngest(streams map[string]*stream.Stream) {
// Regulary check existing streams
activeStream = make(map[string]struct{})
for {
for name, st := range streams {
if strings.Contains(name, "@") {
// Not a source stream, pass
continue
}
if videoTracks[name] == nil {
videoTracks[name] = make([]*webrtc.Track, 0)
if _, ok := activeStream[name]; ok {
// Stream is already ingested
continue
}
// Write RTP srtPacket to all video tracks
// Adapt payload and SSRC to match destination
for _, videoTrack := range videoTracks[name] {
packet.Header.PayloadType = videoTrack.PayloadType()
packet.Header.SSRC = videoTrack.SSRC()
if writeErr := videoTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to video track: %s", err)
continue
}
}
// Start ingestion
log.Printf("Starting webrtc for '%s'", name)
// FIXME Ensure that the audio stream exist, but that's poop code
time.Sleep(time.Second)
go ingest(name, st, streams[name+"@audio"])
}
}()
// Receive audio
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
func ingest(name string, input *stream.Stream, audio *stream.Stream) {
// Register to get stream
videoInput := make(chan []byte, 1024)
input.Register(videoInput)
audioInput := make(chan []byte, 1024)
audio.Register(audioInput)
activeStream[name] = struct{}{}
inputCtx := gmf.NewCtx()
avioInputCtx, _ := gmf.NewAVIOContext(inputCtx, &gmf.AVIOHandlers{ReadPacket: func() ([]byte, int) {
data := <-audioInput
return data, len(data)
}})
log.Println("Open input")
inputCtx.SetPb(avioInputCtx).OpenInput("")
log.Println("Opened")
defer inputCtx.CloseInput()
defer avioInputCtx.Release()
if audioTracks[name] == nil {
audioTracks[name] = make([]*webrtc.Track, 0)
}
udpListener, _ := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 1234})
outputCtx, _ := gmf.NewOutputCtxWithFormatName("rtp://127.0.0.1:1234", "rtp")
avioOutputCtx, _ := gmf.NewAVIOContext(outputCtx, &gmf.AVIOHandlers{WritePacket: func(data []byte) int {
n := len(data)
log.Printf("Read %d bytes", n)
return n
}})
// FIXME DON'T RAN AN UDP LISTENER, PLIZ GET DIRECTLY UDP PACKETS, WHY IS IT SO COMPLICATED????
// outputCtx.SetPb(avioOutputCtx)
defer outputCtx.CloseOutput()
defer avioOutputCtx.Release()
log.Printf("%d streams", inputCtx.StreamsCnt())
for i := 0; i < inputCtx.StreamsCnt(); i++ {
srcStream, err := inputCtx.GetStream(i)
if err != nil {
log.Println("GetStream error")
}
outputCtx.AddStreamWithCodeCtx(srcStream.CodecCtx())
}
outputCtx.Dump()
if err := outputCtx.WriteHeader(); err != nil {
log.Printf("Unable to write RTP header: %s", err)
}
// Receive audio data
go func() {
inboundRTPPacket := make([]byte, 1500) // UDP MTU
buff := make([]byte, 1500)
for {
n, _, err := audioListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
break
n, _ := udpListener.Read(buff)
if n == 0 {
return
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
if err := packet.Unmarshal(buff[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue
}
@@ -98,36 +124,136 @@ func ingest(name string, q *messaging.Quality) {
packet.Header.PayloadType = audioTrack.PayloadType()
packet.Header.SSRC = audioTrack.SSRC()
if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to audio track: %s", err)
log.Printf("Failed to write to audio track: %s", writeErr)
continue
}
}
}
}()
// Wait for stopped ffmpeg
if err = ffmpeg.Wait(); err != nil {
log.Printf("Faited to wait for ffmpeg: %s", err)
first := false
for packet := range inputCtx.GetNewPackets() {
if first { //if read from rtsp ,the first packets is wrong.
if err := outputCtx.WritePacketNoBuffer(packet); err != nil {
log.Printf("Error while writing packet: %s", err)
}
}
first = true
packet.Free()
}
// Close UDP listeners
if err = videoListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err)
}
if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err)
}
q.Unregister(videoInput)
select {}
// TODO Register to all substreams and make RTP packets. Don't transcode in this package.
/* // Open a UDP Listener for RTP Packets on port 5004
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5005})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
// Start ffmpag to convert videoInput to video and audio UDP
ffmpeg, err := startFFmpeg(videoInput)
if err != nil {
log.Printf("Error while starting ffmpeg: %s", err)
return
}
// Receive video
go func() {
inboundRTPPacket := make([]byte, 1500) // UDP MTU
for {
n, _, err := videoListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
break
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue
}
if videoTracks[name] == nil {
videoTracks[name] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all video tracks
// Adapt payload and SSRC to match destination
for _, videoTrack := range videoTracks[name] {
packet.Header.PayloadType = videoTrack.PayloadType()
packet.Header.SSRC = videoTrack.SSRC()
if writeErr := videoTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to video track: %s", err)
continue
}
}
}
}()
// Receive audio
go func() {
inboundRTPPacket := make([]byte, 1500) // UDP MTU
for {
n, _, err := audioListener.ReadFromUDP(inboundRTPPacket)
if err != nil {
log.Printf("Failed to read from UDP: %s", err)
break
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue
}
if audioTracks[name] == nil {
audioTracks[name] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all audio tracks
// Adapt payload and SSRC to match destination
for _, audioTrack := range audioTracks[name] {
packet.Header.PayloadType = audioTrack.PayloadType()
packet.Header.SSRC = audioTrack.SSRC()
if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to audio track: %s", err)
continue
}
}
}
}()
// Wait for stopped ffmpeg
if err = ffmpeg.Wait(); err != nil {
log.Printf("Faited to wait for ffmpeg: %s", err)
}
// Close UDP listeners
if err = videoListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err)
}
if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err)
}*/
delete(activeStream, name)
}
func startFFmpeg(in <-chan []byte, listeningPort int) (ffmpeg *exec.Cmd, err error) {
/* func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) {
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
// Audio
"-vn", "-c:a", "libopus", "-b:a", "96k",
"-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort),
// Source
"-an", "-c:v", "copy",
"-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort+1)}
"-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality
"-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1",
"-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5004",
"-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5005"}
ffmpeg = exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output
@@ -161,4 +287,4 @@ func startFFmpeg(in <-chan []byte, listeningPort int) (ffmpeg *exec.Cmd, err err
// Start process
err = ffmpeg.Start()
return ffmpeg, err
}
} */

View File

@@ -8,7 +8,7 @@ import (
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds web package configuration
@@ -45,10 +45,13 @@ func GetNumberConnectedSessions(streamID string) int {
// newPeerHandler is called when server receive a new session description
// this initiates a WebRTC connection and return server description
func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, remoteSdp webrtc.SessionDescription, cfg *Options) {
func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, cfg *Options) {
// Create media engine using client SDP
mediaEngine := webrtc.MediaEngine{}
if err := mediaEngine.PopulateFromSDP(remoteSdp); err != nil {
if err := mediaEngine.PopulateFromSDP(remoteSdp.RemoteDescription); err != nil {
log.Println("Failed to create new media engine", err)
localSdpChan <- webrtc.SessionDescription{}
return
@@ -75,7 +78,7 @@ func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, re
}
// Create video track
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "H264")
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8")
videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec)
if err != nil {
log.Println("Failed to create new video track", err)
@@ -103,13 +106,13 @@ func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, re
}
// Set the remote SessionDescription
if err = peerConnection.SetRemoteDescription(remoteSdp); err != nil {
if err = peerConnection.SetRemoteDescription(remoteSdp.RemoteDescription); err != nil {
log.Println("Failed to set remote description", err)
localSdpChan <- webrtc.SessionDescription{}
return
}
streamID := name
streamID := remoteSdp.StreamID
split := strings.SplitN(streamID, "@", 2)
streamID = split[0]
quality := "source"
@@ -179,7 +182,10 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa
}
// Serve WebRTC media streaming server
func Serve(streams *messaging.Streams, cfg *Options) {
func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, localSdpChan chan webrtc.SessionDescription, cfg *Options) {
if !cfg.Enabled {
// WebRTC is not enabled, ignore
return
@@ -187,42 +193,17 @@ func Serve(streams *messaging.Streams, cfg *Options) {
log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP)
// WebRTC ingested tracks
// Allocate memory
videoTracks = make(map[string][]*webrtc.Track)
audioTracks = make(map[string][]*webrtc.Track)
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
// Ingest data
go autoIngest(streams)
// For each new stream
for name := range event {
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
}
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName)
go ingest(name, quality)
go listenSdp(name, quality.WebRtcLocalSdp, quality.WebRtcRemoteSdp, cfg)
}
}
func listenSdp(name string, localSdp, remoteSdp chan webrtc.SessionDescription, cfg *Options) {
// Handle new connections
for {
// Wait for incoming session description
// then send the local description to browser
newPeerHandler(name, localSdp, <-remoteSdp, cfg)
newPeerHandler(localSdpChan, <-remoteSdpChan, cfg)
}
}

View File

@@ -5,19 +5,24 @@ import (
"testing"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
)
func TestServe(t *testing.T) {
// Init streams messaging and WebRTC server
streams := messaging.New()
streams := make(map[string]*stream.Stream)
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
cfg := Options{
Enabled: true,
MinPortUDP: 10000,
MaxPortUDP: 10005,
STUNServers: []string{"stun:stun.l.google.com:19302"},
}
go Serve(streams, &cfg)
go Serve(streams, remoteSdpChan, localSdpChan, &cfg)
// New client connection
mediaEngine := webrtc.MediaEngine{}
@@ -26,7 +31,7 @@ func TestServe(t *testing.T) {
peerConnection, _ := api.NewPeerConnection(webrtc.Configuration{})
// Create video track
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "H264")
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8")
videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec)
if err != nil {
t.Error("Failed to create new video track", err)
@@ -53,6 +58,12 @@ func TestServe(t *testing.T) {
peerConnection.SetLocalDescription(offer)
<-gatherComplete
// FIXME: Send offer to server
// Send offer to server
remoteSdpChan <- struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}{"demo", *peerConnection.LocalDescription()}
_ = <-localSdpChan
// FIXME: verify connection did work
}

153
transcoder/audio/audio.go Normal file
View File

@@ -0,0 +1,153 @@
// Package audio transcode a stream to filter the audio
package audio
import (
"bufio"
"fmt"
"github.com/3d0c/gmf"
"io"
"log"
"os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds audio package configuration
type Options struct {
Enabled bool
Bitrate int
}
// Init text transcoder
func Init(streams map[string]*stream.Stream, cfg *Options) {
if !cfg.Enabled {
// Audio transcode is not enabled, ignore
return
}
// Regulary check existing streams
for {
for sourceName, sourceStream := range streams {
if strings.Contains(sourceName, "@") {
// Not a source stream, pass
continue
}
// Check that the transcoded stream does not already exist
name := sourceName + "@audio"
_, ok := streams[name]
if ok {
// Stream is already transcoded
continue
}
// Start conversion
log.Printf("Starting audio transcode '%s'", name)
st := stream.New()
streams[name] = st
go transcode(sourceStream, st, cfg)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
// Extract audio from stream
func transcode(input, output *stream.Stream, cfg *Options) {
// Start ffmpeg to transcode video to audio
videoInput := make(chan []byte, 1024)
input.Register(videoInput)
ffmpeg, audio, err := startFFmpeg(videoInput, cfg)
if err != nil {
log.Printf("Error while starting ffmpeg: %s", err)
return
}
dataBuff := make([]byte, gmf.IO_BUFFER_SIZE) // UDP MTU
for {
n, err := (*audio).Read(dataBuff)
if err != nil {
log.Printf("An error occurred while reading input: %s", err)
break
}
if n == 0 {
// Stream is finished
break
}
output.Broadcast <- dataBuff[:n]
}
// Stop transcode
_ = ffmpeg.Process.Kill()
_ = (*audio).Close()
}
// Start a ffmpeg instance to convert stream into audio
func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) {
// TODO in a future release: remove FFMPEG dependency and transcode directly using the libopus API
// FIXME It seems impossible to get a RTP Packet from standard output.
// We need to find a clean solution, without waiting on UDP listeners.
// FIXME We should also not build RTP packets here.
/* port := 0
var udpListener *net.UDPConn
var err error
for {
port = rand.Intn(65535)
udpListener, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: port})
if err != nil {
if strings.Contains(fmt.Sprintf("%s", err), "address already in use") {
continue
}
return nil, nil, err
}
break
}*/
bitrate := fmt.Sprintf("%dk", cfg.Bitrate)
// Use copy audio codec, assume for now that libopus is used by the streamer
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
"-vn", "-c:a", "copy", "-b:a", bitrate, "-f", "mpegts", "pipe:1"}
ffmpeg := exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output
errOutput, err := ffmpeg.StderrPipe()
if err != nil {
return nil, nil, err
}
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[AUDIO FFMPEG %s] %s", "demo", scanner.Text())
}
}()
// Handle text output
output, err := ffmpeg.StdoutPipe()
if err != nil {
return nil, nil, err
}
// Handle stream input
input, err := ffmpeg.StdinPipe()
if err != nil {
return nil, nil, err
}
go func() {
for data := range in {
_, _ = input.Write(data)
}
}()
// Start process
err = ffmpeg.Start()
return ffmpeg, &output, err
}

View File

@@ -0,0 +1 @@
package audio

View File

@@ -8,8 +8,10 @@ import (
"io"
"log"
"os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds text package configuration
@@ -21,46 +23,45 @@ type Options struct {
}
// Init text transcoder
func Init(streams *messaging.Streams, cfg *Options) {
func Init(streams map[string]*stream.Stream, cfg *Options) {
if !cfg.Enabled {
// Text transcode is not enabled, ignore
return
}
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
// Regulary check existing streams
for {
for sourceName, sourceStream := range streams {
if strings.Contains(sourceName, "@") {
// Not a source stream, pass
continue
}
// For each new stream
for name := range event {
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
// Check that the transcoded stream does not already exist
name := sourceName + "@text"
_, ok := streams[name]
if ok {
// Stream is already transcoded
continue
}
// Start conversion
log.Printf("Starting text transcode '%s'", name)
st := stream.New()
streams[name] = st
go transcode(sourceStream, st, cfg)
}
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Create new text quality
outputQuality, err := stream.CreateQuality("text")
if err != nil {
log.Printf("Failed to create quality 'text': %s", err)
}
// Start forwarding
log.Printf("Starting text transcoder for '%s' quality '%s'", name, qualityName)
go transcode(quality, outputQuality, cfg)
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
// Convert video to ANSI text
func transcode(input, output *messaging.Quality, cfg *Options) {
func transcode(input, output *stream.Stream, cfg *Options) {
// Start ffmpeg to transcode video to rawvideo
videoInput := make(chan []byte, 1024)
input.Register(videoInput)

View File

@@ -2,16 +2,19 @@
package transcoder
import (
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/transcoder/audio"
"gitlab.crans.org/nounous/ghostream/transcoder/text"
)
// Options holds text package configuration
type Options struct {
Text text.Options
Text text.Options
Audio audio.Options
}
// Init all transcoders
func Init(streams *messaging.Streams, cfg *Options) {
func Init(streams map[string]*stream.Stream, cfg *Options) {
go text.Init(streams, &cfg.Text)
go audio.Init(streams, &cfg.Audio)
}

View File

@@ -10,37 +10,72 @@ import (
"net/http"
"regexp"
"strings"
"sync"
"time"
"github.com/markbates/pkger"
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream/ovenmediaengine"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
)
var (
// Precompile regex
validPath = regexp.MustCompile("^/[a-z0-9@_-]*$")
counterMutex = new(sync.Mutex)
connectedClients = make(map[string]map[string]int64)
)
// Handle site index and viewer pages
func viewerHandler(w http.ResponseWriter, r *http.Request) {
// Validation on path
if validPath.FindStringSubmatch(r.URL.Path) == nil {
http.NotFound(w, r)
log.Printf("Replied not found on %s", r.URL.Path)
// Handle WebRTC session description exchange via POST
func viewerPostHandler(w http.ResponseWriter, r *http.Request) {
// Limit response body to 128KB
r.Body = http.MaxBytesReader(w, r.Body, 131072)
// Get stream ID from URL, or from domain name
path := r.URL.Path[1:]
host := r.Host
if strings.Contains(host, ":") {
realHost, _, err := net.SplitHostPort(r.Host)
if err != nil {
log.Printf("Failed to split host and port from %s", r.Host)
return
}
host = realHost
}
host = strings.Replace(host, ".", "-", -1)
if streamID, ok := cfg.MapDomainToStream[host]; ok {
path = streamID
}
// Decode client description
dec := json.NewDecoder(r.Body)
dec.DisallowUnknownFields()
remoteDescription := webrtc.SessionDescription{}
if err := dec.Decode(&remoteDescription); err != nil {
http.Error(w, "The JSON WebRTC offer is malformed", http.StatusBadRequest)
return
}
// Check method
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
// Exchange session descriptions with WebRTC stream server
remoteSdpChan <- struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}{StreamID: path, RemoteDescription: remoteDescription}
localDescription := <-localSdpChan
// Send server description as JSON
jsonDesc, err := json.Marshal(localDescription)
if err != nil {
http.Error(w, "An error occurred while formating response", http.StatusInternalServerError)
log.Println("An error occurred while sending session description", err)
return
}
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(jsonDesc)
if err != nil {
log.Println("An error occurred while sending session description", err)
}
// Increment monitoring
monitoring.WebSessions.Inc()
}
func viewerGetHandler(w http.ResponseWriter, r *http.Request) {
// Get stream ID from URL, or from domain name
path := r.URL.Path[1:]
host := r.Host
@@ -67,8 +102,7 @@ func viewerHandler(w http.ResponseWriter, r *http.Request) {
Cfg *Options
Path string
WidgetURL string
OMECfg *ovenmediaengine.Options
}{Path: path, Cfg: cfg, WidgetURL: "", OMECfg: omeCfg}
}{Path: path, Cfg: cfg, WidgetURL: ""}
// Load widget is user does not disable it with ?nowidget
if _, ok := r.URL.Query()["nowidget"]; !ok {
@@ -88,6 +122,27 @@ func viewerHandler(w http.ResponseWriter, r *http.Request) {
monitoring.WebViewerServed.Inc()
}
// Handle site index and viewer pages
// POST requests are used to exchange WebRTC session descriptions
func viewerHandler(w http.ResponseWriter, r *http.Request) {
// Validation on path
if validPath.FindStringSubmatch(r.URL.Path) == nil {
http.NotFound(w, r)
log.Printf("Replied not found on %s", r.URL.Path)
return
}
// Route depending on HTTP method
switch r.Method {
case http.MethodGet:
viewerGetHandler(w, r)
case http.MethodPost:
viewerPostHandler(w, r)
default:
http.Error(w, "Sorry, only GET and POST methods are supported.", http.StatusBadRequest)
}
}
func staticHandler() http.Handler {
// Set up static files server
staticFs := http.FileServer(pkger.Dir("/web/static"))
@@ -95,48 +150,22 @@ func staticHandler() http.Handler {
}
func statisticsHandler(w http.ResponseWriter, r *http.Request) {
// Retrieve stream name from URL
name := strings.SplitN(strings.Replace(r.URL.Path[7:], "/", "", -1), "@", 2)[0]
userCount := 0
// Clients have a unique generated identifier per session, that expires in 40 seconds.
// Each time the client connects to this page, the identifier is renewed.
// Yeah, that's not a good way to have stats, but it works...
if connectedClients[name] == nil {
counterMutex.Lock()
connectedClients[name] = make(map[string]int64)
counterMutex.Unlock()
}
currentTime := time.Now().Unix()
if _, ok := r.URL.Query()["uid"]; ok {
uid := r.URL.Query()["uid"][0]
counterMutex.Lock()
connectedClients[name][uid] = currentTime
counterMutex.Unlock()
}
toDelete := make([]string, 0)
counterMutex.Lock()
for uid, oldTime := range connectedClients[name] {
if currentTime-oldTime > 40 {
toDelete = append(toDelete, uid)
// Get all substreams
for _, outputType := range []string{"", "@720p", "@480p", "@360p", "@240p", "@text"} {
// Get requested stream
stream, ok := streams[name+outputType]
if ok {
// Get number of output channels
userCount += stream.ClientCount()
}
}
for _, uid := range toDelete {
delete(connectedClients[name], uid)
}
counterMutex.Unlock()
// Get requested stream
stream, err := streams.Get(name)
if err == nil {
userCount = stream.ClientCount()
userCount += webrtc.GetNumberConnectedSessions(name)
userCount += len(connectedClients[name])
}
// Display connected users statistics
enc := json.NewEncoder(w)
err = enc.Encode(struct{ ConnectedViewers int }{userCount})
err := enc.Encode(struct{ ConnectedViewers int }{userCount})
if err != nil {
http.Error(w, "Failed to generate JSON.", http.StatusInternalServerError)
log.Printf("Failed to generate JSON: %s", err)

View File

@@ -4,8 +4,6 @@ import (
"net/http"
"net/http/httptest"
"testing"
"gitlab.crans.org/nounous/ghostream/messaging"
)
func TestViewerPageGET(t *testing.T) {
@@ -14,9 +12,6 @@ func TestViewerPageGET(t *testing.T) {
t.Errorf("Failed to load templates: %v", err)
}
// Init streams messaging
streams = messaging.New()
cfg = &Options{}
// Test GET request

View File

@@ -2,7 +2,7 @@
video {
display: block;
flex-grow: 1;
max-width: 100%;
width: 100%;
/* Black borders around video */
background-color: #000;

View File

@@ -1,30 +0,0 @@
/**
* ViewerCounter show the number of active viewers
*/
export class ViewerCounter {
/**
* @param {HTMLElement} element
* @param {String} streamName
*/
constructor(element, streamName) {
this.element = element;
this.url = "/_stats/" + streamName;
this.uid = Math.floor(1e19 * Math.random()).toString(16);
}
/**
* Regulary update counter
*
* @param {Number} updatePeriod
*/
regularUpdate(updatePeriod) {
setInterval(() => this.refreshViewersCounter(), updatePeriod);
}
refreshViewersCounter() {
fetch(this.url + "?uid=" + this.uid)
.then(response => response.json())
.then((data) => this.element.innerText = data.ConnectedViewers)
.catch(console.log);
}
}

View File

@@ -1,99 +0,0 @@
/**
* GsWebRTC to connect to Ghostream
*/
export class GsWebRTC {
/**
* @param {list} stunServers STUN servers
* @param {HTMLElement} viewer Video HTML element
* @param {HTMLElement} connectionIndicator Connection indicator element
*/
constructor(stunServers, viewer, connectionIndicator) {
this.viewer = viewer;
this.connectionIndicator = connectionIndicator;
this.pc = new RTCPeerConnection({
iceServers: [{ urls: stunServers }]
});
// We want to receive audio and video
this.pc.addTransceiver("video", { "direction": "sendrecv" });
this.pc.addTransceiver("audio", { "direction": "sendrecv" });
// Configure events
this.pc.oniceconnectionstatechange = () => this._onConnectionStateChange();
this.pc.ontrack = (e) => this._onTrack(e);
}
/**
* On connection change, log it and change indicator.
* If connection closed or failed, try to reconnect.
*/
_onConnectionStateChange() {
console.log("[WebRTC] ICE connection state changed to " + this.pc.iceConnectionState);
switch (this.pc.iceConnectionState) {
case "disconnected":
this.connectionIndicator.style.fill = "#dc3545";
break;
case "checking":
this.connectionIndicator.style.fill = "#ffc107";
break;
case "connected":
this.connectionIndicator.style.fill = "#28a745";
break;
case "closed":
case "failed":
console.log("[WebRTC] Connection closed, restarting...");
/*peerConnection.close();
peerConnection = null;
setTimeout(startPeerConnection, 1000);*/
break;
}
}
/**
* On new track, add it to the player
* @param {Event} event
*/
_onTrack(event) {
console.log(`[WebRTC] New ${event.track.kind} track`);
if (event.track.kind === "video") {
this.viewer.srcObject = event.streams[0];
}
}
/**
* Create an offer and set local description.
* After that the browser will fire onicecandidate events.
*/
createOffer() {
this.pc.createOffer().then(offer => {
this.pc.setLocalDescription(offer);
console.log("[WebRTC] WebRTC offer created");
}).catch(console.log);
}
/**
* Register a function to call to send local descriptions
* @param {Function} sendFunction Called with a local description to send.
*/
onICECandidate(sendFunction) {
// When candidate is null, ICE layer has run out of potential configurations to suggest
// so let's send the offer to the server.
// FIXME: Send offers progressively to do Trickle ICE
this.pc.onicecandidate = event => {
if (event.candidate === null) {
// Send offer to server
console.log("[WebRTC] Sending session description to server");
sendFunction(this.pc.localDescription);
}
};
}
/**
* Set WebRTC remote description
* After that, the connection will be established and ontrack will be fired.
* @param {RTCSessionDescription} sdp Session description data
*/
setRemoteDescription(sdp) {
this.pc.setRemoteDescription(sdp);
}
}

View File

@@ -1,62 +0,0 @@
/**
* GsWebSocket to do Ghostream signalling
*/
export class GsWebSocket {
constructor() {
const protocol = (window.location.protocol === "https:") ? "wss://" : "ws://";
this.url = protocol + window.location.host + "/_ws/";
// Open WebSocket
this._open();
// Configure events
this.socket.addEventListener("open", () => {
console.log("[WebSocket] Connection established");
});
this.socket.addEventListener("close", () => {
console.log("[WebSocket] Connection closed, retrying connection in 1s...");
setTimeout(() => this._open(), 1000);
});
this.socket.addEventListener("error", () => {
console.log("[WebSocket] Connection errored, retrying connection in 1s...");
setTimeout(() => this._open(), 1000);
});
}
_open() {
console.log(`[WebSocket] Connecting to ${this.url}...`);
this.socket = new WebSocket(this.url);
}
/**
* Send local WebRTC session description to remote.
* @param {SessionDescription} localDescription WebRTC local SDP
* @param {string} stream Name of the stream
* @param {string} quality Requested quality
*/
sendLocalDescription(localDescription, stream, quality) {
if (this.socket.readyState !== 1) {
console.log("[WebSocket] Waiting for connection to send data...");
setTimeout(() => this.sendLocalDescription(localDescription, stream, quality), 100);
return;
}
console.log(`[WebSocket] Sending WebRTC local session description for stream ${stream} quality ${quality}`);
this.socket.send(JSON.stringify({
"webRtcSdp": localDescription,
"stream": stream,
"quality": quality
}));
}
/**
* Set callback function on new remote session description.
* @param {Function} callback Function called when data is received
*/
onRemoteDescription(callback) {
this.socket.addEventListener("message", (event) => {
console.log("[WebSocket] Received WebRTC remote session description");
const sdp = new RTCSessionDescription(JSON.parse(event.data));
callback(sdp);
});
}
}

View File

@@ -1,116 +0,0 @@
import { ViewerCounter } from "./modules/viewerCounter.js";
/**
* Initialize viewer page
*
* @param {String} stream
* @param {String} omeApp
* @param {Number} viewersCounterRefreshPeriod
* @param {String} posterUrl
*/
export function initViewerPage(stream, omeApp, viewersCounterRefreshPeriod, posterUrl) {
// Create viewer counter
const viewerCounter = new ViewerCounter(
document.getElementById("connected-people"),
stream,
);
viewerCounter.regularUpdate(viewersCounterRefreshPeriod);
viewerCounter.refreshViewersCounter();
// Side widget toggler
const sideWidgetToggle = document.getElementById("sideWidgetToggle");
const sideWidget = document.getElementById("sideWidget");
if (sideWidgetToggle !== null && sideWidget !== null) {
// On click, toggle side widget visibility
sideWidgetToggle.addEventListener("click", function () {
if (sideWidget.style.display === "none") {
sideWidget.style.display = "block";
sideWidgetToggle.textContent = "»";
} else {
sideWidget.style.display = "none";
sideWidgetToggle.textContent = "«";
}
});
}
// Create player
let player = OvenPlayer.create("viewer", {
title: stream,
image: posterUrl,
autoStart: true,
mute: true,
expandFullScreenUI: true,
sources: [
{
"file": "wss://" + window.location.host + "/" + omeApp + "/" + stream,
"type": "webrtc",
"label": " WebRTC - Source"
},
{
"file": "https://" + window.location.host + "/" + omeApp + "/" + stream + "_bypass/playlist.m3u8",
"type": "hls",
"label": " HLS"
},
{
"file": "https://" + window.location.host + "/" + omeApp + "/" + stream + "_bypass/manifest.mpd",
"type": "dash",
"label": "DASH"
},
{
"file": "https://" + window.location.host + "/" + omeApp + "/" + stream + "_bypass/manifest_ll.mpd",
"type": "dash",
"label": "LL-DASH"
},
]
});
player.on("stateChanged", function (data) {
if (data.newstate === "loading") {
document.getElementById("connectionIndicator").style.fill = '#ffc107'
}
if (data.newstate === "playing") {
document.getElementById("connectionIndicator").style.fill = '#28a745'
}
if (data.newstate === "idle") {
document.getElementById("connectionIndicator").style.fill = '#dc3545'
}
})
player.on("error", function (error) {
document.getElementById("connectionIndicator").style.fill = '#dc3545'
if (error.code === 501 || error.code === 406) {
// Clear messages
const errorMsg = document.getElementsByClassName("op-message-text")[0]
errorMsg.textContent = ""
const warningIcon = document.getElementsByClassName("op-message-icon")[0]
warningIcon.textContent = ""
// Reload in 30s
setTimeout(function () {
player.load()
}, 30000)
} else {
console.log(error);
}
});
// Register keyboard events
window.addEventListener("keydown", (event) => {
switch (event.key) {
case "f":
// F key put player in fullscreen
if (document.fullscreenElement !== null) {
document.exitFullscreen()
} else {
document.getElementsByTagName("video")[0].requestFullscreen()
}
break;
case "m":
case " ":
// M and space key mute player
player.setMute(!player.getMute())
event.preventDefault()
player.play()
break;
}
});
}

View File

@@ -0,0 +1,12 @@
// Side widget toggler
const sideWidgetToggle = document.getElementById("sideWidgetToggle")
sideWidgetToggle.addEventListener("click", function () {
const sideWidget = document.getElementById("sideWidget")
if (sideWidget.style.display === "none") {
sideWidget.style.display = "block"
sideWidgetToggle.textContent = "»"
} else {
sideWidget.style.display = "none"
sideWidgetToggle.textContent = "«"
}
})

View File

@@ -0,0 +1,9 @@
document.getElementById("quality").addEventListener("change", (event) => {
console.log(`Stream quality changed to ${event.target.value}`)
// Restart the connection with a new quality
peerConnection.close()
peerConnection = null
streamPath = window.location.href + event.target.value
startPeerConnection()
})

View File

@@ -1,87 +1,97 @@
import { GsWebSocket } from "./modules/websocket.js";
import { ViewerCounter } from "./modules/viewerCounter.js";
import { GsWebRTC } from "./modules/webrtc.js";
let peerConnection
let streamPath = window.location.href
/**
* Initialize viewer page
*
* @param {String} stream
* @param {List} stunServers
* @param {Number} viewersCounterRefreshPeriod
*/
export function initViewerPage(stream, stunServers, viewersCounterRefreshPeriod) {
// Viewer element
const viewer = document.getElementById("viewer");
startPeerConnection = () => {
// Init peer connection
peerConnection = new RTCPeerConnection({
iceServers: [{ urls: stunServers }]
})
// Default quality
let quality = "source";
// Create WebSocket and WebRTC
const websocket = new GsWebSocket();
const webrtc = new GsWebRTC(
stunServers,
viewer,
document.getElementById("connectionIndicator"),
);
webrtc.createOffer();
webrtc.onICECandidate(localDescription => {
websocket.sendLocalDescription(localDescription, stream, quality);
});
websocket.onRemoteDescription(sdp => {
webrtc.setRemoteDescription(sdp);
});
// Register keyboard events
window.addEventListener("keydown", (event) => {
switch (event.key) {
case "f":
// F key put player in fullscreen
if (document.fullscreenElement !== null) {
document.exitFullscreen();
} else {
viewer.requestFullscreen();
}
break;
case "m":
case " ":
// M and space key mute player
viewer.muted = !viewer.muted;
event.preventDefault();
viewer.play();
break;
// On connection change, change indicator color
// if connection failed, restart peer connection
peerConnection.oniceconnectionstatechange = e => {
console.log("ICE connection state changed, " + peerConnection.iceConnectionState)
switch (peerConnection.iceConnectionState) {
case "disconnected":
document.getElementById("connectionIndicator").style.fill = "#dc3545"
break
case "checking":
document.getElementById("connectionIndicator").style.fill = "#ffc107"
break
case "connected":
document.getElementById("connectionIndicator").style.fill = "#28a745"
break
case "closed":
case "failed":
console.log("Connection failed, restarting...")
peerConnection.close()
peerConnection = null
setTimeout(startPeerConnection, 1000)
break
}
});
// Create viewer counter
const viewerCounter = new ViewerCounter(
document.getElementById("connected-people"),
stream,
);
viewerCounter.regularUpdate(viewersCounterRefreshPeriod);
viewerCounter.refreshViewersCounter();
// Side widget toggler
const sideWidgetToggle = document.getElementById("sideWidgetToggle");
const sideWidget = document.getElementById("sideWidget");
if (sideWidgetToggle !== null && sideWidget !== null) {
// On click, toggle side widget visibility
sideWidgetToggle.addEventListener("click", function () {
if (sideWidget.style.display === "none") {
sideWidget.style.display = "block";
sideWidgetToggle.textContent = "»";
} else {
sideWidget.style.display = "none";
sideWidgetToggle.textContent = "«";
}
});
}
// Video quality toggler
document.getElementById("quality").addEventListener("change", (event) => {
quality = event.target.value;
console.log(`Stream quality changed to ${quality}`);
// We want to receive audio and video
peerConnection.addTransceiver('video', { 'direction': 'sendrecv' })
peerConnection.addTransceiver('audio', { 'direction': 'sendrecv' })
// Restart WebRTC negociation
webrtc.createOffer();
});
// Create offer and set local description
peerConnection.createOffer().then(offer => {
// After setLocalDescription, the browser will fire onicecandidate events
peerConnection.setLocalDescription(offer)
}).catch(console.log)
// When candidate is null, ICE layer has run out of potential configurations to suggest
// so let's send the offer to the server
peerConnection.onicecandidate = event => {
if (event.candidate === null) {
// Send offer to server
// The server know the stream name from the url
// The server replies with its description
// After setRemoteDescription, the browser will fire ontrack events
console.log("Sending session description to server")
fetch(streamPath, {
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify(peerConnection.localDescription)
})
.then(response => response.json())
.then((data) => peerConnection.setRemoteDescription(new RTCSessionDescription(data)))
.catch(console.log)
}
}
// When video track is received, configure player
peerConnection.ontrack = function (event) {
console.log(`New ${event.track.kind} track`)
if (event.track.kind === "video") {
const viewer = document.getElementById('viewer')
viewer.srcObject = event.streams[0]
}
}
}
// Register keyboard events
let viewer = document.getElementById("viewer")
window.addEventListener("keydown", (event) => {
switch (event.key) {
case 'f':
// F key put player in fullscreen
if (document.fullscreenElement !== null) {
document.exitFullscreen()
} else {
viewer.requestFullscreen()
}
break
case 'm':
case ' ':
// M and space key mute player
viewer.muted = !viewer.muted
event.preventDefault()
viewer.play()
break
}
})

View File

@@ -0,0 +1,12 @@
// Refresh viewer count by pulling metric from server
function refreshViewersCounter(streamID, period) {
// Distinguish oneDomainPerStream mode
fetch("/_stats/" + streamID)
.then(response => response.json())
.then((data) => document.getElementById("connected-people").innerText = data.ConnectedViewers)
.catch(console.log)
setTimeout(() => {
refreshViewersCounter(streamID, period)
}, period)
}

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
/*! OvenPlayerv0.9.0 | (c)2020 AirenSoft Co., Ltd. | MIT license (https://github.com/AirenSoft/OvenPlayerPrivate/blob/master/LICENSE) | Github : https://github.com/AirenSoft/OvenPlayer */

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
/*! OvenPlayerv0.9.0 | (c)2020 AirenSoft Co., Ltd. | MIT license (https://github.com/AirenSoft/OvenPlayerPrivate/blob/master/LICENSE) | Github : https://github.com/AirenSoft/OvenPlayer */

View File

@@ -1 +0,0 @@
/*! OvenPlayerv0.9.0 | (c)2020 AirenSoft Co., Ltd. | MIT license (https://github.com/AirenSoft/OvenPlayerPrivate/blob/master/LICENSE) | Github : https://github.com/AirenSoft/OvenPlayer */

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
/*! OvenPlayerv0.9.0 | (c)2020 AirenSoft Co., Ltd. | MIT license (https://github.com/AirenSoft/OvenPlayerPrivate/blob/master/LICENSE) | Github : https://github.com/AirenSoft/OvenPlayer */

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
/*! OvenPlayerv0.9.0 | (c)2020 AirenSoft Co., Ltd. | MIT license (https://github.com/AirenSoft/OvenPlayerPrivate/blob/master/LICENSE) | Github : https://github.com/AirenSoft/OvenPlayer */

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
/*! OvenPlayerv0.9.0 | (c)2020 AirenSoft Co., Ltd. | MIT license (https://github.com/AirenSoft/OvenPlayerPrivate/blob/master/LICENSE) | Github : https://github.com/AirenSoft/OvenPlayer */

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
/*! OvenPlayerv0.9.0 | (c)2020 AirenSoft Co., Ltd. | MIT license (https://github.com/AirenSoft/OvenPlayerPrivate/blob/master/LICENSE) | Github : https://github.com/AirenSoft/OvenPlayer */

View File

@@ -4,7 +4,6 @@
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>{{if .Path}}{{.Path}} - {{end}}{{.Cfg.Name}}</title>
<link rel="stylesheet" href="static/css/style.css">
<link rel="stylesheet" href="static/css/player.css">

View File

@@ -9,11 +9,7 @@
</p>
<h2>Comment je diffuse ?</h2>
<p>
Pour diffuser un contenu vous devez avoir des identifiants valides.
Si le service est hébergé par une association, il est probable que
vous deviez être membre de cette association.
</p>
<p>Pour diffuser un contenu vous devez être adhérent Crans.</p>
<h3>Avec Open Broadcaster Software</h3>
<p>
@@ -25,7 +21,11 @@
<ul>
<li>
<b>Serveur :</b>
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?IDENTIFIANT:MOT_DE_PASSE</code>,
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}</code>,
</li>
<li>
<b>Clé de stream :</b>
<code>IDENTIFIANT:MOT_DE_PASSE</code>
avec <code>IDENTIFIANT</code> et <code>MOT_DE_PASSE</code>
vos identifiants.
</li>
@@ -46,104 +46,29 @@
<p>
<code>
{{/* FIXME replace with good SRT params */}}
ffmpeg -re -i mavideo.webm -vcodec libx264
-preset:v veryfast -vprofile baseline -tune zerolatency
ffmpeg -re -i mavideo.webm -vcodec libx264 -vprofile baseline
-acodec aac -strict -2 -f flv
srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT:MOT_DE_PASSE
</code>
</p>
<h2>Comment lire un flux depuis un lecteur externe ?</h2>
<p>
À l'heure actuelle, la plupart des lecteurs vidéos ne supportent
pas le protocole SRT, ou le supportent mal. Un travail est en
cours pour les rendre un maximum compatibles. Liste non exhaustive
des lecteurs vidéos testés :
</p>
<h3>FFPlay</h3>
<p>
Si FFMpeg est installé sur votre machine, il est accompagné d'un
lecteur vidéo nommé <code>ffplay</code>. Si FFMpeg est compilé
avec le support de SRT (c'est le cas sur la plupart des distributions,
sauf cas ci-dessous), il vous suffira d'exécuter :
</p>
<p>
<code>
ffplay -fflags nobuffer srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<h3>MPV</h3>
<p>
MPV supporte officiellement SRT depuis le 16 octobre 2020.
Néanmoins, la version stable de MPV est beaucoup plus vieille.
Vous devez alors utiliser une version de développement pour
pouvoir lire un flux SRT depuis MPV. L'installation se fait
depuis <a href="https://mpv.io/installation/"> cette page</a>.
Sous Arch Linux, il vous suffit de récupérer le paquet
<code>mpv-git</code> dans l'AUR. Pour lire le flux, il suffit
d'exécuter :
</p>
<p>
<code>
mpv srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<h3>VLC Media Player</h3>
<p>
Bien que VLC supporte officiellement le protocole SRT,
toutes les options ne sont pas encore implémentées,
notamment l'option pour choisir son stream.
Cette option n'est supportée que dans la version de développement
depuis très récemment, grâce à un patch de l'un des développeurs
de Ghostream. Sous Arch Linux, il suffit de récupérer
le paquet <code>vlc-git</code> de l'AUR. Avec un VLC à jour,
il suffit d'exécuter :
</p>
<p>
<code>
vlc srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<p>
Ou bien d'aller dans Média -> Ouvrir un flux réseau et d'entrer l'URL
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT</code>.
</p>
<h3>Le protocole n'existe pas ou n'est pas supporté.</h3>
<p>
La technologie SRT est très récente et n'est pas supportée par
les dépôts stables. Ainsi, si vous avez Ubuntu &le; 20.04 ou
Debian &le; Buster, vous ne pourrez pas utiliser de lecteur vidéo
ni même diffuser avec votre machine. Vous devrez vous mettre à
jour vers Ubuntu 20.10 ou Debian Bullseye.
</p>
<h2>Mentions légales</h2>
<p>
Le service de diffusion vidéo du Crans est un service d'hébergement
au sens de l'article 6, I, 2e de la loi 2004-575 du 21 juin 2004.
Conformément aux dispositions de l'article 6, II du même,
conserve les données de nature à permettre
l'association Crans conserve les données de nature à permettre
l'identification des auteurs du contenu diffusé.
Ce service est hébergé par {{.Cfg.LegalMentionsEntity}}, au
{{.Cfg.LegalMentionsAddress}}.
Ce service est hébergé par l'association Crans, au
61 Avenue du Président Wilson, 94235 Cachan Cedex, France.
</p>
<p>
<b>En cas de réclamation sur le contenu diffusé</b>,
la loi vous autorise à contacter directement l'hébergeur à
l'adresse suivante :
<pre>{{range $i, $element := .Cfg.LegalMentionsFullAddress}}{{$element}}<br/>{{end}}</pre>
<pre>Association Cr@ns - ENS Paris-Saclay<br/>Notification de Contenus Illicites<br/>4, avenue des Sciences<br/>91190 Gif-sur-Yvette<br/>France</pre>
Vous pouvez également envoyer directement vos réclamations par
courrier électronique à l'adresse <code>{{.Cfg.LegalMentionsEmail}}</code>.
courrier électronique à l'adresse <code>bureau[at]crans.org</code>.
</p>
</div>
{{end}}

View File

@@ -6,14 +6,14 @@
<!-- Links and settings under video -->
<div class="controls">
<!-- <span class="control-quality">
<span class="control-quality">
<select id="quality">
<option value="source">Source</option>
<option value="720p">720p</option>
<option value="480p">480p</option>
<option value="240p">240p</option>
<option value="">Source</option>
<option value="@720p">720p</option>
<option value="@480p">480p</option>
<option value="@240p">240p</option>
</select>
</span> -->
</span>
<code class="control-srt-link">srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid={{.Path}}</code>
<span class="control-viewers" id="connected-people">0</span>
<svg class="control-indicator" id="connectionIndicator" fill="#dc3545" width="16" height="16" viewBox="0 0 16 16" xmlns="http://www.w3.org/2000/svg">
@@ -34,31 +34,21 @@
{{end}}
</div>
{{if .OMECfg.Enabled}}
<script src="https://cdn.jsdelivr.net/npm/hls.js@latest"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/dashjs/2.9.3/dash.all.min.js"></script>
<script src="/static/ovenplayer/ovenplayer.js"></script>
<script src="/static/js/ovenplayer.js"></script>
{{end}}
<script type="module">
{{if .OMECfg.Enabled}}
import { initViewerPage } from "/static/js/ovenplayer.js";
{{else}}
import { initViewerPage } from "/static/js/viewer.js";
{{end}}
// Some variables that need to be fixed by web page
const viewersCounterRefreshPeriod = Number("{{.Cfg.ViewersCounterRefreshPeriod}}");
const stream = "{{.Path}}";
{{if .WidgetURL}}<script src="/static/js/sideWidget.js"></script>{{end}}
<script src="/static/js/videoQuality.js"></script>
<script src="/static/js/viewer.js"></script>
<script src="/static/js/viewersCounter.js"></script>
<script>
const stunServers = [
{{range $id, $value := .Cfg.STUNServers}}
"{{$value}}",
'{{$value}}',
{{end}}
]
{{if .OMECfg.Enabled}}
initViewerPage(stream, {{.OMECfg.App}}, viewersCounterRefreshPeriod, {{.Cfg.PlayerPoster}})
{{else}}
initViewerPage(stream, stunServers, viewersCounterRefreshPeriod)
{{end}}
startPeerConnection()
// Wait a bit before pulling viewers counter for the first time
setTimeout(() => {
refreshViewersCounter("{{.Path}}", {{.Cfg.ViewersCounterRefreshPeriod}})
}, 1000)
</script>
{{end}}
{{end}}

View File

@@ -2,7 +2,6 @@
package web
import (
"gitlab.crans.org/nounous/ghostream/stream/ovenmediaengine"
"html/template"
"io/ioutil"
"log"
@@ -11,7 +10,8 @@ import (
"strings"
"github.com/markbates/pkger"
"gitlab.crans.org/nounous/ghostream/messaging"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds web package configuration
@@ -28,22 +28,23 @@ type Options struct {
STUNServers []string
ViewersCounterRefreshPeriod int
WidgetURL string
LegalMentionsEntity string
LegalMentionsAddress string
LegalMentionsFullAddress []string
LegalMentionsEmail string
}
var (
cfg *Options
omeCfg *ovenmediaengine.Options
// WebRTC session description channels
remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}
localSdpChan chan webrtc.SessionDescription
// Preload templates
templates *template.Template
// Streams to get statistics
streams *messaging.Streams
streams map[string]*stream.Stream
)
// Load templates with pkger
@@ -77,10 +78,14 @@ func loadTemplates() error {
}
// Serve HTTP server
func Serve(s *messaging.Streams, c *Options, ome *ovenmediaengine.Options) {
func Serve(s map[string]*stream.Stream, rSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, lSdpChan chan webrtc.SessionDescription, c *Options) {
streams = s
remoteSdpChan = rSdpChan
localSdpChan = lSdpChan
cfg = c
omeCfg = ome
if !cfg.Enabled {
// Web server is not enabled, ignore
@@ -96,7 +101,6 @@ func Serve(s *messaging.Streams, c *Options, ome *ovenmediaengine.Options) {
mux := http.NewServeMux()
mux.HandleFunc("/", viewerHandler)
mux.Handle("/static/", staticHandler())
mux.HandleFunc("/_ws/", websocketHandler)
mux.HandleFunc("/_stats/", statisticsHandler)
log.Printf("HTTP server listening on %s", cfg.ListenAddress)
log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux))

View File

@@ -5,16 +5,16 @@ import (
"testing"
"time"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream"
)
// TestHTTPServe tries to serve a real HTTP server and load some pages
func TestHTTPServe(t *testing.T) {
// Init streams messaging
streams := messaging.New()
streams := make(map[string]*stream.Stream)
// Create a disabled web server
go Serve(streams, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"})
go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond)
@@ -26,7 +26,7 @@ func TestHTTPServe(t *testing.T) {
}
// Now let's really start the web server
go Serve(streams, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"})
go Serve(streams, nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond)

View File

@@ -1,68 +0,0 @@
// Package web serves the JavaScript player and WebRTC negotiation
package web
import (
"log"
"net/http"
"github.com/gorilla/websocket"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// clientDescription is sent by new client
type clientDescription struct {
WebRtcSdp webrtc.SessionDescription
Stream string
Quality string
}
// websocketHandler exchanges WebRTC SDP and viewer count
func websocketHandler(w http.ResponseWriter, r *http.Request) {
// Upgrade client connection to WebSocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Failed to upgrade client to websocket: %s", err)
return
}
for {
// Get client description
c := &clientDescription{}
err = conn.ReadJSON(c)
if err != nil {
log.Printf("Failed to receive client description: %s", err)
_ = conn.Close()
return
}
// Get requested stream
stream, err := streams.Get(c.Stream)
if err != nil {
log.Printf("Stream not found: %s", c.Stream)
continue
}
// Get requested quality
q, err := stream.GetQuality(c.Quality)
if err != nil {
log.Printf("Quality not found: %s", c.Quality)
continue
}
// Exchange session descriptions with WebRTC stream server
// FIXME: Add trickle ICE support
q.WebRtcRemoteSdp <- c.WebRtcSdp
localDescription := <-q.WebRtcLocalSdp
// Send new local description
if err := conn.WriteJSON(localDescription); err != nil {
log.Println(err)
continue
}
}
}