1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2024-12-22 16:12:19 +00:00

Update package webrtc with Quality structure

This commit is contained in:
Alexandre Iooss 2020-10-19 19:48:44 +02:00
parent 340d0447a8
commit 34200afaed
No known key found for this signature in database
GPG Key ID: 6C79278F3FCDCC02
3 changed files with 29 additions and 36 deletions

View File

@ -6,50 +6,43 @@ import (
"log" "log"
"net" "net"
"os/exec" "os/exec"
"strings"
"time"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
var ( func autoIngest(streams *messaging.Streams) {
activeStream map[string]struct{} // Subscribe to new stream event
) event := make(chan string, 8)
streams.Subscribe(event)
func autoIngest(streams map[string]*stream.Stream) { // For each new stream
// Regulary check existing streams for name := range event {
activeStream = make(map[string]struct{}) // Get stream
for { stream, err := streams.Get(name)
for name, st := range streams { if err != nil {
if strings.Contains(name, "@") { log.Printf("Failed to get stream '%s'", name)
// Not a source stream, pass
continue
}
if _, ok := activeStream[name]; ok {
// Stream is already ingested
continue
}
// Start ingestion
log.Printf("Starting webrtc for '%s'", name)
go ingest(name, st)
} }
// Regulary pull stream list, // Get specific quality
// it may be better to tweak the messaging system // FIXME: make it possible to forward other qualities
// to get an event on a new stream. qualityName := "source"
time.Sleep(time.Second) quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting webrtc for '%s' quality '%s'", name, qualityName)
go ingest(name, quality)
} }
} }
func ingest(name string, input *stream.Stream) { func ingest(name string, q *messaging.Quality) {
// Register to get stream // Register to get stream
videoInput := make(chan []byte, 1024) videoInput := make(chan []byte, 1024)
input.Register(videoInput) q.Register(videoInput)
activeStream[name] = struct{}{}
// Open a UDP Listener for RTP Packets on port 5004 // Open a UDP Listener for RTP Packets on port 5004
videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004}) videoListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5004})
@ -146,7 +139,7 @@ func ingest(name string, input *stream.Stream) {
if err = audioListener.Close(); err != nil { if err = audioListener.Close(); err != nil {
log.Printf("Faited to close UDP listener: %s", err) log.Printf("Faited to close UDP listener: %s", err)
} }
delete(activeStream, name) q.Unregister(videoInput)
} }
func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) { func startFFmpeg(in <-chan []byte) (ffmpeg *exec.Cmd, err error) {

View File

@ -8,7 +8,7 @@ import (
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/internal/monitoring" "gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
// Options holds web package configuration // Options holds web package configuration
@ -182,7 +182,7 @@ func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecNa
} }
// Serve WebRTC media streaming server // Serve WebRTC media streaming server
func Serve(streams map[string]*stream.Stream, remoteSdpChan chan struct { func Serve(streams *messaging.Streams, remoteSdpChan chan struct {
StreamID string StreamID string
RemoteDescription webrtc.SessionDescription RemoteDescription webrtc.SessionDescription
}, localSdpChan chan webrtc.SessionDescription, cfg *Options) { }, localSdpChan chan webrtc.SessionDescription, cfg *Options) {

View File

@ -5,12 +5,12 @@ import (
"testing" "testing"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
func TestServe(t *testing.T) { func TestServe(t *testing.T) {
// Init streams messaging and WebRTC server // Init streams messaging and WebRTC server
streams := make(map[string]*stream.Stream) streams := messaging.New()
remoteSdpChan := make(chan struct { remoteSdpChan := make(chan struct {
StreamID string StreamID string
RemoteDescription webrtc.SessionDescription RemoteDescription webrtc.SessionDescription