1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2024-12-22 16:12:19 +00:00

Put webrtc SDP inside Quality struct

This commit is contained in:
Alexandre Iooss 2020-10-19 21:45:23 +02:00
parent e848d92a1a
commit e1f83a32df
No known key found for this signature in database
GPG Key ID: 6C79278F3FCDCC02
8 changed files with 71 additions and 85 deletions

11
main.go
View File

@ -38,13 +38,6 @@ func main() {
defer authBackend.Close() defer authBackend.Close()
} }
// WebRTC session description channels
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
// Init streams messaging // Init streams messaging
streams := messaging.New() streams := messaging.New()
@ -54,8 +47,8 @@ func main() {
go monitoring.Serve(&cfg.Monitoring) go monitoring.Serve(&cfg.Monitoring)
go srt.Serve(streams, authBackend, &cfg.Srt) go srt.Serve(streams, authBackend, &cfg.Srt)
go telnet.Serve(streams, &cfg.Telnet) go telnet.Serve(streams, &cfg.Telnet)
go web.Serve(streams, remoteSdpChan, localSdpChan, &cfg.Web) go web.Serve(streams, &cfg.Web)
go webrtc.Serve(streams, remoteSdpChan, localSdpChan, &cfg.WebRTC) go webrtc.Serve(streams, &cfg.WebRTC)
// Wait for routines // Wait for routines
select {} select {}

View File

@ -3,6 +3,8 @@ package messaging
import ( import (
"sync" "sync"
"github.com/pion/webrtc/v3"
) )
// Quality holds a specific stream quality. // Quality holds a specific stream quality.
@ -17,6 +19,12 @@ type Quality struct {
// Mutex to lock outputs map // Mutex to lock outputs map
lockOutputs sync.Mutex lockOutputs sync.Mutex
// WebRTC session descriptor exchange.
// When new client connects, a SDP arrives on WebRtcRemoteSdp,
// then webrtc package answers on WebRtcLocalSdp.
WebRtcLocalSdp chan webrtc.SessionDescription
WebRtcRemoteSdp chan webrtc.SessionDescription
} }
func newQuality() (q *Quality) { func newQuality() (q *Quality) {
@ -24,6 +32,8 @@ func newQuality() (q *Quality) {
broadcast := make(chan []byte, 1024) broadcast := make(chan []byte, 1024)
q.Broadcast = broadcast q.Broadcast = broadcast
q.outputs = make(map[chan []byte]struct{}) q.outputs = make(map[chan []byte]struct{})
q.WebRtcLocalSdp = make(chan webrtc.SessionDescription, 1)
q.WebRtcRemoteSdp = make(chan webrtc.SessionDescription, 1)
go q.run(broadcast) go q.run(broadcast)
return q return q
} }

View File

@ -12,33 +12,6 @@ import (
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/messaging"
) )
func autoIngest(streams *messaging.Streams) {
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
// For each new stream
for name := range event {
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
}
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName)
go ingest(name, quality)
}
}
func ingest(name string, q *messaging.Quality) { func ingest(name string, q *messaging.Quality) {
// Register to get stream // Register to get stream
videoInput := make(chan []byte, 1024) videoInput := make(chan []byte, 1024)

View File

@ -45,13 +45,10 @@ func GetNumberConnectedSessions(streamID string) int {
// newPeerHandler is called when server receive a new session description // newPeerHandler is called when server receive a new session description
// this initiates a WebRTC connection and return server description // this initiates a WebRTC connection and return server description
func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struct { func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, remoteSdp webrtc.SessionDescription, cfg *Options) {
StreamID string
RemoteDescription webrtc.SessionDescription
}, cfg *Options) {
// Create media engine using client SDP // Create media engine using client SDP
mediaEngine := webrtc.MediaEngine{} mediaEngine := webrtc.MediaEngine{}
if err := mediaEngine.PopulateFromSDP(remoteSdp.RemoteDescription); err != nil { if err := mediaEngine.PopulateFromSDP(remoteSdp); err != nil {
log.Println("Failed to create new media engine", err) log.Println("Failed to create new media engine", err)
localSdpChan <- webrtc.SessionDescription{} localSdpChan <- webrtc.SessionDescription{}
return return
@ -106,13 +103,13 @@ func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struc
} }
// Set the remote SessionDescription // Set the remote SessionDescription
if err = peerConnection.SetRemoteDescription(remoteSdp.RemoteDescription); err != nil { if err = peerConnection.SetRemoteDescription(remoteSdp); err != nil {
log.Println("Failed to set remote description", err) log.Println("Failed to set remote description", err)
localSdpChan <- webrtc.SessionDescription{} localSdpChan <- webrtc.SessionDescription{}
return return
} }
streamID := remoteSdp.StreamID streamID := name
split := strings.SplitN(streamID, "@", 2) split := strings.SplitN(streamID, "@", 2)
streamID = split[0] streamID = split[0]
quality := "source" quality := "source"
@ -182,10 +179,7 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa
} }
// Serve WebRTC media streaming server // Serve WebRTC media streaming server
func Serve(streams *messaging.Streams, remoteSdpChan chan struct { func Serve(streams *messaging.Streams, cfg *Options) {
StreamID string
RemoteDescription webrtc.SessionDescription
}, localSdpChan chan webrtc.SessionDescription, cfg *Options) {
if !cfg.Enabled { if !cfg.Enabled {
// WebRTC is not enabled, ignore // WebRTC is not enabled, ignore
return return
@ -193,17 +187,42 @@ func Serve(streams *messaging.Streams, remoteSdpChan chan struct {
log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP) log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP)
// Allocate memory // WebRTC ingested tracks
videoTracks = make(map[string][]*webrtc.Track) videoTracks = make(map[string][]*webrtc.Track)
audioTracks = make(map[string][]*webrtc.Track) audioTracks = make(map[string][]*webrtc.Track)
// Ingest data // Subscribe to new stream event
go autoIngest(streams) event := make(chan string, 8)
streams.Subscribe(event)
// For each new stream
for name := range event {
// Get stream
stream, err := streams.Get(name)
if err != nil {
log.Printf("Failed to get stream '%s'", name)
}
// Get specific quality
// FIXME: make it possible to forward other qualities
qualityName := "source"
quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName)
go ingest(name, quality)
go listenSdp(name, quality.WebRtcLocalSdp, quality.WebRtcRemoteSdp, cfg)
}
}
func listenSdp(name string, localSdp, remoteSdp chan webrtc.SessionDescription, cfg *Options) {
// Handle new connections // Handle new connections
for { for {
// Wait for incoming session description // Wait for incoming session description
// then send the local description to browser // then send the local description to browser
newPeerHandler(localSdpChan, <-remoteSdpChan, cfg) newPeerHandler(name, localSdp, <-remoteSdp, cfg)
} }
} }

View File

@ -11,18 +11,13 @@ import (
func TestServe(t *testing.T) { func TestServe(t *testing.T) {
// Init streams messaging and WebRTC server // Init streams messaging and WebRTC server
streams := messaging.New() streams := messaging.New()
remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription)
cfg := Options{ cfg := Options{
Enabled: true, Enabled: true,
MinPortUDP: 10000, MinPortUDP: 10000,
MaxPortUDP: 10005, MaxPortUDP: 10005,
STUNServers: []string{"stun:stun.l.google.com:19302"}, STUNServers: []string{"stun:stun.l.google.com:19302"},
} }
go Serve(streams, remoteSdpChan, localSdpChan, &cfg) go Serve(streams, &cfg)
// New client connection // New client connection
mediaEngine := webrtc.MediaEngine{} mediaEngine := webrtc.MediaEngine{}
@ -58,12 +53,6 @@ func TestServe(t *testing.T) {
peerConnection.SetLocalDescription(offer) peerConnection.SetLocalDescription(offer)
<-gatherComplete <-gatherComplete
// Send offer to server // FIXME: Send offer to server
remoteSdpChan <- struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}{"demo", *peerConnection.LocalDescription()}
_ = <-localSdpChan
// FIXME: verify connection did work // FIXME: verify connection did work
} }

View File

@ -51,12 +51,27 @@ func viewerPostHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
// Get requested stream
stream, err := streams.Get(path)
if err != nil {
http.Error(w, "Stream not found", http.StatusNotFound)
log.Printf("Stream not found: %s", path)
return
}
// Get requested quality
// FIXME: extract quality from request
qualityName := "source"
q, err := stream.GetQuality(qualityName)
if err != nil {
http.Error(w, "Quality not found", http.StatusNotFound)
log.Printf("Quality not found: %s", qualityName)
return
}
// Exchange session descriptions with WebRTC stream server // Exchange session descriptions with WebRTC stream server
remoteSdpChan <- struct { q.WebRtcRemoteSdp <- remoteDescription
StreamID string localDescription := <-q.WebRtcLocalSdp
RemoteDescription webrtc.SessionDescription
}{StreamID: path, RemoteDescription: remoteDescription}
localDescription := <-localSdpChan
// Send server description as JSON // Send server description as JSON
jsonDesc, err := json.Marshal(localDescription) jsonDesc, err := json.Marshal(localDescription)

View File

@ -10,7 +10,6 @@ import (
"strings" "strings"
"github.com/markbates/pkger" "github.com/markbates/pkger"
"github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/messaging" "gitlab.crans.org/nounous/ghostream/messaging"
) )
@ -33,13 +32,6 @@ type Options struct {
var ( var (
cfg *Options cfg *Options
// WebRTC session description channels
remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}
localSdpChan chan webrtc.SessionDescription
// Preload templates // Preload templates
templates *template.Template templates *template.Template
@ -78,13 +70,8 @@ func loadTemplates() error {
} }
// Serve HTTP server // Serve HTTP server
func Serve(s *messaging.Streams, rSdpChan chan struct { func Serve(s *messaging.Streams, c *Options) {
StreamID string
RemoteDescription webrtc.SessionDescription
}, lSdpChan chan webrtc.SessionDescription, c *Options) {
streams = s streams = s
remoteSdpChan = rSdpChan
localSdpChan = lSdpChan
cfg = c cfg = c
if !cfg.Enabled { if !cfg.Enabled {

View File

@ -14,7 +14,7 @@ func TestHTTPServe(t *testing.T) {
streams := messaging.New() streams := messaging.New()
// Create a disabled web server // Create a disabled web server
go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"}) go Serve(streams, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
@ -26,7 +26,7 @@ func TestHTTPServe(t *testing.T) {
} }
// Now let's really start the web server // Now let's really start the web server
go Serve(streams, nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"}) go Serve(streams, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"})
// Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)