pubsub_test.go (2031B)
1 package pubsub 2 3 import ( 4 "testing" 5 "time" 6 7 "github.com/stretchr/testify/assert" 8 ) 9 10 func TestPublish(t *testing.T) { 11 topic := "topic1" 12 msg := "msg1" 13 14 ps := NewPubSub[string]() 15 16 s1 := ps.Subscribe([]string{topic}) 17 s2 := ps.Subscribe([]string{topic}) 18 s3 := ps.Subscribe([]string{topic}) 19 20 ps.Pub(topic, msg) 21 22 s1Topic, s1Msg, s1Err := s1.ReceiveTimeout(time.Second) 23 s2Topic, s2Msg, s2Err := s2.ReceiveTimeout(time.Second) 24 s3Topic, s3Msg, s3Err := s3.ReceiveTimeout(time.Second) 25 26 assert.Nil(t, s1Err) 27 assert.Nil(t, s2Err) 28 assert.Nil(t, s3Err) 29 assert.Equal(t, msg, s1Msg) 30 assert.Equal(t, msg, s2Msg) 31 assert.Equal(t, msg, s3Msg) 32 assert.Equal(t, topic, s1Topic) 33 assert.Equal(t, topic, s2Topic) 34 assert.Equal(t, topic, s3Topic) 35 } 36 37 func TestSubscribe_manyTopics(t *testing.T) { 38 topic1 := "topic1" 39 topic2 := "topic2" 40 msg1 := "msg1" 41 msg2 := "msg2" 42 43 ps := NewPubSub[string]() 44 45 s := ps.Subscribe([]string{topic1, topic2}) 46 ps.Pub(topic1, msg1) 47 ps.Pub(topic2, msg2) 48 49 s1Topic1, s1Msg1, s1Err1 := s.ReceiveTimeout(time.Second) 50 s1Topic2, s1Msg2, s1Err2 := s.ReceiveTimeout(time.Second) 51 52 assert.Equal(t, topic1, s1Topic1) 53 assert.Equal(t, msg1, s1Msg1) 54 assert.Nil(t, s1Err1) 55 56 assert.Equal(t, topic2, s1Topic2) 57 assert.Equal(t, msg2, s1Msg2) 58 assert.Nil(t, s1Err2) 59 } 60 61 func TestPublishMarshal(t *testing.T) { 62 topic := "topic" 63 type Msg struct { 64 ID int64 65 Msg string 66 private string 67 } 68 var msg Msg 69 msg.ID = 1 70 msg.Msg = "will be sent" 71 msg.private = "will not" 72 73 ps := NewPubSub[Msg]() 74 75 s1 := ps.Subscribe([]string{topic}) 76 ps.Pub(topic, msg) 77 s1Topic, s1Msg, s1Err := s1.ReceiveTimeout(time.Second) 78 79 assert.Nil(t, s1Err) 80 assert.Equal(t, int64(1), s1Msg.ID) 81 assert.Equal(t, "will be sent", s1Msg.Msg) 82 assert.Equal(t, "will not", s1Msg.private) 83 assert.Equal(t, topic, s1Topic) 84 } 85 86 func TestSub_Close(t *testing.T) { 87 topic := "topic1" 88 89 ps := NewPubSub[string]() 90 91 s1 := ps.Subscribe([]string{topic}) 92 s1.Close() 93 _, _, s1Err := s1.ReceiveTimeout(time.Second) 94 95 assert.Equal(t, ErrCancelled, s1Err) 96 }