diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index dbceb6dbec..a1c8b26b6c 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,19 +18,20 @@ import ( "context" "errors" "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/utils/datautil" - "net/http" - "os" - "os/signal" - "syscall" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/errs" @@ -65,6 +66,7 @@ type Config struct { func Start(ctx context.Context, index int, config *Config) error { log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index) + mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err @@ -73,12 +75,13 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) + client, err := discRegister.NewDiscoveryRegister(&config.Discovery, &config.Share) if err != nil { return err } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + msgModel := redis.NewMsgCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { @@ -94,17 +97,17 @@ func Start(ctx context.Context, index int, config *Config) error { return err } seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) + msgTransferDatabase, err := controller.NewMsgTransferDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) if err != nil { return err } conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) - historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgDatabase, &conversationRpcClient, &groupRpcClient) + historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgTransferDatabase, &conversationRpcClient, &groupRpcClient) if err != nil { return err } - historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(&config.KafkaConfig, msgDatabase) + historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(&config.KafkaConfig, msgTransferDatabase) if err != nil { return err } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 6de07cfbce..6b924b05b9 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,6 +18,10 @@ import ( "context" "encoding/json" "errors" + "strconv" + "strings" + "time" + "github.com/IBM/sarama" "github.com/go-redis/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -33,9 +37,6 @@ import ( "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/stringutil" "google.golang.org/protobuf/proto" - "strconv" - "strings" - "time" ) const ( @@ -56,19 +57,19 @@ type OnlineHistoryRedisConsumerHandler struct { redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage] - msgDatabase controller.CommonMsgDatabase + msgTransferDatabase controller.MsgTransferDatabase conversationRpcClient *rpcclient.ConversationRpcClient groupRpcClient *rpcclient.GroupRpcClient } -func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, +func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) { historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false) if err != nil { return nil, err } var och OnlineHistoryRedisConsumerHandler - och.msgDatabase = database + och.msgTransferDatabase = database b := batcher.New[sarama.ConsumerMessage]( batcher.WithSize(size), @@ -161,7 +162,7 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, return } for key, seq := range readSeq { - if err := och.msgDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil { + if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil { log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq) } } @@ -248,7 +249,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key } if len(storageMessageList) > 0 { msg := storageMessageList[0] - lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) + lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) { log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) return @@ -282,7 +283,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key } log.ZDebug(ctx, "success incr to next topic") - err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq) + err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq) if err != nil { log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID", conversationID, "storageList", storageMessageList, "lastSeq", lastSeq) @@ -299,14 +300,14 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con storageMessageList = append(storageMessageList, msg.message) } if len(storageMessageList) > 0 { - lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) + lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) if err != nil { log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, "storageList", storageMessageList) return } log.ZDebug(ctx, "success to next topic", "conversationID", conversationID) - err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq) + err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq) if err != nil { log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID", conversationID, "storageList", storageMessageList, "lastSeq", lastSeq) @@ -318,7 +319,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { for _, v := range msgs { log.ZDebug(ctx, "push msg to topic", "msg", v.message.String()) - och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message) + och.msgTransferDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message) } } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index cea47fcd50..ef6f6ac7d9 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -16,6 +16,7 @@ package msgtransfer import ( "context" + "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -28,10 +29,10 @@ import ( type OnlineHistoryMongoConsumerHandler struct { historyConsumerGroup *kafka.MConsumerGroup - msgDatabase controller.CommonMsgDatabase + msgTransferDatabase controller.MsgTransferDatabase } -func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { +func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase) (*OnlineHistoryMongoConsumerHandler, error) { historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true) if err != nil { return nil, err @@ -39,7 +40,7 @@ func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database cont mc := &OnlineHistoryMongoConsumerHandler{ historyConsumerGroup: historyConsumerGroup, - msgDatabase: database, + msgTransferDatabase: database, } return mc, nil } @@ -57,7 +58,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont return } log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) - err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) + err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) if err != nil { log.ZError( ctx, @@ -76,7 +77,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont for _, msg := range msgFromMQ.MsgData { seqs = append(seqs, msg.Seq) } - err = mc.msgDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs) + err = mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs) if err != nil { log.ZError( ctx, diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index b08a939cb0..fdd06d3ff3 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -26,7 +26,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" @@ -47,16 +46,10 @@ const ( // CommonMsgDatabase defines the interface for message database operations. type CommonMsgDatabase interface { - // BatchInsertChat2DB inserts a batch of messages into the database for a specific conversation. - BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error // RevokeMsg revokes a message in a conversation. RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error // MarkSingleChatMsgsAsRead marks messages as read for a single chat by sequence numbers. MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error - // DeleteMessagesFromCache deletes message caches from Redis by sequence numbers. - DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error - // BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache. - BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) // GetMsgBySeqsRange retrieves messages from MongoDB by a range of sequence numbers. GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) // GetMsgBySeqs retrieves messages for large groups from MongoDB by sequence numbers. @@ -77,7 +70,6 @@ type CommonMsgDatabase interface { SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error - SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error @@ -91,8 +83,6 @@ type CommonMsgDatabase interface { // to mq MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error - MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) - MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) @@ -114,23 +104,12 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser if err != nil { return nil, err } - producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic) - if err != nil { - return nil, err - } - producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic) - if err != nil { - return nil, err - } - return &commonMsgDatabase{ msgDocDatabase: msgDocModel, msg: msg, seqUser: seqUser, seqConversation: seqConversation, producer: producerToRedis, - producerToMongo: producerToMongo, - producerToPush: producerToPush, }, nil } @@ -141,8 +120,6 @@ type commonMsgDatabase struct { seqConversation cache.SeqConversationCache seqUser cache.SeqUser producer *kafka.Producer - producerToMongo *kafka.Producer - producerToPush *kafka.Producer } func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { @@ -150,23 +127,6 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd return err } -func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) { - partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID}) - if err != nil { - log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq) - return 0, 0, err - } - return partition, offset, nil -} - -func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error { - if len(messages) > 0 { - _, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages}) - return err - } - return nil -} - func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { if len(fields) == 0 { return nil @@ -268,52 +228,6 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI return nil } -func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { - if len(msgList) == 0 { - return errs.ErrArgs.WrapMsg("msgList is empty") - } - msgs := make([]any, len(msgList)) - for i, msg := range msgList { - if msg == nil { - continue - } - var offlinePushModel *model.OfflinePushModel - if msg.OfflinePushInfo != nil { - offlinePushModel = &model.OfflinePushModel{ - Title: msg.OfflinePushInfo.Title, - Desc: msg.OfflinePushInfo.Desc, - Ex: msg.OfflinePushInfo.Ex, - IOSPushSound: msg.OfflinePushInfo.IOSPushSound, - IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount, - } - } - msgs[i] = &model.MsgDataModel{ - SendID: msg.SendID, - RecvID: msg.RecvID, - GroupID: msg.GroupID, - ClientMsgID: msg.ClientMsgID, - ServerMsgID: msg.ServerMsgID, - SenderPlatformID: msg.SenderPlatformID, - SenderNickname: msg.SenderNickname, - SenderFaceURL: msg.SenderFaceURL, - SessionType: msg.SessionType, - MsgFrom: msg.MsgFrom, - ContentType: msg.ContentType, - Content: string(msg.Content), - Seq: msg.Seq, - SendTime: msg.SendTime, - CreateTime: msg.CreateTime, - Status: msg.Status, - Options: msg.Options, - OfflinePush: offlinePushModel, - AtUserIDList: msg.AtUserIDList, - AttachedInfo: msg.AttachedInfo, - Ex: msg.Ex, - } - } - return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq) -} - func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error { return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq) } @@ -333,56 +247,6 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI return nil } -func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { - return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) -} - -func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { - for userID, seq := range userSeqMap { - if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { - return err - } - } - return nil -} - -func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { - lenList := len(msgs) - if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { - return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() - } - if lenList < 1 { - return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() - } - currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs))) - if err != nil { - log.ZError(ctx, "storage.seq.Malloc", err) - return 0, false, err - } - isNew = currentMaxSeq == 0 - lastMaxSeq := currentMaxSeq - userSeqMap := make(map[string]int64) - for _, m := range msgs { - currentMaxSeq++ - m.Seq = currentMaxSeq - userSeqMap[m.SendID] = m.Seq - } - - failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs) - if err != nil { - prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) - log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) - } else { - prommetrics.MsgInsertRedisSuccessCounter.Inc() - } - err = db.setHasReadSeqs(ctx, conversationID, userSeqMap) - if err != nil { - log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) - prommetrics.SeqSetFailedCounter.Inc() - } - return lastMaxSeq, isNew, errs.Wrap(err) -} - func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { // log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) @@ -810,10 +674,6 @@ func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, c return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq) } -func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq) -} - func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs) } diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go new file mode 100644 index 0000000000..5e540a2c33 --- /dev/null +++ b/pkg/common/storage/controller/msg_transfer.go @@ -0,0 +1,286 @@ +package controller + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + pbmsg "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mq/kafka" + "go.mongodb.org/mongo-driver/mongo" +) + +type MsgTransferDatabase interface { + // BatchInsertChat2DB inserts a batch of messages into the database for a specific conversation. + BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error + // DeleteMessagesFromCache deletes message caches from Redis by sequence numbers. + DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error + + // BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache. + BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) + SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error + + // to mq + MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) + MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error +} + +func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (MsgTransferDatabase, error) { + conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) + if err != nil { + return nil, err + } + producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic) + if err != nil { + return nil, err + } + producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic) + if err != nil { + return nil, err + } + return &msgTransferDatabase{ + msgDocDatabase: msgDocModel, + msg: msg, + seqUser: seqUser, + seqConversation: seqConversation, + producerToMongo: producerToMongo, + producerToPush: producerToPush, + }, nil +} + +type msgTransferDatabase struct { + msgDocDatabase database.Msg + msgTable model.MsgDocModel + msg cache.MsgCache + seqConversation cache.SeqConversationCache + seqUser cache.SeqUser + producerToMongo *kafka.Producer + producerToPush *kafka.Producer +} + +func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { + if len(msgList) == 0 { + return errs.ErrArgs.WrapMsg("msgList is empty") + } + msgs := make([]any, len(msgList)) + for i, msg := range msgList { + if msg == nil { + continue + } + var offlinePushModel *model.OfflinePushModel + if msg.OfflinePushInfo != nil { + offlinePushModel = &model.OfflinePushModel{ + Title: msg.OfflinePushInfo.Title, + Desc: msg.OfflinePushInfo.Desc, + Ex: msg.OfflinePushInfo.Ex, + IOSPushSound: msg.OfflinePushInfo.IOSPushSound, + IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount, + } + } + msgs[i] = &model.MsgDataModel{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + ClientMsgID: msg.ClientMsgID, + ServerMsgID: msg.ServerMsgID, + SenderPlatformID: msg.SenderPlatformID, + SenderNickname: msg.SenderNickname, + SenderFaceURL: msg.SenderFaceURL, + SessionType: msg.SessionType, + MsgFrom: msg.MsgFrom, + ContentType: msg.ContentType, + Content: string(msg.Content), + Seq: msg.Seq, + SendTime: msg.SendTime, + CreateTime: msg.CreateTime, + Status: msg.Status, + Options: msg.Options, + OfflinePush: offlinePushModel, + AtUserIDList: msg.AtUserIDList, + AttachedInfo: msg.AttachedInfo, + Ex: msg.Ex, + } + } + return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq) +} + +func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { + if len(fields) == 0 { + return nil + } + num := db.msgTable.GetSingleGocMsgNum() + // num = 100 + for i, field := range fields { // Check the type of the field + var ok bool + switch key { + case updateKeyMsg: + var msg *model.MsgDataModel + msg, ok = field.(*model.MsgDataModel) + if msg != nil && msg.Seq != firstSeq+int64(i) { + return errs.ErrInternalServer.WrapMsg("seq is invalid") + } + case updateKeyRevoke: + _, ok = field.(*model.RevokeModel) + default: + return errs.ErrInternalServer.WrapMsg("key is invalid") + } + if !ok { + return errs.ErrInternalServer.WrapMsg("field type is invalid") + } + } + // Returns true if the document exists in the database, false if the document does not exist in the database + updateMsgModel := func(seq int64, i int) (bool, error) { + var ( + res *mongo.UpdateResult + err error + ) + docID := db.msgTable.GetDocID(conversationID, seq) + index := db.msgTable.GetMsgIndex(seq) + field := fields[i] + switch key { + case updateKeyMsg: + res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field) + case updateKeyRevoke: + res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field) + } + if err != nil { + return false, err + } + return res.MatchedCount > 0, nil + } + tryUpdate := true + for i := 0; i < len(fields); i++ { + seq := firstSeq + int64(i) // Current sequence number + if tryUpdate { + matched, err := updateMsgModel(seq, i) + if err != nil { + return err + } + if matched { + continue // The current data has been updated, skip the current data + } + } + doc := model.MsgDocModel{ + DocID: db.msgTable.GetDocID(conversationID, seq), + Msg: make([]*model.MsgInfoModel, num), + } + var insert int // Inserted data number + for j := i; j < len(fields); j++ { + seq = firstSeq + int64(j) + if db.msgTable.GetDocID(conversationID, seq) != doc.DocID { + break + } + insert++ + switch key { + case updateKeyMsg: + doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{ + Msg: fields[j].(*model.MsgDataModel), + } + case updateKeyRevoke: + doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{ + Revoke: fields[j].(*model.RevokeModel), + } + } + } + for i, msgInfo := range doc.Msg { + if msgInfo == nil { + msgInfo = &model.MsgInfoModel{} + doc.Msg[i] = msgInfo + } + if msgInfo.DelList == nil { + doc.Msg[i].DelList = []string{} + } + } + if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { + if mongo.IsDuplicateKeyError(err) { + i-- // already inserted + tryUpdate = true // next block use update mode + continue + } + return err + } + tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially + i += insert - 1 // Skip the inserted data + } + return nil +} + +func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { + return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) +} + +func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { + lenList := len(msgs) + if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { + return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() + } + if lenList < 1 { + return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() + } + currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs))) + if err != nil { + log.ZError(ctx, "storage.seq.Malloc", err) + return 0, false, err + } + isNew = currentMaxSeq == 0 + lastMaxSeq := currentMaxSeq + userSeqMap := make(map[string]int64) + for _, m := range msgs { + currentMaxSeq++ + m.Seq = currentMaxSeq + userSeqMap[m.SendID] = m.Seq + } + + failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs) + if err != nil { + prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) + log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) + } else { + prommetrics.MsgInsertRedisSuccessCounter.Inc() + } + err = db.setHasReadSeqs(ctx, conversationID, userSeqMap) + if err != nil { + log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) + prommetrics.SeqSetFailedCounter.Inc() + } + return lastMaxSeq, isNew, errs.Wrap(err) +} + +func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { + for userID, seq := range userSeqMap { + if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + } + return nil +} + +func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { + return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq) +} + +func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) { + partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID}) + if err != nil { + log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq) + return 0, 0, err + } + return partition, offset, nil +} + +func (db *msgTransferDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error { + if len(messages) > 0 { + _, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages}) + if err != nil { + log.ZError(ctx, "MsgToMongoMQ", err, "key", key, "conversationID", conversationID, "lastSeq", lastSeq) + return err + } + } + return nil +}