Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mediator) : Add TTL for message collection #290

Merged
merged 1 commit into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,21 @@ To set up the mediator storage (MongoDB):
- `MONGODB_PASSWORD` - is the password used by the Mediator service to connect to the database.
- `MONGODB_DB_NAME` - is the name of the database used by the Mediator.

#### Mediator storage
- The `messages` collection contains two types of messages: `Mediator` and `User`.
1. **Mediator Messages**:
- These messages received by mediator for any interactions with the mediator.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • The mediator receives these messages for any interactions with the mediator.

- Examples include messages for setting up mediation, requesting mediation, or picking up messages from the mediator.
- These messages stored in collection can be used for debugging purpose mediator functionality and interactions with the mediator. Hence they can be deleted after a period of time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • The messages stored in the collection are usable for debugging purposes, mediator functionality, and interactions with the mediator. Hence, after a predetermined period, deleting them is possible.

- This message type `Mediator` can be setup to have a configurable Time-To-Live (TTL) value, after which they can expire.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- This message type `Mediator` can be set up to have a configurable Time-To-Live (TTL) value, after which they can expire.

- This is how the TTL can be configured for the collection messages [initdb.js](initdb.js)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • This is how the TTL is configurable for the collection messages initdb.js

2. **User Messages**:
- These are the actual messages e.g like the Forward message from the mediator, contain a `User` message inside. This inside message is stored as type `User` to be delivered to user.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • These are the actual messages, e.g. the Forward message from the mediator, containing a User message inside. This inside message gets stored as User deliverable to the user.

- They do not have a TTL, and will persist in the system until the user retrieves them using a pickup protocol and deletes them.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- They do not have a TTL and will persist in the system until the user retrieves them using a pickup protocol and deletes them.

- The mediator is responsible for storing and making these user messages available for delivery to the intended recipients.

ℹ️ For existing users, please utilize the migration script [migration_mediator_collection.js](migration_mediator_collection.js) to migrate the collection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

::: info
For existing users, please utilize the migration script migration_mediator_collection.js to migrate the collection.
:::

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not valid in markdown file :::info


## Run

The DIDComm Mediator comprises two elements: a backend service and a database.
Expand Down
10 changes: 10 additions & 0 deletions infrastructure/charts/mediator/templates/mongodb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ data:
// Only enforce uniqueness on non-empty arrays
db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true , partialFilterExpression: { "alias.0": { $exists: true } }});
db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 });
// 7 day * 24 hours * 60 minutes * 60 seconds
const expireAfterSeconds = 7 * 24 * 60 * 60;
db.getCollection(collectionMessages).createIndex(
{ ts: 1 },
{
name: "message-ttl-index",
partialFilterExpression: { "message_type" : "Mediator" },
expireAfterSeconds: expireAfterSeconds
}
);
---
apiVersion: v1
kind: Service
Expand Down
11 changes: 11 additions & 0 deletions initdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,14 @@ db.getCollection(collectionDidAccount).createIndex({ 'did': 1 }, { unique: true
// Only enforce uniqueness on non-empty arrays
db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true, partialFilterExpression: { "alias.0": { $exists: true } } });
db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 });

// There are 2 message types `Mediator` and `User` Please follow the Readme for more details in the section Mediator storage
const expireAfterSeconds = 7 * 24 * 60 * 60; // 7 day * 24 hours * 60 minutes * 60 seconds
db.getCollection(collectionMessages).createIndex(
{ ts: 1 },
{
name: "message-ttl-index",
partialFilterExpression: { "message_type" : "Mediator" },
expireAfterSeconds: expireAfterSeconds
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ object AgentExecutorMediator {
em.`protected`.obj match
case AnonProtectedHeader(epk, apv, typ, enc, alg) => ops.anonDecrypt(em)
case AuthProtectedHeader(epk, apv, skid, apu, typ, enc, alg) => ops.authDecrypt(em)
}.flatMap(decrypt _)
}.flatMap(decrypt)
case sm: SignedMessage =>
ops.verify(sm).flatMap {
case false => ZIO.fail(ValidationFailed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,33 @@ type HASH = String
// messages
type XRequestID = String // x-request-id

enum MessageType {
case Mediator, User
}

case class MessageItem(
_id: HASH,
msg: SignedMessage | EncryptedMessage,
headers: ProtectedHeader | Seq[SignProtectedHeader],
ts: String,
ts: Instant,
mineme0110 marked this conversation as resolved.
Show resolved Hide resolved
message_type: MessageType,
xRequestId: Option[XRequestID]
)
object MessageItem {
def apply(msg: SignedMessage | EncryptedMessage, xRequestId: Option[XRequestID]): MessageItem =
def apply(msg: SignedMessage | EncryptedMessage, messageType: MessageType, xRequestId: Option[XRequestID]): MessageItem =
val now = Instant.now()
msg match {
case sMsg: SignedMessage =>
new MessageItem(
msg.sha256,
msg,
sMsg.signatures.map(_.`protected`.obj),
Instant.now().toString,
now,
messageType,
xRequestId
)
case eMsg: EncryptedMessage =>
new MessageItem(msg.sha256, msg, eMsg.`protected`.obj, Instant.now().toString, xRequestId)
new MessageItem(msg.sha256, msg, eMsg.`protected`.obj, now, messageType, xRequestId)
}

given BSONWriter[ProtectedHeader | Seq[SignProtectedHeader]] with {
Expand Down Expand Up @@ -72,6 +79,25 @@ object MessageItem {
}
}

given BSONWriter[MessageType] with
def writeTry(value: MessageType): Try[BSONValue] = Try {
value match {
case MessageType.Mediator => BSONString("Mediator")
case MessageType.User => BSONString("User")
}
}

given BSONReader[MessageType] with
def readTry(bson: BSONValue): Try[MessageType] = Try {
bson match {
case BSONString("Mediator") => MessageType.Mediator
case BSONString("User") => MessageType.User
case _ => throw new RuntimeException("Invalid MessagePurpose value in BSON")
}
}



given BSONDocumentWriter[MessageItem] = Macros.writer[MessageItem]
given BSONDocumentReader[MessageItem] = Macros.reader[MessageItem]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package org.hyperledger.identus.mediator.db

import fmgp.did.*
import fmgp.did.comm.{SignedMessage, EncryptedMessage}
import fmgp.did.comm.{EncryptedMessage, SignedMessage}
import org.hyperledger.identus.mediator.db.MessageType.Mediator
import org.hyperledger.identus.mediator.{DuplicateMessage, StorageCollection, StorageError, StorageThrowable}
import reactivemongo.api.bson.*
import reactivemongo.api.bson.collection.BSONCollection
Expand All @@ -28,13 +29,13 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
.map(_.collection(collectionName))
.mapError(ex => StorageCollection(ex))

def insert(msg: SignedMessage | EncryptedMessage): IO[StorageError, WriteResult] = {
def insert(msg: SignedMessage | EncryptedMessage, messageType: MessageType = Mediator): IO[StorageError, WriteResult] = {
for {
_ <- ZIO.logInfo("insert")
_ <- ZIO.logInfo(s"insert $messageType")
xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value))
coll <- collection
result <- ZIO
.fromFuture(implicit ec => coll.insert.one(MessageItem(msg, xRequestId)))
.fromFuture(implicit ec => coll.insert.one(MessageItem(msg, messageType, xRequestId)))
.tapError(err => ZIO.logError(s"insert : ${err.getMessage}"))
.mapError {
case ex: DatabaseException if (ex.code.contains(DuplicateMessage.code)) => DuplicateMessage(ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.hyperledger.identus.mediator.db.*
import zio.*
import zio.json.*
import fmgp.did.comm.protocol.pickup3.MessageDelivery
import org.hyperledger.identus.mediator.db.MessageType.User

object ForwardMessageExecuter
extends ProtocolExecuter[
Expand All @@ -36,7 +37,7 @@ object ForwardMessageExecuter
msg <-
if (numbreOfUpdated > 0) { // Or maybe we can add all the time
for {
_ <- repoMessageItem.insert(m.msg)
_ <- repoMessageItem.insert(m.msg, User)
_ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo")

// For Live Mode
Expand Down
31 changes: 31 additions & 0 deletions migration_mediator_collection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// migration script
// Please utilize the following script to update your existing collection for Mediator release v0.14.5 and beyond.
const collectionName = 'messages';
const collectionNameUserAccount = 'user.account';
let userHashes = [];

db.getCollection(collectionNameUserAccount).find({}).forEach(function(user) {
user.messagesRef.forEach(function(messageRef) {
userHashes.push(messageRef.hash);
});
});

db.getCollection('messages').find({}).forEach(function(message) {
let newTimestamp = new Date(message.ts);
if(userHashes.includes(message._id)) {
db.getCollection('messages').updateOne({ _id: message._id }, { $set: { message_type: 'User', ts: newTimestamp } });
} else {
db.getCollection('messages').updateOne({ _id: message._id }, { $set: { message_type: 'Mediator', ts: newTimestamp } });
}
});

// There are 2 message types `Mediator` and `User` Please follow the Readme for more details in the section Mediator storage
const expireAfterSeconds = 7 * 24 * 60 * 60; // 7 day * 24 hours * 60 minutes * 60 seconds
db.getCollection(collectionMessages).createIndex(
{ ts: 1 },
{
name: "message-ttl-index",
partialFilterExpression: { "message_type" : "Mediator" },
expireAfterSeconds: expireAfterSeconds
}
)
Loading