diff --git a/internal/monitoring/monitoring.go b/internal/monitoring/monitoring.go index 8ec6626..2a10c50 100644 --- a/internal/monitoring/monitoring.go +++ b/internal/monitoring/monitoring.go @@ -15,15 +15,21 @@ type Options struct { } var ( - // ViewerServed is the total amount of viewer page served - ViewerServed = promauto.NewCounter(prometheus.CounterOpts{ - Name: "ghostream_viewer_served_total", + // WebViewerServed is the total amount of viewer page served + WebViewerServed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "ghostream_web_viewer_served_total", Help: "The total amount of viewer served", }) + + // WebSessions is the total amount of WebRTC session exchange + WebSessions = promauto.NewCounter(prometheus.CounterOpts{ + Name: "ghostream_web_sessions_total", + Help: "The total amount of WebRTC sessions exchanged", + }) ) -// ServeHTTP server that expose prometheus metrics -func ServeHTTP(cfg *Options) { +// Serve monitoring server that expose prometheus metrics +func Serve(cfg *Options) { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) log.Printf("Monitoring HTTP server listening on %s", cfg.ListenAddress) diff --git a/main.go b/main.go index 713af45..1c2b317 100644 --- a/main.go +++ b/main.go @@ -4,9 +4,11 @@ import ( "log" "strings" + "github.com/pion/webrtc/v3" "github.com/spf13/viper" "gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/internal/monitoring" + "gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/web" ) @@ -68,15 +70,14 @@ func main() { } defer authBackend.Close() - // Start web server routine - go func() { - web.ServeHTTP(&cfg.Web) - }() + // WebRTC session description channels + remoteSdpChan := make(chan webrtc.SessionDescription) + localSdpChan := make(chan webrtc.SessionDescription) - // Start monitoring server routine - go func() { - monitoring.ServeHTTP(&cfg.Monitoring) - }() + // Start stream, web and monitoring server + go stream.Serve(remoteSdpChan, localSdpChan) + go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) + go monitoring.Serve(&cfg.Monitoring) // Wait for routines select {} diff --git a/stream/stream.go b/stream/stream.go new file mode 100644 index 0000000..7a150ff --- /dev/null +++ b/stream/stream.go @@ -0,0 +1,207 @@ +package stream + +import ( + "context" + "fmt" + "io" + "math/rand" + "os" + "time" + + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" + "github.com/pion/webrtc/v3/pkg/media/ivfreader" + "github.com/pion/webrtc/v3/pkg/media/oggreader" +) + +const ( + audioFileName = "output.ogg" + videoFileName = "output.ivf" +) + +// Serve WebRTC media streaming server +func Serve(remoteSdpChan chan webrtc.SessionDescription, localSdpChan chan webrtc.SessionDescription) { + // Assert that we have an audio or video file + _, err := os.Stat(videoFileName) + haveVideoFile := !os.IsNotExist(err) + _, err = os.Stat(audioFileName) + haveAudioFile := !os.IsNotExist(err) + if !haveAudioFile && !haveVideoFile { + panic("Could not find `" + audioFileName + "` or `" + videoFileName + "`") + } + + // Passing client offer + offer := <-remoteSdpChan + + // We make our own mediaEngine so we can place the sender's codecs in it. This because we must use the + // dynamic media type from the sender in our answer. This is not required if we are the offerer + mediaEngine := webrtc.MediaEngine{} + if err = mediaEngine.PopulateFromSDP(offer); err != nil { + panic(err) + } + + // Create a new RTCPeerConnection + api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) + peerConnection, err := api.NewPeerConnection(webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + }, + }) + if err != nil { + panic(err) + } + iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background()) + + if haveVideoFile { + // Create a video track + videoTrack, addTrackErr := peerConnection.NewTrack(getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8"), rand.Uint32(), "video", "pion") + if addTrackErr != nil { + panic(addTrackErr) + } + if _, addTrackErr = peerConnection.AddTrack(videoTrack); addTrackErr != nil { + panic(addTrackErr) + } + + go func() { + // Open a IVF file and start reading using our IVFReader + file, ivfErr := os.Open(videoFileName) + if ivfErr != nil { + panic(ivfErr) + } + + ivf, header, ivfErr := ivfreader.NewWith(file) + if ivfErr != nil { + panic(ivfErr) + } + + // Wait for connection established + <-iceConnectedCtx.Done() + + // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as. + // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once. + sleepTime := time.Millisecond * time.Duration((float32(header.TimebaseNumerator)/float32(header.TimebaseDenominator))*1000) + for { + frame, _, ivfErr := ivf.ParseNextFrame() + if ivfErr == io.EOF { + fmt.Printf("All video frames parsed and sent") + os.Exit(0) + } + + if ivfErr != nil { + panic(ivfErr) + } + + time.Sleep(sleepTime) + if ivfErr = videoTrack.WriteSample(media.Sample{Data: frame, Samples: 90000}); ivfErr != nil { + panic(ivfErr) + } + } + }() + } + + if haveAudioFile { + // Create a audio track + audioTrack, addTrackErr := peerConnection.NewTrack(getPayloadType(mediaEngine, webrtc.RTPCodecTypeAudio, "opus"), rand.Uint32(), "audio", "pion") + if addTrackErr != nil { + panic(addTrackErr) + } + if _, addTrackErr = peerConnection.AddTrack(audioTrack); addTrackErr != nil { + panic(addTrackErr) + } + + go func() { + // Open a IVF file and start reading using our IVFReader + file, oggErr := os.Open(audioFileName) + if oggErr != nil { + panic(oggErr) + } + + // Open on oggfile in non-checksum mode. + ogg, _, oggErr := oggreader.NewWith(file) + if oggErr != nil { + panic(oggErr) + } + + // Wait for connection established + <-iceConnectedCtx.Done() + + // Keep track of last granule, the difference is the amount of samples in the buffer + var lastGranule uint64 + for { + pageData, pageHeader, oggErr := ogg.ParseNextPage() + if oggErr == io.EOF { + fmt.Printf("All audio pages parsed and sent") + os.Exit(0) + } + + if oggErr != nil { + panic(oggErr) + } + + // The amount of samples is the difference between the last and current timestamp + sampleCount := float64(pageHeader.GranulePosition - lastGranule) + lastGranule = pageHeader.GranulePosition + + if oggErr = audioTrack.WriteSample(media.Sample{Data: pageData, Samples: uint32(sampleCount)}); oggErr != nil { + panic(oggErr) + } + + // Convert seconds to Milliseconds, Sleep doesn't accept floats + time.Sleep(time.Duration((sampleCount/48000)*1000) * time.Millisecond) + } + }() + } + + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { + fmt.Printf("Connection State has changed %s \n", connectionState.String()) + if connectionState == webrtc.ICEConnectionStateConnected { + iceConnectedCtxCancel() + } + }) + + // Set the remote SessionDescription + if err = peerConnection.SetRemoteDescription(offer); err != nil { + panic(err) + } + + // Create answer + answer, err := peerConnection.CreateAnswer(nil) + if err != nil { + panic(err) + } + + // Create channel that is blocked until ICE Gathering is complete + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) + + // Sets the LocalDescription, and starts our UDP listeners + if err = peerConnection.SetLocalDescription(answer); err != nil { + panic(err) + } + + // Block until ICE Gathering is complete, disabling trickle ICE + // we do this because we only can exchange one signaling message + // in a production application you should exchange ICE Candidates via OnICECandidate + <-gatherComplete + + // Output the answer + localSdpChan <- *peerConnection.LocalDescription() + + // Block forever + select {} +} + +// Search for Codec PayloadType +// +// Since we are answering we need to match the remote PayloadType +func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecName string) uint8 { + for _, codec := range m.GetCodecsByKind(codecType) { + if codec.Name == codecName { + return codec.PayloadType + } + } + panic(fmt.Sprintf("Remote peer does not support %s", codecName)) +} diff --git a/web/static/css/style.css b/web/static/css/style.css index c3e3ace..d47bf1b 100644 --- a/web/static/css/style.css +++ b/web/static/css/style.css @@ -76,6 +76,9 @@ h1, h2, h3, h4 { width: 100%; height: 100%; max-height: 90vh; + + /* Black borders when video is not 16/9 */ + background-color: #000; } .col-chat { diff --git a/web/static/js/viewer.js b/web/static/js/viewer.js index c408c87..d3388aa 100644 --- a/web/static/js/viewer.js +++ b/web/static/js/viewer.js @@ -12,7 +12,7 @@ peerConnection = new RTCPeerConnection({ peerConnection.oniceconnectionstatechange = e => { console.log(peerConnection.iceConnectionState) - switch (myPeerConnection.iceConnectionState) { + switch (peerConnection.iceConnectionState) { case "closed": case "failed": console.log("FIXME Failed"); diff --git a/web/web.go b/web/web.go index 470cb93..0d1077e 100644 --- a/web/web.go +++ b/web/web.go @@ -20,11 +20,19 @@ type Options struct { WidgetURL string } -// Preload templates -var templates = template.Must(template.ParseGlob("web/template/*.html")) +var ( + cfg *Options + + // WebRTC session description channels + remoteSdpChan chan webrtc.SessionDescription + localSdpChan chan webrtc.SessionDescription + + // Preload templates + templates = template.Must(template.ParseGlob("web/template/*.html")) +) // Handle WebRTC session description exchange via POST -func sessionExchangeHandler(w http.ResponseWriter, r *http.Request) { +func viewerPostHandler(w http.ResponseWriter, r *http.Request) { // Limit response body to 128KB r.Body = http.MaxBytesReader(w, r.Body, 131072) @@ -37,8 +45,9 @@ func sessionExchangeHandler(w http.ResponseWriter, r *http.Request) { return } - // FIXME remoteDescription -> "Magic" -> localDescription - localDescription := remoteDescription + // Exchange session descriptions with WebRTC stream server + remoteSdpChan <- remoteDescription + localDescription := <-localSdpChan // Send server description as JSON jsonDesc, err := json.Marshal(localDescription) @@ -49,37 +58,46 @@ func sessionExchangeHandler(w http.ResponseWriter, r *http.Request) { } w.Header().Set("Content-Type", "application/json") w.Write(jsonDesc) + + // Increment monitoring + monitoring.WebSessions.Inc() +} + +func viewerGetHandler(w http.ResponseWriter, r *http.Request) { + // Render template + data := struct { + Path string + Cfg *Options + }{Path: r.URL.Path[1:], Cfg: cfg} + if err := templates.ExecuteTemplate(w, "base", data); err != nil { + log.Println(err.Error()) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + // Increment monitoring + monitoring.WebViewerServed.Inc() } // Handle site index and viewer pages // POST requests are used to exchange WebRTC session descriptions -func viewerHandler(w http.ResponseWriter, r *http.Request, cfg *Options) { +func viewerHandler(w http.ResponseWriter, r *http.Request) { // FIXME validation on path: https://golang.org/doc/articles/wiki/#tmp_11 + // Route depending on HTTP method switch r.Method { - case "GET": - // Render template - data := struct { - Path string - Cfg *Options - }{Path: r.URL.Path[1:], Cfg: cfg} - if err := templates.ExecuteTemplate(w, "base", data); err != nil { - log.Println(err.Error()) - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - } - case "POST": - sessionExchangeHandler(w, r) + case http.MethodGet: + viewerGetHandler(w, r) + case http.MethodPost: + viewerPostHandler(w, r) default: http.Error(w, "Sorry, only GET and POST methods are supported.", http.StatusBadRequest) } - - // Increment monitoring - monitoring.ViewerServed.Inc() } // Handle static files // We do not use http.FileServer as we do not want directory listing -func staticHandler(w http.ResponseWriter, r *http.Request, cfg *Options) { +func staticHandler(w http.ResponseWriter, r *http.Request) { path := "./web/" + r.URL.Path if f, err := os.Stat(path); err == nil && !f.IsDir() { http.ServeFile(w, r, path) @@ -88,19 +106,16 @@ func staticHandler(w http.ResponseWriter, r *http.Request, cfg *Options) { } } -// Closure to pass configuration -func makeHandler(fn func(http.ResponseWriter, *http.Request, *Options), cfg *Options) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - fn(w, r, cfg) - } -} +// Serve HTTP server +func Serve(rSdpChan chan webrtc.SessionDescription, lSdpChan chan webrtc.SessionDescription, c *Options) { + remoteSdpChan = rSdpChan + localSdpChan = lSdpChan + cfg = c -// ServeHTTP server -func ServeHTTP(cfg *Options) { // Set up HTTP router and server mux := http.NewServeMux() - mux.HandleFunc("/", makeHandler(viewerHandler, cfg)) - mux.HandleFunc("/static/", makeHandler(staticHandler, cfg)) + mux.HandleFunc("/", viewerHandler) + mux.HandleFunc("/static/", staticHandler) log.Printf("HTTP server listening on %s", cfg.ListenAddress) log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux)) }