diff --git a/stream/webrtc/ingest.go b/stream/webrtc/ingest.go index b39b2f1..480f707 100644 --- a/stream/webrtc/ingest.go +++ b/stream/webrtc/ingest.go @@ -4,10 +4,15 @@ package webrtc import ( "bufio" "fmt" + "github.com/pion/webrtc/v3/pkg/media" + "github.com/pion/webrtc/v3/pkg/media/h264reader" + "io" "log" "math/rand" "net" + "os" "os/exec" + "time" "github.com/pion/rtp" "github.com/pion/webrtc/v3" @@ -22,19 +27,17 @@ func ingest(name string, q *messaging.Quality) { // FIXME Mux into RTP without having multiple UDP listeners firstPort := int(rand.Int31n(63535)) + 2000 - // Open UDP listeners for RTP Packets + // Open UDP listener for RTP Packets audioListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort}) if err != nil { log.Printf("Faited to open UDP listener %s", err) return } - videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: firstPort + 1}) - if err != nil { - log.Printf("Faited to open UDP listener %s", err) - return - } - // Start ffmpag to convert videoInput to video and audio UDP + f, _ := os.Open("CoffeeRun.h264") + h264, err := h264reader.NewReader(f) + + // Start ffmpag to convert videoInput to audio UDP ffmpeg, err := startFFmpeg(videoInput, firstPort) if err != nil { log.Printf("Error while starting ffmpeg: %s", err) @@ -43,31 +46,37 @@ func ingest(name string, q *messaging.Quality) { // Receive video go func() { - inboundRTPPacket := make([]byte, 1500) // UDP MTU for { - n, _, err := videoListener.ReadFromUDP(inboundRTPPacket) - if err != nil { - log.Printf("Failed to read from UDP: %s", err) + nal, h264Err := h264.NextNAL() + if h264Err == io.EOF { + fmt.Printf("All video frames parsed and sent") + return + } + if h264Err != nil { + log.Printf("Failed to read from H264: %s", h264Err) break } - packet := &rtp.Packet{} - if err := packet.Unmarshal(inboundRTPPacket[:n]); err != nil { - log.Printf("Failed to unmarshal RTP srtPacket: %s", err) - continue - } + + time.Sleep(time.Millisecond * 33) + nal.Data = append([]byte{0x00, 0x00, 0x00, 0x01}, nal.Data...) if videoTracks[name] == nil { videoTracks[name] = make([]*webrtc.Track, 0) } - // Write RTP srtPacket to all video tracks - // Adapt payload and SSRC to match destination + var spsAndPpsCache []byte + if nal.UnitType == h264reader.NalUnitTypeSPS || nal.UnitType == h264reader.NalUnitTypePPS { + spsAndPpsCache = append(spsAndPpsCache, nal.Data...) + continue + } else if nal.UnitType == h264reader.NalUnitTypeCodedSliceIdr { + nal.Data = append(spsAndPpsCache, nal.Data...) + spsAndPpsCache = []byte{} + } + log.Println(nal.PictureOrderCount) + for _, videoTrack := range videoTracks[name] { - packet.Header.PayloadType = videoTrack.PayloadType() - packet.Header.SSRC = videoTrack.SSRC() - if writeErr := videoTrack.WriteRTP(packet); writeErr != nil { - log.Printf("Failed to write to video track: %s", err) - continue + if h264Err = videoTrack.WriteSample(media.Sample{Data: nal.Data, Samples: 90000}); h264Err != nil { + panic(h264Err) } } } @@ -110,10 +119,7 @@ func ingest(name string, q *messaging.Quality) { log.Printf("Faited to wait for ffmpeg: %s", err) } - // Close UDP listeners - if err = videoListener.Close(); err != nil { - log.Printf("Faited to close UDP listener: %s", err) - } + // Close UDP listener if err = audioListener.Close(); err != nil { log.Printf("Faited to close UDP listener: %s", err) } @@ -124,10 +130,7 @@ func startFFmpeg(in <-chan []byte, listeningPort int) (ffmpeg *exec.Cmd, err err ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0", // Audio "-vn", "-c:a", "libopus", "-b:a", "96k", - "-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort), - // Source - "-an", "-c:v", "copy", - "-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort+1)} + "-f", "rtp", fmt.Sprintf("rtp://127.0.0.1:%d", listeningPort)} ffmpeg = exec.Command("ffmpeg", ffmpegArgs...) // Handle errors output diff --git a/stream/webrtc/webrtc.go b/stream/webrtc/webrtc.go index c4cb7f6..20936df 100644 --- a/stream/webrtc/webrtc.go +++ b/stream/webrtc/webrtc.go @@ -75,8 +75,8 @@ func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, re } // Create video track - codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "H264") - videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec) + payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8") + videoTrack, err := peerConnection.NewTrack(payloadType, rand.Uint32(), "video", "pion") if err != nil { log.Println("Failed to create new video track", err) localSdpChan <- webrtc.SessionDescription{} @@ -89,8 +89,8 @@ func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, re } // Create audio track - codec, payloadType = getPayloadType(mediaEngine, webrtc.RTPCodecTypeAudio, "opus") - audioTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "audio", "pion", codec) + payloadType = getPayloadType(mediaEngine, webrtc.RTPCodecTypeAudio, "opus") + audioTrack, err := peerConnection.NewTrack(payloadType, rand.Uint32(), "audio", "pion") if err != nil { log.Println("Failed to create new audio track", err) localSdpChan <- webrtc.SessionDescription{} @@ -168,14 +168,14 @@ func newPeerHandler(name string, localSdpChan chan webrtc.SessionDescription, re // Search for Codec PayloadType // // Since we are answering we need to match the remote PayloadType -func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecName string) (*webrtc.RTPCodec, uint8) { +func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecName string) uint8 { for _, codec := range m.GetCodecsByKind(codecType) { if codec.Name == codecName { - return codec, codec.PayloadType + return codec.PayloadType } } log.Printf("Remote peer does not support %s", codecName) - return nil, 0 + return 0 } // Serve WebRTC media streaming server