diff --git a/main.go b/main.go index b45cc22..6a9453f 100644 --- a/main.go +++ b/main.go @@ -38,13 +38,6 @@ func main() { defer authBackend.Close() } - // WebRTC session description channels - remoteSdpChan := make(chan struct { - StreamID string - RemoteDescription webrtc.SessionDescription - }) - localSdpChan := make(chan webrtc.SessionDescription) - // Init streams messaging streams := messaging.New() @@ -54,8 +47,8 @@ func main() { go monitoring.Serve(&cfg.Monitoring) go srt.Serve(streams, authBackend, &cfg.Srt) go telnet.Serve(streams, &cfg.Telnet) - go web.Serve(streams, remoteSdpChan, localSdpChan, &cfg.Web) - go webrtc.Serve(streams, remoteSdpChan, localSdpChan, &cfg.WebRTC) + go web.Serve(streams, &cfg.Web) + go webrtc.Serve(streams, &cfg.WebRTC) // Wait for routines select {} diff --git a/messaging/quality.go b/messaging/quality.go index 8ed72c9..7115693 100644 --- a/messaging/quality.go +++ b/messaging/quality.go @@ -3,6 +3,8 @@ package messaging import ( "sync" + + "github.com/pion/webrtc/v3" ) // Quality holds a specific stream quality. @@ -17,6 +19,12 @@ type Quality struct { // Mutex to lock outputs map lockOutputs sync.Mutex + + // WebRTC session descriptor exchange. + // When new client connects, a SDP arrives on WebRtcRemoteSdp, + // then webrtc package answers on WebRtcLocalSdp. + WebRtcLocalSdp chan webrtc.SessionDescription + WebRtcRemoteSdp chan webrtc.SessionDescription } func newQuality() (q *Quality) { @@ -24,6 +32,8 @@ func newQuality() (q *Quality) { broadcast := make(chan []byte, 1024) q.Broadcast = broadcast q.outputs = make(map[chan []byte]struct{}) + q.WebRtcLocalSdp = make(chan webrtc.SessionDescription, 1) + q.WebRtcRemoteSdp = make(chan webrtc.SessionDescription, 1) go q.run(broadcast) return q } diff --git a/stream/webrtc/ingest.go b/stream/webrtc/ingest.go index 77a7bc1..25d7c11 100644 --- a/stream/webrtc/ingest.go +++ b/stream/webrtc/ingest.go @@ -12,33 +12,6 @@ import ( "gitlab.crans.org/nounous/ghostream/messaging" ) -func autoIngest(streams *messaging.Streams) { - // Subscribe to new stream event - event := make(chan string, 8) - streams.Subscribe(event) - - // For each new stream - for name := range event { - // Get stream - stream, err := streams.Get(name) - if err != nil { - log.Printf("Failed to get stream '%s'", name) - } - - // Get specific quality - // FIXME: make it possible to forward other qualities - qualityName := "source" - quality, err := stream.GetQuality(qualityName) - if err != nil { - log.Printf("Failed to get quality '%s'", qualityName) - } - - // Start forwarding - log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName) - go ingest(name, quality) - } -} - func ingest(name string, q *messaging.Quality) { // Register to get stream videoInput := make(chan []byte, 1024) diff --git a/stream/webrtc/webrtc.go b/stream/webrtc/webrtc.go index ddaceb3..a101799 100644 --- a/stream/webrtc/webrtc.go +++ b/stream/webrtc/webrtc.go @@ -45,13 +45,10 @@ func GetNumberConnectedSessions(streamID string) int { // newPeerHandler is called when server receive a new session description // this initiates a WebRTC connection and return server description -func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struct { - StreamID string - RemoteDescription webrtc.SessionDescription -}, cfg *Options) { +func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, remoteSdp webrtc.SessionDescription, cfg *Options) { // Create media engine using client SDP mediaEngine := webrtc.MediaEngine{} - if err := mediaEngine.PopulateFromSDP(remoteSdp.RemoteDescription); err != nil { + if err := mediaEngine.PopulateFromSDP(remoteSdp); err != nil { log.Println("Failed to create new media engine", err) localSdpChan <- webrtc.SessionDescription{} return @@ -106,13 +103,13 @@ func newPeerHandler(localSdpChan chan webrtc.SessionDescription, remoteSdp struc } // Set the remote SessionDescription - if err = peerConnection.SetRemoteDescription(remoteSdp.RemoteDescription); err != nil { + if err = peerConnection.SetRemoteDescription(remoteSdp); err != nil { log.Println("Failed to set remote description", err) localSdpChan <- webrtc.SessionDescription{} return } - streamID := remoteSdp.StreamID + streamID := name split := strings.SplitN(streamID, "@", 2) streamID = split[0] quality := "source" @@ -182,10 +179,7 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa } // Serve WebRTC media streaming server -func Serve(streams *messaging.Streams, remoteSdpChan chan struct { - StreamID string - RemoteDescription webrtc.SessionDescription -}, localSdpChan chan webrtc.SessionDescription, cfg *Options) { +func Serve(streams *messaging.Streams, cfg *Options) { if !cfg.Enabled { // WebRTC is not enabled, ignore return @@ -193,17 +187,42 @@ func Serve(streams *messaging.Streams, remoteSdpChan chan struct { log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP) - // Allocate memory + // WebRTC ingested tracks videoTracks = make(map[string][]*webrtc.Track) audioTracks = make(map[string][]*webrtc.Track) - // Ingest data - go autoIngest(streams) + // Subscribe to new stream event + event := make(chan string, 8) + streams.Subscribe(event) + // For each new stream + for name := range event { + // Get stream + stream, err := streams.Get(name) + if err != nil { + log.Printf("Failed to get stream '%s'", name) + } + + // Get specific quality + // FIXME: make it possible to forward other qualities + qualityName := "source" + quality, err := stream.GetQuality(qualityName) + if err != nil { + log.Printf("Failed to get quality '%s'", qualityName) + } + + // Start forwarding + log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName) + go ingest(name, quality) + go listenSdp(name, quality.WebRtcLocalSdp, quality.WebRtcRemoteSdp, cfg) + } +} + +func listenSdp(name string, localSdp, remoteSdp chan webrtc.SessionDescription, cfg *Options) { // Handle new connections for { // Wait for incoming session description // then send the local description to browser - newPeerHandler(localSdpChan, <-remoteSdpChan, cfg) + newPeerHandler(name, localSdp, <-remoteSdp, cfg) } } diff --git a/stream/webrtc/webrtc_test.go b/stream/webrtc/webrtc_test.go index 8383d70..0277bb3 100644 --- a/stream/webrtc/webrtc_test.go +++ b/stream/webrtc/webrtc_test.go @@ -11,18 +11,13 @@ import ( func TestServe(t *testing.T) { // Init streams messaging and WebRTC server streams := messaging.New() - remoteSdpChan := make(chan struct { - StreamID string - RemoteDescription webrtc.SessionDescription - }) - localSdpChan := make(chan webrtc.SessionDescription) cfg := Options{ Enabled: true, MinPortUDP: 10000, MaxPortUDP: 10005, STUNServers: []string{"stun:stun.l.google.com:19302"}, } - go Serve(streams, remoteSdpChan, localSdpChan, &cfg) + go Serve(streams, &cfg) // New client connection mediaEngine := webrtc.MediaEngine{} @@ -58,12 +53,6 @@ func TestServe(t *testing.T) { peerConnection.SetLocalDescription(offer) <-gatherComplete - // Send offer to server - remoteSdpChan <- struct { - StreamID string - RemoteDescription webrtc.SessionDescription - }{"demo", *peerConnection.LocalDescription()} - _ = <-localSdpChan - + // FIXME: Send offer to server // FIXME: verify connection did work } diff --git a/web/handler.go b/web/handler.go index 3b6cc38..5b52bc2 100644 --- a/web/handler.go +++ b/web/handler.go @@ -51,12 +51,27 @@ func viewerPostHandler(w http.ResponseWriter, r *http.Request) { return } + // Get requested stream + stream, err := streams.Get(path) + if err != nil { + http.Error(w, "Stream not found", http.StatusNotFound) + log.Printf("Stream not found: %s", path) + return + } + + // Get requested quality + // FIXME: extract quality from request + qualityName := "source" + q, err := stream.GetQuality(qualityName) + if err != nil { + http.Error(w, "Quality not found", http.StatusNotFound) + log.Printf("Quality not found: %s", qualityName) + return + } + // Exchange session descriptions with WebRTC stream server - remoteSdpChan <- struct { - StreamID string - RemoteDescription webrtc.SessionDescription - }{StreamID: path, RemoteDescription: remoteDescription} - localDescription := <-localSdpChan + q.WebRtcRemoteSdp <- remoteDescription + localDescription := <-q.WebRtcLocalSdp // Send server description as JSON jsonDesc, err := json.Marshal(localDescription) diff --git a/web/web.go b/web/web.go index 40dc38b..32d1290 100644 --- a/web/web.go +++ b/web/web.go @@ -10,7 +10,6 @@ import ( "strings" "github.com/markbates/pkger" - "github.com/pion/webrtc/v3" "gitlab.crans.org/nounous/ghostream/messaging" ) @@ -33,13 +32,6 @@ type Options struct { var ( cfg *Options - // WebRTC session description channels - remoteSdpChan chan struct { - StreamID string - RemoteDescription webrtc.SessionDescription - } - localSdpChan chan webrtc.SessionDescription - // Preload templates templates *template.Template @@ -78,13 +70,8 @@ func loadTemplates() error { } // Serve HTTP server -func Serve(s *messaging.Streams, rSdpChan chan struct { - StreamID string - RemoteDescription webrtc.SessionDescription -}, lSdpChan chan webrtc.SessionDescription, c *Options) { +func Serve(s *messaging.Streams, c *Options) { streams = s - remoteSdpChan = rSdpChan - localSdpChan = lSdpChan cfg = c if !cfg.Enabled { diff --git a/web/web_test.go b/web/web_test.go index 0892cd2..2f96357 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -14,7 +14,7 @@ func TestHTTPServe(t *testing.T) { streams := messaging.New() // Create a disabled web server - go Serve(streams, nil, nil, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"}) + go Serve(streams, &Options{Enabled: false, ListenAddress: "127.0.0.1:8081"}) // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early time.Sleep(500 * time.Millisecond) @@ -26,7 +26,7 @@ func TestHTTPServe(t *testing.T) { } // Now let's really start the web server - go Serve(streams, nil, nil, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"}) + go Serve(streams, &Options{Enabled: true, ListenAddress: "127.0.0.1:8081"}) // Sleep 500ms to ensure that the web server is running, to avoid fails because the request came too early time.Sleep(500 * time.Millisecond)