ghostream/stream/srt/handler.go

90 lines
1.9 KiB
Go
Raw Normal View History

2020-10-09 20:36:02 +00:00
// Package srt serves a SRT server
2020-10-04 12:24:00 +00:00
package srt
import (
"log"
"github.com/haivision/srtgo"
2020-10-17 10:26:24 +00:00
"gitlab.crans.org/nounous/ghostream/stream"
2020-10-04 12:24:00 +00:00
)
2020-10-17 11:43:16 +00:00
func handleStreamer(socket *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
2020-10-17 10:26:24 +00:00
// Check stream does not exist
if _, ok := streams[name]; ok {
log.Print("Stream already exists, refusing new streamer")
socket.Close()
return
}
// Create stream
2020-10-04 12:24:00 +00:00
log.Printf("New SRT streamer for stream %s", name)
2020-10-17 11:43:16 +00:00
st := stream.New()
2020-10-17 10:26:24 +00:00
streams[name] = st
2020-10-04 12:24:00 +00:00
// Create a new buffer
2020-10-04 12:27:16 +00:00
// UDP packet cannot be larger than MTU (1500)
buff := make([]byte, 1500)
2020-10-04 12:24:00 +00:00
// Read RTP packets forever and send them to the WebRTC Client
for {
2020-10-04 12:27:16 +00:00
// 5s timeout
2020-10-17 10:26:24 +00:00
n, err := socket.Read(buff, 5000)
2020-10-04 12:24:00 +00:00
if err != nil {
2020-10-14 19:36:12 +00:00
log.Println("Error occurred while reading SRT socket:", err)
2020-10-04 12:24:00 +00:00
break
}
if n == 0 {
// End of stream
log.Printf("Received no bytes, stopping stream.")
break
}
2020-10-17 10:26:24 +00:00
// Send raw data to other streams
2020-10-04 12:24:00 +00:00
// Copy data in another buffer to ensure that the data would not be overwritten
2020-10-17 10:26:24 +00:00
// FIXME: might be unnecessary
2020-10-04 12:24:00 +00:00
data := make([]byte, n)
copy(data, buff[:n])
2020-10-17 10:26:24 +00:00
st.Broadcast <- data
2020-10-04 12:24:00 +00:00
}
2020-10-17 10:26:24 +00:00
// Close stream
st.Close()
socket.Close()
delete(streams, name)
2020-10-04 12:24:00 +00:00
}
2020-10-17 11:43:16 +00:00
func handleViewer(s *srtgo.SrtSocket, streams map[string]*stream.Stream, name string) {
2020-10-04 12:24:00 +00:00
log.Printf("New SRT viewer for stream %s", name)
2020-10-17 10:26:24 +00:00
// Get requested stream
st, ok := streams[name]
if !ok {
log.Println("Stream does not exist, refusing new viewer")
return
}
// Register new output
c := make(chan []byte, 128)
2020-10-17 11:43:16 +00:00
st.Register(c, false)
2020-10-17 10:26:24 +00:00
// Receive data and send them
2020-10-17 11:43:16 +00:00
for data := range c {
2020-10-17 10:26:24 +00:00
if len(data) < 1 {
log.Print("Remove SRT viewer because of end of stream")
break
}
2020-10-17 11:43:16 +00:00
// Send data
2020-10-17 10:26:24 +00:00
_, err := s.Write(data, 1000)
if err != nil {
log.Printf("Remove SRT viewer because of sending error, %s", err)
break
2020-10-04 12:24:00 +00:00
}
}
2020-10-17 10:26:24 +00:00
// Close output
2020-10-17 11:43:16 +00:00
st.Unregister(c, false)
2020-10-17 10:26:24 +00:00
s.Close()
2020-10-04 12:24:00 +00:00
}