1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2025-10-25 06:03:05 +02:00

40 Commits

Author SHA1 Message Date
Yohann D'ANELLO
fc3c0f606c Reduce bcrypt rounds for demo password to 10 to displaying the bcrypt process in the CPU profiling 2020-11-07 15:03:42 +01:00
Yohann D'ANELLO
e1bdae5380 Add profiling during debugging 2020-11-07 15:02:09 +01:00
Yohann D'ANELLO
26e7182085 Close websocket handler on error 2020-11-07 15:00:33 +01:00
Yohann D'ANELLO
59c47ca3e0 5 UDP ports are not enough 2020-11-07 15:00:06 +01:00
Yohann D'ANELLO
8eea7d6e4f Upgrade pion 2020-11-07 14:58:15 +01:00
Yohann D'ANELLO
f394af9257 Update main page 2020-10-29 13:43:36 +01:00
Yohann D'ANELLO
5b40aa886f Use random UDP ports to able to have multiple concurrent streams 2020-10-29 13:11:30 +01:00
Yohann D'ANELLO
9fc3d37e72 Update srtgo to don't depend anymore on C library 2020-10-29 13:04:13 +01:00
Yohann D'ANELLO
9e7e1ec0b8 Stream with the H264 codec to have no CPU usage 2020-10-27 19:32:23 +01:00
Alexandre Iooss
cdb56c8bf5 Fix sendLocalDescription retry in web client 2020-10-23 08:11:55 +02:00
Alexandre Iooss
ff2ebd76f1 Make viewer able to change quality 2020-10-22 18:41:14 +02:00
Alexandre Iooss
4cbb1d8192 Better javascript messages 2020-10-22 18:21:42 +02:00
Alexandre Iooss
24478bdc7a Retry message sending when websocket not ready 2020-10-22 13:42:45 +02:00
Alexandre Iooss
0f4c57bcde Cache go modules in CI 2020-10-22 10:38:47 +02:00
erdnaxe
c0820db244 Merge branch 'websocket' into 'dev'
Websocket

See merge request nounous/ghostream!7
2020-10-22 08:26:41 +02:00
Alexandre Iooss
a2a74761bb Parse JSON from server SDP 2020-10-22 08:23:35 +02:00
Alexandre Iooss
ba8bf426e0 Fix JSON decoding 2020-10-22 08:19:01 +02:00
Alexandre Iooss
90d7bd4760 Add package comment in websocket_handler.go 2020-10-21 22:43:28 +02:00
Alexandre Iooss
2928e8ae77 Rename main.js to viewer.js 2020-10-21 22:43:11 +02:00
Alexandre Iooss
e461c0b526 Fix some undefined this in js classes 2020-10-21 22:38:36 +02:00
Alexandre Iooss
9d162b13ed WebRTC JS module 2020-10-21 22:10:39 +02:00
Yohann D'ANELLO
91c4e9d14d Forwarding should not have the re option 2020-10-21 13:34:39 +02:00
Yohann D'ANELLO
5ea8a0913b Add ability to format output URL with the start time of the stream, if the stream is recorded. 2020-10-21 11:47:06 +02:00
Alexandre Iooss
0b3fb87fa2 Working javascript modules 2020-10-20 21:59:07 +02:00
Alexandre Iooss
c88f473ec0 Remove old JS 2020-10-20 21:45:26 +02:00
Alexandre Iooss
11231ceb84 viewerCounter and websocket JS modules 2020-10-20 21:29:41 +02:00
Alexandre Iooss
01efba3e3f Handle websocket 2020-10-20 19:12:15 +02:00
Alexandre Iooss
ac2f87e936 Add HTML viewport 2020-10-20 09:24:56 +02:00
Alexandre Iooss
cd63c93dce Fix web player overflow 2020-10-19 21:52:53 +02:00
Alexandre Iooss
4727b2bf64 Fix typo in outputted 2020-10-19 21:49:37 +02:00
Alexandre Iooss
e1f83a32df Put webrtc SDP inside Quality struct 2020-10-19 21:45:23 +02:00
Alexandre Iooss
e848d92a1a Fix viewer count 2020-10-19 20:05:20 +02:00
Alexandre Iooss
d263f743f7 Update package web with Quality structure 2020-10-19 19:57:04 +02:00
Alexandre Iooss
d03d4fed40 Update package text with Quality structure 2020-10-19 19:52:24 +02:00
Alexandre Iooss
34200afaed Update package webrtc with Quality structure 2020-10-19 19:48:44 +02:00
Alexandre Iooss
340d0447a8 Update package telnet with Quality structure 2020-10-19 19:44:30 +02:00
Alexandre Iooss
069b2155be Update package srt with Quality structure 2020-10-19 19:40:36 +02:00
Alexandre Iooss
c317d91b8d Update package forwarding with Quality structure 2020-10-19 19:28:30 +02:00
Alexandre Iooss
bb589a71ce Add method to get quality 2020-10-19 19:28:04 +02:00
Alexandre Iooss
f825d3d513 New Streams and Quality structures 2020-10-19 19:14:46 +02:00
46 changed files with 1134 additions and 941 deletions

6
.gitignore vendored
View File

@@ -17,3 +17,9 @@ pkged.go
# Profiler and test files
*.prof
*.test
# Javascript tools
.eslintrc.js
node_modules
package.json
package-lock.json

View File

@@ -2,8 +2,18 @@ stages:
- test
- quality-assurance
.go-cache:
variables:
GOPATH: $CI_PROJECT_DIR/.go
before_script:
- mkdir -p .go
cache:
paths:
- .go/pkg/mod/
unit_tests:
image: golang:1.15-alpine
extends: .go-cache
stage: test
before_script:
- apk add --no-cache -X http://dl-cdn.alpinelinux.org/alpine/edge/community build-base ffmpeg gcc libsrt-dev
@@ -18,6 +28,7 @@ unit_tests:
linters:
image: golang:1.15-alpine
extends: .go-cache
stage: quality-assurance
script:
- go get -u golang.org/x/lint/golint

View File

@@ -12,6 +12,6 @@ FROM alpine:3.12
RUN apk add --no-cache -X https://dl-cdn.alpinelinux.org/alpine/edge/community/ ffmpeg libsrt
COPY --from=build_base /code/out/ghostream /app/ghostream
WORKDIR /app
# 2112 for monitoring, 8023 for Telnet, 8080 for Web, 9710 for SRT, 10000-10005 (UDP) for WebRTC
EXPOSE 2112 8023 8080 9710/udp 10000-10005/udp
# 2112 for monitoring, 8023 for Telnet, 8080 for Web, 9710 for SRT, 10000-11000 (UDP) for WebRTC
EXPOSE 2112 8023 8080 9710/udp 10000-11000/udp
CMD ["/app/ghostream"]

View File

@@ -6,7 +6,7 @@ import (
func TestBasicLogin(t *testing.T) {
basicCredentials := make(map[string]string)
basicCredentials["demo"] = "$2b$15$LRnG3eIHFlYIguTxZOLH7eHwbQC/vqjnLq6nDFiHSUDKIU.f5/1H6"
basicCredentials["demo"] = "$2b$10$xuU7XFwmRX2CMgdSaA8rM.4Y8.BtRNzhUedwN0G8tCegDRNUERTCS"
// Test good credentials
backend, _ := New(&Options{Credentials: basicCredentials})

View File

@@ -30,7 +30,7 @@ services:
restart: always
ports:
- 9710:9710/udp
- 10000-10005:10000-10005/udp
- 10000-11000:10000-11000/udp
volumes:
- ./ghostream_data:/etc/ghostream:ro
labels:

View File

@@ -22,12 +22,12 @@ auth:
# Basic backend configuration
# To generate bcrypt hashed password from Python, use:
# python3 -c 'import bcrypt; print(bcrypt.hashpw(b"PASSWORD", bcrypt.gensalt(rounds=15)).decode("ascii"))'
# python3 -c 'import bcrypt; print(bcrypt.hashpw(b"PASSWORD", bcrypt.gensalt(rounds=12)).decode("ascii"))'
#
#basic:
# credentials:
# # Demo user with password "demo"
# demo: $2b$15$LRnG3eIHFlYIguTxZOLH7eHwbQC/vqjnLq6nDFiHSUDKIU.f5/1H6
# demo: $2b$10$xuU7XFwmRX2CMgdSaA8rM.4Y8.BtRNzhUedwN0G8tCegDRNUERTCS
# LDAP backend configuration
#
@@ -38,13 +38,17 @@ auth:
## Stream forwarding ##
# Forward an incoming stream to other servers
# The URL can be anything FFMpeg can accept as an stream output
# If a file is specified, the name may contains %Y, %m, %d, %H, %M or %S
# that will be replaced by the current date information.
forwarding:
# By default nothing is forwarded.
#
# This example forwards a stream named "demo" to Twitch and YouTube,
# and save the record in a timestamped-file,
#demo:
# - rtmp://live-cdg.twitch.tv/app/STREAM_KEY
# - rtmp://a.rtmp.youtube.com/live2/STREAM_KEY
# - /home/ghostream/lives/%name/live-%Y-%m-%d-%H-%M-%S.flv
## Prometheus monitoring ##
# Expose a monitoring endpoint for Prometheus
@@ -166,7 +170,7 @@ webrtc:
# This range must be opened in your firewall.
#
#minPortUDP: 10000
#maxPortUDP: 10005
#maxPortUDP: 11000
# STUN servers, you should host your own Coturn instance
#

11
go.mod
View File

@@ -3,13 +3,14 @@ module gitlab.crans.org/nounous/ghostream
go 1.13
require (
github.com/3d0c/gmf v0.0.0-20200614092945-e58d8d5a6035
github.com/go-ldap/ldap/v3 v3.2.3
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a
github.com/gorilla/websocket v1.4.0
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a
github.com/markbates/pkger v0.17.1
github.com/pion/rtp v1.6.0
github.com/pion/webrtc/v3 v3.0.0-beta.5
github.com/pion/rtp v1.6.1
github.com/pion/webrtc/v3 v3.0.0-beta.10
github.com/pkg/profile v1.5.0
github.com/prometheus/client_golang v1.7.1
github.com/sherifabdlnaby/configuro v0.0.2
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
)

50
go.sum
View File

@@ -7,8 +7,6 @@ dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBr
dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4=
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/3d0c/gmf v0.0.0-20200614092945-e58d8d5a6035 h1:QZb1aMKxiYdGGieyIDmXuw9I9YcGWGViTrpQ6vcZX7Q=
github.com/3d0c/gmf v0.0.0-20200614092945-e58d8d5a6035/go.mod h1:0QMRcUq2JsDECeAq7bj4h79k7XbhtTsrPUQf6G7qfPs=
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c h1:/IBSNwUN8+eKzUzbJPqhK839ygXJ82sde8x3ogr6R28=
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
@@ -115,14 +113,15 @@ github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk
github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a h1:JliMkv/mAqM5+QzG6Hkw1XcVl1crU8yIQGnhppMv7s0=
github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a/go.mod h1:yVZ4oACfcnUAcxrh+0b6IuIWfkHLK3IAQ99tuuhRx54=
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a h1:54abJQezjMoiP+xMQ3ZQbcDXFjqytAYm/n0EVqrYeXg=
github.com/haivision/srtgo v0.0.0-20201025191851-67964e8f497a/go.mod h1:7izzTiCO3zc9ZIVTFMjxUiYL+kgryFP9rl3bsweqdmc=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
@@ -193,29 +192,30 @@ github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pion/datachannel v1.4.21 h1:3ZvhNyfmxsAqltQrApLPQMhSFNA+aT87RqyCq4OXmf0=
github.com/pion/datachannel v1.4.21/go.mod h1:oiNyP4gHx2DIwRzX/MFyH0Rz/Gz05OgBlayAI2hAWjg=
github.com/pion/dtls/v2 v2.0.2 h1:FHCHTiM182Y8e15aFTiORroiATUI16ryHiQh8AIOJ1E=
github.com/pion/dtls/v2 v2.0.2/go.mod h1:27PEO3MDdaCfo21heT59/vsdmZc0zMt9wQPcSlLu/1I=
github.com/pion/ice/v2 v2.0.6 h1:7Jf3AX6VIjgO2tGRyT0RGGxkDYOF4m5I5DQzf34IN1Y=
github.com/pion/ice/v2 v2.0.6/go.mod h1:xOXvVRlQC/B7FPJeJYKY6IepFRAKb3t1un1K9boYaaQ=
github.com/pion/dtls/v2 v2.0.3 h1:3qQ0s4+TXD00rsllL8g8KQcxAs+Y/Z6oz618RXX6p14=
github.com/pion/dtls/v2 v2.0.3/go.mod h1:TUjyL8bf8LH95h81Xj7kATmzMRt29F/4lxpIPj2Xe4Y=
github.com/pion/ice/v2 v2.0.9 h1:oHbiN6Q9tgb8Gfu3I4cbr5mHRE1uqiuFABQ8CbWjIyk=
github.com/pion/ice/v2 v2.0.9/go.mod h1:NK+o39ynb+N1YSj9fPgWs3vjVcrsWw0KCr/311MqVq8=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.4 h1:O4vvVqr4DGX63vzmO6Fw9vpy3lfztVWHGCQfyw0ZLSY=
github.com/pion/mdns v0.0.4/go.mod h1:R1sL0p50l42S5lJs91oNdUL58nm0QHrhxnSegr++qC0=
github.com/pion/quic v0.1.4 h1:bNz9sCJjlM3GqMdq7Fne57FiWfdyiJ++yHVbuqeoD3Y=
github.com/pion/quic v0.1.4/go.mod h1:dBhNvkLoQqRwfi6h3Vqj3IcPLgiW7rkZxBbRdp7Vzvk=
github.com/pion/randutil v0.0.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA=
github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I=
github.com/pion/rtp v1.6.0 h1:4Ssnl/T5W2LzxHj9ssYpGVEQh3YYhQFNVmSWO88MMwk=
github.com/pion/rtp v1.6.0/go.mod h1:QgfogHsMBVE/RFNno467U/KBqfUywEH+HK+0rtnwsdI=
github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM=
github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk=
github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/sctp v1.7.10 h1:o3p3/hZB5Cx12RMGyWmItevJtZ6o2cpuxaw6GOS4x+8=
github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0=
github.com/pion/sdp/v3 v3.0.1 h1:we4OyeTT6s+6sxrAKy/LVywskx1dzUQlh4xu3b+iAs0=
github.com/pion/sdp/v3 v3.0.1/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk=
github.com/pion/srtp v1.5.1 h1:9Q3jAfslYZBt+C69SI/ZcONJh9049JUHZWYRRf5KEKw=
github.com/pion/srtp v1.5.1/go.mod h1:B+QgX5xPeQTNc1CJStJPHzOlHK66ViMDWTT0HZTCkcA=
github.com/pion/sctp v1.7.11 h1:UCnj7MsobLKLuP/Hh+JMiI/6W5Bs/VF45lWKgHFjSIE=
github.com/pion/sctp v1.7.11/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0=
github.com/pion/sdp/v3 v3.0.2 h1:UNnSPVaMM+Pdu/mR9UvAyyo6zkdYbKeuOooCwZvTl/g=
github.com/pion/sdp/v3 v3.0.2/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk=
github.com/pion/srtp v1.5.2 h1:25DmvH+fqKZDqvX64vTwnycVwL9ooJxHF/gkX16bDBY=
github.com/pion/srtp v1.5.2/go.mod h1:NiBff/MSxUwMUwx/fRNyD/xGE+dVvf8BOCeXhjCXZ9U=
github.com/pion/stun v0.3.5 h1:uLUCBCkQby4S1cf6CGuR9QrVOKcvUwFeemaC865QHDg=
github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA=
github.com/pion/transport v0.8.10/go.mod h1:tBmha/UCjpum5hqTWhfAEs3CO4/tHSg0MYRhSzR+CZ8=
@@ -226,12 +226,14 @@ github.com/pion/turn/v2 v2.0.4 h1:oDguhEv2L/4rxwbL9clGLgtzQPjtuZwCdoM7Te8vQVk=
github.com/pion/turn/v2 v2.0.4/go.mod h1:1812p4DcGVbYVBTiraUmP50XoKye++AMkbfp+N27mog=
github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI=
github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths=
github.com/pion/webrtc/v3 v3.0.0-beta.5 h1:A1hApeo77Kp2sHWwd53rIHZZm5W9JXEn//5jHj9+fdc=
github.com/pion/webrtc/v3 v3.0.0-beta.5/go.mod h1:HjQaNPq5SdNazGlwiim6/lWBkZD97mPPyDttr7byNA4=
github.com/pion/webrtc/v3 v3.0.0-beta.10 h1:1aBn9jv/oe4v2Uf47HutWIjg2i2ZP/O7HqpgKPqSuhE=
github.com/pion/webrtc/v3 v3.0.0-beta.10/go.mod h1:GlriYYHJ5KkNsCunm3oFDPql4TDTrrNoI9iSWWSnafA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.5.0 h1:042Buzk+NhDI+DeSAA62RwJL8VAuZUMQZUjCsRz1Mug=
github.com/pkg/profile v1.5.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -350,6 +352,8 @@ golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9 h1:vEg9joUBmeBcK9iSJftGNf
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -385,6 +389,8 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c h1:dk0ukUIHmGHqASjP0iue2261isepFCC6XRCSd1nHgDw=
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c/go.mod h1:iQL9McJNjoIa5mjH6nYTCTZXUN6RP+XW3eib7Ya3XcI=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -417,6 +423,10 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c h1:38q6VNPWR010vN82/SB121GujZNIfAUb4YttE2rhGuc=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -446,6 +456,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=

View File

@@ -2,7 +2,6 @@
package config
import (
"gitlab.crans.org/nounous/ghostream/transcoder/audio"
"net"
"github.com/sherifabdlnaby/configuro"
@@ -60,10 +59,6 @@ func New() *Config {
ListenAddress: ":8023",
},
Transcoder: transcoder.Options{
Audio: audio.Options{
Enabled: true,
Bitrate: 160,
},
Text: text.Options{
Enabled: false,
Width: 80,
@@ -83,7 +78,7 @@ func New() *Config {
},
WebRTC: webrtc.Options{
Enabled: true,
MaxPortUDP: 10005,
MaxPortUDP: 11000,
MinPortUDP: 10000,
STUNServers: []string{"stun:stun.l.google.com:19302"},
},
@@ -122,7 +117,7 @@ func Load() (*Config, error) {
// If no credentials register, add demo account with password "demo"
if len(cfg.Auth.Basic.Credentials) < 1 {
cfg.Auth.Basic.Credentials["demo"] = "$2b$15$LRnG3eIHFlYIguTxZOLH7eHwbQC/vqjnLq6nDFiHSUDKIU.f5/1H6"
cfg.Auth.Basic.Credentials["demo"] = "$2b$10$xuU7XFwmRX2CMgdSaA8rM.4Y8.BtRNzhUedwN0G8tCegDRNUERTCS"
}
return cfg, nil

19
main.go
View File

@@ -7,10 +7,11 @@ package main
import (
"log"
"github.com/pkg/profile"
"gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/internal/config"
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream/forwarding"
"gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet"
@@ -20,6 +21,9 @@ import (
)
func main() {
// TODO Don't always profile if not needed
defer profile.Start().Stop()
// Configure logger
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
@@ -38,15 +42,8 @@ func main() {
defer authBackend.Close()
}
// WebRTC session description channels
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
// Init streams messaging
streams := make(map[string]*stream.Stream)
streams := messaging.New()
// Start routines
go transcoder.Init(streams, &cfg.Transcoder)
@@ -54,8 +51,8 @@ func main() {
go monitoring.Serve(&cfg.Monitoring)
go srt.Serve(streams, authBackend, &cfg.Srt)
go telnet.Serve(streams, &cfg.Telnet)
go web.Serve(streams, remoteSdpChan, localSdpChan, &cfg.Web)
go webrtc.Serve(streams, remoteSdpChan, localSdpChan, &cfg.WebRTC)
go web.Serve(streams, &cfg.Web)
go webrtc.Serve(streams, &cfg.WebRTC)
// Wait for routines
select {}

89
messaging/quality.go Normal file
View File

@@ -0,0 +1,89 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"sync"
"github.com/pion/webrtc/v3"
)
// Quality holds a specific stream quality.
// It makes packages able to subscribe to an incoming stream.
type Quality struct {
// Incoming data come from this channel
Broadcast chan<- []byte
// Incoming data will be outputted to all those outputs.
// Use a map to be able to delete an item.
outputs map[chan []byte]struct{}
// Mutex to lock outputs map
lockOutputs sync.Mutex
// WebRTC session descriptor exchange.
// When new client connects, a SDP arrives on WebRtcRemoteSdp,
// then webrtc package answers on WebRtcLocalSdp.
WebRtcLocalSdp chan webrtc.SessionDescription
WebRtcRemoteSdp chan webrtc.SessionDescription
}
func newQuality() (q *Quality) {
q = &Quality{}
broadcast := make(chan []byte, 1024)
q.Broadcast = broadcast
q.outputs = make(map[chan []byte]struct{})
q.WebRtcLocalSdp = make(chan webrtc.SessionDescription, 1)
q.WebRtcRemoteSdp = make(chan webrtc.SessionDescription, 1)
go q.run(broadcast)
return q
}
func (q *Quality) run(broadcast <-chan []byte) {
for msg := range broadcast {
q.lockOutputs.Lock()
for output := range q.outputs {
select {
case output <- msg:
default:
// If full, do a ring buffer
// Check that output is not of size zero
if len(output) > 1 {
<-output
}
}
}
q.lockOutputs.Unlock()
}
// Incoming chan has been closed, close all outputs
q.lockOutputs.Lock()
for ch := range q.outputs {
delete(q.outputs, ch)
close(ch)
}
q.lockOutputs.Unlock()
}
// Close the incoming chan, this will also delete all outputs.
func (q *Quality) Close() {
close(q.Broadcast)
}
// Register a new output on a stream.
func (q *Quality) Register(output chan []byte) {
q.lockOutputs.Lock()
q.outputs[output] = struct{}{}
q.lockOutputs.Unlock()
}
// Unregister removes an output.
func (q *Quality) Unregister(output chan []byte) {
// Make sure we did not already close this output
q.lockOutputs.Lock()
_, ok := q.outputs[output]
if ok {
delete(q.outputs, output)
close(output)
}
defer q.lockOutputs.Unlock()
}

84
messaging/stream.go Normal file
View File

@@ -0,0 +1,84 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"errors"
"sync"
)
// Stream makes packages able to subscribe to an incoming stream
type Stream struct {
// Different qualities of this stream
qualities map[string]*Quality
// Mutex to lock outputs map
lockQualities sync.Mutex
// Count clients for statistics
nbClients int
}
func newStream() (s *Stream) {
s = &Stream{}
s.qualities = make(map[string]*Quality)
s.nbClients = 0
return s
}
// Close stream.
func (s *Stream) Close() {
for quality := range s.qualities {
s.DeleteQuality(quality)
}
}
// CreateQuality creates a new quality associated with this stream.
func (s *Stream) CreateQuality(name string) (quality *Quality, err error) {
// If quality already exist, fail
if _, ok := s.qualities[name]; ok {
return nil, errors.New("quality already exists")
}
s.lockQualities.Lock()
quality = newQuality()
s.qualities[name] = quality
s.lockQualities.Unlock()
return quality, nil
}
// DeleteQuality removes a stream quality.
func (s *Stream) DeleteQuality(name string) {
// Make sure we did not already close this output
s.lockQualities.Lock()
if _, ok := s.qualities[name]; ok {
s.qualities[name].Close()
delete(s.qualities, name)
}
s.lockQualities.Unlock()
}
// GetQuality gets a specific stream quality.
func (s *Stream) GetQuality(name string) (quality *Quality, err error) {
s.lockQualities.Lock()
quality, ok := s.qualities[name]
s.lockQualities.Unlock()
if !ok {
return nil, errors.New("quality does not exist")
}
return quality, nil
}
// ClientCount returns the number of clients.
func (s *Stream) ClientCount() int {
return s.nbClients
}
// IncrementClientCount increments the number of clients.
func (s *Stream) IncrementClientCount() {
s.nbClients++
}
// DecrementClientCount decrements the number of clients.
func (s *Stream) DecrementClientCount() {
s.nbClients--
}

98
messaging/streams.go Normal file
View File

@@ -0,0 +1,98 @@
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"errors"
"log"
"sync"
)
// Streams hold all application streams.
type Streams struct {
// Associate each stream name to the stream
streams map[string]*Stream
// Mutex to lock streams
lockStreams sync.Mutex
// Subscribers get notified when a new stream is created
// Use a map to be able to delete a subscriber
eventSubscribers map[chan string]struct{}
// Mutex to lock eventSubscribers
lockSubscribers sync.Mutex
}
// New creates a new stream list.
func New() (l *Streams) {
l = &Streams{}
l.streams = make(map[string]*Stream)
l.eventSubscribers = make(map[chan string]struct{})
return l
}
// Subscribe to get notified on new stream.
func (l *Streams) Subscribe(output chan string) {
l.lockSubscribers.Lock()
l.eventSubscribers[output] = struct{}{}
l.lockSubscribers.Unlock()
}
// Unsubscribe to no longer get notified on new stream.
func (l *Streams) Unsubscribe(output chan string) {
// Make sure we did not already delete this subscriber
l.lockSubscribers.Lock()
if _, ok := l.eventSubscribers[output]; ok {
delete(l.eventSubscribers, output)
}
l.lockSubscribers.Unlock()
}
// Create a new stream.
func (l *Streams) Create(name string) (s *Stream, err error) {
// If stream already exist, fail
if _, ok := l.streams[name]; ok {
return nil, errors.New("stream already exists")
}
// Create stream
s = newStream()
l.lockStreams.Lock()
l.streams[name] = s
l.lockStreams.Unlock()
// Notify
l.lockSubscribers.Lock()
for sub := range l.eventSubscribers {
select {
case sub <- name:
default:
log.Printf("Failed to announce stream '%s' to subscriber", name)
}
}
l.lockSubscribers.Unlock()
return s, nil
}
// Get a stream.
func (l *Streams) Get(name string) (s *Stream, err error) {
// If stream does exist, return it
l.lockStreams.Lock()
s, ok := l.streams[name]
l.lockStreams.Unlock()
if !ok {
return nil, errors.New("stream does not exist")
}
return s, nil
}
// Delete a stream.
func (l *Streams) Delete(name string) {
// Make sure we did not already delete this stream
l.lockStreams.Lock()
if _, ok := l.streams[name]; ok {
l.streams[name].Close()
delete(l.streams, name)
}
l.lockStreams.Unlock()
}

55
messaging/streams_test.go Normal file
View File

@@ -0,0 +1,55 @@
package messaging
import "testing"
func TestWithOneStream(t *testing.T) {
streams := New()
// Subscribe to new streams
event := make(chan string, 8)
streams.Subscribe(event)
// Create a stream
stream, err := streams.Create("demo")
if err != nil {
t.Errorf("Failed to create stream")
}
// Check that we receive the creation event
e := <-event
if e != "demo" {
t.Errorf("Message has wrong content: %s != demo", e)
}
// Create a quality
quality, err := stream.CreateQuality("source")
if err != nil {
t.Errorf("Failed to create quality")
}
// Register one output
output := make(chan []byte, 64)
quality.Register(output)
stream.IncrementClientCount()
// Try to pass one message
quality.Broadcast <- []byte("hello world")
msg := <-output
if string(msg) != "hello world" {
t.Errorf("Message has wrong content: %s != hello world", msg)
}
// Check client count
if count := stream.ClientCount(); count != 1 {
t.Errorf("Client counter returned %d, expected 1", count)
}
// Unregister
quality.Unregister(output)
stream.DecrementClientCount()
// Check client count
if count := stream.ClientCount(); count != 0 {
t.Errorf("Client counter returned %d, expected 0", count)
}
}

View File

@@ -3,11 +3,13 @@ package forwarding
import (
"bufio"
"fmt"
"log"
"os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
// Options to configure the stream forwarding.
@@ -15,43 +17,65 @@ import (
type Options map[string][]string
// Serve handles incoming packets from SRT and forward them to other external services
func Serve(streams map[string]*stream.Stream, cfg Options) {
func Serve(streams *messaging.Streams, cfg Options) {
if len(cfg) < 1 {
// No forwarding, ignore
return
}
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
log.Printf("Stream forwarding initialized")
for {
for name, st := range streams {
fwdCfg, ok := cfg[name]
// For each new stream
for name := range event {
streamCfg, ok := cfg[name]
if !ok {
// Not configured
continue
}
// Start forwarding
log.Printf("Starting forwarding for '%s'", name)
go forward(st, fwdCfg)
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting forwarding for '%s' quality '%s'", name, qualityName)
go forward(name, quality, streamCfg)
}
}
// Start a FFMPEG instance and redirect stream output to forwarded streams
func forward(st *stream.Stream, fwdCfg []string) {
func forward(streamName string, q *messaging.Quality, fwdCfg []string) {
output := make(chan []byte, 1024)
st.Register(output)
q.Register(output)
// Launch FFMPEG instance
params := []string{"-hide_banner", "-loglevel", "error", "-re", "-i", "pipe:0"}
params := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0"}
for _, url := range fwdCfg {
// If the url should be date-formatted, replace special characters with the current time information
now := time.Now()
formattedURL := strings.ReplaceAll(url, "%Y", fmt.Sprintf("%04d", now.Year()))
formattedURL = strings.ReplaceAll(formattedURL, "%m", fmt.Sprintf("%02d", now.Month()))
formattedURL = strings.ReplaceAll(formattedURL, "%d", fmt.Sprintf("%02d", now.Day()))
formattedURL = strings.ReplaceAll(formattedURL, "%H", fmt.Sprintf("%02d", now.Hour()))
formattedURL = strings.ReplaceAll(formattedURL, "%M", fmt.Sprintf("%02d", now.Minute()))
formattedURL = strings.ReplaceAll(formattedURL, "%S", fmt.Sprintf("%02d", now.Second()))
formattedURL = strings.ReplaceAll(formattedURL, "%name", streamName)
params = append(params, "-f", "flv", "-preset", "ultrafast", "-tune", "zerolatency",
"-c", "copy", url)
"-c", "copy", formattedURL)
}
ffmpeg := exec.Command("ffmpeg", params...)
@@ -77,14 +101,14 @@ func forward(st *stream.Stream, fwdCfg []string) {
_ = input.Close()
_ = errOutput.Close()
_ = ffmpeg.Process.Kill()
st.Unregister(output)
q.Unregister(output)
}()
// Log standard error output
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[FORWARDING FFMPEG] %s", scanner.Text())
log.Printf("[FORWARDING FFMPEG %s] %s", streamName, scanner.Text())
}
}()

View File

@@ -6,7 +6,7 @@ import (
"testing"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream/srt"
)
@@ -35,7 +35,7 @@ func TestForwardStream(t *testing.T) {
cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
// Register forwarding stream list
streams := make(map[string]*stream.Stream)
streams := messaging.New()
go Serve(streams, cfg)
// Serve SRT Server without authentification backend

View File

@@ -1,99 +0,0 @@
// Package stream defines a structure to communication between inputs and outputs
package stream
import (
"sync"
)
// Stream makes packages able to subscribe to an incoming stream
type Stream struct {
// Incoming data come from this channel
Broadcast chan<- []byte
// Use a map to be able to delete an item
outputs map[chan []byte]struct{}
// Count clients for statistics
nbClients int
// Mutex to lock outputs map
lock sync.Mutex
}
// New creates a new stream.
func New() *Stream {
s := &Stream{}
broadcast := make(chan []byte, 1024)
s.Broadcast = broadcast
s.outputs = make(map[chan []byte]struct{})
s.nbClients = 0
go s.run(broadcast)
return s
}
func (s *Stream) run(broadcast <-chan []byte) {
for msg := range broadcast {
s.lock.Lock()
for output := range s.outputs {
select {
case output <- msg:
default:
// If full, do a ring buffer
// Check that output is not of size zero
if len(output) > 1 {
<-output
}
}
}
s.lock.Unlock()
}
// Incoming chan has been closed, close all outputs
s.lock.Lock()
for ch := range s.outputs {
delete(s.outputs, ch)
close(ch)
}
s.lock.Unlock()
}
// Close the incoming chan, this will also delete all outputs
func (s *Stream) Close() {
close(s.Broadcast)
}
// Register a new output on a stream.
func (s *Stream) Register(output chan []byte) {
s.lock.Lock()
defer s.lock.Unlock()
s.outputs[output] = struct{}{}
}
// Unregister removes an output.
// If hidden in true, then do not count this client.
func (s *Stream) Unregister(output chan []byte) {
s.lock.Lock()
defer s.lock.Unlock()
// Make sure we did not already close this output
_, ok := s.outputs[output]
if ok {
delete(s.outputs, output)
close(output)
}
}
// ClientCount returns the number of clients
func (s *Stream) ClientCount() int {
return s.nbClients
}
// IncrementClientCount increments the number of clients
func (s *Stream) IncrementClientCount() {
s.nbClients++
}
// DecrementClientCount decrements the number of clients
func (s *Stream) DecrementClientCount() {
s.nbClients--
}

View File

@@ -1,42 +0,0 @@
package stream
import (
"testing"
)
func TestWithoutOutputs(t *testing.T) {
stream := New()
defer stream.Close()
stream.Broadcast <- []byte("hello world")
}
func TestWithOneOutput(t *testing.T) {
stream := New()
defer stream.Close()
// Register one output
output := make(chan []byte, 64)
stream.Register(output)
stream.IncrementClientCount()
// Try to pass one message
stream.Broadcast <- []byte("hello world")
msg := <-output
if string(msg) != "hello world" {
t.Errorf("Message has wrong content: %s != hello world", msg)
}
// Check client count
if count := stream.ClientCount(); count != 1 {
t.Errorf("Client counter returned %d, expected 1", count)
}
// Unregister
stream.Unregister(output)
stream.DecrementClientCount()
// Check client count
if count := stream.ClientCount(); count != 0 {
t.Errorf("Client counter returned %d, expected 0", count)
}
}

View File

@@ -5,21 +5,26 @@ import (
"log"
"github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
// Check stream does not exist
if _, ok := streams[name]; ok {
log.Print("Stream already exists, refusing new streamer")
func handleStreamer(socket *srtgo.SrtSocket, streams *messaging.Streams, name string) {
// Create stream
stream, err := streams.Create(name)
if err != nil {
log.Printf("Error on stream creating: %s", err)
socket.Close()
return
}
// Create stream
log.Printf("New SRT streamer for stream %s", name)
st := stream.New()
streams[name] = st
// Create source quality
q, err := stream.CreateQuality("source")
if err != nil {
log.Printf("Error on quality creating: %s", err)
socket.Close()
return
}
log.Printf("New SRT streamer for stream '%s' quality 'source'", name)
// Read RTP packets forever and send them to the WebRTC Client
for {
@@ -42,29 +47,38 @@ func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream,
// Send raw data to other streams
buff = buff[:n]
st.Broadcast <- buff
q.Broadcast <- buff
}
// Close stream
st.Close()
streams.Delete(name)
socket.Close()
delete(streams, name)
}
func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
log.Printf("New SRT viewer for stream %s", name)
func handleViewer(socket *srtgo.SrtSocket, streams *messaging.Streams, name string) {
// Get requested stream
st, ok := streams[name]
if !ok {
log.Println("Stream does not exist, refusing new viewer")
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream: %s", err)
socket.Close()
return
}
// Get requested quality
// FIXME: make qualities available
qualityName := "source"
q, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality: %s", err)
socket.Close()
return
}
log.Printf("New SRT viewer for stream %s quality %s", name, qualityName)
// Register new output
c := make(chan []byte, 1024)
st.Register(c)
st.IncrementClientCount()
q.Register(c)
stream.IncrementClientCount()
// Receive data and send them
for data := range c {
@@ -74,7 +88,7 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name st
}
// Send data
_, err := s.Write(data, 1000)
_, err := socket.Write(data, 1000)
if err != nil {
log.Printf("Remove SRT viewer because of sending error, %s", err)
break
@@ -82,7 +96,7 @@ func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name st
}
// Close output
st.Unregister(c)
st.DecrementClientCount()
s.Close()
q.Unregister(c)
stream.DecrementClientCount()
socket.Close()
}

View File

@@ -1,9 +1,6 @@
// Package srt serves a SRT server
package srt
// #include <srt/srt.h>
import "C"
import (
"log"
"net"
@@ -12,7 +9,7 @@ import (
"github.com/haivision/srtgo"
"gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
// Options holds web package configuration
@@ -39,7 +36,7 @@ func splitHostPort(hostport string) (string, uint16, error) {
}
// Serve SRT server
func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Options) {
func Serve(streams *messaging.Streams, authBackend auth.Backend, cfg *Options) {
if !cfg.Enabled {
// SRT is not enabled, ignore
return
@@ -62,7 +59,7 @@ func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Opt
for {
// Wait for new connection
s, err := sck.Accept()
s, _, err := sck.Accept()
if err != nil {
// Something wrong happened
log.Println(err)
@@ -73,7 +70,7 @@ func Serve(streams map[string]*stream.Stream, authBackend auth.Backend, cfg *Opt
// Without this, the SRT buffer might get full before reading it
// streamid can be "name:password" for streamer or "name" for viewer
streamID, err := s.GetSockOptString(C.SRTO_STREAMID)
streamID, err := s.GetSockOptString(srtgo.SRTO_STREAMID)
if err != nil {
log.Print("Failed to get socket streamid")
continue

View File

@@ -6,7 +6,7 @@ import (
"testing"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
// TestSplitHostPort Try to split a host like 127.0.0.1:1234 in host, port (127.0.0.1, 1234à
@@ -58,7 +58,7 @@ func TestServeSRT(t *testing.T) {
}
// Init streams messaging and SRT server
streams := make(map[string]*stream.Stream)
streams := messaging.New()
go Serve(streams, nil, &Options{Enabled: true, ListenAddress: ":9711", MaxClients: 2})
ffmpeg := exec.Command("ffmpeg", "-hide_banner", "-loglevel", "error",

View File

@@ -7,7 +7,7 @@ import (
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
// Options holds telnet package configuration
@@ -17,7 +17,7 @@ type Options struct {
}
// Serve Telnet server
func Serve(streams map[string]*stream.Stream, cfg *Options) {
func Serve(streams *messaging.Streams, cfg *Options) {
if !cfg.Enabled {
// Telnet is not enabled, ignore
return
@@ -32,17 +32,17 @@ func Serve(streams map[string]*stream.Stream, cfg *Options) {
// Handle each new client
for {
s, err := listener.Accept()
socket, err := listener.Accept()
if err != nil {
log.Printf("Error while accepting TCP socket: %s", s)
log.Printf("Error while accepting TCP socket: %s", err)
continue
}
go handleViewer(s, streams, cfg)
go handleViewer(socket, streams, cfg)
}
}
func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
func handleViewer(s net.Conn, streams *messaging.Streams, cfg *Options) {
// Prompt user about stream name
if _, err := s.Write([]byte("[GHOSTREAM]\nEnter stream name: ")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
@@ -56,7 +56,7 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
s.Close()
return
}
name := strings.TrimSpace(string(buff[:n])) + "@text"
name := strings.TrimSpace(string(buff[:n]))
if len(name) < 1 {
// Too short, exit
s.Close()
@@ -67,9 +67,9 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
time.Sleep(time.Second)
// Get requested stream
st, ok := streams[name]
if !ok {
log.Println("Stream does not exist, kicking new Telnet viewer")
stream, err := streams.Get(name)
if err != nil {
log.Printf("Kicking new Telnet viewer: %s", err)
if _, err := s.Write([]byte("This stream is inactive.\n")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
}
@@ -77,11 +77,23 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
return
}
// Get requested quality
qualityName := "text"
q, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Kicking new Telnet viewer: %s", err)
if _, err := s.Write([]byte("This stream is not converted to text.\n")); err != nil {
log.Printf("Error while writing to TCP socket: %s", err)
}
s.Close()
return
}
log.Printf("New Telnet viewer for stream %s quality %s", name, qualityName)
// Register new client
log.Printf("New Telnet viewer for stream '%s'", name)
c := make(chan []byte, 128)
st.Register(c)
st.IncrementClientCount()
q.Register(c)
stream.IncrementClientCount()
// Hide terminal cursor
if _, err = s.Write([]byte("\033[?25l")); err != nil {
@@ -106,7 +118,7 @@ func handleViewer(s net.Conn, streams map[string]*stream.Stream, cfg *Options) {
}
// Close output
st.Unregister(c)
st.DecrementClientCount()
q.Unregister(c)
stream.DecrementClientCount()
s.Close()
}

View File

@@ -3,13 +3,13 @@ package telnet
import (
"testing"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
// TestTelnetOutput creates a TCP client that connects to the server and get one image.
func TestTelnetOutput(t *testing.T) {
// Try to start Telnet server while it is disabled
streams := make(map[string]*stream.Stream)
streams := messaging.New()
go Serve(streams, &Options{Enabled: false})
// FIXME test connect

View File

@@ -2,166 +2,40 @@
package webrtc
import (
"github.com/3d0c/gmf"
"bufio"
"fmt"
"log"
"math/rand"
"net"
"os/exec"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream"
"log"
"net"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/messaging"
)
var (
activeStream map[string]struct{}
)
func autoIngest(streams map[string]*stream.Stream) {
// Regulary check existing streams
activeStream = make(map[string]struct{})
for {
for name, st := range streams {
if strings.Contains(name, "@") {
// Not a source stream, pass
continue
}
if _, ok := activeStream[name]; ok {
// Stream is already ingested
continue
}
// Start ingestion
log.Printf("Starting webrtc for '%s'", name)
// FIXME Ensure that the audio stream exist, but that's poop code
time.Sleep(time.Second)
go ingest(name, st, streams[name+"@audio"])
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
func ingest(name string, input *stream.Stream, audio *stream.Stream) {
func ingest(name string, q *messaging.Quality) {
// Register to get stream
videoInput := make(chan []byte, 1024)
input.Register(videoInput)
audioInput := make(chan []byte, 1024)
audio.Register(audioInput)
activeStream[name] = struct{}{}
q.Register(videoInput)
inputCtx := gmf.NewCtx()
avioInputCtx, _ := gmf.NewAVIOContext(inputCtx, &gmf.AVIOHandlers{ReadPacket: func() ([]byte, int) {
data := <-audioInput
return data, len(data)
}})
log.Println("Open input")
inputCtx.SetPb(avioInputCtx).OpenInput("")
log.Println("Opened")
defer inputCtx.CloseInput()
defer avioInputCtx.Release()
// FIXME Mux into RTP without having multiple UDP listeners
firstPort := int(rand.Int31n(63535)) + 2000
if audioTracks[name] == nil {
audioTracks[name] = make([]*webrtc.Track, 0)
}
udpListener, _ := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 1234})
outputCtx, _ := gmf.NewOutputCtxWithFormatName("rtp://127.0.0.1:1234", "rtp")
avioOutputCtx, _ := gmf.NewAVIOContext(outputCtx, &gmf.AVIOHandlers{WritePacket: func(data []byte) int {
n := len(data)
log.Printf("Read %d bytes", n)
return n
}})
// FIXME DON'T RAN AN UDP LISTENER, PLIZ GET DIRECTLY UDP PACKETS, WHY IS IT SO COMPLICATED????
// outputCtx.SetPb(avioOutputCtx)
defer outputCtx.CloseOutput()
defer avioOutputCtx.Release()
log.Printf("%d streams", inputCtx.StreamsCnt())
for i := 0; i < inputCtx.StreamsCnt(); i++ {
srcStream, err := inputCtx.GetStream(i)
if err != nil {
log.Println("GetStream error")
}
outputCtx.AddStreamWithCodeCtx(srcStream.CodecCtx())
}
outputCtx.Dump()
if err := outputCtx.WriteHeader(); err != nil {
log.Printf("Unable to write RTP header: %s", err)
}
// Receive audio data
go func() {
buff := make([]byte, 1500)
for {
n, _ := udpListener.Read(buff)
if n == 0 {
return
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(buff[:n]); err != nil {
log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue
}
if audioTracks[name] == nil {
audioTracks[name] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all audio tracks
// Adapt payload and SSRC to match destination
for _, audioTrack := range audioTracks[name] {
packet.Header.PayloadType = audioTrack.PayloadType()
packet.Header.SSRC = audioTrack.SSRC()
if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
log.Printf("Failed to write to audio track: %s", writeErr)
continue
}
}
}
}()
first := false
for packet := range inputCtx.GetNewPackets() {
if first { //if read from rtsp ,the first packets is wrong.
if err := outputCtx.WritePacketNoBuffer(packet); err != nil {
log.Printf("Error while writing packet: %s", err)
}
}
first = true
packet.Free()
}
select {}
// TODO Register to all substreams and make RTP packets. Don't transcode in this package.
/* // Open a UDP Listener for RTP Packets on port 5004
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
// Open UDP listeners for RTP Packets
audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5005})
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort + 1})
if err != nil {
log.Printf("Faited to open UDP listener %s", err)
return
}
// Start ffmpag to convert videoInput to video and audio UDP
ffmpeg, err := startFFmpeg(videoInput)
ffmpeg, err := startFFmpeg(videoInput, firstPort)
if err != nil {
log.Printf("Error while starting ffmpeg: %s", err)
return
@@ -242,18 +116,18 @@ func ingest(name string, input *stream.Stream, audio *stream.Stream) {
}
if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err)
}*/
delete(activeStream, name)
}
q.Unregister(videoInput)
}
/* func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) {
func startFFmpeg(in <-chan []byte, listeningPort int) (ffmpeg *exec.Cmd, err error) {
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
"-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality
"-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1",
"-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5004",
"-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1",
"-f", "rtp", "rtp://127.0.0.1:5005"}
// Audio
"-vn", "-c:a", "libopus", "-b:a", "96k",
"-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort),
// Source
"-an", "-c:v", "copy",
"-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort+1)}
ffmpeg = exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output
@@ -287,4 +161,4 @@ func ingest(name string, input *stream.Stream, audio *stream.Stream) {
// Start process
err = ffmpeg.Start()
return ffmpeg, err
} */
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
// Options holds web package configuration
@@ -45,13 +45,10 @@ func GetNumberConnectedSessions(streamID string) int {
// newPeerHandler is called when server receive a new session description
// this initiates a WebRTC connection and return server description
func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, cfg *Options) {
func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, remoteSdp webrtc.SessionDescription, cfg *Options) {
// Create media engine using client SDP
mediaEngine := webrtc.MediaEngine{}
if err := mediaEngine.PopulateFromSDP(remoteSdp.RemoteDescription); err != nil {
if err := mediaEngine.PopulateFromSDP(remoteSdp); err != nil {
log.Println("Failed to create new media engine", err)
localSdpChan <- webrtc.SessionDescription{}
return
@@ -78,7 +75,7 @@ func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struc
}
// Create video track
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8")
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "H264")
videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec)
if err != nil {
log.Println("Failed to create new video track", err)
@@ -106,13 +103,13 @@ func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struc
}
// Set the remote SessionDescription
if err = peerConnection.SetRemoteDescription(remoteSdp.RemoteDescription); err != nil {
if err = peerConnection.SetRemoteDescription(remoteSdp); err != nil {
log.Println("Failed to set remote description", err)
localSdpChan <- webrtc.SessionDescription{}
return
}
streamID := remoteSdp.StreamID
streamID := name
split := strings.SplitN(streamID, "@", 2)
streamID = split[0]
quality := "source"
@@ -182,10 +179,7 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa
}
// Serve WebRTC media streaming server
func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, localSdpChan chan webrtc.SessionDescription, cfg *Options) {
func Serve(streams *messaging.Streams, cfg *Options) {
if !cfg.Enabled {
// WebRTC is not enabled, ignore
return
@@ -193,17 +187,42 @@ func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct {
log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP)
// Allocate memory
// WebRTC ingested tracks
videoTracks = make(map[string][]*webrtc.Track)
audioTracks = make(map[string][]*webrtc.Track)
// Ingest data
go autoIngest(streams)
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
// For each new stream
for name := range event {
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
}
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName)
go ingest(name, quality)
go listenSdp(name, quality.WebRtcLocalSdp, quality.WebRtcRemoteSdp, cfg)
}
}
func listenSdp(name string, localSdp, remoteSdp chan webrtc.SessionDescription, cfg *Options) {
// Handle new connections
for {
// Wait for incoming session description
// then send the local description to browser
newPeerHandler(localSdpChan, <-remoteSdpChan, cfg)
newPeerHandler(name, localSdp, <-remoteSdp, cfg)
}
}

View File

@@ -5,24 +5,19 @@ import (
"testing"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
func TestServe(t *testing.T) {
// Init streams messaging and WebRTC server
streams := make(map[string]*stream.Stream)
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
streams := messaging.New()
cfg := Options{
Enabled: true,
MinPortUDP: 10000,
MaxPortUDP: 10005,
STUNServers: []string{"stun:stun.l.google.com:19302"},
}
go Serve(streams, remoteSdpChan, localSdpChan, &cfg)
go Serve(streams, &cfg)
// New client connection
mediaEngine := webrtc.MediaEngine{}
@@ -31,7 +26,7 @@ func TestServe(t *testing.T) {
peerConnection, _ := api.NewPeerConnection(webrtc.Configuration{})
// Create video track
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8")
codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "H264")
videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec)
if err != nil {
t.Error("Failed to create new video track", err)
@@ -58,12 +53,6 @@ func TestServe(t *testing.T) {
peerConnection.SetLocalDescription(offer)
<-gatherComplete
// Send offer to server
remoteSdpChan <- struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}{"demo", *peerConnection.LocalDescription()}
_ = <-localSdpChan
// FIXME: Send offer to server
// FIXME: verify connection did work
}

View File

@@ -1,153 +0,0 @@
// Package audio transcode a stream to filter the audio
package audio
import (
"bufio"
"fmt"
"github.com/3d0c/gmf"
"io"
"log"
"os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
)
// Options holds audio package configuration
type Options struct {
Enabled bool
Bitrate int
}
// Init text transcoder
func Init(streams map[string]*stream.Stream, cfg *Options) {
if !cfg.Enabled {
// Audio transcode is not enabled, ignore
return
}
// Regulary check existing streams
for {
for sourceName, sourceStream := range streams {
if strings.Contains(sourceName, "@") {
// Not a source stream, pass
continue
}
// Check that the transcoded stream does not already exist
name := sourceName + "@audio"
_, ok := streams[name]
if ok {
// Stream is already transcoded
continue
}
// Start conversion
log.Printf("Starting audio transcode '%s'", name)
st := stream.New()
streams[name] = st
go transcode(sourceStream, st, cfg)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
}
}
// Extract audio from stream
func transcode(input, output *stream.Stream, cfg *Options) {
// Start ffmpeg to transcode video to audio
videoInput := make(chan []byte, 1024)
input.Register(videoInput)
ffmpeg, audio, err := startFFmpeg(videoInput, cfg)
if err != nil {
log.Printf("Error while starting ffmpeg: %s", err)
return
}
dataBuff := make([]byte, gmf.IO_BUFFER_SIZE) // UDP MTU
for {
n, err := (*audio).Read(dataBuff)
if err != nil {
log.Printf("An error occurred while reading input: %s", err)
break
}
if n == 0 {
// Stream is finished
break
}
output.Broadcast <- dataBuff[:n]
}
// Stop transcode
_ = ffmpeg.Process.Kill()
_ = (*audio).Close()
}
// Start a ffmpeg instance to convert stream into audio
func startFFmpeg(in <-chan []byte, cfg *Options) (*exec.Cmd, *io.ReadCloser, error) {
// TODO in a future release: remove FFMPEG dependency and transcode directly using the libopus API
// FIXME It seems impossible to get a RTP Packet from standard output.
// We need to find a clean solution, without waiting on UDP listeners.
// FIXME We should also not build RTP packets here.
/* port := 0
var udpListener *net.UDPConn
var err error
for {
port = rand.Intn(65535)
udpListener, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: port})
if err != nil {
if strings.Contains(fmt.Sprintf("%s", err), "address already in use") {
continue
}
return nil, nil, err
}
break
}*/
bitrate := fmt.Sprintf("%dk", cfg.Bitrate)
// Use copy audio codec, assume for now that libopus is used by the streamer
ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
"-vn", "-c:a", "copy", "-b:a", bitrate, "-f", "mpegts", "pipe:1"}
ffmpeg := exec.Command("ffmpeg", ffmpegArgs...)
// Handle errors output
errOutput, err := ffmpeg.StderrPipe()
if err != nil {
return nil, nil, err
}
go func() {
scanner := bufio.NewScanner(errOutput)
for scanner.Scan() {
log.Printf("[AUDIO FFMPEG %s] %s", "demo", scanner.Text())
}
}()
// Handle text output
output, err := ffmpeg.StdoutPipe()
if err != nil {
return nil, nil, err
}
// Handle stream input
input, err := ffmpeg.StdinPipe()
if err != nil {
return nil, nil, err
}
go func() {
for data := range in {
_, _ = input.Write(data)
}
}()
// Start process
err = ffmpeg.Start()
return ffmpeg, &output, err
}

View File

@@ -1 +0,0 @@
package audio

View File

@@ -8,10 +8,8 @@ import (
"io"
"log"
"os/exec"
"strings"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
// Options holds text package configuration
@@ -23,45 +21,46 @@ type Options struct {
}
// Init text transcoder
func Init(streams map[string]*stream.Stream, cfg *Options) {
func Init(streams *messaging.Streams, cfg *Options) {
if !cfg.Enabled {
// Text transcode is not enabled, ignore
return
}
// Regulary check existing streams
for {
for sourceName, sourceStream := range streams {
if strings.Contains(sourceName, "@") {
// Not a source stream, pass
continue
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
// For each new stream
for name := range event {
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
}
// Check that the transcoded stream does not already exist
name := sourceName + "@text"
_, ok := streams[name]
if ok {
// Stream is already transcoded
continue
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start conversion
log.Printf("Starting text transcode '%s'", name)
st := stream.New()
streams[name] = st
go transcode(sourceStream, st, cfg)
// Create new text quality
outputQuality, err := stream.CreateQuality("text")
if err != nil {
log.Printf("Failed to create quality 'text': %s", err)
}
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time.Sleep(time.Second)
// Start forwarding
log.Printf("Starting text transcoder for '%s' quality '%s'", name, qualityName)
go transcode(quality, outputQuality, cfg)
}
}
// Convert video to ANSI text
func transcode(input, output *stream.Stream, cfg *Options) {
func transcode(input, output *messaging.Quality, cfg *Options) {
// Start ffmpeg to transcode video to rawvideo
videoInput := make(chan []byte, 1024)
input.Register(videoInput)

View File

@@ -2,19 +2,16 @@
package transcoder
import (
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/transcoder/audio"
"gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/transcoder/text"
)
// Options holds text package configuration
type Options struct {
Text text.Options
Audio audio.Options
}
// Init all transcoders
func Init(streams map[string]*stream.Stream, cfg *Options) {
func Init(streams *messaging.Streams, cfg *Options) {
go text.Init(streams, &cfg.Text)
go audio.Init(streams, &cfg.Audio)
}

View File

@@ -21,61 +21,20 @@ var (
validPath = regexp.MustCompile("^/[a-z0-9@_-]*$")
)
// Handle WebRTC session description exchange via POST
func viewerPostHandler(w http.ResponseWriter, r *http.Request) {
// Limit response body to 128KB
r.Body = http.MaxBytesReader(w, r.Body, 131072)
// Get stream ID from URL, or from domain name
path := r.URL.Path[1:]
host := r.Host
if strings.Contains(host, ":") {
realHost, _, err := net.SplitHostPort(r.Host)
if err != nil {
log.Printf("Failed to split host and port from %s", r.Host)
return
}
host = realHost
}
host = strings.Replace(host, ".", "-", -1)
if streamID, ok := cfg.MapDomainToStream[host]; ok {
path = streamID
}
// Decode client description
dec := json.NewDecoder(r.Body)
dec.DisallowUnknownFields()
remoteDescription := webrtc.SessionDescription{}
if err := dec.Decode(&remoteDescription); err != nil {
http.Error(w, "The JSON WebRTC offer is malformed", http.StatusBadRequest)
// Handle site index and viewer pages
func viewerHandler(w http.ResponseWriter, r *http.Request) {
// Validation on path
if validPath.FindStringSubmatch(r.URL.Path) == nil {
http.NotFound(w, r)
log.Printf("Replied not found on %s", r.URL.Path)
return
}
// Exchange session descriptions with WebRTC stream server
remoteSdpChan <- struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}{StreamID: path, RemoteDescription: remoteDescription}
localDescription := <-localSdpChan
// Send server description as JSON
jsonDesc, err := json.Marshal(localDescription)
if err != nil {
http.Error(w, "An error occurred while formating response", http.StatusInternalServerError)
log.Println("An error occurred while sending session description", err)
return
}
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(jsonDesc)
if err != nil {
log.Println("An error occurred while sending session description", err)
// Check method
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
}
// Increment monitoring
monitoring.WebSessions.Inc()
}
func viewerGetHandler(w http.ResponseWriter, r *http.Request) {
// Get stream ID from URL, or from domain name
path := r.URL.Path[1:]
host := r.Host
@@ -122,27 +81,6 @@ func viewerGetHandler(w http.ResponseWriter, r *http.Request) {
monitoring.WebViewerServed.Inc()
}
// Handle site index and viewer pages
// POST requests are used to exchange WebRTC session descriptions
func viewerHandler(w http.ResponseWriter, r *http.Request) {
// Validation on path
if validPath.FindStringSubmatch(r.URL.Path) == nil {
http.NotFound(w, r)
log.Printf("Replied not found on %s", r.URL.Path)
return
}
// Route depending on HTTP method
switch r.Method {
case http.MethodGet:
viewerGetHandler(w, r)
case http.MethodPost:
viewerPostHandler(w, r)
default:
http.Error(w, "Sorry, only GET and POST methods are supported.", http.StatusBadRequest)
}
}
func staticHandler() http.Handler {
// Set up static files server
staticFs := http.FileServer(pkger.Dir("/web/static"))
@@ -153,19 +91,16 @@ func statisticsHandler(w http.ResponseWriter, r *http.Request) {
name := strings.SplitN(strings.Replace(r.URL.Path[7:], "/", "", -1), "@", 2)[0]
userCount := 0
// Get all substreams
for _, outputType := range []string{"", "@720p", "@480p", "@360p", "@240p", "@text"} {
// Get requested stream
stream, ok := streams[name+outputType]
if ok {
// Get number of output channels
userCount += stream.ClientCount()
}
stream, err := streams.Get(name)
if err == nil {
userCount = stream.ClientCount()
userCount += webrtc.GetNumberConnectedSessions(name)
}
// Display connected users statistics
enc := json.NewEncoder(w)
err := enc.Encode(struct{ ConnectedViewers int }{userCount})
err = enc.Encode(struct{ ConnectedViewers int }{userCount})
if err != nil {
http.Error(w, "Failed to generate JSON.", http.StatusInternalServerError)
log.Printf("Failed to generate JSON: %s", err)

View File

@@ -4,6 +4,8 @@ import (
"net/http"
"net/http/httptest"
"testing"
"gitlab.crans.org/nounous/ghostream/messaging"
)
func TestViewerPageGET(t *testing.T) {
@@ -12,6 +14,9 @@ func TestViewerPageGET(t *testing.T) {
t.Errorf("Failed to load templates: %v", err)
}
// Init streams messaging
streams = messaging.New()
cfg = &Options{}
// Test GET request

View File

@@ -2,7 +2,7 @@
video {
display: block;
flex-grow: 1;
width: 100%;
max-width: 100%;
/* Black borders around video */
background-color: #000;

View File

@@ -0,0 +1,29 @@
/**
* ViewerCounter show the number of active viewers
*/
export class ViewerCounter {
/**
* @param {HTMLElement} element
* @param {String} streamName
*/
constructor(element, streamName) {
this.element = element;
this.url = "/_stats/" + streamName;
}
/**
* Regulary update counter
*
* @param {Number} updatePeriod
*/
regularUpdate(updatePeriod) {
setInterval(() => this.refreshViewersCounter(), updatePeriod);
}
refreshViewersCounter() {
fetch(this.url)
.then(response => response.json())
.then((data) => this.element.innerText = data.ConnectedViewers)
.catch(console.log);
}
}

View File

@@ -0,0 +1,99 @@
/**
* GsWebRTC to connect to Ghostream
*/
export class GsWebRTC {
/**
* @param {list} stunServers STUN servers
* @param {HTMLElement} viewer Video HTML element
* @param {HTMLElement} connectionIndicator Connection indicator element
*/
constructor(stunServers, viewer, connectionIndicator) {
this.viewer = viewer;
this.connectionIndicator = connectionIndicator;
this.pc = new RTCPeerConnection({
iceServers: [{ urls: stunServers }]
});
// We want to receive audio and video
this.pc.addTransceiver("video", { "direction": "sendrecv" });
this.pc.addTransceiver("audio", { "direction": "sendrecv" });
// Configure events
this.pc.oniceconnectionstatechange = () => this._onConnectionStateChange();
this.pc.ontrack = (e) => this._onTrack(e);
}
/**
* On connection change, log it and change indicator.
* If connection closed or failed, try to reconnect.
*/
_onConnectionStateChange() {
console.log("[WebRTC] ICE connection state changed to " + this.pc.iceConnectionState);
switch (this.pc.iceConnectionState) {
case "disconnected":
this.connectionIndicator.style.fill = "#dc3545";
break;
case "checking":
this.connectionIndicator.style.fill = "#ffc107";
break;
case "connected":
this.connectionIndicator.style.fill = "#28a745";
break;
case "closed":
case "failed":
console.log("[WebRTC] Connection closed, restarting...");
/*peerConnection.close();
peerConnection = null;
setTimeout(startPeerConnection, 1000);*/
break;
}
}
/**
* On new track, add it to the player
* @param {Event} event
*/
_onTrack(event) {
console.log(`[WebRTC] New ${event.track.kind} track`);
if (event.track.kind === "video") {
this.viewer.srcObject = event.streams[0];
}
}
/**
* Create an offer and set local description.
* After that the browser will fire onicecandidate events.
*/
createOffer() {
this.pc.createOffer().then(offer => {
this.pc.setLocalDescription(offer);
console.log("[WebRTC] WebRTC offer created");
}).catch(console.log);
}
/**
* Register a function to call to send local descriptions
* @param {Function} sendFunction Called with a local description to send.
*/
onICECandidate(sendFunction) {
// When candidate is null, ICE layer has run out of potential configurations to suggest
// so let's send the offer to the server.
// FIXME: Send offers progressively to do Trickle ICE
this.pc.onicecandidate = event => {
if (event.candidate === null) {
// Send offer to server
console.log("[WebRTC] Sending session description to server");
sendFunction(this.pc.localDescription);
}
};
}
/**
* Set WebRTC remote description
* After that, the connection will be established and ontrack will be fired.
* @param {RTCSessionDescription} sdp Session description data
*/
setRemoteDescription(sdp) {
this.pc.setRemoteDescription(sdp);
}
}

View File

@@ -0,0 +1,62 @@
/**
* GsWebSocket to do Ghostream signalling
*/
export class GsWebSocket {
constructor() {
const protocol = (window.location.protocol === "https:") ? "wss://" : "ws://";
this.url = protocol + window.location.host + "/_ws/";
// Open WebSocket
this._open();
// Configure events
this.socket.addEventListener("open", () => {
console.log("[WebSocket] Connection established");
});
this.socket.addEventListener("close", () => {
console.log("[WebSocket] Connection closed, retrying connection in 1s...");
setTimeout(() => this._open(), 1000);
});
this.socket.addEventListener("error", () => {
console.log("[WebSocket] Connection errored, retrying connection in 1s...");
setTimeout(() => this._open(), 1000);
});
}
_open() {
console.log(`[WebSocket] Connecting to ${this.url}...`);
this.socket = new WebSocket(this.url);
}
/**
* Send local WebRTC session description to remote.
* @param {SessionDescription} localDescription WebRTC local SDP
* @param {string} stream Name of the stream
* @param {string} quality Requested quality
*/
sendLocalDescription(localDescription, stream, quality) {
if (this.socket.readyState !== 1) {
console.log("[WebSocket] Waiting for connection to send data...");
setTimeout(() => this.sendLocalDescription(localDescription, stream, quality), 100);
return;
}
console.log(`[WebSocket] Sending WebRTC local session description for stream ${stream} quality ${quality}`);
this.socket.send(JSON.stringify({
"webRtcSdp": localDescription,
"stream": stream,
"quality": quality
}));
}
/**
* Set callback function on new remote session description.
* @param {Function} callback Function called when data is received
*/
onRemoteDescription(callback) {
this.socket.addEventListener("message", (event) => {
console.log("[WebSocket] Received WebRTC remote session description");
const sdp = new RTCSessionDescription(JSON.parse(event.data));
callback(sdp);
});
}
}

View File

@@ -1,12 +0,0 @@
// Side widget toggler
const sideWidgetToggle = document.getElementById("sideWidgetToggle")
sideWidgetToggle.addEventListener("click", function () {
const sideWidget = document.getElementById("sideWidget")
if (sideWidget.style.display === "none") {
sideWidget.style.display = "block"
sideWidgetToggle.textContent = "»"
} else {
sideWidget.style.display = "none"
sideWidgetToggle.textContent = "«"
}
})

View File

@@ -1,9 +0,0 @@
document.getElementById("quality").addEventListener("change", (event) => {
console.log(`Stream quality changed to ${event.target.value}`)
// Restart the connection with a new quality
peerConnection.close()
peerConnection = null
streamPath = window.location.href + event.target.value
startPeerConnection()
})

View File

@@ -1,97 +1,87 @@
let peerConnection
let streamPath = window.location.href
import { GsWebSocket } from "./modules/websocket.js";
import { ViewerCounter } from "./modules/viewerCounter.js";
import { GsWebRTC } from "./modules/webrtc.js";
startPeerConnection = () => {
// Init peer connection
peerConnection = new RTCPeerConnection({
iceServers: [{ urls: stunServers }]
})
/**
* Initialize viewer page
*
* @param {String} stream
* @param {List} stunServers
* @param {Number} viewersCounterRefreshPeriod
*/
export function initViewerPage(stream, stunServers, viewersCounterRefreshPeriod) {
// Viewer element
const viewer = document.getElementById("viewer");
// On connection change, change indicator color
// if connection failed, restart peer connection
peerConnection.oniceconnectionstatechange = e => {
console.log("ICE connection state changed, " + peerConnection.iceConnectionState)
switch (peerConnection.iceConnectionState) {
case "disconnected":
document.getElementById("connectionIndicator").style.fill = "#dc3545"
break
case "checking":
document.getElementById("connectionIndicator").style.fill = "#ffc107"
break
case "connected":
document.getElementById("connectionIndicator").style.fill = "#28a745"
break
case "closed":
case "failed":
console.log("Connection failed, restarting...")
peerConnection.close()
peerConnection = null
setTimeout(startPeerConnection, 1000)
break
}
}
// Default quality
let quality = "source";
// We want to receive audio and video
peerConnection.addTransceiver('video', { 'direction': 'sendrecv' })
peerConnection.addTransceiver('audio', { 'direction': 'sendrecv' })
// Create WebSocket and WebRTC
const websocket = new GsWebSocket();
const webrtc = new GsWebRTC(
stunServers,
viewer,
document.getElementById("connectionIndicator"),
);
webrtc.createOffer();
webrtc.onICECandidate(localDescription => {
websocket.sendLocalDescription(localDescription, stream, quality);
});
websocket.onRemoteDescription(sdp => {
webrtc.setRemoteDescription(sdp);
});
// Create offer and set local description
peerConnection.createOffer().then(offer => {
// After setLocalDescription, the browser will fire onicecandidate events
peerConnection.setLocalDescription(offer)
}).catch(console.log)
// When candidate is null, ICE layer has run out of potential configurations to suggest
// so let's send the offer to the server
peerConnection.onicecandidate = event => {
if (event.candidate === null) {
// Send offer to server
// The server know the stream name from the url
// The server replies with its description
// After setRemoteDescription, the browser will fire ontrack events
console.log("Sending session description to server")
fetch(streamPath, {
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify(peerConnection.localDescription)
})
.then(response => response.json())
.then((data) => peerConnection.setRemoteDescription(new RTCSessionDescription(data)))
.catch(console.log)
}
}
// When video track is received, configure player
peerConnection.ontrack = function (event) {
console.log(`New ${event.track.kind} track`)
if (event.track.kind === "video") {
const viewer = document.getElementById('viewer')
viewer.srcObject = event.streams[0]
}
}
}
// Register keyboard events
let viewer = document.getElementById("viewer")
window.addEventListener("keydown", (event) => {
// Register keyboard events
window.addEventListener("keydown", (event) => {
switch (event.key) {
case 'f':
case "f":
// F key put player in fullscreen
if (document.fullscreenElement !== null) {
document.exitFullscreen()
document.exitFullscreen();
} else {
viewer.requestFullscreen()
viewer.requestFullscreen();
}
break
case 'm':
case ' ':
break;
case "m":
case " ":
// M and space key mute player
viewer.muted = !viewer.muted
event.preventDefault()
viewer.play()
break
viewer.muted = !viewer.muted;
event.preventDefault();
viewer.play();
break;
}
})
});
// Create viewer counter
const viewerCounter = new ViewerCounter(
document.getElementById("connected-people"),
stream,
);
viewerCounter.regularUpdate(viewersCounterRefreshPeriod);
viewerCounter.refreshViewersCounter();
// Side widget toggler
const sideWidgetToggle = document.getElementById("sideWidgetToggle");
const sideWidget = document.getElementById("sideWidget");
if (sideWidgetToggle !== null && sideWidget !== null) {
// On click, toggle side widget visibility
sideWidgetToggle.addEventListener("click", function () {
if (sideWidget.style.display === "none") {
sideWidget.style.display = "block";
sideWidgetToggle.textContent = "»";
} else {
sideWidget.style.display = "none";
sideWidgetToggle.textContent = "«";
}
});
}
// Video quality toggler
document.getElementById("quality").addEventListener("change", (event) => {
quality = event.target.value;
console.log(`Stream quality changed to ${quality}`);
// Restart WebRTC negociation
webrtc.createOffer();
});
}

View File

@@ -1,12 +0,0 @@
// Refresh viewer count by pulling metric from server
function refreshViewersCounter(streamID, period) {
// Distinguish oneDomainPerStream mode
fetch("/_stats/" + streamID)
.then(response => response.json())
.then((data) => document.getElementById("connected-people").innerText = data.ConnectedViewers)
.catch(console.log)
setTimeout(() => {
refreshViewersCounter(streamID, period)
}, period)
}

View File

@@ -4,6 +4,7 @@
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>{{if .Path}}{{.Path}} - {{end}}{{.Cfg.Name}}</title>
<link rel="stylesheet" href="static/css/style.css">
<link rel="stylesheet" href="static/css/player.css">

View File

@@ -21,11 +21,7 @@
<ul>
<li>
<b>Serveur :</b>
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}</code>,
</li>
<li>
<b>Clé de stream :</b>
<code>IDENTIFIANT:MOT_DE_PASSE</code>
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?IDENTIFIANT:MOT_DE_PASS</code>,
avec <code>IDENTIFIANT</code> et <code>MOT_DE_PASSE</code>
vos identifiants.
</li>
@@ -52,6 +48,81 @@
</code>
</p>
<h2>Comment lire un flux depuis un lecteur externe ?</h2>
<p>
À l'heure actuelle, la plupart des lecteurs vidéos ne supportent
pas le protocole SRT, ou le supportent mal. Un travail est en
cours pour les rendre un maximum compatibles. Liste non exhaustive
des lecteurs vidéos testés :
</p>
<h3>FFPlay</h3>
<p>
Si FFMpeg est installé sur votre machine, il est accompagné d'un
lecteur vidéo nommé <code>ffplay</code>. Si FFMpeg est compilé
avec le support de SRT (c'est le cas sur la plupart des distributions,
sauf cas ci-dessous), il vous suffira d'exécuter :
</p>
<p>
<code>
ffplay -fflags nobuffer srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<h3>MPV</h3>
<p>
MPV supporte officiellement SRT depuis le 16 octobre 2020.
Néanmoins, la version stable de MPV est beaucoup plus vieille.
Vous devez alors utiliser une version de développement pour
pouvoir lire un flux SRT depuis MPV. L'installation se fait
depuis <a href="https://mpv.io/installation/"> cette page</a>.
Sous Arch Linux, il vous suffit de récupérer le paquet
<code>mpv-git</code> dans l'AUR. Pour lire le flux, il suffit
d'exécuter :
</p>
<p>
<code>
mpv srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<h3>VLC Media Player</h3>
<p>
Bien que VLC supporte officiellement le protocole SRT,
toutes les options ne sont pas encore implémentées,
notamment l'option pour choisir son stream.
<a href="https://patches.videolan.org/patch/30299/">Un patch</a>
a été soumis et est en attente d'acceptation.
Une fois le patch accepté, il sera appliqué dans les versions
de développement de VLC. Sous Arch Linux, il suffit de récupérer
le paquet <code>vlc-git</code> de l'AUR. Avec un VLC à jour,
il suffit d'exécuter :
</p>
<p>
<code>
vlc srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT
</code>
</p>
<p>
Ou bien d'aller dans Média -> Ouvrir un flux réseau et d'entrer l'URL
<code>srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid=IDENTIFIANT</code>.
</p>
<h3>Le protocole n'existe pas ou n'est pas supporté.</h3>
<p>
La technologie SRT est très récente et n'est pas supportée par
les dépôts stables. Ainsi, si vous avez Ubuntu &le; 20.04 ou
Debian &le; Buster, vous ne pourrez pas utiliser de lecteur vidéo
ni même diffuser avec votre machine. Vous devrez vous mettre à
jour vers Ubuntu 20.10 ou Debian Bullseye.
</p>
<h2>Mentions légales</h2>
<p>
Le service de diffusion vidéo du Crans est un service d'hébergement

View File

@@ -8,10 +8,10 @@
<div class="controls">
<span class="control-quality">
<select id="quality">
<option value="">Source</option>
<option value="@720p">720p</option>
<option value="@480p">480p</option>
<option value="@240p">240p</option>
<option value="source">Source</option>
<option value="720p">720p</option>
<option value="480p">480p</option>
<option value="240p">240p</option>
</select>
</span>
<code class="control-srt-link">srt://{{.Cfg.Hostname}}:{{.Cfg.SRTServerPort}}?streamid={{.Path}}</code>
@@ -34,21 +34,17 @@
{{end}}
</div>
{{if .WidgetURL}}<script src="/static/js/sideWidget.js"></script>{{end}}
<script src="/static/js/videoQuality.js"></script>
<script src="/static/js/viewer.js"></script>
<script src="/static/js/viewersCounter.js"></script>
<script>
<script type="module">
import { initViewerPage } from "/static/js/viewer.js";
// Some variables that need to be fixed by web page
const viewersCounterRefreshPeriod = Number("{{.Cfg.ViewersCounterRefreshPeriod}}");
const stream = "{{.Path}}";
const stunServers = [
{{range $id, $value := .Cfg.STUNServers}}
'{{$value}}',
"{{$value}}",
{{end}}
]
startPeerConnection()
// Wait a bit before pulling viewers counter for the first time
setTimeout(() => {
refreshViewersCounter("{{.Path}}", {{.Cfg.ViewersCounterRefreshPeriod}})
}, 1000)
initViewerPage(stream, stunServers, viewersCounterRefreshPeriod)
</script>
{{end}}

View File

@@ -10,8 +10,7 @@ import (
"strings"
"github.com/markbates/pkger"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
// Options holds web package configuration
@@ -33,18 +32,11 @@ type Options struct {
var (
cfg *Options
// WebRTC session description channels
remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}
localSdpChan chan webrtc.SessionDescription
// Preload templates
templates *template.Template
// Streams to get statistics
streams map[string]*stream.Stream
streams *messaging.Streams
)
// Load templates with pkger
@@ -78,13 +70,8 @@ func loadTemplates() error {
}
// Serve HTTP server
func Serve(s map[string]*stream.Stream, rSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, lSdpChan chan webrtc.SessionDescription, c *Options) {
func Serve(s *messaging.Streams, c *Options) {
streams = s
remoteSdpChan = rSdpChan
localSdpChan = lSdpChan
cfg = c
if !cfg.Enabled {
@@ -101,6 +88,7 @@ func Serve(s map[string]*stream.Stream, rSdpChan chan struct {
mux := http.NewServeMux()
mux.HandleFunc("/", viewerHandler)
mux.Handle("/static/", staticHandler())
mux.HandleFunc("/_ws/", websocketHandler)
mux.HandleFunc("/_stats/", statisticsHandler)
log.Printf("HTTP server listening on %s", cfg.ListenAddress)
log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux))

View File

@@ -5,16 +5,16 @@ import (
"testing"
"time"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/messaging"
)
// TestHTTPServe tries to serve a real HTTP server and load some pages
func TestHTTPServe(t *testing.T) {
// Init streams messaging
streams := make(map[string]*stream.Stream)
streams := messaging.New()
// Create a disabled web server
go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"})
go Serve(streams, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond)
@@ -26,7 +26,7 @@ func TestHTTPServe(t *testing.T) {
}
// Now let's really start the web server
go Serve(streams, nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"})
go Serve(streams, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond)

68
web/websocket_handler.go Normal file
View File

@@ -0,0 +1,68 @@
// Package web serves the JavaScript player and WebRTC negotiation
package web
import (
"log"
"net/http"
"github.com/gorilla/websocket"
"gitlab.crans.org/nounous/ghostream/stream/webrtc"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// clientDescription is sent by new client
type clientDescription struct {
WebRtcSdp webrtc.SessionDescription
Stream string
Quality string
}
// websocketHandler exchanges WebRTC SDP and viewer count
func websocketHandler(w http.ResponseWriter, r *http.Request) {
// Upgrade client connection to WebSocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Failed to upgrade client to websocket: %s", err)
return
}
for {
// Get client description
c := &clientDescription{}
err = conn.ReadJSON(c)
if err != nil {
log.Printf("Failed to receive client description: %s", err)
_ = conn.Close()
return
}
// Get requested stream
stream, err := streams.Get(c.Stream)
if err != nil {
log.Printf("Stream not found: %s", c.Stream)
continue
}
// Get requested quality
q, err := stream.GetQuality(c.Quality)
if err != nil {
log.Printf("Quality not found: %s", c.Quality)
continue
}
// Exchange session descriptions with WebRTC stream server
// FIXME: Add trickle ICE support
q.WebRtcRemoteSdp <- c.WebRtcSdp
localDescription := <-q.WebRtcLocalSdp
// Send new local description
if err := conn.WriteJSON(localDescription); err != nil {
log.Println(err)
continue
}
}
}