commit 66a02a930e3b5d32795389cbd99733b3a78f3345
parent cbbc8ddd5814d091ec4be0f779f2e2d10e16deed
Author: n0tr1v <n0tr1v@protonmail.com>
Date: Fri, 29 Dec 2023 00:04:15 -0500
cleanup
Diffstat:
5 files changed, 68 insertions(+), 50 deletions(-)
diff --git a/pkg/web/handlers/api/v1/chat.go b/pkg/web/handlers/api/v1/chat.go
@@ -11,7 +11,7 @@ import (
"dkforest/pkg/web/handlers/poker"
"dkforest/pkg/web/handlers/streamModals"
"dkforest/pkg/web/handlers/usersStreamsManager"
- hutils "dkforest/pkg/web/handlers/utils"
+ "dkforest/pkg/web/handlers/utils/stream"
"errors"
"fmt"
"github.com/labstack/echo"
@@ -110,7 +110,11 @@ func ChatStreamMessagesHandler(c echo.Context) error {
return c.Redirect(http.StatusFound, "/")
}
- quit := hutils.SetStreaming(c)
+ streamItem, err := stream.SetStreaming(c, authUser.ID, "")
+ if err != nil {
+ return nil
+ }
+ defer streamItem.Cleanup()
// Keep track of users streams, so we can limit how many are open at one time per user
if item, err := usersStreamsManager.Inst.Add(authUser.ID, ""); err == nil {
@@ -242,7 +246,7 @@ func ChatStreamMessagesHandler(c echo.Context) error {
Loop:
for {
select {
- case <-quit:
+ case <-streamItem.Quit:
break Loop
default:
}
@@ -266,7 +270,7 @@ Loop:
// Toggle the "http alive indicator" class to keep the dot green
send(indicatorAlt.alternate())
- topic, msgTyp, err := sub.ReceiveTimeout2(5*time.Second, quit)
+ topic, msgTyp, err := sub.ReceiveTimeout2(5*time.Second, streamItem.Quit)
if err != nil {
if errors.Is(err, pubsub.ErrCancelled) {
break Loop
@@ -432,14 +436,12 @@ func ChatStreamMenuHandler(c echo.Context) error {
return c.HTML(http.StatusOK, s)
}
- // Keep track of users streams, so we can limit how many are open at one time per user
- if item, err := usersStreamsManager.Inst.Add(authUser.ID, ""); err == nil {
- defer item.Cleanup()
- } else {
+ streamItem, err := stream.SetStreaming(c, authUser.ID, "")
+ if err != nil {
return nil
}
+ defer streamItem.Cleanup()
- quit := hutils.SetStreaming(c)
send := func(s string) { _, _ = c.Response().Write([]byte(s)) }
var prevHash string
var menuID int
@@ -458,7 +460,7 @@ Loop:
case <-sub.ReceiveCh():
send(`<meta http-equiv="refresh" content="1" />`)
return nil
- case <-quit:
+ case <-streamItem.Quit:
break Loop
}
diff --git a/pkg/web/handlers/chess.go b/pkg/web/handlers/chess.go
@@ -10,6 +10,7 @@ import (
"dkforest/pkg/web/handlers/interceptors"
"dkforest/pkg/web/handlers/usersStreamsManager"
hutils "dkforest/pkg/web/handlers/utils"
+ "dkforest/pkg/web/handlers/utils/stream"
"encoding/json"
"fmt"
"github.com/labstack/echo"
@@ -118,13 +119,11 @@ func ChessGameAnalyzeHandler(c echo.Context) error {
}()
}
- quit := hutils.SetStreaming(c)
-
- if item, err := usersStreamsManager.Inst.Add(authUser.ID, "analyze_"+key); err == nil {
- defer item.Cleanup()
- } else {
+ streamItem, err := stream.SetStreaming(c, authUser.ID, "analyze_"+key)
+ if err != nil {
return nil
}
+ defer streamItem.Cleanup()
sub := interceptors.ChessAnalyzeProgressPubSub.Subscribe([]string{"chess_analyze_progress_" + key})
defer sub.Close()
@@ -149,7 +148,7 @@ func ChessGameAnalyzeHandler(c echo.Context) error {
Loop:
for {
select {
- case <-quit:
+ case <-streamItem.Quit:
break Loop
default:
}
@@ -158,7 +157,7 @@ Loop:
break
}
- _, progress, err = sub.ReceiveTimeout2(1*time.Second, quit)
+ _, progress, err = sub.ReceiveTimeout2(1*time.Second, streamItem.Quit)
if err != nil {
if err == pubsub.ErrCancelled {
break Loop
@@ -599,13 +598,11 @@ func ChessGameHandler(c echo.Context) error {
// Keep track of "if the game was over" when we loaded the page
gameLoadedOver := game.Outcome() != chess.NoOutcome
- quit := hutils.SetStreaming(c)
-
- if item, err := usersStreamsManager.Inst.Add(authUser.ID, key); err == nil {
- defer item.Cleanup()
- } else {
+ streamItem, err := stream.SetStreaming(c, authUser.ID, key)
+ if err != nil {
return nil
}
+ defer streamItem.Cleanup()
send(hutils.HtmlCssReset)
send(`<style>html, body { background-color: #222; }</style>`)
@@ -633,7 +630,7 @@ func ChessGameHandler(c echo.Context) error {
select {
case <-once.After(100 * time.Millisecond):
case <-time.After(5 * time.Second):
- case <-quit:
+ case <-streamItem.Quit:
return
}
p1Count := usersStreamsManager.Inst.GetUserStreamsCountFor(p1ID, key)
@@ -660,7 +657,7 @@ func ChessGameHandler(c echo.Context) error {
Loop:
for {
select {
- case <-quit:
+ case <-streamItem.Quit:
break Loop
default:
}
@@ -678,7 +675,7 @@ Loop:
break
}
- _, payload, err := sub.ReceiveTimeout2(1*time.Second, quit)
+ _, payload, err := sub.ReceiveTimeout2(1*time.Second, streamItem.Quit)
if err != nil {
if err == pubsub.ErrCancelled {
break Loop
@@ -748,7 +745,7 @@ Loop:
go func(payload interceptors.ChessMove, c echo.Context) {
select {
case <-time.After(animationMs * time.Millisecond):
- case <-quit:
+ case <-streamItem.Quit:
return
}
if payload.IDStr2 != "" {
diff --git a/pkg/web/handlers/poker.go b/pkg/web/handlers/poker.go
@@ -9,8 +9,8 @@ import (
"dkforest/pkg/pubsub"
"dkforest/pkg/utils"
"dkforest/pkg/web/handlers/poker"
- "dkforest/pkg/web/handlers/usersStreamsManager"
hutils "dkforest/pkg/web/handlers/utils"
+ "dkforest/pkg/web/handlers/utils/stream"
"encoding/base64"
"errors"
"fmt"
@@ -417,14 +417,11 @@ func PokerStreamHandler(c echo.Context) error {
g := poker.PokerInstance.GetOrCreateGame(db, roomID, pokerTable.ID, pokerTable.MinBet, pokerTable.IsTest)
- quit := hutils.SetStreaming(c)
-
- // Keep track of users streams, so we can limit how many are open at one time per user
- if item, err := usersStreamsManager.Inst.Add(authUser.ID, roomTopic); err == nil {
- defer item.Cleanup()
- } else {
+ streamItem, err := stream.SetStreaming(c, authUser.ID, roomTopic)
+ if err != nil {
return nil
}
+ defer streamItem.Cleanup()
sub := poker.PubSub.Subscribe([]string{roomTopic, roomUserTopic, "refresh_loading_icon_" + string(authUser.Username)})
defer sub.Close()
@@ -432,7 +429,7 @@ func PokerStreamHandler(c echo.Context) error {
send(poker.BuildBaseHtml(g, authUser, chatRoomSlug))
c.Response().Flush()
- loop(quit, sub, func(topic string, payload any) error {
+ loop(streamItem.Quit, sub, func(topic string, payload any) error {
switch payload.(type) {
case poker.RefreshLoadingIconEvent:
send(`<meta http-equiv="refresh" content="1" />`)
@@ -457,14 +454,12 @@ func PokerLogsHandler(c echo.Context) error {
roomLogsTopic := roomID.LogsTopic()
sub := poker.PubSub.Subscribe([]string{roomLogsTopic, "refresh_loading_icon_" + string(authUser.Username)})
defer sub.Close()
- quit := hutils.SetStreaming(c)
- // Keep track of users streams, so we can limit how many are open at one time per user
- if item, err := usersStreamsManager.Inst.Add(authUser.ID, roomLogsTopic); err == nil {
- defer item.Cleanup()
- } else {
+ streamItem, err := stream.SetStreaming(c, authUser.ID, roomLogsTopic)
+ if err != nil {
return nil
}
+ defer streamItem.Cleanup()
send(hutils.HtmlCssReset)
send(`<style>body { background-color: #444; color: #ddd; padding: 3px; }</style><div style="display:flex;flex-direction:column-reverse;">`)
@@ -473,7 +468,7 @@ func PokerLogsHandler(c echo.Context) error {
}
c.Response().Flush()
- loop(quit, sub, func(topic string, payload any) error {
+ loop(streamItem.Quit, sub, func(topic string, payload any) error {
switch evt := payload.(type) {
case poker.RefreshLoadingIconEvent:
send(`<meta http-equiv="refresh" content="1" />`)
@@ -500,14 +495,12 @@ func PokerBetHandler(c echo.Context) error {
roomUserTopic := roomID.UserTopic(authUser.ID)
sub := poker.PubSub.Subscribe([]string{roomID.Topic(), roomUserTopic, "refresh_loading_icon_" + string(authUser.Username)})
defer sub.Close()
- quit := hutils.SetStreaming(c)
- // Keep track of users streams, so we can limit how many are open at one time per user
- if item, err := usersStreamsManager.Inst.Add(authUser.ID, roomUserTopic); err == nil {
- defer item.Cleanup()
- } else {
+ streamItem, err := stream.SetStreaming(c, authUser.ID, roomUserTopic)
+ if err != nil {
return nil
}
+ defer streamItem.Cleanup()
if c.Request().Method == http.MethodPost {
submitBtn := c.Request().PostFormValue("submitBtn")
@@ -598,7 +591,7 @@ func PokerBetHandler(c echo.Context) error {
c.Response().Flush()
}
- loop(quit, sub, func(topic string, payload any) error {
+ loop(streamItem.Quit, sub, func(topic string, payload any) error {
switch payload.(type) {
case poker.RefreshLoadingIconEvent:
send(`<meta http-equiv="refresh" content="1" />`)
diff --git a/pkg/web/handlers/utils/stream/stream.go b/pkg/web/handlers/utils/stream/stream.go
@@ -0,0 +1,26 @@
+package stream
+
+import (
+ "dkforest/pkg/database"
+ "dkforest/pkg/web/handlers/usersStreamsManager"
+ hutils "dkforest/pkg/web/handlers/utils"
+ "github.com/labstack/echo"
+)
+
+type StreamItem struct {
+ Quit <-chan struct{}
+ item *usersStreamsManager.Item
+}
+
+func (s *StreamItem) Cleanup() {
+ s.item.Cleanup()
+}
+
+func SetStreaming(c echo.Context, userID database.UserID, key string) (*StreamItem, error) {
+ // Keep track of users streams, so we can limit how many are open at one time per user
+ item, err := usersStreamsManager.Inst.Add(userID, key)
+ if err != nil {
+ return nil, err
+ }
+ return &StreamItem{Quit: hutils.SetStreaming(c), item: item}, nil
+}
diff --git a/pkg/web/handlers/utils/utils.go b/pkg/web/handlers/utils/utils.go
@@ -199,14 +199,14 @@ func VerifyPow(username, nonce string, difficulty int) bool {
return strings.HasPrefix(hashed, prefix)
}
-func SetStreamingHeaders(c echo.Context) {
+func setStreamingHeaders(c echo.Context) {
c.Response().Header().Set(echo.HeaderContentType, echo.MIMETextHTMLCharsetUTF8)
c.Response().WriteHeader(http.StatusOK)
c.Response().Header().Set("Transfer-Encoding", "chunked")
c.Response().Header().Set("Connection", "keep-alive")
}
-func CloseSignalChan(c echo.Context) <-chan struct{} {
+func closeSignalChan(c echo.Context) <-chan struct{} {
ctx, cancel := context.WithCancel(context.Background())
// Listen to the closing of HTTP connection via CloseNotifier
notify := c.Request().Context().Done()
@@ -223,8 +223,8 @@ func CloseSignalChan(c echo.Context) <-chan struct{} {
}
func SetStreaming(c echo.Context) <-chan struct{} {
- SetStreamingHeaders(c)
- return CloseSignalChan(c)
+ setStreamingHeaders(c)
+ return closeSignalChan(c)
}
func GetReferer(c echo.Context) string {