Skip to content

Commit

Permalink
Merge pull request #131 from castore-dev/add-rate-limiting-to-dam-utils
Browse files Browse the repository at this point in the history
patch: create pourEventStoreCollectionEvents util
  • Loading branch information
ThomasAribart committed Jul 16, 2023
2 parents 7c22a5e + fdb2771 commit 43937c2
Show file tree
Hide file tree
Showing 16 changed files with 672 additions and 167 deletions.
78 changes: 71 additions & 7 deletions packages/dam/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ import { pourEventStoreAggregateIds } from '@castore/dam';

// 👇 ...or AggregateExistsMessageBus
const maintenanceMessageQueue = new AggregateExistsMessageQueue({
sourceEventStores: [pokemonEventStore],
...
});

await pourEventStoreAggregateIds({
eventStore: pokemonsEventStore,
const results = await pourEventStoreAggregateIds({
eventStore: pokemonEventStore,
messageChannel: maintenanceMessageQueue,
// 👇 Optional `listAggregateIds` options (except "pageToken")
options: {
Expand All @@ -57,6 +58,14 @@ await pourEventStoreAggregateIds({
// 👇 Optional rate limit (messages/second)
rateLimit: 100,
});

const {
// 👇 Count of poured aggregate ids
pouredAggregateIdCount,
// 👇 Infos about first/last scanned aggregates (potentially undefined)
firstScannedAggregate,
lastScannedAggregate,
} = results;
```

### `pourAggregateEvents`
Expand All @@ -68,11 +77,12 @@ import { pourAggregateEvents } from '@castore/dam';

// 👇 ...or NotificationMessageBus
const maintenanceMessageQueue = new NotificationMessageQueue({
sourceEventStores: [pokemonEventStore],
...
});

await pourAggregateEvents({
eventStore: pokemonsEventStore,
const results = await pourAggregateEvents({
eventStore: pokemonEventStore,
messageChannel: maintenanceMessageQueue,
aggregateId: 'pikachu1',
// 👇 Optional `getEvents` options
Expand All @@ -90,22 +100,31 @@ await pourAggregateEvents({
// 👇 Optional rate limit (messages/second)
rateLimit: 100,
});

const {
// 👇 Count of poured aggregate ids
pouredEventCount,
// 👇 Infos about first/last scanned events (potentially undefined)
firstPouredEvent,
lastPouredEvent,
} = results;
```

### `pourEventStoreEvents`

Pour all the events of an event store in a provided [`NotificationMessageChannel`](https://github.com/castore-dev/castore#--event-driven-architecture). Events are published in the order of their timestamps (accross aggregates).
Pour all the events of an event store in a provided [`NotificationMessageChannel`](https://github.com/castore-dev/castore#--event-driven-architecture). Events are published in the order of their timestamps (independently of their aggregate).

```ts
import { pourEventStoreEvents } from '@castore/dam';

// 👇 ...or NotificationMessageBus
const maintenanceMessageQueue = new NotificationMessageQueue({
sourceEventStores: [pokemonEventStore],
...
});

await pourEventStoreEvents({
eventStore: pokemonsEventStore,
const results = await pourEventStoreEvents({
eventStore: pokemonEventStore,
messageChannel: maintenanceMessageQueue,
// 👇 Optional `timestamp` filters
filters: {
Expand All @@ -115,4 +134,49 @@ await pourEventStoreEvents({
// 👇 Optional rate limit (messages/second)
rateLimit: 100,
});

const {
// 👇 Count of poured events
pouredEventCount,
// 👇 Infos about first/last scanned aggregates (potentially undefined)
firstScannedAggregate,
lastScannedAggregate,
} = results;
```

### `pourEventStoreCollectionEvents`

Pour all the events of a **collection of event stores** in a provided [`NotificationMessageChannel`](https://github.com/castore-dev/castore#--event-driven-architecture). Events are published in the order of their timestamps (independently of their aggregate and event store).

```ts
import { pourEventStoreEvents } from '@castore/dam';

// 👇 ...or NotificationMessageBus
const maintenanceMessageQueue = new NotificationMessageQueue({
sourceEventStores: [pokemonEventStore, trainerEventStore],
// ...
});

const results = await pourEventStoreCollectionEvents({
eventStores: [pokemonEventStore, trainerEventStore],
messageChannel: maintenanceMessageQueue,
// 👇 Optional `timestamp` filters
filters: {
from: '2020-01-01T00:00:00.000Z',
to: '2023-01-01T00:00:00.000Z',
},
// 👇 Optional rate limit (messages/second)
rateLimit: 100,
});

const {
// 👇 Count of poured events
pouredEventCount,
// 👇 Infos about first/last scanned aggregates (potentially undefined)
scans: {
// 👇 By event store id
POKEMONS: { firstScannedAggregate, lastScannedAggregate },
TRAINERS: { firstScannedAggregate, lastScannedAggregate },
},
} = results;
```
101 changes: 83 additions & 18 deletions packages/dam/src/fixtures.test.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,55 @@
import type { EventStoreEventsDetails } from '@castore/core';
import { pokemonsEventStore } from '@castore/demo-blueprint';
import {
pokemonsEventStore,
trainersEventStore,
} from '@castore/demo-blueprint';
import { mockEventStore } from '@castore/test-tools';

export const eventStoreId = pokemonsEventStore.eventStoreId;
export const pokemonEvtStoreId = pokemonsEventStore.eventStoreId;
export const trainerEvtStoreId = trainersEventStore.eventStoreId;

export const aggregate1Id = 'pikachu1';
export const aggregate2Id = 'charizard1';
export const aggregate3Id = 'pikachu2';
// AGGREGATE IDS

export const aggregate1Events: EventStoreEventsDetails<
export const pikachuId = 'pikachu1';
export const charizardId = 'charizard1';
export const arcanineId = 'arcanine1';

export const ashKetchumId = 'ashKetchum';
export const garyOakId = 'garyOak';

// POKEMON EVENTS

export const pikachuEvents: EventStoreEventsDetails<
typeof pokemonsEventStore
>[] = [
{
aggregateId: aggregate1Id,
aggregateId: pikachuId,
version: 1,
type: 'APPEARED',
timestamp: '2021-01-01T00:00:00.000Z',
payload: { name: 'Pikachu', level: 2 },
metadata: {},
},
{
aggregateId: aggregate1Id,
aggregateId: pikachuId,
version: 2,
type: 'CAUGHT_BY_TRAINER',
timestamp: '2022-01-01T00:00:00.000Z',
payload: { trainerId: 'ashKetchum' },
payload: { trainerId: ashKetchumId },
},
{
aggregateId: aggregate1Id,
aggregateId: pikachuId,
version: 3,
type: 'LEVELLED_UP',
timestamp: '2023-07-01T00:00:00.000Z',
},
];

export const aggregate2Events: EventStoreEventsDetails<
export const charizardEvents: EventStoreEventsDetails<
typeof pokemonsEventStore
>[] = [
{
aggregateId: aggregate2Id,
aggregateId: charizardId,
version: 1,
type: 'APPEARED',
timestamp: '2022-07-01T00:00:00.000Z',
Expand All @@ -47,21 +58,75 @@ export const aggregate2Events: EventStoreEventsDetails<
},
];

export const aggregate3Events: EventStoreEventsDetails<
export const arcanineEvents: EventStoreEventsDetails<
typeof pokemonsEventStore
>[] = [
{
aggregateId: aggregate3Id,
aggregateId: arcanineId,
version: 1,
type: 'APPEARED',
timestamp: '2023-01-01T00:00:00.000Z',
payload: { name: 'Pikachu', level: 3 },
metadata: {},
},
{
aggregateId: arcanineId,
version: 2,
type: 'CAUGHT_BY_TRAINER',
timestamp: '2024-01-01T00:00:00.000Z',
payload: { trainerId: garyOakId },
},
];

// TRAINER EVENTS

export const ashKetchumEvents: EventStoreEventsDetails<
typeof trainersEventStore
>[] = [
{
aggregateId: ashKetchumId,
version: 1,
type: 'GAME_STARTED',
timestamp: '2020-12-01T00:00:00.000Z',
payload: { trainerName: 'Ash Ketchum' },
},
{
aggregateId: ashKetchumId,
version: 2,
type: 'POKEMON_CAUGHT',
timestamp: '2022-01-01T00:00:00.000Z',
payload: { pokemonId: pikachuId },
},
];

export const mockedEventStore = mockEventStore(pokemonsEventStore, [
...aggregate1Events,
...aggregate2Events,
...aggregate3Events,
export const garyOakEvents: EventStoreEventsDetails<
typeof trainersEventStore
>[] = [
{
aggregateId: garyOakId,
version: 1,
type: 'GAME_STARTED',
timestamp: '2022-12-01T00:00:00.000Z',
payload: { trainerName: 'Gary Oak' },
},
{
aggregateId: garyOakId,
version: 2,
type: 'POKEMON_CAUGHT',
timestamp: '2024-01-01T00:00:00.000Z',
payload: { pokemonId: arcanineId },
},
];

// EVENT STORES

export const trainerEventStore = mockEventStore(trainersEventStore, [
...ashKetchumEvents,
...garyOakEvents,
]);

export const pokemonEventStore = mockEventStore(pokemonsEventStore, [
...pikachuEvents,
...charizardEvents,
...arcanineEvents,
]);
2 changes: 2 additions & 0 deletions packages/dam/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export { pourEventStoreAggregateIds } from './pourEventStoreAggregateIds';
export { pourAggregateEvents } from './pourAggregateEvents';
export { pourEventStoreEvents } from './pourEventStoreEvents';
export { pourEventStoreCollectionEvents } from './pourEventStoreCollectionEvents';
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import type {
EventsQueryOptions,
} from '@castore/core';

import { getIsBetween } from '~/utils/getIsBetween';
import { getThrottle } from '~/utils/getThrottle';
import { getIsBetween } from '~/utils/isBetween';

interface Props<EVENT_STORE extends EventStore> {
eventStore: EVENT_STORE;
Expand Down
Loading

0 comments on commit 43937c2

Please sign in to comment.