Skip to content

Commit

Permalink
feat: GetPinnedConversationIDs (#2660)
Browse files Browse the repository at this point in the history
* feat: GetPinnedConversationIDs

* feat: api
  • Loading branch information
icey-yu committed Sep 25, 2024
1 parent 51aaf08 commit 7da87e1
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.27
github.com/openimsdk/protocol v0.0.72-alpha.29
github.com/openimsdk/tools v0.0.50-alpha.12
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.27 h1:S6n3uj7YhKjo2NCHHSnUijaJ9YYiy8TTMquc4EJOm50=
github.com/openimsdk/protocol v0.0.72-alpha.27/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.29 h1:z6Bm57IW/HNxTAJmqYjhVaLRUJLVIK0EH7G7HBzbwdc=
github.com/openimsdk/protocol v0.0.72-alpha.29/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo=
github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
Expand Down
4 changes: 4 additions & 0 deletions internal/api/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@ func (o *ConversationApi) GetOwnerConversation(c *gin.Context) {
func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetNotNotifyConversationIDs, o.Client, c)
}

func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetPinnedConversationIDs, o.Client, c)
}
1 change: 1 addition & 0 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
conversationGroup.POST("/get_incremental_conversations", c.GetIncrementalConversation)
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
}

statisticsGroup := r.Group("/statistics")
Expand Down
8 changes: 8 additions & 0 deletions internal/rpc/conversation/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,3 +718,11 @@ func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, re
}
return &pbconversation.GetNotNotifyConversationIDsResp{ConversationIDs: conversationIDs}, nil
}

func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *pbconversation.GetPinnedConversationIDsReq) (*pbconversation.GetPinnedConversationIDsResp, error) {
conversationIDs, err := c.conversationDatabase.GetPinnedConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil
}
5 changes: 5 additions & 0 deletions pkg/common/storage/cache/cachekey/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
ConversationKey = "CONVERSATION:"
ConversationIDsKey = "CONVERSATION_IDS:"
NotNotifyConversationIDsKey = "NOT_NOTIFY_CONVERSATION_IDS:"
PinnedConversationIDsKey = "PINNED_CONVERSATION_IDS:"
ConversationIDsHashKey = "CONVERSATION_IDS_HASH:"
ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
RecvMsgOptKey = "RECV_MSG_OPT:"
Expand All @@ -39,6 +40,10 @@ func GetNotNotifyConversationIDsKey(ownerUserID string) string {
return NotNotifyConversationIDsKey + ownerUserID
}

func GetPinnedConversationIDs(ownerUserID string) string {
return PinnedConversationIDsKey + ownerUserID
}

func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/storage/cache/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ConversationCache interface {
// get user's conversationIDs from msgCache
GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
DelConversationIDs(userIDs ...string) ConversationCache

GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
Expand Down Expand Up @@ -56,6 +57,7 @@ type ConversationCache interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache
DelConversationVersionUserIDs(userIDs ...string) ConversationCache

FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
Expand Down
18 changes: 18 additions & 0 deletions pkg/common/storage/cache/redis/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func (c *ConversationRedisCache) getNotNotifyConversationIDsKey(ownerUserID stri
return cachekey.GetNotNotifyConversationIDsKey(ownerUserID)
}

func (c *ConversationRedisCache) getPinnedConversationIDsKey(ownerUserID string) string {
return cachekey.GetPinnedConversationIDs(ownerUserID)
}

func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID)
}
Expand Down Expand Up @@ -115,6 +119,12 @@ func (c *ConversationRedisCache) GetUserNotNotifyConversationIDs(ctx context.Con
})
}

func (c *ConversationRedisCache) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) {
return getCache(ctx, c.rcClient, c.getPinnedConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) {
return c.conversationDB.FindUserIDAllPinnedConversationID(ctx, userID)
})
}

func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) cache.ConversationCache {
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
Expand Down Expand Up @@ -260,6 +270,14 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs
return cache
}

func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
cache.AddKeys(c.getPinnedConversationIDsKey(userID))
}
return cache
}

func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
Expand Down
32 changes: 29 additions & 3 deletions pkg/common/storage/controller/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type ConversationDatabase interface {
GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error)
// GetNotNotifyConversationIDs gets not notify conversationIDs by userID
GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
// GetPinnedConversationIDs gets pinned conversationIDs by userID
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
}

func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
Expand Down Expand Up @@ -112,6 +114,9 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
if _, ok := fieldMap["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
}
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
}
NotUserIDs := stringutil.DifferenceString(haveUserIDs, userIDs)
Expand Down Expand Up @@ -149,6 +154,9 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID)
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
if _, ok := args["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
}
return cache.ChainExecDel(ctx)
}

Expand All @@ -159,6 +167,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
var (
userIDs []string
notNotifyUserIDs []string
pinnedUserIDs []string
)

cache := c.cache.CloneConversationCache()
Expand All @@ -169,9 +178,16 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage {
notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID)
}
if conversation.IsPinned == true {
pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID)
}
}
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).ChainExecDel(ctx)
return cache.DelConversationIDs(userIDs...).
DelUserConversationIDsHash(userIDs...).
DelConversationVersionUserIDs(userIDs...).
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
DelConversationPinnedMessageUserIDs(pinnedUserIDs...).
ChainExecDel(ctx)
}

func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error {
Expand Down Expand Up @@ -224,7 +240,9 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner
func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
cache := c.cache.CloneConversationCache()
cache = cache.DelConversationVersionUserIDs(ownerUserID).DelConversationNotNotifyMessageUserIDs(ownerUserID)
cache = cache.DelConversationVersionUserIDs(ownerUserID).
DelConversationNotNotifyMessageUserIDs(ownerUserID).
DelConversationPinnedMessageUserIDs(ownerUserID)

groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
return e.GroupID, e.GroupID != ""
Expand Down Expand Up @@ -374,3 +392,11 @@ func (c *conversationDatabase) GetNotNotifyConversationIDs(ctx context.Context,
}
return conversationIDs, nil
}

func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) {
conversationIDs, err := c.cache.GetPinnedConversationIDs(ctx, userID)
if err != nil {
return nil, err
}
return conversationIDs, nil
}
1 change: 1 addition & 0 deletions pkg/common/storage/database/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Conversation interface {
FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error)
FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error)
FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error)
FindUserIDAllPinnedConversationID(ctx context.Context, userID string) ([]string, error)
Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error)
FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error)
FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*model.Conversation, err error)
Expand Down
7 changes: 7 additions & 0 deletions pkg/common/storage/database/mgo/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ func (c *ConversationMgo) FindUserIDAllNotNotifyConversationID(ctx context.Conte
}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1}))
}

func (c *ConversationMgo) FindUserIDAllPinnedConversationID(ctx context.Context, userID string) ([]string, error) {
return mongoutil.Find[string](ctx, c.coll, bson.M{
"owner_user_id": userID,
"is_pinned": true,
}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1}))
}

func (c *ConversationMgo) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) {
return mongoutil.FindOne[*model.Conversation](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": conversationID})
}
Expand Down

0 comments on commit 7da87e1

Please sign in to comment.