diff --git a/messaging/quality.go b/messaging/quality.go new file mode 100644 index 0000000..8ed72c9 --- /dev/null +++ b/messaging/quality.go @@ -0,0 +1,79 @@ +// Package messaging defines a structure to communication between inputs and outputs +package messaging + +import ( + "sync" +) + +// Quality holds a specific stream quality. +// It makes packages able to subscribe to an incoming stream. +type Quality struct { + // Incoming data come from this channel + Broadcast chan<- []byte + + // Incoming data will be outputed to all those outputs. + // Use a map to be able to delete an item. + outputs map[chan []byte]struct{} + + // Mutex to lock outputs map + lockOutputs sync.Mutex +} + +func newQuality() (q *Quality) { + q = &Quality{} + broadcast := make(chan []byte, 1024) + q.Broadcast = broadcast + q.outputs = make(map[chan []byte]struct{}) + go q.run(broadcast) + return q +} + +func (q *Quality) run(broadcast <-chan []byte) { + for msg := range broadcast { + q.lockOutputs.Lock() + for output := range q.outputs { + select { + case output <- msg: + default: + // If full, do a ring buffer + // Check that output is not of size zero + if len(output) > 1 { + <-output + } + } + } + q.lockOutputs.Unlock() + } + + // Incoming chan has been closed, close all outputs + q.lockOutputs.Lock() + for ch := range q.outputs { + delete(q.outputs, ch) + close(ch) + } + q.lockOutputs.Unlock() +} + +// Close the incoming chan, this will also delete all outputs. +func (q *Quality) Close() { + close(q.Broadcast) +} + +// Register a new output on a stream. +func (q *Quality) Register(output chan []byte) { + q.lockOutputs.Lock() + q.outputs[output] = struct{}{} + q.lockOutputs.Unlock() +} + +// Unregister removes an output. +func (q *Quality) Unregister(output chan []byte) { + // Make sure we did not already close this output + q.lockOutputs.Lock() + _, ok := q.outputs[output] + if ok { + delete(q.outputs, output) + close(output) + } + defer q.lockOutputs.Unlock() +} diff --git a/messaging/stream.go b/messaging/stream.go new file mode 100644 index 0000000..81eed21 --- /dev/null +++ b/messaging/stream.go @@ -0,0 +1,60 @@ +// Package messaging defines a structure to communication between inputs and outputs +package messaging + +import ( + "sync" +) + +// Stream makes packages able to subscribe to an incoming stream +type Stream struct { + // Different qualities of this stream + qualities map[string]*Quality + + // Mutex to lock outputs map + lockQualities sync.Mutex + + // Count clients for statistics + nbClients int +} + +func newStream() (s *Stream) { + s = &Stream{} + s.qualities = make(map[string]*Quality) + s.nbClients = 0 + return s +} + +// CreateQuality creates a new quality associated with this stream. +func (s *Stream) CreateQuality(name string) (quality *Quality) { + s.lockQualities.Lock() + quality = newQuality() + s.qualities[name] = quality + s.lockQualities.Unlock() + return quality +} + +// DeleteQuality removes a quality. +func (s *Stream) DeleteQuality(name string) { + // Make sure we did not already close this output + s.lockQualities.Lock() + if _, ok := s.qualities[name]; ok { + s.qualities[name].Close() + delete(s.qualities, name) + } + s.lockQualities.Unlock() +} + +// ClientCount returns the number of clients. +func (s *Stream) ClientCount() int { + return s.nbClients +} + +// IncrementClientCount increments the number of clients. +func (s *Stream) IncrementClientCount() { + s.nbClients++ +} + +// DecrementClientCount decrements the number of clients. +func (s *Stream) DecrementClientCount() { + s.nbClients-- +} diff --git a/messaging/streams.go b/messaging/streams.go new file mode 100644 index 0000000..d9fa78b --- /dev/null +++ b/messaging/streams.go @@ -0,0 +1,97 @@ +// Package messaging defines a structure to communication between inputs and outputs +package messaging + +import ( + "errors" + "log" + "sync" +) + +// Streams hold all application streams. +type Streams struct { + // Associate each stream name to the stream + streams map[string]*Stream + + // Mutex to lock streams + lockStreams sync.Mutex + + // Subscribers get notified when a new stream is created + // Use a map to be able to delete a subscriber + eventSubscribers map[chan string]struct{} + + // Mutex to lock eventSubscribers + lockSubscribers sync.Mutex +} + +// New creates a new stream list. +func New() (l *Streams) { + l = &Streams{} + l.streams = make(map[string]*Stream) + l.eventSubscribers = make(map[chan string]struct{}) + return l +} + +// Subscribe to get notified on new stream. +func (l *Streams) Subscribe(output chan string) { + l.lockSubscribers.Lock() + l.eventSubscribers[output] = struct{}{} + l.lockSubscribers.Unlock() +} + +// Unsubscribe to no longer get notified on new stream. +func (l *Streams) Unsubscribe(output chan string) { + // Make sure we did not already delete this subscriber + l.lockSubscribers.Lock() + if _, ok := l.eventSubscribers[output]; ok { + delete(l.eventSubscribers, output) + } + l.lockSubscribers.Unlock() +} + +// Create a new stream. +func (l *Streams) Create(name string) (s *Stream, err error) { + // If stream already exist, fail + if _, ok := l.streams[name]; ok { + return nil, errors.New("stream already exists") + } + + // Create stream + s = newStream() + l.lockStreams.Lock() + l.streams[name] = s + l.lockStreams.Unlock() + + // Notify + l.lockSubscribers.Lock() + for sub := range l.eventSubscribers { + select { + case sub <- name: + default: + log.Printf("Failed to announce stream '%s' to subscriber", name) + } + } + l.lockSubscribers.Unlock() + return s, nil +} + +// Get a stream. +func (l *Streams) Get(name string) (s *Stream, err error) { + // If stream does exist, return it + l.lockStreams.Lock() + s, ok := l.streams[name] + l.lockStreams.Unlock() + if !ok { + return nil, errors.New("stream does not exist") + } + return s, nil +} + +// Delete a stream. +func (l *Streams) Delete(name string) { + // Make sure we did not already delete this stream + l.lockStreams.Lock() + if _, ok := l.streams[name]; ok { + delete(l.streams, name) + } + l.lockStreams.Unlock() +} diff --git a/messaging/streams_test.go b/messaging/streams_test.go new file mode 100644 index 0000000..9850bc5 --- /dev/null +++ b/messaging/streams_test.go @@ -0,0 +1,52 @@ +package messaging + +import "testing" + +func TestWithOneStream(t *testing.T) { + streams := New() + + // Subscribe to new streams + event := make(chan string, 8) + streams.Subscribe(event) + + // Create a stream + stream, err := streams.Create("demo") + if err != nil { + t.Errorf("Failed to create stream") + } + + // Check that we receive the creation event + e := <-event + if e != "demo" { + t.Errorf("Message has wrong content: %s != demo", e) + } + + // Create a quality + quality := stream.CreateQuality("source") + + // Register one output + output := make(chan []byte, 64) + quality.Register(output) + stream.IncrementClientCount() + + // Try to pass one message + quality.Broadcast <- []byte("hello world") + msg := <-output + if string(msg) != "hello world" { + t.Errorf("Message has wrong content: %s != hello world", msg) + } + + // Check client count + if count := stream.ClientCount(); count != 1 { + t.Errorf("Client counter returned %d, expected 1", count) + } + + // Unregister + quality.Unregister(output) + stream.DecrementClientCount() + + // Check client count + if count := stream.ClientCount(); count != 0 { + t.Errorf("Client counter returned %d, expected 0", count) + } +} diff --git a/stream/messaging.go b/stream/messaging.go deleted file mode 100644 index 4c2c0e7..0000000 --- a/stream/messaging.go +++ /dev/null @@ -1,99 +0,0 @@ -// Package stream defines a structure to communication between inputs and outputs -package stream - -import ( - "sync" -) - -// Stream makes packages able to subscribe to an incoming stream -type Stream struct { - // Incoming data come from this channel - Broadcast chan<- []byte - - // Use a map to be able to delete an item - outputs map[chan []byte]struct{} - - // Count clients for statistics - nbClients int - - // Mutex to lock outputs map - lock sync.Mutex -} - -// New creates a new stream. -func New() *Stream { - s := &Stream{} - broadcast := make(chan []byte, 1024) - s.Broadcast = broadcast - s.outputs = make(map[chan []byte]struct{}) - s.nbClients = 0 - go s.run(broadcast) - return s -} - -func (s *Stream) run(broadcast <-chan []byte) { - for msg := range broadcast { - s.lock.Lock() - for output := range s.outputs { - select { - case output <- msg: - default: - // If full, do a ring buffer - // Check that output is not of size zero - if len(output) > 1 { - <-output - } - } - } - s.lock.Unlock() - } - - // Incoming chan has been closed, close all outputs - s.lock.Lock() - for ch := range s.outputs { - delete(s.outputs, ch) - close(ch) - } - s.lock.Unlock() -} - -// Close the incoming chan, this will also delete all outputs -func (s *Stream) Close() { - close(s.Broadcast) -} - -// Register a new output on a stream. -func (s *Stream) Register(output chan []byte) { - s.lock.Lock() - defer s.lock.Unlock() - s.outputs[output] = struct{}{} -} - -// Unregister removes an output. -// If hidden in true, then do not count this client. -func (s *Stream) Unregister(output chan []byte) { - s.lock.Lock() - defer s.lock.Unlock() - - // Make sure we did not already close this output - _, ok := s.outputs[output] - if ok { - delete(s.outputs, output) - close(output) - } -} - -// ClientCount returns the number of clients -func (s *Stream) ClientCount() int { - return s.nbClients -} - -// IncrementClientCount increments the number of clients -func (s *Stream) IncrementClientCount() { - s.nbClients++ -} - -// DecrementClientCount decrements the number of clients -func (s *Stream) DecrementClientCount() { - s.nbClients-- -} diff --git a/stream/messaging_test.go b/stream/messaging_test.go deleted file mode 100644 index 49e17a0..0000000 --- a/stream/messaging_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package stream - -import ( - "testing" -) - -func TestWithoutOutputs(t *testing.T) { - stream := New() - defer stream.Close() - stream.Broadcast <- []byte("hello world") -} - -func TestWithOneOutput(t *testing.T) { - stream := New() - defer stream.Close() - - // Register one output - output := make(chan []byte, 64) - stream.Register(output) - stream.IncrementClientCount() - - // Try to pass one message - stream.Broadcast <- []byte("hello world") - msg := <-output - if string(msg) != "hello world" { - t.Errorf("Message has wrong content: %s != hello world", msg) - } - - // Check client count - if count := stream.ClientCount(); count != 1 { - t.Errorf("Client counter returned %d, expected 1", count) - } - - // Unregister - stream.Unregister(output) - stream.DecrementClientCount() - - // Check client count - if count := stream.ClientCount(); count != 0 { - t.Errorf("Client counter returned %d, expected 0", count) - } -}