diff --git a/protocol/messenger.go b/protocol/messenger.go index 2e976032620..fda667817a7 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -1888,7 +1888,7 @@ func (m *Messenger) InitFilters() error { } } - filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()}) + filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic(), ContentTopic: community.MemberUpdateChannelID()}) case ChatTypeOneToOne: pk, err := chat.PublicKey() if err != nil { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 8049f24ed04..8d57a6e037b 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -951,7 +951,7 @@ func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Cha chats := CreateCommunityChats(community, m.getTimesource()) for _, chat := range chats { - publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()}) + publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic(), ContentTopic: community.MemberUpdateChannelID()}) } @@ -2394,7 +2394,7 @@ func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf. for chatID, chat := range changes.ChatsAdded { c := CreateCommunityChat(changes.Community.IDString(), chatID, chat, m.getTimesource()) chats = append(chats, c) - publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic()}) + publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic(), ContentTopic: changes.Community.IDString()}) response.AddChat(c) } @@ -2489,10 +2489,10 @@ func (m *Messenger) DefaultFilters(o *communities.Community) []transport.Filters communityPubsubTopic := o.PubsubTopic() filters := []transport.FiltersToInitialize{ - {ChatID: cID, PubsubTopic: communityPubsubTopic}, - {ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic}, - {ChatID: mlChannelID, PubsubTopic: communityPubsubTopic}, - {ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic}, + {ChatID: cID, PubsubTopic: communityPubsubTopic, ContentTopic: o.MemberUpdateChannelID()}, + {ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic, ContentTopic: o.MemberUpdateChannelID()}, + {ChatID: mlChannelID, PubsubTopic: communityPubsubTopic, ContentTopic: o.MemberUpdateChannelID()}, + {ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic, ContentTopic: o.MemberUpdateChannelID()}, {ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()}, } @@ -3406,8 +3406,9 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi state.Response.AddChat(chat) publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ - ChatID: chat.ID, - PubsubTopic: community.PubsubTopic(), + ChatID: chat.ID, + PubsubTopic: community.PubsubTopic(), + ContentTopic: community.MemberUpdateChannelID(), }) // Update name, currently is the only field is mutable } else if oldChat.Name != chat.Name || diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index 5393d63bf78..517296b2bcd 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -99,7 +99,7 @@ func (f *FiltersManager) Init( // Add public, one-to-one and negotiated filters. for _, fi := range filtersToInit { - _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic) + _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, fi.ContentTopic) if err != nil { return nil, err } @@ -123,15 +123,16 @@ func (f *FiltersManager) Init( } type FiltersToInitialize struct { - ChatID string - PubsubTopic string + ChatID string + PubsubTopic string + ContentTopic string //litte hacky but this is used to override content-topic in filtersManager. } func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) { var filters []*Filter // Add public, one-to-one and negotiated filters. for _, pf := range publicFiltersToInit { - f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic) + f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.ContentTopic) if err != nil { return nil, err } @@ -455,7 +456,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, } keyString := hex.EncodeToString(secret.Key) - filter, err := f.addSymmetric(keyString, "") + filter, err := f.addSymmetric(keyString, "", "") if err != nil { f.logger.Debug("could not register negotiated topic", zap.Error(err)) return nil, err @@ -534,7 +535,7 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter { } // LoadPublic adds a filter for a public chat. -func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) { +func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTopic string) (*Filter, error) { f.mutex.Lock() defer f.mutex.Unlock() @@ -553,7 +554,7 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, return chat, nil } - filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic) + filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic, contentTopic) if err != nil { f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err)) return nil, err @@ -570,12 +571,15 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, } f.filters[chatID] = chat + symKey, _ := f.service.GetSymKey(filterAndTopic.SymKeyID) f.logger.Debug("registering filter for", zap.String("chatID", chatID), zap.String("type", "public"), zap.String("ContentTopic", filterAndTopic.Topic.String()), zap.String("PubsubTopic", pubsubTopic), + zap.String("symKeyID", filterAndTopic.SymKeyID), + zap.String("symKey", string(symKey)), ) return chat, nil @@ -592,7 +596,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro return f.filters[chatID], nil } - contactCodeFilter, err := f.addSymmetric(chatID, "") + contactCodeFilter, err := f.addSymmetric(chatID, "", "") if err != nil { f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err)) return nil, err @@ -615,7 +619,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro } // addSymmetric adds a symmetric key filter -func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) { +func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, contentTopic string) (*RawFilter, error) { var symKeyID string var err error @@ -644,6 +648,13 @@ func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFi } } + if contentTopic != "" { + oldTopic := topic + //override with single contentTopic for all community chats + topic = ToTopic(contentTopic) + f.logger.Debug("changing content-topic", zap.String("from", string(oldTopic)), zap.String("to", string(topic))) + } + id, err := f.service.Subscribe(&types.SubscriptionOptions{ SymKeyID: symKeyID, PoW: minPow, diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 2d873a1759e..e9f1b997c02 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -190,7 +190,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil } func (t *Transport) JoinPublic(chatID string) (*Filter, error) { - return t.filters.LoadPublic(chatID, "") + return t.filters.LoadPublic(chatID, "", "") } func (t *Transport) LeavePublic(chatID string) error { @@ -279,12 +279,13 @@ func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage if err := t.addSig(newMessage); err != nil { return nil, err } - - filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic) + //passing empty communityID as the filter should have already been loaded. + //TODO: is there a scenario where filter is not loaded until a message is sent? + filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, "") if err != nil { return nil, err } - + t.logger.Debug("sending public message", zap.String("keyID", filter.SymKeyID), zap.String("contenttopic", filter.ContentTopic.String())) newMessage.SymKeyID = filter.SymKeyID newMessage.Topic = filter.ContentTopic newMessage.PubsubTopic = filter.PubsubTopic @@ -361,7 +362,9 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types. } // We load the filter to make sure we can post on it - filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic) + //passing empty communityID as the filter should have already been loaded. + //TODO: is there a scenario where filter is not loaded until a message is sent? + filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, "") if err != nil { return nil, err } diff --git a/wakuv2/api.go b/wakuv2/api.go index f106b32f52b..a02a7a8b608 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -28,6 +28,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/payload" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "go.uber.org/zap" "github.com/status-im/status-go/wakuv2/common" @@ -257,7 +258,7 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt } hash, err := api.w.Send(req.PubsubTopic, wakuMsg, req.Priority) - + api.w.logger.Debug("sent msg on waku ", zap.ByteString("symKey", keyInfo.SymKey)) if err != nil { return nil, err } diff --git a/wakuv2/common/filter.go b/wakuv2/common/filter.go index 7630a162ecd..86ef4046f69 100644 --- a/wakuv2/common/filter.go +++ b/wakuv2/common/filter.go @@ -235,6 +235,15 @@ func (fs *Filters) NotifyWatchers(recvMessage *ReceivedMessage) bool { } for _, watcher := range candidates { + log.Debug("candidate filter for msg:", + //zap.String("chatID", watcher.), + //zap.String("type", "public"), + recvMessage.Hash().Hex(), + zap.String("ContentTopic", recvMessage.ContentTopic.String()), + zap.String("PubsubTopic", recvMessage.PubsubTopic), + zap.String("symKey", string(watcher.KeySym)), + ) + // Messages are decrypted successfully only once if decodedMsg == nil { decodedMsg = recvMessage.Open(watcher)