diff --git a/messaging/quality.go b/messaging/quality.go index a2c74c2..3afb3a1 100644 --- a/messaging/quality.go +++ b/messaging/quality.go @@ -10,6 +10,12 @@ import ( // Quality holds a specific stream quality. // It makes packages able to subscribe to an incoming stream. type Quality struct { + // Type of the quality + Name string + + // Source Stream + Stream *Stream + // Incoming data come from this channel Broadcast chan<- []byte @@ -27,8 +33,9 @@ type Quality struct { WebRtcRemoteSdp chan webrtc.SessionDescription } -func newQuality() (q *Quality) { - q = &Quality{} +func newQuality(name string, stream *Stream) (q *Quality) { + q = &Quality{Name: name} + q.Stream = stream broadcast := make(chan []byte, 1024) q.Broadcast = broadcast q.outputs = make(map[chan []byte]struct{}) diff --git a/messaging/stream.go b/messaging/stream.go index 4894d1e..b10a198 100644 --- a/messaging/stream.go +++ b/messaging/stream.go @@ -40,7 +40,7 @@ func (s *Stream) CreateQuality(name string) (quality *Quality, err error) { } s.lockQualities.Lock() - quality = newQuality() + quality = newQuality(name, s) s.qualities[name] = quality s.lockQualities.Unlock() return quality, nil diff --git a/stream/srt/handler.go b/stream/srt/handler.go index c6da6d2..3b740bf 100644 --- a/stream/srt/handler.go +++ b/stream/srt/handler.go @@ -24,6 +24,17 @@ func handleStreamer(socket *srtgo.SrtSocket, streams *messaging.Streams, name st socket.Close() return } + + // Create sub-qualities + for _, qualityName := range []string{"audio", "480p", "360p", "240p"} { + _, err := stream.CreateQuality(qualityName) + if err != nil { + log.Printf("Error on quality creating: %s", err) + socket.Close() + return + } + } + log.Printf("New SRT streamer for stream '%s' quality 'source'", name) // Read RTP packets forever and send them to the WebRTC Client diff --git a/stream/webrtc/ingest.go b/stream/webrtc/ingest.go index 0c5ecd6..beb67b1 100644 --- a/stream/webrtc/ingest.go +++ b/stream/webrtc/ingest.go @@ -14,33 +14,61 @@ import ( func ingest(name string, q *messaging.Quality) { // Register to get stream - videoInput := make(chan []byte, 1024) - q.Register(videoInput) + input := make(chan []byte, 1024) + // FIXME Stream data should already be transcoded + source, _ := q.Stream.GetQuality("source") + source.Register(input) - // Open a UDP Listener for RTP Packets on port 5004 - audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) - if err != nil { - log.Printf("Faited to open UDP listener %s", err) - return + // FIXME Bad code + port := 5000 + var tracks map[string][]*webrtc.Track + qualityName := "" + switch q.Name { + case "audio": + port = 5004 + tracks = audioTracks + break + case "source": + port = 5005 + tracks = videoTracks + qualityName = "@source" + break + case "480p": + port = 5006 + tracks = videoTracks + qualityName = "@480p" + break + case "360p": + port = 5007 + tracks = videoTracks + qualityName = "@360p" + break + case "240p": + port = 5008 + tracks = videoTracks + qualityName = "@240p" + break } - videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5005}) + + // Open a UDP Listener for RTP Packets + listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: port}) if err != nil { log.Printf("Faited to open UDP listener %s", err) return } - // Start ffmpag to convert videoInput to video and audio UDP - ffmpeg, err := startFFmpeg(videoInput) + // Start ffmpag to convert input to video and audio UDP + ffmpeg, err := startFFmpeg(q, input) if err != nil { log.Printf("Error while starting ffmpeg: %s", err) return } - // Receive video + // Receive stream go func() { inboundRTPPacket := make([]byte, 1500) // UDP MTU for { - n, _, err := videoListener.ReadFromUDP(inboundRTPPacket) + n, _, err := listener.ReadFromUDP(inboundRTPPacket) if err != nil { log.Printf("Failed to read from UDP: %s", err) break @@ -51,49 +79,13 @@ func ingest(name string, q *messaging.Quality) { continue } - if videoTracks[name] == nil { - videoTracks[name] = make([]*webrtc.Track, 0) - } - - // Write RTP srtPacket to all video tracks + // Write RTP srtPacket to all tracks // Adapt payload and SSRC to match destination - for _, videoTrack := range videoTracks[name] { - packet.Header.PayloadType = videoTrack.PayloadType() - packet.Header.SSRC = videoTrack.SSRC() - if writeErr := videoTrack.WriteRTP(packet); writeErr != nil { - log.Printf("Failed to write to video track: %s", err) - continue - } - } - } - }() - - // Receive audio - go func() { - inboundRTPPacket := make([]byte, 1500) // UDP MTU - for { - n, _, err := audioListener.ReadFromUDP(inboundRTPPacket) - if err != nil { - log.Printf("Failed to read from UDP: %s", err) - break - } - packet := &rtp.Packet{} - if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { - log.Printf("Failed to unmarshal RTP srtPacket: %s", err) - continue - } - - if audioTracks[name] == nil { - audioTracks[name] = make([]*webrtc.Track, 0) - } - - // Write RTP srtPacket to all audio tracks - // Adapt payload and SSRC to match destination - for _, audioTrack := range audioTracks[name] { - packet.Header.PayloadType = audioTrack.PayloadType() - packet.Header.SSRC = audioTrack.SSRC() - if writeErr := audioTrack.WriteRTP(packet); writeErr != nil { - log.Printf("Failed to write to audio track: %s", err) + for _, track := range tracks[name+qualityName] { + packet.Header.PayloadType = track.PayloadType() + packet.Header.SSRC = track.SSRC() + if writeErr := track.WriteRTP(packet); writeErr != nil { + log.Printf("Failed to write to track: %s", writeErr) continue } } @@ -105,24 +97,47 @@ func ingest(name string, q *messaging.Quality) { log.Printf("Faited to wait for ffmpeg: %s", err) } - // Close UDP listeners - if err = videoListener.Close(); err != nil { + // Close UDP listener + if err = listener.Close(); err != nil { log.Printf("Faited to close UDP listener: %s", err) } - if err = audioListener.Close(); err != nil { - log.Printf("Faited to close UDP listener: %s", err) - } - q.Unregister(videoInput) + q.Unregister(input) } -func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) { - ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", - // Audio - "-vn", "-c:a", "libopus", "-b:a", "160k", - "-f", "rtp", "rtp://127.0.0.1:5004", - // Source - "-an", "-c:v", "copy", "-b:v", "3000k", "-maxrate", "5000k", "-bufsize", "5000k", - "-f", "rtp", "rtp://127.0.0.1:5005"} +func startFFmpeg(q *messaging.Quality, in <-chan []byte) (ffmpeg *exec.Cmd, err error) { + // FIXME Use transcoders to downscale, then remux in RTP + ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0"} + switch q.Name { + case "audio": + ffmpegArgs = append(ffmpegArgs, "-vn", "-c:a", "libopus", "-b:a", "160k", + "-f", "rtp", "rtp://127.0.0.1:5004") + break + case "source": + ffmpegArgs = append(ffmpegArgs, "-an", "-c:v", "copy", + "-f", "rtp", "rtp://127.0.0.1:5005") + break + case "480p": + ffmpegArgs = append(ffmpegArgs, + "-an", "-c:v", "libx264", "-b:v", "1200k", "-maxrate", "2000k", "-bufsize", "3000k", + "-preset", "ultrafast", "-profile", "main", "-tune", "zerolatency", + "-vf", "scale=854:480", + "-f", "rtp", "rtp://127.0.0.1:5006") + break + case "360p": + ffmpegArgs = append(ffmpegArgs, + "-an", "-c:v", "libx264", "-b:v", "800k", "-maxrate", "1200k", "-bufsize", "1500k", + "-preset", "ultrafast", "-profile", "main", "-tune", "zerolatency", + "-vf", "scale=480:360", + "-f", "rtp", "rtp://127.0.0.1:5007") + break + case "240p": + ffmpegArgs = append(ffmpegArgs, + "-an", "-c:v", "libx264", "-b:v", "500k", "-maxrate", "800k", "-bufsize", "1000k", + "-preset", "ultrafast", "-profile", "main", "-tune", "zerolatency", + "-vf", "scale=360:240", + "-f", "rtp", "rtp://127.0.0.1:5008") + break + } ffmpeg = exec.Command("ffmpeg", ffmpegArgs...) // Handle errors output diff --git a/stream/webrtc/webrtc.go b/stream/webrtc/webrtc.go index c4cb7f6..67173f3 100644 --- a/stream/webrtc/webrtc.go +++ b/stream/webrtc/webrtc.go @@ -40,7 +40,7 @@ func removeTrack(tracks []*webrtc.Track, track *webrtc.Track) []*webrtc.Track { // GetNumberConnectedSessions get the number of currently connected clients func GetNumberConnectedSessions(streamID string) int { - return len(videoTracks[streamID]) + return len(audioTracks[streamID]) } // newPeerHandler is called when server receive a new session description @@ -117,21 +117,20 @@ func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, re quality = split[1] } log.Printf("New WebRTC session for stream %s, quality %s", streamID, quality) - // TODO Consider the quality // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { log.Printf("Connection State has changed %s \n", connectionState.String()) - if videoTracks[streamID] == nil { - videoTracks[streamID] = make([]*webrtc.Track, 0, 1) + if videoTracks[streamID+"@"+quality] == nil { + videoTracks[streamID+"@"+quality] = make([]*webrtc.Track, 0, 1) } if audioTracks[streamID] == nil { audioTracks[streamID] = make([]*webrtc.Track, 0, 1) } if connectionState == webrtc.ICEConnectionStateConnected { // Register tracks - videoTracks[streamID] = append(videoTracks[streamID], videoTrack) + videoTracks[streamID+"@"+quality] = append(videoTracks[streamID+"@"+quality], videoTrack) audioTracks[streamID] = append(audioTracks[streamID], audioTrack) monitoring.WebRTCConnectedSessions.Inc() } else if connectionState == webrtc.ICEConnectionStateDisconnected { @@ -205,16 +204,17 @@ func Serve(streams *messaging.Streams, cfg *Options) { // Get specific quality // FIXME: make it possible to forward other qualities - qualityName := "source" - quality, err := stream.GetQuality(qualityName) - if err != nil { - log.Printf("Failed to get quality '%s'", qualityName) - } + for _, qualityName := range []string{"source", "audio", "480p", "360p", "240p"} { + quality, err := stream.GetQuality(qualityName) + if err != nil { + log.Printf("Failed to get quality '%s'", qualityName) + } - // Start forwarding - log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName) - go ingest(name, quality) - go listenSdp(name, quality.WebRtcLocalSdp, quality.WebRtcRemoteSdp, cfg) + // Start forwarding + log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName) + go ingest(name, quality) + go listenSdp(name, quality.WebRtcLocalSdp, quality.WebRtcRemoteSdp, cfg) + } } } diff --git a/web/static/js/viewer.js b/web/static/js/viewer.js index 42e8b41..ee21a25 100644 --- a/web/static/js/viewer.js +++ b/web/static/js/viewer.js @@ -14,7 +14,7 @@ export function initViewerPage(stream, stunServers, viewersCounterRefreshPeriod) const viewer = document.getElementById("viewer"); // Default quality - let quality = "source"; + let quality = "240p"; // Create WebSocket and WebRTC const websocket = new GsWebSocket(); diff --git a/web/template/player.html b/web/template/player.html index 6cdd650..af9a08d 100644 --- a/web/template/player.html +++ b/web/template/player.html @@ -8,9 +8,9 @@