WebRTC session exchange working

This commit is contained in:
Alexandre Iooss 2020-09-24 11:24:13 +02:00
parent 02399e7e26
commit 097766141f
No known key found for this signature in database
GPG Key ID: 6C79278F3FCDCC02
6 changed files with 278 additions and 46 deletions

View File

@ -15,15 +15,21 @@ type Options struct {
} }
var ( var (
// ViewerServed is the total amount of viewer page served // WebViewerServed is the total amount of viewer page served
ViewerServed = promauto.NewCounter(prometheus.CounterOpts{ WebViewerServed = promauto.NewCounter(prometheus.CounterOpts{
Name: "ghostream_viewer_served_total", Name: "ghostream_web_viewer_served_total",
Help: "The total amount of viewer served", Help: "The total amount of viewer served",
}) })
// WebSessions is the total amount of WebRTC session exchange
WebSessions = promauto.NewCounter(prometheus.CounterOpts{
Name: "ghostream_web_sessions_total",
Help: "The total amount of WebRTC sessions exchanged",
})
) )
// ServeHTTP server that expose prometheus metrics // Serve monitoring server that expose prometheus metrics
func ServeHTTP(cfg *Options) { func Serve(cfg *Options) {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler()) mux.Handle("/metrics", promhttp.Handler())
log.Printf("Monitoring HTTP server listening on %s", cfg.ListenAddress) log.Printf("Monitoring HTTP server listening on %s", cfg.ListenAddress)

17
main.go
View File

@ -4,9 +4,11 @@ import (
"log" "log"
"strings" "strings"
"github.com/pion/webrtc/v3"
"github.com/spf13/viper" "github.com/spf13/viper"
"gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/internal/monitoring" "gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream"
"gitlab.crans.org/nounous/ghostream/web" "gitlab.crans.org/nounous/ghostream/web"
) )
@ -68,15 +70,14 @@ func main() {
} }
defer authBackend.Close() defer authBackend.Close()
// Start web server routine // WebRTC session description channels
go func() { remoteSdpChan := make(chan webrtc.SessionDescription)
web.ServeHTTP(&cfg.Web) localSdpChan := make(chan webrtc.SessionDescription)
}()
// Start monitoring server routine // Start stream, web and monitoring server
go func() { go stream.Serve(remoteSdpChan, localSdpChan)
monitoring.ServeHTTP(&cfg.Monitoring) go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web)
}() go monitoring.Serve(&cfg.Monitoring)
// Wait for routines // Wait for routines
select {} select {}

207
stream/stream.go Normal file
View File

@ -0,0 +1,207 @@
package stream
import (
"context"
"fmt"
"io"
"math/rand"
"os"
"time"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/pion/webrtc/v3/pkg/media/ivfreader"
"github.com/pion/webrtc/v3/pkg/media/oggreader"
)
const (
audioFileName = "output.ogg"
videoFileName = "output.ivf"
)
// Serve WebRTC media streaming server
func Serve(remoteSdpChan chan webrtc.SessionDescription, localSdpChan chan webrtc.SessionDescription) {
// Assert that we have an audio or video file
_, err := os.Stat(videoFileName)
haveVideoFile := !os.IsNotExist(err)
_, err = os.Stat(audioFileName)
haveAudioFile := !os.IsNotExist(err)
if !haveAudioFile && !haveVideoFile {
panic("Could not find `" + audioFileName + "` or `" + videoFileName + "`")
}
// Passing client offer
offer := <-remoteSdpChan
// We make our own mediaEngine so we can place the sender's codecs in it. This because we must use the
// dynamic media type from the sender in our answer. This is not required if we are the offerer
mediaEngine := webrtc.MediaEngine{}
if err = mediaEngine.PopulateFromSDP(offer); err != nil {
panic(err)
}
// Create a new RTCPeerConnection
api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine))
peerConnection, err := api.NewPeerConnection(webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
})
if err != nil {
panic(err)
}
iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background())
if haveVideoFile {
// Create a video track
videoTrack, addTrackErr := peerConnection.NewTrack(getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8"), rand.Uint32(), "video", "pion")
if addTrackErr != nil {
panic(addTrackErr)
}
if _, addTrackErr = peerConnection.AddTrack(videoTrack); addTrackErr != nil {
panic(addTrackErr)
}
go func() {
// Open a IVF file and start reading using our IVFReader
file, ivfErr := os.Open(videoFileName)
if ivfErr != nil {
panic(ivfErr)
}
ivf, header, ivfErr := ivfreader.NewWith(file)
if ivfErr != nil {
panic(ivfErr)
}
// Wait for connection established
<-iceConnectedCtx.Done()
// Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
// This isn't required since the video is timestamped, but we will such much higher loss if we send all at once.
sleepTime := time.Millisecond * time.Duration((float32(header.TimebaseNumerator)/float32(header.TimebaseDenominator))*1000)
for {
frame, _, ivfErr := ivf.ParseNextFrame()
if ivfErr == io.EOF {
fmt.Printf("All video frames parsed and sent")
os.Exit(0)
}
if ivfErr != nil {
panic(ivfErr)
}
time.Sleep(sleepTime)
if ivfErr = videoTrack.WriteSample(media.Sample{Data: frame, Samples: 90000}); ivfErr != nil {
panic(ivfErr)
}
}
}()
}
if haveAudioFile {
// Create a audio track
audioTrack, addTrackErr := peerConnection.NewTrack(getPayloadType(mediaEngine, webrtc.RTPCodecTypeAudio, "opus"), rand.Uint32(), "audio", "pion")
if addTrackErr != nil {
panic(addTrackErr)
}
if _, addTrackErr = peerConnection.AddTrack(audioTrack); addTrackErr != nil {
panic(addTrackErr)
}
go func() {
// Open a IVF file and start reading using our IVFReader
file, oggErr := os.Open(audioFileName)
if oggErr != nil {
panic(oggErr)
}
// Open on oggfile in non-checksum mode.
ogg, _, oggErr := oggreader.NewWith(file)
if oggErr != nil {
panic(oggErr)
}
// Wait for connection established
<-iceConnectedCtx.Done()
// Keep track of last granule, the difference is the amount of samples in the buffer
var lastGranule uint64
for {
pageData, pageHeader, oggErr := ogg.ParseNextPage()
if oggErr == io.EOF {
fmt.Printf("All audio pages parsed and sent")
os.Exit(0)
}
if oggErr != nil {
panic(oggErr)
}
// The amount of samples is the difference between the last and current timestamp
sampleCount := float64(pageHeader.GranulePosition - lastGranule)
lastGranule = pageHeader.GranulePosition
if oggErr = audioTrack.WriteSample(media.Sample{Data: pageData, Samples: uint32(sampleCount)}); oggErr != nil {
panic(oggErr)
}
// Convert seconds to Milliseconds, Sleep doesn't accept floats
time.Sleep(time.Duration((sampleCount/48000)*1000) * time.Millisecond)
}
}()
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
fmt.Printf("Connection State has changed %s \n", connectionState.String())
if connectionState == webrtc.ICEConnectionStateConnected {
iceConnectedCtxCancel()
}
})
// Set the remote SessionDescription
if err = peerConnection.SetRemoteDescription(offer); err != nil {
panic(err)
}
// Create answer
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
panic(err)
}
// Create channel that is blocked until ICE Gathering is complete
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
// Sets the LocalDescription, and starts our UDP listeners
if err = peerConnection.SetLocalDescription(answer); err != nil {
panic(err)
}
// Block until ICE Gathering is complete, disabling trickle ICE
// we do this because we only can exchange one signaling message
// in a production application you should exchange ICE Candidates via OnICECandidate
<-gatherComplete
// Output the answer
localSdpChan <- *peerConnection.LocalDescription()
// Block forever
select {}
}
// Search for Codec PayloadType
//
// Since we are answering we need to match the remote PayloadType
func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecName string) uint8 {
for _, codec := range m.GetCodecsByKind(codecType) {
if codec.Name == codecName {
return codec.PayloadType
}
}
panic(fmt.Sprintf("Remote peer does not support %s", codecName))
}

View File

@ -76,6 +76,9 @@ h1, h2, h3, h4 {
width: 100%; width: 100%;
height: 100%; height: 100%;
max-height: 90vh; max-height: 90vh;
/* Black borders when video is not 16/9 */
background-color: #000;
} }
.col-chat { .col-chat {

View File

@ -12,7 +12,7 @@ peerConnection = new RTCPeerConnection({
peerConnection.oniceconnectionstatechange = e => { peerConnection.oniceconnectionstatechange = e => {
console.log(peerConnection.iceConnectionState) console.log(peerConnection.iceConnectionState)
switch (myPeerConnection.iceConnectionState) { switch (peerConnection.iceConnectionState) {
case "closed": case "closed":
case "failed": case "failed":
console.log("FIXME Failed"); console.log("FIXME Failed");

View File

@ -20,11 +20,19 @@ type Options struct {
WidgetURL string WidgetURL string
} }
// Preload templates var (
var templates = template.Must(template.ParseGlob("web/template/*.html")) cfg *Options
// WebRTC session description channels
remoteSdpChan chan webrtc.SessionDescription
localSdpChan chan webrtc.SessionDescription
// Preload templates
templates = template.Must(template.ParseGlob("web/template/*.html"))
)
// Handle WebRTC session description exchange via POST // Handle WebRTC session description exchange via POST
func sessionExchangeHandler(w http.ResponseWriter, r *http.Request) { func viewerPostHandler(w http.ResponseWriter, r *http.Request) {
// Limit response body to 128KB // Limit response body to 128KB
r.Body = http.MaxBytesReader(w, r.Body, 131072) r.Body = http.MaxBytesReader(w, r.Body, 131072)
@ -37,8 +45,9 @@ func sessionExchangeHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
// FIXME remoteDescription -> "Magic" -> localDescription // Exchange session descriptions with WebRTC stream server
localDescription := remoteDescription remoteSdpChan <- remoteDescription
localDescription := <-localSdpChan
// Send server description as JSON // Send server description as JSON
jsonDesc, err := json.Marshal(localDescription) jsonDesc, err := json.Marshal(localDescription)
@ -49,15 +58,12 @@ func sessionExchangeHandler(w http.ResponseWriter, r *http.Request) {
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.Write(jsonDesc) w.Write(jsonDesc)
// Increment monitoring
monitoring.WebSessions.Inc()
} }
// Handle site index and viewer pages func viewerGetHandler(w http.ResponseWriter, r *http.Request) {
// POST requests are used to exchange WebRTC session descriptions
func viewerHandler(w http.ResponseWriter, r *http.Request, cfg *Options) {
// FIXME validation on path: https://golang.org/doc/articles/wiki/#tmp_11
switch r.Method {
case "GET":
// Render template // Render template
data := struct { data := struct {
Path string Path string
@ -66,20 +72,32 @@ func viewerHandler(w http.ResponseWriter, r *http.Request, cfg *Options) {
if err := templates.ExecuteTemplate(w, "base", data); err != nil { if err := templates.ExecuteTemplate(w, "base", data); err != nil {
log.Println(err.Error()) log.Println(err.Error())
http.Error(w, "Internal Server Error", http.StatusInternalServerError) http.Error(w, "Internal Server Error", http.StatusInternalServerError)
} return
case "POST":
sessionExchangeHandler(w, r)
default:
http.Error(w, "Sorry, only GET and POST methods are supported.", http.StatusBadRequest)
} }
// Increment monitoring // Increment monitoring
monitoring.ViewerServed.Inc() monitoring.WebViewerServed.Inc()
}
// Handle site index and viewer pages
// POST requests are used to exchange WebRTC session descriptions
func viewerHandler(w http.ResponseWriter, r *http.Request) {
// FIXME validation on path: https://golang.org/doc/articles/wiki/#tmp_11
// Route depending on HTTP method
switch r.Method {
case http.MethodGet:
viewerGetHandler(w, r)
case http.MethodPost:
viewerPostHandler(w, r)
default:
http.Error(w, "Sorry, only GET and POST methods are supported.", http.StatusBadRequest)
}
} }
// Handle static files // Handle static files
// We do not use http.FileServer as we do not want directory listing // We do not use http.FileServer as we do not want directory listing
func staticHandler(w http.ResponseWriter, r *http.Request, cfg *Options) { func staticHandler(w http.ResponseWriter, r *http.Request) {
path := "./web/" + r.URL.Path path := "./web/" + r.URL.Path
if f, err := os.Stat(path); err == nil && !f.IsDir() { if f, err := os.Stat(path); err == nil && !f.IsDir() {
http.ServeFile(w, r, path) http.ServeFile(w, r, path)
@ -88,19 +106,16 @@ func staticHandler(w http.ResponseWriter, r *http.Request, cfg *Options) {
} }
} }
// Closure to pass configuration // Serve HTTP server
func makeHandler(fn func(http.ResponseWriter, *http.Request, *Options), cfg *Options) http.HandlerFunc { func Serve(rSdpChan chan webrtc.SessionDescription, lSdpChan chan webrtc.SessionDescription, c *Options) {
return func(w http.ResponseWriter, r *http.Request) { remoteSdpChan = rSdpChan
fn(w, r, cfg) localSdpChan = lSdpChan
} cfg = c
}
// ServeHTTP server
func ServeHTTP(cfg *Options) {
// Set up HTTP router and server // Set up HTTP router and server
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/", makeHandler(viewerHandler, cfg)) mux.HandleFunc("/", viewerHandler)
mux.HandleFunc("/static/", makeHandler(staticHandler, cfg)) mux.HandleFunc("/static/", staticHandler)
log.Printf("HTTP server listening on %s", cfg.ListenAddress) log.Printf("HTTP server listening on %s", cfg.ListenAddress)
log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux)) log.Fatal(http.ListenAndServe(cfg.ListenAddress, mux))
} }