diff --git a/README.md b/README.md
index d7b305bf..447b5434 100644
--- a/README.md
+++ b/README.md
@@ -398,7 +398,7 @@ const pokemonsEventStore = new EventStore({
> // => pokemonsReducer
> ```
>
-> - onEventPushed (?(pushEventResponse: PushEventResponse) => Promise\)
: The callback to run after events are pushed
+> - onEventPushed (?(pushEventResponse: PushEventResponse) => Promise\)
: Callback to run after events are pushed
>
> ```ts
> const onEventPushed = pokemonsEventStore.onEventPushed;
@@ -522,11 +522,12 @@ const pokemonsEventStore = new EventStore({
> // => 'aggregate' and 'lastEvent' are always defined 🙌
> ```
>
-> - pushEvent ((eventDetail: EventDetail, opt?: OptionsObj = {}) => Promise\)
: Pushes a new event to the event store. The `timestamp` is optional (we keep it available as it can be useful in tests & migrations). If not provided, it is automatically set as `new Date().toISOString()`. Throws an `EventAlreadyExistsError` if an event already exists for the corresponding `aggregateId` and `version`.
+> - pushEvent ((eventDetail: EventDetail, opt?: OptionsObj = {}) => Promise\)
: Pushes a new event to the event store. The `timestamp` is optional (we keep it available as it can be useful in tests & migrations). If not provided, it is automatically set as `new Date().toISOString()`. Throws an `EventAlreadyExistsError` if an event already exists for the corresponding `aggregateId` and `version` (see section below on race conditions).
>
> `OptionsObj` contains the following properties:
>
> - prevAggregate (?Aggregate)
: The aggregate at the current version, i.e. before having pushed the event. Can be useful in some cases like when using the [`ConnectedEventStore` class](#--connectedeventstore)
+> - force (?boolean)
: To force push the event even if one already exists for the corresponding `aggregateId` and `version`. Any existing event will be overridden, so use with extra care, mainly in [data migrations](#--dam).
>
> `ResponseObj` contains the following properties:
>
@@ -971,9 +972,13 @@ await appMessageQueue.publishMessage({
>
> The following methods interact with the messaging solution of your application through a `MessageQueueAdapter`. They will throw an `UndefinedMessageChannelAdapterError` if you did not provide one.
>
-> - publishMessage ((message: NotificationMessage | StateCarryingMessage) => Promise\)
: Publish a `NotificationMessage` (for `NotificationMessageQueues`) or a `StateCarryingMessage` (for `StateCarryingMessageQueues`) to the message queue.
+> - publishMessage ((message: Message, opt?: OptionsObj = {}) => Promise\)
: Publish a `Message` (of the appropriate type) to the message queue.
>
-> - publishMessages ((messages: NotificationMessage[] | StateCarryingMessage[]) => Promise\)
: Publish several `NotificationMessage` (for `NotificationMessageQueues`) or several `StateCarryingMessage` (for `StateCarryingMessageQueues`) to the message queue.
+> `OptionsObj` contains the following properties:
+>
+> - replay (?boolean)
: Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately (e.g. prevent sending notification emails). Check the implementation of you adapter for more details.
+>
+> - publishMessages (messages: Message[], opt?: OptionsObj) => Promise\)
: Publish several `Messages` (of the appropriate type) to the message queue. Options are similar to the `publishMessage` options.
>
> - getAggregateAndPublishMessage ((message: NotificationMessage) => Promise\)
: _(StateCarryingMessageQueues only)_ Append the matching aggregate (with correct version) to a `NotificationMessage` and turn it into a `StateCarryingMessage` before publishing it to the message queue. Uses the message queue event stores: Make sure that they have correct adapters set up.
>
@@ -1103,9 +1108,13 @@ await appMessageBus.publishMessage({
>
> The following methods interact with the messaging solution of your application through a `MessageBusAdapter`. They will throw an `UndefinedMessageChannelAdapterError` if you did not provide one.
>
-> - publishMessage ((message: NotificationMessage | StateCarryingMessage) => Promise\)
: Publish a `NotificationMessage` (for `NotificationMessageBuses`) or a `StateCarryingMessage` (for `StateCarryingMessageBuses`) to the message bus.
+> - publishMessage ((message: Message, opt?: OptionsObj = {}) => Promise\)
: Publish a `Message` (of the appropriate type) to the message bus.
+>
+> `OptionsObj` contains the following properties:
+>
+> - replay (?boolean)
: Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately (e.g. prevent sending notification emails). Check the implementation of you adapter for more details.
>
-> - publishMessages ((messages: NotificationMessage[] | StateCarryingMessage[]) => Promise\)
: Publish several `NotificationMessage` (for `NotificationMessageBuses`) or several `StateCarryingMessage` (for `StateCarryingMessageBuses`) to the message bus.
+> - publishMessages (messages: Message[], opt?: OptionsObj) => Promise\)
: Publish several `Messages` (of the appropriate type) to the message bus. Options are similar to the `publishMessage` options.
>
> - getAggregateAndPublishMessage ((message: NotificationMessage) => Promise\)
: _(StateCarryingMessageBuses only)_ Append the matching aggregate (with correct version) to a `NotificationMessage` and turn it into a `StateCarryingMessage` before publishing it to the message bus. Uses the message bus event stores: Make sure that they have correct adapters set up.
>
diff --git a/packages/core/src/eventStore/eventStore.ts b/packages/core/src/eventStore/eventStore.ts
index a2bb5c20..a580ceaf 100644
--- a/packages/core/src/eventStore/eventStore.ts
+++ b/packages/core/src/eventStore/eventStore.ts
@@ -176,11 +176,15 @@ export class EventStore<
*/
) as Promise<{ events: EVENT_DETAILS[] }>;
- this.pushEvent = async (eventDetail, { prevAggregate } = {}) => {
+ this.pushEvent = async (
+ eventDetail,
+ { prevAggregate, force = false } = {},
+ ) => {
const storageAdapter = this.getStorageAdapter();
const { event } = (await storageAdapter.pushEvent(eventDetail, {
eventStoreId: this.eventStoreId,
+ force,
})) as { event: $EVENT_DETAILS };
let nextAggregate: AGGREGATE | undefined = undefined;
diff --git a/packages/core/src/eventStore/eventStore.type.test.ts b/packages/core/src/eventStore/eventStore.type.test.ts
index f0f8125b..770ef642 100644
--- a/packages/core/src/eventStore/eventStore.type.test.ts
+++ b/packages/core/src/eventStore/eventStore.type.test.ts
@@ -108,7 +108,7 @@ assertPushEventInput1;
const assertPushEventInput2: A.Equals<
Parameters[1],
- { prevAggregate?: PokemonAggregate | undefined } | undefined
+ { prevAggregate?: PokemonAggregate | undefined; force?: boolean } | undefined
> = 1;
assertPushEventInput2;
diff --git a/packages/core/src/eventStore/eventStore.unit.test.ts b/packages/core/src/eventStore/eventStore.unit.test.ts
index e97dab33..9efe0d07 100644
--- a/packages/core/src/eventStore/eventStore.unit.test.ts
+++ b/packages/core/src/eventStore/eventStore.unit.test.ts
@@ -148,6 +148,7 @@ describe('event store', () => {
expect(pushEventMock).toHaveBeenCalledTimes(1);
expect(pushEventMock).toHaveBeenCalledWith(pikachuLeveledUpEvent, {
eventStoreId: pokemonsEventStore.eventStoreId,
+ force: false,
});
expect(response).toStrictEqual({ event: pikachuLeveledUpEvent });
});
diff --git a/packages/core/src/eventStore/types.ts b/packages/core/src/eventStore/types.ts
index 3c2c022a..90bdbb7a 100644
--- a/packages/core/src/eventStore/types.ts
+++ b/packages/core/src/eventStore/types.ts
@@ -37,7 +37,7 @@ export type EventPusher<
event: $EVENT_DETAILS extends EventDetail
? OptionalTimestamp<$EVENT_DETAILS>
: $EVENT_DETAILS,
- options?: { prevAggregate?: $AGGREGATE },
+ options?: { prevAggregate?: $AGGREGATE; force?: boolean },
) => Promise<{ event: EVENT_DETAILS; nextAggregate?: AGGREGATE }>;
export type AggregateIdsLister = (
@@ -62,6 +62,9 @@ export type EventGroupPusher = <
...GroupedEvent[],
],
>(
+ /**
+ * @debt v2 "use an array and enable options in 2nd arg (useful for 'force' opt for instance)"
+ */
...groupedEvents: GROUPED_EVENTS
) => Promise<{ eventGroup: EventGroupPusherResponse }>;
diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts
index 1fac63bd..04026d94 100644
--- a/packages/core/src/index.ts
+++ b/packages/core/src/index.ts
@@ -6,6 +6,7 @@ export type { EventDetail, OptionalTimestamp } from './event/eventDetail';
export type { StorageAdapter } from './storageAdapter';
export type {
EventsQueryOptions,
+ PushEventOptions,
EventStoreContext,
ListAggregateIdsOptions,
ListAggregateIdsOutput,
@@ -65,6 +66,7 @@ export type {
NotificationMessage,
StateCarryingMessage,
Message,
+ PublishMessageOptions,
EventStoreAggregateExistsMessage,
EventStoreNotificationMessage,
EventStoreStateCarryingMessage,
diff --git a/packages/core/src/messaging/channel/aggregateExistsMessageChannel.ts b/packages/core/src/messaging/channel/aggregateExistsMessageChannel.ts
index 9d32c675..f0c38114 100644
--- a/packages/core/src/messaging/channel/aggregateExistsMessageChannel.ts
+++ b/packages/core/src/messaging/channel/aggregateExistsMessageChannel.ts
@@ -7,6 +7,7 @@ import {
UndefinedMessageChannelAdapterError,
} from './errors';
import type { MessageChannelAdapter } from './messageChannelAdapter';
+import type { PublishMessageOptions } from './types';
export class AggregateExistsMessageChannel<
EVENT_STORE extends EventStore = EventStore,
@@ -28,6 +29,7 @@ export class AggregateExistsMessageChannel<
EventStore,
EventStoreAggregateExistsMessage
>,
+ options?: PublishMessageOptions,
) => Promise;
publishMessages: (
aggregateExistsMessages: $Contravariant<
@@ -35,6 +37,7 @@ export class AggregateExistsMessageChannel<
EventStore,
EventStoreAggregateExistsMessage
>[],
+ options?: PublishMessageOptions,
) => Promise;
constructor({
@@ -87,23 +90,33 @@ export class AggregateExistsMessageChannel<
return eventStore;
};
- this.publishMessage = async aggregateExistsMessage => {
+ this.publishMessage = async (
+ aggregateExistsMessage,
+ { replay = false } = {},
+ ) => {
const { eventStoreId } = aggregateExistsMessage;
this.getEventStore(eventStoreId);
const messageChannelAdapter = this.getMessageChannelAdapter();
- await messageChannelAdapter.publishMessage(aggregateExistsMessage);
+ await messageChannelAdapter.publishMessage(aggregateExistsMessage, {
+ replay,
+ });
};
- this.publishMessages = async aggregateExistsMessages => {
+ this.publishMessages = async (
+ aggregateExistsMessages,
+ { replay = false } = {},
+ ) => {
for (const aggregateExistsMessage of aggregateExistsMessages) {
const { eventStoreId } = aggregateExistsMessage;
this.getEventStore(eventStoreId);
}
const messageChannelAdapter = this.getMessageChannelAdapter();
- await messageChannelAdapter.publishMessages(aggregateExistsMessages);
+ await messageChannelAdapter.publishMessages(aggregateExistsMessages, {
+ replay,
+ });
};
}
}
diff --git a/packages/core/src/messaging/channel/index.ts b/packages/core/src/messaging/channel/index.ts
index aaff3bc0..e8e181a8 100644
--- a/packages/core/src/messaging/channel/index.ts
+++ b/packages/core/src/messaging/channel/index.ts
@@ -5,6 +5,7 @@ export type {
MessageChannelSourceEventStoreIds,
MessageChannelSourceEventStoreIdTypes,
} from './generics';
+export type { PublishMessageOptions } from './types';
export type { MessageChannelAdapter } from './messageChannelAdapter';
export { AggregateExistsMessageChannel } from './aggregateExistsMessageChannel';
export { NotificationMessageChannel } from './notificationMessageChannel';
diff --git a/packages/core/src/messaging/channel/messageChannelAdapter.ts b/packages/core/src/messaging/channel/messageChannelAdapter.ts
index e308f608..4ce9064f 100644
--- a/packages/core/src/messaging/channel/messageChannelAdapter.ts
+++ b/packages/core/src/messaging/channel/messageChannelAdapter.ts
@@ -1,6 +1,13 @@
import type { Message } from '../message';
+import type { PublishMessageOptions } from './types';
export interface MessageChannelAdapter {
- publishMessage: (message: Message) => Promise;
- publishMessages: (messages: Message[]) => Promise;
+ publishMessage: (
+ message: Message,
+ options?: PublishMessageOptions,
+ ) => Promise;
+ publishMessages: (
+ messages: Message[],
+ options?: PublishMessageOptions,
+ ) => Promise;
}
diff --git a/packages/core/src/messaging/channel/notificationMessageChannel.ts b/packages/core/src/messaging/channel/notificationMessageChannel.ts
index a7c225c8..9989389f 100644
--- a/packages/core/src/messaging/channel/notificationMessageChannel.ts
+++ b/packages/core/src/messaging/channel/notificationMessageChannel.ts
@@ -7,6 +7,7 @@ import {
UndefinedMessageChannelAdapterError,
} from './errors';
import type { MessageChannelAdapter } from './messageChannelAdapter';
+import type { PublishMessageOptions } from './types';
export class NotificationMessageChannel<
EVENT_STORE extends EventStore = EventStore,
@@ -28,6 +29,7 @@ export class NotificationMessageChannel<
EventStore,
EventStoreNotificationMessage
>,
+ options?: PublishMessageOptions,
) => Promise;
publishMessages: (
notificationMessages: $Contravariant<
@@ -35,6 +37,7 @@ export class NotificationMessageChannel<
EventStore,
EventStoreNotificationMessage
>[],
+ options?: PublishMessageOptions,
) => Promise;
constructor({
@@ -87,23 +90,33 @@ export class NotificationMessageChannel<
return eventStore;
};
- this.publishMessage = async notificationMessage => {
+ this.publishMessage = async (
+ notificationMessage,
+ { replay = false } = {},
+ ) => {
const { eventStoreId } = notificationMessage;
this.getEventStore(eventStoreId);
const messageChannelAdapter = this.getMessageChannelAdapter();
- await messageChannelAdapter.publishMessage(notificationMessage);
+ await messageChannelAdapter.publishMessage(notificationMessage, {
+ replay,
+ });
};
- this.publishMessages = async notificationMessages => {
+ this.publishMessages = async (
+ notificationMessages,
+ { replay = false } = {},
+ ) => {
for (const notificationMessage of notificationMessages) {
const { eventStoreId } = notificationMessage;
this.getEventStore(eventStoreId);
}
const messageChannelAdapter = this.getMessageChannelAdapter();
- await messageChannelAdapter.publishMessages(notificationMessages);
+ await messageChannelAdapter.publishMessages(notificationMessages, {
+ replay,
+ });
};
}
}
diff --git a/packages/core/src/messaging/channel/stateCarryingMessageChannel.ts b/packages/core/src/messaging/channel/stateCarryingMessageChannel.ts
index 504caa06..b769d513 100644
--- a/packages/core/src/messaging/channel/stateCarryingMessageChannel.ts
+++ b/packages/core/src/messaging/channel/stateCarryingMessageChannel.ts
@@ -10,6 +10,7 @@ import {
UndefinedMessageChannelAdapterError,
} from './errors';
import type { MessageChannelAdapter } from './messageChannelAdapter';
+import type { PublishMessageOptions } from './types';
export class StateCarryingMessageChannel<
EVENT_STORE extends EventStore = EventStore,
@@ -31,6 +32,7 @@ export class StateCarryingMessageChannel<
EventStore,
EventStoreStateCarryingMessage
>,
+ options?: PublishMessageOptions,
) => Promise;
getAggregateAndPublishMessage: (
notificationMessage: $Contravariant<
@@ -38,6 +40,7 @@ export class StateCarryingMessageChannel<
EventStore,
EventStoreNotificationMessage
>,
+ options?: PublishMessageOptions,
) => Promise;
publishMessages: (
@@ -46,6 +49,7 @@ export class StateCarryingMessageChannel<
EventStore,
EventStoreStateCarryingMessage
>[],
+ options?: PublishMessageOptions,
) => Promise;
constructor({
@@ -98,16 +102,24 @@ export class StateCarryingMessageChannel<
return eventStore;
};
- this.publishMessage = async stateCarryingMessage => {
+ this.publishMessage = async (
+ stateCarryingMessage,
+ { replay = false } = {},
+ ) => {
const { eventStoreId } = stateCarryingMessage;
this.getEventStore(eventStoreId);
const messageChannelAdapter = this.getMessageChannelAdapter();
- await messageChannelAdapter.publishMessage(stateCarryingMessage);
+ await messageChannelAdapter.publishMessage(stateCarryingMessage, {
+ replay,
+ });
};
- this.getAggregateAndPublishMessage = async notificationMessage => {
+ this.getAggregateAndPublishMessage = async (
+ notificationMessage,
+ { replay = false } = {},
+ ) => {
const { eventStoreId, event } = notificationMessage;
const { aggregateId, version } = event;
@@ -117,10 +129,16 @@ export class StateCarryingMessageChannel<
maxVersion: version,
});
- await this.publishMessage({ ...notificationMessage, aggregate });
+ await this.publishMessage(
+ { ...notificationMessage, aggregate },
+ { replay },
+ );
};
- this.publishMessages = async stateCarryingMessages => {
+ this.publishMessages = async (
+ stateCarryingMessages,
+ { replay = false } = {},
+ ) => {
for (const stateCarryingMessage of stateCarryingMessages) {
const { eventStoreId } = stateCarryingMessage;
this.getEventStore(eventStoreId);
@@ -128,7 +146,9 @@ export class StateCarryingMessageChannel<
const messageChannelAdapter = this.getMessageChannelAdapter();
- await messageChannelAdapter.publishMessages(stateCarryingMessages);
+ await messageChannelAdapter.publishMessages(stateCarryingMessages, {
+ replay,
+ });
};
}
}
diff --git a/packages/core/src/messaging/channel/types.ts b/packages/core/src/messaging/channel/types.ts
new file mode 100644
index 00000000..cf103c81
--- /dev/null
+++ b/packages/core/src/messaging/channel/types.ts
@@ -0,0 +1,3 @@
+export type PublishMessageOptions = {
+ replay?: boolean;
+};
diff --git a/packages/core/src/messaging/index.ts b/packages/core/src/messaging/index.ts
index e987cbc5..6da8bc8a 100644
--- a/packages/core/src/messaging/index.ts
+++ b/packages/core/src/messaging/index.ts
@@ -1,4 +1,19 @@
-export * from './channel';
+export {
+ AggregateExistsMessageChannel,
+ NotificationMessageChannel,
+ StateCarryingMessageChannel,
+ MessageChannelEventStoreNotFoundError,
+ UndefinedMessageChannelAdapterError,
+} from './channel';
+export type {
+ EventStoreMessageChannel,
+ MessageChannelSourceEventStores,
+ MessageChannelMessage,
+ MessageChannelSourceEventStoreIds,
+ MessageChannelSourceEventStoreIdTypes,
+ PublishMessageOptions,
+ MessageChannelAdapter,
+} from './channel';
export * from './bus';
export * from './queue';
export type {
diff --git a/packages/core/src/storageAdapter.ts b/packages/core/src/storageAdapter.ts
index cf2a9de7..ba767944 100644
--- a/packages/core/src/storageAdapter.ts
+++ b/packages/core/src/storageAdapter.ts
@@ -11,6 +11,10 @@ export type EventsQueryOptions = {
export type EventStoreContext = { eventStoreId: string };
+export type PushEventOptions = EventStoreContext & {
+ force?: boolean;
+};
+
export type ListAggregateIdsOptions = {
limit?: number;
pageToken?: string;
@@ -46,7 +50,7 @@ export interface StorageAdapter {
) => Promise<{ events: EventDetail[] }>;
pushEvent: (
eventDetail: OptionalTimestamp,
- context: EventStoreContext,
+ options: PushEventOptions,
) => Promise<{ event: EventDetail }>;
pushEventGroup: (
...groupedEvents: [GroupedEvent, ...GroupedEvent[]]
diff --git a/packages/dam/README.md b/packages/dam/README.md
index 19842fba..421b6c9a 100644
--- a/packages/dam/README.md
+++ b/packages/dam/README.md
@@ -26,7 +26,7 @@ yarn add @castore/core
`@castore/dam` exposes a series of utils that scan past events and re-publish them in [message channels](https://github.com/castore-dev/castore#--event-driven-architecture) – or _"pour them"_ as in _"pouring water from a container to another"_ 🫗.
-Those utils are typically very useful for data maintenance and migration, and can be rate limited to limit impact on production traffic. They are the following:
+Those utils are typically very useful for data maintenance and migration. They publish messages with the `replay` option enabled and can be **rate limited** to limit impact on production traffic. They are the following:
- [`pourEventStoreAggregateIds`](#poureventstoreaggregateids): Pour all the aggregate ids of an event store in an `AggregateExistsMessageChannel`.
- [`pourAggregateEvents`](#pouraggregateevents): Pour all the events of a specific aggregate in a provided `NotificationMessageChannel`.
diff --git a/packages/dam/src/pourAggregateEvents/pourAggregateEvents.ts b/packages/dam/src/pourAggregateEvents/pourAggregateEvents.ts
index f8b25bec..fcc99911 100644
--- a/packages/dam/src/pourAggregateEvents/pourAggregateEvents.ts
+++ b/packages/dam/src/pourAggregateEvents/pourAggregateEvents.ts
@@ -3,6 +3,7 @@ import type {
EventStore,
EventStoreNotificationMessage,
EventsQueryOptions,
+ PublishMessageOptions,
} from '@castore/core';
import { getThrottle } from '~/utils/getThrottle';
@@ -13,6 +14,7 @@ interface Props {
messageChannel: {
publishMessage: (
message: EventStoreNotificationMessage,
+ options?: PublishMessageOptions,
) => Promise;
};
aggregateId: string;
@@ -44,10 +46,13 @@ export const pourAggregateEvents = async ({
for (const eventToPour of eventsToPour) {
await throttle(() =>
- messageChannel.publishMessage({
- eventStoreId: eventStore.eventStoreId,
- event: eventToPour,
- } as EventStoreNotificationMessage),
+ messageChannel.publishMessage(
+ {
+ eventStoreId: eventStore.eventStoreId,
+ event: eventToPour,
+ } as EventStoreNotificationMessage,
+ { replay: true },
+ ),
);
}
diff --git a/packages/dam/src/pourAggregateEvents/pourAggregateEvents.unit.test.ts b/packages/dam/src/pourAggregateEvents/pourAggregateEvents.unit.test.ts
index 2c6c5a84..5d5b5252 100644
--- a/packages/dam/src/pourAggregateEvents/pourAggregateEvents.unit.test.ts
+++ b/packages/dam/src/pourAggregateEvents/pourAggregateEvents.unit.test.ts
@@ -4,7 +4,10 @@ import {
EventStoreId,
EventsQueryOptions,
} from '@castore/core';
-import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter';
+import {
+ InMemoryMessageQueueAdapter,
+ TaskContext,
+} from '@castore/in-memory-message-queue-adapter';
import {
pokemonEventStore,
@@ -22,12 +25,13 @@ const messageQueue = new NotificationMessageQueue({
let receivedMessages: {
date: Date;
message: NotificationMessage>;
+ context: TaskContext;
}[] = [];
InMemoryMessageQueueAdapter.attachTo(messageQueue, {
- worker: message =>
+ worker: (message, context) =>
new Promise(resolve => {
- receivedMessages.push({ date: new Date(), message });
+ receivedMessages.push({ date: new Date(), message, context });
resolve();
}),
});
@@ -56,6 +60,7 @@ describe('pourAggregateEvents', () => {
eventStoreId: pokemonEvtStoreId,
event: pikachuEvents[0],
});
+ expect(receivedMessages[0]?.context).toMatchObject({ replay: true });
expect(receivedMessages[1]?.message).toStrictEqual({
eventStoreId: pokemonEvtStoreId,
event: pikachuEvents[1],
diff --git a/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.ts b/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.ts
index 91e79395..c4e9df74 100644
--- a/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.ts
+++ b/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.ts
@@ -3,6 +3,7 @@ import type {
EventStore,
EventStoreId,
ListAggregateIdsOptions,
+ PublishMessageOptions,
} from '@castore/core';
import type { ScanInfos } from '~/types';
@@ -14,6 +15,7 @@ interface Props {
messageChannel: {
publishMessage: (
messages: AggregateExistsMessage>,
+ options?: PublishMessageOptions,
) => Promise;
};
options?: Omit;
@@ -53,7 +55,10 @@ export const pourEventStoreAggregateIds = async <
for (const aggregateId of aggregateIds) {
await throttle(() =>
- messageChannel.publishMessage({ eventStoreId, aggregateId }),
+ messageChannel.publishMessage(
+ { eventStoreId, aggregateId },
+ { replay: true },
+ ),
);
}
diff --git a/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.unit.test.ts b/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.unit.test.ts
index bede578a..4df963c1 100644
--- a/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.unit.test.ts
+++ b/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.unit.test.ts
@@ -4,7 +4,10 @@ import {
EventStoreId,
ListAggregateIdsOptions,
} from '@castore/core';
-import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter';
+import {
+ InMemoryMessageQueueAdapter,
+ TaskContext,
+} from '@castore/in-memory-message-queue-adapter';
import {
pokemonEventStore,
@@ -23,12 +26,13 @@ const messageQueue = new AggregateExistsMessageQueue({
let receivedMessages: {
date: Date;
message: AggregateExistsMessage>;
+ context: TaskContext;
}[] = [];
InMemoryMessageQueueAdapter.attachTo(messageQueue, {
- worker: message =>
+ worker: (message, context) =>
new Promise(resolve => {
- receivedMessages.push({ date: new Date(), message });
+ receivedMessages.push({ date: new Date(), message, context });
resolve();
}),
});
@@ -58,6 +62,9 @@ describe('pourEventStoreAggregateIds', () => {
eventStoreId: pokemonEvtStoreId,
aggregateId: pikachuId,
});
+ expect(receivedMessages[0]?.context).toMatchObject({
+ replay: true,
+ });
expect(receivedMessages[1]?.message).toStrictEqual({
eventStoreId: pokemonEvtStoreId,
aggregateId: charizardId,
diff --git a/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.ts b/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.ts
index 8b876353..0bb6822c 100644
--- a/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.ts
+++ b/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.ts
@@ -3,6 +3,7 @@ import type {
EventStore,
EventStoreId,
EventStoreNotificationMessage,
+ PublishMessageOptions,
} from '@castore/core';
import type { ScanInfos } from '~/types';
@@ -16,6 +17,7 @@ interface Props {
messageChannel: {
publishMessage: (
message: EventStoreNotificationMessage,
+ options?: PublishMessageOptions,
) => Promise;
};
filters?: { from?: string; to?: string };
@@ -146,7 +148,7 @@ export const pourEventStoreCollectionEvents = async <
messagesToPour.filterByTimestamp({ from, to });
messagesToPour.sortByTimestamp();
- await messagePourer.pourMessageBatch(messagesToPour);
+ await messagePourer.pourMessageBatch(messagesToPour, { replay: true });
} while (!areAllCollectionAggregatesScanned);
return {
diff --git a/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.unit.test.ts b/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.unit.test.ts
index fe300981..ff1b42ea 100644
--- a/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.unit.test.ts
+++ b/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.unit.test.ts
@@ -3,7 +3,10 @@ import {
NotificationMessageQueue,
EventStoreId,
} from '@castore/core';
-import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter';
+import {
+ InMemoryMessageQueueAdapter,
+ TaskContext,
+} from '@castore/in-memory-message-queue-adapter';
import {
pokemonEventStore,
@@ -33,12 +36,13 @@ let receivedMessages: {
message: NotificationMessage<
EventStoreId
>;
+ context: TaskContext;
}[] = [];
InMemoryMessageQueueAdapter.attachTo(messageQueue, {
- worker: message =>
+ worker: (message, context) =>
new Promise(resolve => {
- receivedMessages.push({ date: new Date(), message });
+ receivedMessages.push({ date: new Date(), message, context });
resolve();
}),
});
@@ -77,6 +81,10 @@ describe('pourEventStoreEvents', () => {
// 2020-12-01T00:00:00.000Z
event: ashKetchumEvents[0],
});
+ expect(receivedMessages[0]?.context).toMatchObject({
+ replay: true,
+ });
+
expect(receivedMessages[1]?.message).toStrictEqual({
eventStoreId: pokemonEvtStoreId,
// 2021-01-01T00:00:00.000Z
diff --git a/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.ts b/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.ts
index 47226e4a..b0d3fdbd 100644
--- a/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.ts
+++ b/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.ts
@@ -1,5 +1,9 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
-import type { EventStore, EventStoreNotificationMessage } from '@castore/core';
+import type {
+ EventStore,
+ EventStoreNotificationMessage,
+ PublishMessageOptions,
+} from '@castore/core';
import type { ScanInfos } from '~/types';
import { EventBook } from '~/utils/eventBook';
@@ -11,6 +15,7 @@ interface Props {
messageChannel: {
publishMessage: (
message: EventStoreNotificationMessage,
+ options?: PublishMessageOptions,
) => Promise;
};
filters?: { from?: string; to?: string };
@@ -76,7 +81,7 @@ export const pourEventStoreEvents = async ({
messagesToPour.filterByTimestamp({ from, to });
messagesToPour.sortByTimestamp();
- await messagePourer.pourMessageBatch(messagesToPour);
+ await messagePourer.pourMessageBatch(messagesToPour, { replay: true });
pageToken = nextPageToken;
} while (!areAllAggregatesScanned);
diff --git a/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.unit.test.ts b/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.unit.test.ts
index be5467c0..5004fe90 100644
--- a/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.unit.test.ts
+++ b/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.unit.test.ts
@@ -3,7 +3,10 @@ import {
NotificationMessageQueue,
EventStoreId,
} from '@castore/core';
-import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter';
+import {
+ InMemoryMessageQueueAdapter,
+ TaskContext,
+} from '@castore/in-memory-message-queue-adapter';
import {
pokemonEventStore,
@@ -24,12 +27,13 @@ const messageQueue = new NotificationMessageQueue({
let receivedMessages: {
date: Date;
message: NotificationMessage>;
+ context: TaskContext;
}[] = [];
InMemoryMessageQueueAdapter.attachTo(messageQueue, {
- worker: message =>
+ worker: (message, context) =>
new Promise(resolve => {
- receivedMessages.push({ date: new Date(), message });
+ receivedMessages.push({ date: new Date(), message, context });
resolve();
}),
});
@@ -56,6 +60,10 @@ describe('pourEventStoreEvents', () => {
eventStoreId: pokemonEvtStoreId,
event: pikachuEvents[0],
});
+ expect(receivedMessages[0]?.context).toMatchObject({
+ replay: true,
+ });
+
expect(receivedMessages[1]?.message).toStrictEqual({
eventStoreId: pokemonEvtStoreId,
event: pikachuEvents[1],
diff --git a/packages/dam/src/utils/messagePourer.ts b/packages/dam/src/utils/messagePourer.ts
index d3f2cabf..0834fa04 100644
--- a/packages/dam/src/utils/messagePourer.ts
+++ b/packages/dam/src/utils/messagePourer.ts
@@ -1,4 +1,8 @@
-import type { EventStore, EventStoreNotificationMessage } from '@castore/core';
+import {
+ EventStore,
+ EventStoreNotificationMessage,
+ PublishMessageOptions,
+} from '@castore/core';
import { getThrottle } from '~/utils/getThrottle';
@@ -8,6 +12,7 @@ export class MessagePourer {
messageChannel: {
publishMessage: (
message: EventStoreNotificationMessage,
+ options?: PublishMessageOptions,
) => Promise;
};
pouredEventCount: number;
@@ -19,6 +24,7 @@ export class MessagePourer {
messageChannel: {
publishMessage: (
message: EventStoreNotificationMessage,
+ options?: PublishMessageOptions,
) => Promise;
},
rateLimit = Infinity,
@@ -28,11 +34,14 @@ export class MessagePourer {
this.throttle = getThrottle(rateLimit);
}
- pourMessageBatch = async ({
- messages,
- }: MessageBatch): Promise => {
+ pourMessageBatch = async (
+ { messages }: MessageBatch,
+ options: PublishMessageOptions = {},
+ ): Promise => {
for (const message of messages) {
- await this.throttle(() => this.messageChannel.publishMessage(message));
+ await this.throttle(() =>
+ this.messageChannel.publishMessage(message, options),
+ );
this.pouredEventCount += 1;
}
diff --git a/packages/dynamodb-event-storage-adapter/src/adapter.ts b/packages/dynamodb-event-storage-adapter/src/adapter.ts
index 8f6db22e..5f2b164d 100644
--- a/packages/dynamodb-event-storage-adapter/src/adapter.ts
+++ b/packages/dynamodb-event-storage-adapter/src/adapter.ts
@@ -10,12 +10,13 @@ import {
} from '@aws-sdk/client-dynamodb';
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';
-import {
+import type {
Aggregate,
EventDetail,
- GroupedEvent,
+ PushEventOptions,
StorageAdapter,
} from '@castore/core';
+import { GroupedEvent } from '@castore/core';
import {
EVENT_TABLE_INITIAL_EVENT_INDEX_NAME,
@@ -122,7 +123,10 @@ const parseGroupedEvents = (
*/
export class DynamoDbEventStorageAdapter implements StorageAdapter {
getEvents: StorageAdapter['getEvents'];
- getPushEventInput: (eventDetail: EventDetail) => PutItemCommandInput;
+ getPushEventInput: (
+ eventDetail: EventDetail,
+ options: PushEventOptions,
+ ) => PutItemCommandInput;
pushEvent: StorageAdapter['pushEvent'];
pushEventGroup: StorageAdapter['pushEventGroup'];
groupEvent: StorageAdapter['groupEvent'];
@@ -224,9 +228,10 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter {
};
};
- this.getPushEventInput = event => {
+ this.getPushEventInput = (event, options) => {
const { aggregateId, version, type, timestamp, payload, metadata } =
event;
+ const force = options.force ?? false;
return {
TableName: this.getTableName(),
@@ -242,17 +247,23 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter {
},
MARSHALL_OPTIONS,
),
- ExpressionAttributeNames: { '#version': EVENT_TABLE_SK },
- ConditionExpression: 'attribute_not_exists(#version)',
+ ...(force
+ ? {}
+ : {
+ ExpressionAttributeNames: { '#version': EVENT_TABLE_SK },
+ ConditionExpression: 'attribute_not_exists(#version)',
+ }),
};
};
- this.pushEvent = async (eventWithOptTimestamp, context) => {
+ this.pushEvent = async (eventWithOptTimestamp, options) => {
const event = {
timestamp: new Date().toISOString(),
...eventWithOptTimestamp,
};
- const putEventCommand = new PutItemCommand(this.getPushEventInput(event));
+ const putEventCommand = new PutItemCommand(
+ this.getPushEventInput(event, options),
+ );
const { aggregateId, version } = event;
@@ -263,7 +274,7 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter {
error instanceof Error &&
isConditionalCheckFailedException(error)
) {
- const { eventStoreId } = context;
+ const { eventStoreId } = options;
throw new DynamoDBEventAlreadyExistsError({
eventStoreId,
@@ -293,10 +304,10 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter {
await dynamodbClient.send(
new TransactWriteItemsCommand({
TransactItems: groupedEvents.map(groupedEvent => ({
- Put: groupedEvent.eventStorageAdapter.getPushEventInput({
- timestamp,
- ...groupedEvent.event,
- }),
+ Put: groupedEvent.eventStorageAdapter.getPushEventInput(
+ { timestamp, ...groupedEvent.event },
+ { eventStoreId: groupedEvent.context.eventStoreId },
+ ),
})),
}),
);
diff --git a/packages/dynamodb-event-storage-adapter/src/adapter.unit.test.ts b/packages/dynamodb-event-storage-adapter/src/adapter.unit.test.ts
index 061f1f0c..5367c147 100644
--- a/packages/dynamodb-event-storage-adapter/src/adapter.unit.test.ts
+++ b/packages/dynamodb-event-storage-adapter/src/adapter.unit.test.ts
@@ -98,6 +98,17 @@ describe('DynamoDbEventStorageAdapter', () => {
MockDate.reset();
});
+
+ it('does not add condition if force option is set to true', async () => {
+ await adapter.pushEvent(secondEvent, { eventStoreId, force: true });
+
+ // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
+ // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
+ expect(dynamoDbClientMock.calls()).toHaveLength(1);
+ const input = dynamoDbClientMock.call(0).args[0].input;
+ expect(input).not.toHaveProperty('ConditionExpression');
+ expect(input).not.toHaveProperty('ExpressionAttributeNames');
+ });
});
describe('table name getter', () => {
diff --git a/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.ts b/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.ts
index cae6dcd8..750309ee 100644
--- a/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.ts
+++ b/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.ts
@@ -10,13 +10,13 @@ import {
} from '@aws-sdk/client-dynamodb';
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';
-import {
+import type {
Aggregate,
EventDetail,
- GroupedEvent,
StorageAdapter,
- EventStoreContext,
+ PushEventOptions,
} from '@castore/core';
+import { GroupedEvent } from '@castore/core';
import {
EVENT_TABLE_EVENT_STORE_ID_KEY,
@@ -134,7 +134,7 @@ export class DynamoDbSingleTableEventStorageAdapter implements StorageAdapter {
getEvents: StorageAdapter['getEvents'];
getPushEventInput: (
eventDetail: EventDetail,
- context: EventStoreContext,
+ options: PushEventOptions,
) => PutItemCommandInput;
pushEvent: StorageAdapter['pushEvent'];
pushEventGroup: StorageAdapter['pushEventGroup'];
@@ -237,10 +237,10 @@ export class DynamoDbSingleTableEventStorageAdapter implements StorageAdapter {
};
};
- this.getPushEventInput = (event, context) => {
+ this.getPushEventInput = (event, options) => {
const { aggregateId, version, type, timestamp, payload, metadata } =
event;
- const { eventStoreId } = context;
+ const { eventStoreId, force = false } = options;
return {
TableName: this.getTableName(),
@@ -256,18 +256,23 @@ export class DynamoDbSingleTableEventStorageAdapter implements StorageAdapter {
},
MARSHALL_OPTIONS,
),
- ExpressionAttributeNames: { '#version': EVENT_TABLE_SK },
- ConditionExpression: 'attribute_not_exists(#version)',
+ ...(force
+ ? {}
+ : {
+ ExpressionAttributeNames: { '#version': EVENT_TABLE_SK },
+ ConditionExpression: 'attribute_not_exists(#version)',
+ }),
};
};
- this.pushEvent = async (eventWithOptTimestamp, context) => {
+ this.pushEvent = async (eventWithOptTimestamp, options) => {
const event = {
timestamp: new Date().toISOString(),
...eventWithOptTimestamp,
};
+
const putEventCommand = new PutItemCommand(
- this.getPushEventInput(event, context),
+ this.getPushEventInput(event, options),
);
const { aggregateId, version } = event;
@@ -279,7 +284,7 @@ export class DynamoDbSingleTableEventStorageAdapter implements StorageAdapter {
error instanceof Error &&
isConditionalCheckFailedException(error)
) {
- const { eventStoreId } = context;
+ const { eventStoreId } = options;
throw new DynamoDBEventAlreadyExistsError({
eventStoreId,
diff --git a/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.unit.test.ts b/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.unit.test.ts
index aa6d2b0b..42601f7e 100644
--- a/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.unit.test.ts
+++ b/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.unit.test.ts
@@ -109,6 +109,17 @@ describe('DynamoDbEventStorageAdapter', () => {
MockDate.reset();
});
+
+ it('does not add condition if force option is set to true', async () => {
+ await adapter.pushEvent(secondEvent, { eventStoreId, force: true });
+
+ // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
+ // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
+ expect(dynamoDbClientMock.calls()).toHaveLength(1);
+ const input = dynamoDbClientMock.call(0).args[0].input;
+ expect(input).not.toHaveProperty('ConditionExpression');
+ expect(input).not.toHaveProperty('ExpressionAttributeNames');
+ });
});
describe('table name getter', () => {
diff --git a/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.ts b/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.ts
index 94e38997..1aa6e027 100644
--- a/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.ts
+++ b/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.ts
@@ -31,7 +31,7 @@ export const formatEventForTransaction = (
return {
transactItem: {
- Put: storageAdapter.getPushEventInput(eventDetail),
+ Put: storageAdapter.getPushEventInput(eventDetail, { eventStoreId }),
},
dynamoDbClient,
};
diff --git a/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.unit.test.ts b/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.unit.test.ts
index 570e81c6..8741130e 100644
--- a/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.unit.test.ts
+++ b/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.unit.test.ts
@@ -25,7 +25,9 @@ describe('formatEventForTransaction', () => {
).toStrictEqual({
dynamoDbClient: dynamoDbClientMock,
transactItem: {
- Put: storageAdapter.getPushEventInput(pikachuAppearedEvent),
+ Put: storageAdapter.getPushEventInput(pikachuAppearedEvent, {
+ eventStoreId: pokemonsEventStore.eventStoreId,
+ }),
},
});
});
diff --git a/packages/event-bridge-message-bus-adapter/README.md b/packages/event-bridge-message-bus-adapter/README.md
index 16d454d6..9e9b774b 100644
--- a/packages/event-bridge-message-bus-adapter/README.md
+++ b/packages/event-bridge-message-bus-adapter/README.md
@@ -103,6 +103,25 @@ When publishing a message, its `eventStoreId` is used as the message `source` an
}
```
+If the `replay` option is set to `true` when publishing a notification or state-carrying message, the `"detail-type"` will be set to `"__REPLAYED__"`. This makes sure that any subscription to replayed events is **opt-in**:
+
+```ts
+// 👇 Replayed notification message
+{
+ "source": "POKEMONS",
+ "detail-type": "__REPLAYED__",
+ "detail": {
+ "eventStoreId": "POKEMONS",
+ "event": {
+ "aggregateId": "123",
+ "type": "POKEMON_APPEARED", // <= event type still available
+ ...
+ },
+ },
+ ...
+}
+```
+
On the listeners side, you can use the `EventBridgeMessageBusMessage` TS type to type your argument:
```ts
diff --git a/packages/event-bridge-message-bus-adapter/src/adapter.ts b/packages/event-bridge-message-bus-adapter/src/adapter.ts
index 8cb2e584..0ca8280d 100644
--- a/packages/event-bridge-message-bus-adapter/src/adapter.ts
+++ b/packages/event-bridge-message-bus-adapter/src/adapter.ts
@@ -4,7 +4,11 @@ import {
} from '@aws-sdk/client-eventbridge';
import chunk from 'lodash.chunk';
-import type { Message, MessageChannelAdapter } from '@castore/core';
+import type {
+ Message,
+ MessageChannelAdapter,
+ PublishMessageOptions,
+} from '@castore/core';
import {
isAggregateExistsMessage,
isEventCarryingMessage,
@@ -12,9 +16,12 @@ import {
export const EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE = 10;
-const getDetailType = (message: Message): string => {
+const getDetailType = (
+ message: Message,
+ { replay = false }: PublishMessageOptions = {},
+): string => {
if (isEventCarryingMessage(message)) {
- return message.event.type;
+ return replay ? '__REPLAYED__' : message.event.type;
}
if (isAggregateExistsMessage(message)) {
@@ -46,7 +53,7 @@ export class EventBridgeMessageBusAdapter implements MessageChannelAdapter {
? this.eventBusName
: this.eventBusName();
- this.publishMessage = async message => {
+ this.publishMessage = async (message, options) => {
const { eventStoreId } = message;
await this.eventBridgeClient.send(
@@ -55,7 +62,7 @@ export class EventBridgeMessageBusAdapter implements MessageChannelAdapter {
{
EventBusName: this.getEventBusName(),
Source: eventStoreId,
- DetailType: getDetailType(message),
+ DetailType: getDetailType(message, options),
Detail: JSON.stringify(message),
},
],
@@ -63,7 +70,7 @@ export class EventBridgeMessageBusAdapter implements MessageChannelAdapter {
);
};
- this.publishMessages = async messages => {
+ this.publishMessages = async (messages, options) => {
for (const chunkMessages of chunk(
messages,
EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE,
@@ -73,7 +80,7 @@ export class EventBridgeMessageBusAdapter implements MessageChannelAdapter {
Entries: chunkMessages.map(message => ({
EventBusName: this.getEventBusName(),
Source: message.eventStoreId,
- DetailType: getDetailType(message),
+ DetailType: getDetailType(message, options),
Detail: JSON.stringify(message),
})),
}),
diff --git a/packages/event-bridge-message-bus-adapter/src/adapter.unit.test.ts b/packages/event-bridge-message-bus-adapter/src/adapter.unit.test.ts
index e40508d7..b4cb3e54 100644
--- a/packages/event-bridge-message-bus-adapter/src/adapter.unit.test.ts
+++ b/packages/event-bridge-message-bus-adapter/src/adapter.unit.test.ts
@@ -5,7 +5,7 @@ import {
import { mockClient } from 'aws-sdk-client-mock';
import type { A } from 'ts-toolbelt';
-import type { Message } from '@castore/core';
+import type { Message, PublishMessageOptions } from '@castore/core';
import {
EventBridgeMessageBusAdapter,
@@ -14,142 +14,174 @@ import {
const eventBridgeClientMock = mockClient(EventBridgeClient);
-describe('EventBridgeMessageBusAdapter', () => {
- const eventBusNameMock = 'my-event-bus';
-
- const eventStoreIdMock = 'my-event-store';
-
- const eventMock = {
- aggregateId: 'my-aggregate-id',
- version: 1,
- type: 'my-event-type',
- timestamp: new Date().toISOString(),
- };
-
- const otherEventMock = {
- aggregateId: 'my-aggregate-id',
- version: 2,
- type: 'my-event-type-2',
- timestamp: new Date().toISOString(),
- };
-
- const messageMock = {
- eventStoreId: eventStoreIdMock,
- event: eventMock,
- };
-
- const otherMessageMock = {
- eventStoreId: eventStoreIdMock,
- event: otherEventMock,
- };
+const eventBusNameMock = 'my-event-bus';
+
+const adapter = new EventBridgeMessageBusAdapter({
+ eventBusName: eventBusNameMock,
+ eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient,
+});
+
+const eventStoreIdMock = 'my-event-store';
+
+const eventMock = {
+ aggregateId: 'my-aggregate-id',
+ version: 1,
+ type: 'my-event-type',
+ timestamp: new Date().toISOString(),
+};
+
+const otherEventMock = {
+ aggregateId: 'my-aggregate-id',
+ version: 2,
+ type: 'my-event-type-2',
+ timestamp: new Date().toISOString(),
+};
+
+const messageMock = {
+ eventStoreId: eventStoreIdMock,
+ event: eventMock,
+};
+
+const otherMessageMock = {
+ eventStoreId: eventStoreIdMock,
+ event: otherEventMock,
+};
+describe('EventBridgeMessageBusAdapter', () => {
beforeEach(() => {
eventBridgeClientMock.reset();
eventBridgeClientMock.on(PutEventsCommand).resolves({});
});
- it('send a PutEventsCommand to event bridge client on message published', async () => {
- const adapter = new EventBridgeMessageBusAdapter({
- eventBusName: eventBusNameMock,
- eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient,
- });
-
- const assertMessage: A.Equals<
- Parameters,
- [Message]
- > = 1;
- assertMessage;
-
- await adapter.publishMessage(messageMock);
-
- // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
- // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
- expect(eventBridgeClientMock.calls()).toHaveLength(1);
- expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({
- Entries: [
- {
- EventBusName: eventBusNameMock,
- Source: eventStoreIdMock,
- DetailType: eventMock.type,
- Detail: JSON.stringify(messageMock),
- },
- ],
+ describe('publishMessage', () => {
+ it('send a PutEventsCommand to event bridge client on message published', async () => {
+ const assertMessage: A.Equals<
+ Parameters,
+ [message: Message, options?: PublishMessageOptions | undefined]
+ > = 1;
+ assertMessage;
+
+ await adapter.publishMessage(messageMock);
+
+ // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
+ // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
+ expect(eventBridgeClientMock.calls()).toHaveLength(1);
+ expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({
+ Entries: [
+ {
+ EventBusName: eventBusNameMock,
+ Source: eventStoreIdMock,
+ DetailType: eventMock.type,
+ Detail: JSON.stringify(messageMock),
+ },
+ ],
+ });
});
- });
- it('send a PutEventsCommand to event bridge client on messages published', async () => {
- const adapter = new EventBridgeMessageBusAdapter({
- eventBusName: eventBusNameMock,
- eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient,
+ it('sets detail-type as __REPLAYED__ when replay options is true', async () => {
+ await adapter.publishMessage(messageMock, { replay: true });
+
+ // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
+ // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
+ expect(eventBridgeClientMock.calls()).toHaveLength(1);
+ expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({
+ Entries: [
+ {
+ EventBusName: eventBusNameMock,
+ Source: eventStoreIdMock,
+ DetailType: '__REPLAYED__',
+ Detail: JSON.stringify(messageMock),
+ },
+ ],
+ });
});
- const assertMessage: A.Equals<
- Parameters,
- [Message[]]
- > = 1;
- assertMessage;
-
- await adapter.publishMessages([messageMock, otherMessageMock]);
-
- // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
- // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
- expect(eventBridgeClientMock.calls()).toHaveLength(1);
- expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({
- Entries: [
- {
- EventBusName: eventBusNameMock,
- Source: eventStoreIdMock,
- DetailType: eventMock.type,
- Detail: JSON.stringify(messageMock),
- },
- {
- EventBusName: eventBusNameMock,
- Source: eventStoreIdMock,
- DetailType: otherEventMock.type,
- Detail: JSON.stringify(otherMessageMock),
- },
- ],
+ it('works with event bus name getters', async () => {
+ const otherAdapter = new EventBridgeMessageBusAdapter({
+ eventBusName: () => eventBusNameMock,
+ eventBridgeClient:
+ eventBridgeClientMock as unknown as EventBridgeClient,
+ });
+
+ await otherAdapter.publishMessage(messageMock);
+
+ // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
+ // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
+ expect(eventBridgeClientMock.calls()).toHaveLength(1);
+ expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({
+ Entries: [{ EventBusName: eventBusNameMock }],
+ });
});
});
- it('chunk messages in separate PutEventsCommand calls when there are more messages then EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE', async () => {
- const adapter = new EventBridgeMessageBusAdapter({
- eventBusName: eventBusNameMock,
- eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient,
+ describe('publishMessages', () => {
+ it('sets detail-type as __REPLAYED__ when replay options is true', async () => {
+ await adapter.publishMessages([messageMock, otherMessageMock], {
+ replay: true,
+ });
+
+ // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
+ // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
+ expect(eventBridgeClientMock.calls()).toHaveLength(1);
+ expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({
+ Entries: [
+ {
+ EventBusName: eventBusNameMock,
+ Source: eventStoreIdMock,
+ DetailType: '__REPLAYED__',
+ Detail: JSON.stringify(messageMock),
+ },
+ {
+ EventBusName: eventBusNameMock,
+ Source: eventStoreIdMock,
+ DetailType: '__REPLAYED__',
+ Detail: JSON.stringify(otherMessageMock),
+ },
+ ],
+ });
});
- await adapter.publishMessages(
- Array.from(
- { length: EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE + 1 },
- () => messageMock,
- ),
- );
-
- // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
- // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
- expect(eventBridgeClientMock.calls()).toHaveLength(2);
- });
-
- it('works with event bus name getters', async () => {
- const adapter = new EventBridgeMessageBusAdapter({
- eventBusName: () => eventBusNameMock,
- eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient,
+ it('send a PutEventsCommand to event bridge client on messages published', async () => {
+ const assertMessage: A.Equals<
+ Parameters,
+ [messages: Message[], options?: PublishMessageOptions | undefined]
+ > = 1;
+ assertMessage;
+
+ await adapter.publishMessages([messageMock, otherMessageMock]);
+
+ // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
+ // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
+ expect(eventBridgeClientMock.calls()).toHaveLength(1);
+ expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({
+ Entries: [
+ {
+ EventBusName: eventBusNameMock,
+ Source: eventStoreIdMock,
+ DetailType: eventMock.type,
+ Detail: JSON.stringify(messageMock),
+ },
+ {
+ EventBusName: eventBusNameMock,
+ Source: eventStoreIdMock,
+ DetailType: otherEventMock.type,
+ Detail: JSON.stringify(otherMessageMock),
+ },
+ ],
+ });
});
- await adapter.publishMessage(messageMock);
-
- // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
- // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
- expect(eventBridgeClientMock.calls()).toHaveLength(1);
- expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({
- Entries: [
- {
- EventBusName: eventBusNameMock,
- Source: eventStoreIdMock,
- DetailType: eventMock.type,
- Detail: JSON.stringify(messageMock),
- },
- ],
+ it('chunk messages in separate PutEventsCommand calls when there are more messages then EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE', async () => {
+ await adapter.publishMessages(
+ Array.from(
+ { length: EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE + 1 },
+ () => messageMock,
+ ),
+ );
+
+ // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
+ // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
+ expect(eventBridgeClientMock.calls()).toHaveLength(2);
});
});
});
diff --git a/packages/event-bridge-message-bus-adapter/src/message.ts b/packages/event-bridge-message-bus-adapter/src/message.ts
index 60a3ff47..5360cb94 100644
--- a/packages/event-bridge-message-bus-adapter/src/message.ts
+++ b/packages/event-bridge-message-bus-adapter/src/message.ts
@@ -29,7 +29,7 @@ type EventBridgeStateCarryingMessageBusMessage<
EVENT_STORE_ID
>
? EventBridgeEvent<
- EVENT_TYPE,
+ EVENT_TYPE | '__REPLAYED__',
StateCarryingMessage<
EVENT_STORE_ID,
Extract<
@@ -69,7 +69,7 @@ type EventBridgeNotificationMessageBusMessage<
EVENT_STORE_ID
>
? EventBridgeEvent<
- EVENT_TYPE,
+ EVENT_TYPE | '__REPLAYED__',
NotificationMessage<
EVENT_STORE_ID,
Extract<
diff --git a/packages/in-memory-message-bus-adapter/README.md b/packages/in-memory-message-bus-adapter/README.md
index 036bd9fb..79217b9c 100644
--- a/packages/in-memory-message-bus-adapter/README.md
+++ b/packages/in-memory-message-bus-adapter/README.md
@@ -60,7 +60,7 @@ appMessageBus.messageBusAdapter = messageBusAdapter;
Similarly to event emitters, the `inMemoryMessageBusAdapter` exposes an `on` method that takes two arguments:
-- A filter patterns to optionally specify an `eventStoreId` and an event `type` to listen to (`NotificationEventBus` and `StateCarryingEventBus` only)
+- A filter patterns to optionally specify an `eventStoreId` and an event `type` to listen to (`NotificationEventBus` and `StateCarryingEventBus` only), and wether replayed events should be included
- An async callback to execute if the message matches the filter pattern
```ts
@@ -84,6 +84,33 @@ messageBusAdapter.on(
const { eventStoreId, event } = message;
},
);
+
+// 👇 Include replayed events
+messageBusAdapter.on(
+ { eventStoreId: 'POKEMONS', eventType: 'POKEMON_APPEARED', onReplay: true },
+ async message => {
+ // 🙌 Correctly typed!
+ const { eventStoreId, event } = message;
+ },
+);
+```
+
+For more control, the callback has access to more context through its second argument:
+
+```ts
+messageBusAdapter.on(
+ ...,
+ async (message, context) => {
+ const { eventStoreId, event } = message;
+ const {
+ // 👇 See "Retry policy" section below
+ attempt,
+ retryAttemptsLeft,
+ // 👇 If event is replayed
+ replay,
+ } = context;
+ },
+);
```
The same callback can be re-used with different filter patterns. If a message matches several of them, it will still be triggered once:
diff --git a/packages/in-memory-message-bus-adapter/src/adapter.ts b/packages/in-memory-message-bus-adapter/src/adapter.ts
index 0f42a0d8..11deea67 100644
--- a/packages/in-memory-message-bus-adapter/src/adapter.ts
+++ b/packages/in-memory-message-bus-adapter/src/adapter.ts
@@ -1,3 +1,4 @@
+/* eslint-disable max-lines */
import type { EventEmitter } from 'node:events';
import {
@@ -15,9 +16,10 @@ import type {
ConstructorArgs,
FilterPattern,
InMemoryBusMessage,
+ TaskContext,
} from './types';
import {
- doesMessageMatchAnyFilterPattern,
+ doesTaskMatchAnyFilterPattern,
parseBackoffRate,
parseRetryAttempts,
parseRetryDelayInMs,
@@ -66,6 +68,7 @@ export class InMemoryMessageBusAdapter
filterPattern: FilterPattern,
handler: (
message: InMemoryMessageBusMessage,
+ context: TaskContext,
) => Promise,
) => void;
@@ -108,12 +111,13 @@ export class InMemoryMessageBusAdapter
},
);
- this.publishMessage = async message =>
+ this.publishMessage = async (message, { replay = false } = {}) =>
new Promise(resolve => {
const task: Task = {
message,
attempt: 1,
retryAttemptsLeft: this.retryAttempts,
+ replay,
};
this.eventEmitter.emit('message', task);
@@ -121,9 +125,9 @@ export class InMemoryMessageBusAdapter
resolve();
});
- this.publishMessages = async messages => {
+ this.publishMessages = async (messages, options) => {
for (const message of messages) {
- await this.publishMessage(message);
+ await this.publishMessage(message, options);
}
};
@@ -143,6 +147,7 @@ export class InMemoryMessageBusAdapter
filterPattern: FilterPattern,
handler: (
message: InMemoryMessageBusMessage,
+ context: TaskContext,
) => Promise,
) => {
let handlerIndex = this.handlers.findIndex(
@@ -171,14 +176,21 @@ export class InMemoryMessageBusAdapter
InMemoryMessageBusMessage
>,
) => {
- const { message, retryHandlerIndex } = task;
+ const {
+ message,
+ retryHandlerIndex,
+ attempt,
+ retryAttemptsLeft,
+ replay = false,
+ } = task;
+ const context: TaskContext = { attempt, retryAttemptsLeft, replay };
if (
retryHandlerIndex === undefined
- ? doesMessageMatchAnyFilterPattern(message, filterPatterns)
+ ? doesTaskMatchAnyFilterPattern(task, filterPatterns)
: retryHandlerIndex === handlerIndex
) {
- void handler(message).catch(error => {
+ void handler(message, context).catch(error => {
this.eventEmitter.emit('error', error, task, handlerIndex);
});
}
diff --git a/packages/in-memory-message-bus-adapter/src/adapter.unit.test.ts b/packages/in-memory-message-bus-adapter/src/adapter.unit.test.ts
index e3e4c931..158fa9a7 100644
--- a/packages/in-memory-message-bus-adapter/src/adapter.unit.test.ts
+++ b/packages/in-memory-message-bus-adapter/src/adapter.unit.test.ts
@@ -15,6 +15,7 @@ import {
} from '@castore/demo-blueprint';
import { InMemoryMessageBusAdapter } from './adapter';
+import type { TaskContext } from './types';
const messageBus = new NotificationMessageBus({
messageBusId: 'messageBusId',
@@ -37,6 +38,12 @@ const pikachuCaughtMessage: EventStoreNotificationMessage<
event: pikachuCaughtEvent,
};
+const context: TaskContext = {
+ attempt: 1,
+ retryAttemptsLeft: 2,
+ replay: false,
+};
+
const sleep = (ms: number): Promise =>
new Promise(resolve => setTimeout(resolve, ms));
@@ -82,7 +89,7 @@ describe('in-memory message queue adapter', () => {
expect(handler2).not.toHaveBeenCalled();
});
- it('calls handler if it has been set', async () => {
+ it('calls handler only if it has been set', async () => {
inMemoryMessageBusAdapter.on(
{ eventStoreId: 'POKEMONS', eventType: 'APPEARED' },
handler1,
@@ -92,7 +99,7 @@ describe('in-memory message queue adapter', () => {
await inMemoryMessageBusAdapter.publishMessage(pikachuCaughtMessage);
expect(handler1).toHaveBeenCalledOnce();
- expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage);
+ expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage, context);
inMemoryMessageBusAdapter.on(
{ eventStoreId: 'POKEMONS', eventType: 'CAUGHT_BY_TRAINER' },
@@ -101,7 +108,7 @@ describe('in-memory message queue adapter', () => {
await inMemoryMessageBusAdapter.publishMessage(pikachuCaughtMessage);
expect(handler1).toHaveBeenCalledTimes(2);
- expect(handler1).toHaveBeenCalledWith(pikachuCaughtMessage);
+ expect(handler1).toHaveBeenCalledWith(pikachuCaughtMessage, context);
});
it('calls handler only once, event if matches several filter patterns', async () => {
@@ -109,7 +116,46 @@ describe('in-memory message queue adapter', () => {
await inMemoryMessageBusAdapter.publishMessage(pikachuAppearedMessage);
expect(handler1).toHaveBeenCalledOnce();
- expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage);
+ expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage, context);
+ });
+
+ it('calls handler only if replay has been specified', async () => {
+ await inMemoryMessageBusAdapter.publishMessage(pikachuCaughtMessage, {
+ replay: true,
+ });
+
+ // Both are still triggered on real-time messages since prev tests
+ expect(handler1).not.toHaveBeenCalled();
+ expect(handler2).not.toHaveBeenCalled();
+
+ inMemoryMessageBusAdapter.on(
+ { eventStoreId: 'POKEMONS', onReplay: true },
+ handler1,
+ );
+ inMemoryMessageBusAdapter.on(
+ {
+ eventStoreId: 'POKEMONS',
+ eventType: 'CAUGHT_BY_TRAINER',
+ onReplay: true,
+ },
+ handler2,
+ );
+
+ await inMemoryMessageBusAdapter.publishMessage(pikachuCaughtMessage, {
+ replay: true,
+ });
+
+ // Both are still triggered on real-time messages since prev tests
+ expect(handler1).toHaveBeenCalledOnce();
+ expect(handler1).toHaveBeenCalledWith(pikachuCaughtMessage, {
+ ...context,
+ replay: true,
+ });
+ expect(handler2).toHaveBeenCalledOnce();
+ expect(handler2).toHaveBeenCalledWith(pikachuCaughtMessage, {
+ ...context,
+ replay: true,
+ });
});
it('calls all handlers if needed', async () => {
@@ -117,8 +163,8 @@ describe('in-memory message queue adapter', () => {
await inMemoryMessageBusAdapter.publishMessage(pikachuAppearedMessage);
- expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage);
- expect(handler2).toHaveBeenCalledWith(pikachuAppearedMessage);
+ expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage, context);
+ expect(handler2).toHaveBeenCalledWith(pikachuAppearedMessage, context);
});
it('statically rejects invalid handlers', async () => {
@@ -181,7 +227,7 @@ describe('in-memory message queue adapter', () => {
inMemoryMessageBusAdapter.on({}, handler);
await messageBus.publishMessage(pikachuAppearedMessage);
- expect(handler).toHaveBeenCalledWith(pikachuAppearedMessage);
+ expect(handler).toHaveBeenCalledWith(pikachuAppearedMessage, context);
});
});
diff --git a/packages/in-memory-message-bus-adapter/src/index.ts b/packages/in-memory-message-bus-adapter/src/index.ts
index 98163fb8..75208ea4 100644
--- a/packages/in-memory-message-bus-adapter/src/index.ts
+++ b/packages/in-memory-message-bus-adapter/src/index.ts
@@ -1,2 +1,3 @@
export { InMemoryMessageBusAdapter } from './adapter';
+export type { TaskContext } from './types';
export type { InMemoryMessageBusMessage } from './message';
diff --git a/packages/in-memory-message-bus-adapter/src/message.ts b/packages/in-memory-message-bus-adapter/src/message.ts
index 34f3e3c9..7b8687be 100644
--- a/packages/in-memory-message-bus-adapter/src/message.ts
+++ b/packages/in-memory-message-bus-adapter/src/message.ts
@@ -43,4 +43,5 @@ export type Task<
retryHandlerIndex?: number;
attempt: number;
retryAttemptsLeft: number;
+ replay?: boolean;
};
diff --git a/packages/in-memory-message-bus-adapter/src/types.ts b/packages/in-memory-message-bus-adapter/src/types.ts
index 0ec62244..6af38984 100644
--- a/packages/in-memory-message-bus-adapter/src/types.ts
+++ b/packages/in-memory-message-bus-adapter/src/types.ts
@@ -20,6 +20,12 @@ export type ConstructorArgs = {
retryBackoffRate?: number;
};
+export type TaskContext = {
+ attempt: number;
+ retryAttemptsLeft: number;
+ replay: boolean;
+};
+
export type InMemoryBusMessage<
MESSAGE_BUS extends
| AggregateExistsMessageBus
@@ -44,5 +50,9 @@ export type FilterPattern<
EVENT_STORE_ID extends string = string,
EVENT_TYPE extends string = string,
> =
- | { eventStoreId?: EVENT_STORE_ID; eventType?: never }
- | { eventStoreId: EVENT_STORE_ID; eventType?: EVENT_TYPE };
+ | { eventStoreId?: EVENT_STORE_ID; eventType?: never; onReplay?: boolean }
+ | {
+ eventStoreId: EVENT_STORE_ID;
+ eventType?: EVENT_TYPE;
+ onReplay?: boolean;
+ };
diff --git a/packages/in-memory-message-bus-adapter/src/utils.ts b/packages/in-memory-message-bus-adapter/src/utils.ts
index 78592c1f..5fc52716 100644
--- a/packages/in-memory-message-bus-adapter/src/utils.ts
+++ b/packages/in-memory-message-bus-adapter/src/utils.ts
@@ -1,14 +1,18 @@
-import type { Message } from '@castore/core';
import { isEventCarryingMessage } from '@castore/core';
+import type { Task } from './message';
import type { FilterPattern } from './types';
-export const doesMessageMatchFilterPattern = (
- message: Message,
+export const doesTaskMatchFilterPattern = (
+ task: Task,
filterPattern: FilterPattern,
): boolean => {
- const { eventStoreId: filterEventStoreId, eventType: filterEventType } =
- filterPattern;
+ const { message, replay = false } = task;
+ const {
+ eventStoreId: filterEventStoreId,
+ eventType: filterEventType,
+ onReplay = false,
+ } = filterPattern;
let messageEventType: string | undefined = undefined;
const { eventStoreId: messageEventStoreId } = message;
@@ -17,6 +21,10 @@ export const doesMessageMatchFilterPattern = (
messageEventType = message.event.type;
}
+ if (replay && !onReplay) {
+ return false;
+ }
+
if (filterEventStoreId !== undefined && filterEventType !== undefined) {
return (
messageEventStoreId === filterEventStoreId &&
@@ -31,12 +39,12 @@ export const doesMessageMatchFilterPattern = (
return true;
};
-export const doesMessageMatchAnyFilterPattern = (
- message: Message,
+export const doesTaskMatchAnyFilterPattern = (
+ task: Task,
filterPatterns: FilterPattern[],
): boolean =>
filterPatterns.some(filterPattern =>
- doesMessageMatchFilterPattern(message, filterPattern),
+ doesTaskMatchFilterPattern(task, filterPattern),
);
export const parseRetryDelayInMs = (retryDelayInMs: number): number => {
diff --git a/packages/in-memory-message-queue-adapter/README.md b/packages/in-memory-message-queue-adapter/README.md
index 25015969..ed980f0e 100644
--- a/packages/in-memory-message-queue-adapter/README.md
+++ b/packages/in-memory-message-queue-adapter/README.md
@@ -82,6 +82,23 @@ messageQueueAdapter.worker = async message => {
> Only one worker at a time can be set up
+For more control, the worker has access to more context through its second argument:
+
+```ts
+messageQueueAdapter.worker = async (message, context) => {
+ const { eventStoreId, event } = message;
+ const {
+ // 👇 See "Retry policy" section below
+ attempt,
+ retryAttemptsLeft,
+ // 👇 If event is replayed
+ replay,
+ } = context;
+
+ ...
+};
+```
+
## ♻️ Retry policy
This adapter will retry failed messages handling. You can specify a different retry policy than the default one via its constructor arguments:
diff --git a/packages/in-memory-message-queue-adapter/src/adapter.ts b/packages/in-memory-message-queue-adapter/src/adapter.ts
index 121aabba..e49368e1 100644
--- a/packages/in-memory-message-queue-adapter/src/adapter.ts
+++ b/packages/in-memory-message-queue-adapter/src/adapter.ts
@@ -18,7 +18,7 @@ import {
parseRetryDelayInMs,
} from './utils';
-type InMemoryQueueMessage<
+export type InMemoryQueueMessage<
MESSAGE_QUEUE extends
| AggregateExistsMessageQueue
| StateCarryingMessageQueue
@@ -42,14 +42,18 @@ type InMemoryQueueMessage<
>
: never;
-export type Task = {
- message: MESSAGE;
+export type TaskContext = {
attempt: number;
retryAttemptsLeft: number;
+ replay: boolean;
};
+export type Task = {
+ message: MESSAGE;
+} & TaskContext;
+
type ConstructorArgs = {
- worker?: (message: MESSAGE) => Promise;
+ worker?: (message: MESSAGE, context: TaskContext) => Promise;
retryAttempts?: number;
retryDelayInMs?: number;
retryBackoffRate?: number;
@@ -78,7 +82,9 @@ export class InMemoryMessageQueueAdapter
publishMessage: MessageChannelAdapter['publishMessage'];
publishMessages: MessageChannelAdapter['publishMessages'];
- private subscribe: (nextHandler: (message: MESSAGE) => Promise) => void;
+ private subscribe: (
+ nextHandler: (message: MESSAGE, context: TaskContext) => Promise,
+ ) => void;
retryAttempts: number;
retryDelayInMs: number;
retryBackoffRate: number;
@@ -96,14 +102,17 @@ export class InMemoryMessageQueueAdapter
this.retryBackoffRate = parseBackoffRate(retryBackoffRate);
this.subscribe = (
- nextHandler: (message: MESSAGE) => Promise,
+ nextHandler: (message: MESSAGE, context: TaskContext) => Promise,
): void => {
- this.queue = fastQ(({ message }) => nextHandler(message), 1);
+ this.queue = fastQ(
+ ({ message, ...context }) => nextHandler(message, context),
+ 1,
+ );
this.queue.error((error, task) => {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (error === null) return;
- const { message, attempt, retryAttemptsLeft } = task;
+ const { attempt, retryAttemptsLeft, ...restTask } = task;
if (retryAttemptsLeft <= 0) {
console.error(error);
@@ -120,9 +129,9 @@ export class InMemoryMessageQueueAdapter
if (queue === undefined) return;
void queue.push({
- message,
attempt: attempt + 1,
retryAttemptsLeft: retryAttemptsLeft - 1,
+ ...restTask,
});
}, waitTimeInMs);
});
@@ -132,7 +141,7 @@ export class InMemoryMessageQueueAdapter
this.subscribe(worker);
}
- this.publishMessage = async message =>
+ this.publishMessage = async (message, { replay = false } = {}) =>
new Promise(resolve => {
const queue = this.queue;
@@ -146,19 +155,22 @@ export class InMemoryMessageQueueAdapter
message: message as MESSAGE,
attempt: 1,
retryAttemptsLeft: this.retryAttempts,
+ replay,
});
resolve();
});
- this.publishMessages = async messages => {
+ this.publishMessages = async (messages, options) => {
for (const message of messages) {
- await this.publishMessage(message);
+ await this.publishMessage(message, options);
}
};
}
- set worker(worker: (message: MESSAGE) => Promise) {
+ set worker(
+ worker: (message: MESSAGE, context: TaskContext) => Promise,
+ ) {
this.subscribe(worker);
}
}
diff --git a/packages/in-memory-message-queue-adapter/src/adapter.unit.test.ts b/packages/in-memory-message-queue-adapter/src/adapter.unit.test.ts
index 945da60a..0809006d 100644
--- a/packages/in-memory-message-queue-adapter/src/adapter.unit.test.ts
+++ b/packages/in-memory-message-queue-adapter/src/adapter.unit.test.ts
@@ -13,7 +13,7 @@ import {
pikachuAppearedEvent,
} from '@castore/demo-blueprint';
-import { Task, InMemoryMessageQueueAdapter } from './adapter';
+import { Task, InMemoryMessageQueueAdapter, TaskContext } from './adapter';
const messageQueue = new NotificationMessageQueue({
messageQueueId: 'messageQueueId',
@@ -29,6 +29,12 @@ const pikachuAppearedMessage: EventStoreNotificationMessage<
event: pikachuAppearedEvent,
};
+const context: TaskContext = {
+ attempt: 1,
+ retryAttemptsLeft: 2,
+ replay: false,
+};
+
const sleep = (ms: number): Promise =>
new Promise(resolve => setTimeout(resolve, ms));
@@ -77,17 +83,28 @@ describe('in-memory message queue adapter', () => {
await inMemoryMessageQueueAdapter.publishMessage(pikachuAppearedMessage);
- expect(worker1).toHaveBeenCalledWith(pikachuAppearedMessage);
+ expect(worker1).toHaveBeenCalledWith(pikachuAppearedMessage, context);
expect(worker2).not.toHaveBeenCalled();
});
+ it('passes replay option to context', async () => {
+ await inMemoryMessageQueueAdapter.publishMessage(pikachuAppearedMessage, {
+ replay: true,
+ });
+
+ expect(worker1).toHaveBeenCalledWith(pikachuAppearedMessage, {
+ ...context,
+ replay: true,
+ });
+ });
+
it('recreates queue if new worker is set', async () => {
inMemoryMessageQueueAdapter.worker = worker2;
await inMemoryMessageQueueAdapter.publishMessage(pikachuAppearedMessage);
expect(worker1).not.toHaveBeenCalled();
- expect(worker2).toHaveBeenCalledWith(pikachuAppearedMessage);
+ expect(worker2).toHaveBeenCalledWith(pikachuAppearedMessage, context);
});
it('actually connects to a messageQueue', async () => {
@@ -95,7 +112,7 @@ describe('in-memory message queue adapter', () => {
await messageQueue.publishMessage(pikachuAppearedMessage);
- expect(worker2).toHaveBeenCalledWith(pikachuAppearedMessage);
+ expect(worker2).toHaveBeenCalledWith(pikachuAppearedMessage, context);
});
it('accepts worker in constructor', async () => {
@@ -129,6 +146,18 @@ describe('in-memory message queue adapter', () => {
expect(worker2).toHaveBeenCalledTimes(mockNumberOfEventToPublish);
});
+
+ it('passes replay option to context', async () => {
+ await messageQueue.publishMessages([pikachuAppearedMessage], {
+ replay: true,
+ });
+
+ expect(worker2).toHaveBeenCalledOnce();
+ expect(worker2).toHaveBeenCalledWith(pikachuAppearedMessage, {
+ ...context,
+ replay: true,
+ });
+ });
});
describe('through static method', () => {
@@ -157,7 +186,7 @@ describe('in-memory message queue adapter', () => {
inMemoryMessageQueueAdapter.worker = worker;
await messageQueue.publishMessage(pikachuAppearedMessage);
- expect(worker).toHaveBeenCalledWith(pikachuAppearedMessage);
+ expect(worker).toHaveBeenCalledWith(pikachuAppearedMessage, context);
});
it('correctly instanciates a class and attach it (with worker)', async () => {
@@ -183,7 +212,7 @@ describe('in-memory message queue adapter', () => {
assertQueueType;
await messageQueue.publishMessage(pikachuAppearedMessage);
- expect(worker).toHaveBeenCalledWith(pikachuAppearedMessage);
+ expect(worker).toHaveBeenCalledWith(pikachuAppearedMessage, context);
});
});
diff --git a/packages/in-memory-message-queue-adapter/src/index.ts b/packages/in-memory-message-queue-adapter/src/index.ts
index 8c9eb922..5bb7bbcc 100644
--- a/packages/in-memory-message-queue-adapter/src/index.ts
+++ b/packages/in-memory-message-queue-adapter/src/index.ts
@@ -1 +1,2 @@
export { InMemoryMessageQueueAdapter } from './adapter';
+export type { InMemoryQueueMessage, TaskContext } from './adapter';
diff --git a/packages/inmemory-event-storage-adapter/src/adapter.ts b/packages/inmemory-event-storage-adapter/src/adapter.ts
index 489a363c..9472d9cf 100644
--- a/packages/inmemory-event-storage-adapter/src/adapter.ts
+++ b/packages/inmemory-event-storage-adapter/src/adapter.ts
@@ -1,11 +1,11 @@
/* eslint-disable max-lines */
-import {
+import type {
Aggregate,
EventDetail,
- GroupedEvent,
- EventStoreContext,
+ PushEventOptions,
StorageAdapter,
} from '@castore/core';
+import { GroupedEvent } from '@castore/core';
import { InMemoryEventAlreadyExistsError } from './error';
import {
@@ -113,7 +113,7 @@ export class InMemoryStorageAdapter implements StorageAdapter {
getEvents: StorageAdapter['getEvents'];
pushEventSync: (
eventDetail: EventDetail,
- context: EventStoreContext,
+ options: PushEventOptions,
) => Awaited>;
pushEvent: StorageAdapter['pushEvent'];
pushEventGroup: StorageAdapter['pushEventGroup'];
@@ -137,8 +137,10 @@ export class InMemoryStorageAdapter implements StorageAdapter {
}
});
- this.pushEventSync = (event, context) => {
+ this.pushEventSync = (event, options) => {
const { aggregateId, version } = event;
+ const { eventStoreId, force = false } = options;
+
const events = this.eventStore[aggregateId];
if (events === undefined) {
@@ -147,18 +149,22 @@ export class InMemoryStorageAdapter implements StorageAdapter {
return { event };
}
- if (
- events.some(
- ({ version: existingVersion }) => existingVersion === version,
- )
- ) {
- const { eventStoreId } = context;
-
- throw new InMemoryEventAlreadyExistsError({
- eventStoreId,
- aggregateId,
- version,
- });
+ const existingEventIndex = events.findIndex(
+ ({ version: existingVersion }) => existingVersion === version,
+ );
+
+ if (existingEventIndex !== -1) {
+ if (force) {
+ events[existingEventIndex] = event;
+
+ return { event };
+ } else {
+ throw new InMemoryEventAlreadyExistsError({
+ eventStoreId,
+ aggregateId,
+ version,
+ });
+ }
}
events.push(event);
@@ -166,10 +172,10 @@ export class InMemoryStorageAdapter implements StorageAdapter {
return { event };
};
- this.pushEvent = async (event, context) =>
+ this.pushEvent = async (event, options) =>
new Promise(resolve => {
const timestamp = new Date().toISOString();
- resolve(this.pushEventSync({ timestamp, ...event }, context));
+ resolve(this.pushEventSync({ timestamp, ...event }, options));
});
this.pushEventGroup = async (...groupedEventsInput) =>
diff --git a/packages/inmemory-event-storage-adapter/src/adapter.unit.test.ts b/packages/inmemory-event-storage-adapter/src/adapter.unit.test.ts
index 939742cc..6ce1638c 100644
--- a/packages/inmemory-event-storage-adapter/src/adapter.unit.test.ts
+++ b/packages/inmemory-event-storage-adapter/src/adapter.unit.test.ts
@@ -41,7 +41,7 @@ describe('in-memory storage adapter', () => {
});
describe('methods', () => {
- describe('getEvents', () => {
+ describe('getEvents / pushEvent', () => {
const storageAdapter = new InMemoryStorageAdapter();
it('gets an empty array if there is no event for aggregateId', async () => {
@@ -59,6 +59,15 @@ describe('in-memory storage adapter', () => {
).rejects.toThrow(InMemoryEventAlreadyExistsError);
});
+ it('overrides event is force option is set to true', async () => {
+ const { event } = await storageAdapter.pushEvent(eventMock1, {
+ eventStoreId,
+ force: true,
+ });
+
+ expect(event).toStrictEqual(eventMock1);
+ });
+
it('pushes and gets events correctly', async () => {
const { timestamp, ...eventMock2WithoutTimestamp } = eventMock2;
MockDate.set(timestamp);
diff --git a/packages/redux-event-storage-adapter/src/adapter.ts b/packages/redux-event-storage-adapter/src/adapter.ts
index 181650b0..6cd538ec 100644
--- a/packages/redux-event-storage-adapter/src/adapter.ts
+++ b/packages/redux-event-storage-adapter/src/adapter.ts
@@ -5,7 +5,7 @@ import {
GroupedEvent,
StorageAdapter,
EventDetail,
- EventStoreContext,
+ PushEventOptions,
Aggregate,
} from '@castore/core';
@@ -106,7 +106,7 @@ export class ReduxEventStorageAdapter implements StorageAdapter {
getEvents: StorageAdapter['getEvents'];
pushEventSync: (
eventDetail: EventDetail,
- context: EventStoreContext,
+ options: PushEventOptions,
) => Awaited>;
pushEvent: StorageAdapter['pushEvent'];
pushEventGroup: StorageAdapter['pushEventGroup'];
@@ -145,14 +145,15 @@ export class ReduxEventStorageAdapter implements StorageAdapter {
return eventStoreState;
};
- this.pushEventSync = event => {
+ this.pushEventSync = (event, options) => {
const { aggregateId } = event;
+ const force = options.force ?? false;
const eventStoreState = this.getEventStoreState();
const events = eventStoreState.eventsByAggregateId[aggregateId] ?? [];
- if (events.some(({ version }) => version === event.version)) {
+ if (!force && events.some(({ version }) => version === event.version)) {
throw new ReduxStoreEventAlreadyExistsError({
eventStoreId: this.eventStoreId,
aggregateId: event.aggregateId,
diff --git a/packages/sqs-message-queue-adapter/README.md b/packages/sqs-message-queue-adapter/README.md
index 106396a7..d8396a2e 100644
--- a/packages/sqs-message-queue-adapter/README.md
+++ b/packages/sqs-message-queue-adapter/README.md
@@ -66,19 +66,19 @@ When publishing a message, it is JSON stringified and passed as the record body.
```ts
// 👇 Aggregate exists
-{
- "body": "{
+const message = {
+ body: '{
\"eventStoreId\": \"POKEMONS\",
\"aggregateId\": \"123\",
- }",
+ }',
... // <= Other technical SQS properties
}
```
```ts
// 👇 Notification
-{
- "body": "{
+const message = {
+ body: '{
\"eventStoreId\": \"POKEMONS\",
\"event\": {
\"aggregateId\": \"123\",
@@ -87,38 +87,62 @@ When publishing a message, it is JSON stringified and passed as the record body.
\"timestamp\": ...
...
},
- }",
+ }',
...
}
```
```ts
// 👇 State-carrying
-{
- "body": "{
+const message = {
+ body: '{
\"eventStoreId\": \"POKEMONS\",
\"event\": {
\"aggregateId\": \"123\",
...
},
- \"aggregate\": ...
- }",
+ \"aggregate\": ...,
+ }',
...
-}
+};
```
-If your queue is of type FIFO, the `MessageGroupId` and `MessageDeduplicationId` will be derived from a combination of the `eventStoreId`, `aggregateId` and `version`:
+If your queue is of type FIFO, the `messageGroupId` and `messageDeduplicationId` will be derived from a combination of the `eventStoreId`, `aggregateId` and `version`:
```ts
-// 👇 Entry example
-const Entry = {
- MessageBody: JSON.stringify({ ... }),
- MessageGroupId: "POKEMONS#123",
- MessageDeduplicationId: "POKEMONS#123#1", // <= Or "POKEMONS#123" for AggregateExistsMessageQueues
+// 👇 Fifo message
+const message = {
+ messageBody: ...,
+ messageGroupId: "POKEMONS#123",
+ messageDeduplicationId: "POKEMONS#123#1", // <= Or "POKEMONS#123" for AggregateExistsMessageQueues
... // <= Other technical SQS properties
};
```
+If the `replay` option is set to `true`, a `replay` metadata attribute is included in the message:
+
+```ts
+// 👇 Replayed notification message
+const message = {
+ body: '{
+ \"eventStoreId\": \"POKEMONS\",
+ \"event\": {
+ \"aggregateId\": \"123\",
+ ...
+ },
+ }',
+ messageAttributes: {
+ replay: {
+ // 👇 boolean type is not available in SQS 🤷♂️
+ dataType: 'Number',
+ // 👇 numberValue is not available in SQS 🤷♂️
+ stringValue: '1',
+ },
+ },
+ ...
+};
+```
+
On the worker side, you can use the `SQSMessageQueueMessage` and `SQSMessageQueueMessageBody` TS types to type your argument:
```ts
diff --git a/packages/sqs-message-queue-adapter/src/adapter.ts b/packages/sqs-message-queue-adapter/src/adapter.ts
index d76316e7..97211413 100644
--- a/packages/sqs-message-queue-adapter/src/adapter.ts
+++ b/packages/sqs-message-queue-adapter/src/adapter.ts
@@ -2,6 +2,8 @@ import {
SQSClient,
SendMessageCommand,
SendMessageBatchCommand,
+ SendMessageCommandInput,
+ SendMessageBatchRequestEntry,
} from '@aws-sdk/client-sqs';
import chunk from 'lodash.chunk';
@@ -57,48 +59,75 @@ export class SQSMessageQueueAdapter implements MessageChannelAdapter {
this.getQueueUrl = () =>
typeof this.queueUrl === 'string' ? this.queueUrl : this.queueUrl();
- this.publishMessage = async message => {
+ this.publishMessage = async (message, { replay = false } = {}) => {
const { eventStoreId } = message;
const { aggregateId, version } = parseMessage(message);
+ const sendMessageCommandInput: SendMessageCommandInput = {
+ MessageBody: JSON.stringify(message),
+ QueueUrl: this.getQueueUrl(),
+ };
+
+ if (this.fifo) {
+ sendMessageCommandInput.MessageDeduplicationId = [
+ eventStoreId,
+ aggregateId,
+ version,
+ ]
+ .filter(Boolean)
+ .join('#');
+
+ sendMessageCommandInput.MessageGroupId = [
+ eventStoreId,
+ aggregateId,
+ ].join('#');
+ }
+
+ if (replay) {
+ sendMessageCommandInput.MessageAttributes = {
+ replay: { DataType: 'Number', StringValue: '1' },
+ };
+ }
+
await this.sqsClient.send(
- new SendMessageCommand({
- MessageBody: JSON.stringify(message),
- QueueUrl: this.getQueueUrl(),
- ...(this.fifo
- ? {
- MessageDeduplicationId: [eventStoreId, aggregateId, version]
- .filter(Boolean)
- .join('#'),
- MessageGroupId: [eventStoreId, aggregateId].join('#'),
- }
- : {}),
- }),
+ new SendMessageCommand(sendMessageCommandInput),
);
};
- this.publishMessages = async messages => {
+ this.publishMessages = async (messages, { replay = false } = {}) => {
+ const baseEntry: Omit<
+ SendMessageBatchRequestEntry,
+ 'Id' | 'MessageBody'
+ > = {};
+
+ if (replay) {
+ baseEntry.MessageAttributes = {
+ replay: { DataType: 'Number', StringValue: '1' },
+ };
+ }
+
for (const chunkMessages of chunk(messages, SQS_MAX_MESSAGE_BATCH_SIZE)) {
await this.sqsClient.send(
new SendMessageBatchCommand({
Entries: chunkMessages.map(message => {
const { eventStoreId } = message;
const { aggregateId, version } = parseMessage(message);
-
const messageId = [eventStoreId, aggregateId, version]
.filter(Boolean)
.join('#');
- return {
+ const entry: SendMessageBatchRequestEntry = {
+ ...baseEntry,
Id: messageId,
MessageBody: JSON.stringify(message),
- ...(this.fifo
- ? {
- MessageDeduplicationId: messageId,
- MessageGroupId: [eventStoreId, aggregateId].join('#'),
- }
- : {}),
};
+
+ if (this.fifo) {
+ entry.MessageDeduplicationId = messageId;
+ entry.MessageGroupId = [eventStoreId, aggregateId].join('#');
+ }
+
+ return entry;
}),
QueueUrl: this.getQueueUrl(),
}),
diff --git a/packages/sqs-message-queue-adapter/src/adapter.unit.test.ts b/packages/sqs-message-queue-adapter/src/adapter.unit.test.ts
index b76124a3..ced74e8d 100644
--- a/packages/sqs-message-queue-adapter/src/adapter.unit.test.ts
+++ b/packages/sqs-message-queue-adapter/src/adapter.unit.test.ts
@@ -1,3 +1,4 @@
+/* eslint-disable max-lines */
import {
SQSClient,
SendMessageCommand,
@@ -6,7 +7,7 @@ import {
import { mockClient } from 'aws-sdk-client-mock';
import type { A } from 'ts-toolbelt';
-import type { Message } from '@castore/core';
+import type { Message, PublishMessageOptions } from '@castore/core';
import { SQSMessageQueueAdapter, SQS_MAX_MESSAGE_BATCH_SIZE } from './adapter';
@@ -52,7 +53,7 @@ describe('SQSMessageQueueAdapter', () => {
const assertMessage: A.Equals<
Parameters,
- [Message]
+ [message: Message, options?: PublishMessageOptions | undefined]
> = 1;
assertMessage;
@@ -85,6 +86,25 @@ describe('SQSMessageQueueAdapter', () => {
});
});
+ it('appends replay MessageAttribute if replay is set to true', async () => {
+ const adapter = new SQSMessageQueueAdapter({
+ queueUrl: queueUrlMock,
+ sqsClient: sqsClientMock as unknown as SQSClient,
+ fifo: true,
+ });
+
+ await adapter.publishMessage(message, { replay: true });
+
+ // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
+ // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
+ expect(sqsClientMock.calls()).toHaveLength(1);
+ expect(sqsClientMock.call(0).args[0].input).toMatchObject({
+ MessageAttributes: {
+ replay: { DataType: 'Number', StringValue: '1' },
+ },
+ });
+ });
+
it('send a SendMessageBatchCommand to sqs client on messages batch published', async () => {
const adapter = new SQSMessageQueueAdapter({
queueUrl: queueUrlMock,
@@ -93,7 +113,7 @@ describe('SQSMessageQueueAdapter', () => {
const assertMessages: A.Equals<
Parameters,
- [Message[]]
+ [messages: Message[], options?: PublishMessageOptions | undefined]
> = 1;
assertMessages;
@@ -150,6 +170,34 @@ describe('SQSMessageQueueAdapter', () => {
});
});
+ it('appends replay MessageAttribute if replay is set to true', async () => {
+ const adapter = new SQSMessageQueueAdapter({
+ queueUrl: queueUrlMock,
+ sqsClient: sqsClientMock as unknown as SQSClient,
+ });
+
+ await adapter.publishMessages([message, otherMessage], { replay: true });
+
+ // regularly check if vitest matchers are available (toHaveReceivedCommandWith)
+ // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
+ expect(sqsClientMock.calls()).toHaveLength(1);
+ expect(sqsClientMock.call(0).args[0].input).toMatchObject({
+ Entries: [
+ {
+ MessageAttributes: {
+ replay: { DataType: 'Number', StringValue: '1' },
+ },
+ },
+ {
+ MessageAttributes: {
+ replay: { DataType: 'Number', StringValue: '1' },
+ },
+ },
+ ],
+ QueueUrl: queueUrlMock,
+ });
+ });
+
it('chunk messages in separate SendMessageBatchCommand calls when there are more messages then SQS_MAX_MESSAGE_BATCH_SIZE', async () => {
const adapter = new SQSMessageQueueAdapter({
queueUrl: queueUrlMock,
diff --git a/packages/sqs-message-queue-adapter/src/message.ts b/packages/sqs-message-queue-adapter/src/message.ts
index b448396e..2590d9ad 100644
--- a/packages/sqs-message-queue-adapter/src/message.ts
+++ b/packages/sqs-message-queue-adapter/src/message.ts
@@ -1,4 +1,4 @@
-import type { SQSEvent } from 'aws-lambda';
+import type { SQSRecord } from 'aws-lambda';
import type {
AggregateExistsMessageQueue,
@@ -10,7 +10,15 @@ import type {
MessageChannelSourceEventStores,
} from '@castore/core';
-export type SQSMessageQueueMessage = SQSEvent;
+export interface SQSMessageQueueRecord extends SQSRecord {
+ messageAttributes: {
+ replay?: { stringValue: '1'; dataType: 'Number' };
+ };
+}
+
+export interface SQSMessageQueueMessage {
+ Records: SQSMessageQueueRecord[];
+}
type Prettify> =
OBJECTS extends infer OBJECT