Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement offline push using kafka #2600

Merged
merged 47 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
6fa9d93
refactor: refactor workflows contents.
mo3et Aug 12, 2024
c8dec9f
add tool workflows.
mo3et Aug 12, 2024
5511ad3
update field.
mo3et Aug 12, 2024
c50bce3
fix: remove chat error.
mo3et Aug 12, 2024
5471d1c
Fix err.
mo3et Aug 12, 2024
3101f14
fix error.
mo3et Aug 12, 2024
72aca1e
remove cn comment.
mo3et Aug 12, 2024
002cdde
update workflows files.
mo3et Aug 12, 2024
a281442
update infra config.
mo3et Aug 12, 2024
fa54e57
move workflows.
mo3et Aug 12, 2024
f100cab
feat: update bot.
mo3et Aug 12, 2024
e5435cd
Merge branch 'openimsdk:main' into main
mo3et Aug 12, 2024
4ef9200
fix: solve uncorrect outdated msg get.
mo3et Aug 13, 2024
9b46304
Merge branch 'main' of github.com:mo3et/open-im-server
mo3et Aug 13, 2024
bc47a6a
update get docIDs logic.
mo3et Aug 13, 2024
7c482c9
update
mo3et Aug 13, 2024
d467381
update skip logic.
mo3et Aug 13, 2024
a2bbb6a
fix
mo3et Aug 13, 2024
850b7b2
update.
mo3et Aug 13, 2024
e8789bd
Merge branch 'main' of github.com:openimsdk/open-im-server
mo3et Aug 14, 2024
4539151
Merge branch 'main' of github.com:openimsdk/open-im-server
mo3et Aug 19, 2024
8c1c1b6
Merge branch 'main' of github.com:openimsdk/open-im-server
mo3et Aug 20, 2024
d4a4d67
Merge branch 'main' of github.com:openimsdk/open-im-server
mo3et Aug 21, 2024
d63c5b7
Merge branch 'main' of github.com:openimsdk/open-im-server
mo3et Aug 21, 2024
fcb210d
Merge branch 'main' of github.com:openimsdk/open-im-server
mo3et Aug 29, 2024
6bc5264
fix: delay deleteObject func.
mo3et Aug 29, 2024
0e90f78
remove unused content.
mo3et Aug 29, 2024
de99505
update log type.
mo3et Sep 3, 2024
c4ee8c5
Merge branch 'main' of github.com:openimsdk/open-im-server
mo3et Sep 3, 2024
d44e11f
Merge branch 'main' of github.com:openimsdk/open-im-server
mo3et Sep 4, 2024
4abca90
feat: implement request batch count limit.
mo3et Sep 4, 2024
86bd7e8
update
mo3et Sep 4, 2024
1d35cb3
update
mo3et Sep 4, 2024
6e841f8
Merge branch 'main' of github.com:openimsdk/open-im-server
mo3et Sep 6, 2024
302a6db
feat: implement offline push.
mo3et Sep 7, 2024
a38bdc7
feat: implement batch Push spilt
mo3et Sep 7, 2024
0a71bef
update go mod
mo3et Sep 7, 2024
d4669d9
feat: implement kafka producer and consumer.
mo3et Sep 9, 2024
b380b0c
update format,
mo3et Sep 9, 2024
700b725
add PushMQ log.
mo3et Sep 9, 2024
af6a624
Merge branch 'main' of github.com:openimsdk/open-im-server into feat/…
mo3et Sep 10, 2024
01b8a87
Merge branch 'main' of github.com:openimsdk/open-im-server into feat/…
mo3et Sep 10, 2024
15e5836
feat: update Handler logic.
mo3et Sep 10, 2024
585266d
update MQ logic.
mo3et Sep 10, 2024
2a95c8c
update
mo3et Sep 10, 2024
c5ea3f5
update
mo3et Sep 10, 2024
ad5c1c8
fix: update OfflinePushConsumerHandler.
mo3et Sep 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ toRedisTopic: toRedis
toMongoTopic: toMongo
# Kafka topic for push notifications
toPushTopic: toPush
# Kafka topic for offline push notifications
toOfflinePushTopic: toOfflinePush
# Consumer group ID for Redis topic
toRedisGroupID: redis
# Consumer group ID for MongoDB topic
toMongoGroupID: mongo
# Consumer group ID for push notifications topic
toPushGroupID: push
# Consumer group ID for offline push notifications topic
toOfflinePushGroupID: offlinePush
# TLS (Transport Layer Security) configuration
tls:
# Enable or disable TLS
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.13
github.com/openimsdk/protocol v0.0.72-alpha.17
github.com/openimsdk/tools v0.0.50-alpha.11
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.13 h1:ILpvuxWGrVJMVCPRodOQcrSMFKUBzLahBPb8GkITWSc=
github.com/openimsdk/protocol v0.0.72-alpha.13/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.17 h1:kB7eyjJHdkc8lpSlLIHskHzbodxkIG4eaK908iQLVdI=
github.com/openimsdk/protocol v0.0.72-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc=
github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
Expand Down
1 change: 1 addition & 0 deletions internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
return err
}

msgTransfer := &MsgTransfer{
historyCH: historyCH,
historyMongoCH: historyMongoCH,
Expand Down
13 changes: 12 additions & 1 deletion internal/push/offlinepush/getui/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
"strconv"
"sync"
"time"

"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/tools/errs"
Expand Down Expand Up @@ -91,6 +92,16 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
for i, v := range s.GetSplitResult() {
go func(index int, userIDs []string) {
defer wg.Done()
for i := 0; i < len(userIDs); i += maxNum {
end := i + maxNum
if end > len(userIDs) {
end = len(userIDs)
}
if err = g.batchPush(ctx, token, userIDs[i:end], pushReq); err != nil {
log.ZError(ctx, "batchPush failed", err, "index", index, "token", token, "req", pushReq)
}

}
if err = g.batchPush(ctx, token, userIDs, pushReq); err != nil {
log.ZError(ctx, "batchPush failed", err, "index", index, "token", token, "req", pushReq)
}
Expand Down
122 changes: 122 additions & 0 deletions internal/push/offlinepush_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package push

import (
"context"

"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/protocol/constant"
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/jsonutil"
"google.golang.org/protobuf/proto"
)

type OfflinePushConsumerHandler struct {
OfflinePushConsumerGroup *kafka.MConsumerGroup
offlinePusher offlinepush.OfflinePusher
}

func NewOfflinePushConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher) (*OfflinePushConsumerHandler, error) {
var offlinePushConsumerHandler OfflinePushConsumerHandler
var err error
offlinePushConsumerHandler.offlinePusher = offlinePusher
offlinePushConsumerHandler.OfflinePushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToOfflineGroupID,
[]string{config.KafkaConfig.ToOfflinePushTopic}, true)
if err != nil {
return nil, err
}
return &offlinePushConsumerHandler, nil
}

func (*OfflinePushConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (*OfflinePushConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (o *OfflinePushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
ctx := o.OfflinePushConsumerGroup.GetContextFromMsg(msg)
o.handleMsg2OfflinePush(ctx, msg.Value)
sess.MarkMessage(msg, "")
}
return nil
}

func (o *OfflinePushConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) {
offlinePushMsg := pbpush.PushMsgReq{}
if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil {
log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg))
return
}
if offlinePushMsg.MsgData == nil || offlinePushMsg.UserIDs == nil {
log.ZError(ctx, "offline push msg is empty", errs.New("offlinePushMsg is empty"), "userIDs", offlinePushMsg.UserIDs, "msg", offlinePushMsg.MsgData)
return
}
log.ZInfo(ctx, "receive to OfflinePush MQ", "userIDs", offlinePushMsg.UserIDs, "msg", offlinePushMsg.MsgData)

err := o.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs)
if err != nil {
log.ZWarn(ctx, "offline push failed", err, "msg", offlinePushMsg.String())
}
}

func (c *OfflinePushConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) {
type AtTextElem struct {
Text string `json:"text,omitempty"`
AtUserList []string `json:"atUserList,omitempty"`
IsAtSelf bool `json:"isAtSelf"`
}

opts = &options.Opts{Signal: &options.Signal{}}
if msg.OfflinePushInfo != nil {
opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount
opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound
opts.Ex = msg.OfflinePushInfo.Ex
}

if msg.OfflinePushInfo != nil {
title = msg.OfflinePushInfo.Title
content = msg.OfflinePushInfo.Desc
}
if title == "" {
switch msg.ContentType {
case constant.Text:
fallthrough
case constant.Picture:
fallthrough
case constant.Voice:
fallthrough
case constant.Video:
fallthrough
case constant.File:
title = constant.ContentType2PushContent[int64(msg.ContentType)]
case constant.AtText:
ac := AtTextElem{}
_ = jsonutil.JsonStringToStruct(string(msg.Content), &ac)
case constant.SignalingNotification:
title = constant.ContentType2PushContent[constant.SignalMsg]
default:
title = constant.ContentType2PushContent[constant.Common]
}
}
if content == "" {
content = title
}
return
}

func (c *OfflinePushConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
title, content, opts, err := c.getOfflinePushInfos(msg)
if err != nil {
return err
}
err = c.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
if err != nil {
prommetrics.MsgOfflinePushFailedCounter.Inc()
return err
}
return nil
}
19 changes: 16 additions & 3 deletions internal/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package push

import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
Expand All @@ -17,12 +18,12 @@ type pushServer struct {
disCov discovery.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher
pushCh *ConsumerHandler
offlinePushCh *OfflinePushConsumerHandler
}

type Config struct {
RpcConfig config.Push
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
NotificationConfig config.Notification
Share config.Share
Expand Down Expand Up @@ -55,18 +56,30 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
database := controller.NewPushDatabase(cacheModel)

consumer, err := NewConsumerHandler(config, offlinePusher, rdb, client)
database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig)

consumer, err := NewConsumerHandler(config, database, offlinePusher, rdb, client)
if err != nil {
return err
}

offlinePushConsumer, err := NewOfflinePushConsumerHandler(config, offlinePusher)
if err != nil {
return err
}

pbpush.RegisterPushMsgServiceServer(server, &pushServer{
database: database,
disCov: client,
offlinePusher: offlinePusher,
pushCh: consumer,
offlinePushCh: offlinePushConsumer,
})

go consumer.pushConsumerGroup.RegisterHandleAndConsumer(ctx, consumer)

go offlinePushConsumer.OfflinePushConsumerGroup.RegisterHandleAndConsumer(ctx, offlinePushConsumer)

return nil
}
Loading
Loading