diff --git a/go.mod b/go.mod index 71e4d07e09..54dd68d632 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.27 + github.com/openimsdk/protocol v0.0.72-alpha.29 github.com/openimsdk/tools v0.0.50-alpha.12 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 330e67adf9..aeab03055e 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.27 h1:S6n3uj7YhKjo2NCHHSnUijaJ9YYiy8TTMquc4EJOm50= -github.com/openimsdk/protocol v0.0.72-alpha.27/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.29 h1:z6Bm57IW/HNxTAJmqYjhVaLRUJLVIK0EH7G7HBzbwdc= +github.com/openimsdk/protocol v0.0.72-alpha.29/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo= github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 8e1a46d478..8e3a3ca82d 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -66,3 +66,7 @@ func (o *ConversationApi) GetOwnerConversation(c *gin.Context) { func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) { a2r.Call(conversation.ConversationClient.GetNotNotifyConversationIDs, o.Client, c) } + +func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { + a2r.Call(conversation.ConversationClient.GetPinnedConversationIDs, o.Client, c) +} diff --git a/internal/api/router.go b/internal/api/router.go index 6b0278864f..318c2a7755 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -233,6 +233,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En conversationGroup.POST("/get_incremental_conversations", c.GetIncrementalConversation) conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) + conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) } statisticsGroup := r.Group("/statistics") diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 3098d57915..6f6ca1f674 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -718,3 +718,11 @@ func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, re } return &pbconversation.GetNotNotifyConversationIDsResp{ConversationIDs: conversationIDs}, nil } + +func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *pbconversation.GetPinnedConversationIDsReq) (*pbconversation.GetPinnedConversationIDsResp, error) { + conversationIDs, err := c.conversationDatabase.GetPinnedConversationIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil +} diff --git a/pkg/common/storage/cache/cachekey/conversation.go b/pkg/common/storage/cache/cachekey/conversation.go index acc9d15cfd..909774288b 100644 --- a/pkg/common/storage/cache/cachekey/conversation.go +++ b/pkg/common/storage/cache/cachekey/conversation.go @@ -18,6 +18,7 @@ const ( ConversationKey = "CONVERSATION:" ConversationIDsKey = "CONVERSATION_IDS:" NotNotifyConversationIDsKey = "NOT_NOTIFY_CONVERSATION_IDS:" + PinnedConversationIDsKey = "PINNED_CONVERSATION_IDS:" ConversationIDsHashKey = "CONVERSATION_IDS_HASH:" ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" RecvMsgOptKey = "RECV_MSG_OPT:" @@ -39,6 +40,10 @@ func GetNotNotifyConversationIDsKey(ownerUserID string) string { return NotNotifyConversationIDsKey + ownerUserID } +func GetPinnedConversationIDs(ownerUserID string) string { + return PinnedConversationIDsKey + ownerUserID +} + func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID } diff --git a/pkg/common/storage/cache/conversation.go b/pkg/common/storage/cache/conversation.go index 8970db29c8..ac3011107c 100644 --- a/pkg/common/storage/cache/conversation.go +++ b/pkg/common/storage/cache/conversation.go @@ -26,6 +26,7 @@ type ConversationCache interface { // get user's conversationIDs from msgCache GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) + GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) DelConversationIDs(userIDs ...string) ConversationCache GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) @@ -56,6 +57,7 @@ type ConversationCache interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache + DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache DelConversationVersionUserIDs(userIDs ...string) ConversationCache FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error) diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 40df1e57a1..326f60b96a 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -75,6 +75,10 @@ func (c *ConversationRedisCache) getNotNotifyConversationIDsKey(ownerUserID stri return cachekey.GetNotNotifyConversationIDsKey(ownerUserID) } +func (c *ConversationRedisCache) getPinnedConversationIDsKey(ownerUserID string) string { + return cachekey.GetPinnedConversationIDs(ownerUserID) +} + func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID) } @@ -115,6 +119,12 @@ func (c *ConversationRedisCache) GetUserNotNotifyConversationIDs(ctx context.Con }) } +func (c *ConversationRedisCache) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) { + return getCache(ctx, c.rcClient, c.getPinnedConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) { + return c.conversationDB.FindUserIDAllPinnedConversationID(ctx, userID) + }) +} + func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) cache.ConversationCache { keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { @@ -260,6 +270,14 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs return cache } +func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache { + cache := c.CloneConversationCache() + for _, userID := range userIDs { + cache.AddKeys(c.getPinnedConversationIDsKey(userID)) + } + return cache +} + func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache { cache := c.CloneConversationCache() for _, userID := range userIDs { diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index 46ac9a1f9c..06a0733658 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -71,6 +71,8 @@ type ConversationDatabase interface { GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error) // GetNotNotifyConversationIDs gets not notify conversationIDs by userID GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) + // GetPinnedConversationIDs gets pinned conversationIDs by userID + GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -112,6 +114,9 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } + if _, ok := fieldMap["is_pinned"]; ok { + cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) + } cache = cache.DelConversationVersionUserIDs(haveUserIDs...) } NotUserIDs := stringutil.DifferenceString(haveUserIDs, userIDs) @@ -149,6 +154,9 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID) cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } + if _, ok := args["is_pinned"]; ok { + cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) + } return cache.ChainExecDel(ctx) } @@ -159,6 +167,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat var ( userIDs []string notNotifyUserIDs []string + pinnedUserIDs []string ) cache := c.cache.CloneConversationCache() @@ -169,9 +178,16 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage { notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID) } + if conversation.IsPinned == true { + pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID) + } } - return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...). - DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).ChainExecDel(ctx) + return cache.DelConversationIDs(userIDs...). + DelUserConversationIDsHash(userIDs...). + DelConversationVersionUserIDs(userIDs...). + DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...). + DelConversationPinnedMessageUserIDs(pinnedUserIDs...). + ChainExecDel(ctx) } func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error { @@ -224,7 +240,9 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() - cache = cache.DelConversationVersionUserIDs(ownerUserID).DelConversationNotNotifyMessageUserIDs(ownerUserID) + cache = cache.DelConversationVersionUserIDs(ownerUserID). + DelConversationNotNotifyMessageUserIDs(ownerUserID). + DelConversationPinnedMessageUserIDs(ownerUserID) groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { return e.GroupID, e.GroupID != "" @@ -374,3 +392,11 @@ func (c *conversationDatabase) GetNotNotifyConversationIDs(ctx context.Context, } return conversationIDs, nil } + +func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) { + conversationIDs, err := c.cache.GetPinnedConversationIDs(ctx, userID) + if err != nil { + return nil, err + } + return conversationIDs, nil +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 2c20f73bcd..5a9b19035d 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -28,6 +28,7 @@ type Conversation interface { FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) + FindUserIDAllPinnedConversationID(ctx context.Context, userID string) ([]string, error) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error) FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*model.Conversation, err error) diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 4c936aedce..f7ced1c2cf 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -131,6 +131,13 @@ func (c *ConversationMgo) FindUserIDAllNotNotifyConversationID(ctx context.Conte }, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1})) } +func (c *ConversationMgo) FindUserIDAllPinnedConversationID(ctx context.Context, userID string) ([]string, error) { + return mongoutil.Find[string](ctx, c.coll, bson.M{ + "owner_user_id": userID, + "is_pinned": true, + }, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1})) +} + func (c *ConversationMgo) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) { return mongoutil.FindOne[*model.Conversation](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": conversationID}) }