1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2025-10-25 11:43:04 +02:00

3 Commits

Author SHA1 Message Date
Yohann D'ANELLO
fa281e6b31 Try to mux into RTP with FFMPEG bindings 2020-10-19 10:18:45 +02:00
Yohann D'ANELLO
698b83fe6f WebRTC is registering to the audio-transcoded stream 2020-10-18 22:07:11 +02:00
Yohann D'ANELLO
20776d897c Create audio transcoder 2020-10-18 21:46:36 +02:00
46 changed files with 945 additions and 1138 deletions

6
.gitignore vendored
View File

@@ -17,9 +17,3 @@ pkged.go
# Profiler and test files # Profiler and test files
*.prof *.prof
*.test *.test
# Javascript tools
.eslintrc.js
node_modules
package.json
package-lock.json

View File

@@ -2,18 +2,8 @@ stages:
- test - test
- quality-assurance - quality-assurance
.go-cache:
variables:
GOPATH: $CI_PROJECT_DIR/.go
before_script:
- mkdir -p .go
cache:
paths:
- .go/pkg/mod/
unit_tests: unit_tests:
image: golang:1.15-alpine image: golang:1.15-alpine
extends: .go-cache
stage: test stage: test
before_script: before_script:
- apk add --no-cache -X http://dl-cdn.alpinelinux.org/alpine/edge/community build-base ffmpeg gcc libsrt-dev - apk add --no-cache -X http://dl-cdn.alpinelinux.org/alpine/edge/community build-base ffmpeg gcc libsrt-dev
@@ -28,7 +18,6 @@ unit_tests:
linters: linters:
image: golang:1.15-alpine image: golang:1.15-alpine
extends: .go-cache
stage: quality-assurance stage: quality-assurance
script: script:
- go get -u golang.org/x/lint/golint - go get -u golang.org/x/lint/golint

View File

@@ -12,6 +12,6 @@ FROM alpine:3.12
RUN apk add --no-cache -X https://dl-cdn.alpinelinux.org/alpine/edge/community/ ffmpeg libsrt RUN apk add --no-cache -X https://dl-cdn.alpinelinux.org/alpine/edge/community/ ffmpeg libsrt
COPY --from=build_base /code/out/ghostream /app/ghostream COPY --from=build_base /code/out/ghostream /app/ghostream
WORKDIR /app WORKDIR /app
# 2112 for monitoring, 8023 for Telnet, 8080 for Web, 9710 for SRT, 10000-11000 (UDP) for WebRTC # 2112 for monitoring, 8023 for Telnet, 8080 for Web, 9710 for SRT, 10000-10005 (UDP) for WebRTC
EXPOSE 2112 8023 8080 9710/udp 10000-11000/udp EXPOSE 2112 8023 8080 9710/udp 10000-10005/udp
CMD ["/app/ghostream"] CMD ["/app/ghostream"]

View File

@@ -6,7 +6,7 @@ import (
func TestBasicLogin(t *testing.T) { func TestBasicLogin(t *testing.T) {
basicCredentials := make(map[string]string) basicCredentials := make(map[string]string)
basicCredentials["demo"] = "$2b$10$xuU7XFwmRX2CMgdSaA8rM.4Y8.BtRNzhUedwN0G8tCegDRNUERTCS" basicCredentials["demo"] = "$2b$15$LRnG3eIHFlYIguTxZOLH7eHwbQC/vqjnLq6nDFiHSUDKIU.f5/1H6"
// Test good credentials // Test good credentials
backend, _ := New(&Options{Credentials: basicCredentials}) backend, _ := New(&Options{Credentials: basicCredentials})

View File

@@ -30,7 +30,7 @@ services:
restart: always restart: always
ports: ports:
- 9710:9710/udp - 9710:9710/udp
- 10000-11000:10000-11000/udp - 10000-10005:10000-10005/udp
volumes: volumes:
- ./ghostream_data:/etc/ghostream:ro - ./ghostream_data:/etc/ghostream:ro
labels: labels:

View File

@@ -22,12 +22,12 @@ auth:
# Basic backend configuration # Basic backend configuration
# To generate bcrypt hashed password from Python, use: # To generate bcrypt hashed password from Python, use:
# python3 -c 'import bcrypt; print(bcrypt.hashpw(b"PASSWORD", bcrypt.gensalt(rounds=12)).decode("ascii"))' # python3 -c 'import bcrypt; print(bcrypt.hashpw(b"PASSWORD", bcrypt.gensalt(rounds=15)).decode("ascii"))'
# #
#basic: #basic:
# credentials: # credentials:
# # Demo user with password "demo" # # Demo user with password "demo"
# demo: $2b$10$xuU7XFwmRX2CMgdSaA8rM.4Y8.BtRNzhUedwN0G8tCegDRNUERTCS # demo: $2b$15$LRnG3eIHFlYIguTxZOLH7eHwbQC/vqjnLq6nDFiHSUDKIU.f5/1H6
# LDAP backend configuration # LDAP backend configuration
# #
@@ -38,17 +38,13 @@ auth:
## Stream forwarding ## ## Stream forwarding ##
# Forward an incoming stream to other servers # Forward an incoming stream to other servers
# The URL can be anything FFMpeg can accept as an stream output # The URL can be anything FFMpeg can accept as an stream output
# If a file is specified, the name may contains %Y, %m, %d, %H, %M or %S
# that will be replaced by the current date information.
forwarding: forwarding:
# By default nothing is forwarded. # By default nothing is forwarded.
# #
# This example forwards a stream named "demo" to Twitch and YouTube, # This example forwards a stream named "demo" to Twitch and YouTube,
# and save the record in a timestamped-file,
#demo: #demo:
# - rtmp://live-cdg.twitch.tv/app/STREAM_KEY # - rtmp://live-cdg.twitch.tv/app/STREAM_KEY
# - rtmp://a.rtmp.youtube.com/live2/STREAM_KEY # - rtmp://a.rtmp.youtube.com/live2/STREAM_KEY
# - /home/ghostream/lives/%name/live-%Y-%m-%d-%H-%M-%S.flv
## Prometheus monitoring ## ## Prometheus monitoring ##
# Expose a monitoring endpoint for Prometheus # Expose a monitoring endpoint for Prometheus
@@ -170,7 +166,7 @@ webrtc:
# This range must be opened in your firewall. # This range must be opened in your firewall.
# #
#minPortUDP: 10000 #minPortUDP: 10000
#maxPortUDP: 11000 #maxPortUDP: 10005
# STUN servers, you should host your own Coturn instance # STUN servers, you should host your own Coturn instance
# #

11
go.mod
View File

@@ -3,14 +3,13 @@ module gitlab.crans.org/nounous/ghostream
go 1.13 go 1.13
require ( require (
github.com/3d0c/gmf v0.0.0-20200614092945-e58d8d5a6035
github.com/go-ldap/ldap/v3 v3.2.3 github.com/go-ldap/ldap/v3 v3.2.3
github.com/gorilla/websocket v1.4.0 github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a
github.com/markbates/pkger v0.17.1 github.com/markbates/pkger v0.17.1
github.com/pion/rtp v1.6.1 github.com/pion/rtp v1.6.0
github.com/pion/webrtc/v3 v3.0.0-beta.10 github.com/pion/webrtc/v3 v3.0.0-beta.5
github.com/pkg/profile v1.5.0
github.com/prometheus/client_golang v1.7.1 github.com/prometheus/client_golang v1.7.1
github.com/sherifabdlnaby/configuro v0.0.2 github.com/sherifabdlnaby/configuro v0.0.2
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
) )

50
go.sum
View File

@@ -7,6 +7,8 @@ dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBr
dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4= dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4=
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU= dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/3d0c/gmf v0.0.0-20200614092945-e58d8d5a6035 h1:QZb1aMKxiYdGGieyIDmXuw9I9YcGWGViTrpQ6vcZX7Q=
github.com/3d0c/gmf v0.0.0-20200614092945-e58d8d5a6035/go.mod h1:0QMRcUq2JsDECeAq7bj4h79k7XbhtTsrPUQf6G7qfPs=
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c h1:/IBSNwUN8+eKzUzbJPqhK839ygXJ82sde8x3ogr6R28= github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c h1:/IBSNwUN8+eKzUzbJPqhK839ygXJ82sde8x3ogr6R28=
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
@@ -113,15 +115,14 @@ github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk
github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a h1:54abJQezjMoiP+xMQ3ZQbcDXFjqytAYm/n0EVqrYeXg= github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a h1:JliMkv/mAqM5+QzG6Hkw1XcVl1crU8yIQGnhppMv7s0=
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a/go.mod h1:7izzTiCO3zc9ZIVTFMjxUiYL+kgryFP9rl3bsweqdmc= github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a/go.mod h1:yVZ4oACfcnUAcxrh+0b6IuIWfkHLK3IAQ99tuuhRx54=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
@@ -192,30 +193,29 @@ github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pion/datachannel v1.4.21 h1:3ZvhNyfmxsAqltQrApLPQMhSFNA+aT87RqyCq4OXmf0= github.com/pion/datachannel v1.4.21 h1:3ZvhNyfmxsAqltQrApLPQMhSFNA+aT87RqyCq4OXmf0=
github.com/pion/datachannel v1.4.21/go.mod h1:oiNyP4gHx2DIwRzX/MFyH0Rz/Gz05OgBlayAI2hAWjg= github.com/pion/datachannel v1.4.21/go.mod h1:oiNyP4gHx2DIwRzX/MFyH0Rz/Gz05OgBlayAI2hAWjg=
github.com/pion/dtls/v2 v2.0.3 h1:3qQ0s4+TXD00rsllL8g8KQcxAs+Y/Z6oz618RXX6p14= github.com/pion/dtls/v2 v2.0.2 h1:FHCHTiM182Y8e15aFTiORroiATUI16ryHiQh8AIOJ1E=
github.com/pion/dtls/v2 v2.0.3/go.mod h1:TUjyL8bf8LH95h81Xj7kATmzMRt29F/4lxpIPj2Xe4Y= github.com/pion/dtls/v2 v2.0.2/go.mod h1:27PEO3MDdaCfo21heT59/vsdmZc0zMt9wQPcSlLu/1I=
github.com/pion/ice/v2 v2.0.9 h1:oHbiN6Q9tgb8Gfu3I4cbr5mHRE1uqiuFABQ8CbWjIyk= github.com/pion/ice/v2 v2.0.6 h1:7Jf3AX6VIjgO2tGRyT0RGGxkDYOF4m5I5DQzf34IN1Y=
github.com/pion/ice/v2 v2.0.9/go.mod h1:NK+o39ynb+N1YSj9fPgWs3vjVcrsWw0KCr/311MqVq8= github.com/pion/ice/v2 v2.0.6/go.mod h1:xOXvVRlQC/B7FPJeJYKY6IepFRAKb3t1un1K9boYaaQ=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.4 h1:O4vvVqr4DGX63vzmO6Fw9vpy3lfztVWHGCQfyw0ZLSY= github.com/pion/mdns v0.0.4 h1:O4vvVqr4DGX63vzmO6Fw9vpy3lfztVWHGCQfyw0ZLSY=
github.com/pion/mdns v0.0.4/go.mod h1:R1sL0p50l42S5lJs91oNdUL58nm0QHrhxnSegr++qC0= github.com/pion/mdns v0.0.4/go.mod h1:R1sL0p50l42S5lJs91oNdUL58nm0QHrhxnSegr++qC0=
github.com/pion/quic v0.1.4 h1:bNz9sCJjlM3GqMdq7Fne57FiWfdyiJ++yHVbuqeoD3Y= github.com/pion/quic v0.1.4 h1:bNz9sCJjlM3GqMdq7Fne57FiWfdyiJ++yHVbuqeoD3Y=
github.com/pion/quic v0.1.4/go.mod h1:dBhNvkLoQqRwfi6h3Vqj3IcPLgiW7rkZxBbRdp7Vzvk= github.com/pion/quic v0.1.4/go.mod h1:dBhNvkLoQqRwfi6h3Vqj3IcPLgiW7rkZxBbRdp7Vzvk=
github.com/pion/randutil v0.0.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM= github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA=
github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0= github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I=
github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk= github.com/pion/rtp v1.6.0 h1:4Ssnl/T5W2LzxHj9ssYpGVEQh3YYhQFNVmSWO88MMwk=
github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/rtp v1.6.0/go.mod h1:QgfogHsMBVE/RFNno467U/KBqfUywEH+HK+0rtnwsdI=
github.com/pion/sctp v1.7.10 h1:o3p3/hZB5Cx12RMGyWmItevJtZ6o2cpuxaw6GOS4x+8= github.com/pion/sctp v1.7.10 h1:o3p3/hZB5Cx12RMGyWmItevJtZ6o2cpuxaw6GOS4x+8=
github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0= github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0=
github.com/pion/sctp v1.7.11 h1:UCnj7MsobLKLuP/Hh+JMiI/6W5Bs/VF45lWKgHFjSIE= github.com/pion/sdp/v3 v3.0.1 h1:we4OyeTT6s+6sxrAKy/LVywskx1dzUQlh4xu3b+iAs0=
github.com/pion/sctp v1.7.11/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0= github.com/pion/sdp/v3 v3.0.1/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk=
github.com/pion/sdp/v3 v3.0.2 h1:UNnSPVaMM+Pdu/mR9UvAyyo6zkdYbKeuOooCwZvTl/g= github.com/pion/srtp v1.5.1 h1:9Q3jAfslYZBt+C69SI/ZcONJh9049JUHZWYRRf5KEKw=
github.com/pion/sdp/v3 v3.0.2/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk= github.com/pion/srtp v1.5.1/go.mod h1:B+QgX5xPeQTNc1CJStJPHzOlHK66ViMDWTT0HZTCkcA=
github.com/pion/srtp v1.5.2 h1:25DmvH+fqKZDqvX64vTwnycVwL9ooJxHF/gkX16bDBY=
github.com/pion/srtp v1.5.2/go.mod h1:NiBff/MSxUwMUwx/fRNyD/xGE+dVvf8BOCeXhjCXZ9U=
github.com/pion/stun v0.3.5 h1:uLUCBCkQby4S1cf6CGuR9QrVOKcvUwFeemaC865QHDg= github.com/pion/stun v0.3.5 h1:uLUCBCkQby4S1cf6CGuR9QrVOKcvUwFeemaC865QHDg=
github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA= github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA=
github.com/pion/transport v0.8.10/go.mod h1:tBmha/UCjpum5hqTWhfAEs3CO4/tHSg0MYRhSzR+CZ8= github.com/pion/transport v0.8.10/go.mod h1:tBmha/UCjpum5hqTWhfAEs3CO4/tHSg0MYRhSzR+CZ8=
@@ -226,14 +226,12 @@ github.com/pion/turn/v2 v2.0.4 h1:oDguhEv2L/4rxwbL9clGLgtzQPjtuZwCdoM7Te8vQVk=
github.com/pion/turn/v2 v2.0.4/go.mod h1:1812p4DcGVbYVBTiraUmP50XoKye++AMkbfp+N27mog= github.com/pion/turn/v2 v2.0.4/go.mod h1:1812p4DcGVbYVBTiraUmP50XoKye++AMkbfp+N27mog=
github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI= github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI=
github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths= github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths=
github.com/pion/webrtc/v3 v3.0.0-beta.10 h1:1aBn9jv/oe4v2Uf47HutWIjg2i2ZP/O7HqpgKPqSuhE= github.com/pion/webrtc/v3 v3.0.0-beta.5 h1:A1hApeo77Kp2sHWwd53rIHZZm5W9JXEn//5jHj9+fdc=
github.com/pion/webrtc/v3 v3.0.0-beta.10/go.mod h1:GlriYYHJ5KkNsCunm3oFDPql4TDTrrNoI9iSWWSnafA= github.com/pion/webrtc/v3 v3.0.0-beta.5/go.mod h1:HjQaNPq5SdNazGlwiim6/lWBkZD97mPPyDttr7byNA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.5.0 h1:042Buzk+NhDI+DeSAA62RwJL8VAuZUMQZUjCsRz1Mug=
github.com/pkg/profile v1.5.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -352,8 +350,6 @@ golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9 h1:vEg9joUBmeBcK9iSJftGNf
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -389,8 +385,6 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA= golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c h1:dk0ukUIHmGHqASjP0iue2261isepFCC6XRCSd1nHgDw=
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c/go.mod h1:iQL9McJNjoIa5mjH6nYTCTZXUN6RP+XW3eib7Ya3XcI=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -423,10 +417,6 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c h1:38q6VNPWR010vN82/SB121GujZNIfAUb4YttE2rhGuc=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -456,8 +446,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y= google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=

View File

@@ -2,6 +2,7 @@
package config package config
import ( import (
"gitlab.crans.org/nounous/ghostream/transcoder/audio"
"net" "net"
"github.com/sherifabdlnaby/configuro" "github.com/sherifabdlnaby/configuro"
@@ -59,6 +60,10 @@ func New() *Config {
ListenAddress: ":8023", ListenAddress: ":8023",
}, },
Transcoder: transcoder.Options{ Transcoder: transcoder.Options{
Audio: audio.Options{
Enabled: true,
Bitrate: 160,
},
Text: text.Options{ Text: text.Options{
Enabled: false, Enabled: false,
Width: 80, Width: 80,
@@ -78,7 +83,7 @@ func New() *Config {
}, },
WebRTC: webrtc.Options{ WebRTC: webrtc.Options{
Enabled: true, Enabled: true,
MaxPortUDP: 11000, MaxPortUDP: 10005,
MinPortUDP: 10000, MinPortUDP: 10000,
STUNServers: []string{"stun:stun.l.google.com:19302"}, STUNServers: []string{"stun:stun.l.google.com:19302"},
}, },
@@ -117,7 +122,7 @@ func Load() (*Config, error) {
// If no credentials register, add demo account with password "demo" // If no credentials register, add demo account with password "demo"
if len(cfg.Auth.Basic.Credentials) < 1 { if len(cfg.Auth.Basic.Credentials) < 1 {
cfg.Auth.Basic.Credentials["demo"] = "$2b$10$xuU7XFwmRX2CMgdSaA8rM.4Y8.BtRNzhUedwN0G8tCegDRNUERTCS" cfg.Auth.Basic.Credentials["demo"] = "$2b$15$LRnG3eIHFlYIguTxZOLH7eHwbQC/vqjnLq6nDFiHSUDKIU.f5/1H6"
} }
return cfg, nil return cfg, nil

19
main.go
View File

@@ -7,11 +7,10 @@ package main
import ( import (
"log" "log"
"github.com/pkg/profile"
"gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/internal/config" "gitlab.crans.org/nounous/ghostream/internal/config"
"gitlab.crans.org/nounous/ghostream/internal/monitoring" "gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/stream/forwarding" "gitlab.crans.org/nounous/ghostream/stream/forwarding"
"gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/telnet"
@@ -21,9 +20,6 @@ import (
) )
func main() { func main() {
// TODO Don't always profile if not needed
defer profile.Start().Stop()
// Configure logger // Configure logger
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
@@ -42,8 +38,15 @@ func main() {
defer authBackend.Close() defer authBackend.Close()
} }
// WebRTC session description channels
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
// Init streams messaging // Init streams messaging
streams := messaging.New() streams := make(map[string]*stream.Stream)
// Start routines // Start routines
go transcoder.Init(streams, &cfg.Transcoder) go transcoder.Init(streams, &cfg.Transcoder)
@@ -51,8 +54,8 @@ func main() {
go monitoring.Serve(&cfg.Monitoring) go monitoring.Serve(&cfg.Monitoring)
go srt.Serve(streams, authBackend, &cfg.Srt) go srt.Serve(streams, authBackend, &cfg.Srt)
go telnet.Serve(streams, &cfg.Telnet) go telnet.Serve(streams, &cfg.Telnet)
go web.Serve(streams, &cfg.Web) go web.Serve(streams, remoteSdpChan, localSdpChan, &cfg.Web)
go webrtc.Serve(streams, &cfg.WebRTC) go webrtc.Serve(streams, remoteSdpChan, localSdpChan, &cfg.WebRTC)
// Wait for routines // Wait for routines
select {} select {}

View File

@@ -1,89 +0,0 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"sync"
"github.com/pion/webrtc/v3"
)
// Quality holds a specific stream quality.
// It makes packages able to subscribe to an incoming stream.
type Quality struct {
// Incoming data come from this channel
Broadcast chan<- []byte
// Incoming data will be outputted to all those outputs.
// Use a map to be able to delete an item.
outputs map[chan []byte]struct{}
// Mutex to lock outputs map
lockOutputs sync.Mutex
// WebRTC session descriptor exchange.
// When new client connects, a SDP arrives on WebRtcRemoteSdp,
// then webrtc package answers on WebRtcLocalSdp.
WebRtcLocalSdp chan webrtc.SessionDescription
WebRtcRemoteSdp chan webrtc.SessionDescription
}
func newQuality() (q *Quality) {
q = &Quality{}
broadcast := make(chan []byte, 1024)
q.Broadcast = broadcast
q.outputs = make(map[chan []byte]struct{})
q.WebRtcLocalSdp = make(chan webrtc.SessionDescription, 1)
q.WebRtcRemoteSdp = make(chan webrtc.SessionDescription, 1)
go q.run(broadcast)
return q
}
func (q *Quality) run(broadcast <-chan []byte) {
for msg := range broadcast {
q.lockOutputs.Lock()
for output := range q.outputs {
select {
case output <- msg:
default:
// If full, do a ring buffer
// Check that output is not of size zero
if len(output) > 1 {
<-output
}
}
}
q.lockOutputs.Unlock()
}
// Incoming chan has been closed, close all outputs
q.lockOutputs.Lock()
for ch := range q.outputs {
delete(q.outputs, ch)
close(ch)
}
q.lockOutputs.Unlock()
}
// Close the incoming chan, this will also delete all outputs.
func (q *Quality) Close() {
close(q.Broadcast)
}
// Register a new output on a stream.
func (q *Quality) Register(output chan []byte) {
q.lockOutputs.Lock()
q.outputs[output] = struct{}{}
q.lockOutputs.Unlock()
}
// Unregister removes an output.
func (q *Quality) Unregister(output chan []byte) {
// Make sure we did not already close this output
q.lockOutputs.Lock()
_, ok := q.outputs[output]
if ok {
delete(q.outputs, output)
close(output)
}
defer q.lockOutputs.Unlock()
}

View File

@@ -1,84 +0,0 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"errors"
"sync"
)
// Stream makes packages able to subscribe to an incoming stream
type Stream struct {
// Different qualities of this stream
qualities map[string]*Quality
// Mutex to lock outputs map
lockQualities sync.Mutex
// Count clients for statistics
nbClients int
}
func newStream() (s *Stream) {
s = &Stream{}
s.qualities = make(map[string]*Quality)
s.nbClients = 0
return s
}
// Close stream.
func (s *Stream) Close() {
for quality := range s.qualities {
s.DeleteQuality(quality)
}
}
// CreateQuality creates a new quality associated with this stream.
func (s *Stream) CreateQuality(name string) (quality *Quality, err error) {
// If quality already exist, fail
if _, ok := s.qualities[name]; ok {
return nil, errors.New("quality already exists")
}
s.lockQualities.Lock()
quality = newQuality()
s.qualities[name] = quality
s.lockQualities.Unlock()
return quality, nil
}
// DeleteQuality removes a stream quality.
func (s *Stream) DeleteQuality(name string) {
// Make sure we did not already close this output
s.lockQualities.Lock()
if _, ok := s.qualities[name]; ok {
s.qualities[name].Close()
delete(s.qualities, name)
}
s.lockQualities.Unlock()
}
// GetQuality gets a specific stream quality.
func (s *Stream) GetQuality(name string) (quality *Quality, err error) {
s.lockQualities.Lock()
quality, ok := s.qualities[name]
s.lockQualities.Unlock()
if !ok {
return nil, errors.New("quality does not exist")
}
return quality, nil
}
// ClientCount returns the number of clients.
func (s *Stream) ClientCount() int {
return s.nbClients
}
// IncrementClientCount increments the number of clients.
func (s *Stream) IncrementClientCount() {
s.nbClients++
}
// DecrementClientCount decrements the number of clients.
func (s *Stream) DecrementClientCount() {
s.nbClients--
}

View File

@@ -1,98 +0,0 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"errors"
"log"
"sync"
)
// Streams hold all application streams.
type Streams struct {
// Associate each stream name to the stream
streams map[string]*Stream
// Mutex to lock streams
lockStreams sync.Mutex
// Subscribers get notified when a new stream is created
// Use a map to be able to delete a subscriber
eventSubscribers map[chan string]struct{}
// Mutex to lock eventSubscribers
lockSubscribers sync.Mutex
}
// New creates a new stream list.
func New() (l *Streams) {
l = &Streams{}
l.streams = make(map[string]*Stream)
l.eventSubscribers = make(map[chan string]struct{})
return l
}
// Subscribe to get notified on new stream.
func (l *Streams) Subscribe(output chan string) {
l.lockSubscribers.Lock()
l.eventSubscribers[output] = struct{}{}
l.lockSubscribers.Unlock()
}
// Unsubscribe to no longer get notified on new stream.
func (l *Streams) Unsubscribe(output chan string) {
// Make sure we did not already delete this subscriber
l.lockSubscribers.Lock()
if _, ok := l.eventSubscribers[output]; ok {
delete(l.eventSubscribers, output)
}
l.lockSubscribers.Unlock()
}
// Create a new stream.
func (l *Streams) Create(name string) (s *Stream, err error) {
// If stream already exist, fail
if _, ok := l.streams[name]; ok {
return nil, errors.New("stream already exists")
}
// Create stream
s = newStream()
l.lockStreams.Lock()
l.streams[name] = s
l.lockStreams.Unlock()
// Notify
l.lockSubscribers.Lock()
for sub := range l.eventSubscribers {
select {
case sub <- name:
default:
log.Printf("Failed to announce stream '%s' to subscriber", name)
}
}
l.lockSubscribers.Unlock()
return s, nil
}
// Get a stream.
func (l *Streams) Get(name string) (s *Stream, err error) {
// If stream does exist, return it
l.lockStreams.Lock()
s, ok := l.streams[name]
l.lockStreams.Unlock()
if !ok {
return nil, errors.New("stream does not exist")
}
return s, nil
}
// Delete a stream.
func (l *Streams) Delete(name string) {
// Make sure we did not already delete this stream
l.lockStreams.Lock()
if _, ok := l.streams[name]; ok {
l.streams[name].Close()
delete(l.streams, name)
}
l.lockStreams.Unlock()
}

View File

@@ -1,55 +0,0 @@
package messaging
import "testing"
func TestWithOneStream(t *testing.T) {
streams := New()
// Subscribe to new streams
event := make(chan string, 8)
streams.Subscribe(event)
// Create a stream
stream, err := streams.Create("demo")
if err != nil {
t.Errorf("Failed to create stream")
}
// Check that we receive the creation event
e := <-event
if e != "demo" {
t.Errorf("Message has wrong content: %s != demo", e)
}
// Create a quality
quality, err := stream.CreateQuality("source")
if err != nil {
t.Errorf("Failed to create quality")
}
// Register one output
output := make(chan []byte, 64)
quality.Register(output)
stream.IncrementClientCount()
// Try to pass one message
quality.Broadcast <- []byte("hello world")
msg := <-output
if string(msg) != "hello world" {
t.Errorf("Message has wrong content: %s != hello world", msg)
}
// Check client count
if count := stream.ClientCount(); count != 1 {
t.Errorf("Client counter returned %d, expected 1", count)
}
// Unregister
quality.Unregister(output)
stream.DecrementClientCount()
// Check client count
if count := stream.ClientCount(); count != 0 {
t.Errorf("Client counter returned %d, expected 0", count)
}
}

View File

@@ -3,13 +3,11 @@ package forwarding
import ( import (
"bufio" "bufio"
"fmt"
"log" "log"
"os/exec" "os/exec"
"strings"
"time" "time"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
) )
// Options to configure the stream forwarding. // Options to configure the stream forwarding.
@@ -17,65 +15,43 @@ import (
type Options map[string][]string type Options map[string][]string
// Serve handles incoming packets from SRT and forward them to other external services // Serve handles incoming packets from SRT and forward them to other external services
func Serve(streams *messaging.Streams, cfg Options) { func Serve(streams map[string]*stream.Stream, cfg Options) {
if len(cfg) < 1 { if len(cfg) < 1 {
// No forwarding, ignore // No forwarding, ignore
return return
} }
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
log.Printf("Stream forwarding initialized") log.Printf("Stream forwarding initialized")
for {
// For each new stream for name, st := range streams {
for name := range event { fwdCfg, ok := cfg[name]
streamCfg, ok := cfg[name]
if !ok { if !ok {
// Not configured // Not configured
continue continue
} }
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
}
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding // Start forwarding
log.Printf("Starting forwarding for '%s' quality '%s'", name, qualityName) log.Printf("Starting forwarding for '%s'", name)
go forward(name, quality, streamCfg) go forward(st, fwdCfg)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
} }
} }
// Start a FFMPEG instance and redirect stream output to forwarded streams // Start a FFMPEG instance and redirect stream output to forwarded streams
func forward(streamName string, q *messaging.Quality, fwdCfg []string) { func forward(st *stream.Stream, fwdCfg []string) {
output := make(chan []byte, 1024) output := make(chan []byte, 1024)
q.Register(output) st.Register(output)
// Launch FFMPEG instance // Launch FFMPEG instance
params := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0"} params := []string{"-hide_banner", "-loglevel", "error", "-re", "-i", "pipe:0"}
for _, url := range fwdCfg { for _, url := range fwdCfg {
// If the url should be date-formatted, replace special characters with the current time information
now := time.Now()
formattedURL := strings.ReplaceAll(url, "%Y", fmt.Sprintf("%04d", now.Year()))
formattedURL = strings.ReplaceAll(formattedURL, "%m", fmt.Sprintf("%02d", now.Month()))
formattedURL = strings.ReplaceAll(formattedURL, "%d", fmt.Sprintf("%02d", now.Day()))
formattedURL = strings.ReplaceAll(formattedURL, "%H", fmt.Sprintf("%02d", now.Hour()))
formattedURL = strings.ReplaceAll(formattedURL, "%M", fmt.Sprintf("%02d", now.Minute()))
formattedURL = strings.ReplaceAll(formattedURL, "%S", fmt.Sprintf("%02d", now.Second()))
formattedURL = strings.ReplaceAll(formattedURL, "%name", streamName)
params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency", params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency",
"-c", "copy", formattedURL) "-c", "copy", url)
} }
ffmpeg := exec.Command("ffmpeg", params...) ffmpeg := exec.Command("ffmpeg", params...)
@@ -101,14 +77,14 @@ func forward(streamName string, q *messaging.Quality, fwdCfg []string) {
_ = input.Close() _ = input.Close()
_ = errOutput.Close() _ = errOutput.Close()
_ = ffmpeg.Process.Kill() _ = ffmpeg.Process.Kill()
q.Unregister(output) st.Unregister(output)
}() }()
// Log standard error output // Log standard error output
go func() { go func() {
scanner := bufio.NewScanner(errOutput) scanner := bufio.NewScanner(errOutput)
for scanner.Scan() { for scanner.Scan() {
log.Printf("[FORWARDING FFMPEG %s] %s", streamName, scanner.Text()) log.Printf("[FORWARDING FFMPEG] %s", scanner.Text())
} }
}() }()

View File

@@ -6,7 +6,7 @@ import (
"testing" "testing"
"time" "time"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/srt"
) )
@@ -35,7 +35,7 @@ func TestForwardStream(t *testing.T) {
cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
// Register forwarding stream list // Register forwarding stream list
streams := messaging.New() streams := make(map[string]*stream.Stream)
go Serve(streams, cfg) go Serve(streams, cfg)
// Serve SRT Server without authentification backend // Serve SRT Server without authentification backend

99
stream/messaging.go Normal file
View File

@@ -0,0 +1,99 @@
// Package stream defines a structure to communication between inputs and outputs
package stream
import (
"sync"
)
// Stream makes packages able to subscribe to an incoming stream
type Stream struct {
// Incoming data come from this channel
Broadcast chan<- []byte
// Use a map to be able to delete an item
outputs map[chan []byte]struct{}
// Count clients for statistics
nbClients int
// Mutex to lock outputs map
lock sync.Mutex
}
// New creates a new stream.
func New() *Stream {
s := &Stream{}
broadcast := make(chan []byte, 1024)
s.Broadcast = broadcast
s.outputs = make(map[chan []byte]struct{})
s.nbClients = 0
go s.run(broadcast)
return s
}
func (s *Stream) run(broadcast <-chan []byte) {
for msg := range broadcast {
s.lock.Lock()
for output := range s.outputs {
select {
case output <- msg:
default:
// If full, do a ring buffer
// Check that output is not of size zero
if len(output) > 1 {
<-output
}
}
}
s.lock.Unlock()
}
// Incoming chan has been closed, close all outputs
s.lock.Lock()
for ch := range s.outputs {
delete(s.outputs, ch)
close(ch)
}
s.lock.Unlock()
}
// Close the incoming chan, this will also delete all outputs
func (s *Stream) Close() {
close(s.Broadcast)
}
// Register a new output on a stream.
func (s *Stream) Register(output chan []byte) {
s.lock.Lock()
defer s.lock.Unlock()
s.outputs[output] = struct{}{}
}
// Unregister removes an output.
// If hidden in true, then do not count this client.
func (s *Stream) Unregister(output chan []byte) {
s.lock.Lock()
defer s.lock.Unlock()
// Make sure we did not already close this output
_, ok := s.outputs[output]
if ok {
delete(s.outputs, output)
close(output)
}
}
// ClientCount returns the number of clients
func (s *Stream) ClientCount() int {
return s.nbClients
}
// IncrementClientCount increments the number of clients
func (s *Stream) IncrementClientCount() {
s.nbClients++
}
// DecrementClientCount decrements the number of clients
func (s *Stream) DecrementClientCount() {
s.nbClients--
}

42
stream/messaging_test.go Normal file
View File

@@ -0,0 +1,42 @@
package stream
import (
"testing"
)
func TestWithoutOutputs(t *testing.T) {
stream := New()
defer stream.Close()
stream.Broadcast <- []byte("hello world")
}
func TestWithOneOutput(t *testing.T) {
stream := New()
defer stream.Close()
// Register one output
output := make(chan []byte, 64)
stream.Register(output)
stream.IncrementClientCount()
// Try to pass one message
stream.Broadcast <- []byte("hello world")
msg := <-output
if string(msg) != "hello world" {
t.Errorf("Message has wrong content: %s != hello world", msg)
}
// Check client count
if count := stream.ClientCount(); count != 1 {
t.Errorf("Client counter returned %d, expected 1", count)
}
// Unregister
stream.Unregister(output)
stream.DecrementClientCount()
// Check client count
if count := stream.ClientCount(); count != 0 {
t.Errorf("Client counter returned %d, expected 0", count)
}
}

View File

@@ -5,26 +5,21 @@ import (
"log" "log"
"github.com/haivision/srtgo" "github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
) )
func handleStreamer(socket *srtgo.SrtSocket, streams *messaging.Streams, name string) { func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
// Create stream // Check stream does not exist
stream, err := streams.Create(name) if _, ok := streams[name]; ok {
if err != nil { log.Print("Stream already exists, refusing new streamer")
log.Printf("Error on stream creating: %s", err)
socket.Close() socket.Close()
return return
} }
// Create source quality // Create stream
q, err := stream.CreateQuality("source") log.Printf("New SRT streamer for stream %s", name)
if err != nil { st := stream.New()
log.Printf("Error on quality creating: %s", err) streams[name] = st
socket.Close()
return
}
log.Printf("New SRT streamer for stream '%s' quality 'source'", name)
// Read RTP packets forever and send them to the WebRTC Client // Read RTP packets forever and send them to the WebRTC Client
for { for {
@@ -47,38 +42,29 @@ func handleStreamer(socket *srtgo.SrtSocket, streams *messaging.Streams, name st
// Send raw data to other streams // Send raw data to other streams
buff = buff[:n] buff = buff[:n]
q.Broadcast <- buff st.Broadcast <- buff
} }
// Close stream // Close stream
streams.Delete(name) st.Close()
socket.Close() socket.Close()
delete(streams, name)
} }
func handleViewer(socket *srtgo.SrtSocket, streams *messaging.Streams, name string) { func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
log.Printf("New SRT viewer for stream %s", name)
// Get requested stream // Get requested stream
stream, err := streams.Get(name) st, ok := streams[name]
if err != nil { if !ok {
log.Printf("Failed to get stream: %s", err) log.Println("Stream does not exist, refusing new viewer")
socket.Close()
return return
} }
// Get requested quality
// FIXME: make qualities available
qualityName := "source"
q, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality: %s", err)
socket.Close()
return
}
log.Printf("New SRT viewer for stream %s quality %s", name, qualityName)
// Register new output // Register new output
c := make(chan []byte, 1024) c := make(chan []byte, 1024)
q.Register(c) st.Register(c)
stream.IncrementClientCount() st.IncrementClientCount()
// Receive data and send them // Receive data and send them
for data := range c { for data := range c {
@@ -88,7 +74,7 @@ func handleViewer(socket *srtgo.SrtSocket, streams *messaging.Streams, name stri
} }
// Send data // Send data
_, err := socket.Write(data, 1000) _, err := s.Write(data, 1000)
if err != nil { if err != nil {
log.Printf("Remove SRT viewer because of sending error, %s", err) log.Printf("Remove SRT viewer because of sending error, %s", err)
break break
@@ -96,7 +82,7 @@ func handleViewer(socket *srtgo.SrtSocket, streams *messaging.Streams, name stri
} }
// Close output // Close output
q.Unregister(c) st.Unregister(c)
stream.DecrementClientCount() st.DecrementClientCount()
socket.Close() s.Close()
} }

View File

@@ -1,6 +1,9 @@
// Package srt serves a SRT server // Package srt serves a SRT server
package srt package srt
// #include <srt/srt.h>
import "C"
import ( import (
"log" "log"
"net" "net"
@@ -9,7 +12,7 @@ import (
"github.com/haivision/srtgo" "github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
) )
// Options holds web package configuration // Options holds web package configuration
@@ -36,7 +39,7 @@ func splitHostPort(hostport string) (string, uint16, error) {
} }
// Serve SRT server // Serve SRT server
func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) { func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Options) {
if !cfg.Enabled { if !cfg.Enabled {
// SRT is not enabled, ignore // SRT is not enabled, ignore
return return
@@ -59,7 +62,7 @@ func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) {
for { for {
// Wait for new connection // Wait for new connection
s, _, err := sck.Accept() s, err := sck.Accept()
if err != nil { if err != nil {
// Something wrong happened // Something wrong happened
log.Println(err) log.Println(err)
@@ -70,7 +73,7 @@ func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) {
// Without this, the SRT buffer might get full before reading it // Without this, the SRT buffer might get full before reading it
// streamid can be "name:password" for streamer or "name" for viewer // streamid can be "name:password" for streamer or "name" for viewer
streamID, err := s.GetSockOptString(srtgo.SRTO_STREAMID) streamID, err := s.GetSockOptString(C.SRTO_STREAMID)
if err != nil { if err != nil {
log.Print("Failed to get socket streamid") log.Print("Failed to get socket streamid")
continue continue

View File

@@ -6,7 +6,7 @@ import (
"testing" "testing"
"time" "time"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
) )
// TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à // TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à
@@ -58,7 +58,7 @@ func TestServeSRT(t *testing.T) {
} }
// Init streams messaging and SRT server // Init streams messaging and SRT server
streams := messaging.New() streams := make(map[string]*stream.Stream)
go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2}) go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2})
ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error", ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error",

View File

@@ -7,7 +7,7 @@ import (
"strings" "strings"
"time" "time"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
) )
// Options holds telnet package configuration // Options holds telnet package configuration
@@ -17,7 +17,7 @@ type Options struct {
} }
// Serve Telnet server // Serve Telnet server
func Serve(streams *messaging.Streams, cfg *Options) { func Serve(streams map[string]*stream.Stream, cfg *Options) {
if !cfg.Enabled { if !cfg.Enabled {
// Telnet is not enabled, ignore // Telnet is not enabled, ignore
return return
@@ -32,17 +32,17 @@ func Serve(streams *messaging.Streams, cfg *Options) {
// Handle each new client // Handle each new client
for { for {
socket, err := listener.Accept() s, err := listener.Accept()
if err != nil { if err != nil {
log.Printf("Error while accepting TCP socket: %s", err) log.Printf("Error while accepting TCP socket: %s", s)
continue continue
} }
go handleViewer(socket, streams, cfg) go handleViewer(s, streams, cfg)
} }
} }
func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) { func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
// Prompt user about stream name // Prompt user about stream name
if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil { if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err) log.Printf("Error while writing to TCP socket: %s", err)
@@ -56,7 +56,7 @@ func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
s.Close() s.Close()
return return
} }
name := strings.TrimSpace(string(buff[:n])) name := strings.TrimSpace(string(buff[:n])) + "@text"
if len(name) < 1 { if len(name) < 1 {
// Too short, exit // Too short, exit
s.Close() s.Close()
@@ -67,9 +67,9 @@ func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
time.Sleep(time.Second) time.Sleep(time.Second)
// Get requested stream // Get requested stream
stream, err := streams.Get(name) st, ok := streams[name]
if err != nil { if !ok {
log.Printf("Kicking new Telnet viewer: %s", err) log.Println("Stream does not exist, kicking new Telnet viewer")
if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil { if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err) log.Printf("Error while writing to TCP socket: %s", err)
} }
@@ -77,23 +77,11 @@ func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
return return
} }
// Get requested quality
qualityName := "text"
q, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Kicking new Telnet viewer: %s", err)
if _, err := s.Write([]byte("This stream is not converted to text.\n")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
}
s.Close()
return
}
log.Printf("New Telnet viewer for stream %s quality %s", name, qualityName)
// Register new client // Register new client
log.Printf("New Telnet viewer for stream '%s'", name)
c := make(chan []byte, 128) c := make(chan []byte, 128)
q.Register(c) st.Register(c)
stream.IncrementClientCount() st.IncrementClientCount()
// Hide terminal cursor // Hide terminal cursor
if _, err = s.Write([]byte("\033[?25l")); err != nil { if _, err = s.Write([]byte("\033[?25l")); err != nil {
@@ -118,7 +106,7 @@ func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
} }
// Close output // Close output
q.Unregister(c) st.Unregister(c)
stream.DecrementClientCount() st.DecrementClientCount()
s.Close() s.Close()
} }

View File

@@ -3,13 +3,13 @@ package telnet
import ( import (
"testing" "testing"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
) )
// TestTelnetOutput creates a TCP client that connects to the server and get one image. // TestTelnetOutput creates a TCP client that connects to the server and get one image.
func TestTelnetOutput(t *testing.T) { func TestTelnetOutput(t *testing.T) {
// Try to start Telnet server while it is disabled // Try to start Telnet server while it is disabled
streams := messaging.New() streams := make(map[string]*stream.Stream)
go Serve(streams, &Options{Enabled: false}) go Serve(streams, &Options{Enabled: false})
// FIXME test connect // FIXME test connect

View File

@@ -2,40 +2,166 @@
package webrtc package webrtc
import ( import (
"bufio" "github.com/3d0c/gmf"
"fmt"
"log"
"math/rand"
"net"
"os/exec"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
"log"
"net"
"strings"
"time"
) )
func ingest(name string, q *messaging.Quality) { var (
activeStream map[string]struct{}
)
func autoIngest(streams map[string]*stream.Stream) {
// Regulary check existing streams
activeStream = make(map[string]struct{})
for {
for name, st := range streams {
if strings.Contains(name, "@") {
// Not a source stream, pass
continue
}
if _, ok := activeStream[name]; ok {
// Stream is already ingested
continue
}
// Start ingestion
log.Printf("Starting webrtc for '%s'", name)
// FIXME Ensure that the audio stream exist, but that's poop code
time.Sleep(time.Second)
go ingest(name, st, streams[name+"@audio"])
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
func ingest(name string, input *stream.Stream, audio *stream.Stream) {
// Register to get stream // Register to get stream
videoInput := make(chan []byte, 1024) videoInput := make(chan []byte, 1024)
q.Register(videoInput) input.Register(videoInput)
audioInput := make(chan []byte, 1024)
audio.Register(audioInput)
activeStream[name] = struct{}{}
// FIXME Mux into RTP without having multiple UDP listeners inputCtx := gmf.NewCtx()
firstPort := int(rand.Int31n(63535)) + 2000 avioInputCtx, _ := gmf.NewAVIOContext(inputCtx, &gmf.AVIOHandlers{ReadPacket: func() ([]byte, int) {
data := <-audioInput
return data, len(data)
}})
log.Println("Open input")
inputCtx.SetPb(avioInputCtx).OpenInput("")
log.Println("Opened")
defer inputCtx.CloseInput()
defer avioInputCtx.Release()
// Open UDP listeners for RTP Packets if audioTracks[name] == nil {
audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort}) audioTracks[name] = make([]*webrtc.Track, 0)
}
udpListener, _ := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 1234})
outputCtx, _ := gmf.NewOutputCtxWithFormatName("rtp://127.0.0.1:1234", "rtp")
avioOutputCtx, _ := gmf.NewAVIOContext(outputCtx, &gmf.AVIOHandlers{WritePacket: func(data []byte) int {
n := len(data)
log.Printf("Read %d bytes", n)
return n
}})
// FIXME DON'T RAN AN UDP LISTENER, PLIZ GET DIRECTLY UDP PACKETS, WHY IS IT SO COMPLICATED????
// outputCtx.SetPb(avioOutputCtx)
defer outputCtx.CloseOutput()
defer avioOutputCtx.Release()
log.Printf("%d streams", inputCtx.StreamsCnt())
for i := 0; i < inputCtx.StreamsCnt(); i++ {
srcStream, err := inputCtx.GetStream(i)
if err != nil {
log.Println("GetStream error")
}
outputCtx.AddStreamWithCodeCtx(srcStream.CodecCtx())
}
outputCtx.Dump()
if err := outputCtx.WriteHeader(); err != nil {
log.Printf("Unable to write RTP header: %s", err)
}
// Receive audio data
go func() {
buff := make([]byte, 1500)
for {
n, _ := udpListener.Read(buff)
if n == 0 {
return
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(buff[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue
}
if audioTracks[name] == nil {
audioTracks[name] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all audio tracks
// Adapt payload and SSRC to match destination
for _, audioTrack := range audioTracks[name] {
packet.Header.PayloadType = audioTrack.PayloadType()
packet.Header.SSRC = audioTrack.SSRC()
if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to audio track: %s", writeErr)
continue
}
}
}
}()
first := false
for packet := range inputCtx.GetNewPackets() {
if first { //if read from rtsp ,the first packets is wrong.
if err := outputCtx.WritePacketNoBuffer(packet); err != nil {
log.Printf("Error while writing packet: %s", err)
}
}
first = true
packet.Free()
}
select {}
// TODO Register to all substreams and make RTP packets. Don't transcode in this package.
/* // Open a UDP Listener for RTP Packets on port 5004
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
if err != nil { if err != nil {
log.Printf("Faited to open UDP listener %s", err) log.Printf("Faited to open UDP listener %s", err)
return return
} }
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort + 1}) audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5005})
if err != nil { if err != nil {
log.Printf("Faited to open UDP listener %s", err) log.Printf("Faited to open UDP listener %s", err)
return return
} }
// Start ffmpag to convert videoInput to video and audio UDP // Start ffmpag to convert videoInput to video and audio UDP
ffmpeg, err := startFFmpeg(videoInput, firstPort) ffmpeg, err := startFFmpeg(videoInput)
if err != nil { if err != nil {
log.Printf("Error while starting ffmpeg: %s", err) log.Printf("Error while starting ffmpeg: %s", err)
return return
@@ -116,18 +242,18 @@ func ingest(name string, q *messaging.Quality) {
} }
if err = audioListener.Close(); err != nil { if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err) log.Printf("Faited to close UDP listener: %s", err)
} }*/
q.Unregister(videoInput) delete(activeStream, name)
} }
func startFFmpeg(in <-chan []byte, listeningPort int) (ffmpeg *exec.Cmd, err error) { /* func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) {
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
// Audio "-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality
"-vn", "-c:a", "libopus", "-b:a", "96k", "-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1",
"-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort), "-auto-alt-ref", "1",
// Source "-f", "rtp", "rtp://127.0.0.1:5004",
"-an", "-c:v", "copy", "-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1",
"-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort+1)} "-f", "rtp", "rtp://127.0.0.1:5005"}
ffmpeg = exec.Command("ffmpeg", ffmpegArgs...) ffmpeg = exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output // Handle errors output
@@ -161,4 +287,4 @@ func startFFmpeg(in <-chan []byte, listeningPort int) (ffmpeg *exec.Cmd, err err
// Start process // Start process
err = ffmpeg.Start() err = ffmpeg.Start()
return ffmpeg, err return ffmpeg, err
} } */

View File

@@ -8,7 +8,7 @@ import (
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/internal/monitoring" "gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
) )
// Options holds web package configuration // Options holds web package configuration
@@ -45,10 +45,13 @@ func GetNumberConnectedSessions(streamID string) int {
// newPeerHandler is called when server receive a new session description // newPeerHandler is called when server receive a new session description
// this initiates a WebRTC connection and return server description // this initiates a WebRTC connection and return server description
func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, remoteSdp webrtc.SessionDescription, cfg *Options) { func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, cfg *Options) {
// Create media engine using client SDP // Create media engine using client SDP
mediaEngine := webrtc.MediaEngine{} mediaEngine := webrtc.MediaEngine{}
if err := mediaEngine.PopulateFromSDP(remoteSdp); err != nil { if err := mediaEngine.PopulateFromSDP(remoteSdp.RemoteDescription); err != nil {
log.Println("Failed to create new media engine", err) log.Println("Failed to create new media engine", err)
localSdpChan <- webrtc.SessionDescription{} localSdpChan <- webrtc.SessionDescription{}
return return
@@ -75,7 +78,7 @@ func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, re
} }
// Create video track // Create video track
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "H264") codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8")
videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec) videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec)
if err != nil { if err != nil {
log.Println("Failed to create new video track", err) log.Println("Failed to create new video track", err)
@@ -103,13 +106,13 @@ func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, re
} }
// Set the remote SessionDescription // Set the remote SessionDescription
if err = peerConnection.SetRemoteDescription(remoteSdp); err != nil { if err = peerConnection.SetRemoteDescription(remoteSdp.RemoteDescription); err != nil {
log.Println("Failed to set remote description", err) log.Println("Failed to set remote description", err)
localSdpChan <- webrtc.SessionDescription{} localSdpChan <- webrtc.SessionDescription{}
return return
} }
streamID := name streamID := remoteSdp.StreamID
split := strings.SplitN(streamID, "@", 2) split := strings.SplitN(streamID, "@", 2)
streamID = split[0] streamID = split[0]
quality := "source" quality := "source"
@@ -179,7 +182,10 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa
} }
// Serve WebRTC media streaming server // Serve WebRTC media streaming server
func Serve(streams *messaging.Streams, cfg *Options) { func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, localSdpChan chan webrtc.SessionDescription, cfg *Options) {
if !cfg.Enabled { if !cfg.Enabled {
// WebRTC is not enabled, ignore // WebRTC is not enabled, ignore
return return
@@ -187,42 +193,17 @@ func Serve(streams *messaging.Streams, cfg *Options) {
log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP) log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP)
// WebRTC ingested tracks // Allocate memory
videoTracks = make(map[string][]*webrtc.Track) videoTracks = make(map[string][]*webrtc.Track)
audioTracks = make(map[string][]*webrtc.Track) audioTracks = make(map[string][]*webrtc.Track)
// Subscribe to new stream event // Ingest data
event := make(chan string, 8) go autoIngest(streams)
streams.Subscribe(event)
// For each new stream
for name := range event {
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
}
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName)
go ingest(name, quality)
go listenSdp(name, quality.WebRtcLocalSdp, quality.WebRtcRemoteSdp, cfg)
}
}
func listenSdp(name string, localSdp, remoteSdp chan webrtc.SessionDescription, cfg *Options) {
// Handle new connections // Handle new connections
for { for {
// Wait for incoming session description // Wait for incoming session description
// then send the local description to browser // then send the local description to browser
newPeerHandler(name, localSdp, <-remoteSdp, cfg) newPeerHandler(localSdpChan, <-remoteSdpChan, cfg)
} }
} }

View File

@@ -5,19 +5,24 @@ import (
"testing" "testing"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
) )
func TestServe(t *testing.T) { func TestServe(t *testing.T) {
// Init streams messaging and WebRTC server // Init streams messaging and WebRTC server
streams := messaging.New() streams := make(map[string]*stream.Stream)
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
cfg := Options{ cfg := Options{
Enabled: true, Enabled: true,
MinPortUDP: 10000, MinPortUDP: 10000,
MaxPortUDP: 10005, MaxPortUDP: 10005,
STUNServers: []string{"stun:stun.l.google.com:19302"}, STUNServers: []string{"stun:stun.l.google.com:19302"},
} }
go Serve(streams, &cfg) go Serve(streams, remoteSdpChan, localSdpChan, &cfg)
// New client connection // New client connection
mediaEngine := webrtc.MediaEngine{} mediaEngine := webrtc.MediaEngine{}
@@ -26,7 +31,7 @@ func TestServe(t *testing.T) {
peerConnection, _ := api.NewPeerConnection(webrtc.Configuration{}) peerConnection, _ := api.NewPeerConnection(webrtc.Configuration{})
// Create video track // Create video track
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "H264") codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8")
videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec) videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec)
if err != nil { if err != nil {
t.Error("Failed to create new video track", err) t.Error("Failed to create new video track", err)
@@ -53,6 +58,12 @@ func TestServe(t *testing.T) {
peerConnection.SetLocalDescription(offer) peerConnection.SetLocalDescription(offer)
<-gatherComplete <-gatherComplete
// FIXME: Send offer to server // Send offer to server
remoteSdpChan <- struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}{"demo", *peerConnection.LocalDescription()}
_ = <-localSdpChan
// FIXME: verify connection did work // FIXME: verify connection did work
} }

153
transcoder/audio/audio.go Normal file
View File

@@ -0,0 +1,153 @@
// Package audio transcode a stream to filter the audio
package audio
import (
"bufio"
"fmt"
"github.com/3d0c/gmf"
"io"
"log"
"os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds audio package configuration
type Options struct {
Enabled bool
Bitrate int
}
// Init text transcoder
func Init(streams map[string]*stream.Stream, cfg *Options) {
if !cfg.Enabled {
// Audio transcode is not enabled, ignore
return
}
// Regulary check existing streams
for {
for sourceName, sourceStream := range streams {
if strings.Contains(sourceName, "@") {
// Not a source stream, pass
continue
}
// Check that the transcoded stream does not already exist
name := sourceName + "@audio"
_, ok := streams[name]
if ok {
// Stream is already transcoded
continue
}
// Start conversion
log.Printf("Starting audio transcode '%s'", name)
st := stream.New()
streams[name] = st
go transcode(sourceStream, st, cfg)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
// Extract audio from stream
func transcode(input, output *stream.Stream, cfg *Options) {
// Start ffmpeg to transcode video to audio
videoInput := make(chan []byte, 1024)
input.Register(videoInput)
ffmpeg, audio, err := startFFmpeg(videoInput, cfg)
if err != nil {
log.Printf("Error while starting ffmpeg: %s", err)
return
}
dataBuff := make([]byte, gmf.IO_BUFFER_SIZE) // UDP MTU
for {
n, err := (*audio).Read(dataBuff)
if err != nil {
log.Printf("An error occurred while reading input: %s", err)
break
}
if n == 0 {
// Stream is finished
break
}
output.Broadcast <- dataBuff[:n]
}
// Stop transcode
_ = ffmpeg.Process.Kill()
_ = (*audio).Close()
}
// Start a ffmpeg instance to convert stream into audio
func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) {
// TODO in a future release: remove FFMPEG dependency and transcode directly using the libopus API
// FIXME It seems impossible to get a RTP Packet from standard output.
// We need to find a clean solution, without waiting on UDP listeners.
// FIXME We should also not build RTP packets here.
/* port := 0
var udpListener *net.UDPConn
var err error
for {
port = rand.Intn(65535)
udpListener, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: port})
if err != nil {
if strings.Contains(fmt.Sprintf("%s", err), "address already in use") {
continue
}
return nil, nil, err
}
break
}*/
bitrate := fmt.Sprintf("%dk", cfg.Bitrate)
// Use copy audio codec, assume for now that libopus is used by the streamer
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
"-vn", "-c:a", "copy", "-b:a", bitrate, "-f", "mpegts", "pipe:1"}
ffmpeg := exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output
errOutput, err := ffmpeg.StderrPipe()
if err != nil {
return nil, nil, err
}
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[AUDIO FFMPEG %s] %s", "demo", scanner.Text())
}
}()
// Handle text output
output, err := ffmpeg.StdoutPipe()
if err != nil {
return nil, nil, err
}
// Handle stream input
input, err := ffmpeg.StdinPipe()
if err != nil {
return nil, nil, err
}
go func() {
for data := range in {
_, _ = input.Write(data)
}
}()
// Start process
err = ffmpeg.Start()
return ffmpeg, &output, err
}

View File

@@ -0,0 +1 @@
package audio

View File

@@ -8,8 +8,10 @@ import (
"io" "io"
"log" "log"
"os/exec" "os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
) )
// Options holds text package configuration // Options holds text package configuration
@@ -21,46 +23,45 @@ type Options struct {
} }
// Init text transcoder // Init text transcoder
func Init(streams *messaging.Streams, cfg *Options) { func Init(streams map[string]*stream.Stream, cfg *Options) {
if !cfg.Enabled { if !cfg.Enabled {
// Text transcode is not enabled, ignore // Text transcode is not enabled, ignore
return return
} }
// Subscribe to new stream event // Regulary check existing streams
event := make(chan string, 8) for {
streams.Subscribe(event) for sourceName, sourceStream := range streams {
if strings.Contains(sourceName, "@") {
// For each new stream // Not a source stream, pass
for name := range event { continue
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
} }
// Get specific quality // Check that the transcoded stream does not already exist
// FIXME: make it possible to forward other qualities name := sourceName + "@text"
qualityName := "source" _, ok := streams[name]
quality, err := stream.GetQuality(qualityName) if ok {
if err != nil { // Stream is already transcoded
log.Printf("Failed to get quality '%s'", qualityName) continue
} }
// Create new text quality // Start conversion
outputQuality, err := stream.CreateQuality("text") log.Printf("Starting text transcode '%s'", name)
if err != nil { st := stream.New()
log.Printf("Failed to create quality 'text': %s", err) streams[name] = st
go transcode(sourceStream, st, cfg)
} }
// Start forwarding // Regulary pull stream list,
log.Printf("Starting text transcoder for '%s' quality '%s'", name, qualityName) // it may be better to tweak the messaging system
go transcode(quality, outputQuality, cfg) // to get an event on a new stream.
time.Sleep(time.Second)
} }
} }
// Convert video to ANSI text // Convert video to ANSI text
func transcode(input, output *messaging.Quality, cfg *Options) { func transcode(input, output *stream.Stream, cfg *Options) {
// Start ffmpeg to transcode video to rawvideo // Start ffmpeg to transcode video to rawvideo
videoInput := make(chan []byte, 1024) videoInput := make(chan []byte, 1024)
input.Register(videoInput) input.Register(videoInput)

View File

@@ -2,16 +2,19 @@
package transcoder package transcoder
import ( import (
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/transcoder/audio"
"gitlab.crans.org/nounous/ghostream/transcoder/text" "gitlab.crans.org/nounous/ghostream/transcoder/text"
) )
// Options holds text package configuration // Options holds text package configuration
type Options struct { type Options struct {
Text text.Options Text text.Options
Audio audio.Options
} }
// Init all transcoders // Init all transcoders
func Init(streams *messaging.Streams, cfg *Options) { func Init(streams map[string]*stream.Stream, cfg *Options) {
go text.Init(streams, &cfg.Text) go text.Init(streams, &cfg.Text)
go audio.Init(streams, &cfg.Audio)
} }

View File

@@ -21,20 +21,61 @@ var (
validPath = regexp.MustCompile("^/[a-z0-9@_-]*$") validPath = regexp.MustCompile("^/[a-z0-9@_-]*$")
) )
// Handle site index and viewer pages // Handle WebRTC session description exchange via POST
func viewerHandler(w http.ResponseWriter, r *http.Request) { func viewerPostHandler(w http.ResponseWriter, r *http.Request) {
// Validation on path // Limit response body to 128KB
if validPath.FindStringSubmatch(r.URL.Path) == nil { r.Body = http.MaxBytesReader(w, r.Body, 131072)
http.NotFound(w, r)
log.Printf("Replied not found on %s", r.URL.Path) // Get stream ID from URL, or from domain name
path := r.URL.Path[1:]
host := r.Host
if strings.Contains(host, ":") {
realHost, _, err := net.SplitHostPort(r.Host)
if err != nil {
log.Printf("Failed to split host and port from %s", r.Host)
return
}
host = realHost
}
host = strings.Replace(host, ".", "-", -1)
if streamID, ok := cfg.MapDomainToStream[host]; ok {
path = streamID
}
// Decode client description
dec := json.NewDecoder(r.Body)
dec.DisallowUnknownFields()
remoteDescription := webrtc.SessionDescription{}
if err := dec.Decode(&remoteDescription); err != nil {
http.Error(w, "The JSON WebRTC offer is malformed", http.StatusBadRequest)
return return
} }
// Check method // Exchange session descriptions with WebRTC stream server
if r.Method != http.MethodGet { remoteSdpChan <- struct {
http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed) StreamID string
RemoteDescription webrtc.SessionDescription
}{StreamID: path, RemoteDescription: remoteDescription}
localDescription := <-localSdpChan
// Send server description as JSON
jsonDesc, err := json.Marshal(localDescription)
if err != nil {
http.Error(w, "An error occurred while formating response", http.StatusInternalServerError)
log.Println("An error occurred while sending session description", err)
return
}
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(jsonDesc)
if err != nil {
log.Println("An error occurred while sending session description", err)
} }
// Increment monitoring
monitoring.WebSessions.Inc()
}
func viewerGetHandler(w http.ResponseWriter, r *http.Request) {
// Get stream ID from URL, or from domain name // Get stream ID from URL, or from domain name
path := r.URL.Path[1:] path := r.URL.Path[1:]
host := r.Host host := r.Host
@@ -81,6 +122,27 @@ func viewerHandler(w http.ResponseWriter, r *http.Request) {
monitoring.WebViewerServed.Inc() monitoring.WebViewerServed.Inc()
} }
// Handle site index and viewer pages
// POST requests are used to exchange WebRTC session descriptions
func viewerHandler(w http.ResponseWriter, r *http.Request) {
// Validation on path
if validPath.FindStringSubmatch(r.URL.Path) == nil {
http.NotFound(w, r)
log.Printf("Replied not found on %s", r.URL.Path)
return
}
// Route depending on HTTP method
switch r.Method {
case http.MethodGet:
viewerGetHandler(w, r)
case http.MethodPost:
viewerPostHandler(w, r)
default:
http.Error(w, "Sorry, only GET and POST methods are supported.", http.StatusBadRequest)
}
}
func staticHandler() http.Handler { func staticHandler() http.Handler {
// Set up static files server // Set up static files server
staticFs := http.FileServer(pkger.Dir("/web/static")) staticFs := http.FileServer(pkger.Dir("/web/static"))
@@ -91,16 +153,19 @@ func statisticsHandler(w http.ResponseWriter, r *http.Request) {
name := strings.SplitN(strings.Replace(r.URL.Path[7:], "/", "", -1), "@", 2)[0] name := strings.SplitN(strings.Replace(r.URL.Path[7:], "/", "", -1), "@", 2)[0]
userCount := 0 userCount := 0
// Get all substreams
for _, outputType := range []string{"", "@720p", "@480p", "@360p", "@240p", "@text"} {
// Get requested stream // Get requested stream
stream, err := streams.Get(name) stream, ok := streams[name+outputType]
if err == nil { if ok {
userCount = stream.ClientCount() // Get number of output channels
userCount += webrtc.GetNumberConnectedSessions(name) userCount += stream.ClientCount()
}
} }
// Display connected users statistics // Display connected users statistics
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
err = enc.Encode(struct{ ConnectedViewers int }{userCount}) err := enc.Encode(struct{ ConnectedViewers int }{userCount})
if err != nil { if err != nil {
http.Error(w, "Failed to generate JSON.", http.StatusInternalServerError) http.Error(w, "Failed to generate JSON.", http.StatusInternalServerError)
log.Printf("Failed to generate JSON: %s", err) log.Printf("Failed to generate JSON: %s", err)

View File

@@ -4,8 +4,6 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"gitlab.crans.org/nounous/ghostream/messaging"
) )
func TestViewerPageGET(t *testing.T) { func TestViewerPageGET(t *testing.T) {
@@ -14,9 +12,6 @@ func TestViewerPageGET(t *testing.T) {
t.Errorf("Failed to load templates: %v", err) t.Errorf("Failed to load templates: %v", err)
} }
// Init streams messaging
streams = messaging.New()
cfg = &Options{} cfg = &Options{}
// Test GET request // Test GET request

View File

@@ -2,7 +2,7 @@
video { video {
display: block; display: block;
flex-grow: 1; flex-grow: 1;
max-width: 100%; width: 100%;
/* Black borders around video */ /* Black borders around video */
background-color: #000; background-color: #000;

View File

@@ -1,29 +0,0 @@
/**
* ViewerCounter show the number of active viewers
*/
export class ViewerCounter {
/**
* @param {HTMLElement} element
* @param {String} streamName
*/
constructor(element, streamName) {
this.element = element;
this.url = "/_stats/" + streamName;
}
/**
* Regulary update counter
*
* @param {Number} updatePeriod
*/
regularUpdate(updatePeriod) {
setInterval(() => this.refreshViewersCounter(), updatePeriod);
}
refreshViewersCounter() {
fetch(this.url)
.then(response => response.json())
.then((data) => this.element.innerText = data.ConnectedViewers)
.catch(console.log);
}
}

View File

@@ -1,99 +0,0 @@
/**
* GsWebRTC to connect to Ghostream
*/
export class GsWebRTC {
/**
* @param {list} stunServers STUN servers
* @param {HTMLElement} viewer Video HTML element
* @param {HTMLElement} connectionIndicator Connection indicator element
*/
constructor(stunServers, viewer, connectionIndicator) {
this.viewer = viewer;
this.connectionIndicator = connectionIndicator;
this.pc = new RTCPeerConnection({
iceServers: [{ urls: stunServers }]
});
// We want to receive audio and video
this.pc.addTransceiver("video", { "direction": "sendrecv" });
this.pc.addTransceiver("audio", { "direction": "sendrecv" });
// Configure events
this.pc.oniceconnectionstatechange = () => this._onConnectionStateChange();
this.pc.ontrack = (e) => this._onTrack(e);
}
/**
* On connection change, log it and change indicator.
* If connection closed or failed, try to reconnect.
*/
_onConnectionStateChange() {
console.log("[WebRTC] ICE connection state changed to " + this.pc.iceConnectionState);
switch (this.pc.iceConnectionState) {
case "disconnected":
this.connectionIndicator.style.fill = "#dc3545";
break;
case "checking":
this.connectionIndicator.style.fill = "#ffc107";
break;
case "connected":
this.connectionIndicator.style.fill = "#28a745";
break;
case "closed":
case "failed":
console.log("[WebRTC] Connection closed, restarting...");
/*peerConnection.close();
peerConnection = null;
setTimeout(startPeerConnection, 1000);*/
break;
}
}
/**
* On new track, add it to the player
* @param {Event} event
*/
_onTrack(event) {
console.log(`[WebRTC] New ${event.track.kind} track`);
if (event.track.kind === "video") {
this.viewer.srcObject = event.streams[0];
}
}
/**
* Create an offer and set local description.
* After that the browser will fire onicecandidate events.
*/
createOffer() {
this.pc.createOffer().then(offer => {
this.pc.setLocalDescription(offer);
console.log("[WebRTC] WebRTC offer created");
}).catch(console.log);
}
/**
* Register a function to call to send local descriptions
* @param {Function} sendFunction Called with a local description to send.
*/
onICECandidate(sendFunction) {
// When candidate is null, ICE layer has run out of potential configurations to suggest
// so let's send the offer to the server.
// FIXME: Send offers progressively to do Trickle ICE
this.pc.onicecandidate = event => {
if (event.candidate === null) {
// Send offer to server
console.log("[WebRTC] Sending session description to server");
sendFunction(this.pc.localDescription);
}
};
}
/**
* Set WebRTC remote description
* After that, the connection will be established and ontrack will be fired.
* @param {RTCSessionDescription} sdp Session description data
*/
setRemoteDescription(sdp) {
this.pc.setRemoteDescription(sdp);
}
}

View File

@@ -1,62 +0,0 @@
/**
* GsWebSocket to do Ghostream signalling
*/
export class GsWebSocket {
constructor() {
const protocol = (window.location.protocol === "https:") ? "wss://" : "ws://";
this.url = protocol + window.location.host + "/_ws/";
// Open WebSocket
this._open();
// Configure events
this.socket.addEventListener("open", () => {
console.log("[WebSocket] Connection established");
});
this.socket.addEventListener("close", () => {
console.log("[WebSocket] Connection closed, retrying connection in 1s...");
setTimeout(() => this._open(), 1000);
});
this.socket.addEventListener("error", () => {
console.log("[WebSocket] Connection errored, retrying connection in 1s...");
setTimeout(() => this._open(), 1000);
});
}
_open() {
console.log(`[WebSocket] Connecting to ${this.url}...`);
this.socket = new WebSocket(this.url);
}
/**
* Send local WebRTC session description to remote.
* @param {SessionDescription} localDescription WebRTC local SDP
* @param {string} stream Name of the stream
* @param {string} quality Requested quality
*/
sendLocalDescription(localDescription, stream, quality) {
if (this.socket.readyState !== 1) {
console.log("[WebSocket] Waiting for connection to send data...");
setTimeout(() => this.sendLocalDescription(localDescription, stream, quality), 100);
return;
}
console.log(`[WebSocket] Sending WebRTC local session description for stream ${stream} quality ${quality}`);
this.socket.send(JSON.stringify({
"webRtcSdp": localDescription,
"stream": stream,
"quality": quality
}));
}
/**
* Set callback function on new remote session description.
* @param {Function} callback Function called when data is received
*/
onRemoteDescription(callback) {
this.socket.addEventListener("message", (event) => {
console.log("[WebSocket] Received WebRTC remote session description");
const sdp = new RTCSessionDescription(JSON.parse(event.data));
callback(sdp);
});
}
}

View File

@@ -0,0 +1,12 @@
// Side widget toggler
const sideWidgetToggle = document.getElementById("sideWidgetToggle")
sideWidgetToggle.addEventListener("click", function () {
const sideWidget = document.getElementById("sideWidget")
if (sideWidget.style.display === "none") {
sideWidget.style.display = "block"
sideWidgetToggle.textContent = "»"
} else {
sideWidget.style.display = "none"
sideWidgetToggle.textContent = "«"
}
})

View File

@@ -0,0 +1,9 @@
document.getElementById("quality").addEventListener("change", (event) => {
console.log(`Stream quality changed to ${event.target.value}`)
// Restart the connection with a new quality
peerConnection.close()
peerConnection = null
streamPath = window.location.href + event.target.value
startPeerConnection()
})

View File

@@ -1,87 +1,97 @@
import { GsWebSocket } from "./modules/websocket.js"; let peerConnection
import { ViewerCounter } from "./modules/viewerCounter.js"; let streamPath = window.location.href
import { GsWebRTC } from "./modules/webrtc.js";
/** startPeerConnection = () => {
* Initialize viewer page // Init peer connection
* peerConnection = new RTCPeerConnection({
* @param {String} stream iceServers: [{ urls: stunServers }]
* @param {List} stunServers })
* @param {Number} viewersCounterRefreshPeriod
*/
export function initViewerPage(stream, stunServers, viewersCounterRefreshPeriod) {
// Viewer element
const viewer = document.getElementById("viewer");
// Default quality // On connection change, change indicator color
let quality = "source"; // if connection failed, restart peer connection
peerConnection.oniceconnectionstatechange = e => {
console.log("ICE connection state changed, " + peerConnection.iceConnectionState)
switch (peerConnection.iceConnectionState) {
case "disconnected":
document.getElementById("connectionIndicator").style.fill = "#dc3545"
break
case "checking":
document.getElementById("connectionIndicator").style.fill = "#ffc107"
break
case "connected":
document.getElementById("connectionIndicator").style.fill = "#28a745"
break
case "closed":
case "failed":
console.log("Connection failed, restarting...")
peerConnection.close()
peerConnection = null
setTimeout(startPeerConnection, 1000)
break
}
}
// Create WebSocket and WebRTC // We want to receive audio and video
const websocket = new GsWebSocket(); peerConnection.addTransceiver('video', { 'direction': 'sendrecv' })
const webrtc = new GsWebRTC( peerConnection.addTransceiver('audio', { 'direction': 'sendrecv' })
stunServers,
viewer, // Create offer and set local description
document.getElementById("connectionIndicator"), peerConnection.createOffer().then(offer => {
); // After setLocalDescription, the browser will fire onicecandidate events
webrtc.createOffer(); peerConnection.setLocalDescription(offer)
webrtc.onICECandidate(localDescription => { }).catch(console.log)
websocket.sendLocalDescription(localDescription, stream, quality);
}); // When candidate is null, ICE layer has run out of potential configurations to suggest
websocket.onRemoteDescription(sdp => { // so let's send the offer to the server
webrtc.setRemoteDescription(sdp); peerConnection.onicecandidate = event => {
}); if (event.candidate === null) {
// Send offer to server
// The server know the stream name from the url
// The server replies with its description
// After setRemoteDescription, the browser will fire ontrack events
console.log("Sending session description to server")
fetch(streamPath, {
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify(peerConnection.localDescription)
})
.then(response => response.json())
.then((data) => peerConnection.setRemoteDescription(new RTCSessionDescription(data)))
.catch(console.log)
}
}
// When video track is received, configure player
peerConnection.ontrack = function (event) {
console.log(`New ${event.track.kind} track`)
if (event.track.kind === "video") {
const viewer = document.getElementById('viewer')
viewer.srcObject = event.streams[0]
}
}
}
// Register keyboard events // Register keyboard events
let viewer = document.getElementById("viewer")
window.addEventListener("keydown", (event) => { window.addEventListener("keydown", (event) => {
switch (event.key) { switch (event.key) {
case "f": case 'f':
// F key put player in fullscreen // F key put player in fullscreen
if (document.fullscreenElement !== null) { if (document.fullscreenElement !== null) {
document.exitFullscreen(); document.exitFullscreen()
} else { } else {
viewer.requestFullscreen(); viewer.requestFullscreen()
} }
break; break
case "m": case 'm':
case " ": case ' ':
// M and space key mute player // M and space key mute player
viewer.muted = !viewer.muted; viewer.muted = !viewer.muted
event.preventDefault(); event.preventDefault()
viewer.play(); viewer.play()
break; break
}
});
// Create viewer counter
const viewerCounter = new ViewerCounter(
document.getElementById("connected-people"),
stream,
);
viewerCounter.regularUpdate(viewersCounterRefreshPeriod);
viewerCounter.refreshViewersCounter();
// Side widget toggler
const sideWidgetToggle = document.getElementById("sideWidgetToggle");
const sideWidget = document.getElementById("sideWidget");
if (sideWidgetToggle !== null && sideWidget !== null) {
// On click, toggle side widget visibility
sideWidgetToggle.addEventListener("click", function () {
if (sideWidget.style.display === "none") {
sideWidget.style.display = "block";
sideWidgetToggle.textContent = "»";
} else {
sideWidget.style.display = "none";
sideWidgetToggle.textContent = "«";
}
});
}
// Video quality toggler
document.getElementById("quality").addEventListener("change", (event) => {
quality = event.target.value;
console.log(`Stream quality changed to ${quality}`);
// Restart WebRTC negociation
webrtc.createOffer();
});
} }
})

View File

@@ -0,0 +1,12 @@
// Refresh viewer count by pulling metric from server
function refreshViewersCounter(streamID, period) {
// Distinguish oneDomainPerStream mode
fetch("/_stats/" + streamID)
.then(response => response.json())
.then((data) => document.getElementById("connected-people").innerText = data.ConnectedViewers)
.catch(console.log)
setTimeout(() => {
refreshViewersCounter(streamID, period)
}, period)
}

View File

@@ -4,7 +4,6 @@
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>{{if .Path}}{{.Path}} - {{end}}{{.Cfg.Name}}</title> <title>{{if .Path}}{{.Path}} - {{end}}{{.Cfg.Name}}</title>
<link rel="stylesheet" href="static/css/style.css"> <link rel="stylesheet" href="static/css/style.css">
<link rel="stylesheet" href="static/css/player.css"> <link rel="stylesheet" href="static/css/player.css">

View File

@@ -21,7 +21,11 @@
<ul> <ul>
<li> <li>
<b>Serveur :</b> <b>Serveur :</b>
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?IDENTIFIANT:MOT_DE_PASS</code>, <code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}</code>,
</li>
<li>
<b>Clé de stream :</b>
<code>IDENTIFIANT:MOT_DE_PASSE</code>
avec <code>IDENTIFIANT</code> et <code>MOT_DE_PASSE</code> avec <code>IDENTIFIANT</code> et <code>MOT_DE_PASSE</code>
vos identifiants. vos identifiants.
</li> </li>
@@ -48,81 +52,6 @@
</code> </code>
</p> </p>
<h2>Comment lire un flux depuis un lecteur externe ?</h2>
<p>
À l'heure actuelle, la plupart des lecteurs vidéos ne supportent
pas le protocole SRT, ou le supportent mal. Un travail est en
cours pour les rendre un maximum compatibles. Liste non exhaustive
des lecteurs vidéos testés :
</p>
<h3>FFPlay</h3>
<p>
Si FFMpeg est installé sur votre machine, il est accompagné d'un
lecteur vidéo nommé <code>ffplay</code>. Si FFMpeg est compilé
avec le support de SRT (c'est le cas sur la plupart des distributions,
sauf cas ci-dessous), il vous suffira d'exécuter :
</p>
<p>
<code>
ffplay -fflags nobuffer srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<h3>MPV</h3>
<p>
MPV supporte officiellement SRT depuis le 16 octobre 2020.
Néanmoins, la version stable de MPV est beaucoup plus vieille.
Vous devez alors utiliser une version de développement pour
pouvoir lire un flux SRT depuis MPV. L'installation se fait
depuis <a href="https://mpv.io/installation/"> cette page</a>.
Sous Arch Linux, il vous suffit de récupérer le paquet
<code>mpv-git</code> dans l'AUR. Pour lire le flux, il suffit
d'exécuter :
</p>
<p>
<code>
mpv srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<h3>VLC Media Player</h3>
<p>
Bien que VLC supporte officiellement le protocole SRT,
toutes les options ne sont pas encore implémentées,
notamment l'option pour choisir son stream.
<a href="https://patches.videolan.org/patch/30299/">Un patch</a>
a été soumis et est en attente d'acceptation.
Une fois le patch accepté, il sera appliqué dans les versions
de développement de VLC. Sous Arch Linux, il suffit de récupérer
le paquet <code>vlc-git</code> de l'AUR. Avec un VLC à jour,
il suffit d'exécuter :
</p>
<p>
<code>
vlc srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<p>
Ou bien d'aller dans Média -> Ouvrir un flux réseau et d'entrer l'URL
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT</code>.
</p>
<h3>Le protocole n'existe pas ou n'est pas supporté.</h3>
<p>
La technologie SRT est très récente et n'est pas supportée par
les dépôts stables. Ainsi, si vous avez Ubuntu &le; 20.04 ou
Debian &le; Buster, vous ne pourrez pas utiliser de lecteur vidéo
ni même diffuser avec votre machine. Vous devrez vous mettre à
jour vers Ubuntu 20.10 ou Debian Bullseye.
</p>
<h2>Mentions légales</h2> <h2>Mentions légales</h2>
<p> <p>
Le service de diffusion vidéo du Crans est un service d'hébergement Le service de diffusion vidéo du Crans est un service d'hébergement

View File

@@ -8,10 +8,10 @@
<div class="controls"> <div class="controls">
<span class="control-quality"> <span class="control-quality">
<select id="quality"> <select id="quality">
<option value="source">Source</option> <option value="">Source</option>
<option value="720p">720p</option> <option value="@720p">720p</option>
<option value="480p">480p</option> <option value="@480p">480p</option>
<option value="240p">240p</option> <option value="@240p">240p</option>
</select> </select>
</span> </span>
<code class="control-srt-link">srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid={{.Path}}</code> <code class="control-srt-link">srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid={{.Path}}</code>
@@ -34,17 +34,21 @@
{{end}} {{end}}
</div> </div>
<script type="module"> {{if .WidgetURL}}<script src="/static/js/sideWidget.js"></script>{{end}}
import { initViewerPage } from "/static/js/viewer.js"; <script src="/static/js/videoQuality.js"></script>
<script src="/static/js/viewer.js"></script>
// Some variables that need to be fixed by web page <script src="/static/js/viewersCounter.js"></script>
const viewersCounterRefreshPeriod = Number("{{.Cfg.ViewersCounterRefreshPeriod}}"); <script>
const stream = "{{.Path}}";
const stunServers = [ const stunServers = [
{{range $id, $value := .Cfg.STUNServers}} {{range $id, $value := .Cfg.STUNServers}}
"{{$value}}", '{{$value}}',
{{end}} {{end}}
] ]
initViewerPage(stream, stunServers, viewersCounterRefreshPeriod) startPeerConnection()
// Wait a bit before pulling viewers counter for the first time
setTimeout(() => {
refreshViewersCounter("{{.Path}}", {{.Cfg.ViewersCounterRefreshPeriod}})
}, 1000)
</script> </script>
{{end}} {{end}}

View File

@@ -10,7 +10,8 @@ import (
"strings" "strings"
"github.com/markbates/pkger" "github.com/markbates/pkger"
"gitlab.crans.org/nounous/ghostream/messaging" "github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream"
) )
// Options holds web package configuration // Options holds web package configuration
@@ -32,11 +33,18 @@ type Options struct {
var ( var (
cfg *Options cfg *Options
// WebRTC session description channels
remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}
localSdpChan chan webrtc.SessionDescription
// Preload templates // Preload templates
templates *template.Template templates *template.Template
// Streams to get statistics // Streams to get statistics
streams *messaging.Streams streams map[string]*stream.Stream
) )
// Load templates with pkger // Load templates with pkger
@@ -70,8 +78,13 @@ func loadTemplates() error {
} }
// Serve HTTP server // Serve HTTP server
func Serve(s *messaging.Streams, c *Options) { func Serve(s map[string]*stream.Stream, rSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, lSdpChan chan webrtc.SessionDescription, c *Options) {
streams = s streams = s
remoteSdpChan = rSdpChan
localSdpChan = lSdpChan
cfg = c cfg = c
if !cfg.Enabled { if !cfg.Enabled {
@@ -88,7 +101,6 @@ func Serve(s *messaging.Streams, c *Options) {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/", viewerHandler) mux.HandleFunc("/", viewerHandler)
mux.Handle("/static/", staticHandler()) mux.Handle("/static/", staticHandler())
mux.HandleFunc("/_ws/", websocketHandler)
mux.HandleFunc("/_stats/", statisticsHandler) mux.HandleFunc("/_stats/", statisticsHandler)
log.Printf("HTTP server listening on %s", cfg.ListenAddress) log.Printf("HTTP server listening on %s", cfg.ListenAddress)
log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux)) log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux))

View File

@@ -5,16 +5,16 @@ import (
"testing" "testing"
"time" "time"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/stream"
) )
// TestHTTPServe tries to serve a real HTTP server and load some pages // TestHTTPServe tries to serve a real HTTP server and load some pages
func TestHTTPServe(t *testing.T) { func TestHTTPServe(t *testing.T) {
// Init streams messaging // Init streams messaging
streams := messaging.New() streams := make(map[string]*stream.Stream)
// Create a disabled web server // Create a disabled web server
go Serve(streams, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"}) go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
@@ -26,7 +26,7 @@ func TestHTTPServe(t *testing.T) {
} }
// Now let's really start the web server // Now let's really start the web server
go Serve(streams, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"}) go Serve(streams, nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)

View File

@@ -1,68 +0,0 @@
// Package web serves the JavaScript player and WebRTC negotiation
package web
import (
"log"
"net/http"
"github.com/gorilla/websocket"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// clientDescription is sent by new client
type clientDescription struct {
WebRtcSdp webrtc.SessionDescription
Stream string
Quality string
}
// websocketHandler exchanges WebRTC SDP and viewer count
func websocketHandler(w http.ResponseWriter, r *http.Request) {
// Upgrade client connection to WebSocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Failed to upgrade client to websocket: %s", err)
return
}
for {
// Get client description
c := &clientDescription{}
err = conn.ReadJSON(c)
if err != nil {
log.Printf("Failed to receive client description: %s", err)
_ = conn.Close()
return
}
// Get requested stream
stream, err := streams.Get(c.Stream)
if err != nil {
log.Printf("Stream not found: %s", c.Stream)
continue
}
// Get requested quality
q, err := stream.GetQuality(c.Quality)
if err != nil {
log.Printf("Quality not found: %s", c.Quality)
continue
}
// Exchange session descriptions with WebRTC stream server
// FIXME: Add trickle ICE support
q.WebRtcRemoteSdp <- c.WebRtcSdp
localDescription := <-q.WebRtcLocalSdp
// Send new local description
if err := conn.WriteJSON(localDescription); err != nil {
log.Println(err)
continue
}
}
}