Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Ease maintenance with replay and force options #137

Merged
merged 12 commits into from
Aug 6, 2023
21 changes: 15 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ const pokemonsEventStore = new EventStore({
> // => pokemonsReducer
> ```
>
> - <code>onEventPushed <i>(?(pushEventResponse: PushEventResponse) => Promise\<void\>)</i></code>: The callback to run after events are pushed
> - <code>onEventPushed <i>(?(pushEventResponse: PushEventResponse) => Promise\<void\>)</i></code>: Callback to run after events are pushed
>
> ```ts
> const onEventPushed = pokemonsEventStore.onEventPushed;
Expand Down Expand Up @@ -522,11 +522,12 @@ const pokemonsEventStore = new EventStore({
> // => 'aggregate' and 'lastEvent' are always defined 🙌
> ```
>
> - <code>pushEvent <i>((eventDetail: EventDetail, opt?: OptionsObj = {}) => Promise\<ResponseObj\>)</i></code>: Pushes a new event to the event store. The `timestamp` is optional (we keep it available as it can be useful in tests & migrations). If not provided, it is automatically set as `new Date().toISOString()`. Throws an `EventAlreadyExistsError` if an event already exists for the corresponding `aggregateId` and `version`.
> - <code>pushEvent <i>((eventDetail: EventDetail, opt?: OptionsObj = {}) => Promise\<ResponseObj\>)</i></code>: Pushes a new event to the event store. The `timestamp` is optional (we keep it available as it can be useful in tests & migrations). If not provided, it is automatically set as `new Date().toISOString()`. Throws an `EventAlreadyExistsError` if an event already exists for the corresponding `aggregateId` and `version` (see section below on race conditions).
>
> `OptionsObj` contains the following properties:
>
> - <code>prevAggregate <i>(?Aggregate)</i></code>: The aggregate at the current version, i.e. before having pushed the event. Can be useful in some cases like when using the [`ConnectedEventStore` class](#--connectedeventstore)
> - <code>force <i>(?boolean)</i></code>: To force push the event even if one already exists for the corresponding `aggregateId` and `version`. Any existing event will be overridden, so use with extra care, mainly in [data migrations](#--dam).
>
> `ResponseObj` contains the following properties:
>
Expand Down Expand Up @@ -971,9 +972,13 @@ await appMessageQueue.publishMessage({
>
> The following methods interact with the messaging solution of your application through a `MessageQueueAdapter`. They will throw an `UndefinedMessageChannelAdapterError` if you did not provide one.
>
> - <code>publishMessage <i>((message: NotificationMessage | StateCarryingMessage) => Promise\<void\>)</i></code>: Publish a `NotificationMessage` (for `NotificationMessageQueues`) or a `StateCarryingMessage` (for `StateCarryingMessageQueues`) to the message queue.
> - <code>publishMessage <i>((message: Message, opt?: OptionsObj = {}) => Promise\<void\>)</i></code>: Publish a `Message` (of the appropriate type) to the message queue.
>
> - <code>publishMessages <i>((messages: NotificationMessage[] | StateCarryingMessage[]) => Promise\<void\>)</i></code>: Publish several `NotificationMessage` (for `NotificationMessageQueues`) or several `StateCarryingMessage` (for `StateCarryingMessageQueues`) to the message queue.
> `OptionsObj` contains the following properties:
>
> - <code>replay <i>(?boolean)</i></code>: Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately (e.g. prevent sending notification emails). Check the implementation of you adapter for more details.
>
> - <code>publishMessages <i>(messages: Message[], opt?: OptionsObj) => Promise\<void\>)</i></code>: Publish several `Messages` (of the appropriate type) to the message queue. Options are similar to the `publishMessage` options.
>
> - <code>getAggregateAndPublishMessage <i>((message: NotificationMessage) => Promise\<void\>)</i></code>: _(StateCarryingMessageQueues only)_ Append the matching aggregate (with correct version) to a `NotificationMessage` and turn it into a `StateCarryingMessage` before publishing it to the message queue. Uses the message queue event stores: Make sure that they have correct adapters set up.
>
Expand Down Expand Up @@ -1103,9 +1108,13 @@ await appMessageBus.publishMessage({
>
> The following methods interact with the messaging solution of your application through a `MessageBusAdapter`. They will throw an `UndefinedMessageChannelAdapterError` if you did not provide one.
>
> - <code>publishMessage <i>((message: NotificationMessage | StateCarryingMessage) => Promise\<void\>)</i></code>: Publish a `NotificationMessage` (for `NotificationMessageBuses`) or a `StateCarryingMessage` (for `StateCarryingMessageBuses`) to the message bus.
> - <code>publishMessage <i>((message: Message, opt?: OptionsObj = {}) => Promise\<void\>)</i></code>: Publish a `Message` (of the appropriate type) to the message bus.
>
> `OptionsObj` contains the following properties:
>
> - <code>replay <i>(?boolean)</i></code>: Signals that the event is not happening in real-time, e.g. in maintenance or migration operations. This information can be used downstream to react appropriately (e.g. prevent sending notification emails). Check the implementation of you adapter for more details.
>
> - <code>publishMessages <i>((messages: NotificationMessage[] | StateCarryingMessage[]) => Promise\<void\>)</i></code>: Publish several `NotificationMessage` (for `NotificationMessageBuses`) or several `StateCarryingMessage` (for `StateCarryingMessageBuses`) to the message bus.
> - <code>publishMessages <i>(messages: Message[], opt?: OptionsObj) => Promise\<void\>)</i></code>: Publish several `Messages` (of the appropriate type) to the message bus. Options are similar to the `publishMessage` options.
>
> - <code>getAggregateAndPublishMessage <i>((message: NotificationMessage) => Promise\<void\>)</i></code>: _(StateCarryingMessageBuses only)_ Append the matching aggregate (with correct version) to a `NotificationMessage` and turn it into a `StateCarryingMessage` before publishing it to the message bus. Uses the message bus event stores: Make sure that they have correct adapters set up.
>
Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/eventStore/eventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,15 @@ export class EventStore<
*/
) as Promise<{ events: EVENT_DETAILS[] }>;

this.pushEvent = async (eventDetail, { prevAggregate } = {}) => {
this.pushEvent = async (
eventDetail,
{ prevAggregate, force = false } = {},
) => {
const storageAdapter = this.getStorageAdapter();

const { event } = (await storageAdapter.pushEvent(eventDetail, {
eventStoreId: this.eventStoreId,
force,
})) as { event: $EVENT_DETAILS };

let nextAggregate: AGGREGATE | undefined = undefined;
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/eventStore/eventStore.type.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ assertPushEventInput1;

const assertPushEventInput2: A.Equals<
Parameters<typeof pokemonsEventStore.pushEvent>[1],
{ prevAggregate?: PokemonAggregate | undefined } | undefined
{ prevAggregate?: PokemonAggregate | undefined; force?: boolean } | undefined
> = 1;
assertPushEventInput2;

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/eventStore/eventStore.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ describe('event store', () => {
expect(pushEventMock).toHaveBeenCalledTimes(1);
expect(pushEventMock).toHaveBeenCalledWith(pikachuLeveledUpEvent, {
eventStoreId: pokemonsEventStore.eventStoreId,
force: false,
});
expect(response).toStrictEqual({ event: pikachuLeveledUpEvent });
});
Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/eventStore/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export type EventPusher<
event: $EVENT_DETAILS extends EventDetail
? OptionalTimestamp<$EVENT_DETAILS>
: $EVENT_DETAILS,
options?: { prevAggregate?: $AGGREGATE },
options?: { prevAggregate?: $AGGREGATE; force?: boolean },
) => Promise<{ event: EVENT_DETAILS; nextAggregate?: AGGREGATE }>;

export type AggregateIdsLister = (
Expand All @@ -62,6 +62,9 @@ export type EventGroupPusher = <
...GroupedEvent[],
],
>(
/**
* @debt v2 "use an array and enable options in 2nd arg (useful for 'force' opt for instance)"
*/
...groupedEvents: GROUPED_EVENTS
) => Promise<{ eventGroup: EventGroupPusherResponse<GROUPED_EVENTS> }>;

Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export type { EventDetail, OptionalTimestamp } from './event/eventDetail';
export type { StorageAdapter } from './storageAdapter';
export type {
EventsQueryOptions,
PushEventOptions,
EventStoreContext,
ListAggregateIdsOptions,
ListAggregateIdsOutput,
Expand Down Expand Up @@ -65,6 +66,7 @@ export type {
NotificationMessage,
StateCarryingMessage,
Message,
PublishMessageOptions,
EventStoreAggregateExistsMessage,
EventStoreNotificationMessage,
EventStoreStateCarryingMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
UndefinedMessageChannelAdapterError,
} from './errors';
import type { MessageChannelAdapter } from './messageChannelAdapter';
import type { PublishMessageOptions } from './types';

export class AggregateExistsMessageChannel<
EVENT_STORE extends EventStore = EventStore,
Expand All @@ -28,13 +29,15 @@ export class AggregateExistsMessageChannel<
EventStore,
EventStoreAggregateExistsMessage<EVENT_STORE>
>,
options?: PublishMessageOptions,
) => Promise<void>;
publishMessages: (
aggregateExistsMessages: $Contravariant<
EVENT_STORE,
EventStore,
EventStoreAggregateExistsMessage<EVENT_STORE>
>[],
options?: PublishMessageOptions,
) => Promise<void>;

constructor({
Expand Down Expand Up @@ -87,23 +90,33 @@ export class AggregateExistsMessageChannel<
return eventStore;
};

this.publishMessage = async aggregateExistsMessage => {
this.publishMessage = async (
aggregateExistsMessage,
{ replay = false } = {},
) => {
const { eventStoreId } = aggregateExistsMessage;
this.getEventStore(eventStoreId);

const messageChannelAdapter = this.getMessageChannelAdapter();

await messageChannelAdapter.publishMessage(aggregateExistsMessage);
await messageChannelAdapter.publishMessage(aggregateExistsMessage, {
replay,
});
};

this.publishMessages = async aggregateExistsMessages => {
this.publishMessages = async (
aggregateExistsMessages,
{ replay = false } = {},
) => {
for (const aggregateExistsMessage of aggregateExistsMessages) {
const { eventStoreId } = aggregateExistsMessage;
this.getEventStore(eventStoreId);
}
const messageChannelAdapter = this.getMessageChannelAdapter();

await messageChannelAdapter.publishMessages(aggregateExistsMessages);
await messageChannelAdapter.publishMessages(aggregateExistsMessages, {
replay,
});
};
}
}
1 change: 1 addition & 0 deletions packages/core/src/messaging/channel/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type {
MessageChannelSourceEventStoreIds,
MessageChannelSourceEventStoreIdTypes,
} from './generics';
export type { PublishMessageOptions } from './types';
export type { MessageChannelAdapter } from './messageChannelAdapter';
export { AggregateExistsMessageChannel } from './aggregateExistsMessageChannel';
export { NotificationMessageChannel } from './notificationMessageChannel';
Expand Down
11 changes: 9 additions & 2 deletions packages/core/src/messaging/channel/messageChannelAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import type { Message } from '../message';
import type { PublishMessageOptions } from './types';

export interface MessageChannelAdapter {
publishMessage: (message: Message) => Promise<void>;
publishMessages: (messages: Message[]) => Promise<void>;
publishMessage: (
message: Message,
options?: PublishMessageOptions,
) => Promise<void>;
publishMessages: (
messages: Message[],
options?: PublishMessageOptions,
) => Promise<void>;
}
21 changes: 17 additions & 4 deletions packages/core/src/messaging/channel/notificationMessageChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
UndefinedMessageChannelAdapterError,
} from './errors';
import type { MessageChannelAdapter } from './messageChannelAdapter';
import type { PublishMessageOptions } from './types';

export class NotificationMessageChannel<
EVENT_STORE extends EventStore = EventStore,
Expand All @@ -28,13 +29,15 @@ export class NotificationMessageChannel<
EventStore,
EventStoreNotificationMessage<EVENT_STORE>
>,
options?: PublishMessageOptions,
) => Promise<void>;
publishMessages: (
notificationMessages: $Contravariant<
EVENT_STORE,
EventStore,
EventStoreNotificationMessage<EVENT_STORE>
>[],
options?: PublishMessageOptions,
) => Promise<void>;

constructor({
Expand Down Expand Up @@ -87,23 +90,33 @@ export class NotificationMessageChannel<
return eventStore;
};

this.publishMessage = async notificationMessage => {
this.publishMessage = async (
notificationMessage,
{ replay = false } = {},
) => {
const { eventStoreId } = notificationMessage;
this.getEventStore(eventStoreId);

const messageChannelAdapter = this.getMessageChannelAdapter();

await messageChannelAdapter.publishMessage(notificationMessage);
await messageChannelAdapter.publishMessage(notificationMessage, {
replay,
});
};

this.publishMessages = async notificationMessages => {
this.publishMessages = async (
notificationMessages,
{ replay = false } = {},
) => {
for (const notificationMessage of notificationMessages) {
const { eventStoreId } = notificationMessage;
this.getEventStore(eventStoreId);
}
const messageChannelAdapter = this.getMessageChannelAdapter();

await messageChannelAdapter.publishMessages(notificationMessages);
await messageChannelAdapter.publishMessages(notificationMessages, {
replay,
});
};
}
}
32 changes: 26 additions & 6 deletions packages/core/src/messaging/channel/stateCarryingMessageChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
UndefinedMessageChannelAdapterError,
} from './errors';
import type { MessageChannelAdapter } from './messageChannelAdapter';
import type { PublishMessageOptions } from './types';

export class StateCarryingMessageChannel<
EVENT_STORE extends EventStore = EventStore,
Expand All @@ -31,13 +32,15 @@ export class StateCarryingMessageChannel<
EventStore,
EventStoreStateCarryingMessage<EVENT_STORE>
>,
options?: PublishMessageOptions,
) => Promise<void>;
getAggregateAndPublishMessage: (
notificationMessage: $Contravariant<
EVENT_STORE,
EventStore,
EventStoreNotificationMessage<EVENT_STORE>
>,
options?: PublishMessageOptions,
) => Promise<void>;

publishMessages: (
Expand All @@ -46,6 +49,7 @@ export class StateCarryingMessageChannel<
EventStore,
EventStoreStateCarryingMessage<EVENT_STORE>
>[],
options?: PublishMessageOptions,
) => Promise<void>;

constructor({
Expand Down Expand Up @@ -98,16 +102,24 @@ export class StateCarryingMessageChannel<
return eventStore;
};

this.publishMessage = async stateCarryingMessage => {
this.publishMessage = async (
stateCarryingMessage,
{ replay = false } = {},
) => {
const { eventStoreId } = stateCarryingMessage;
this.getEventStore(eventStoreId);

const messageChannelAdapter = this.getMessageChannelAdapter();

await messageChannelAdapter.publishMessage(stateCarryingMessage);
await messageChannelAdapter.publishMessage(stateCarryingMessage, {
replay,
});
};

this.getAggregateAndPublishMessage = async notificationMessage => {
this.getAggregateAndPublishMessage = async (
notificationMessage,
{ replay = false } = {},
) => {
const { eventStoreId, event } = notificationMessage;
const { aggregateId, version } = event;

Expand All @@ -117,18 +129,26 @@ export class StateCarryingMessageChannel<
maxVersion: version,
});

await this.publishMessage({ ...notificationMessage, aggregate });
await this.publishMessage(
{ ...notificationMessage, aggregate },
{ replay },
);
};

this.publishMessages = async stateCarryingMessages => {
this.publishMessages = async (
stateCarryingMessages,
{ replay = false } = {},
) => {
for (const stateCarryingMessage of stateCarryingMessages) {
const { eventStoreId } = stateCarryingMessage;
this.getEventStore(eventStoreId);
}

const messageChannelAdapter = this.getMessageChannelAdapter();

await messageChannelAdapter.publishMessages(stateCarryingMessages);
await messageChannelAdapter.publishMessages(stateCarryingMessages, {
replay,
});
};
}
}
3 changes: 3 additions & 0 deletions packages/core/src/messaging/channel/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export type PublishMessageOptions = {
replay?: boolean;
};
17 changes: 16 additions & 1 deletion packages/core/src/messaging/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
export * from './channel';
export {
AggregateExistsMessageChannel,
NotificationMessageChannel,
StateCarryingMessageChannel,
MessageChannelEventStoreNotFoundError,
UndefinedMessageChannelAdapterError,
} from './channel';
export type {
EventStoreMessageChannel,
MessageChannelSourceEventStores,
MessageChannelMessage,
MessageChannelSourceEventStoreIds,
MessageChannelSourceEventStoreIdTypes,
PublishMessageOptions,
MessageChannelAdapter,
} from './channel';
export * from './bus';
export * from './queue';
export type {
Expand Down
Loading
Loading