Skip to content

Commit

Permalink
finalize adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Aribart committed Aug 25, 2023
1 parent 1e41263 commit 581d87a
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,7 @@ Castore also comes with a handy [React Visualizer](./packages/react-visualizer/R
#### Message Buses Adapters
- [EventBridge Message Bus Adapter](./packages/event-bridge-message-bus-adapter/README.md): Implementation of the `MessageBusAdapter` interface based on AWS EventBridge.
- [EventBridge + S3 Message Bus Adapter](./packages/event-bridge-s3-message-bus-adapter/README.md): Implementation of the `MessageBusAdapter` interface based on AWS EventBridge and S3.
- [In-Memory Message Bus Adapter](./packages/in-memory-message-bus-adapter/README.md): Implementation of the `MessageBusAdapter` interface using a local Node/JS event emitter. To be used in manual or unit tests.
### - Common Patterns
Expand Down
1 change: 1 addition & 0 deletions docs/docs/5-resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ Castore also comes with a handy [React Visualizer](https://www.npmjs.com/package
### Message Buses Adapters

- [EventBridge Message Bus Adapter](https://www.npmjs.com/package/@castore/event-bridge-message-bus-adapter): Implementation of the `MessageBusAdapter` interface based on AWS EventBridge.
- [EventBridge + S3 Message Bus Adapter](https://www.npmjs.com/package/@castore/event-bridge-s3-message-bus-adapter/README.md): Implementation of the `MessageBusAdapter` interface based on AWS EventBridge and S3.
- [In-Memory Message Bus Adapter](https://www.npmjs.com/package/@castore/in-memory-message-bus-adapter): Implementation of the `MessageBusAdapter` interface using a local Node/JS event emitter. To be used in manual or unit tests.
32 changes: 27 additions & 5 deletions packages/event-bridge-s3-message-bus-adapter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,36 @@ If the event is a notification or state-carrying event, the `version` is also ad
const key = 'temporary-storage/POKEMONS/pikachu1/2020-01-01T00:00:00.000Z#3';
```

On the listeners side, you can use the `EventBridgeS3MessageBusMessage` TS type to type your argument:
On the listeners side, you can use the `EventBridgeS3MessageBusMessage` TS type to type your argument, and the `parseMessage` util to fetch the message if it has been uploaded to S3 (it passes it through otherwise):

```ts
import type { EventBridgeS3MessageBusMessage } from '@castore/event-bridge-s3-message-bus-adapter';
import {
EventBridgeS3MessageBusMessage,
parseMessage,
} from '@castore/event-bridge-s3-message-bus-adapter';

const listener = async (
message: EventBridgeS3MessageBusMessage<typeof appMessageBus>,
) => {
// 🙌 Correctly typed!
const { eventStoreId, event } = message.detail;
const { eventStoreId, event } = await parseMessage(message);
};
```

Note that `parseMessage` uses `fetch` under the hood, so you will have to provide it if your version of node doesn't:

```ts
import fetch from 'node-fetch';

import {
EventBridgeS3MessageBusMessage,
parseMessage,
} from '@castore/event-bridge-s3-message-bus-adapter';

const listener = async (
message: EventBridgeS3MessageBusMessage<typeof appMessageBus>,
) => {
const { eventStoreId, event } = await parseMessage(message, { fetch });
};
```

Expand All @@ -106,10 +126,12 @@ const listener = async (
>,
) => {
// 🙌 Correctly typed!
const { eventStoreId, event } = message.detail;
const { eventStoreId, event } = await parseMessage(message);
};
```

## 🔑 IAM

The `publishMessage` method requires the `events:PutEvents` IAM permission on the provided event bus, as well as the `s3:putObject` and `s3:getObject` IAM permissions on the provided s3 bucket for the desired keys (e.g. `my-bucket-name/temporary-storage/*`).
The `publishMessage` method requires the `events:PutEvents` IAM permission on the provided event bus, as well as the `s3:putObject` and `s3:getObject` IAM permissions on the provided s3 bucket at the desired keys (e.g. `my-bucket-name/temporary-storage/*`).

The `parseMessage` doesn't require any permission as the messageURL is pre-signed.
3 changes: 2 additions & 1 deletion packages/event-bridge-s3-message-bus-adapter/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
"@castore/event-bridge-message-bus-adapter": "workspace:",
"@types/aws-lambda": "^8.10.111",
"lodash.chunk": "^4.2.0",
"ts-toolbelt": "^9.6.0"
"ts-toolbelt": "^9.6.0",
"undici": "^5.23.0"
},
"devDependencies": {
"@aws-sdk/client-eventbridge": "^3.2.0",
Expand Down
16 changes: 16 additions & 0 deletions packages/event-bridge-s3-message-bus-adapter/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,20 @@ export class EventBridgeS3MessageBusAdapter implements MessageChannelAdapter {
await this.eventBridgeMessageBusAdapter.publishEntries(entries);
};
}

set eventBusName(eventBusName: string | (() => string)) {
this.eventBridgeMessageBusAdapter.eventBusName = eventBusName;
}

get eventBusName(): string {
return this.eventBridgeMessageBusAdapter.getEventBusName();
}

set eventBridgeClient(eventBridgeClient: EventBridgeClient) {
this.eventBridgeMessageBusAdapter.eventBridgeClient = eventBridgeClient;
}

get eventBridgeClient(): EventBridgeClient {
return this.eventBridgeMessageBusAdapter.eventBridgeClient;
}
}
43 changes: 43 additions & 0 deletions packages/event-bridge-s3-message-bus-adapter/src/fetch.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import type {
FormData as _FormData,
Headers as _Headers,
HeadersInit as _HeadersInit,
BodyInit as _BodyInit,
Request as _Request,
RequestInit as _RequestInit,
RequestInfo as _RequestInfo,
RequestMode as _RequestMode,
RequestRedirect as _RequestRedirect,
RequestCredentials as _RequestCredentials,
RequestDestination as _RequestDestination,
ReferrerPolicy as _ReferrerPolicy,
Response as _Response,
ResponseInit as _ResponseInit,
ResponseType as _ResponseType,
} from 'undici';

declare global {
export const {
fetch,
FormData,
Headers,
Request,
Response,
}: typeof import('undici');

type FormData = _FormData;
type Headers = _Headers;
type HeadersInit = _HeadersInit;
type BodyInit = _BodyInit;
type Request = _Request;
type RequestInit = _RequestInit;
type RequestInfo = _RequestInfo;
type RequestMode = _RequestMode;
type RequestRedirect = _RequestRedirect;
type RequestCredentials = _RequestCredentials;
type RequestDestination = _RequestDestination;
type ReferrerPolicy = _ReferrerPolicy;
type Response = _Response;
type ResponseInit = _ResponseInit;
type ResponseType = _ResponseType;
}
1 change: 1 addition & 0 deletions packages/event-bridge-s3-message-bus-adapter/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { EventBridgeS3MessageBusAdapter } from './adapter';
export type { EventBridgeS3MessageBusMessage } from './message';
export { parseMessage } from './parseMessage';
10 changes: 7 additions & 3 deletions packages/event-bridge-s3-message-bus-adapter/src/message.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { A, O } from 'ts-toolbelt';
import type { EventBridgeEvent } from 'aws-lambda';

import type {
AggregateExistsMessageBus,
Expand Down Expand Up @@ -31,7 +31,11 @@ export type EventBridgeS3MessageBusMessage<
EVENT_STORE_IDS,
EVENT_TYPES
> extends infer MESSAGE
? MESSAGE extends object
? O.Update<MESSAGE, 'Detail', A.x | OversizedEntryDetail>
? MESSAGE extends EventBridgeEvent<string, unknown>
? {
[KEY in keyof MESSAGE]: KEY extends 'detail'
? MESSAGE[KEY] | OversizedEntryDetail
: MESSAGE[KEY];
}
: never
: never;
39 changes: 39 additions & 0 deletions packages/event-bridge-s3-message-bus-adapter/src/parseMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type { EventBridgeEvent } from 'aws-lambda';

import type { OversizedEntryDetail } from './message';

type ParsedMessage<MESSAGES extends EventBridgeEvent<string, unknown>> =
MESSAGES extends infer MESSAGE
? MESSAGE extends EventBridgeEvent<string, unknown>
? {
[KEY in keyof MESSAGE]: KEY extends 'detail'
? Exclude<MESSAGE[KEY], OversizedEntryDetail>
: MESSAGE[KEY];
}
: never
: never;

export const parseMessage = async <
MESSAGES extends EventBridgeEvent<string, unknown>,
>(
_message: MESSAGES,
{ fetch: _fetch = fetch }: { fetch?: typeof fetch } = { fetch },
): Promise<ParsedMessage<MESSAGES>> => {
const message = _message as EventBridgeEvent<string, unknown>;
const { detail: entry } = message;

if (typeof entry === 'object' && entry !== null && 'messageUrl' in entry) {
const response = await _fetch((entry as OversizedEntryDetail).messageUrl);

if (!response.ok) {
throw new Error(response.statusText);
}

return {
...message,
detail: await response.json(),
} as ParsedMessage<MESSAGES>;
}

return message as ParsedMessage<MESSAGES>;
};
26 changes: 26 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6276,6 +6276,7 @@ __metadata:
ts-toolbelt: ^9.6.0
ttypescript: ^1.5.13
typescript: ^4.6.3
undici: ^5.23.0
vitest: ^0.26.2
peerDependencies:
"@aws-sdk/client-eventbridge": ^3.0.0
Expand Down Expand Up @@ -12483,6 +12484,15 @@ __metadata:
languageName: node
linkType: hard

"busboy@npm:^1.6.0":
version: 1.6.0
resolution: "busboy@npm:1.6.0"
dependencies:
streamsearch: ^1.1.0
checksum: 32801e2c0164e12106bf236291a00795c3c4e4b709ae02132883fe8478ba2ae23743b11c5735a0aae8afe65ac4b6ca4568b91f0d9fed1fdbc32ede824a73746e
languageName: node
linkType: hard

"bytes@npm:3.0.0":
version: 3.0.0
resolution: "bytes@npm:3.0.0"
Expand Down Expand Up @@ -23922,6 +23932,13 @@ __metadata:
languageName: node
linkType: hard

"streamsearch@npm:^1.1.0":
version: 1.1.0
resolution: "streamsearch@npm:1.1.0"
checksum: 1cce16cea8405d7a233d32ca5e00a00169cc0e19fbc02aa839959985f267335d435c07f96e5e0edd0eadc6d39c98d5435fb5bbbdefc62c41834eadc5622ad942
languageName: node
linkType: hard

"string-argv@npm:^0.3.1":
version: 0.3.1
resolution: "string-argv@npm:0.3.1"
Expand Down Expand Up @@ -25057,6 +25074,15 @@ __metadata:
languageName: node
linkType: hard

"undici@npm:^5.23.0":
version: 5.23.0
resolution: "undici@npm:5.23.0"
dependencies:
busboy: ^1.6.0
checksum: 906ca4fb1d47163d2cee2ecbbc664a1d92508a2cdf1558146621109f525c983a83597910b36e6ba468240e95259be5939cea6babc99fc0c36360b16630f66784
languageName: node
linkType: hard

"unherit@npm:^1.0.4":
version: 1.1.3
resolution: "unherit@npm:1.1.3"
Expand Down

0 comments on commit 581d87a

Please sign in to comment.