Skip to content

Commit

Permalink
Merge pull request #162 from castore-dev/enable-passing-options-to-pu…
Browse files Browse the repository at this point in the history
…shEventGroup
  • Loading branch information
ThomasAribart committed Oct 6, 2023
2 parents c2b5286 + c8b724c commit 662d40b
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 30 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/label-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ defaults:
run:
shell: bash

permissions:
contents: read
pull-requests: write

jobs:
label-pr:
name: 🏷 Label PR
Expand Down
9 changes: 9 additions & 0 deletions docs/docs/2-event-sourcing/6-joining-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ await EventStore.pushEventGroup(
...
}),
);

// You can also pass options as a first argument
await EventStore.pushEventGroup(
{ force: true },
pokemonsEventStore.groupEvent({
...
}),
...
);
```

:::note
Expand Down
8 changes: 8 additions & 0 deletions docs/docs/3-reacting-to-events/4-connected-event-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ await connectedPokemonsEventStore.pushEvent(
{ prevAggregate: pokemonAggregate },
// Removes the need to re-fetch 🙌
);

await EventStore.pushEventGroup(
connectedPokemonsEventStore.groupEvent(
{ ... },
// Will also work on event groups 🙌
{ prevAggregate: pokemonAggregate },
),
);
```

Compared to data streams, connected event stores have the advantage of simplicity, performances and costs. However, they **strongly decouple your storage and messaging solutions**: Make sure to anticipate any issue that might arise (consistency, non-caught errors etc.).
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/eventStorageAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export interface EventStorageAdapter {
options: PushEventOptions,
) => Promise<{ event: EventDetail }>;
pushEventGroup: (
options: { force?: boolean },
...groupedEvents: [GroupedEvent, ...GroupedEvent[]]
) => Promise<{ eventGroup: { event: EventDetail }[] }>;
groupEvent: (eventDetail: OptionalTimestamp<EventDetail>) => GroupedEvent;
Expand Down
34 changes: 27 additions & 7 deletions packages/core/src/eventStore/eventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import type { Aggregate } from '~/aggregate';
import type { EventDetail } from '~/event/eventDetail';
import type { EventType, EventTypeDetails } from '~/event/eventType';
import type { GroupedEvent } from '~/event/groupedEvent';
import { GroupedEvent } from '~/event/groupedEvent';
import type { EventStorageAdapter } from '~/eventStorageAdapter';
import type { $Contravariant } from '~/utils';

Expand Down Expand Up @@ -41,15 +41,29 @@ export class EventStore<
> {
static pushEventGroup: EventGroupPusher = async <
GROUPED_EVENTS extends [GroupedEvent, ...GroupedEvent[]],
OPTIONS_OR_GROUPED_EVENTS_HEAD extends
| GroupedEvent
| { force?: boolean } = GroupedEvent,
>(
...groupedEvents: GROUPED_EVENTS
optionsOrGroupedEvent: OPTIONS_OR_GROUPED_EVENTS_HEAD,
..._groupedEvents: GROUPED_EVENTS
) => {
const [groupedEventsHead, ...groupedEventsTail] = groupedEvents;
const groupedEvents = (
optionsOrGroupedEvent instanceof GroupedEvent
? [optionsOrGroupedEvent, ..._groupedEvents]
: _groupedEvents
) as [GroupedEvent, ...GroupedEvent[]];

const options = (
optionsOrGroupedEvent instanceof GroupedEvent ? {} : optionsOrGroupedEvent
) as { force?: boolean };

const [groupedEventsHead] = groupedEvents;

const { eventGroup: eventGroupWithoutAggregates } =
await groupedEventsHead.eventStorageAdapter.pushEventGroup(
groupedEventsHead,
...groupedEventsTail,
options,
...groupedEvents,
);

const eventGroupWithAggregates = eventGroupWithoutAggregates.map(
Expand All @@ -68,7 +82,7 @@ export class EventStore<

return { event, ...(nextAggregate ? { nextAggregate } : {}) };
},
) as EventGroupPusherResponse<GROUPED_EVENTS>;
);

await Promise.all(
groupedEvents.map((groupedEvent, eventIndex) => {
Expand All @@ -82,7 +96,13 @@ export class EventStore<
}),
);

return { eventGroup: eventGroupWithAggregates };
return { eventGroup: eventGroupWithAggregates } as {
eventGroup: OPTIONS_OR_GROUPED_EVENTS_HEAD extends GroupedEvent
? EventGroupPusherResponse<
[OPTIONS_OR_GROUPED_EVENTS_HEAD, ...GROUPED_EVENTS]
>
: EventGroupPusherResponse<GROUPED_EVENTS>;
};
};

_types?: {
Expand Down
39 changes: 34 additions & 5 deletions packages/core/src/eventStore/eventStore.type.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable max-lines */
import type { A } from 'ts-toolbelt';

import type { Aggregate } from '~/aggregate';
Expand Down Expand Up @@ -147,17 +148,27 @@ assertGroupEventOutput;

const assertGenericPushEventGroupInput: A.Equals<
Parameters<typeof EventStore.pushEventGroup>,
[GroupedEvent, ...GroupedEvent[]]
[
GroupedEvent | { force?: boolean | undefined },
GroupedEvent,
...GroupedEvent[],
]
> = 1;
assertGenericPushEventGroupInput;

const assertGenericPushEventGroupOutput: A.Equals<
ReturnType<typeof EventStore.pushEventGroup>,
Promise<{
eventGroup: {
event: EventDetail;
nextAggregate?: Aggregate | undefined;
}[];
eventGroup:
| {
event: EventDetail;
nextAggregate?: Aggregate | undefined;
}[]
// Weird TS bug
| {
event: EventDetail;
nextAggregate?: Aggregate | undefined;
}[];
}>
> = 1;
assertGenericPushEventGroupOutput;
Expand All @@ -178,3 +189,21 @@ const assertPushEventGroupOutput: A.Equals<
}
> = 1;
assertPushEventGroupOutput;

const pushTwoPokemonsEventGroupWithOptions = () =>
EventStore.pushEventGroup(
{ force: true },
pokemonsEventStore.groupEvent(pikachuAppearedEvent),
pokemonsEventStore.groupEvent(pikachuCaughtEvent),
);

const assertPushTwoPokemonsEventGroupWithOptions: A.Equals<
Awaited<ReturnType<typeof pushTwoPokemonsEventGroupWithOptions>>,
{
eventGroup: [
{ event: PokemonEventDetails; nextAggregate?: PokemonAggregate },
{ event: PokemonEventDetails; nextAggregate?: PokemonAggregate },
];
}
> = 1;
assertPushTwoPokemonsEventGroupWithOptions;
40 changes: 39 additions & 1 deletion packages/core/src/eventStore/eventStore.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ describe('event store', () => {
timestamp: pikachuLeveledUpEvent.timestamp,
};

beforeEach(() => {
pushEventGroupMock.mockReset();
});

it('pushes new event group correctly', async () => {
pushEventGroupMock.mockResolvedValue({
eventGroup: [
Expand All @@ -251,7 +255,41 @@ describe('event store', () => {
const response = await EventStore.pushEventGroup(...eventGroup);

expect(pushEventGroupMock).toHaveBeenCalledTimes(1);
expect(pushEventGroupMock).toHaveBeenCalledWith(...eventGroup);
expect(pushEventGroupMock).toHaveBeenCalledWith({}, ...eventGroup);

expect(response).toStrictEqual({
eventGroup: [
{ event: pikachuLeveledUpEvent },
{ event: charizardLeveledUpEvent },
],
});
});

it('passes options through', async () => {
const options = { force: true };

pushEventGroupMock.mockResolvedValue({
eventGroup: [
{ event: pikachuLeveledUpEvent },
{ event: charizardLeveledUpEvent },
],
});

const eventGroup = [
new GroupedEvent({
event: pikachuLeveledUpEvent,
eventStorageAdapter: eventStorageAdapterMock,
}),
new GroupedEvent({
event: charizardLeveledUpEvent,
eventStorageAdapter: eventStorageAdapterMock,
}),
] as const;

const response = await EventStore.pushEventGroup(options, ...eventGroup);

expect(pushEventGroupMock).toHaveBeenCalledTimes(1);
expect(pushEventGroupMock).toHaveBeenCalledWith(options, ...eventGroup);

expect(response).toStrictEqual({
eventGroup: [
Expand Down
15 changes: 11 additions & 4 deletions packages/core/src/eventStore/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,19 @@ export type EventGroupPusher = <
GroupedEvent,
...GroupedEvent[],
],
OPTIONS_OR_GROUPED_EVENTS_HEAD extends
| GroupedEvent
| { force?: boolean } = GroupedEvent,
>(
/**
* @debt v2 "use an array and enable options in 2nd arg (useful for 'force' opt for instance)"
*/
optionsOrGroupedEventsHead: OPTIONS_OR_GROUPED_EVENTS_HEAD,
...groupedEvents: GROUPED_EVENTS
) => Promise<{ eventGroup: EventGroupPusherResponse<GROUPED_EVENTS> }>;
) => Promise<{
eventGroup: OPTIONS_OR_GROUPED_EVENTS_HEAD extends GroupedEvent
? EventGroupPusherResponse<
[OPTIONS_OR_GROUPED_EVENTS_HEAD, ...GROUPED_EVENTS]
>
: EventGroupPusherResponse<GROUPED_EVENTS>;
}>;

export type EventGroupPusherResponse<GROUPED_EVENTS extends GroupedEvent[]> =
number extends GROUPED_EVENTS['length']
Expand Down
4 changes: 2 additions & 2 deletions packages/event-storage-adapter-dynamodb/src/legacyAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ export class LegacyDynamoDBEventStorageAdapter implements EventStorageAdapter {
/**
* @debt test "Add unit test for pushEventGroup"
*/
this.pushEventGroup = async (...groupedEventsInput) => {
this.pushEventGroup = async (options, ...groupedEventsInput) => {
const { groupedEvents, timestamp = new Date().toISOString() } =
parseGroupedEvents(...groupedEventsInput);

Expand All @@ -302,7 +302,7 @@ export class LegacyDynamoDBEventStorageAdapter implements EventStorageAdapter {
TransactItems: groupedEvents.map(groupedEvent => ({
Put: groupedEvent.eventStorageAdapter.getPushEventInput(
{ timestamp, ...groupedEvent.event },
{ eventStoreId: groupedEvent.context.eventStoreId },
{ ...options, eventStoreId: groupedEvent.context.eventStoreId },
),
})),
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ export class DynamoDBSingleTableEventStorageAdapter
/**
* @debt test "Add unit test for pushEventGroup"
*/
this.pushEventGroup = async (...groupedEventsInput) => {
this.pushEventGroup = async (options, ...groupedEventsInput) => {
const { groupedEvents, timestamp = new Date().toISOString() } =
parseGroupedEvents(...groupedEventsInput);

Expand All @@ -314,7 +314,7 @@ export class DynamoDBSingleTableEventStorageAdapter
TransactItems: groupedEvents.map(groupedEvent => ({
Put: groupedEvent.eventStorageAdapter.getPushEventInput(
{ timestamp, ...groupedEvent.event },
groupedEvent.context,
{ ...options, ...groupedEvent.context },
),
})),
}),
Expand Down
4 changes: 2 additions & 2 deletions packages/event-storage-adapter-in-memory/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ export class InMemoryEventStorageAdapter implements EventStorageAdapter {
resolve(this.pushEventSync({ timestamp, ...event }, options));
});

this.pushEventGroup = async (...groupedEventsInput) =>
this.pushEventGroup = async (options, ...groupedEventsInput) =>
new Promise(resolve => {
const { groupedEvents, timestamp = new Date().toISOString() } =
parseGroupedEvents(...groupedEventsInput);
Expand All @@ -188,7 +188,7 @@ export class InMemoryEventStorageAdapter implements EventStorageAdapter {
try {
const response = eventStorageAdapter.pushEventSync(
{ timestamp, ...event },
context,
{ ...options, ...context },
);
responses.push(response);
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ describe('in-memory storage adapter', () => {
];

const eventGroup = await eventStorageAdapterA.pushEventGroup(
{ force: true },
...groupedEvents,
);
expect(eventGroup).toStrictEqual({
Expand Down Expand Up @@ -312,7 +313,7 @@ describe('in-memory storage adapter', () => {
];

await expect(() =>
eventStorageAdapterA.pushEventGroup(...groupedEvents),
eventStorageAdapterA.pushEventGroup({}, ...groupedEvents),
).rejects.toThrow();
});

Expand All @@ -330,7 +331,7 @@ describe('in-memory storage adapter', () => {
];

await expect(() =>
eventStorageAdapterA.pushEventGroup(...groupedEvents),
eventStorageAdapterA.pushEventGroup({}, ...groupedEvents),
).rejects.toThrow();
});

Expand All @@ -351,7 +352,7 @@ describe('in-memory storage adapter', () => {
];

await expect(() =>
eventStorageAdapterA.pushEventGroup(...groupedEvents),
eventStorageAdapterA.pushEventGroup({}, ...groupedEvents),
).rejects.toThrow();
});

Expand All @@ -377,7 +378,7 @@ describe('in-memory storage adapter', () => {
];

await expect(() =>
eventStorageAdapterA.pushEventGroup(...groupedEvents),
eventStorageAdapterA.pushEventGroup({}, ...groupedEvents),
).rejects.toThrow();

expect(pushEventSyncASpy).toHaveBeenCalledOnce();
Expand Down
6 changes: 3 additions & 3 deletions packages/event-storage-adapter-redux/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ export class ReduxEventStorageAdapter implements EventStorageAdapter {

this.pushEventSync = (event, options) => {
const { aggregateId } = event;
const force = options.force ?? false;
const { force = false } = options;

const eventStoreState = this.getEventStoreState();

Expand Down Expand Up @@ -172,7 +172,7 @@ export class ReduxEventStorageAdapter implements EventStorageAdapter {
resolve(this.pushEventSync({ timestamp, ...event }, context));
});

this.pushEventGroup = async (...groupedEventsInput) =>
this.pushEventGroup = async (options, ...groupedEventsInput) =>
new Promise(resolve => {
const { groupedEvents, timestamp = new Date().toISOString() } =
parseGroupedEvents(...groupedEventsInput);
Expand All @@ -185,7 +185,7 @@ export class ReduxEventStorageAdapter implements EventStorageAdapter {
try {
const response = eventStorageAdapter.pushEventSync(
{ timestamp, ...event },
context,
{ ...options, ...context },
);
responses.push(response);
} catch (error) {
Expand Down

0 comments on commit 662d40b

Please sign in to comment.