1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2024-12-23 00:22:19 +00:00

💩 Split webrtc tracks by stream id (need to clean this, stream ID must pass between the session descriptor and the webrtc flux transmit)

This commit is contained in:
Yohann D'ANELLO 2020-10-05 22:00:08 +02:00
parent 76f009efe3
commit 022f6fb098
5 changed files with 87 additions and 30 deletions

View File

@ -104,7 +104,10 @@ func main() {
defer authBackend.Close() defer authBackend.Close()
// WebRTC session description channels // WebRTC session description channels
remoteSdpChan := make(chan webrtc.SessionDescription) remoteSdpChan := make(chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
})
localSdpChan := make(chan webrtc.SessionDescription) localSdpChan := make(chan webrtc.SessionDescription)
// SRT channel for forwarding and webrtc // SRT channel for forwarding and webrtc

View File

@ -2,6 +2,7 @@ package webrtc
import ( import (
"bufio" "bufio"
"github.com/pion/webrtc/v3"
"io" "io"
"log" "log"
"net" "net"
@ -18,10 +19,10 @@ func ingestFrom(inputChannel chan srt.Packet) {
for { for {
var err error = nil var err error = nil
packet := <-inputChannel srtPacket := <-inputChannel
switch packet.PacketType { switch srtPacket.PacketType {
case "register": case "register":
log.Printf("WebRTC RegisterStream %s", packet.StreamName) log.Printf("WebRTC RegisterStream %s", srtPacket.StreamName)
// Open a UDP Listener for RTP Packets on port 5004 // Open a UDP Listener for RTP Packets on port 5004
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
@ -74,13 +75,17 @@ func ingestFrom(inputChannel chan srt.Packet) {
} }
packet := &rtp.Packet{} packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
log.Printf("Failed to unmarshal RTP packet: %s", err) log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue continue
} }
// Write RTP packet to all video tracks if videoTracks[srtPacket.StreamName] == nil {
videoTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all video tracks
// Adapt payload and SSRC to match destination // Adapt payload and SSRC to match destination
for _, videoTrack := range videoTracks { for _, videoTrack := range videoTracks[srtPacket.StreamName] {
packet.Header.PayloadType = videoTrack.PayloadType() packet.Header.PayloadType = videoTrack.PayloadType()
packet.Header.SSRC = videoTrack.SSRC() packet.Header.SSRC = videoTrack.SSRC()
if writeErr := videoTrack.WriteRTP(packet); writeErr != nil { if writeErr := videoTrack.WriteRTP(packet); writeErr != nil {
@ -102,13 +107,17 @@ func ingestFrom(inputChannel chan srt.Packet) {
} }
packet := &rtp.Packet{} packet := &rtp.Packet{}
if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil {
log.Printf("Failed to unmarshal RTP packet: %s", err) log.Printf("Failed to unmarshal RTP srtPacket: %s", err)
continue continue
} }
// Write RTP packet to all audio tracks if audioTracks[srtPacket.StreamName] == nil {
audioTracks[srtPacket.StreamName] = make([]*webrtc.Track, 0)
}
// Write RTP srtPacket to all audio tracks
// Adapt payload and SSRC to match destination // Adapt payload and SSRC to match destination
for _, audioTrack := range audioTracks { for _, audioTrack := range audioTracks[srtPacket.StreamName] {
packet.Header.PayloadType = audioTrack.PayloadType() packet.Header.PayloadType = audioTrack.PayloadType()
packet.Header.SSRC = audioTrack.SSRC() packet.Header.SSRC = audioTrack.SSRC()
if writeErr := audioTrack.WriteRTP(packet); writeErr != nil { if writeErr := audioTrack.WriteRTP(packet); writeErr != nil {
@ -127,20 +136,20 @@ func ingestFrom(inputChannel chan srt.Packet) {
}() }()
break break
case "sendData": case "sendData":
// FIXME send to stream packet.StreamName // FIXME send to stream srtPacket.StreamName
if _, err := ffmpegInput.Write(packet.Data); err != nil { if _, err := ffmpegInput.Write(srtPacket.Data); err != nil {
log.Printf("Failed to write data to ffmpeg input: %s", err) log.Printf("Failed to write data to ffmpeg input: %s", err)
} }
break break
case "close": case "close":
log.Printf("WebRTC CloseConnection %s", packet.StreamName) log.Printf("WebRTC CloseConnection %s", srtPacket.StreamName)
break break
default: default:
log.Println("Unknown SRT packet type:", packet.PacketType) log.Println("Unknown SRT srtPacket type:", srtPacket.PacketType)
break break
} }
if err != nil { if err != nil {
log.Printf("Error occured while receiving SRT packet of type %s: %s", packet.PacketType, err) log.Printf("Error occured while receiving SRT srtPacket of type %s: %s", srtPacket.PacketType, err)
} }
} }
} }

View File

@ -23,8 +23,8 @@ type Options struct {
type SessionDescription = webrtc.SessionDescription type SessionDescription = webrtc.SessionDescription
var ( var (
videoTracks []*webrtc.Track videoTracks map[string][]*webrtc.Track
audioTracks []*webrtc.Track audioTracks map[string][]*webrtc.Track
) )
// Helper to reslice tracks // Helper to reslice tracks
@ -44,10 +44,13 @@ func GetNumberConnectedSessions() int {
// newPeerHandler is called when server receive a new session description // newPeerHandler is called when server receive a new session description
// this initiates a WebRTC connection and return server description // this initiates a WebRTC connection and return server description
func newPeerHandler(remoteSdp webrtc.SessionDescription, cfg *Options) webrtc.SessionDescription { func newPeerHandler(remoteSdp struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, cfg *Options) webrtc.SessionDescription {
// Create media engine using client SDP // Create media engine using client SDP
mediaEngine := webrtc.MediaEngine{} mediaEngine := webrtc.MediaEngine{}
if err := mediaEngine.PopulateFromSDP(remoteSdp); err != nil { if err := mediaEngine.PopulateFromSDP(remoteSdp.RemoteDescription); err != nil {
log.Println("Failed to create new media engine", err) log.Println("Failed to create new media engine", err)
return webrtc.SessionDescription{} return webrtc.SessionDescription{}
} }
@ -95,7 +98,7 @@ func newPeerHandler(remoteSdp webrtc.SessionDescription, cfg *Options) webrtc.Se
} }
// Set the remote SessionDescription // Set the remote SessionDescription
if err = peerConnection.SetRemoteDescription(remoteSdp); err != nil { if err = peerConnection.SetRemoteDescription(remoteSdp.RemoteDescription); err != nil {
log.Println("Failed to set remote description", err) log.Println("Failed to set remote description", err)
return webrtc.SessionDescription{} return webrtc.SessionDescription{}
} }
@ -116,19 +119,27 @@ func newPeerHandler(remoteSdp webrtc.SessionDescription, cfg *Options) webrtc.Se
return webrtc.SessionDescription{} return webrtc.SessionDescription{}
} }
streamID := remoteSdp.StreamID
// Set the handler for ICE connection state // Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected // This will notify you when the peer has connected/disconnected
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
log.Printf("Connection State has changed %s \n", connectionState.String()) log.Printf("Connection State has changed %s \n", connectionState.String())
if videoTracks[streamID] == nil {
videoTracks[streamID] = make([]*webrtc.Track, 0, 1)
}
if audioTracks[streamID] == nil {
audioTracks[streamID] = make([]*webrtc.Track, 0, 1)
}
if connectionState == webrtc.ICEConnectionStateConnected { if connectionState == webrtc.ICEConnectionStateConnected {
// Register tracks // Register tracks
videoTracks = append(videoTracks, videoTrack) videoTracks[streamID] = append(videoTracks[streamID], videoTrack)
audioTracks = append(audioTracks, audioTrack) audioTracks[streamID] = append(audioTracks[streamID], audioTrack)
monitoring.WebRTCConnectedSessions.Inc() monitoring.WebRTCConnectedSessions.Inc()
} else if connectionState == webrtc.ICEConnectionStateDisconnected { } else if connectionState == webrtc.ICEConnectionStateDisconnected {
// Unregister tracks // Unregister tracks
videoTracks = removeTrack(videoTracks, videoTrack) videoTracks[streamID] = removeTrack(videoTracks[streamID], videoTrack)
audioTracks = removeTrack(audioTracks, audioTrack) audioTracks[streamID] = removeTrack(audioTracks[streamID], audioTrack)
monitoring.WebRTCConnectedSessions.Dec() monitoring.WebRTCConnectedSessions.Dec()
} }
}) })
@ -155,9 +166,16 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa
} }
// Serve WebRTC media streaming server // Serve WebRTC media streaming server
func Serve(remoteSdpChan, localSdpChan chan webrtc.SessionDescription, inputChannel chan srt.Packet, cfg *Options) { func Serve(remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, localSdpChan chan webrtc.SessionDescription, inputChannel chan srt.Packet, cfg *Options) {
log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP) log.Printf("WebRTC server using UDP from port %d to %d", cfg.MinPortUDP, cfg.MaxPortUDP)
// Allocate memory
videoTracks = make(map[string][]*webrtc.Track)
audioTracks = make(map[string][]*webrtc.Track)
// Ingest data from SRT // Ingest data from SRT
go ingestFrom(inputChannel) go ingestFrom(inputChannel)

View File

@ -19,6 +19,21 @@ func viewerPostHandler(w http.ResponseWriter, r *http.Request) {
// Limit response body to 128KB // Limit response body to 128KB
r.Body = http.MaxBytesReader(w, r.Body, 131072) r.Body = http.MaxBytesReader(w, r.Body, 131072)
// Get stream ID from URL, or from domain name
path := r.URL.Path[1:]
if cfg.OneStreamPerDomain {
host := r.Host
if strings.Contains(host, ":") {
realHost, _, err := net.SplitHostPort(r.Host)
if err != nil {
log.Printf("Failed to split host and port from %s", r.Host)
return
}
host = realHost
}
path = host
}
// Decode client description // Decode client description
dec := json.NewDecoder(r.Body) dec := json.NewDecoder(r.Body)
dec.DisallowUnknownFields() dec.DisallowUnknownFields()
@ -29,7 +44,10 @@ func viewerPostHandler(w http.ResponseWriter, r *http.Request) {
} }
// Exchange session descriptions with WebRTC stream server // Exchange session descriptions with WebRTC stream server
remoteSdpChan <- remoteDescription remoteSdpChan <- struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}{StreamID: path, RemoteDescription: remoteDescription}
localDescription := <-localSdpChan localDescription := <-localSdpChan
// Send server description as JSON // Send server description as JSON
@ -40,7 +58,10 @@ func viewerPostHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
_, _ = w.Write(jsonDesc) _, err = w.Write(jsonDesc)
if err != nil {
log.Println("An error occurred while sending session description", err)
}
// Increment monitoring // Increment monitoring
monitoring.WebSessions.Inc() monitoring.WebSessions.Inc()

View File

@ -30,7 +30,10 @@ var (
cfg *Options cfg *Options
// WebRTC session description channels // WebRTC session description channels
remoteSdpChan chan webrtc.SessionDescription remoteSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}
localSdpChan chan webrtc.SessionDescription localSdpChan chan webrtc.SessionDescription
// Preload templates // Preload templates
@ -71,7 +74,10 @@ func loadTemplates() error {
} }
// Serve HTTP server // Serve HTTP server
func Serve(rSdpChan chan webrtc.SessionDescription, lSdpChan chan webrtc.SessionDescription, c *Options) { func Serve(rSdpChan chan struct {
StreamID string
RemoteDescription webrtc.SessionDescription
}, lSdpChan chan webrtc.SessionDescription, c *Options) {
remoteSdpChan = rSdpChan remoteSdpChan = rSdpChan
localSdpChan = lSdpChan localSdpChan = lSdpChan
cfg = c cfg = c