dkforest

A forum and chat platform (onion)
git clone https://git.dasho.dev/n0tr1v/dkforest.git
Log | Files | Refs | LICENSE

commit 5e3d5d78cd643a784a24da03c2588a252f0b8ea8
parent d3de3db0777c66c3f7cdff88a9138d33e907f616
Author: n0tr1v <n0tr1v@protonmail.com>
Date:   Mon, 12 Jun 2023 21:16:07 -0700

move code

Diffstat:
Mpkg/web/handlers/admin.go | 3++-
Apkg/web/handlers/api/v1/chat.go | 356+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mpkg/web/handlers/chat.go | 3++-
Mpkg/web/handlers/chess.go | 12+++++++-----
Mpkg/web/handlers/handlers.go | 360-------------------------------------------------------------------------------
Dpkg/web/handlers/usersStreamsManager.go | 81-------------------------------------------------------------------------------
Apkg/web/handlers/usersStreamsManager/usersStreamsManager.go | 81+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mpkg/web/handlers/utils/utils.go | 20++++++++++++++++++++
Mpkg/web/web.go | 2+-
9 files changed, 469 insertions(+), 449 deletions(-)

diff --git a/pkg/web/handlers/admin.go b/pkg/web/handlers/admin.go @@ -4,6 +4,7 @@ import ( dutils "dkforest/pkg/database/utils" "dkforest/pkg/managers" "dkforest/pkg/web/handlers/interceptors" + "dkforest/pkg/web/handlers/usersStreamsManager" "fmt" "github.com/jinzhu/gorm" "io" @@ -701,7 +702,7 @@ func AdminEditRoomHandler(c echo.Context) error { func StreamUsersHandler(c echo.Context) error { db := c.Get("database").(*database.DkfDB) - usersIDs := usersStreamsManager.GetUsers() + usersIDs := usersStreamsManager.UsersStreamsManagerInst.GetUsers() users, _ := db.GetUsersByID(usersIDs) out := "" for _, user := range users { diff --git a/pkg/web/handlers/api/v1/chat.go b/pkg/web/handlers/api/v1/chat.go @@ -0,0 +1,356 @@ +package v1 + +import ( + "dkforest/pkg/config" + "dkforest/pkg/database" + dutils "dkforest/pkg/database/utils" + "dkforest/pkg/managers" + "dkforest/pkg/pubsub" + "dkforest/pkg/utils" + "dkforest/pkg/web/handlers/streamModals" + "dkforest/pkg/web/handlers/usersStreamsManager" + hutils "dkforest/pkg/web/handlers/utils" + "fmt" + "github.com/labstack/echo" + "net/http" + "strings" + "time" +) + +// Returns either or not authUser is allowed to see msg +func VerifyMsgAuth(db *database.DkfDB, authUser *database.User, msg *database.ChatMessage) bool { + // Verify moderators channel authorization + if msg.Moderators && !authUser.IsModerator() { + return false + } + // Verify group authorization + if msg.GroupID != nil { + userGroupsIDs, _ := db.GetUserRoomGroupsIDs(authUser.ID, msg.RoomID) + if !utils.InArr(*msg.GroupID, userGroupsIDs) { + return false + } + } + // verify PM authorization + if msg.IsPm() { + if msg.UserID != authUser.ID && *msg.ToUserID != authUser.ID { + return false + } + } + return true +} + +func manualPreload(db *database.DkfDB, msg *database.ChatMessage, room database.ChatRoom) { + if msg.GroupID != nil { + if msg.Group == nil { + group, _ := db.GetRoomGroupByID(msg.RoomID, *msg.GroupID) + msg.Group = &group + } + } + if msg.ToUserID != nil { + if msg.ToUser == nil { + toUser, _ := db.GetUserByID(*msg.ToUserID) + msg.ToUser = &toUser + } + } + if msg.User.ID == 0 { + msg.User, _ = db.GetUserByID(msg.UserID) + } + msg.Room = room +} + +// Return true if the message passes all the user's filter. +// false if the message does not and should be discarded. +func applyUserFilters(db *database.DkfDB, authUser *database.User, msg *database.ChatMessage, + pmOnlyQuery database.PmDisplayMode, displayHellbanned, mentionsOnlyQuery bool) bool { + if (pmOnlyQuery == database.PmOnly && msg.ToUser == nil) || + (pmOnlyQuery == database.PmNone && msg.ToUser != nil) || + !authUser.DisplayModerators && msg.Moderators || + !displayHellbanned && msg.IsHellbanned { + return false + } + + if !authUser.DisplayIgnored { + ignoredUsersIDs, _ := db.GetIgnoredUsersIDs(authUser.ID) + if utils.InArr(msg.UserID, ignoredUsersIDs) { + return false + } + } + + if mentionsOnlyQuery && !strings.Contains(msg.Message, authUser.Username.AtStr()) { + return false + } + return true +} + +func ChatStreamMessagesHandler(c echo.Context) error { + db := c.Get("database").(*database.DkfDB) + authUser := c.Get("authUser").(*database.User) + csrf, _ := c.Get("csrf").(string) + + roomName := c.Param("roomName") + room, roomKey, err := dutils.GetRoomAndKey(db, c, roomName) + if err != nil { + return c.Redirect(http.StatusFound, "/") + } + + // Setup headers for http request + c.Response().Header().Set(echo.HeaderContentType, echo.MIMETextHTMLCharsetUTF8) + c.Response().WriteHeader(http.StatusOK) + c.Response().Header().Set("Transfer-Encoding", "chunked") + c.Response().Header().Set("Connection", "keep-alive") + + quit := hutils.CloseSignalChan(c) + + // Keep track of users streams, so we can limit how many are open at one time per user + if err := usersStreamsManager.UsersStreamsManagerInst.Add(authUser.ID, ""); err != nil { + return nil + } + defer usersStreamsManager.UsersStreamsManagerInst.Remove(authUser.ID, "") + + // Keep track of how many bytes we sent on the http request, so we can auto-refresh when passing a threshold + bytesSent := 0 + send := func(s string) { + n, _ := c.Response().Write([]byte(s)) + bytesSent += n + } + + data := ChatMessagesData{} + if _, found := c.QueryParams()["ml"]; found { + data.TopBarQueryParams = "&ml=1" + } + + // Register modals and send the css for them + modalsManager := streamModals.NewModalsManager() + modalsManager.Register(streamModals.NewCodeModal(authUser.ID, room)) + if authUser.IsAdmin { + modalsManager.Register(streamModals.NewPurgeModal(authUser.ID, room)) + } + send(modalsManager.Css()) + + data.ReadMarker, _ = db.GetUserReadMarker(authUser.ID, room.ID) + data.ChatMenuData.RoomName = room.Name + data.ManualRefreshTimeout = 0 + send(GenerateStyle(authUser, data)) + send(`<div id="i"></div>`) // http alive indicator; green/red dot + send(fmt.Sprintf(`<div style="display:flex;flex-direction:column-reverse;" id="msgs">`)) + + // Get initial messages for the user + pmOnlyQuery := dutils.DoParsePmDisplayMode(c.QueryParam("pmonly")) + mentionsOnlyQuery := utils.DoParseBool(c.QueryParam("mentionsOnly")) + displayHellbanned := authUser.DisplayHellbanned || authUser.IsHellbanned + displayIgnoredMessages := false + msgs, err := db.GetChatMessages(room.ID, roomKey, authUser.Username, authUser.ID, pmOnlyQuery, mentionsOnlyQuery, + displayHellbanned, authUser.DisplayIgnored, authUser.DisplayModerators, displayIgnoredMessages, 0) + if err != nil { + return c.Redirect(http.StatusFound, "/") + } + + // Render the messages as html + data.Messages = msgs + data.NbButtons = authUser.CountUIButtons() + nullUsername := config.NullUsername + send("<div>" + RenderMessages(authUser, data, csrf, nullUsername) + "</div>") + c.Response().Flush() + + // Create a subscriber and topics to listen to + selfRefreshTopic := "refresh_" + string(authUser.Username) + readMarkerTopic := "readmarker_" + authUser.ID.String() + authorizedTopics := []string{ + "refresh", + selfRefreshTopic, + readMarkerTopic, + "room_" + room.ID.String()} + authorizedTopics = append(authorizedTopics, modalsManager.Topics()...) + sub := database.MsgPubSub.Subscribe(authorizedTopics) + defer sub.Close() + + // Keep track of messages that are after the read-marker (unread). + // When we receive a "delete msg", and this map is empty, we should hide the read-marker + // as it means the read marker is now at the very top. + msgsMap := make(map[int64]struct{}) + for _, msg := range msgs { + if msg.CreatedAt.After(data.ReadMarker.ReadAt) { + msgsMap[msg.ID] = struct{}{} + } + } + + // If the read-marker is at the very top, it will be hidden and need to be displayed when we receive a new message. + // If it is not at the top, it will already be visible and does not need to be displayed again. + var displayReadMarker bool + if len(msgs) > 0 { + fstMsgTsRound := msgs[0].CreatedAt.Round(time.Second) + readMarkerTsRound := data.ReadMarker.ReadAt.Round(time.Second) + displayReadMarker = fstMsgTsRound.Before(readMarkerTsRound) || fstMsgTsRound.Equal(readMarkerTsRound) + } + + // Keep track of current read-marker revision + readMarkerRev := 0 + // Hide current rev of read marker and insert an invisible one at the top. + updateReadMarker := func() { + send(fmt.Sprintf(`<style>.read-marker-%d{display:none !important;}</style>`, readMarkerRev)) + send(fmt.Sprintf(`<div class="read-marker read-marker-%d" style="display:none;"></div>`, readMarkerRev+1)) + readMarkerRev++ + displayReadMarker = true + } + // Show the invisible read-marker which used to be at the top. + showReadMarker := func() { + if displayReadMarker { + send(fmt.Sprintf(`<style>.read-marker-%d{display:block !important;}</style>`, readMarkerRev)) + displayReadMarker = false + } + } + + // Toggle between true/false every 5sec. This bool keep track of which class to send for our "online indicator" + // We need to change the css class in order for the css to never actually complete the animation and stay "green". + var indicatorSelector bool + +Loop: + for { + select { + case <-quit: + break Loop + default: + } + + // Refresh the page to prevent having it growing infinitely bigger + if bytesSent > 10<<20 { // 10 MB + send(`<meta http-equiv="refresh" content="0" />`) + return nil + } + + authUserTmp, _ := db.GetUserByID(authUser.ID) + authUser = &authUserTmp + managers.ActiveUsers.UpdateUserInRoom(room, managers.NewUserInfo(authUser)) + + // Update read record + db.UpdateChatReadRecord(authUser.ID, room.ID) + + // Toggle the "http alive indicator" class to keep the dot green + send(fmt.Sprintf(`<style>#i{animation: %s 30s forwards}</style>`, + utils.Ternary(indicatorSelector, "i1", "i2"))) + indicatorSelector = !indicatorSelector + + topic, msgTyp, err := sub.ReceiveTimeout2(5*time.Second, quit) + if err != nil { + if err == pubsub.ErrCancelled { + break Loop + } + c.Response().Flush() + continue + } + + // We receive this event when the "update read-marker" is clicked. + // This means the user is saying that all messages are read, and read-marker should be at the very top. + if topic == readMarkerTopic { + msgsMap = make(map[int64]struct{}) // read-marker at the top, so no unread message + updateReadMarker() + c.Response().Flush() + continue + } + + if topic == selfRefreshTopic || msgTyp.Typ == database.ForceRefresh { + send(`<meta http-equiv="refresh" content="0" />`) + return nil + } + + if modalsManager.Handle(db, authUser, topic, csrf, msgTyp, send) { + c.Response().Flush() + continue + } + + if msgTyp.Typ == database.DeleteMsg { + // Delete msg from the map that keep track of unread messages. + // If the map is now empty, we hide the read-marker. + delete(msgsMap, msgTyp.Msg.ID) + if len(msgsMap) == 0 { + updateReadMarker() + } + + send(fmt.Sprintf(`<style>.msgidc-%s-%d{display:none;}</style>`, msgTyp.Msg.UUID, msgTyp.Msg.Rev)) + c.Response().Flush() + continue + } + + if msgTyp.Typ == database.EditMsg { + // Get all messages for the user that were created after the edited one (included) + msgs, err := db.GetChatMessages(room.ID, roomKey, authUser.Username, authUser.ID, pmOnlyQuery, + mentionsOnlyQuery, displayHellbanned, authUser.DisplayIgnored, authUser.DisplayModerators, + displayIgnoredMessages, msgTyp.Msg.ID) + if err != nil { + return c.Redirect(http.StatusFound, "/") + } + + // If no messages, continue. This might happen if the user has ignored the user making the edit. + if len(msgs) == 0 { + c.Response().Flush() + continue + } + + // Generate css to hide the previous revision of these messages + toHide := make([]string, len(msgs)) + for i, msg := range msgs { + toHide[i] = fmt.Sprintf(".msgidc-%s-%d", msg.UUID, msg.Rev-1) + } + send(fmt.Sprintf(`<style>%s{display:none;}</style>`, strings.Join(toHide, ","))) + + // Render the new revision of the messages in html + data.Messages = msgs + data.NbButtons = authUser.CountUIButtons() + send(RenderMessages(authUser, data, csrf, nullUsername)) + + c.Response().Flush() + continue + } + + msg := &msgTyp.Msg + if room.IsProtected() { + if err := msg.Decrypt(roomKey); err != nil { + return c.Redirect(http.StatusFound, "/") + } + } + + if !VerifyMsgAuth(db, authUser, msg) || + !applyUserFilters(db, authUser, msg, pmOnlyQuery, displayHellbanned, mentionsOnlyQuery) { + continue + } + + manualPreload(db, msg, room) + + baseTopBarURL := "/api/v1/chat/top-bar/" + room.Name + readMarkerRendered := true + isFirstMsg := false + renderedMsg := RenderMessage(1, *msg, authUser, data, baseTopBarURL, &readMarkerRendered, &isFirstMsg, csrf, nullUsername) + + // Keep track of unread messages + msgsMap[msg.ID] = struct{}{} + + send(renderedMsg) + showReadMarker() + + // Sound notifications + var newMessageSound, taggedSound, pmSound bool + if msg.User.ID != authUser.ID { + newMessageSound = true + if strings.Contains(renderedMsg, authUser.Username.AtStr()) { + taggedSound = true + } + if msg.IsPmRecipient(*authUser) { + pmSound = true + } + } + if (authUser.NotifyTagged && taggedSound) || (authUser.NotifyPmmed && pmSound) { + send(`<audio src="/public/mp3/sound5.mp3" autoplay></audio>`) + } else if authUser.NotifyNewMessage && newMessageSound { + send(`<audio src="/public/mp3/sound6.mp3" autoplay></audio>`) + } + + c.Response().Flush() + } // end of infinite loop (LOOP) + + // Display a big banner stating the connection is closed. + send(`<div class="connection-closed">Connection closed</div>`) + // Auto refresh the page after 5sec so that the client reconnect after the app has restarted + send(`<meta http-equiv="refresh" content="5" />`) + c.Response().Flush() + return nil +} diff --git a/pkg/web/handlers/chat.go b/pkg/web/handlers/chat.go @@ -8,6 +8,7 @@ import ( "dkforest/pkg/hashset" "dkforest/pkg/managers" "dkforest/pkg/utils" + v1 "dkforest/pkg/web/handlers/api/v1" hutils "dkforest/pkg/web/handlers/utils" "github.com/PuerkitoBio/goquery" "github.com/asaskevich/govalidator" @@ -400,7 +401,7 @@ func ChatCodeHandler(c echo.Context) error { return c.Redirect(http.StatusFound, "/") } - if !verifyMsgAuth(db, authUser, &msg) { + if !v1.VerifyMsgAuth(db, authUser, &msg) { return c.Redirect(http.StatusFound, "/") } diff --git a/pkg/web/handlers/chess.go b/pkg/web/handlers/chess.go @@ -7,6 +7,8 @@ import ( "dkforest/pkg/pubsub" "dkforest/pkg/utils" "dkforest/pkg/web/handlers/interceptors" + "dkforest/pkg/web/handlers/usersStreamsManager" + hutils "dkforest/pkg/web/handlers/utils" "fmt" "github.com/labstack/echo" "github.com/notnil/chess" @@ -267,12 +269,12 @@ func ChessGameHandler(c echo.Context) error { return nil } - quit := closeSignalChan(c) + quit := hutils.CloseSignalChan(c) - if err := usersStreamsManager.Add(authUser.ID, key); err != nil { + if err := usersStreamsManager.UsersStreamsManagerInst.Add(authUser.ID, key); err != nil { return nil } - defer usersStreamsManager.Remove(authUser.ID, key) + defer usersStreamsManager.UsersStreamsManagerInst.Remove(authUser.ID, key) c.Response().Header().Set(echo.HeaderContentType, echo.MIMETextHTMLCharsetUTF8) c.Response().WriteHeader(http.StatusOK) @@ -307,8 +309,8 @@ func ChessGameHandler(c echo.Context) error { case <-quit: return } - p1Count := usersStreamsManager.GetUserStreamsCountFor(p1ID, key) - p2Count := usersStreamsManager.GetUserStreamsCountFor(p2ID, key) + p1Count := usersStreamsManager.UsersStreamsManagerInst.GetUserStreamsCountFor(p1ID, key) + p2Count := usersStreamsManager.UsersStreamsManagerInst.GetUserStreamsCountFor(p2ID, key) if p1Online && p1Count == 0 { p1Online = false send(`<style>#p1Status { background-color: darkred !important; }</style>`) diff --git a/pkg/web/handlers/handlers.go b/pkg/web/handlers/handlers.go @@ -2,18 +2,14 @@ package handlers import ( "bytes" - "context" "dkforest/pkg/cache" "dkforest/pkg/captcha" "dkforest/pkg/config" "dkforest/pkg/database" dutils "dkforest/pkg/database/utils" - "dkforest/pkg/managers" "dkforest/pkg/odometer" - "dkforest/pkg/pubsub" "dkforest/pkg/utils" v1 "dkforest/pkg/web/handlers/api/v1" - "dkforest/pkg/web/handlers/streamModals" hutils "dkforest/pkg/web/handlers/utils" "encoding/base64" "fmt" @@ -32,11 +28,9 @@ import ( "net/http" "net/url" "os" - "os/signal" "path/filepath" "regexp" "strings" - "syscall" "time" ) @@ -782,362 +776,8 @@ func ChatStreamMenuHandler(c echo.Context) error { return c.HTML(http.StatusOK, s) } -func closeSignalChan(c echo.Context) <-chan struct{} { - ctx, cancel := context.WithCancel(context.Background()) - // Listen to the closing of HTTP connection via CloseNotifier - notify := c.Request().Context().Done() - notify1 := make(chan os.Signal) - signal.Notify(notify1, syscall.SIGINT, syscall.SIGTERM) - utils.SGo(func() { - select { - case <-notify: - case <-notify1: - } - cancel() - }) - return ctx.Done() -} - -// Returns either or not authUser is allowed to see msg -func verifyMsgAuth(db *database.DkfDB, authUser *database.User, msg *database.ChatMessage) bool { - // Verify moderators channel authorization - if msg.Moderators && !authUser.IsModerator() { - return false - } - // Verify group authorization - if msg.GroupID != nil { - userGroupsIDs, _ := db.GetUserRoomGroupsIDs(authUser.ID, msg.RoomID) - if !utils.InArr(*msg.GroupID, userGroupsIDs) { - return false - } - } - // verify PM authorization - if msg.IsPm() { - if msg.UserID != authUser.ID && *msg.ToUserID != authUser.ID { - return false - } - } - return true -} - -func manualPreload(db *database.DkfDB, msg *database.ChatMessage, room database.ChatRoom) { - if msg.GroupID != nil { - if msg.Group == nil { - group, _ := db.GetRoomGroupByID(msg.RoomID, *msg.GroupID) - msg.Group = &group - } - } - if msg.ToUserID != nil { - if msg.ToUser == nil { - toUser, _ := db.GetUserByID(*msg.ToUserID) - msg.ToUser = &toUser - } - } - if msg.User.ID == 0 { - msg.User, _ = db.GetUserByID(msg.UserID) - } - msg.Room = room -} - -// Return true if the message passes all the user's filter. -// false if the message does not and should be discarded. -func applyUserFilters(db *database.DkfDB, authUser *database.User, msg *database.ChatMessage, - pmOnlyQuery database.PmDisplayMode, displayHellbanned, mentionsOnlyQuery bool) bool { - if (pmOnlyQuery == database.PmOnly && msg.ToUser == nil) || - (pmOnlyQuery == database.PmNone && msg.ToUser != nil) || - !authUser.DisplayModerators && msg.Moderators || - !displayHellbanned && msg.IsHellbanned { - return false - } - - if !authUser.DisplayIgnored { - ignoredUsersIDs, _ := db.GetIgnoredUsersIDs(authUser.ID) - if utils.InArr(msg.UserID, ignoredUsersIDs) { - return false - } - } - - if mentionsOnlyQuery && !strings.Contains(msg.Message, authUser.Username.AtStr()) { - return false - } - return true -} - func ChatStreamMessagesRefreshHandler(c echo.Context) error { authUser := c.Get("authUser").(*database.User) database.MsgPubSub.Pub("refresh_"+string(authUser.Username), database.ChatMessageType{Typ: database.ForceRefresh}) return c.NoContent(http.StatusOK) } - -func ChatStreamMessagesHandler(c echo.Context) error { - db := c.Get("database").(*database.DkfDB) - authUser := c.Get("authUser").(*database.User) - csrf, _ := c.Get("csrf").(string) - - roomName := c.Param("roomName") - room, roomKey, err := dutils.GetRoomAndKey(db, c, roomName) - if err != nil { - return c.Redirect(http.StatusFound, "/") - } - - // Setup headers for http request - c.Response().Header().Set(echo.HeaderContentType, echo.MIMETextHTMLCharsetUTF8) - c.Response().WriteHeader(http.StatusOK) - c.Response().Header().Set("Transfer-Encoding", "chunked") - c.Response().Header().Set("Connection", "keep-alive") - - quit := closeSignalChan(c) - - // Keep track of users streams, so we can limit how many are open at one time per user - if err := usersStreamsManager.Add(authUser.ID, ""); err != nil { - return nil - } - defer usersStreamsManager.Remove(authUser.ID, "") - - // Keep track of how many bytes we sent on the http request, so we can auto-refresh when passing a threshold - bytesSent := 0 - send := func(s string) { - n, _ := c.Response().Write([]byte(s)) - bytesSent += n - } - - data := v1.ChatMessagesData{} - if _, found := c.QueryParams()["ml"]; found { - data.TopBarQueryParams = "&ml=1" - } - - // Register modals and send the css for them - modalsManager := streamModals.NewModalsManager() - modalsManager.Register(streamModals.NewCodeModal(authUser.ID, room)) - if authUser.IsAdmin { - modalsManager.Register(streamModals.NewPurgeModal(authUser.ID, room)) - } - send(modalsManager.Css()) - - data.ReadMarker, _ = db.GetUserReadMarker(authUser.ID, room.ID) - data.ChatMenuData.RoomName = room.Name - data.ManualRefreshTimeout = 0 - send(v1.GenerateStyle(authUser, data)) - send(`<div id="i"></div>`) // http alive indicator; green/red dot - send(fmt.Sprintf(`<div style="display:flex;flex-direction:column-reverse;" id="msgs">`)) - - // Get initial messages for the user - pmOnlyQuery := dutils.DoParsePmDisplayMode(c.QueryParam("pmonly")) - mentionsOnlyQuery := utils.DoParseBool(c.QueryParam("mentionsOnly")) - displayHellbanned := authUser.DisplayHellbanned || authUser.IsHellbanned - displayIgnoredMessages := false - msgs, err := db.GetChatMessages(room.ID, roomKey, authUser.Username, authUser.ID, pmOnlyQuery, mentionsOnlyQuery, - displayHellbanned, authUser.DisplayIgnored, authUser.DisplayModerators, displayIgnoredMessages, 0) - if err != nil { - return c.Redirect(http.StatusFound, "/") - } - - // Render the messages as html - data.Messages = msgs - data.NbButtons = authUser.CountUIButtons() - nullUsername := config.NullUsername - send("<div>" + v1.RenderMessages(authUser, data, csrf, nullUsername) + "</div>") - c.Response().Flush() - - // Create a subscriber and topics to listen to - selfRefreshTopic := "refresh_" + string(authUser.Username) - readMarkerTopic := "readmarker_" + authUser.ID.String() - authorizedTopics := []string{ - "refresh", - selfRefreshTopic, - readMarkerTopic, - "room_" + room.ID.String()} - authorizedTopics = append(authorizedTopics, modalsManager.Topics()...) - sub := database.MsgPubSub.Subscribe(authorizedTopics) - defer sub.Close() - - // Keep track of messages that are after the read-marker (unread). - // When we receive a "delete msg", and this map is empty, we should hide the read-marker - // as it means the read marker is now at the very top. - msgsMap := make(map[int64]struct{}) - for _, msg := range msgs { - if msg.CreatedAt.After(data.ReadMarker.ReadAt) { - msgsMap[msg.ID] = struct{}{} - } - } - - // If the read-marker is at the very top, it will be hidden and need to be displayed when we receive a new message. - // If it is not at the top, it will already be visible and does not need to be displayed again. - var displayReadMarker bool - if len(msgs) > 0 { - fstMsgTsRound := msgs[0].CreatedAt.Round(time.Second) - readMarkerTsRound := data.ReadMarker.ReadAt.Round(time.Second) - displayReadMarker = fstMsgTsRound.Before(readMarkerTsRound) || fstMsgTsRound.Equal(readMarkerTsRound) - } - - // Keep track of current read-marker revision - readMarkerRev := 0 - // Hide current rev of read marker and insert an invisible one at the top. - updateReadMarker := func() { - send(fmt.Sprintf(`<style>.read-marker-%d{display:none !important;}</style>`, readMarkerRev)) - send(fmt.Sprintf(`<div class="read-marker read-marker-%d" style="display:none;"></div>`, readMarkerRev+1)) - readMarkerRev++ - displayReadMarker = true - } - // Show the invisible read-marker which used to be at the top. - showReadMarker := func() { - if displayReadMarker { - send(fmt.Sprintf(`<style>.read-marker-%d{display:block !important;}</style>`, readMarkerRev)) - displayReadMarker = false - } - } - - // Toggle between true/false every 5sec. This bool keep track of which class to send for our "online indicator" - // We need to change the css class in order for the css to never actually complete the animation and stay "green". - var indicatorSelector bool - -Loop: - for { - select { - case <-quit: - break Loop - default: - } - - // Refresh the page to prevent having it growing infinitely bigger - if bytesSent > 10<<20 { // 10 MB - send(`<meta http-equiv="refresh" content="0" />`) - return nil - } - - authUserTmp, _ := db.GetUserByID(authUser.ID) - authUser = &authUserTmp - managers.ActiveUsers.UpdateUserInRoom(room, managers.NewUserInfo(authUser)) - - // Update read record - db.UpdateChatReadRecord(authUser.ID, room.ID) - - // Toggle the "http alive indicator" class to keep the dot green - send(fmt.Sprintf(`<style>#i{animation: %s 30s forwards}</style>`, - utils.Ternary(indicatorSelector, "i1", "i2"))) - indicatorSelector = !indicatorSelector - - topic, msgTyp, err := sub.ReceiveTimeout2(5*time.Second, quit) - if err != nil { - if err == pubsub.ErrCancelled { - break Loop - } - c.Response().Flush() - continue - } - - // We receive this event when the "update read-marker" is clicked. - // This means the user is saying that all messages are read, and read-marker should be at the very top. - if topic == readMarkerTopic { - msgsMap = make(map[int64]struct{}) // read-marker at the top, so no unread message - updateReadMarker() - c.Response().Flush() - continue - } - - if topic == selfRefreshTopic || msgTyp.Typ == database.ForceRefresh { - send(`<meta http-equiv="refresh" content="0" />`) - return nil - } - - if modalsManager.Handle(db, authUser, topic, csrf, msgTyp, send) { - c.Response().Flush() - continue - } - - if msgTyp.Typ == database.DeleteMsg { - // Delete msg from the map that keep track of unread messages. - // If the map is now empty, we hide the read-marker. - delete(msgsMap, msgTyp.Msg.ID) - if len(msgsMap) == 0 { - updateReadMarker() - } - - send(fmt.Sprintf(`<style>.msgidc-%s-%d{display:none;}</style>`, msgTyp.Msg.UUID, msgTyp.Msg.Rev)) - c.Response().Flush() - continue - } - - if msgTyp.Typ == database.EditMsg { - // Get all messages for the user that were created after the edited one (included) - msgs, err := db.GetChatMessages(room.ID, roomKey, authUser.Username, authUser.ID, pmOnlyQuery, - mentionsOnlyQuery, displayHellbanned, authUser.DisplayIgnored, authUser.DisplayModerators, - displayIgnoredMessages, msgTyp.Msg.ID) - if err != nil { - return c.Redirect(http.StatusFound, "/") - } - - // If no messages, continue. This might happen if the user has ignored the user making the edit. - if len(msgs) == 0 { - c.Response().Flush() - continue - } - - // Generate css to hide the previous revision of these messages - toHide := make([]string, len(msgs)) - for i, msg := range msgs { - toHide[i] = fmt.Sprintf(".msgidc-%s-%d", msg.UUID, msg.Rev-1) - } - send(fmt.Sprintf(`<style>%s{display:none;}</style>`, strings.Join(toHide, ","))) - - // Render the new revision of the messages in html - data.Messages = msgs - data.NbButtons = authUser.CountUIButtons() - send(v1.RenderMessages(authUser, data, csrf, nullUsername)) - - c.Response().Flush() - continue - } - - msg := &msgTyp.Msg - if room.IsProtected() { - if err := msg.Decrypt(roomKey); err != nil { - return c.Redirect(http.StatusFound, "/") - } - } - - if !verifyMsgAuth(db, authUser, msg) || - !applyUserFilters(db, authUser, msg, pmOnlyQuery, displayHellbanned, mentionsOnlyQuery) { - continue - } - - manualPreload(db, msg, room) - - baseTopBarURL := "/api/v1/chat/top-bar/" + room.Name - readMarkerRendered := true - isFirstMsg := false - renderedMsg := v1.RenderMessage(1, *msg, authUser, data, baseTopBarURL, &readMarkerRendered, &isFirstMsg, csrf, nullUsername) - - // Keep track of unread messages - msgsMap[msg.ID] = struct{}{} - - send(renderedMsg) - showReadMarker() - - // Sound notifications - var newMessageSound, taggedSound, pmSound bool - if msg.User.ID != authUser.ID { - newMessageSound = true - if strings.Contains(renderedMsg, authUser.Username.AtStr()) { - taggedSound = true - } - if msg.IsPmRecipient(*authUser) { - pmSound = true - } - } - if (authUser.NotifyTagged && taggedSound) || (authUser.NotifyPmmed && pmSound) { - send(`<audio src="/public/mp3/sound5.mp3" autoplay></audio>`) - } else if authUser.NotifyNewMessage && newMessageSound { - send(`<audio src="/public/mp3/sound6.mp3" autoplay></audio>`) - } - - c.Response().Flush() - } // end of infinite loop (LOOP) - - // Display a big banner stating the connection is closed. - send(`<div class="connection-closed">Connection closed</div>`) - // Auto refresh the page after 5sec so that the client reconnect after the app has restarted - send(`<meta http-equiv="refresh" content="5" />`) - c.Response().Flush() - return nil -} diff --git a/pkg/web/handlers/usersStreamsManager.go b/pkg/web/handlers/usersStreamsManager.go @@ -1,81 +0,0 @@ -package handlers - -import ( - "dkforest/pkg/database" - "errors" - "sync" -) - -const userMaxStream = 15 - -var ErrTooManyStreams = errors.New("too many streams") - -type UserStreamsMap map[string]int64 - -func (m *UserStreamsMap) count() (out int64) { - for _, v := range *m { - out += v - } - return -} - -// UsersStreamsManager ensure that a user doesn't have more than userMaxStream -// http long polling streams open at the same time. -// If the limit is reached, the pages will then refuse to load. -// This is to prevent a malicious user from opening unlimited amount of streams and wasting the server resources. -type UsersStreamsManager struct { - sync.RWMutex - m map[database.UserID]UserStreamsMap -} - -func NewUsersStreamsManager() *UsersStreamsManager { - return &UsersStreamsManager{m: make(map[database.UserID]UserStreamsMap)} -} - -func (m *UsersStreamsManager) Add(userID database.UserID, chessKey string) error { - m.Lock() - defer m.Unlock() - tmp, found := m.m[userID] - if found && tmp.count() >= userMaxStream { - return ErrTooManyStreams - } - if !found { - tmp = make(UserStreamsMap) - } - tmp[chessKey]++ - m.m[userID] = tmp - return nil -} - -func (m *UsersStreamsManager) Remove(userID database.UserID, chessKey string) { - m.Lock() - defer m.Unlock() - if tmp, found := m.m[userID]; found { - tmp[chessKey]-- - m.m[userID] = tmp - } -} - -func (m *UsersStreamsManager) GetUserStreamsCountFor(userID database.UserID, key string) (out int64) { - m.RLock() - defer m.RUnlock() - if userMap, found := m.m[userID]; found { - if nbStreams, found1 := userMap[key]; found1 { - return nbStreams - } - } - return -} - -func (m *UsersStreamsManager) GetUsers() (out []database.UserID) { - m.RLock() - defer m.RUnlock() - for userID, userMap := range m.m { - if userMap.count() > 0 { - out = append(out, userID) - } - } - return -} - -var usersStreamsManager = NewUsersStreamsManager() diff --git a/pkg/web/handlers/usersStreamsManager/usersStreamsManager.go b/pkg/web/handlers/usersStreamsManager/usersStreamsManager.go @@ -0,0 +1,81 @@ +package usersStreamsManager + +import ( + "dkforest/pkg/database" + "errors" + "sync" +) + +const userMaxStream = 15 + +var ErrTooManyStreams = errors.New("too many streams") + +type UserStreamsMap map[string]int64 + +func (m *UserStreamsMap) count() (out int64) { + for _, v := range *m { + out += v + } + return +} + +// UsersStreamsManager ensure that a user doesn't have more than userMaxStream +// http long polling streams open at the same time. +// If the limit is reached, the pages will then refuse to load. +// This is to prevent a malicious user from opening unlimited amount of streams and wasting the server resources. +type UsersStreamsManager struct { + sync.RWMutex + m map[database.UserID]UserStreamsMap +} + +func NewUsersStreamsManager() *UsersStreamsManager { + return &UsersStreamsManager{m: make(map[database.UserID]UserStreamsMap)} +} + +func (m *UsersStreamsManager) Add(userID database.UserID, chessKey string) error { + m.Lock() + defer m.Unlock() + tmp, found := m.m[userID] + if found && tmp.count() >= userMaxStream { + return ErrTooManyStreams + } + if !found { + tmp = make(UserStreamsMap) + } + tmp[chessKey]++ + m.m[userID] = tmp + return nil +} + +func (m *UsersStreamsManager) Remove(userID database.UserID, chessKey string) { + m.Lock() + defer m.Unlock() + if tmp, found := m.m[userID]; found { + tmp[chessKey]-- + m.m[userID] = tmp + } +} + +func (m *UsersStreamsManager) GetUserStreamsCountFor(userID database.UserID, key string) (out int64) { + m.RLock() + defer m.RUnlock() + if userMap, found := m.m[userID]; found { + if nbStreams, found1 := userMap[key]; found1 { + return nbStreams + } + } + return +} + +func (m *UsersStreamsManager) GetUsers() (out []database.UserID) { + m.RLock() + defer m.RUnlock() + for userID, userMap := range m.m { + if userMap.count() > 0 { + out = append(out, userID) + } + } + return +} + +var UsersStreamsManagerInst = NewUsersStreamsManager() diff --git a/pkg/web/handlers/utils/utils.go b/pkg/web/handlers/utils/utils.go @@ -1,6 +1,7 @@ package utils import ( + "context" "crypto/sha256" "dkforest/pkg/captcha" "encoding/base64" @@ -10,8 +11,11 @@ import ( "fmt" "net" "net/http" + "os" + "os/signal" "strconv" "strings" + "syscall" "time" "dkforest/pkg/config" @@ -184,3 +188,19 @@ func VerifyPow(username, nonce string, difficulty int) bool { prefix := strings.Repeat("0", difficulty) return strings.HasPrefix(hashed, prefix) } + +func CloseSignalChan(c echo.Context) <-chan struct{} { + ctx, cancel := context.WithCancel(context.Background()) + // Listen to the closing of HTTP connection via CloseNotifier + notify := c.Request().Context().Done() + notify1 := make(chan os.Signal) + signal.Notify(notify1, syscall.SIGINT, syscall.SIGTERM) + utils.SGo(func() { + select { + case <-notify: + case <-notify1: + } + cancel() + }) + return ctx.Done() +} diff --git a/pkg/web/web.go b/pkg/web/web.go @@ -152,7 +152,7 @@ func getMainServer(db *database.DkfDB, i18nBundle *i18n.Bundle, renderer *tmp.Te authGroup.POST("/api/v1/chat/top-bar/:roomName", v1.ChatTopBarHandler, middlewares.AuthRateLimitMiddleware(1*time.Second, 3)) authGroup.GET("/api/v1/chat/messages/:roomName", v1.ChatMessagesHandler) authGroup.GET("/api/v1/chat/messages/:roomName/refresh", handlers.ChatStreamMessagesRefreshHandler, middlewares.AuthRateLimitMiddleware(1*time.Second, 4)) - authGroup.GET("/api/v1/chat/messages/:roomName/stream", handlers.ChatStreamMessagesHandler, middlewares.AuthRateLimitMiddleware(1*time.Second, 4)) + authGroup.GET("/api/v1/chat/messages/:roomName/stream", v1.ChatStreamMessagesHandler, middlewares.AuthRateLimitMiddleware(1*time.Second, 4)) authGroup.GET("/api/v1/chat/messages/:roomName/stream/menu", handlers.ChatStreamMenuHandler) authGroup.POST("/api/v1/notifications/delete/:notificationID", v1.DeleteNotificationHandler) authGroup.POST("/api/v1/session-notifications/delete/:sessionNotificationID", v1.DeleteSessionNotificationHandler)