diff --git a/README.md b/README.md index d7b305bf..447b5434 100644 --- a/README.md +++ b/README.md @@ -398,7 +398,7 @@ const pokemonsEventStore = new EventStore({ > // => pokemonsReducer > ``` > -> - onEventPushed (?(pushEventResponse: PushEventResponse) => Promise\): The callback to run after events are pushed +> - onEventPushed (?(pushEventResponse: PushEventResponse) => Promise\): Callback to run after events are pushed > > ```ts > const onEventPushed = pokemonsEventStore.onEventPushed; @@ -522,11 +522,12 @@ const pokemonsEventStore = new EventStore({ > // => 'aggregate' and 'lastEvent' are always defined 🙌 > ``` > -> - pushEvent ((eventDetail: EventDetail, opt?: OptionsObj = {}) => Promise\): Pushes a new event to the event store. The `timestamp` is optional (we keep it available as it can be useful in tests & migrations). If not provided, it is automatically set as `new Date().toISOString()`. Throws an `EventAlreadyExistsError` if an event already exists for the corresponding `aggregateId` and `version`. +> - pushEvent ((eventDetail: EventDetail, opt?: OptionsObj = {}) => Promise\): Pushes a new event to the event store. The `timestamp` is optional (we keep it available as it can be useful in tests & migrations). If not provided, it is automatically set as `new Date().toISOString()`. Throws an `EventAlreadyExistsError` if an event already exists for the corresponding `aggregateId` and `version` (see section below on race conditions). > > `OptionsObj` contains the following properties: > > - prevAggregate (?Aggregate): The aggregate at the current version, i.e. before having pushed the event. Can be useful in some cases like when using the [`ConnectedEventStore` class](#--connectedeventstore) +> - force (?boolean): To force push the event even if one already exists for the corresponding `aggregateId` and `version`. Any existing event will be overridden, so use with extra care, mainly in [data migrations](#--dam). > > `ResponseObj` contains the following properties: > @@ -971,9 +972,13 @@ await appMessageQueue.publishMessage({ > > The following methods interact with the messaging solution of your application through a `MessageQueueAdapter`. They will throw an `UndefinedMessageChannelAdapterError` if you did not provide one. > -> - publishMessage ((message: NotificationMessage | StateCarryingMessage) => Promise\): Publish a `NotificationMessage` (for `NotificationMessageQueues`) or a `StateCarryingMessage` (for `StateCarryingMessageQueues`) to the message queue. +> - publishMessage ((message: Message, opt?: OptionsObj = {}) => Promise\): Publish a `Message` (of the appropriate type) to the message queue. > -> - publishMessages ((messages: NotificationMessage[] | StateCarryingMessage[]) => Promise\): Publish several `NotificationMessage` (for `NotificationMessageQueues`) or several `StateCarryingMessage` (for `StateCarryingMessageQueues`) to the message queue. +> `OptionsObj` contains the following properties: +> +> - replay (?boolean): Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately (e.g. prevent sending notification emails). Check the implementation of you adapter for more details. +> +> - publishMessages (messages: Message[], opt?: OptionsObj) => Promise\): Publish several `Messages` (of the appropriate type) to the message queue. Options are similar to the `publishMessage` options. > > - getAggregateAndPublishMessage ((message: NotificationMessage) => Promise\): _(StateCarryingMessageQueues only)_ Append the matching aggregate (with correct version) to a `NotificationMessage` and turn it into a `StateCarryingMessage` before publishing it to the message queue. Uses the message queue event stores: Make sure that they have correct adapters set up. > @@ -1103,9 +1108,13 @@ await appMessageBus.publishMessage({ > > The following methods interact with the messaging solution of your application through a `MessageBusAdapter`. They will throw an `UndefinedMessageChannelAdapterError` if you did not provide one. > -> - publishMessage ((message: NotificationMessage | StateCarryingMessage) => Promise\): Publish a `NotificationMessage` (for `NotificationMessageBuses`) or a `StateCarryingMessage` (for `StateCarryingMessageBuses`) to the message bus. +> - publishMessage ((message: Message, opt?: OptionsObj = {}) => Promise\): Publish a `Message` (of the appropriate type) to the message bus. +> +> `OptionsObj` contains the following properties: +> +> - replay (?boolean): Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately (e.g. prevent sending notification emails). Check the implementation of you adapter for more details. > -> - publishMessages ((messages: NotificationMessage[] | StateCarryingMessage[]) => Promise\): Publish several `NotificationMessage` (for `NotificationMessageBuses`) or several `StateCarryingMessage` (for `StateCarryingMessageBuses`) to the message bus. +> - publishMessages (messages: Message[], opt?: OptionsObj) => Promise\): Publish several `Messages` (of the appropriate type) to the message bus. Options are similar to the `publishMessage` options. > > - getAggregateAndPublishMessage ((message: NotificationMessage) => Promise\): _(StateCarryingMessageBuses only)_ Append the matching aggregate (with correct version) to a `NotificationMessage` and turn it into a `StateCarryingMessage` before publishing it to the message bus. Uses the message bus event stores: Make sure that they have correct adapters set up. > diff --git a/packages/core/src/eventStore/eventStore.ts b/packages/core/src/eventStore/eventStore.ts index a2bb5c20..a580ceaf 100644 --- a/packages/core/src/eventStore/eventStore.ts +++ b/packages/core/src/eventStore/eventStore.ts @@ -176,11 +176,15 @@ export class EventStore< */ ) as Promise<{ events: EVENT_DETAILS[] }>; - this.pushEvent = async (eventDetail, { prevAggregate } = {}) => { + this.pushEvent = async ( + eventDetail, + { prevAggregate, force = false } = {}, + ) => { const storageAdapter = this.getStorageAdapter(); const { event } = (await storageAdapter.pushEvent(eventDetail, { eventStoreId: this.eventStoreId, + force, })) as { event: $EVENT_DETAILS }; let nextAggregate: AGGREGATE | undefined = undefined; diff --git a/packages/core/src/eventStore/eventStore.type.test.ts b/packages/core/src/eventStore/eventStore.type.test.ts index f0f8125b..770ef642 100644 --- a/packages/core/src/eventStore/eventStore.type.test.ts +++ b/packages/core/src/eventStore/eventStore.type.test.ts @@ -108,7 +108,7 @@ assertPushEventInput1; const assertPushEventInput2: A.Equals< Parameters[1], - { prevAggregate?: PokemonAggregate | undefined } | undefined + { prevAggregate?: PokemonAggregate | undefined; force?: boolean } | undefined > = 1; assertPushEventInput2; diff --git a/packages/core/src/eventStore/eventStore.unit.test.ts b/packages/core/src/eventStore/eventStore.unit.test.ts index e97dab33..9efe0d07 100644 --- a/packages/core/src/eventStore/eventStore.unit.test.ts +++ b/packages/core/src/eventStore/eventStore.unit.test.ts @@ -148,6 +148,7 @@ describe('event store', () => { expect(pushEventMock).toHaveBeenCalledTimes(1); expect(pushEventMock).toHaveBeenCalledWith(pikachuLeveledUpEvent, { eventStoreId: pokemonsEventStore.eventStoreId, + force: false, }); expect(response).toStrictEqual({ event: pikachuLeveledUpEvent }); }); diff --git a/packages/core/src/eventStore/types.ts b/packages/core/src/eventStore/types.ts index 3c2c022a..90bdbb7a 100644 --- a/packages/core/src/eventStore/types.ts +++ b/packages/core/src/eventStore/types.ts @@ -37,7 +37,7 @@ export type EventPusher< event: $EVENT_DETAILS extends EventDetail ? OptionalTimestamp<$EVENT_DETAILS> : $EVENT_DETAILS, - options?: { prevAggregate?: $AGGREGATE }, + options?: { prevAggregate?: $AGGREGATE; force?: boolean }, ) => Promise<{ event: EVENT_DETAILS; nextAggregate?: AGGREGATE }>; export type AggregateIdsLister = ( @@ -62,6 +62,9 @@ export type EventGroupPusher = < ...GroupedEvent[], ], >( + /** + * @debt v2 "use an array and enable options in 2nd arg (useful for 'force' opt for instance)" + */ ...groupedEvents: GROUPED_EVENTS ) => Promise<{ eventGroup: EventGroupPusherResponse }>; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 1fac63bd..04026d94 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -6,6 +6,7 @@ export type { EventDetail, OptionalTimestamp } from './event/eventDetail'; export type { StorageAdapter } from './storageAdapter'; export type { EventsQueryOptions, + PushEventOptions, EventStoreContext, ListAggregateIdsOptions, ListAggregateIdsOutput, @@ -65,6 +66,7 @@ export type { NotificationMessage, StateCarryingMessage, Message, + PublishMessageOptions, EventStoreAggregateExistsMessage, EventStoreNotificationMessage, EventStoreStateCarryingMessage, diff --git a/packages/core/src/messaging/channel/aggregateExistsMessageChannel.ts b/packages/core/src/messaging/channel/aggregateExistsMessageChannel.ts index 9d32c675..f0c38114 100644 --- a/packages/core/src/messaging/channel/aggregateExistsMessageChannel.ts +++ b/packages/core/src/messaging/channel/aggregateExistsMessageChannel.ts @@ -7,6 +7,7 @@ import { UndefinedMessageChannelAdapterError, } from './errors'; import type { MessageChannelAdapter } from './messageChannelAdapter'; +import type { PublishMessageOptions } from './types'; export class AggregateExistsMessageChannel< EVENT_STORE extends EventStore = EventStore, @@ -28,6 +29,7 @@ export class AggregateExistsMessageChannel< EventStore, EventStoreAggregateExistsMessage >, + options?: PublishMessageOptions, ) => Promise; publishMessages: ( aggregateExistsMessages: $Contravariant< @@ -35,6 +37,7 @@ export class AggregateExistsMessageChannel< EventStore, EventStoreAggregateExistsMessage >[], + options?: PublishMessageOptions, ) => Promise; constructor({ @@ -87,23 +90,33 @@ export class AggregateExistsMessageChannel< return eventStore; }; - this.publishMessage = async aggregateExistsMessage => { + this.publishMessage = async ( + aggregateExistsMessage, + { replay = false } = {}, + ) => { const { eventStoreId } = aggregateExistsMessage; this.getEventStore(eventStoreId); const messageChannelAdapter = this.getMessageChannelAdapter(); - await messageChannelAdapter.publishMessage(aggregateExistsMessage); + await messageChannelAdapter.publishMessage(aggregateExistsMessage, { + replay, + }); }; - this.publishMessages = async aggregateExistsMessages => { + this.publishMessages = async ( + aggregateExistsMessages, + { replay = false } = {}, + ) => { for (const aggregateExistsMessage of aggregateExistsMessages) { const { eventStoreId } = aggregateExistsMessage; this.getEventStore(eventStoreId); } const messageChannelAdapter = this.getMessageChannelAdapter(); - await messageChannelAdapter.publishMessages(aggregateExistsMessages); + await messageChannelAdapter.publishMessages(aggregateExistsMessages, { + replay, + }); }; } } diff --git a/packages/core/src/messaging/channel/index.ts b/packages/core/src/messaging/channel/index.ts index aaff3bc0..e8e181a8 100644 --- a/packages/core/src/messaging/channel/index.ts +++ b/packages/core/src/messaging/channel/index.ts @@ -5,6 +5,7 @@ export type { MessageChannelSourceEventStoreIds, MessageChannelSourceEventStoreIdTypes, } from './generics'; +export type { PublishMessageOptions } from './types'; export type { MessageChannelAdapter } from './messageChannelAdapter'; export { AggregateExistsMessageChannel } from './aggregateExistsMessageChannel'; export { NotificationMessageChannel } from './notificationMessageChannel'; diff --git a/packages/core/src/messaging/channel/messageChannelAdapter.ts b/packages/core/src/messaging/channel/messageChannelAdapter.ts index e308f608..4ce9064f 100644 --- a/packages/core/src/messaging/channel/messageChannelAdapter.ts +++ b/packages/core/src/messaging/channel/messageChannelAdapter.ts @@ -1,6 +1,13 @@ import type { Message } from '../message'; +import type { PublishMessageOptions } from './types'; export interface MessageChannelAdapter { - publishMessage: (message: Message) => Promise; - publishMessages: (messages: Message[]) => Promise; + publishMessage: ( + message: Message, + options?: PublishMessageOptions, + ) => Promise; + publishMessages: ( + messages: Message[], + options?: PublishMessageOptions, + ) => Promise; } diff --git a/packages/core/src/messaging/channel/notificationMessageChannel.ts b/packages/core/src/messaging/channel/notificationMessageChannel.ts index a7c225c8..9989389f 100644 --- a/packages/core/src/messaging/channel/notificationMessageChannel.ts +++ b/packages/core/src/messaging/channel/notificationMessageChannel.ts @@ -7,6 +7,7 @@ import { UndefinedMessageChannelAdapterError, } from './errors'; import type { MessageChannelAdapter } from './messageChannelAdapter'; +import type { PublishMessageOptions } from './types'; export class NotificationMessageChannel< EVENT_STORE extends EventStore = EventStore, @@ -28,6 +29,7 @@ export class NotificationMessageChannel< EventStore, EventStoreNotificationMessage >, + options?: PublishMessageOptions, ) => Promise; publishMessages: ( notificationMessages: $Contravariant< @@ -35,6 +37,7 @@ export class NotificationMessageChannel< EventStore, EventStoreNotificationMessage >[], + options?: PublishMessageOptions, ) => Promise; constructor({ @@ -87,23 +90,33 @@ export class NotificationMessageChannel< return eventStore; }; - this.publishMessage = async notificationMessage => { + this.publishMessage = async ( + notificationMessage, + { replay = false } = {}, + ) => { const { eventStoreId } = notificationMessage; this.getEventStore(eventStoreId); const messageChannelAdapter = this.getMessageChannelAdapter(); - await messageChannelAdapter.publishMessage(notificationMessage); + await messageChannelAdapter.publishMessage(notificationMessage, { + replay, + }); }; - this.publishMessages = async notificationMessages => { + this.publishMessages = async ( + notificationMessages, + { replay = false } = {}, + ) => { for (const notificationMessage of notificationMessages) { const { eventStoreId } = notificationMessage; this.getEventStore(eventStoreId); } const messageChannelAdapter = this.getMessageChannelAdapter(); - await messageChannelAdapter.publishMessages(notificationMessages); + await messageChannelAdapter.publishMessages(notificationMessages, { + replay, + }); }; } } diff --git a/packages/core/src/messaging/channel/stateCarryingMessageChannel.ts b/packages/core/src/messaging/channel/stateCarryingMessageChannel.ts index 504caa06..b769d513 100644 --- a/packages/core/src/messaging/channel/stateCarryingMessageChannel.ts +++ b/packages/core/src/messaging/channel/stateCarryingMessageChannel.ts @@ -10,6 +10,7 @@ import { UndefinedMessageChannelAdapterError, } from './errors'; import type { MessageChannelAdapter } from './messageChannelAdapter'; +import type { PublishMessageOptions } from './types'; export class StateCarryingMessageChannel< EVENT_STORE extends EventStore = EventStore, @@ -31,6 +32,7 @@ export class StateCarryingMessageChannel< EventStore, EventStoreStateCarryingMessage >, + options?: PublishMessageOptions, ) => Promise; getAggregateAndPublishMessage: ( notificationMessage: $Contravariant< @@ -38,6 +40,7 @@ export class StateCarryingMessageChannel< EventStore, EventStoreNotificationMessage >, + options?: PublishMessageOptions, ) => Promise; publishMessages: ( @@ -46,6 +49,7 @@ export class StateCarryingMessageChannel< EventStore, EventStoreStateCarryingMessage >[], + options?: PublishMessageOptions, ) => Promise; constructor({ @@ -98,16 +102,24 @@ export class StateCarryingMessageChannel< return eventStore; }; - this.publishMessage = async stateCarryingMessage => { + this.publishMessage = async ( + stateCarryingMessage, + { replay = false } = {}, + ) => { const { eventStoreId } = stateCarryingMessage; this.getEventStore(eventStoreId); const messageChannelAdapter = this.getMessageChannelAdapter(); - await messageChannelAdapter.publishMessage(stateCarryingMessage); + await messageChannelAdapter.publishMessage(stateCarryingMessage, { + replay, + }); }; - this.getAggregateAndPublishMessage = async notificationMessage => { + this.getAggregateAndPublishMessage = async ( + notificationMessage, + { replay = false } = {}, + ) => { const { eventStoreId, event } = notificationMessage; const { aggregateId, version } = event; @@ -117,10 +129,16 @@ export class StateCarryingMessageChannel< maxVersion: version, }); - await this.publishMessage({ ...notificationMessage, aggregate }); + await this.publishMessage( + { ...notificationMessage, aggregate }, + { replay }, + ); }; - this.publishMessages = async stateCarryingMessages => { + this.publishMessages = async ( + stateCarryingMessages, + { replay = false } = {}, + ) => { for (const stateCarryingMessage of stateCarryingMessages) { const { eventStoreId } = stateCarryingMessage; this.getEventStore(eventStoreId); @@ -128,7 +146,9 @@ export class StateCarryingMessageChannel< const messageChannelAdapter = this.getMessageChannelAdapter(); - await messageChannelAdapter.publishMessages(stateCarryingMessages); + await messageChannelAdapter.publishMessages(stateCarryingMessages, { + replay, + }); }; } } diff --git a/packages/core/src/messaging/channel/types.ts b/packages/core/src/messaging/channel/types.ts new file mode 100644 index 00000000..cf103c81 --- /dev/null +++ b/packages/core/src/messaging/channel/types.ts @@ -0,0 +1,3 @@ +export type PublishMessageOptions = { + replay?: boolean; +}; diff --git a/packages/core/src/messaging/index.ts b/packages/core/src/messaging/index.ts index e987cbc5..6da8bc8a 100644 --- a/packages/core/src/messaging/index.ts +++ b/packages/core/src/messaging/index.ts @@ -1,4 +1,19 @@ -export * from './channel'; +export { + AggregateExistsMessageChannel, + NotificationMessageChannel, + StateCarryingMessageChannel, + MessageChannelEventStoreNotFoundError, + UndefinedMessageChannelAdapterError, +} from './channel'; +export type { + EventStoreMessageChannel, + MessageChannelSourceEventStores, + MessageChannelMessage, + MessageChannelSourceEventStoreIds, + MessageChannelSourceEventStoreIdTypes, + PublishMessageOptions, + MessageChannelAdapter, +} from './channel'; export * from './bus'; export * from './queue'; export type { diff --git a/packages/core/src/storageAdapter.ts b/packages/core/src/storageAdapter.ts index cf2a9de7..ba767944 100644 --- a/packages/core/src/storageAdapter.ts +++ b/packages/core/src/storageAdapter.ts @@ -11,6 +11,10 @@ export type EventsQueryOptions = { export type EventStoreContext = { eventStoreId: string }; +export type PushEventOptions = EventStoreContext & { + force?: boolean; +}; + export type ListAggregateIdsOptions = { limit?: number; pageToken?: string; @@ -46,7 +50,7 @@ export interface StorageAdapter { ) => Promise<{ events: EventDetail[] }>; pushEvent: ( eventDetail: OptionalTimestamp, - context: EventStoreContext, + options: PushEventOptions, ) => Promise<{ event: EventDetail }>; pushEventGroup: ( ...groupedEvents: [GroupedEvent, ...GroupedEvent[]] diff --git a/packages/dam/README.md b/packages/dam/README.md index 19842fba..421b6c9a 100644 --- a/packages/dam/README.md +++ b/packages/dam/README.md @@ -26,7 +26,7 @@ yarn add @castore/core `@castore/dam` exposes a series of utils that scan past events and re-publish them in [message channels](https://github.com/castore-dev/castore#--event-driven-architecture) – or _"pour them"_ as in _"pouring water from a container to another"_ 🫗. -Those utils are typically very useful for data maintenance and migration, and can be rate limited to limit impact on production traffic. They are the following: +Those utils are typically very useful for data maintenance and migration. They publish messages with the `replay` option enabled and can be **rate limited** to limit impact on production traffic. They are the following: - [`pourEventStoreAggregateIds`](#poureventstoreaggregateids): Pour all the aggregate ids of an event store in an `AggregateExistsMessageChannel`. - [`pourAggregateEvents`](#pouraggregateevents): Pour all the events of a specific aggregate in a provided `NotificationMessageChannel`. diff --git a/packages/dam/src/pourAggregateEvents/pourAggregateEvents.ts b/packages/dam/src/pourAggregateEvents/pourAggregateEvents.ts index f8b25bec..fcc99911 100644 --- a/packages/dam/src/pourAggregateEvents/pourAggregateEvents.ts +++ b/packages/dam/src/pourAggregateEvents/pourAggregateEvents.ts @@ -3,6 +3,7 @@ import type { EventStore, EventStoreNotificationMessage, EventsQueryOptions, + PublishMessageOptions, } from '@castore/core'; import { getThrottle } from '~/utils/getThrottle'; @@ -13,6 +14,7 @@ interface Props { messageChannel: { publishMessage: ( message: EventStoreNotificationMessage, + options?: PublishMessageOptions, ) => Promise; }; aggregateId: string; @@ -44,10 +46,13 @@ export const pourAggregateEvents = async ({ for (const eventToPour of eventsToPour) { await throttle(() => - messageChannel.publishMessage({ - eventStoreId: eventStore.eventStoreId, - event: eventToPour, - } as EventStoreNotificationMessage), + messageChannel.publishMessage( + { + eventStoreId: eventStore.eventStoreId, + event: eventToPour, + } as EventStoreNotificationMessage, + { replay: true }, + ), ); } diff --git a/packages/dam/src/pourAggregateEvents/pourAggregateEvents.unit.test.ts b/packages/dam/src/pourAggregateEvents/pourAggregateEvents.unit.test.ts index 2c6c5a84..5d5b5252 100644 --- a/packages/dam/src/pourAggregateEvents/pourAggregateEvents.unit.test.ts +++ b/packages/dam/src/pourAggregateEvents/pourAggregateEvents.unit.test.ts @@ -4,7 +4,10 @@ import { EventStoreId, EventsQueryOptions, } from '@castore/core'; -import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter'; +import { + InMemoryMessageQueueAdapter, + TaskContext, +} from '@castore/in-memory-message-queue-adapter'; import { pokemonEventStore, @@ -22,12 +25,13 @@ const messageQueue = new NotificationMessageQueue({ let receivedMessages: { date: Date; message: NotificationMessage>; + context: TaskContext; }[] = []; InMemoryMessageQueueAdapter.attachTo(messageQueue, { - worker: message => + worker: (message, context) => new Promise(resolve => { - receivedMessages.push({ date: new Date(), message }); + receivedMessages.push({ date: new Date(), message, context }); resolve(); }), }); @@ -56,6 +60,7 @@ describe('pourAggregateEvents', () => { eventStoreId: pokemonEvtStoreId, event: pikachuEvents[0], }); + expect(receivedMessages[0]?.context).toMatchObject({ replay: true }); expect(receivedMessages[1]?.message).toStrictEqual({ eventStoreId: pokemonEvtStoreId, event: pikachuEvents[1], diff --git a/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.ts b/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.ts index 91e79395..c4e9df74 100644 --- a/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.ts +++ b/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.ts @@ -3,6 +3,7 @@ import type { EventStore, EventStoreId, ListAggregateIdsOptions, + PublishMessageOptions, } from '@castore/core'; import type { ScanInfos } from '~/types'; @@ -14,6 +15,7 @@ interface Props { messageChannel: { publishMessage: ( messages: AggregateExistsMessage>, + options?: PublishMessageOptions, ) => Promise; }; options?: Omit; @@ -53,7 +55,10 @@ export const pourEventStoreAggregateIds = async < for (const aggregateId of aggregateIds) { await throttle(() => - messageChannel.publishMessage({ eventStoreId, aggregateId }), + messageChannel.publishMessage( + { eventStoreId, aggregateId }, + { replay: true }, + ), ); } diff --git a/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.unit.test.ts b/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.unit.test.ts index bede578a..4df963c1 100644 --- a/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.unit.test.ts +++ b/packages/dam/src/pourEventStoreAggregateIds/pourEventStoreAggregateIds.unit.test.ts @@ -4,7 +4,10 @@ import { EventStoreId, ListAggregateIdsOptions, } from '@castore/core'; -import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter'; +import { + InMemoryMessageQueueAdapter, + TaskContext, +} from '@castore/in-memory-message-queue-adapter'; import { pokemonEventStore, @@ -23,12 +26,13 @@ const messageQueue = new AggregateExistsMessageQueue({ let receivedMessages: { date: Date; message: AggregateExistsMessage>; + context: TaskContext; }[] = []; InMemoryMessageQueueAdapter.attachTo(messageQueue, { - worker: message => + worker: (message, context) => new Promise(resolve => { - receivedMessages.push({ date: new Date(), message }); + receivedMessages.push({ date: new Date(), message, context }); resolve(); }), }); @@ -58,6 +62,9 @@ describe('pourEventStoreAggregateIds', () => { eventStoreId: pokemonEvtStoreId, aggregateId: pikachuId, }); + expect(receivedMessages[0]?.context).toMatchObject({ + replay: true, + }); expect(receivedMessages[1]?.message).toStrictEqual({ eventStoreId: pokemonEvtStoreId, aggregateId: charizardId, diff --git a/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.ts b/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.ts index 8b876353..0bb6822c 100644 --- a/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.ts +++ b/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.ts @@ -3,6 +3,7 @@ import type { EventStore, EventStoreId, EventStoreNotificationMessage, + PublishMessageOptions, } from '@castore/core'; import type { ScanInfos } from '~/types'; @@ -16,6 +17,7 @@ interface Props { messageChannel: { publishMessage: ( message: EventStoreNotificationMessage, + options?: PublishMessageOptions, ) => Promise; }; filters?: { from?: string; to?: string }; @@ -146,7 +148,7 @@ export const pourEventStoreCollectionEvents = async < messagesToPour.filterByTimestamp({ from, to }); messagesToPour.sortByTimestamp(); - await messagePourer.pourMessageBatch(messagesToPour); + await messagePourer.pourMessageBatch(messagesToPour, { replay: true }); } while (!areAllCollectionAggregatesScanned); return { diff --git a/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.unit.test.ts b/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.unit.test.ts index fe300981..ff1b42ea 100644 --- a/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.unit.test.ts +++ b/packages/dam/src/pourEventStoreCollectionEvents/pourEventStoreCollectionEvents.unit.test.ts @@ -3,7 +3,10 @@ import { NotificationMessageQueue, EventStoreId, } from '@castore/core'; -import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter'; +import { + InMemoryMessageQueueAdapter, + TaskContext, +} from '@castore/in-memory-message-queue-adapter'; import { pokemonEventStore, @@ -33,12 +36,13 @@ let receivedMessages: { message: NotificationMessage< EventStoreId >; + context: TaskContext; }[] = []; InMemoryMessageQueueAdapter.attachTo(messageQueue, { - worker: message => + worker: (message, context) => new Promise(resolve => { - receivedMessages.push({ date: new Date(), message }); + receivedMessages.push({ date: new Date(), message, context }); resolve(); }), }); @@ -77,6 +81,10 @@ describe('pourEventStoreEvents', () => { // 2020-12-01T00:00:00.000Z event: ashKetchumEvents[0], }); + expect(receivedMessages[0]?.context).toMatchObject({ + replay: true, + }); + expect(receivedMessages[1]?.message).toStrictEqual({ eventStoreId: pokemonEvtStoreId, // 2021-01-01T00:00:00.000Z diff --git a/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.ts b/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.ts index 47226e4a..b0d3fdbd 100644 --- a/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.ts +++ b/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.ts @@ -1,5 +1,9 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ -import type { EventStore, EventStoreNotificationMessage } from '@castore/core'; +import type { + EventStore, + EventStoreNotificationMessage, + PublishMessageOptions, +} from '@castore/core'; import type { ScanInfos } from '~/types'; import { EventBook } from '~/utils/eventBook'; @@ -11,6 +15,7 @@ interface Props { messageChannel: { publishMessage: ( message: EventStoreNotificationMessage, + options?: PublishMessageOptions, ) => Promise; }; filters?: { from?: string; to?: string }; @@ -76,7 +81,7 @@ export const pourEventStoreEvents = async ({ messagesToPour.filterByTimestamp({ from, to }); messagesToPour.sortByTimestamp(); - await messagePourer.pourMessageBatch(messagesToPour); + await messagePourer.pourMessageBatch(messagesToPour, { replay: true }); pageToken = nextPageToken; } while (!areAllAggregatesScanned); diff --git a/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.unit.test.ts b/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.unit.test.ts index be5467c0..5004fe90 100644 --- a/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.unit.test.ts +++ b/packages/dam/src/pourEventStoreEvents/pourEventStoreEvents.unit.test.ts @@ -3,7 +3,10 @@ import { NotificationMessageQueue, EventStoreId, } from '@castore/core'; -import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter'; +import { + InMemoryMessageQueueAdapter, + TaskContext, +} from '@castore/in-memory-message-queue-adapter'; import { pokemonEventStore, @@ -24,12 +27,13 @@ const messageQueue = new NotificationMessageQueue({ let receivedMessages: { date: Date; message: NotificationMessage>; + context: TaskContext; }[] = []; InMemoryMessageQueueAdapter.attachTo(messageQueue, { - worker: message => + worker: (message, context) => new Promise(resolve => { - receivedMessages.push({ date: new Date(), message }); + receivedMessages.push({ date: new Date(), message, context }); resolve(); }), }); @@ -56,6 +60,10 @@ describe('pourEventStoreEvents', () => { eventStoreId: pokemonEvtStoreId, event: pikachuEvents[0], }); + expect(receivedMessages[0]?.context).toMatchObject({ + replay: true, + }); + expect(receivedMessages[1]?.message).toStrictEqual({ eventStoreId: pokemonEvtStoreId, event: pikachuEvents[1], diff --git a/packages/dam/src/utils/messagePourer.ts b/packages/dam/src/utils/messagePourer.ts index d3f2cabf..0834fa04 100644 --- a/packages/dam/src/utils/messagePourer.ts +++ b/packages/dam/src/utils/messagePourer.ts @@ -1,4 +1,8 @@ -import type { EventStore, EventStoreNotificationMessage } from '@castore/core'; +import { + EventStore, + EventStoreNotificationMessage, + PublishMessageOptions, +} from '@castore/core'; import { getThrottle } from '~/utils/getThrottle'; @@ -8,6 +12,7 @@ export class MessagePourer { messageChannel: { publishMessage: ( message: EventStoreNotificationMessage, + options?: PublishMessageOptions, ) => Promise; }; pouredEventCount: number; @@ -19,6 +24,7 @@ export class MessagePourer { messageChannel: { publishMessage: ( message: EventStoreNotificationMessage, + options?: PublishMessageOptions, ) => Promise; }, rateLimit = Infinity, @@ -28,11 +34,14 @@ export class MessagePourer { this.throttle = getThrottle(rateLimit); } - pourMessageBatch = async ({ - messages, - }: MessageBatch): Promise => { + pourMessageBatch = async ( + { messages }: MessageBatch, + options: PublishMessageOptions = {}, + ): Promise => { for (const message of messages) { - await this.throttle(() => this.messageChannel.publishMessage(message)); + await this.throttle(() => + this.messageChannel.publishMessage(message, options), + ); this.pouredEventCount += 1; } diff --git a/packages/dynamodb-event-storage-adapter/src/adapter.ts b/packages/dynamodb-event-storage-adapter/src/adapter.ts index 8f6db22e..5f2b164d 100644 --- a/packages/dynamodb-event-storage-adapter/src/adapter.ts +++ b/packages/dynamodb-event-storage-adapter/src/adapter.ts @@ -10,12 +10,13 @@ import { } from '@aws-sdk/client-dynamodb'; import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'; -import { +import type { Aggregate, EventDetail, - GroupedEvent, + PushEventOptions, StorageAdapter, } from '@castore/core'; +import { GroupedEvent } from '@castore/core'; import { EVENT_TABLE_INITIAL_EVENT_INDEX_NAME, @@ -122,7 +123,10 @@ const parseGroupedEvents = ( */ export class DynamoDbEventStorageAdapter implements StorageAdapter { getEvents: StorageAdapter['getEvents']; - getPushEventInput: (eventDetail: EventDetail) => PutItemCommandInput; + getPushEventInput: ( + eventDetail: EventDetail, + options: PushEventOptions, + ) => PutItemCommandInput; pushEvent: StorageAdapter['pushEvent']; pushEventGroup: StorageAdapter['pushEventGroup']; groupEvent: StorageAdapter['groupEvent']; @@ -224,9 +228,10 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter { }; }; - this.getPushEventInput = event => { + this.getPushEventInput = (event, options) => { const { aggregateId, version, type, timestamp, payload, metadata } = event; + const force = options.force ?? false; return { TableName: this.getTableName(), @@ -242,17 +247,23 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter { }, MARSHALL_OPTIONS, ), - ExpressionAttributeNames: { '#version': EVENT_TABLE_SK }, - ConditionExpression: 'attribute_not_exists(#version)', + ...(force + ? {} + : { + ExpressionAttributeNames: { '#version': EVENT_TABLE_SK }, + ConditionExpression: 'attribute_not_exists(#version)', + }), }; }; - this.pushEvent = async (eventWithOptTimestamp, context) => { + this.pushEvent = async (eventWithOptTimestamp, options) => { const event = { timestamp: new Date().toISOString(), ...eventWithOptTimestamp, }; - const putEventCommand = new PutItemCommand(this.getPushEventInput(event)); + const putEventCommand = new PutItemCommand( + this.getPushEventInput(event, options), + ); const { aggregateId, version } = event; @@ -263,7 +274,7 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter { error instanceof Error && isConditionalCheckFailedException(error) ) { - const { eventStoreId } = context; + const { eventStoreId } = options; throw new DynamoDBEventAlreadyExistsError({ eventStoreId, @@ -293,10 +304,10 @@ export class DynamoDbEventStorageAdapter implements StorageAdapter { await dynamodbClient.send( new TransactWriteItemsCommand({ TransactItems: groupedEvents.map(groupedEvent => ({ - Put: groupedEvent.eventStorageAdapter.getPushEventInput({ - timestamp, - ...groupedEvent.event, - }), + Put: groupedEvent.eventStorageAdapter.getPushEventInput( + { timestamp, ...groupedEvent.event }, + { eventStoreId: groupedEvent.context.eventStoreId }, + ), })), }), ); diff --git a/packages/dynamodb-event-storage-adapter/src/adapter.unit.test.ts b/packages/dynamodb-event-storage-adapter/src/adapter.unit.test.ts index 061f1f0c..5367c147 100644 --- a/packages/dynamodb-event-storage-adapter/src/adapter.unit.test.ts +++ b/packages/dynamodb-event-storage-adapter/src/adapter.unit.test.ts @@ -98,6 +98,17 @@ describe('DynamoDbEventStorageAdapter', () => { MockDate.reset(); }); + + it('does not add condition if force option is set to true', async () => { + await adapter.pushEvent(secondEvent, { eventStoreId, force: true }); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(dynamoDbClientMock.calls()).toHaveLength(1); + const input = dynamoDbClientMock.call(0).args[0].input; + expect(input).not.toHaveProperty('ConditionExpression'); + expect(input).not.toHaveProperty('ExpressionAttributeNames'); + }); }); describe('table name getter', () => { diff --git a/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.ts b/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.ts index cae6dcd8..750309ee 100644 --- a/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.ts +++ b/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.ts @@ -10,13 +10,13 @@ import { } from '@aws-sdk/client-dynamodb'; import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'; -import { +import type { Aggregate, EventDetail, - GroupedEvent, StorageAdapter, - EventStoreContext, + PushEventOptions, } from '@castore/core'; +import { GroupedEvent } from '@castore/core'; import { EVENT_TABLE_EVENT_STORE_ID_KEY, @@ -134,7 +134,7 @@ export class DynamoDbSingleTableEventStorageAdapter implements StorageAdapter { getEvents: StorageAdapter['getEvents']; getPushEventInput: ( eventDetail: EventDetail, - context: EventStoreContext, + options: PushEventOptions, ) => PutItemCommandInput; pushEvent: StorageAdapter['pushEvent']; pushEventGroup: StorageAdapter['pushEventGroup']; @@ -237,10 +237,10 @@ export class DynamoDbSingleTableEventStorageAdapter implements StorageAdapter { }; }; - this.getPushEventInput = (event, context) => { + this.getPushEventInput = (event, options) => { const { aggregateId, version, type, timestamp, payload, metadata } = event; - const { eventStoreId } = context; + const { eventStoreId, force = false } = options; return { TableName: this.getTableName(), @@ -256,18 +256,23 @@ export class DynamoDbSingleTableEventStorageAdapter implements StorageAdapter { }, MARSHALL_OPTIONS, ), - ExpressionAttributeNames: { '#version': EVENT_TABLE_SK }, - ConditionExpression: 'attribute_not_exists(#version)', + ...(force + ? {} + : { + ExpressionAttributeNames: { '#version': EVENT_TABLE_SK }, + ConditionExpression: 'attribute_not_exists(#version)', + }), }; }; - this.pushEvent = async (eventWithOptTimestamp, context) => { + this.pushEvent = async (eventWithOptTimestamp, options) => { const event = { timestamp: new Date().toISOString(), ...eventWithOptTimestamp, }; + const putEventCommand = new PutItemCommand( - this.getPushEventInput(event, context), + this.getPushEventInput(event, options), ); const { aggregateId, version } = event; @@ -279,7 +284,7 @@ export class DynamoDbSingleTableEventStorageAdapter implements StorageAdapter { error instanceof Error && isConditionalCheckFailedException(error) ) { - const { eventStoreId } = context; + const { eventStoreId } = options; throw new DynamoDBEventAlreadyExistsError({ eventStoreId, diff --git a/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.unit.test.ts b/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.unit.test.ts index aa6d2b0b..42601f7e 100644 --- a/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.unit.test.ts +++ b/packages/dynamodb-event-storage-adapter/src/singleTableAdapter.unit.test.ts @@ -109,6 +109,17 @@ describe('DynamoDbEventStorageAdapter', () => { MockDate.reset(); }); + + it('does not add condition if force option is set to true', async () => { + await adapter.pushEvent(secondEvent, { eventStoreId, force: true }); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(dynamoDbClientMock.calls()).toHaveLength(1); + const input = dynamoDbClientMock.call(0).args[0].input; + expect(input).not.toHaveProperty('ConditionExpression'); + expect(input).not.toHaveProperty('ExpressionAttributeNames'); + }); }); describe('table name getter', () => { diff --git a/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.ts b/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.ts index 94e38997..1aa6e027 100644 --- a/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.ts +++ b/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.ts @@ -31,7 +31,7 @@ export const formatEventForTransaction = ( return { transactItem: { - Put: storageAdapter.getPushEventInput(eventDetail), + Put: storageAdapter.getPushEventInput(eventDetail, { eventStoreId }), }, dynamoDbClient, }; diff --git a/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.unit.test.ts b/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.unit.test.ts index 570e81c6..8741130e 100644 --- a/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.unit.test.ts +++ b/packages/dynamodb-event-storage-adapter/src/utils/formatEventForTransaction.unit.test.ts @@ -25,7 +25,9 @@ describe('formatEventForTransaction', () => { ).toStrictEqual({ dynamoDbClient: dynamoDbClientMock, transactItem: { - Put: storageAdapter.getPushEventInput(pikachuAppearedEvent), + Put: storageAdapter.getPushEventInput(pikachuAppearedEvent, { + eventStoreId: pokemonsEventStore.eventStoreId, + }), }, }); }); diff --git a/packages/event-bridge-message-bus-adapter/README.md b/packages/event-bridge-message-bus-adapter/README.md index 16d454d6..9e9b774b 100644 --- a/packages/event-bridge-message-bus-adapter/README.md +++ b/packages/event-bridge-message-bus-adapter/README.md @@ -103,6 +103,25 @@ When publishing a message, its `eventStoreId` is used as the message `source` an } ``` +If the `replay` option is set to `true` when publishing a notification or state-carrying message, the `"detail-type"` will be set to `"__REPLAYED__"`. This makes sure that any subscription to replayed events is **opt-in**: + +```ts +// 👇 Replayed notification message +{ + "source": "POKEMONS", + "detail-type": "__REPLAYED__", + "detail": { + "eventStoreId": "POKEMONS", + "event": { + "aggregateId": "123", + "type": "POKEMON_APPEARED", // <= event type still available + ... + }, + }, + ... +} +``` + On the listeners side, you can use the `EventBridgeMessageBusMessage` TS type to type your argument: ```ts diff --git a/packages/event-bridge-message-bus-adapter/src/adapter.ts b/packages/event-bridge-message-bus-adapter/src/adapter.ts index 8cb2e584..0ca8280d 100644 --- a/packages/event-bridge-message-bus-adapter/src/adapter.ts +++ b/packages/event-bridge-message-bus-adapter/src/adapter.ts @@ -4,7 +4,11 @@ import { } from '@aws-sdk/client-eventbridge'; import chunk from 'lodash.chunk'; -import type { Message, MessageChannelAdapter } from '@castore/core'; +import type { + Message, + MessageChannelAdapter, + PublishMessageOptions, +} from '@castore/core'; import { isAggregateExistsMessage, isEventCarryingMessage, @@ -12,9 +16,12 @@ import { export const EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE = 10; -const getDetailType = (message: Message): string => { +const getDetailType = ( + message: Message, + { replay = false }: PublishMessageOptions = {}, +): string => { if (isEventCarryingMessage(message)) { - return message.event.type; + return replay ? '__REPLAYED__' : message.event.type; } if (isAggregateExistsMessage(message)) { @@ -46,7 +53,7 @@ export class EventBridgeMessageBusAdapter implements MessageChannelAdapter { ? this.eventBusName : this.eventBusName(); - this.publishMessage = async message => { + this.publishMessage = async (message, options) => { const { eventStoreId } = message; await this.eventBridgeClient.send( @@ -55,7 +62,7 @@ export class EventBridgeMessageBusAdapter implements MessageChannelAdapter { { EventBusName: this.getEventBusName(), Source: eventStoreId, - DetailType: getDetailType(message), + DetailType: getDetailType(message, options), Detail: JSON.stringify(message), }, ], @@ -63,7 +70,7 @@ export class EventBridgeMessageBusAdapter implements MessageChannelAdapter { ); }; - this.publishMessages = async messages => { + this.publishMessages = async (messages, options) => { for (const chunkMessages of chunk( messages, EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE, @@ -73,7 +80,7 @@ export class EventBridgeMessageBusAdapter implements MessageChannelAdapter { Entries: chunkMessages.map(message => ({ EventBusName: this.getEventBusName(), Source: message.eventStoreId, - DetailType: getDetailType(message), + DetailType: getDetailType(message, options), Detail: JSON.stringify(message), })), }), diff --git a/packages/event-bridge-message-bus-adapter/src/adapter.unit.test.ts b/packages/event-bridge-message-bus-adapter/src/adapter.unit.test.ts index e40508d7..b4cb3e54 100644 --- a/packages/event-bridge-message-bus-adapter/src/adapter.unit.test.ts +++ b/packages/event-bridge-message-bus-adapter/src/adapter.unit.test.ts @@ -5,7 +5,7 @@ import { import { mockClient } from 'aws-sdk-client-mock'; import type { A } from 'ts-toolbelt'; -import type { Message } from '@castore/core'; +import type { Message, PublishMessageOptions } from '@castore/core'; import { EventBridgeMessageBusAdapter, @@ -14,142 +14,174 @@ import { const eventBridgeClientMock = mockClient(EventBridgeClient); -describe('EventBridgeMessageBusAdapter', () => { - const eventBusNameMock = 'my-event-bus'; - - const eventStoreIdMock = 'my-event-store'; - - const eventMock = { - aggregateId: 'my-aggregate-id', - version: 1, - type: 'my-event-type', - timestamp: new Date().toISOString(), - }; - - const otherEventMock = { - aggregateId: 'my-aggregate-id', - version: 2, - type: 'my-event-type-2', - timestamp: new Date().toISOString(), - }; - - const messageMock = { - eventStoreId: eventStoreIdMock, - event: eventMock, - }; - - const otherMessageMock = { - eventStoreId: eventStoreIdMock, - event: otherEventMock, - }; +const eventBusNameMock = 'my-event-bus'; + +const adapter = new EventBridgeMessageBusAdapter({ + eventBusName: eventBusNameMock, + eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient, +}); + +const eventStoreIdMock = 'my-event-store'; + +const eventMock = { + aggregateId: 'my-aggregate-id', + version: 1, + type: 'my-event-type', + timestamp: new Date().toISOString(), +}; + +const otherEventMock = { + aggregateId: 'my-aggregate-id', + version: 2, + type: 'my-event-type-2', + timestamp: new Date().toISOString(), +}; + +const messageMock = { + eventStoreId: eventStoreIdMock, + event: eventMock, +}; + +const otherMessageMock = { + eventStoreId: eventStoreIdMock, + event: otherEventMock, +}; +describe('EventBridgeMessageBusAdapter', () => { beforeEach(() => { eventBridgeClientMock.reset(); eventBridgeClientMock.on(PutEventsCommand).resolves({}); }); - it('send a PutEventsCommand to event bridge client on message published', async () => { - const adapter = new EventBridgeMessageBusAdapter({ - eventBusName: eventBusNameMock, - eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient, - }); - - const assertMessage: A.Equals< - Parameters, - [Message] - > = 1; - assertMessage; - - await adapter.publishMessage(messageMock); - - // regularly check if vitest matchers are available (toHaveReceivedCommandWith) - // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 - expect(eventBridgeClientMock.calls()).toHaveLength(1); - expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({ - Entries: [ - { - EventBusName: eventBusNameMock, - Source: eventStoreIdMock, - DetailType: eventMock.type, - Detail: JSON.stringify(messageMock), - }, - ], + describe('publishMessage', () => { + it('send a PutEventsCommand to event bridge client on message published', async () => { + const assertMessage: A.Equals< + Parameters, + [message: Message, options?: PublishMessageOptions | undefined] + > = 1; + assertMessage; + + await adapter.publishMessage(messageMock); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(eventBridgeClientMock.calls()).toHaveLength(1); + expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({ + Entries: [ + { + EventBusName: eventBusNameMock, + Source: eventStoreIdMock, + DetailType: eventMock.type, + Detail: JSON.stringify(messageMock), + }, + ], + }); }); - }); - it('send a PutEventsCommand to event bridge client on messages published', async () => { - const adapter = new EventBridgeMessageBusAdapter({ - eventBusName: eventBusNameMock, - eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient, + it('sets detail-type as __REPLAYED__ when replay options is true', async () => { + await adapter.publishMessage(messageMock, { replay: true }); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(eventBridgeClientMock.calls()).toHaveLength(1); + expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({ + Entries: [ + { + EventBusName: eventBusNameMock, + Source: eventStoreIdMock, + DetailType: '__REPLAYED__', + Detail: JSON.stringify(messageMock), + }, + ], + }); }); - const assertMessage: A.Equals< - Parameters, - [Message[]] - > = 1; - assertMessage; - - await adapter.publishMessages([messageMock, otherMessageMock]); - - // regularly check if vitest matchers are available (toHaveReceivedCommandWith) - // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 - expect(eventBridgeClientMock.calls()).toHaveLength(1); - expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({ - Entries: [ - { - EventBusName: eventBusNameMock, - Source: eventStoreIdMock, - DetailType: eventMock.type, - Detail: JSON.stringify(messageMock), - }, - { - EventBusName: eventBusNameMock, - Source: eventStoreIdMock, - DetailType: otherEventMock.type, - Detail: JSON.stringify(otherMessageMock), - }, - ], + it('works with event bus name getters', async () => { + const otherAdapter = new EventBridgeMessageBusAdapter({ + eventBusName: () => eventBusNameMock, + eventBridgeClient: + eventBridgeClientMock as unknown as EventBridgeClient, + }); + + await otherAdapter.publishMessage(messageMock); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(eventBridgeClientMock.calls()).toHaveLength(1); + expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({ + Entries: [{ EventBusName: eventBusNameMock }], + }); }); }); - it('chunk messages in separate PutEventsCommand calls when there are more messages then EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE', async () => { - const adapter = new EventBridgeMessageBusAdapter({ - eventBusName: eventBusNameMock, - eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient, + describe('publishMessages', () => { + it('sets detail-type as __REPLAYED__ when replay options is true', async () => { + await adapter.publishMessages([messageMock, otherMessageMock], { + replay: true, + }); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(eventBridgeClientMock.calls()).toHaveLength(1); + expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({ + Entries: [ + { + EventBusName: eventBusNameMock, + Source: eventStoreIdMock, + DetailType: '__REPLAYED__', + Detail: JSON.stringify(messageMock), + }, + { + EventBusName: eventBusNameMock, + Source: eventStoreIdMock, + DetailType: '__REPLAYED__', + Detail: JSON.stringify(otherMessageMock), + }, + ], + }); }); - await adapter.publishMessages( - Array.from( - { length: EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE + 1 }, - () => messageMock, - ), - ); - - // regularly check if vitest matchers are available (toHaveReceivedCommandWith) - // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 - expect(eventBridgeClientMock.calls()).toHaveLength(2); - }); - - it('works with event bus name getters', async () => { - const adapter = new EventBridgeMessageBusAdapter({ - eventBusName: () => eventBusNameMock, - eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient, + it('send a PutEventsCommand to event bridge client on messages published', async () => { + const assertMessage: A.Equals< + Parameters, + [messages: Message[], options?: PublishMessageOptions | undefined] + > = 1; + assertMessage; + + await adapter.publishMessages([messageMock, otherMessageMock]); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(eventBridgeClientMock.calls()).toHaveLength(1); + expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({ + Entries: [ + { + EventBusName: eventBusNameMock, + Source: eventStoreIdMock, + DetailType: eventMock.type, + Detail: JSON.stringify(messageMock), + }, + { + EventBusName: eventBusNameMock, + Source: eventStoreIdMock, + DetailType: otherEventMock.type, + Detail: JSON.stringify(otherMessageMock), + }, + ], + }); }); - await adapter.publishMessage(messageMock); - - // regularly check if vitest matchers are available (toHaveReceivedCommandWith) - // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 - expect(eventBridgeClientMock.calls()).toHaveLength(1); - expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({ - Entries: [ - { - EventBusName: eventBusNameMock, - Source: eventStoreIdMock, - DetailType: eventMock.type, - Detail: JSON.stringify(messageMock), - }, - ], + it('chunk messages in separate PutEventsCommand calls when there are more messages then EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE', async () => { + await adapter.publishMessages( + Array.from( + { length: EVENTBRIDGE_MAX_ENTRIES_BATCH_SIZE + 1 }, + () => messageMock, + ), + ); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(eventBridgeClientMock.calls()).toHaveLength(2); }); }); }); diff --git a/packages/event-bridge-message-bus-adapter/src/message.ts b/packages/event-bridge-message-bus-adapter/src/message.ts index 60a3ff47..5360cb94 100644 --- a/packages/event-bridge-message-bus-adapter/src/message.ts +++ b/packages/event-bridge-message-bus-adapter/src/message.ts @@ -29,7 +29,7 @@ type EventBridgeStateCarryingMessageBusMessage< EVENT_STORE_ID > ? EventBridgeEvent< - EVENT_TYPE, + EVENT_TYPE | '__REPLAYED__', StateCarryingMessage< EVENT_STORE_ID, Extract< @@ -69,7 +69,7 @@ type EventBridgeNotificationMessageBusMessage< EVENT_STORE_ID > ? EventBridgeEvent< - EVENT_TYPE, + EVENT_TYPE | '__REPLAYED__', NotificationMessage< EVENT_STORE_ID, Extract< diff --git a/packages/in-memory-message-bus-adapter/README.md b/packages/in-memory-message-bus-adapter/README.md index 036bd9fb..79217b9c 100644 --- a/packages/in-memory-message-bus-adapter/README.md +++ b/packages/in-memory-message-bus-adapter/README.md @@ -60,7 +60,7 @@ appMessageBus.messageBusAdapter = messageBusAdapter; Similarly to event emitters, the `inMemoryMessageBusAdapter` exposes an `on` method that takes two arguments: -- A filter patterns to optionally specify an `eventStoreId` and an event `type` to listen to (`NotificationEventBus` and `StateCarryingEventBus` only) +- A filter patterns to optionally specify an `eventStoreId` and an event `type` to listen to (`NotificationEventBus` and `StateCarryingEventBus` only), and wether replayed events should be included - An async callback to execute if the message matches the filter pattern ```ts @@ -84,6 +84,33 @@ messageBusAdapter.on( const { eventStoreId, event } = message; }, ); + +// 👇 Include replayed events +messageBusAdapter.on( + { eventStoreId: 'POKEMONS', eventType: 'POKEMON_APPEARED', onReplay: true }, + async message => { + // 🙌 Correctly typed! + const { eventStoreId, event } = message; + }, +); +``` + +For more control, the callback has access to more context through its second argument: + +```ts +messageBusAdapter.on( + ..., + async (message, context) => { + const { eventStoreId, event } = message; + const { + // 👇 See "Retry policy" section below + attempt, + retryAttemptsLeft, + // 👇 If event is replayed + replay, + } = context; + }, +); ``` The same callback can be re-used with different filter patterns. If a message matches several of them, it will still be triggered once: diff --git a/packages/in-memory-message-bus-adapter/src/adapter.ts b/packages/in-memory-message-bus-adapter/src/adapter.ts index 0f42a0d8..11deea67 100644 --- a/packages/in-memory-message-bus-adapter/src/adapter.ts +++ b/packages/in-memory-message-bus-adapter/src/adapter.ts @@ -1,3 +1,4 @@ +/* eslint-disable max-lines */ import type { EventEmitter } from 'node:events'; import { @@ -15,9 +16,10 @@ import type { ConstructorArgs, FilterPattern, InMemoryBusMessage, + TaskContext, } from './types'; import { - doesMessageMatchAnyFilterPattern, + doesTaskMatchAnyFilterPattern, parseBackoffRate, parseRetryAttempts, parseRetryDelayInMs, @@ -66,6 +68,7 @@ export class InMemoryMessageBusAdapter filterPattern: FilterPattern, handler: ( message: InMemoryMessageBusMessage, + context: TaskContext, ) => Promise, ) => void; @@ -108,12 +111,13 @@ export class InMemoryMessageBusAdapter }, ); - this.publishMessage = async message => + this.publishMessage = async (message, { replay = false } = {}) => new Promise(resolve => { const task: Task = { message, attempt: 1, retryAttemptsLeft: this.retryAttempts, + replay, }; this.eventEmitter.emit('message', task); @@ -121,9 +125,9 @@ export class InMemoryMessageBusAdapter resolve(); }); - this.publishMessages = async messages => { + this.publishMessages = async (messages, options) => { for (const message of messages) { - await this.publishMessage(message); + await this.publishMessage(message, options); } }; @@ -143,6 +147,7 @@ export class InMemoryMessageBusAdapter filterPattern: FilterPattern, handler: ( message: InMemoryMessageBusMessage, + context: TaskContext, ) => Promise, ) => { let handlerIndex = this.handlers.findIndex( @@ -171,14 +176,21 @@ export class InMemoryMessageBusAdapter InMemoryMessageBusMessage >, ) => { - const { message, retryHandlerIndex } = task; + const { + message, + retryHandlerIndex, + attempt, + retryAttemptsLeft, + replay = false, + } = task; + const context: TaskContext = { attempt, retryAttemptsLeft, replay }; if ( retryHandlerIndex === undefined - ? doesMessageMatchAnyFilterPattern(message, filterPatterns) + ? doesTaskMatchAnyFilterPattern(task, filterPatterns) : retryHandlerIndex === handlerIndex ) { - void handler(message).catch(error => { + void handler(message, context).catch(error => { this.eventEmitter.emit('error', error, task, handlerIndex); }); } diff --git a/packages/in-memory-message-bus-adapter/src/adapter.unit.test.ts b/packages/in-memory-message-bus-adapter/src/adapter.unit.test.ts index e3e4c931..158fa9a7 100644 --- a/packages/in-memory-message-bus-adapter/src/adapter.unit.test.ts +++ b/packages/in-memory-message-bus-adapter/src/adapter.unit.test.ts @@ -15,6 +15,7 @@ import { } from '@castore/demo-blueprint'; import { InMemoryMessageBusAdapter } from './adapter'; +import type { TaskContext } from './types'; const messageBus = new NotificationMessageBus({ messageBusId: 'messageBusId', @@ -37,6 +38,12 @@ const pikachuCaughtMessage: EventStoreNotificationMessage< event: pikachuCaughtEvent, }; +const context: TaskContext = { + attempt: 1, + retryAttemptsLeft: 2, + replay: false, +}; + const sleep = (ms: number): Promise => new Promise(resolve => setTimeout(resolve, ms)); @@ -82,7 +89,7 @@ describe('in-memory message queue adapter', () => { expect(handler2).not.toHaveBeenCalled(); }); - it('calls handler if it has been set', async () => { + it('calls handler only if it has been set', async () => { inMemoryMessageBusAdapter.on( { eventStoreId: 'POKEMONS', eventType: 'APPEARED' }, handler1, @@ -92,7 +99,7 @@ describe('in-memory message queue adapter', () => { await inMemoryMessageBusAdapter.publishMessage(pikachuCaughtMessage); expect(handler1).toHaveBeenCalledOnce(); - expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage); + expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage, context); inMemoryMessageBusAdapter.on( { eventStoreId: 'POKEMONS', eventType: 'CAUGHT_BY_TRAINER' }, @@ -101,7 +108,7 @@ describe('in-memory message queue adapter', () => { await inMemoryMessageBusAdapter.publishMessage(pikachuCaughtMessage); expect(handler1).toHaveBeenCalledTimes(2); - expect(handler1).toHaveBeenCalledWith(pikachuCaughtMessage); + expect(handler1).toHaveBeenCalledWith(pikachuCaughtMessage, context); }); it('calls handler only once, event if matches several filter patterns', async () => { @@ -109,7 +116,46 @@ describe('in-memory message queue adapter', () => { await inMemoryMessageBusAdapter.publishMessage(pikachuAppearedMessage); expect(handler1).toHaveBeenCalledOnce(); - expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage); + expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage, context); + }); + + it('calls handler only if replay has been specified', async () => { + await inMemoryMessageBusAdapter.publishMessage(pikachuCaughtMessage, { + replay: true, + }); + + // Both are still triggered on real-time messages since prev tests + expect(handler1).not.toHaveBeenCalled(); + expect(handler2).not.toHaveBeenCalled(); + + inMemoryMessageBusAdapter.on( + { eventStoreId: 'POKEMONS', onReplay: true }, + handler1, + ); + inMemoryMessageBusAdapter.on( + { + eventStoreId: 'POKEMONS', + eventType: 'CAUGHT_BY_TRAINER', + onReplay: true, + }, + handler2, + ); + + await inMemoryMessageBusAdapter.publishMessage(pikachuCaughtMessage, { + replay: true, + }); + + // Both are still triggered on real-time messages since prev tests + expect(handler1).toHaveBeenCalledOnce(); + expect(handler1).toHaveBeenCalledWith(pikachuCaughtMessage, { + ...context, + replay: true, + }); + expect(handler2).toHaveBeenCalledOnce(); + expect(handler2).toHaveBeenCalledWith(pikachuCaughtMessage, { + ...context, + replay: true, + }); }); it('calls all handlers if needed', async () => { @@ -117,8 +163,8 @@ describe('in-memory message queue adapter', () => { await inMemoryMessageBusAdapter.publishMessage(pikachuAppearedMessage); - expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage); - expect(handler2).toHaveBeenCalledWith(pikachuAppearedMessage); + expect(handler1).toHaveBeenCalledWith(pikachuAppearedMessage, context); + expect(handler2).toHaveBeenCalledWith(pikachuAppearedMessage, context); }); it('statically rejects invalid handlers', async () => { @@ -181,7 +227,7 @@ describe('in-memory message queue adapter', () => { inMemoryMessageBusAdapter.on({}, handler); await messageBus.publishMessage(pikachuAppearedMessage); - expect(handler).toHaveBeenCalledWith(pikachuAppearedMessage); + expect(handler).toHaveBeenCalledWith(pikachuAppearedMessage, context); }); }); diff --git a/packages/in-memory-message-bus-adapter/src/index.ts b/packages/in-memory-message-bus-adapter/src/index.ts index 98163fb8..75208ea4 100644 --- a/packages/in-memory-message-bus-adapter/src/index.ts +++ b/packages/in-memory-message-bus-adapter/src/index.ts @@ -1,2 +1,3 @@ export { InMemoryMessageBusAdapter } from './adapter'; +export type { TaskContext } from './types'; export type { InMemoryMessageBusMessage } from './message'; diff --git a/packages/in-memory-message-bus-adapter/src/message.ts b/packages/in-memory-message-bus-adapter/src/message.ts index 34f3e3c9..7b8687be 100644 --- a/packages/in-memory-message-bus-adapter/src/message.ts +++ b/packages/in-memory-message-bus-adapter/src/message.ts @@ -43,4 +43,5 @@ export type Task< retryHandlerIndex?: number; attempt: number; retryAttemptsLeft: number; + replay?: boolean; }; diff --git a/packages/in-memory-message-bus-adapter/src/types.ts b/packages/in-memory-message-bus-adapter/src/types.ts index 0ec62244..6af38984 100644 --- a/packages/in-memory-message-bus-adapter/src/types.ts +++ b/packages/in-memory-message-bus-adapter/src/types.ts @@ -20,6 +20,12 @@ export type ConstructorArgs = { retryBackoffRate?: number; }; +export type TaskContext = { + attempt: number; + retryAttemptsLeft: number; + replay: boolean; +}; + export type InMemoryBusMessage< MESSAGE_BUS extends | AggregateExistsMessageBus @@ -44,5 +50,9 @@ export type FilterPattern< EVENT_STORE_ID extends string = string, EVENT_TYPE extends string = string, > = - | { eventStoreId?: EVENT_STORE_ID; eventType?: never } - | { eventStoreId: EVENT_STORE_ID; eventType?: EVENT_TYPE }; + | { eventStoreId?: EVENT_STORE_ID; eventType?: never; onReplay?: boolean } + | { + eventStoreId: EVENT_STORE_ID; + eventType?: EVENT_TYPE; + onReplay?: boolean; + }; diff --git a/packages/in-memory-message-bus-adapter/src/utils.ts b/packages/in-memory-message-bus-adapter/src/utils.ts index 78592c1f..5fc52716 100644 --- a/packages/in-memory-message-bus-adapter/src/utils.ts +++ b/packages/in-memory-message-bus-adapter/src/utils.ts @@ -1,14 +1,18 @@ -import type { Message } from '@castore/core'; import { isEventCarryingMessage } from '@castore/core'; +import type { Task } from './message'; import type { FilterPattern } from './types'; -export const doesMessageMatchFilterPattern = ( - message: Message, +export const doesTaskMatchFilterPattern = ( + task: Task, filterPattern: FilterPattern, ): boolean => { - const { eventStoreId: filterEventStoreId, eventType: filterEventType } = - filterPattern; + const { message, replay = false } = task; + const { + eventStoreId: filterEventStoreId, + eventType: filterEventType, + onReplay = false, + } = filterPattern; let messageEventType: string | undefined = undefined; const { eventStoreId: messageEventStoreId } = message; @@ -17,6 +21,10 @@ export const doesMessageMatchFilterPattern = ( messageEventType = message.event.type; } + if (replay && !onReplay) { + return false; + } + if (filterEventStoreId !== undefined && filterEventType !== undefined) { return ( messageEventStoreId === filterEventStoreId && @@ -31,12 +39,12 @@ export const doesMessageMatchFilterPattern = ( return true; }; -export const doesMessageMatchAnyFilterPattern = ( - message: Message, +export const doesTaskMatchAnyFilterPattern = ( + task: Task, filterPatterns: FilterPattern[], ): boolean => filterPatterns.some(filterPattern => - doesMessageMatchFilterPattern(message, filterPattern), + doesTaskMatchFilterPattern(task, filterPattern), ); export const parseRetryDelayInMs = (retryDelayInMs: number): number => { diff --git a/packages/in-memory-message-queue-adapter/README.md b/packages/in-memory-message-queue-adapter/README.md index 25015969..ed980f0e 100644 --- a/packages/in-memory-message-queue-adapter/README.md +++ b/packages/in-memory-message-queue-adapter/README.md @@ -82,6 +82,23 @@ messageQueueAdapter.worker = async message => { > Only one worker at a time can be set up +For more control, the worker has access to more context through its second argument: + +```ts +messageQueueAdapter.worker = async (message, context) => { + const { eventStoreId, event } = message; + const { + // 👇 See "Retry policy" section below + attempt, + retryAttemptsLeft, + // 👇 If event is replayed + replay, + } = context; + + ... +}; +``` + ## ♻️ Retry policy This adapter will retry failed messages handling. You can specify a different retry policy than the default one via its constructor arguments: diff --git a/packages/in-memory-message-queue-adapter/src/adapter.ts b/packages/in-memory-message-queue-adapter/src/adapter.ts index 121aabba..e49368e1 100644 --- a/packages/in-memory-message-queue-adapter/src/adapter.ts +++ b/packages/in-memory-message-queue-adapter/src/adapter.ts @@ -18,7 +18,7 @@ import { parseRetryDelayInMs, } from './utils'; -type InMemoryQueueMessage< +export type InMemoryQueueMessage< MESSAGE_QUEUE extends | AggregateExistsMessageQueue | StateCarryingMessageQueue @@ -42,14 +42,18 @@ type InMemoryQueueMessage< > : never; -export type Task = { - message: MESSAGE; +export type TaskContext = { attempt: number; retryAttemptsLeft: number; + replay: boolean; }; +export type Task = { + message: MESSAGE; +} & TaskContext; + type ConstructorArgs = { - worker?: (message: MESSAGE) => Promise; + worker?: (message: MESSAGE, context: TaskContext) => Promise; retryAttempts?: number; retryDelayInMs?: number; retryBackoffRate?: number; @@ -78,7 +82,9 @@ export class InMemoryMessageQueueAdapter publishMessage: MessageChannelAdapter['publishMessage']; publishMessages: MessageChannelAdapter['publishMessages']; - private subscribe: (nextHandler: (message: MESSAGE) => Promise) => void; + private subscribe: ( + nextHandler: (message: MESSAGE, context: TaskContext) => Promise, + ) => void; retryAttempts: number; retryDelayInMs: number; retryBackoffRate: number; @@ -96,14 +102,17 @@ export class InMemoryMessageQueueAdapter this.retryBackoffRate = parseBackoffRate(retryBackoffRate); this.subscribe = ( - nextHandler: (message: MESSAGE) => Promise, + nextHandler: (message: MESSAGE, context: TaskContext) => Promise, ): void => { - this.queue = fastQ(({ message }) => nextHandler(message), 1); + this.queue = fastQ( + ({ message, ...context }) => nextHandler(message, context), + 1, + ); this.queue.error((error, task) => { // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (error === null) return; - const { message, attempt, retryAttemptsLeft } = task; + const { attempt, retryAttemptsLeft, ...restTask } = task; if (retryAttemptsLeft <= 0) { console.error(error); @@ -120,9 +129,9 @@ export class InMemoryMessageQueueAdapter if (queue === undefined) return; void queue.push({ - message, attempt: attempt + 1, retryAttemptsLeft: retryAttemptsLeft - 1, + ...restTask, }); }, waitTimeInMs); }); @@ -132,7 +141,7 @@ export class InMemoryMessageQueueAdapter this.subscribe(worker); } - this.publishMessage = async message => + this.publishMessage = async (message, { replay = false } = {}) => new Promise(resolve => { const queue = this.queue; @@ -146,19 +155,22 @@ export class InMemoryMessageQueueAdapter message: message as MESSAGE, attempt: 1, retryAttemptsLeft: this.retryAttempts, + replay, }); resolve(); }); - this.publishMessages = async messages => { + this.publishMessages = async (messages, options) => { for (const message of messages) { - await this.publishMessage(message); + await this.publishMessage(message, options); } }; } - set worker(worker: (message: MESSAGE) => Promise) { + set worker( + worker: (message: MESSAGE, context: TaskContext) => Promise, + ) { this.subscribe(worker); } } diff --git a/packages/in-memory-message-queue-adapter/src/adapter.unit.test.ts b/packages/in-memory-message-queue-adapter/src/adapter.unit.test.ts index 945da60a..0809006d 100644 --- a/packages/in-memory-message-queue-adapter/src/adapter.unit.test.ts +++ b/packages/in-memory-message-queue-adapter/src/adapter.unit.test.ts @@ -13,7 +13,7 @@ import { pikachuAppearedEvent, } from '@castore/demo-blueprint'; -import { Task, InMemoryMessageQueueAdapter } from './adapter'; +import { Task, InMemoryMessageQueueAdapter, TaskContext } from './adapter'; const messageQueue = new NotificationMessageQueue({ messageQueueId: 'messageQueueId', @@ -29,6 +29,12 @@ const pikachuAppearedMessage: EventStoreNotificationMessage< event: pikachuAppearedEvent, }; +const context: TaskContext = { + attempt: 1, + retryAttemptsLeft: 2, + replay: false, +}; + const sleep = (ms: number): Promise => new Promise(resolve => setTimeout(resolve, ms)); @@ -77,17 +83,28 @@ describe('in-memory message queue adapter', () => { await inMemoryMessageQueueAdapter.publishMessage(pikachuAppearedMessage); - expect(worker1).toHaveBeenCalledWith(pikachuAppearedMessage); + expect(worker1).toHaveBeenCalledWith(pikachuAppearedMessage, context); expect(worker2).not.toHaveBeenCalled(); }); + it('passes replay option to context', async () => { + await inMemoryMessageQueueAdapter.publishMessage(pikachuAppearedMessage, { + replay: true, + }); + + expect(worker1).toHaveBeenCalledWith(pikachuAppearedMessage, { + ...context, + replay: true, + }); + }); + it('recreates queue if new worker is set', async () => { inMemoryMessageQueueAdapter.worker = worker2; await inMemoryMessageQueueAdapter.publishMessage(pikachuAppearedMessage); expect(worker1).not.toHaveBeenCalled(); - expect(worker2).toHaveBeenCalledWith(pikachuAppearedMessage); + expect(worker2).toHaveBeenCalledWith(pikachuAppearedMessage, context); }); it('actually connects to a messageQueue', async () => { @@ -95,7 +112,7 @@ describe('in-memory message queue adapter', () => { await messageQueue.publishMessage(pikachuAppearedMessage); - expect(worker2).toHaveBeenCalledWith(pikachuAppearedMessage); + expect(worker2).toHaveBeenCalledWith(pikachuAppearedMessage, context); }); it('accepts worker in constructor', async () => { @@ -129,6 +146,18 @@ describe('in-memory message queue adapter', () => { expect(worker2).toHaveBeenCalledTimes(mockNumberOfEventToPublish); }); + + it('passes replay option to context', async () => { + await messageQueue.publishMessages([pikachuAppearedMessage], { + replay: true, + }); + + expect(worker2).toHaveBeenCalledOnce(); + expect(worker2).toHaveBeenCalledWith(pikachuAppearedMessage, { + ...context, + replay: true, + }); + }); }); describe('through static method', () => { @@ -157,7 +186,7 @@ describe('in-memory message queue adapter', () => { inMemoryMessageQueueAdapter.worker = worker; await messageQueue.publishMessage(pikachuAppearedMessage); - expect(worker).toHaveBeenCalledWith(pikachuAppearedMessage); + expect(worker).toHaveBeenCalledWith(pikachuAppearedMessage, context); }); it('correctly instanciates a class and attach it (with worker)', async () => { @@ -183,7 +212,7 @@ describe('in-memory message queue adapter', () => { assertQueueType; await messageQueue.publishMessage(pikachuAppearedMessage); - expect(worker).toHaveBeenCalledWith(pikachuAppearedMessage); + expect(worker).toHaveBeenCalledWith(pikachuAppearedMessage, context); }); }); diff --git a/packages/in-memory-message-queue-adapter/src/index.ts b/packages/in-memory-message-queue-adapter/src/index.ts index 8c9eb922..5bb7bbcc 100644 --- a/packages/in-memory-message-queue-adapter/src/index.ts +++ b/packages/in-memory-message-queue-adapter/src/index.ts @@ -1 +1,2 @@ export { InMemoryMessageQueueAdapter } from './adapter'; +export type { InMemoryQueueMessage, TaskContext } from './adapter'; diff --git a/packages/inmemory-event-storage-adapter/src/adapter.ts b/packages/inmemory-event-storage-adapter/src/adapter.ts index 489a363c..9472d9cf 100644 --- a/packages/inmemory-event-storage-adapter/src/adapter.ts +++ b/packages/inmemory-event-storage-adapter/src/adapter.ts @@ -1,11 +1,11 @@ /* eslint-disable max-lines */ -import { +import type { Aggregate, EventDetail, - GroupedEvent, - EventStoreContext, + PushEventOptions, StorageAdapter, } from '@castore/core'; +import { GroupedEvent } from '@castore/core'; import { InMemoryEventAlreadyExistsError } from './error'; import { @@ -113,7 +113,7 @@ export class InMemoryStorageAdapter implements StorageAdapter { getEvents: StorageAdapter['getEvents']; pushEventSync: ( eventDetail: EventDetail, - context: EventStoreContext, + options: PushEventOptions, ) => Awaited>; pushEvent: StorageAdapter['pushEvent']; pushEventGroup: StorageAdapter['pushEventGroup']; @@ -137,8 +137,10 @@ export class InMemoryStorageAdapter implements StorageAdapter { } }); - this.pushEventSync = (event, context) => { + this.pushEventSync = (event, options) => { const { aggregateId, version } = event; + const { eventStoreId, force = false } = options; + const events = this.eventStore[aggregateId]; if (events === undefined) { @@ -147,18 +149,22 @@ export class InMemoryStorageAdapter implements StorageAdapter { return { event }; } - if ( - events.some( - ({ version: existingVersion }) => existingVersion === version, - ) - ) { - const { eventStoreId } = context; - - throw new InMemoryEventAlreadyExistsError({ - eventStoreId, - aggregateId, - version, - }); + const existingEventIndex = events.findIndex( + ({ version: existingVersion }) => existingVersion === version, + ); + + if (existingEventIndex !== -1) { + if (force) { + events[existingEventIndex] = event; + + return { event }; + } else { + throw new InMemoryEventAlreadyExistsError({ + eventStoreId, + aggregateId, + version, + }); + } } events.push(event); @@ -166,10 +172,10 @@ export class InMemoryStorageAdapter implements StorageAdapter { return { event }; }; - this.pushEvent = async (event, context) => + this.pushEvent = async (event, options) => new Promise(resolve => { const timestamp = new Date().toISOString(); - resolve(this.pushEventSync({ timestamp, ...event }, context)); + resolve(this.pushEventSync({ timestamp, ...event }, options)); }); this.pushEventGroup = async (...groupedEventsInput) => diff --git a/packages/inmemory-event-storage-adapter/src/adapter.unit.test.ts b/packages/inmemory-event-storage-adapter/src/adapter.unit.test.ts index 939742cc..6ce1638c 100644 --- a/packages/inmemory-event-storage-adapter/src/adapter.unit.test.ts +++ b/packages/inmemory-event-storage-adapter/src/adapter.unit.test.ts @@ -41,7 +41,7 @@ describe('in-memory storage adapter', () => { }); describe('methods', () => { - describe('getEvents', () => { + describe('getEvents / pushEvent', () => { const storageAdapter = new InMemoryStorageAdapter(); it('gets an empty array if there is no event for aggregateId', async () => { @@ -59,6 +59,15 @@ describe('in-memory storage adapter', () => { ).rejects.toThrow(InMemoryEventAlreadyExistsError); }); + it('overrides event is force option is set to true', async () => { + const { event } = await storageAdapter.pushEvent(eventMock1, { + eventStoreId, + force: true, + }); + + expect(event).toStrictEqual(eventMock1); + }); + it('pushes and gets events correctly', async () => { const { timestamp, ...eventMock2WithoutTimestamp } = eventMock2; MockDate.set(timestamp); diff --git a/packages/redux-event-storage-adapter/src/adapter.ts b/packages/redux-event-storage-adapter/src/adapter.ts index 181650b0..6cd538ec 100644 --- a/packages/redux-event-storage-adapter/src/adapter.ts +++ b/packages/redux-event-storage-adapter/src/adapter.ts @@ -5,7 +5,7 @@ import { GroupedEvent, StorageAdapter, EventDetail, - EventStoreContext, + PushEventOptions, Aggregate, } from '@castore/core'; @@ -106,7 +106,7 @@ export class ReduxEventStorageAdapter implements StorageAdapter { getEvents: StorageAdapter['getEvents']; pushEventSync: ( eventDetail: EventDetail, - context: EventStoreContext, + options: PushEventOptions, ) => Awaited>; pushEvent: StorageAdapter['pushEvent']; pushEventGroup: StorageAdapter['pushEventGroup']; @@ -145,14 +145,15 @@ export class ReduxEventStorageAdapter implements StorageAdapter { return eventStoreState; }; - this.pushEventSync = event => { + this.pushEventSync = (event, options) => { const { aggregateId } = event; + const force = options.force ?? false; const eventStoreState = this.getEventStoreState(); const events = eventStoreState.eventsByAggregateId[aggregateId] ?? []; - if (events.some(({ version }) => version === event.version)) { + if (!force && events.some(({ version }) => version === event.version)) { throw new ReduxStoreEventAlreadyExistsError({ eventStoreId: this.eventStoreId, aggregateId: event.aggregateId, diff --git a/packages/sqs-message-queue-adapter/README.md b/packages/sqs-message-queue-adapter/README.md index 106396a7..d8396a2e 100644 --- a/packages/sqs-message-queue-adapter/README.md +++ b/packages/sqs-message-queue-adapter/README.md @@ -66,19 +66,19 @@ When publishing a message, it is JSON stringified and passed as the record body. ```ts // 👇 Aggregate exists -{ - "body": "{ +const message = { + body: '{ \"eventStoreId\": \"POKEMONS\", \"aggregateId\": \"123\", - }", + }', ... // <= Other technical SQS properties } ``` ```ts // 👇 Notification -{ - "body": "{ +const message = { + body: '{ \"eventStoreId\": \"POKEMONS\", \"event\": { \"aggregateId\": \"123\", @@ -87,38 +87,62 @@ When publishing a message, it is JSON stringified and passed as the record body. \"timestamp\": ... ... }, - }", + }', ... } ``` ```ts // 👇 State-carrying -{ - "body": "{ +const message = { + body: '{ \"eventStoreId\": \"POKEMONS\", \"event\": { \"aggregateId\": \"123\", ... }, - \"aggregate\": ... - }", + \"aggregate\": ..., + }', ... -} +}; ``` -If your queue is of type FIFO, the `MessageGroupId` and `MessageDeduplicationId` will be derived from a combination of the `eventStoreId`, `aggregateId` and `version`: +If your queue is of type FIFO, the `messageGroupId` and `messageDeduplicationId` will be derived from a combination of the `eventStoreId`, `aggregateId` and `version`: ```ts -// 👇 Entry example -const Entry = { - MessageBody: JSON.stringify({ ... }), - MessageGroupId: "POKEMONS#123", - MessageDeduplicationId: "POKEMONS#123#1", // <= Or "POKEMONS#123" for AggregateExistsMessageQueues +// 👇 Fifo message +const message = { + messageBody: ..., + messageGroupId: "POKEMONS#123", + messageDeduplicationId: "POKEMONS#123#1", // <= Or "POKEMONS#123" for AggregateExistsMessageQueues ... // <= Other technical SQS properties }; ``` +If the `replay` option is set to `true`, a `replay` metadata attribute is included in the message: + +```ts +// 👇 Replayed notification message +const message = { + body: '{ + \"eventStoreId\": \"POKEMONS\", + \"event\": { + \"aggregateId\": \"123\", + ... + }, + }', + messageAttributes: { + replay: { + // 👇 boolean type is not available in SQS 🤷‍♂️ + dataType: 'Number', + // 👇 numberValue is not available in SQS 🤷‍♂️ + stringValue: '1', + }, + }, + ... +}; +``` + On the worker side, you can use the `SQSMessageQueueMessage` and `SQSMessageQueueMessageBody` TS types to type your argument: ```ts diff --git a/packages/sqs-message-queue-adapter/src/adapter.ts b/packages/sqs-message-queue-adapter/src/adapter.ts index d76316e7..97211413 100644 --- a/packages/sqs-message-queue-adapter/src/adapter.ts +++ b/packages/sqs-message-queue-adapter/src/adapter.ts @@ -2,6 +2,8 @@ import { SQSClient, SendMessageCommand, SendMessageBatchCommand, + SendMessageCommandInput, + SendMessageBatchRequestEntry, } from '@aws-sdk/client-sqs'; import chunk from 'lodash.chunk'; @@ -57,48 +59,75 @@ export class SQSMessageQueueAdapter implements MessageChannelAdapter { this.getQueueUrl = () => typeof this.queueUrl === 'string' ? this.queueUrl : this.queueUrl(); - this.publishMessage = async message => { + this.publishMessage = async (message, { replay = false } = {}) => { const { eventStoreId } = message; const { aggregateId, version } = parseMessage(message); + const sendMessageCommandInput: SendMessageCommandInput = { + MessageBody: JSON.stringify(message), + QueueUrl: this.getQueueUrl(), + }; + + if (this.fifo) { + sendMessageCommandInput.MessageDeduplicationId = [ + eventStoreId, + aggregateId, + version, + ] + .filter(Boolean) + .join('#'); + + sendMessageCommandInput.MessageGroupId = [ + eventStoreId, + aggregateId, + ].join('#'); + } + + if (replay) { + sendMessageCommandInput.MessageAttributes = { + replay: { DataType: 'Number', StringValue: '1' }, + }; + } + await this.sqsClient.send( - new SendMessageCommand({ - MessageBody: JSON.stringify(message), - QueueUrl: this.getQueueUrl(), - ...(this.fifo - ? { - MessageDeduplicationId: [eventStoreId, aggregateId, version] - .filter(Boolean) - .join('#'), - MessageGroupId: [eventStoreId, aggregateId].join('#'), - } - : {}), - }), + new SendMessageCommand(sendMessageCommandInput), ); }; - this.publishMessages = async messages => { + this.publishMessages = async (messages, { replay = false } = {}) => { + const baseEntry: Omit< + SendMessageBatchRequestEntry, + 'Id' | 'MessageBody' + > = {}; + + if (replay) { + baseEntry.MessageAttributes = { + replay: { DataType: 'Number', StringValue: '1' }, + }; + } + for (const chunkMessages of chunk(messages, SQS_MAX_MESSAGE_BATCH_SIZE)) { await this.sqsClient.send( new SendMessageBatchCommand({ Entries: chunkMessages.map(message => { const { eventStoreId } = message; const { aggregateId, version } = parseMessage(message); - const messageId = [eventStoreId, aggregateId, version] .filter(Boolean) .join('#'); - return { + const entry: SendMessageBatchRequestEntry = { + ...baseEntry, Id: messageId, MessageBody: JSON.stringify(message), - ...(this.fifo - ? { - MessageDeduplicationId: messageId, - MessageGroupId: [eventStoreId, aggregateId].join('#'), - } - : {}), }; + + if (this.fifo) { + entry.MessageDeduplicationId = messageId; + entry.MessageGroupId = [eventStoreId, aggregateId].join('#'); + } + + return entry; }), QueueUrl: this.getQueueUrl(), }), diff --git a/packages/sqs-message-queue-adapter/src/adapter.unit.test.ts b/packages/sqs-message-queue-adapter/src/adapter.unit.test.ts index b76124a3..ced74e8d 100644 --- a/packages/sqs-message-queue-adapter/src/adapter.unit.test.ts +++ b/packages/sqs-message-queue-adapter/src/adapter.unit.test.ts @@ -1,3 +1,4 @@ +/* eslint-disable max-lines */ import { SQSClient, SendMessageCommand, @@ -6,7 +7,7 @@ import { import { mockClient } from 'aws-sdk-client-mock'; import type { A } from 'ts-toolbelt'; -import type { Message } from '@castore/core'; +import type { Message, PublishMessageOptions } from '@castore/core'; import { SQSMessageQueueAdapter, SQS_MAX_MESSAGE_BATCH_SIZE } from './adapter'; @@ -52,7 +53,7 @@ describe('SQSMessageQueueAdapter', () => { const assertMessage: A.Equals< Parameters, - [Message] + [message: Message, options?: PublishMessageOptions | undefined] > = 1; assertMessage; @@ -85,6 +86,25 @@ describe('SQSMessageQueueAdapter', () => { }); }); + it('appends replay MessageAttribute if replay is set to true', async () => { + const adapter = new SQSMessageQueueAdapter({ + queueUrl: queueUrlMock, + sqsClient: sqsClientMock as unknown as SQSClient, + fifo: true, + }); + + await adapter.publishMessage(message, { replay: true }); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(sqsClientMock.calls()).toHaveLength(1); + expect(sqsClientMock.call(0).args[0].input).toMatchObject({ + MessageAttributes: { + replay: { DataType: 'Number', StringValue: '1' }, + }, + }); + }); + it('send a SendMessageBatchCommand to sqs client on messages batch published', async () => { const adapter = new SQSMessageQueueAdapter({ queueUrl: queueUrlMock, @@ -93,7 +113,7 @@ describe('SQSMessageQueueAdapter', () => { const assertMessages: A.Equals< Parameters, - [Message[]] + [messages: Message[], options?: PublishMessageOptions | undefined] > = 1; assertMessages; @@ -150,6 +170,34 @@ describe('SQSMessageQueueAdapter', () => { }); }); + it('appends replay MessageAttribute if replay is set to true', async () => { + const adapter = new SQSMessageQueueAdapter({ + queueUrl: queueUrlMock, + sqsClient: sqsClientMock as unknown as SQSClient, + }); + + await adapter.publishMessages([message, otherMessage], { replay: true }); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(sqsClientMock.calls()).toHaveLength(1); + expect(sqsClientMock.call(0).args[0].input).toMatchObject({ + Entries: [ + { + MessageAttributes: { + replay: { DataType: 'Number', StringValue: '1' }, + }, + }, + { + MessageAttributes: { + replay: { DataType: 'Number', StringValue: '1' }, + }, + }, + ], + QueueUrl: queueUrlMock, + }); + }); + it('chunk messages in separate SendMessageBatchCommand calls when there are more messages then SQS_MAX_MESSAGE_BATCH_SIZE', async () => { const adapter = new SQSMessageQueueAdapter({ queueUrl: queueUrlMock, diff --git a/packages/sqs-message-queue-adapter/src/message.ts b/packages/sqs-message-queue-adapter/src/message.ts index b448396e..2590d9ad 100644 --- a/packages/sqs-message-queue-adapter/src/message.ts +++ b/packages/sqs-message-queue-adapter/src/message.ts @@ -1,4 +1,4 @@ -import type { SQSEvent } from 'aws-lambda'; +import type { SQSRecord } from 'aws-lambda'; import type { AggregateExistsMessageQueue, @@ -10,7 +10,15 @@ import type { MessageChannelSourceEventStores, } from '@castore/core'; -export type SQSMessageQueueMessage = SQSEvent; +export interface SQSMessageQueueRecord extends SQSRecord { + messageAttributes: { + replay?: { stringValue: '1'; dataType: 'Number' }; + }; +} + +export interface SQSMessageQueueMessage { + Records: SQSMessageQueueRecord[]; +} type Prettify> = OBJECTS extends infer OBJECT