From ef760ae4cc18cb86b8aaf86fba5e5cdbef4d0da2 Mon Sep 17 00:00:00 2001 From: Alexandre Iooss Date: Fri, 25 Sep 2020 12:03:40 +0200 Subject: [PATCH] Create first webrtc session like others --- stream/stream.go | 110 ++++++++++++++++------------------------------- 1 file changed, 38 insertions(+), 72 deletions(-) diff --git a/stream/stream.go b/stream/stream.go index a294523..335775b 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -21,19 +21,34 @@ const ( ) var ( - peerConnectionConfig webrtc.Configuration + iceConnectedCtx, iceConnectedCtxCancel = context.WithCancel(context.Background()) ) // newPeerHandler is called when server receive a new session description // this initiates a WebRTC connection and return server description func newPeerHandler(api *webrtc.API, remoteSdp webrtc.SessionDescription, audioTrack *webrtc.Track, videoTrack *webrtc.Track) webrtc.SessionDescription { // Create a new PeerConnection - peerConnection, err := api.NewPeerConnection(peerConnectionConfig) + peerConnection, err := api.NewPeerConnection(webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + }, + }) if err != nil { log.Println("Failed to initiate peer connection", err) return webrtc.SessionDescription{} } + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { + log.Printf("Connection State has changed %s \n", connectionState.String()) + if connectionState == webrtc.ICEConnectionStateConnected { + iceConnectedCtxCancel() + } + }) + // Add audio and video tracks if _, err = peerConnection.AddTrack(audioTrack); err != nil { log.Println("Failed to add audio track", err) @@ -77,48 +92,42 @@ func newPeerHandler(api *webrtc.API, remoteSdp webrtc.SessionDescription, audioT // Serve WebRTC media streaming server func Serve(remoteSdpChan chan webrtc.SessionDescription, localSdpChan chan webrtc.SessionDescription) { - peerConnectionConfig = webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{ - { - URLs: []string{"stun:stun.l.google.com:19302"}, - }, - }, - } - // Assert that we have an audio or video file _, err := os.Stat(videoFileName) haveVideoFile := !os.IsNotExist(err) _, err = os.Stat(audioFileName) haveAudioFile := !os.IsNotExist(err) - if !haveAudioFile && !haveVideoFile { + if !haveAudioFile || !haveVideoFile { panic("Could not find `" + audioFileName + "` or `" + videoFileName + "`") } - // We make our own mediaEngine so we can place the sender's codecs in it. This because we must use the - // dynamic media type from the sender in our answer. This is not required if we are the offerer + // Create media engine + // Only support VP8 and Opus mediaEngine := webrtc.MediaEngine{} offer := <-remoteSdpChan if err = mediaEngine.PopulateFromSDP(offer); err != nil { panic(err) } - // Create a new RTCPeerConnection + // Create a new API object api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) - peerConnection, err := api.NewPeerConnection(peerConnectionConfig) + + // Create video track + codec, payloadType := getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8") + videoTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "video", "pion", codec) if err != nil { panic(err) } - iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background()) - // Create a video track - videoTrack, addTrackErr := peerConnection.NewTrack(getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8"), rand.Uint32(), "video", "pion") - if addTrackErr != nil { - panic(addTrackErr) - } - if _, addTrackErr = peerConnection.AddTrack(videoTrack); addTrackErr != nil { - panic(addTrackErr) + // Create audio track + codec, payloadType = getPayloadType(mediaEngine, webrtc.RTPCodecTypeAudio, "opus") + audioTrack, err := webrtc.NewTrack(payloadType, rand.Uint32(), "audio", "pion", codec) + if err != nil { + panic(err) } + localSdpChan <- newPeerHandler(api, offer, audioTrack, videoTrack) + go func() { // Open a IVF file and start reading using our IVFReader file, ivfErr := os.Open(videoFileName) @@ -138,6 +147,7 @@ func Serve(remoteSdpChan chan webrtc.SessionDescription, localSdpChan chan webrt // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once. sleepTime := time.Millisecond * time.Duration((float32(header.TimebaseNumerator)/float32(header.TimebaseDenominator))*1000) for { + // Need at least one client frame, _, ivfErr := ivf.ParseNextFrame() if ivfErr == io.EOF { fmt.Printf("All video frames parsed and sent") @@ -150,20 +160,11 @@ func Serve(remoteSdpChan chan webrtc.SessionDescription, localSdpChan chan webrt time.Sleep(sleepTime) if ivfErr = videoTrack.WriteSample(media.Sample{Data: frame, Samples: 90000}); ivfErr != nil { - panic(ivfErr) + log.Fatalln("Failed to write video stream:", ivfErr) } } }() - // Create a audio track - audioTrack, addTrackErr := peerConnection.NewTrack(getPayloadType(mediaEngine, webrtc.RTPCodecTypeAudio, "opus"), rand.Uint32(), "audio", "pion") - if addTrackErr != nil { - panic(addTrackErr) - } - if _, addTrackErr = peerConnection.AddTrack(audioTrack); addTrackErr != nil { - panic(addTrackErr) - } - go func() { // Open a IVF file and start reading using our IVFReader file, oggErr := os.Open(audioFileName) @@ -183,6 +184,7 @@ func Serve(remoteSdpChan chan webrtc.SessionDescription, localSdpChan chan webrt // Keep track of last granule, the difference is the amount of samples in the buffer var lastGranule uint64 for { + // Need at least one client pageData, pageHeader, oggErr := ogg.ParseNextPage() if oggErr == io.EOF { fmt.Printf("All audio pages parsed and sent") @@ -198,7 +200,7 @@ func Serve(remoteSdpChan chan webrtc.SessionDescription, localSdpChan chan webrt lastGranule = pageHeader.GranulePosition if oggErr = audioTrack.WriteSample(media.Sample{Data: pageData, Samples: uint32(sampleCount)}); oggErr != nil { - panic(oggErr) + log.Fatalln("Failed to write audio stream:", oggErr) } // Convert seconds to Milliseconds, Sleep doesn't accept floats @@ -206,42 +208,6 @@ func Serve(remoteSdpChan chan webrtc.SessionDescription, localSdpChan chan webrt } }() - // Set the handler for ICE connection state - // This will notify you when the peer has connected/disconnected - peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { - fmt.Printf("Connection State has changed %s \n", connectionState.String()) - if connectionState == webrtc.ICEConnectionStateConnected { - iceConnectedCtxCancel() - } - }) - - // Set the remote SessionDescription - if err = peerConnection.SetRemoteDescription(offer); err != nil { - panic(err) - } - - // Create answer - answer, err := peerConnection.CreateAnswer(nil) - if err != nil { - panic(err) - } - - // Create channel that is blocked until ICE Gathering is complete - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) - - // Sets the LocalDescription, and starts our UDP listeners - if err = peerConnection.SetLocalDescription(answer); err != nil { - panic(err) - } - - // Block until ICE Gathering is complete, disabling trickle ICE - // we do this because we only can exchange one signaling message - // in a production application you should exchange ICE Candidates via OnICECandidate - <-gatherComplete - - // Output the answer - localSdpChan <- *peerConnection.LocalDescription() - // Handle new connections for { // Wait for incoming session description @@ -254,10 +220,10 @@ func Serve(remoteSdpChan chan webrtc.SessionDescription, localSdpChan chan webrt // Search for Codec PayloadType // // Since we are answering we need to match the remote PayloadType -func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecName string) uint8 { +func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecName string) (*webrtc.RTPCodec, uint8) { for _, codec := range m.GetCodecsByKind(codecType) { if codec.Name == codecName { - return codec.PayloadType + return codec, codec.PayloadType } } panic(fmt.Sprintf("Remote peer does not support %s", codecName))