diff --git a/go.mod b/go.mod index 128eef3..09a0e82 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( github.com/go-ldap/ldap/v3 v3.2.3 + github.com/gorilla/websocket v1.4.0 github.com/haivision/srtgo v0.0.0-20200731151239-e00427ae473a github.com/markbates/pkger v0.17.1 github.com/pion/rtp v1.6.0 diff --git a/go.sum b/go.sum index 00c058c..0d16fd8 100644 --- a/go.sum +++ b/go.sum @@ -113,6 +113,7 @@ github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= diff --git a/web/handler.go b/web/handler.go index 5b52bc2..a17191d 100644 --- a/web/handler.go +++ b/web/handler.go @@ -21,76 +21,20 @@ var ( validPath = regexp.MustCompile("^/[a-z0-9@_-]*$") ) -// Handle WebRTC session description exchange via POST -func viewerPostHandler(w http.ResponseWriter, r *http.Request) { - // Limit response body to 128KB - r.Body = http.MaxBytesReader(w, r.Body, 131072) - - // Get stream ID from URL, or from domain name - path := r.URL.Path[1:] - host := r.Host - if strings.Contains(host, ":") { - realHost, _, err := net.SplitHostPort(r.Host) - if err != nil { - log.Printf("Failed to split host and port from %s", r.Host) - return - } - host = realHost - } - host = strings.Replace(host, ".", "-", -1) - if streamID, ok := cfg.MapDomainToStream[host]; ok { - path = streamID - } - - // Decode client description - dec := json.NewDecoder(r.Body) - dec.DisallowUnknownFields() - remoteDescription := webrtc.SessionDescription{} - if err := dec.Decode(&remoteDescription); err != nil { - http.Error(w, "The JSON WebRTC offer is malformed", http.StatusBadRequest) +// Handle site index and viewer pages +func viewerHandler(w http.ResponseWriter, r *http.Request) { + // Validation on path + if validPath.FindStringSubmatch(r.URL.Path) == nil { + http.NotFound(w, r) + log.Printf("Replied not found on %s", r.URL.Path) return } - // Get requested stream - stream, err := streams.Get(path) - if err != nil { - http.Error(w, "Stream not found", http.StatusNotFound) - log.Printf("Stream not found: %s", path) - return + // Check method + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed) } - // Get requested quality - // FIXME: extract quality from request - qualityName := "source" - q, err := stream.GetQuality(qualityName) - if err != nil { - http.Error(w, "Quality not found", http.StatusNotFound) - log.Printf("Quality not found: %s", qualityName) - return - } - - // Exchange session descriptions with WebRTC stream server - q.WebRtcRemoteSdp <- remoteDescription - localDescription := <-q.WebRtcLocalSdp - - // Send server description as JSON - jsonDesc, err := json.Marshal(localDescription) - if err != nil { - http.Error(w, "An error occurred while formating response", http.StatusInternalServerError) - log.Println("An error occurred while sending session description", err) - return - } - w.Header().Set("Content-Type", "application/json") - _, err = w.Write(jsonDesc) - if err != nil { - log.Println("An error occurred while sending session description", err) - } - - // Increment monitoring - monitoring.WebSessions.Inc() -} - -func viewerGetHandler(w http.ResponseWriter, r *http.Request) { // Get stream ID from URL, or from domain name path := r.URL.Path[1:] host := r.Host @@ -137,27 +81,6 @@ func viewerGetHandler(w http.ResponseWriter, r *http.Request) { monitoring.WebViewerServed.Inc() } -// Handle site index and viewer pages -// POST requests are used to exchange WebRTC session descriptions -func viewerHandler(w http.ResponseWriter, r *http.Request) { - // Validation on path - if validPath.FindStringSubmatch(r.URL.Path) == nil { - http.NotFound(w, r) - log.Printf("Replied not found on %s", r.URL.Path) - return - } - - // Route depending on HTTP method - switch r.Method { - case http.MethodGet: - viewerGetHandler(w, r) - case http.MethodPost: - viewerPostHandler(w, r) - default: - http.Error(w, "Sorry, only GET and POST methods are supported.", http.StatusBadRequest) - } -} - func staticHandler() http.Handler { // Set up static files server staticFs := http.FileServer(pkger.Dir("/web/static")) diff --git a/web/web.go b/web/web.go index 32d1290..44dc90e 100644 --- a/web/web.go +++ b/web/web.go @@ -88,6 +88,7 @@ func Serve(s *messaging.Streams, c *Options) { mux := http.NewServeMux() mux.HandleFunc("/", viewerHandler) mux.Handle("/static/", staticHandler()) + mux.HandleFunc("/_ws/", websocketHandler) mux.HandleFunc("/_stats/", statisticsHandler) log.Printf("HTTP server listening on %s", cfg.ListenAddress) log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux)) diff --git a/web/websocket_handler.go b/web/websocket_handler.go new file mode 100644 index 0000000..2475b7b --- /dev/null +++ b/web/websocket_handler.go @@ -0,0 +1,66 @@ +package web + +import ( + "log" + "net/http" + + "github.com/gorilla/websocket" + "gitlab.crans.org/nounous/ghostream/stream/webrtc" +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// clientDescription is sent by new client +type clientDescription struct { + webRtcSdp webrtc.SessionDescription + stream string + quality string +} + +// websocketHandler exchanges WebRTC SDP and viewer count +func websocketHandler(w http.ResponseWriter, r *http.Request) { + // Upgrade client connection to WebSocket + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("Failed to upgrade client to websocket: %s", err) + return + } + + for { + // Get client description + c := &clientDescription{} + err = conn.ReadJSON(c) + if err != nil { + log.Printf("Failed to receive client description: %s", err) + return + } + + // Get requested stream + stream, err := streams.Get(c.stream) + if err != nil { + log.Printf("Stream not found: %s", c.stream) + return + } + + // Get requested quality + q, err := stream.GetQuality(c.quality) + if err != nil { + log.Printf("Quality not found: %s", c.quality) + return + } + + // Exchange session descriptions with WebRTC stream server + // FIXME: Add trickle ICE support + q.WebRtcRemoteSdp <- c.webRtcSdp + localDescription := <-q.WebRtcLocalSdp + + // Send new local description + if err := conn.WriteJSON(localDescription); err != nil { + log.Println(err) + return + } + } +}