Skip to content

Commit

Permalink
feat_: poc to use single content-topic for all community chats
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 9, 2024
1 parent a84f78f commit 8cb3323
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 43 deletions.
27 changes: 14 additions & 13 deletions eth-node/types/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ import (

// NewMessage represents a new whisper message that is posted through the RPC.
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
ContentTopicOverride string `json:"contentTopicOverride"`
}

// Message is the RPC representation of a whisper message.
Expand Down
11 changes: 6 additions & 5 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,11 +660,12 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes
}

newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
ContentTopicOverride: rawMessage.ContentTopicOverride,
}

if rawMessage.BeforeDispatch != nil {
Expand Down
1 change: 1 addition & 0 deletions protocol/common/raw_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,5 @@ type RawMessage struct {
ResendType ResendType
ResendMethod ResendMethod
Priority *MessagePriority
ContentTopicOverride string
}
4 changes: 4 additions & 0 deletions protocol/communities/manager_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ func (m *ArchiveManager) StartHistoryArchiveTasksInterval(community *Community,
m.logger.Error("failed to get community chat topics ", zap.Error(err))
continue
}
// adding the content-topic used for member updates.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
topics = append(topics, m.transport.FilterByChatID(community.MemberUpdateChannelID()).ContentTopic)

ts := time.Now().Unix()
to := time.Unix(ts, 0)
Expand Down
3 changes: 2 additions & 1 deletion protocol/communities_key_distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func (ckd *CommunitiesKeyDistributorImpl) sendKeyExchangeMessage(community *comm
Recipients: pubkeys,
MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
HashRatchetGroupID: hashRatchetGroupID,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
ContentTopicOverride: community.MemberUpdateChannelID(), //TODO: Confirm if this is correct, could not figure out where LocalChatID is set in this flow
}
_, err := ckd.sender.SendCommunityMessage(context.Background(), &rawMessage)

Expand Down
3 changes: 2 additions & 1 deletion protocol/communities_messenger_token_permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe
startDate := messageDate.Add(-time.Minute)
endDate := messageDate.Add(time.Minute)
topic := types.BytesToTopic(transport.ToTopic(chat.ID))
topics := []types.TopicType{topic}
communityCommonTopic := types.BytesToTopic(transport.ToTopic(community.MemberUpdateChannelID()))
topics := []types.TopicType{topic, communityCommonTopic}

torrentConfig := params.TorrentConfig{
Enabled: true,
Expand Down
5 changes: 3 additions & 2 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -1888,7 +1888,7 @@ func (m *Messenger) InitFilters() error {
}
}

filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()})
filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic(), ContentTopicOverrideID: community.MemberUpdateChannelID()})
case ChatTypeOneToOne:
pk, err := chat.PublicKey()
if err != nil {
Expand Down Expand Up @@ -2196,7 +2196,8 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe
)
return rawMessage, fmt.Errorf("can't post message type '%d' on chat '%s'", rawMessage.MessageType, chat.ID)
}

//setting content-topic over-ride for community messages to use memberUpdatesChannelID
rawMessage.ContentTopicOverride = community.MemberUpdateChannelID()
logger.Debug("sending community chat message", zap.String("chatName", chat.Name))
isCommunityEncrypted, err := m.communitiesManager.IsEncrypted(chat.CommunityID)
if err != nil {
Expand Down
19 changes: 12 additions & 7 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Cha
chats := CreateCommunityChats(community, m.getTimesource())

for _, chat := range chats {
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()})
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic(), ContentTopicOverrideID: community.MemberUpdateChannelID()})

}

Expand Down Expand Up @@ -2394,7 +2394,7 @@ func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf.
for chatID, chat := range changes.ChatsAdded {
c := CreateCommunityChat(changes.Community.IDString(), chatID, chat, m.getTimesource())
chats = append(chats, c)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic()})
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic(), ContentTopicOverrideID: changes.Community.MemberUpdateChannelID()})

response.AddChat(c)
}
Expand Down Expand Up @@ -2490,9 +2490,9 @@ func (m *Messenger) DefaultFilters(o *communities.Community) []transport.Filters

filters := []transport.FiltersToInitialize{
{ChatID: cID, PubsubTopic: communityPubsubTopic},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic, ContentTopicOverrideID: o.MemberUpdateChannelID()},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic, ContentTopicOverrideID: o.MemberUpdateChannelID()},
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic, ContentTopicOverrideID: o.MemberUpdateChannelID()},
{ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()},
}

Expand Down Expand Up @@ -3406,8 +3406,9 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi

state.Response.AddChat(chat)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{
ChatID: chat.ID,
PubsubTopic: community.PubsubTopic(),
ChatID: chat.ID,
PubsubTopic: community.PubsubTopic(),
ContentTopicOverrideID: community.MemberUpdateChannelID(),
})
// Update name, currently is the only field is mutable
} else if oldChat.Name != chat.Name ||
Expand Down Expand Up @@ -3941,6 +3942,10 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
for _, filter := range filters {
topics = append(topics, filter.ContentTopic)
}
// adding the content-topic used for member updates.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
filters = append(filters, m.transport.FilterByChatID(c.MemberUpdateChannelID()))

// First we need to know the timestamp of the latest waku message
// we've received for this community, so we can request messages we've
Expand Down
25 changes: 16 additions & 9 deletions protocol/transport/filters_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (f *FiltersManager) Init(

// Add public, one-to-one and negotiated filters.
for _, fi := range filtersToInit {
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic)
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, fi.ContentTopicOverrideID)
if err != nil {
return nil, err
}
Expand All @@ -123,15 +123,16 @@ func (f *FiltersManager) Init(
}

type FiltersToInitialize struct {
ChatID string
PubsubTopic string
ChatID string
PubsubTopic string
ContentTopicOverrideID string //litte hacky but this is used to override content-topic in filtersManager.
}

func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) {
var filters []*Filter
// Add public, one-to-one and negotiated filters.
for _, pf := range publicFiltersToInit {
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic)
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.ContentTopicOverrideID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -455,7 +456,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
}

keyString := hex.EncodeToString(secret.Key)
filter, err := f.addSymmetric(keyString, "")
filter, err := f.addSymmetric(keyString, "", "")
if err != nil {
f.logger.Debug("could not register negotiated topic", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -534,7 +535,7 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {
}

// LoadPublic adds a filter for a public chat.
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) {
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTopicID string) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

Expand All @@ -553,7 +554,7 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
return chat, nil
}

filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic)
filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic, contentTopicID)
if err != nil {
f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -592,7 +593,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
return f.filters[chatID], nil
}

contactCodeFilter, err := f.addSymmetric(chatID, "")
contactCodeFilter, err := f.addSymmetric(chatID, "", "")
if err != nil {
f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand All @@ -615,7 +616,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
}

// addSymmetric adds a symmetric key filter
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) {
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, contentTopicID string) (*RawFilter, error) {
var symKeyID string
var err error

Expand Down Expand Up @@ -644,6 +645,12 @@ func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFi
}
}

if contentTopicID != "" {
//override with single contentTopic for all community chats
topic = ToTopic(contentTopicID)
topics = append(topics, topic)
}

id, err := f.service.Subscribe(&types.SubscriptionOptions{
SymKeyID: symKeyID,
PoW: minPow,
Expand Down
10 changes: 5 additions & 5 deletions protocol/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil
}

func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID, "")
return t.filters.LoadPublic(chatID, "", "")
}

func (t *Transport) LeavePublic(chatID string) error {
Expand Down Expand Up @@ -279,12 +279,11 @@ func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage
if err := t.addSig(newMessage); err != nil {
return nil, err
}

filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic)
//passing content-topic override, it will be used if set. otherwise chatName will be used to load filter.
filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, newMessage.ContentTopicOverride)
if err != nil {
return nil, err
}

newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.ContentTopic
newMessage.PubsubTopic = filter.PubsubTopic
Expand Down Expand Up @@ -361,7 +360,8 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.
}

// We load the filter to make sure we can post on it
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic)
//passing content-topic override, it will be used if set. otherwise chatName will be used to load filter.
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, newMessage.ContentTopicOverride)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 8cb3323

Please sign in to comment.