Use []byte for stream data

This commit is contained in:
Alexandre Iooss 2020-10-17 10:21:40 +02:00
parent 73e6be1274
commit e0911ab050
No known key found for this signature in database
GPG Key ID: 6C79278F3FCDCC02
2 changed files with 12 additions and 12 deletions

View File

@ -6,10 +6,10 @@ import "sync"
// Stream makes packages able to subscribe to an incoming stream // Stream makes packages able to subscribe to an incoming stream
type Stream struct { type Stream struct {
// Incoming data come from this channel // Incoming data come from this channel
Broadcast chan<- interface{} Broadcast chan<- []byte
// Use a map to be able to delete an item // Use a map to be able to delete an item
outputs map[chan<- interface{}]struct{} outputs map[chan<- []byte]struct{}
// Mutex to lock this ressource // Mutex to lock this ressource
lock sync.Mutex lock sync.Mutex
@ -18,14 +18,14 @@ type Stream struct {
// New creates a new stream. // New creates a new stream.
func New() *Stream { func New() *Stream {
s := &Stream{} s := &Stream{}
broadcast := make(chan interface{}, 64) broadcast := make(chan []byte, 64)
s.Broadcast = broadcast s.Broadcast = broadcast
s.outputs = make(map[chan<- interface{}]struct{}) s.outputs = make(map[chan<- []byte]struct{})
go s.run(broadcast) go s.run(broadcast)
return s return s
} }
func (s *Stream) run(broadcast <-chan interface{}) { func (s *Stream) run(broadcast <-chan []byte) {
for msg := range broadcast { for msg := range broadcast {
func() { func() {
s.lock.Lock() s.lock.Lock()
@ -57,14 +57,14 @@ func (s *Stream) Close() {
} }
// Register a new output on a stream // Register a new output on a stream
func (s *Stream) Register(output chan<- interface{}) { func (s *Stream) Register(output chan<- []byte) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
s.outputs[output] = struct{}{} s.outputs[output] = struct{}{}
} }
// Unregister removes an output // Unregister removes an output
func (s *Stream) Unregister(output chan<- interface{}) { func (s *Stream) Unregister(output chan<- []byte) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()

View File

@ -7,7 +7,7 @@ import (
func TestWithoutOutputs(t *testing.T) { func TestWithoutOutputs(t *testing.T) {
stream := New() stream := New()
defer stream.Close() defer stream.Close()
stream.Broadcast <- "hello world" stream.Broadcast <- []byte("hello world")
} }
func TestWithOneOutput(t *testing.T) { func TestWithOneOutput(t *testing.T) {
@ -15,14 +15,14 @@ func TestWithOneOutput(t *testing.T) {
defer stream.Close() defer stream.Close()
// Register one output // Register one output
output := make(chan interface{}, 64) output := make(chan []byte, 64)
stream.Register(output) stream.Register(output)
// Try to pass one message // Try to pass one message
stream.Broadcast <- "hello world" stream.Broadcast <- []byte("hello world")
msg := <-output msg := <-output
if m, ok := msg.(string); !ok || m != "hello world" { if string(msg) != "hello world" {
t.Error("Message has wrong type or content") t.Errorf("Message has wrong content: %s != hello world", msg)
} }
// Unregister // Unregister