2020-10-09 20:36:02 +00:00
// Package webrtc provides the backend to simulate a WebRTC client to send streams
2020-10-05 08:11:30 +00:00
package webrtc
import (
2020-10-19 08:18:45 +00:00
"github.com/3d0c/gmf"
2020-10-18 20:07:11 +00:00
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
2020-10-19 08:18:45 +00:00
"gitlab.crans.org/nounous/ghostream/stream"
2020-10-05 08:11:30 +00:00
"log"
2020-10-19 08:18:45 +00:00
"net"
2020-10-18 14:05:28 +00:00
"strings"
"time"
2020-10-05 08:11:30 +00:00
)
2020-10-13 07:50:46 +00:00
// activeStream is the set of stream names that are currently being ingested.
// Only the presence of a key matters; the values carry no information.
var activeStream map[string]struct{}
2020-10-18 14:05:28 +00:00
func autoIngest ( streams map [ string ] * stream . Stream ) {
// Regulary check existing streams
activeStream = make ( map [ string ] struct { } )
2020-10-05 08:11:30 +00:00
for {
2020-10-18 14:05:28 +00:00
for name , st := range streams {
if strings . Contains ( name , "@" ) {
// Not a source stream, pass
continue
2020-10-05 08:11:30 +00:00
}
2020-10-18 14:05:28 +00:00
if _ , ok := activeStream [ name ] ; ok {
// Stream is already ingested
continue
2020-10-12 22:30:05 +00:00
}
2020-10-18 14:05:28 +00:00
// Start ingestion
log . Printf ( "Starting webrtc for '%s'" , name )
2020-10-18 20:07:11 +00:00
// FIXME Ensure that the audio stream exist, but that's poop code
time . Sleep ( time . Second )
go ingest ( name , st , streams [ name + "@audio" ] )
2020-10-13 07:50:46 +00:00
}
2020-10-18 14:05:28 +00:00
// Regulary pull stream list,
// it may be better to tweak the messaging system
// to get an event on a new stream.
time . Sleep ( time . Second )
2020-10-13 07:50:46 +00:00
}
}
2020-10-12 22:30:05 +00:00
2020-10-18 20:07:11 +00:00
func ingest ( name string , input * stream . Stream , audio * stream . Stream ) {
2020-10-18 14:05:28 +00:00
// Register to get stream
videoInput := make ( chan [ ] byte , 1024 )
input . Register ( videoInput )
2020-10-18 20:07:11 +00:00
audioInput := make ( chan [ ] byte , 1024 )
audio . Register ( audioInput )
2020-10-18 14:05:28 +00:00
activeStream [ name ] = struct { } { }
2020-10-05 08:11:30 +00:00
2020-10-19 08:18:45 +00:00
inputCtx := gmf . NewCtx ( )
avioInputCtx , _ := gmf . NewAVIOContext ( inputCtx , & gmf . AVIOHandlers { ReadPacket : func ( ) ( [ ] byte , int ) {
data := <- audioInput
return data , len ( data )
} } )
log . Println ( "Open input" )
inputCtx . SetPb ( avioInputCtx ) . OpenInput ( "" )
log . Println ( "Opened" )
defer inputCtx . CloseInput ( )
defer avioInputCtx . Release ( )
if audioTracks [ name ] == nil {
audioTracks [ name ] = make ( [ ] * webrtc . Track , 0 )
}
udpListener , _ := net . ListenUDP ( "udp" , & net . UDPAddr { IP : net . ParseIP ( "127.0.0.1" ) , Port : 1234 } )
outputCtx , _ := gmf . NewOutputCtxWithFormatName ( "rtp://127.0.0.1:1234" , "rtp" )
avioOutputCtx , _ := gmf . NewAVIOContext ( outputCtx , & gmf . AVIOHandlers { WritePacket : func ( data [ ] byte ) int {
n := len ( data )
log . Printf ( "Read %d bytes" , n )
return n
} } )
// FIXME DON'T RAN AN UDP LISTENER, PLIZ GET DIRECTLY UDP PACKETS, WHY IS IT SO COMPLICATED????
// outputCtx.SetPb(avioOutputCtx)
defer outputCtx . CloseOutput ( )
defer avioOutputCtx . Release ( )
log . Printf ( "%d streams" , inputCtx . StreamsCnt ( ) )
for i := 0 ; i < inputCtx . StreamsCnt ( ) ; i ++ {
srcStream , err := inputCtx . GetStream ( i )
if err != nil {
log . Println ( "GetStream error" )
}
outputCtx . AddStreamWithCodeCtx ( srcStream . CodecCtx ( ) )
}
outputCtx . Dump ( )
if err := outputCtx . WriteHeader ( ) ; err != nil {
log . Printf ( "Unable to write RTP header: %s" , err )
}
2020-10-18 20:07:11 +00:00
// Receive audio data
2020-10-13 07:50:46 +00:00
go func ( ) {
2020-10-19 08:18:45 +00:00
buff := make ( [ ] byte , 1500 )
2020-10-13 07:50:46 +00:00
for {
2020-10-19 08:18:45 +00:00
n , _ := udpListener . Read ( buff )
if n == 0 {
return
}
2020-10-13 07:50:46 +00:00
packet := & rtp . Packet { }
2020-10-19 08:18:45 +00:00
if err := packet . Unmarshal ( buff [ : n ] ) ; err != nil {
2020-10-13 07:50:46 +00:00
log . Printf ( "Failed to unmarshal RTP srtPacket: %s" , err )
continue
2020-10-05 08:11:30 +00:00
}
2020-10-18 14:05:28 +00:00
if audioTracks [ name ] == nil {
audioTracks [ name ] = make ( [ ] * webrtc . Track , 0 )
2020-10-13 07:50:46 +00:00
}
2020-10-05 08:11:30 +00:00
2020-10-13 07:50:46 +00:00
// Write RTP srtPacket to all audio tracks
// Adapt payload and SSRC to match destination
2020-10-18 14:05:28 +00:00
for _ , audioTrack := range audioTracks [ name ] {
2020-10-13 07:50:46 +00:00
packet . Header . PayloadType = audioTrack . PayloadType ( )
packet . Header . SSRC = audioTrack . SSRC ( )
if writeErr := audioTrack . WriteRTP ( packet ) ; writeErr != nil {
2020-10-18 20:07:11 +00:00
log . Printf ( "Failed to write to audio track: %s" , writeErr )
2020-10-13 07:50:46 +00:00
continue
2020-10-05 08:11:30 +00:00
}
}
}
2020-10-13 07:50:46 +00:00
} ( )
2020-10-19 08:18:45 +00:00
first := false
for packet := range inputCtx . GetNewPackets ( ) {
if first { //if read from rtsp ,the first packets is wrong.
if err := outputCtx . WritePacketNoBuffer ( packet ) ; err != nil {
log . Printf ( "Error while writing packet: %s" , err )
}
}
first = true
packet . Free ( )
}
2020-10-18 20:07:11 +00:00
select { }
2020-10-18 14:05:28 +00:00
2020-10-18 20:07:11 +00:00
// TODO Register to all substreams and make RTP packets. Don't transcode in this package.
/ * // Open a UDP Listener for RTP Packets on port 5004
videoListener , err := net . ListenUDP ( "udp" , & net . UDPAddr { IP : net . ParseIP ( "127.0.0.1" ) , Port : 5004 } )
if err != nil {
log . Printf ( "Faited to open UDP listener %s" , err )
return
}
audioListener , err := net . ListenUDP ( "udp" , & net . UDPAddr { IP : net . ParseIP ( "127.0.0.1" ) , Port : 5005 } )
if err != nil {
log . Printf ( "Faited to open UDP listener %s" , err )
return
}
// Start ffmpag to convert videoInput to video and audio UDP
ffmpeg , err := startFFmpeg ( videoInput )
if err != nil {
log . Printf ( "Error while starting ffmpeg: %s" , err )
return
}
// Receive video
go func ( ) {
inboundRTPPacket := make ( [ ] byte , 1500 ) // UDP MTU
for {
n , _ , err := videoListener . ReadFromUDP ( inboundRTPPacket )
if err != nil {
log . Printf ( "Failed to read from UDP: %s" , err )
break
}
packet := & rtp . Packet { }
if err := packet . Unmarshal ( inboundRTPPacket [ : n ] ) ; err != nil {
log . Printf ( "Failed to unmarshal RTP srtPacket: %s" , err )
continue
}
if videoTracks [ name ] == nil {
videoTracks [ name ] = make ( [ ] * webrtc . Track , 0 )
}
// Write RTP srtPacket to all video tracks
// Adapt payload and SSRC to match destination
for _ , videoTrack := range videoTracks [ name ] {
packet . Header . PayloadType = videoTrack . PayloadType ( )
packet . Header . SSRC = videoTrack . SSRC ( )
if writeErr := videoTrack . WriteRTP ( packet ) ; writeErr != nil {
log . Printf ( "Failed to write to video track: %s" , err )
continue
}
}
}
} ( )
// Receive audio
go func ( ) {
inboundRTPPacket := make ( [ ] byte , 1500 ) // UDP MTU
for {
n , _ , err := audioListener . ReadFromUDP ( inboundRTPPacket )
if err != nil {
log . Printf ( "Failed to read from UDP: %s" , err )
break
}
packet := & rtp . Packet { }
if err := packet . Unmarshal ( inboundRTPPacket [ : n ] ) ; err != nil {
log . Printf ( "Failed to unmarshal RTP srtPacket: %s" , err )
continue
}
if audioTracks [ name ] == nil {
audioTracks [ name ] = make ( [ ] * webrtc . Track , 0 )
}
// Write RTP srtPacket to all audio tracks
// Adapt payload and SSRC to match destination
for _ , audioTrack := range audioTracks [ name ] {
packet . Header . PayloadType = audioTrack . PayloadType ( )
packet . Header . SSRC = audioTrack . SSRC ( )
if writeErr := audioTrack . WriteRTP ( packet ) ; writeErr != nil {
log . Printf ( "Failed to write to audio track: %s" , err )
continue
}
}
}
} ( )
// Wait for stopped ffmpeg
if err = ffmpeg . Wait ( ) ; err != nil {
log . Printf ( "Faited to wait for ffmpeg: %s" , err )
}
// Close UDP listeners
if err = videoListener . Close ( ) ; err != nil {
log . Printf ( "Faited to close UDP listener: %s" , err )
}
if err = audioListener . Close ( ) ; err != nil {
log . Printf ( "Faited to close UDP listener: %s" , err )
} * /
2020-10-18 14:05:28 +00:00
delete ( activeStream , name )
}
2020-10-18 20:07:11 +00:00
/* func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) {
	ffmpegArgs := []string{"-hide_banner", "-loglevel", "error", "-i", "pipe:0",
		"-an", "-vcodec", "libvpx", "-crf", "10", "-cpu-used", "5", "-b:v", "6000k", "-maxrate", "8000k", "-bufsize", "12000k", // TODO Change bitrate when changing quality
		"-qmin", "10", "-qmax", "42", "-threads", "4", "-deadline", "1", "-error-resilient", "1",
		"-auto-alt-ref", "1",
		"-f", "rtp", "rtp://127.0.0.1:5004",
		"-vn", "-acodec", "libopus", "-cpu-used", "5", "-deadline", "1", "-qmin", "10", "-qmax", "42", "-error-resilient", "1", "-auto-alt-ref", "1",
		"-f", "rtp", "rtp://127.0.0.1:5005"}
	ffmpeg = exec.Command("ffmpeg", ffmpegArgs...)

	// Handle errors output
	errOutput, err := ffmpeg.StderrPipe()
	if err != nil {
		return nil, err
	}
	go func() {
		scanner := bufio.NewScanner(errOutput)
		for scanner.Scan() {
			log.Printf("[WEBRTC FFMPEG %s] %s", "demo", scanner.Text())
		}
	}()

	// Handle stream input
	input, err := ffmpeg.StdinPipe()
	if err != nil {
		return nil, err
	}
	go func() {
		for data := range in {
			if _, err := input.Write(data); err != nil {
				log.Printf("Failed to write data to ffmpeg input: %s", err)
			}
		}
		// End of stream
		ffmpeg.Process.Kill()
	}()

	// Start process
	err = ffmpeg.Start()
	return ffmpeg, err
} */