85 lines
1.9 KiB
Go
85 lines
1.9 KiB
Go
// Package messaging defines a structure to communication between inputs and outputs
|
|
package messaging
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
)
|
|
|
|
// Stream makes packages able to subscribe to an incoming stream
|
|
type Stream struct {
|
|
// Different qualities of this stream
|
|
qualities map[string]*Quality
|
|
|
|
// Mutex to lock outputs map
|
|
lockQualities sync.Mutex
|
|
|
|
// Count clients for statistics
|
|
nbClients int
|
|
}
|
|
|
|
func newStream() (s *Stream) {
|
|
s = &Stream{}
|
|
s.qualities = make(map[string]*Quality)
|
|
s.nbClients = 0
|
|
return s
|
|
}
|
|
|
|
// Close stream.
|
|
func (s *Stream) Close() {
|
|
for quality := range s.qualities {
|
|
s.DeleteQuality(quality)
|
|
}
|
|
}
|
|
|
|
// CreateQuality creates a new quality associated with this stream.
|
|
func (s *Stream) CreateQuality(name string) (quality *Quality, err error) {
|
|
// If quality already exist, fail
|
|
if _, ok := s.qualities[name]; ok {
|
|
return nil, errors.New("quality already exists")
|
|
}
|
|
|
|
s.lockQualities.Lock()
|
|
quality = newQuality(name, s)
|
|
s.qualities[name] = quality
|
|
s.lockQualities.Unlock()
|
|
return quality, nil
|
|
}
|
|
|
|
// DeleteQuality removes a stream quality.
|
|
func (s *Stream) DeleteQuality(name string) {
|
|
// Make sure we did not already close this output
|
|
s.lockQualities.Lock()
|
|
if _, ok := s.qualities[name]; ok {
|
|
s.qualities[name].Close()
|
|
delete(s.qualities, name)
|
|
}
|
|
s.lockQualities.Unlock()
|
|
}
|
|
|
|
// GetQuality gets a specific stream quality.
|
|
func (s *Stream) GetQuality(name string) (quality *Quality, err error) {
|
|
s.lockQualities.Lock()
|
|
quality, ok := s.qualities[name]
|
|
s.lockQualities.Unlock()
|
|
if !ok {
|
|
return nil, errors.New("quality does not exist")
|
|
}
|
|
return quality, nil
|
|
}
|
|
|
|
// ClientCount returns the number of clients.
|
|
func (s *Stream) ClientCount() int {
|
|
return s.nbClients
|
|
}
|
|
|
|
// IncrementClientCount increments the number of clients.
|
|
func (s *Stream) IncrementClientCount() {
|
|
s.nbClients++
|
|
}
|
|
|
|
// DecrementClientCount decrements the number of clients.
|
|
func (s *Stream) DecrementClientCount() {
|
|
s.nbClients--
|
|
}
|