1
0
mirror of https://gitlab.crans.org/nounous/ghostream.git synced 2024-12-23 00:22:19 +00:00
ghostream/messaging/quality.go

80 lines
1.7 KiB
Go
Raw Normal View History

2020-10-19 17:14:46 +00:00
// Package messaging defines a structure to communication between inputs and outputs
package messaging
import (
"sync"
)
// Quality holds a specific stream quality.
// It makes packages able to subscribe to an incoming stream.
type Quality struct {
// Incoming data come from this channel
Broadcast chan<- []byte
// Incoming data will be outputed to all those outputs.
// Use a map to be able to delete an item.
outputs map[chan []byte]struct{}
// Mutex to lock outputs map
lockOutputs sync.Mutex
}
func newQuality() (q *Quality) {
q = &Quality{}
broadcast := make(chan []byte, 1024)
q.Broadcast = broadcast
q.outputs = make(map[chan []byte]struct{})
go q.run(broadcast)
return q
}
func (q *Quality) run(broadcast <-chan []byte) {
for msg := range broadcast {
q.lockOutputs.Lock()
for output := range q.outputs {
select {
case output <- msg:
default:
// If full, do a ring buffer
// Check that output is not of size zero
if len(output) > 1 {
<-output
}
}
}
q.lockOutputs.Unlock()
}
// Incoming chan has been closed, close all outputs
q.lockOutputs.Lock()
for ch := range q.outputs {
delete(q.outputs, ch)
close(ch)
}
q.lockOutputs.Unlock()
}
// Close the incoming chan, this will also delete all outputs.
func (q *Quality) Close() {
close(q.Broadcast)
}
// Register a new output on a stream.
func (q *Quality) Register(output chan []byte) {
q.lockOutputs.Lock()
q.outputs[output] = struct{}{}
q.lockOutputs.Unlock()
}
// Unregister removes an output.
func (q *Quality) Unregister(output chan []byte) {
// Make sure we did not already close this output
q.lockOutputs.Lock()
_, ok := q.outputs[output]
if ok {
delete(q.outputs, output)
close(output)
}
defer q.lockOutputs.Unlock()
}