diff --git a/eth-node/types/rpc.go b/eth-node/types/rpc.go index 6105f3b016..4af4b967a1 100644 --- a/eth-node/types/rpc.go +++ b/eth-node/types/rpc.go @@ -6,19 +6,20 @@ import ( // NewMessage represents a new whisper message that is posted through the RPC. type NewMessage struct { - SymKeyID string `json:"symKeyID"` - PublicKey []byte `json:"pubKey"` - SigID string `json:"sig"` - TTL uint32 `json:"ttl"` - PubsubTopic string `json:"pubsubTopic"` - Topic TopicType `json:"topic"` - Payload []byte `json:"payload"` - Padding []byte `json:"padding"` - PowTime uint32 `json:"powTime"` - PowTarget float64 `json:"powTarget"` - TargetPeer string `json:"targetPeer"` - Ephemeral bool `json:"ephemeral"` - Priority *int `json:"priority"` + SymKeyID string `json:"symKeyID"` + PublicKey []byte `json:"pubKey"` + SigID string `json:"sig"` + TTL uint32 `json:"ttl"` + PubsubTopic string `json:"pubsubTopic"` + Topic TopicType `json:"topic"` + Payload []byte `json:"payload"` + Padding []byte `json:"padding"` + PowTime uint32 `json:"powTime"` + PowTarget float64 `json:"powTarget"` + TargetPeer string `json:"targetPeer"` + Ephemeral bool `json:"ephemeral"` + Priority *int `json:"priority"` + ContentTopicOverride string `json:"contentTopicOverride"` } // Message is the RPC representation of a whisper message. diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 20e9ab5446..1046ea0935 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -660,11 +660,12 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes } newMessage := &types.NewMessage{ - TTL: whisperTTL, - Payload: payload, - PowTarget: calculatePoW(payload), - PowTime: whisperPoWTime, - PubsubTopic: rawMessage.PubsubTopic, + TTL: whisperTTL, + Payload: payload, + PowTarget: calculatePoW(payload), + PowTime: whisperPoWTime, + PubsubTopic: rawMessage.PubsubTopic, + ContentTopicOverride: rawMessage.ContentTopicOverrideID, } if rawMessage.BeforeDispatch != nil { diff --git a/protocol/common/raw_message.go b/protocol/common/raw_message.go index 687d3b78a9..38475819de 100644 --- a/protocol/common/raw_message.go +++ b/protocol/common/raw_message.go @@ -66,22 +66,23 @@ type RawMessage struct { // when this is true, the message will not be resent via ResendTypeDataSync, but it's possible to // resend it via ResendTypeRawMessage specified in ResendType // MVDS only supports sending encrypted message. - SkipEncryptionLayer bool - SendPushNotification bool - MessageType protobuf.ApplicationMetadataMessage_Type - Payload []byte - Sender *ecdsa.PrivateKey - Recipients []*ecdsa.PublicKey - SkipGroupMessageWrap bool - SkipApplicationWrap bool - SendOnPersonalTopic bool - CommunityID []byte - CommunityKeyExMsgType CommKeyExMsgType - Ephemeral bool - BeforeDispatch func(*RawMessage) error - HashRatchetGroupID []byte - PubsubTopic string - ResendType ResendType - ResendMethod ResendMethod - Priority *MessagePriority + SkipEncryptionLayer bool + SendPushNotification bool + MessageType protobuf.ApplicationMetadataMessage_Type + Payload []byte + Sender *ecdsa.PrivateKey + Recipients []*ecdsa.PublicKey + SkipGroupMessageWrap bool + SkipApplicationWrap bool + SendOnPersonalTopic bool + CommunityID []byte + CommunityKeyExMsgType CommKeyExMsgType + Ephemeral bool + BeforeDispatch func(*RawMessage) error + HashRatchetGroupID []byte + PubsubTopic string + ResendType ResendType + ResendMethod ResendMethod + Priority *MessagePriority + ContentTopicOverrideID string } diff --git a/protocol/communities_key_distributor.go b/protocol/communities_key_distributor.go index c0c4d125d1..241307f14d 100644 --- a/protocol/communities_key_distributor.go +++ b/protocol/communities_key_distributor.go @@ -88,14 +88,15 @@ func (ckd *CommunitiesKeyDistributorImpl) distributeKey(community *communities.C func (ckd *CommunitiesKeyDistributorImpl) sendKeyExchangeMessage(community *communities.Community, hashRatchetGroupID []byte, pubkeys []*ecdsa.PublicKey, msgType common.CommKeyExMsgType) error { rawMessage := common.RawMessage{ - Sender: community.PrivateKey(), - SkipEncryptionLayer: false, - CommunityID: community.ID(), - CommunityKeyExMsgType: msgType, - Recipients: pubkeys, - MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE, - HashRatchetGroupID: hashRatchetGroupID, - PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic + Sender: community.PrivateKey(), + SkipEncryptionLayer: false, + CommunityID: community.ID(), + CommunityKeyExMsgType: msgType, + Recipients: pubkeys, + MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE, + HashRatchetGroupID: hashRatchetGroupID, + PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic + ContentTopicOverrideID: community.MemberUpdateChannelID(), //TODO: Confirm if this is correct, could not figure out where LocalChatID is set in this flow } _, err := ckd.sender.SendCommunityMessage(context.Background(), &rawMessage) diff --git a/protocol/messenger.go b/protocol/messenger.go index 2e97603262..4cf12b7aaf 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -1888,7 +1888,7 @@ func (m *Messenger) InitFilters() error { } } - filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()}) + filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic(), ContentTopicOverrideID: community.MemberUpdateChannelID()}) case ChatTypeOneToOne: pk, err := chat.PublicKey() if err != nil { @@ -2196,7 +2196,8 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe ) return rawMessage, fmt.Errorf("can't post message type '%d' on chat '%s'", rawMessage.MessageType, chat.ID) } - + //setting content-topic over-ride for community messages to use memberUpdatesChannelID + rawMessage.ContentTopicOverrideID = community.MemberUpdateChannelID() logger.Debug("sending community chat message", zap.String("chatName", chat.Name)) isCommunityEncrypted, err := m.communitiesManager.IsEncrypted(chat.CommunityID) if err != nil { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 8049f24ed0..d735dd6df3 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -951,7 +951,7 @@ func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Cha chats := CreateCommunityChats(community, m.getTimesource()) for _, chat := range chats { - publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()}) + publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic(), ContentTopicOverrideID: community.MemberUpdateChannelID()}) } @@ -2394,7 +2394,7 @@ func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf. for chatID, chat := range changes.ChatsAdded { c := CreateCommunityChat(changes.Community.IDString(), chatID, chat, m.getTimesource()) chats = append(chats, c) - publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic()}) + publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic(), ContentTopicOverrideID: changes.Community.MemberUpdateChannelID()}) response.AddChat(c) } @@ -2490,9 +2490,9 @@ func (m *Messenger) DefaultFilters(o *communities.Community) []transport.Filters filters := []transport.FiltersToInitialize{ {ChatID: cID, PubsubTopic: communityPubsubTopic}, - {ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic}, - {ChatID: mlChannelID, PubsubTopic: communityPubsubTopic}, - {ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic}, + {ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic, ContentTopicOverrideID: o.MemberUpdateChannelID()}, + {ChatID: mlChannelID, PubsubTopic: communityPubsubTopic, ContentTopicOverrideID: o.MemberUpdateChannelID()}, + {ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic, ContentTopicOverrideID: o.MemberUpdateChannelID()}, {ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()}, } @@ -3406,8 +3406,9 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi state.Response.AddChat(chat) publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ - ChatID: chat.ID, - PubsubTopic: community.PubsubTopic(), + ChatID: chat.ID, + PubsubTopic: community.PubsubTopic(), + ContentTopicOverrideID: community.MemberUpdateChannelID(), }) // Update name, currently is the only field is mutable } else if oldChat.Name != chat.Name || diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index 5393d63bf7..3597a091ed 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -99,7 +99,7 @@ func (f *FiltersManager) Init( // Add public, one-to-one and negotiated filters. for _, fi := range filtersToInit { - _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic) + _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, fi.ContentTopicOverrideID) if err != nil { return nil, err } @@ -123,15 +123,16 @@ func (f *FiltersManager) Init( } type FiltersToInitialize struct { - ChatID string - PubsubTopic string + ChatID string + PubsubTopic string + ContentTopicOverrideID string //litte hacky but this is used to override content-topic in filtersManager. } func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) { var filters []*Filter // Add public, one-to-one and negotiated filters. for _, pf := range publicFiltersToInit { - f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic) + f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.ContentTopicOverrideID) if err != nil { return nil, err } @@ -455,7 +456,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, } keyString := hex.EncodeToString(secret.Key) - filter, err := f.addSymmetric(keyString, "") + filter, err := f.addSymmetric(keyString, "", "") if err != nil { f.logger.Debug("could not register negotiated topic", zap.Error(err)) return nil, err @@ -534,7 +535,7 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter { } // LoadPublic adds a filter for a public chat. -func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) { +func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTopicID string) (*Filter, error) { f.mutex.Lock() defer f.mutex.Unlock() @@ -553,7 +554,7 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, return chat, nil } - filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic) + filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic, contentTopicID) if err != nil { f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err)) return nil, err @@ -592,7 +593,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro return f.filters[chatID], nil } - contactCodeFilter, err := f.addSymmetric(chatID, "") + contactCodeFilter, err := f.addSymmetric(chatID, "", "") if err != nil { f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err)) return nil, err @@ -615,7 +616,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro } // addSymmetric adds a symmetric key filter -func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) { +func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, contentTopicID string) (*RawFilter, error) { var symKeyID string var err error @@ -644,6 +645,12 @@ func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFi } } + if contentTopicID != "" { + //override with single contentTopic for all community chats + topic = ToTopic(contentTopicID) + topics = append(topics, topic) + } + id, err := f.service.Subscribe(&types.SubscriptionOptions{ SymKeyID: symKeyID, PoW: minPow, diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 2d873a1759..2ae759a24d 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -190,7 +190,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil } func (t *Transport) JoinPublic(chatID string) (*Filter, error) { - return t.filters.LoadPublic(chatID, "") + return t.filters.LoadPublic(chatID, "", "") } func (t *Transport) LeavePublic(chatID string) error { @@ -279,12 +279,11 @@ func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage if err := t.addSig(newMessage); err != nil { return nil, err } - - filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic) + //passing content-topic override, it will be used if set. otherwise chatName will be used to load filter. + filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, newMessage.ContentTopicOverride) if err != nil { return nil, err } - newMessage.SymKeyID = filter.SymKeyID newMessage.Topic = filter.ContentTopic newMessage.PubsubTopic = filter.PubsubTopic @@ -361,7 +360,8 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types. } // We load the filter to make sure we can post on it - filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic) + //passing content-topic override, it will be used if set. otherwise chatName will be used to load filter. + filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, newMessage.ContentTopicOverride) if err != nil { return nil, err }