commit 77da77fc240521b1ad771b98d64545d8e7ded8a0
parent f55bc3814eef948c4829e69a2df9d6e8aa1985ef
Author: n0tr1v <n0tr1v@protonmail.com>
Date: Sun, 21 May 2023 16:28:52 -0700
generic PubSub
Diffstat:
3 files changed, 55 insertions(+), 71 deletions(-)
diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go
@@ -2,54 +2,70 @@ package pubsub
import (
"context"
- "encoding/json"
"sync"
"time"
"github.com/pkg/errors"
)
-// Contains and manage the map of topics -> subscribers
-var topicsSubs struct {
+// PubSub contains and manage the map of topics -> subscribers
+type PubSub[T any] struct {
sync.Mutex
- m map[string][]*Sub
+ m map[string][]*Sub[T]
}
-func init() {
- topicsSubs.m = make(map[string][]*Sub)
+func NewPubSub[T any]() *PubSub[T] {
+ ps := PubSub[T]{}
+ ps.m = make(map[string][]*Sub[T])
+ return &ps
}
-func getSubscribers(topic string) []*Sub {
- topicsSubs.Lock()
- defer topicsSubs.Unlock()
- return topicsSubs.m[topic]
+func (p *PubSub[T]) getSubscribers(topic string) []*Sub[T] {
+ p.Lock()
+ defer p.Unlock()
+ return p.m[topic]
}
-func addSubscriber(s *Sub) {
- topicsSubs.Lock()
+func (p *PubSub[T]) addSubscriber(s *Sub[T]) {
+ p.Lock()
for _, topic := range s.topics {
- topicsSubs.m[topic] = append(topicsSubs.m[topic], s)
+ p.m[topic] = append(p.m[topic], s)
}
- topicsSubs.Unlock()
+ p.Unlock()
}
-func removeSubscriber(s *Sub) {
- topicsSubs.Lock()
+func (p *PubSub[T]) removeSubscriber(s *Sub[T]) {
+ p.Lock()
for _, topic := range s.topics {
- for i, subscriber := range topicsSubs.m[topic] {
+ for i, subscriber := range p.m[topic] {
if subscriber == s {
- topicsSubs.m[topic] = append(topicsSubs.m[topic][:i], topicsSubs.m[topic][i+1:]...)
+ p.m[topic] = append(p.m[topic][:i], p.m[topic][i+1:]...)
break
}
}
}
- topicsSubs.Unlock()
+ p.Unlock()
+}
+
+// Subscribe is an alias for NewSub
+func (p *PubSub[T]) Subscribe(topics []string) *Sub[T] {
+ ctx, cancel := context.WithCancel(context.Background())
+ s := &Sub[T]{topics: topics, ch: make(chan payload[T], 10), ctx: ctx, cancel: cancel, p: p}
+ p.addSubscriber(s)
+ return s
+}
+
+// Pub shortcut for publish which ignore the error
+func (p *PubSub[T]) Pub(topic string, msg T) {
+ for _, s := range p.getSubscribers(topic) {
+ s.publish(payload[T]{topic, msg})
+ }
}
//
-type payload struct {
+type payload[T any] struct {
topic string
- msg string
+ msg T
}
// ErrTimeout error returned when timeout occurs
@@ -59,23 +75,16 @@ var ErrTimeout = errors.New("timeout")
var ErrCancelled = errors.New("cancelled")
// Sub subscriber will receive messages published on a topic in his ch
-type Sub struct {
- topics []string // Topics subscribed to
- ch chan payload // Receives messages in this channel
+type Sub[T any] struct {
+ topics []string // Topics subscribed to
+ ch chan payload[T] // Receives messages in this channel
ctx context.Context
cancel context.CancelFunc
-}
-
-// NewSub creates a new subscriber for topics
-func NewSub(topics []string) *Sub {
- ctx, cancel := context.WithCancel(context.Background())
- s := &Sub{topics: topics, ch: make(chan payload, 10), ctx: ctx, cancel: cancel}
- addSubscriber(s)
- return s
+ p *PubSub[T]
}
// ReceiveTimeout returns a message received on the channel or timeout
-func (s *Sub) ReceiveTimeout(timeout time.Duration) (topic string, msg string, err error) {
+func (s *Sub[T]) ReceiveTimeout(timeout time.Duration) (topic string, msg T, err error) {
select {
case p := <-s.ch:
return p.topic, p.msg, nil
@@ -87,8 +96,8 @@ func (s *Sub) ReceiveTimeout(timeout time.Duration) (topic string, msg string, e
}
// Receive returns a message
-func (s *Sub) Receive() (topic string, msg string, err error) {
- var res string
+func (s *Sub[T]) Receive() (topic string, msg T, err error) {
+ var res T
select {
case p := <-s.ch:
return p.topic, p.msg, nil
@@ -98,42 +107,15 @@ func (s *Sub) Receive() (topic string, msg string, err error) {
}
// Close will remove the subscriber from the topic subscribers
-func (s *Sub) Close() {
+func (s *Sub[T]) Close() {
s.cancel()
- removeSubscriber(s)
+ s.p.removeSubscriber(s)
}
// publish a message to the subscriber channel
-func (s *Sub) publish(p payload) {
+func (s *Sub[T]) publish(p payload[T]) {
select {
case s.ch <- p:
default:
}
}
-
-// Subscribe is an alias for NewSub
-func Subscribe(topics []string) *Sub {
- return NewSub(topics)
-}
-
-// PublishString a message to all subscribers of a topic
-func PublishString(topic string, msg string) {
- for _, s := range getSubscribers(topic) {
- s.publish(payload{topic, msg})
- }
-}
-
-// Publish a message to all subscribers of a topic
-func Publish(topic string, msg any) error {
- marshalled, err := json.Marshal(msg)
- if err != nil {
- return err
- }
- PublishString(topic, string(marshalled))
- return nil
-}
-
-// Pub shortcut for publish which ignore the error
-func Pub(topic string, msg any) {
- _ = Publish(topic, msg)
-}
diff --git a/pkg/web/handlers/api/v1/chess.go b/pkg/web/handlers/api/v1/chess.go
@@ -467,7 +467,7 @@ func (b *Chess) SendMove(gameKey string, userID database.UserID, g *ChessGame, c
//delete(b.games, gameKey)
}
- pubsub.Pub(gameKey, true)
+ ChessPubSub.Pub(gameKey, true)
// Notify (pm) the opponent that you made a move
if opponent.NotifyChessMove {
@@ -483,6 +483,8 @@ func (b *Chess) SendMove(gameKey string, userID database.UserID, g *ChessGame, c
return nil
}
+var ChessPubSub = pubsub.NewPubSub[bool]()
+
func (b *Chess) InterceptMsg(cmd *Command) {
m := cRgx.FindStringSubmatch(cmd.message)
if len(m) != 3 {
diff --git a/pkg/web/handlers/handlers.go b/pkg/web/handlers/handlers.go
@@ -6,7 +6,6 @@ import (
"crypto/sha256"
dutils "dkforest/pkg/database/utils"
"dkforest/pkg/hashset"
- pubsub2 "dkforest/pkg/pubsub"
"dkforest/pkg/utils/crypto"
v1 "dkforest/pkg/web/handlers/api/v1"
"encoding/base64"
@@ -4767,7 +4766,7 @@ func ChessGameHandler(c echo.Context) error {
resignColor = chess.Black
}
g.Game.Resign(resignColor)
- pubsub2.Pub(key, true)
+ v1.ChessPubSub.Pub(key, true)
} else {
if err := v1.ChessInstance.SendMove(key, authUser.ID, g, c); err != nil {
logrus.Error(err)
@@ -4829,8 +4828,9 @@ func ChessGameHandler(c echo.Context) error {
authorizedChannels := make([]string, 0)
authorizedChannels = append(authorizedChannels, key)
- pubsub := pubsub2.Subscribe(authorizedChannels)
- defer pubsub.Close()
+
+ sub := v1.ChessPubSub.Subscribe(authorizedChannels)
+ defer sub.Close()
var card1 string
if isSpectator {
@@ -4855,7 +4855,7 @@ Loop:
break
}
- _, _, err := pubsub.ReceiveTimeout(1 * time.Second)
+ _, _, err := sub.ReceiveTimeout(1 * time.Second)
if err != nil {
continue
}