Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: GetPinnedConversationIDs #2660

Merged
merged 2 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.27
github.com/openimsdk/protocol v0.0.72-alpha.29
github.com/openimsdk/tools v0.0.50-alpha.12
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.27 h1:S6n3uj7YhKjo2NCHHSnUijaJ9YYiy8TTMquc4EJOm50=
github.com/openimsdk/protocol v0.0.72-alpha.27/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.29 h1:z6Bm57IW/HNxTAJmqYjhVaLRUJLVIK0EH7G7HBzbwdc=
github.com/openimsdk/protocol v0.0.72-alpha.29/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo=
github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
Expand Down
4 changes: 4 additions & 0 deletions internal/api/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@ func (o *ConversationApi) GetOwnerConversation(c *gin.Context) {
func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetNotNotifyConversationIDs, o.Client, c)
}

func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetPinnedConversationIDs, o.Client, c)
}
1 change: 1 addition & 0 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
conversationGroup.POST("/get_incremental_conversations", c.GetIncrementalConversation)
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
}

statisticsGroup := r.Group("/statistics")
Expand Down
8 changes: 8 additions & 0 deletions internal/rpc/conversation/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,3 +718,11 @@ func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, re
}
return &pbconversation.GetNotNotifyConversationIDsResp{ConversationIDs: conversationIDs}, nil
}

func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *pbconversation.GetPinnedConversationIDsReq) (*pbconversation.GetPinnedConversationIDsResp, error) {
conversationIDs, err := c.conversationDatabase.GetPinnedConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil
}
5 changes: 5 additions & 0 deletions pkg/common/storage/cache/cachekey/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
ConversationKey = "CONVERSATION:"
ConversationIDsKey = "CONVERSATION_IDS:"
NotNotifyConversationIDsKey = "NOT_NOTIFY_CONVERSATION_IDS:"
PinnedConversationIDsKey = "PINNED_CONVERSATION_IDS:"
ConversationIDsHashKey = "CONVERSATION_IDS_HASH:"
ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
RecvMsgOptKey = "RECV_MSG_OPT:"
Expand All @@ -39,6 +40,10 @@ func GetNotNotifyConversationIDsKey(ownerUserID string) string {
return NotNotifyConversationIDsKey + ownerUserID
}

func GetPinnedConversationIDs(ownerUserID string) string {
return PinnedConversationIDsKey + ownerUserID
}

func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/storage/cache/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ConversationCache interface {
// get user's conversationIDs from msgCache
GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
DelConversationIDs(userIDs ...string) ConversationCache

GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
Expand Down Expand Up @@ -56,6 +57,7 @@ type ConversationCache interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache
DelConversationVersionUserIDs(userIDs ...string) ConversationCache

FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
Expand Down
18 changes: 18 additions & 0 deletions pkg/common/storage/cache/redis/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func (c *ConversationRedisCache) getNotNotifyConversationIDsKey(ownerUserID stri
return cachekey.GetNotNotifyConversationIDsKey(ownerUserID)
}

func (c *ConversationRedisCache) getPinnedConversationIDsKey(ownerUserID string) string {
return cachekey.GetPinnedConversationIDs(ownerUserID)
}

func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID)
}
Expand Down Expand Up @@ -115,6 +119,12 @@ func (c *ConversationRedisCache) GetUserNotNotifyConversationIDs(ctx context.Con
})
}

func (c *ConversationRedisCache) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) {
return getCache(ctx, c.rcClient, c.getPinnedConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) {
return c.conversationDB.FindUserIDAllPinnedConversationID(ctx, userID)
})
}

func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) cache.ConversationCache {
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
Expand Down Expand Up @@ -260,6 +270,14 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs
return cache
}

func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
cache.AddKeys(c.getPinnedConversationIDsKey(userID))
}
return cache
}

func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
Expand Down
32 changes: 29 additions & 3 deletions pkg/common/storage/controller/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type ConversationDatabase interface {
GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error)
// GetNotNotifyConversationIDs gets not notify conversationIDs by userID
GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
// GetPinnedConversationIDs gets pinned conversationIDs by userID
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
}

func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
Expand Down Expand Up @@ -112,6 +114,9 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
if _, ok := fieldMap["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
}
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
}
NotUserIDs := stringutil.DifferenceString(haveUserIDs, userIDs)
Expand Down Expand Up @@ -149,6 +154,9 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID)
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
if _, ok := args["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
}
return cache.ChainExecDel(ctx)
}

Expand All @@ -159,6 +167,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
var (
userIDs []string
notNotifyUserIDs []string
pinnedUserIDs []string
)

cache := c.cache.CloneConversationCache()
Expand All @@ -169,9 +178,16 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage {
notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID)
}
if conversation.IsPinned == true {
pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID)
}
}
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).ChainExecDel(ctx)
return cache.DelConversationIDs(userIDs...).
DelUserConversationIDsHash(userIDs...).
DelConversationVersionUserIDs(userIDs...).
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
DelConversationPinnedMessageUserIDs(pinnedUserIDs...).
ChainExecDel(ctx)
}

func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error {
Expand Down Expand Up @@ -224,7 +240,9 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner
func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
cache := c.cache.CloneConversationCache()
cache = cache.DelConversationVersionUserIDs(ownerUserID).DelConversationNotNotifyMessageUserIDs(ownerUserID)
cache = cache.DelConversationVersionUserIDs(ownerUserID).
DelConversationNotNotifyMessageUserIDs(ownerUserID).
DelConversationPinnedMessageUserIDs(ownerUserID)

groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
return e.GroupID, e.GroupID != ""
Expand Down Expand Up @@ -374,3 +392,11 @@ func (c *conversationDatabase) GetNotNotifyConversationIDs(ctx context.Context,
}
return conversationIDs, nil
}

func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) {
conversationIDs, err := c.cache.GetPinnedConversationIDs(ctx, userID)
if err != nil {
return nil, err
}
return conversationIDs, nil
}
1 change: 1 addition & 0 deletions pkg/common/storage/database/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Conversation interface {
FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error)
FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error)
FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error)
FindUserIDAllPinnedConversationID(ctx context.Context, userID string) ([]string, error)
Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error)
FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error)
FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*model.Conversation, err error)
Expand Down
7 changes: 7 additions & 0 deletions pkg/common/storage/database/mgo/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ func (c *ConversationMgo) FindUserIDAllNotNotifyConversationID(ctx context.Conte
}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1}))
}

func (c *ConversationMgo) FindUserIDAllPinnedConversationID(ctx context.Context, userID string) ([]string, error) {
return mongoutil.Find[string](ctx, c.coll, bson.M{
"owner_user_id": userID,
"is_pinned": true,
}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1}))
}

func (c *ConversationMgo) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) {
return mongoutil.FindOne[*model.Conversation](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": conversationID})
}
Expand Down
Loading