diff --git a/.gitignore b/.gitignore index 4b753e2..12b2578 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,9 @@ pkged.go # Profiler and test files *.prof *.test + +# Javascript tools +.eslintrc.js +node_modules +package.json +package-lock.json diff --git a/go.mod b/go.mod index 128eef3..09a0e82 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( github.com/go-ldap/ldap/v3 v3.2.3 + github.com/gorilla/websocket v1.4.0 github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a github.com/markbates/pkger v0.17.1 github.com/pion/rtp v1.6.0 diff --git a/go.sum b/go.sum index 00c058c..0d16fd8 100644 --- a/go.sum +++ b/go.sum @@ -113,6 +113,7 @@ github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= diff --git a/web/handler.go b/web/handler.go index 5b52bc2..a17191d 100644 --- a/web/handler.go +++ b/web/handler.go @@ -21,76 +21,20 @@ var ( validPath = regexp.MustCompile("^/[a-z0-9@_-]*$") ) -// Handle WebRTC session description exchange via POST -func viewerPostHandler(w http.ResponseWriter, r *http.Request) { - // Limit response body to 128KB - r.Body = http.MaxBytesReader(w, r.Body, 131072) - - // Get stream ID from URL, or from domain name - path := r.URL.Path[1:] - host := r.Host - if strings.Contains(host, ":") { - realHost, _, err := net.SplitHostPort(r.Host) - if err != nil { - log.Printf("Failed to split host and port from %s", r.Host) - return - } - host = realHost - } - host = strings.Replace(host, ".", "-", -1) - if streamID, ok := cfg.MapDomainToStream[host]; ok { - path = streamID - } - - // Decode client description - dec := json.NewDecoder(r.Body) - dec.DisallowUnknownFields() - remoteDescription := webrtc.SessionDescription{} - if err := dec.Decode(&remoteDescription); err != nil { - http.Error(w, "The JSON WebRTC offer is malformed", http.StatusBadRequest) +// Handle site index and viewer pages +func viewerHandler(w http.ResponseWriter, r *http.Request) { + // Validation on path + if validPath.FindStringSubmatch(r.URL.Path) == nil { + http.NotFound(w, r) + log.Printf("Replied not found on %s", r.URL.Path) return } - // Get requested stream - stream, err := streams.Get(path) - if err != nil { - http.Error(w, "Stream not found", http.StatusNotFound) - log.Printf("Stream not found: %s", path) - return + // Check method + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed) } - // Get requested quality - // FIXME: extract quality from request - qualityName := "source" - q, err := stream.GetQuality(qualityName) - if err != nil { - http.Error(w, "Quality not found", http.StatusNotFound) - log.Printf("Quality not found: %s", qualityName) - return - } - - // Exchange session descriptions with WebRTC stream server - q.WebRtcRemoteSdp <- remoteDescription - localDescription := <-q.WebRtcLocalSdp - - // Send server description as JSON - jsonDesc, err := json.Marshal(localDescription) - if err != nil { - http.Error(w, "An error occurred while formating response", http.StatusInternalServerError) - log.Println("An error occurred while sending session description", err) - return - } - w.Header().Set("Content-Type", "application/json") - _, err = w.Write(jsonDesc) - if err != nil { - log.Println("An error occurred while sending session description", err) - } - - // Increment monitoring - monitoring.WebSessions.Inc() -} - -func viewerGetHandler(w http.ResponseWriter, r *http.Request) { // Get stream ID from URL, or from domain name path := r.URL.Path[1:] host := r.Host @@ -137,27 +81,6 @@ func viewerGetHandler(w http.ResponseWriter, r *http.Request) { monitoring.WebViewerServed.Inc() } -// Handle site index and viewer pages -// POST requests are used to exchange WebRTC session descriptions -func viewerHandler(w http.ResponseWriter, r *http.Request) { - // Validation on path - if validPath.FindStringSubmatch(r.URL.Path) == nil { - http.NotFound(w, r) - log.Printf("Replied not found on %s", r.URL.Path) - return - } - - // Route depending on HTTP method - switch r.Method { - case http.MethodGet: - viewerGetHandler(w, r) - case http.MethodPost: - viewerPostHandler(w, r) - default: - http.Error(w, "Sorry, only GET and POST methods are supported.", http.StatusBadRequest) - } -} - func staticHandler() http.Handler { // Set up static files server staticFs := http.FileServer(pkger.Dir("/web/static")) diff --git a/web/static/js/modules/viewerCounter.js b/web/static/js/modules/viewerCounter.js new file mode 100644 index 0000000..b95bf38 --- /dev/null +++ b/web/static/js/modules/viewerCounter.js @@ -0,0 +1,29 @@ +/** + * ViewerCounter show the number of active viewers + */ +export class ViewerCounter { + /** + * @param {HTMLElement} element + * @param {String} streamName + */ + constructor(element, streamName) { + this.element = element; + this.url = "/_stats/" + streamName; + } + + /** + * Regulary update counter + * + * @param {Number} updatePeriod + */ + regularUpdate(updatePeriod) { + setInterval(() => this.refreshViewersCounter(), updatePeriod); + } + + refreshViewersCounter() { + fetch(this.url) + .then(response => response.json()) + .then((data) => this.element.innerText = data.ConnectedViewers) + .catch(console.log); + } +} diff --git a/web/static/js/modules/webrtc.js b/web/static/js/modules/webrtc.js new file mode 100644 index 0000000..df359e7 --- /dev/null +++ b/web/static/js/modules/webrtc.js @@ -0,0 +1,98 @@ +/** + * GsWebRTC to connect to Ghostream + */ +export class GsWebRTC { + /** + * @param {list} stunServers + * @param {HTMLElement} connectionIndicator + */ + constructor(stunServers, connectionIndicator) { + this.connectionIndicator = connectionIndicator; + this.pc = new RTCPeerConnection({ + iceServers: [{ urls: stunServers }] + }); + + // We want to receive audio and video + this.pc.addTransceiver("video", { "direction": "sendrecv" }); + this.pc.addTransceiver("audio", { "direction": "sendrecv" }); + + // Configure events + this.pc.oniceconnectionstatechange = () => this._onConnectionStateChange(); + this.pc.ontrack = (e) => this._onTrack(e); + } + + /** + * On connection change, log it and change indicator. + * If connection closed or failed, try to reconnect. + */ + _onConnectionStateChange() { + console.log("ICE connection state changed to " + this.pc.iceConnectionState); + switch (this.pc.iceConnectionState) { + case "disconnected": + this.connectionIndicator.style.fill = "#dc3545"; + break; + case "checking": + this.connectionIndicator.style.fill = "#ffc107"; + break; + case "connected": + this.connectionIndicator.style.fill = "#28a745"; + break; + case "closed": + case "failed": + console.log("Connection closed, restarting..."); + /*peerConnection.close(); + peerConnection = null; + setTimeout(startPeerConnection, 1000);*/ + break; + } + } + + /** + * On new track, add it to the player + * @param {Event} event + */ + _onTrack(event) { + console.log(`New ${event.track.kind} track`); + if (event.track.kind === "video") { + const viewer = document.getElementById("viewer"); + viewer.srcObject = event.streams[0]; + } + } + + /** + * Create an offer and set local description. + * After that the browser will fire onicecandidate events. + */ + createOffer() { + this.pc.createOffer().then(offer => { + this.pc.setLocalDescription(offer); + console.log("WebRTC offer created"); + }).catch(console.log); + } + + /** + * Register a function to call to send local descriptions + * @param {Function} sendFunction Called with a local description to send. + */ + onICECandidate(sendFunction) { + // When candidate is null, ICE layer has run out of potential configurations to suggest + // so let's send the offer to the server. + // FIXME: Send offers progressively to do Trickle ICE + this.pc.onicecandidate = event => { + if (event.candidate === null) { + // Send offer to server + console.log("Sending session description to server"); + sendFunction(this.pc.localDescription); + } + }; + } + + /** + * Set WebRTC remote description + * After that, the connection will be established and ontrack will be fired. + * @param {RTCSessionDescription} sdp Session description data + */ + setRemoteDescription(sdp) { + this.pc.setRemoteDescription(sdp); + } +} diff --git a/web/static/js/modules/websocket.js b/web/static/js/modules/websocket.js new file mode 100644 index 0000000..b9f017c --- /dev/null +++ b/web/static/js/modules/websocket.js @@ -0,0 +1,63 @@ +/** + * GsWebSocket to do Ghostream signalling + */ +export class GsWebSocket { + constructor() { + const protocol = (window.location.protocol === "https:") ? "wss://" : "ws://"; + this.url = protocol + window.location.host + "/_ws/"; + } + + _open() { + this.socket = new WebSocket(this.url); + } + + /** + * Open websocket. + * @param {Function} openCallback Function called when connection is established. + * @param {Function} closeCallback Function called when connection is lost. + */ + open() { + this._open(); + this.socket.addEventListener("open", () => { + console.log("WebSocket opened"); + }); + this.socket.addEventListener("close", () => { + console.log("WebSocket closed, retrying connection in 1s..."); + setTimeout(() => this._open(), 1000); + }); + this.socket.addEventListener("error", () => { + console.log("WebSocket errored, retrying connection in 1s..."); + setTimeout(() => this._open(), 1000); + }); + } + + /** + * Exchange WebRTC session description with server. + * @param {SessionDescription} localDescription WebRTC local SDP + * @param {string} stream Name of the stream + * @param {string} quality Requested quality + */ + sendDescription(localDescription, stream, quality) { + if (this.socket.readyState !== 1) { + console.log("WebSocket not ready to send data"); + return; + } + this.socket.send(JSON.stringify({ + "webRtcSdp": localDescription, + "stream": stream, + "quality": quality + })); + } + + /** + * Set callback function on new session description. + * @param {Function} callback Function called when data is received + */ + onDescription(callback) { + this.socket.addEventListener("message", (event) => { + console.log("Message from server ", event.data); + const sdp = new RTCSessionDescription(JSON.parse(event.data)); + callback(sdp); + }); + } +} diff --git a/web/static/js/sideWidget.js b/web/static/js/sideWidget.js deleted file mode 100644 index efc0d9e..0000000 --- a/web/static/js/sideWidget.js +++ /dev/null @@ -1,12 +0,0 @@ -// Side widget toggler -const sideWidgetToggle = document.getElementById("sideWidgetToggle") -sideWidgetToggle.addEventListener("click", function () { - const sideWidget = document.getElementById("sideWidget") - if (sideWidget.style.display === "none") { - sideWidget.style.display = "block" - sideWidgetToggle.textContent = "»" - } else { - sideWidget.style.display = "none" - sideWidgetToggle.textContent = "«" - } -}) \ No newline at end of file diff --git a/web/static/js/videoQuality.js b/web/static/js/videoQuality.js deleted file mode 100644 index ba6e15e..0000000 --- a/web/static/js/videoQuality.js +++ /dev/null @@ -1,9 +0,0 @@ -document.getElementById("quality").addEventListener("change", (event) => { - console.log(`Stream quality changed to ${event.target.value}`) - - // Restart the connection with a new quality - peerConnection.close() - peerConnection = null - streamPath = window.location.href + event.target.value - startPeerConnection() -}) diff --git a/web/static/js/viewer.js b/web/static/js/viewer.js index 9b8f3ea..1e0e7d3 100644 --- a/web/static/js/viewer.js +++ b/web/static/js/viewer.js @@ -1,97 +1,87 @@ -let peerConnection -let streamPath = window.location.href +import { GsWebSocket } from "./modules/websocket.js"; +import { ViewerCounter } from "./modules/viewerCounter.js"; +import { GsWebRTC } from "./modules/webrtc.js"; -startPeerConnection = () => { - // Init peer connection - peerConnection = new RTCPeerConnection({ - iceServers: [{ urls: stunServers }] - }) +/** + * Initialize viewer page + * + * @param {String} stream + * @param {List} stunServers + * @param {Number} viewersCounterRefreshPeriod + */ +export function initViewerPage(stream, stunServers, viewersCounterRefreshPeriod) { + // Default quality + let quality = "source"; - // On connection change, change indicator color - // if connection failed, restart peer connection - peerConnection.oniceconnectionstatechange = e => { - console.log("ICE connection state changed, " + peerConnection.iceConnectionState) - switch (peerConnection.iceConnectionState) { - case "disconnected": - document.getElementById("connectionIndicator").style.fill = "#dc3545" - break - case "checking": - document.getElementById("connectionIndicator").style.fill = "#ffc107" - break - case "connected": - document.getElementById("connectionIndicator").style.fill = "#28a745" - break - case "closed": - case "failed": - console.log("Connection failed, restarting...") - peerConnection.close() - peerConnection = null - setTimeout(startPeerConnection, 1000) - break - } - } + // Create WebSocket + const s = new GsWebSocket(); + s.open(); - // We want to receive audio and video - peerConnection.addTransceiver('video', { 'direction': 'sendrecv' }) - peerConnection.addTransceiver('audio', { 'direction': 'sendrecv' }) + // Create WebRTC + const c = new GsWebRTC( + stunServers, + document.getElementById("connectionIndicator"), + ); + c.createOffer(); + c.onICECandidate(localDescription => { + s.sendDescription(localDescription, stream, quality); + }); + s.onDescription(sdp => { + c.setRemoteDescription(sdp); + }); - // Create offer and set local description - peerConnection.createOffer().then(offer => { - // After setLocalDescription, the browser will fire onicecandidate events - peerConnection.setLocalDescription(offer) - }).catch(console.log) - - // When candidate is null, ICE layer has run out of potential configurations to suggest - // so let's send the offer to the server - peerConnection.onicecandidate = event => { - if (event.candidate === null) { - // Send offer to server - // The server know the stream name from the url - // The server replies with its description - // After setRemoteDescription, the browser will fire ontrack events - console.log("Sending session description to server") - fetch(streamPath, { - method: 'POST', - headers: { - 'Accept': 'application/json', - 'Content-Type': 'application/json' - }, - body: JSON.stringify(peerConnection.localDescription) - }) - .then(response => response.json()) - .then((data) => peerConnection.setRemoteDescription(new RTCSessionDescription(data))) - .catch(console.log) - } - } - - // When video track is received, configure player - peerConnection.ontrack = function (event) { - console.log(`New ${event.track.kind} track`) - if (event.track.kind === "video") { - const viewer = document.getElementById('viewer') - viewer.srcObject = event.streams[0] - } - } -} - -// Register keyboard events -let viewer = document.getElementById("viewer") -window.addEventListener("keydown", (event) => { - switch (event.key) { - case 'f': + // Register keyboard events + const viewer = document.getElementById("viewer"); + window.addEventListener("keydown", (event) => { + switch (event.key) { + case "f": // F key put player in fullscreen if (document.fullscreenElement !== null) { - document.exitFullscreen() + document.exitFullscreen(); } else { - viewer.requestFullscreen() + viewer.requestFullscreen(); } - break - case 'm': - case ' ': + break; + case "m": + case " ": // M and space key mute player - viewer.muted = !viewer.muted - event.preventDefault() - viewer.play() - break + viewer.muted = !viewer.muted; + event.preventDefault(); + viewer.play(); + break; + } + }); + + // Create viewer counter + const viewerCounter = new ViewerCounter( + document.getElementById("connected-people"), + stream, + ); + viewerCounter.regularUpdate(viewersCounterRefreshPeriod); + viewerCounter.refreshViewersCounter(); + + // Side widget toggler + const sideWidgetToggle = document.getElementById("sideWidgetToggle"); + const sideWidget = document.getElementById("sideWidget"); + if (sideWidgetToggle !== null && sideWidget !== null) { + // On click, toggle side widget visibility + sideWidgetToggle.addEventListener("click", function () { + if (sideWidget.style.display === "none") { + sideWidget.style.display = "block"; + sideWidgetToggle.textContent = "»"; + } else { + sideWidget.style.display = "none"; + sideWidgetToggle.textContent = "«"; + } + }); } -}) + + // Video quality toggler + document.getElementById("quality").addEventListener("change", (event) => { + quality = event.target.value; + console.log(`Stream quality changed to ${quality}`); + + // Restart the connection with a new quality + // FIXME + }); +} diff --git a/web/static/js/viewersCounter.js b/web/static/js/viewersCounter.js deleted file mode 100644 index 7298f6e..0000000 --- a/web/static/js/viewersCounter.js +++ /dev/null @@ -1,12 +0,0 @@ -// Refresh viewer count by pulling metric from server -function refreshViewersCounter(streamID, period) { - // Distinguish oneDomainPerStream mode - fetch("/_stats/" + streamID) - .then(response => response.json()) - .then((data) => document.getElementById("connected-people").innerText = data.ConnectedViewers) - .catch(console.log) - - setTimeout(() => { - refreshViewersCounter(streamID, period) - }, period) -} diff --git a/web/template/player.html b/web/template/player.html index 3e340a4..e5efc69 100644 --- a/web/template/player.html +++ b/web/template/player.html @@ -34,21 +34,17 @@ {{end}} -{{if .WidgetURL}}{{end}} - - - - -{{end}} \ No newline at end of file +{{end}} diff --git a/web/web.go b/web/web.go index 32d1290..44dc90e 100644 --- a/web/web.go +++ b/web/web.go @@ -88,6 +88,7 @@ func Serve(s *messaging.Streams, c *Options) { mux := http.NewServeMux() mux.HandleFunc("/", viewerHandler) mux.Handle("/static/", staticHandler()) + mux.HandleFunc("/_ws/", websocketHandler) mux.HandleFunc("/_stats/", statisticsHandler) log.Printf("HTTP server listening on %s", cfg.ListenAddress) log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux)) diff --git a/web/websocket_handler.go b/web/websocket_handler.go new file mode 100644 index 0000000..e774135 --- /dev/null +++ b/web/websocket_handler.go @@ -0,0 +1,67 @@ +// Package web serves the JavaScript player and WebRTC negotiation +package web + +import ( + "log" + "net/http" + + "github.com/gorilla/websocket" + "gitlab.crans.org/nounous/ghostream/stream/webrtc" +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// clientDescription is sent by new client +type clientDescription struct { + WebRtcSdp webrtc.SessionDescription + Stream string + Quality string +} + +// websocketHandler exchanges WebRTC SDP and viewer count +func websocketHandler(w http.ResponseWriter, r *http.Request) { + // Upgrade client connection to WebSocket + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("Failed to upgrade client to websocket: %s", err) + return + } + + for { + // Get client description + c := &clientDescription{} + err = conn.ReadJSON(c) + if err != nil { + log.Printf("Failed to receive client description: %s", err) + return + } + + // Get requested stream + stream, err := streams.Get(c.Stream) + if err != nil { + log.Printf("Stream not found: %s", c.Stream) + return + } + + // Get requested quality + q, err := stream.GetQuality(c.Quality) + if err != nil { + log.Printf("Quality not found: %s", c.Quality) + return + } + + // Exchange session descriptions with WebRTC stream server + // FIXME: Add trickle ICE support + q.WebRtcRemoteSdp <- c.WebRtcSdp + localDescription := <-q.WebRtcLocalSdp + + // Send new local description + if err := conn.WriteJSON(localDescription); err != nil { + log.Println(err) + return + } + } +}