Skip to content

Commit

Permalink
feat_: poc to use single content-topic for all community chats
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Sep 25, 2024
1 parent a84f78f commit 48283d7
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 24 deletions.
2 changes: 1 addition & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -1888,7 +1888,7 @@ func (m *Messenger) InitFilters() error {
}
}

filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()})
filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic(), ContentTopic: community.MemberUpdateChannelID()})
case ChatTypeOneToOne:
pk, err := chat.PublicKey()
if err != nil {
Expand Down
17 changes: 9 additions & 8 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Cha
chats := CreateCommunityChats(community, m.getTimesource())

for _, chat := range chats {
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()})
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic(), ContentTopic: community.MemberUpdateChannelID()})

}

Expand Down Expand Up @@ -2394,7 +2394,7 @@ func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf.
for chatID, chat := range changes.ChatsAdded {
c := CreateCommunityChat(changes.Community.IDString(), chatID, chat, m.getTimesource())
chats = append(chats, c)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic()})
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic(), ContentTopic: changes.Community.IDString()})

response.AddChat(c)
}
Expand Down Expand Up @@ -2489,10 +2489,10 @@ func (m *Messenger) DefaultFilters(o *communities.Community) []transport.Filters
communityPubsubTopic := o.PubsubTopic()

filters := []transport.FiltersToInitialize{
{ChatID: cID, PubsubTopic: communityPubsubTopic},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: cID, PubsubTopic: communityPubsubTopic, ContentTopic: o.MemberUpdateChannelID()},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic, ContentTopic: o.MemberUpdateChannelID()},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic, ContentTopic: o.MemberUpdateChannelID()},
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic, ContentTopic: o.MemberUpdateChannelID()},
{ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()},
}

Expand Down Expand Up @@ -3406,8 +3406,9 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi

state.Response.AddChat(chat)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{
ChatID: chat.ID,
PubsubTopic: community.PubsubTopic(),
ChatID: chat.ID,
PubsubTopic: community.PubsubTopic(),
ContentTopic: community.MemberUpdateChannelID(),
})
// Update name, currently is the only field is mutable
} else if oldChat.Name != chat.Name ||
Expand Down
29 changes: 20 additions & 9 deletions protocol/transport/filters_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (f *FiltersManager) Init(

// Add public, one-to-one and negotiated filters.
for _, fi := range filtersToInit {
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic)
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, fi.ContentTopic)
if err != nil {
return nil, err
}
Expand All @@ -123,15 +123,16 @@ func (f *FiltersManager) Init(
}

type FiltersToInitialize struct {
ChatID string
PubsubTopic string
ChatID string
PubsubTopic string
ContentTopic string //litte hacky but this is used to override content-topic in filtersManager.
}

func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) {
var filters []*Filter
// Add public, one-to-one and negotiated filters.
for _, pf := range publicFiltersToInit {
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic)
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.ContentTopic)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -455,7 +456,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
}

keyString := hex.EncodeToString(secret.Key)
filter, err := f.addSymmetric(keyString, "")
filter, err := f.addSymmetric(keyString, "", "")
if err != nil {
f.logger.Debug("could not register negotiated topic", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -534,7 +535,7 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {
}

// LoadPublic adds a filter for a public chat.
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) {
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTopic string) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

Expand All @@ -553,7 +554,7 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
return chat, nil
}

filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic)
filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic, contentTopic)
if err != nil {
f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand All @@ -570,12 +571,15 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
}

f.filters[chatID] = chat
symKey, _ := f.service.GetSymKey(filterAndTopic.SymKeyID)

f.logger.Debug("registering filter for",
zap.String("chatID", chatID),
zap.String("type", "public"),
zap.String("ContentTopic", filterAndTopic.Topic.String()),
zap.String("PubsubTopic", pubsubTopic),
zap.String("symKeyID", filterAndTopic.SymKeyID),
zap.String("symKey", string(symKey)),
)

return chat, nil
Expand All @@ -592,7 +596,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
return f.filters[chatID], nil
}

contactCodeFilter, err := f.addSymmetric(chatID, "")
contactCodeFilter, err := f.addSymmetric(chatID, "", "")
if err != nil {
f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand All @@ -615,7 +619,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
}

// addSymmetric adds a symmetric key filter
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) {
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, contentTopic string) (*RawFilter, error) {
var symKeyID string
var err error

Expand Down Expand Up @@ -644,6 +648,13 @@ func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFi
}
}

if contentTopic != "" {
oldTopic := topic
//override with single contentTopic for all community chats
topic = ToTopic(contentTopic)
f.logger.Debug("changing content-topic", zap.String("from", string(oldTopic)), zap.String("to", string(topic)))
}

id, err := f.service.Subscribe(&types.SubscriptionOptions{
SymKeyID: symKeyID,
PoW: minPow,
Expand Down
13 changes: 8 additions & 5 deletions protocol/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil
}

func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID, "")
return t.filters.LoadPublic(chatID, "", "")
}

func (t *Transport) LeavePublic(chatID string) error {
Expand Down Expand Up @@ -279,12 +279,13 @@ func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage
if err := t.addSig(newMessage); err != nil {
return nil, err
}

filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic)
//passing empty communityID as the filter should have already been loaded.
//TODO: is there a scenario where filter is not loaded until a message is sent?
filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, "")
if err != nil {
return nil, err
}

t.logger.Debug("sending public message", zap.String("keyID", filter.SymKeyID), zap.String("contenttopic", filter.ContentTopic.String()))
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.ContentTopic
newMessage.PubsubTopic = filter.PubsubTopic
Expand Down Expand Up @@ -361,7 +362,9 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.
}

// We load the filter to make sure we can post on it
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic)
//passing empty communityID as the filter should have already been loaded.
//TODO: is there a scenario where filter is not loaded until a message is sent?
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, "")
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion wakuv2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"go.uber.org/zap"

"github.com/status-im/status-go/wakuv2/common"

Expand Down Expand Up @@ -257,7 +258,7 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt
}

hash, err := api.w.Send(req.PubsubTopic, wakuMsg, req.Priority)

api.w.logger.Debug("sent msg on waku ", zap.ByteString("symKey", keyInfo.SymKey))
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions wakuv2/common/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ func (fs *Filters) NotifyWatchers(recvMessage *ReceivedMessage) bool {
}

for _, watcher := range candidates {
log.Debug("candidate filter for msg:",
//zap.String("chatID", watcher.),
//zap.String("type", "public"),
recvMessage.Hash().Hex(),
zap.String("ContentTopic", recvMessage.ContentTopic.String()),
zap.String("PubsubTopic", recvMessage.PubsubTopic),
zap.String("symKey", string(watcher.KeySym)),
)

// Messages are decrypted successfully only once
if decodedMsg == nil {
decodedMsg = recvMessage.Open(watcher)
Expand Down

0 comments on commit 48283d7

Please sign in to comment.