Update package forwarding with Quality structure

This commit is contained in:
Alexandre Iooss 2020-10-19 19:28:30 +02:00
parent bb589a71ce
commit c317d91b8d
No known key found for this signature in database
GPG Key ID: 6C79278F3FCDCC02
3 changed files with 35 additions and 24 deletions

View File

@ -10,7 +10,7 @@ import (
"gitlab.crans.org/nounous/ghostream/auth" "gitlab.crans.org/nounous/ghostream/auth"
"gitlab.crans.org/nounous/ghostream/internal/config" "gitlab.crans.org/nounous/ghostream/internal/config"
"gitlab.crans.org/nounous/ghostream/internal/monitoring" "gitlab.crans.org/nounous/ghostream/internal/monitoring"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream/forwarding" "gitlab.crans.org/nounous/ghostream/stream/forwarding"
"gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/srt"
"gitlab.crans.org/nounous/ghostream/stream/telnet" "gitlab.crans.org/nounous/ghostream/stream/telnet"
@ -46,7 +46,7 @@ func main() {
localSdpChan := make(chan webrtc.SessionDescription) localSdpChan := make(chan webrtc.SessionDescription)
// Init streams messaging // Init streams messaging
streams := make(map[string]*stream.Stream) streams := messaging.New()
// Start routines // Start routines
go transcoder.Init(streams, &cfg.Transcoder) go transcoder.Init(streams, &cfg.Transcoder)

View File

@ -5,9 +5,8 @@ import (
"bufio" "bufio"
"log" "log"
"os/exec" "os/exec"
"time"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
) )
// Options to configure the stream forwarding. // Options to configure the stream forwarding.
@ -15,37 +14,49 @@ import (
type Options map[string][]string type Options map[string][]string
// Serve handles incoming packets from SRT and forward them to other external services // Serve handles incoming packets from SRT and forward them to other external services
func Serve(streams map[string]*stream.Stream, cfg Options) { func Serve(streams *messaging.Streams, cfg Options) {
if len(cfg) < 1 { if len(cfg) < 1 {
// No forwarding, ignore // No forwarding, ignore
return return
} }
// Subscribe to new stream event
event := make(chan string, 8)
streams.Subscribe(event)
log.Printf("Stream forwarding initialized") log.Printf("Stream forwarding initialized")
for {
for name, st := range streams { // For each new stream
fwdCfg, ok := cfg[name] for name := range event {
streamCfg, ok := cfg[name]
if !ok { if !ok {
// Not configured // Not configured
continue continue
} }
// Start forwarding // Get stream
log.Printf("Starting forwarding for '%s'", name) stream, err := streams.Get(name)
go forward(st, fwdCfg) if err != nil {
log.Printf("Failed to get stream '%s'", name)
} }
// Regulary pull stream list, // Get specific quality
// it may be better to tweak the messaging system // FIXME: make it possible to forward other qualities
// to get an event on a new stream. qualityName := "source"
time.Sleep(time.Second) quality, err := stream.GetQuality(qualityName)
if err != nil {
log.Printf("Failed to get quality '%s'", qualityName)
}
// Start forwarding
log.Printf("Starting forwarding for '%s' quality '%s'", name, qualityName)
go forward(quality, streamCfg)
} }
} }
// Start a FFMPEG instance and redirect stream output to forwarded streams // Start a FFMPEG instance and redirect stream output to forwarded streams
func forward(st *stream.Stream, fwdCfg []string) { func forward(q *messaging.Quality, fwdCfg []string) {
output := make(chan []byte, 1024) output := make(chan []byte, 1024)
st.Register(output) q.Register(output)
// Launch FFMPEG instance // Launch FFMPEG instance
params := []string{"-hide_banner", "-loglevel", "error", "-re", "-i", "pipe:0"} params := []string{"-hide_banner", "-loglevel", "error", "-re", "-i", "pipe:0"}
@ -77,7 +88,7 @@ func forward(st *stream.Stream, fwdCfg []string) {
_ = input.Close() _ = input.Close()
_ = errOutput.Close() _ = errOutput.Close()
_ = ffmpeg.Process.Kill() _ = ffmpeg.Process.Kill()
st.Unregister(output) q.Unregister(output)
}() }()
// Log standard error output // Log standard error output

View File

@ -6,7 +6,7 @@ import (
"testing" "testing"
"time" "time"
"gitlab.crans.org/nounous/ghostream/stream" "gitlab.crans.org/nounous/ghostream/messaging"
"gitlab.crans.org/nounous/ghostream/stream/srt" "gitlab.crans.org/nounous/ghostream/stream/srt"
) )
@ -35,7 +35,7 @@ func TestForwardStream(t *testing.T) {
cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} cfg["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"}
// Register forwarding stream list // Register forwarding stream list
streams := make(map[string]*stream.Stream) streams := messaging.New()
go Serve(streams, cfg) go Serve(streams, cfg)
// Serve SRT Server without authentification backend // Serve SRT Server without authentification backend