diff --git a/main.go b/main.go index eb9cb1d..c0400fb 100644 --- a/main.go +++ b/main.go @@ -90,14 +90,17 @@ func main() { remoteSdpChan := make(chan webrtc.SessionDescription) localSdpChan := make(chan webrtc.SessionDescription) + // SRT channel, to propagate forwarding + forwardingChannel := make(chan srt.Packet) + // Start stream, web and monitoring server go monitoring.Serve(&cfg.Monitoring) - go srt.Serve(&cfg.Srt) + go srt.Serve(&cfg.Srt, forwardingChannel) go web.Serve(remoteSdpChan, localSdpChan, &cfg.Web) go webrtc.Serve(remoteSdpChan, localSdpChan, &cfg.WebRTC) // Configure stream forwarding - forwarding.New(cfg.Forwarding) + forwarding.New(cfg.Forwarding, forwardingChannel) // Wait for routines select {} diff --git a/stream/forwarding/forwarding.go b/stream/forwarding/forwarding.go index e6bd55d..e4225dd 100644 --- a/stream/forwarding/forwarding.go +++ b/stream/forwarding/forwarding.go @@ -2,6 +2,7 @@ package forwarding import ( "bufio" + "gitlab.crans.org/nounous/ghostream/stream/srt" "io" "log" "os/exec" @@ -13,16 +14,43 @@ type Options map[string][]string var ( cfg Options + forwardingChannel chan srt.Packet ffmpegInstances = make(map[string]*exec.Cmd) ffmpegInputStreams = make(map[string]*io.WriteCloser) ) -// New Load configuration -func New(c Options) { +// New Load configuration and initialize SRT channel +func New(c Options, channel chan srt.Packet) { cfg = c + forwardingChannel = channel + go waitForPackets() log.Printf("Stream forwarding initialized") } +func waitForPackets() { + for { + var err error = nil + packet := <-forwardingChannel + switch packet.PacketType { + case "register": + err = RegisterStream(packet.StreamName) + break + case "sendData": + err = SendPacket(packet.StreamName, packet.Data) + break + case "close": + err = CloseConnection(packet.StreamName) + break + default: + log.Println("Unknown SRT packet type:", packet.PacketType) + break + } + if err != nil { + log.Printf("Error occured while receiving SRT packet of type %s: %s", packet.PacketType, err) + } + } +} + // RegisterStream Declare a new open stream and create ffmpeg instances func RegisterStream(name string) error { streams, exist := cfg[name] @@ -80,16 +108,14 @@ func RegisterStream(name string) error { } // SendPacket forward data to all FFMpeg instances related to the stream name -func SendPacket(name string, data []byte) { +func SendPacket(name string, data []byte) error { stdin := ffmpegInputStreams[name] if stdin == nil { // Don't need to forward stream - return + return nil } _, err := (*stdin).Write(data) - if err != nil { - log.Printf("Error while sending a packet to external streaming server for key %s: %s", name, err) - } + return err } // CloseConnection When the stream is ended, close FFMPEG instances diff --git a/stream/forwarding/forwarding_test.go b/stream/forwarding/forwarding_test.go index 8b111c4..5a88a21 100644 --- a/stream/forwarding/forwarding_test.go +++ b/stream/forwarding/forwarding_test.go @@ -52,11 +52,14 @@ func TestForwardStream(t *testing.T) { forwardingList := make(map[string][]string) forwardingList["demo"] = []string{"rtmp://127.0.0.1:1936/live/app"} + + forwardingChannel = make(chan srt.Packet) + // Register forwarding stream list - New(forwardingList) + New(forwardingList, forwardingChannel) // Serve HTTP Server - go srt.Serve(&srt.Options{ListenAddress: ":9711", MaxClients: 2}) + go srt.Serve(&srt.Options{ListenAddress: ":9711", MaxClients: 2}, forwardingChannel) ffmpeg := exec.Command("ffmpeg", "-i", "http://ftp.crans.org/events/Blender%20OpenMovies/big_buck_bunny_480p_stereo.ogg", diff --git a/stream/srt/srt.go b/stream/srt/srt.go index 9bab761..52ec3db 100644 --- a/stream/srt/srt.go +++ b/stream/srt/srt.go @@ -14,6 +14,12 @@ type Options struct { MaxClients int } +type Packet struct { + Data []byte + PacketType string + StreamName string +} + // Split host and port from listen address func splitHostPort(hostport string) (string, uint16) { host, portS, err := net.SplitHostPort(hostport) @@ -31,7 +37,7 @@ func splitHostPort(hostport string) (string, uint16) { } // Serve SRT server -func Serve(cfg *Options) { +func Serve(cfg *Options, forwardingChannel chan Packet) { options := make(map[string]string) options["transtype"] = "live" @@ -59,10 +65,7 @@ func Serve(cfg *Options) { // Setup stream forwarding // FIXME: demo should be replaced by stream name - /* if err := forwarding.RegisterStream("demo"); err != nil { - log.Println("Error occurred during forward stream init:", err) - break - } */ + forwardingChannel <- Packet{StreamName: "demo", PacketType: "register", Data: nil} // Read RTP packets forever and send them to the WebRTC Client for { @@ -81,15 +84,13 @@ func Serve(cfg *Options) { // log.Printf("Received %d bytes", n) // Send raw packet to other streams - // forwarding.SendPacket("demo", buff[:n]) + forwardingChannel <- Packet{StreamName: "demo", PacketType: "sendData", Data: buff[:n]} // TODO: Send to WebRTC // See https://github.com/ebml-go/webm/blob/master/reader.go //err := videoTrack.WriteSample(media.Sample{Data: data, Samples: uint32(sampleCount)}) } - /* if err := forwarding.CloseConnection("demo"); err != nil { - log.Printf("Failed to close forward stream: %s", err) - } */ + forwardingChannel <- Packet{StreamName: "demo", PacketType: "close", Data: nil} } } diff --git a/stream/srt/srt_test.go b/stream/srt/srt_test.go index e1c1a84..68fafcf 100644 --- a/stream/srt/srt_test.go +++ b/stream/srt/srt_test.go @@ -36,7 +36,7 @@ func TestServeSRT(t *testing.T) { t.Skip("WARNING: FFMPEG is not installed. Skipping stream test") } - go Serve(&Options{ListenAddress: ":9711", MaxClients: 2}) + go Serve(&Options{ListenAddress: ":9711", MaxClients: 2}, nil) ffmpeg := exec.Command("ffmpeg", "-i", "http://ftp.crans.org/events/Blender%20OpenMovies/big_buck_bunny_480p_stereo.ogg",