diff --git a/README.md b/README.md index b78576b3..5b0a0401 100644 --- a/README.md +++ b/README.md @@ -1320,6 +1320,7 @@ Castore also comes with a handy [React Visualizer](./packages/react-visualizer/R #### Message Buses Adapters - [EventBridge Message Bus Adapter](./packages/event-bridge-message-bus-adapter/README.md): Implementation of the `MessageBusAdapter` interface based on AWS EventBridge. +- [EventBridge + S3 Message Bus Adapter](./packages/event-bridge-s3-message-bus-adapter/README.md): Implementation of the `MessageBusAdapter` interface based on AWS EventBridge and S3. - [In-Memory Message Bus Adapter](./packages/in-memory-message-bus-adapter/README.md): Implementation of the `MessageBusAdapter` interface using a local Node/JS event emitter. To be used in manual or unit tests. ### - Common Patterns diff --git a/docs/docs/5-resources.md b/docs/docs/5-resources.md index b0937f6c..e578f0c3 100644 --- a/docs/docs/5-resources.md +++ b/docs/docs/5-resources.md @@ -41,4 +41,5 @@ Castore also comes with a handy [React Visualizer](https://www.npmjs.com/package ### Message Buses Adapters - [EventBridge Message Bus Adapter](https://www.npmjs.com/package/@castore/event-bridge-message-bus-adapter): Implementation of the `MessageBusAdapter` interface based on AWS EventBridge. +- [EventBridge + S3 Message Bus Adapter](https://www.npmjs.com/package/@castore/event-bridge-s3-message-bus-adapter/README.md): Implementation of the `MessageBusAdapter` interface based on AWS EventBridge and S3. - [In-Memory Message Bus Adapter](https://www.npmjs.com/package/@castore/in-memory-message-bus-adapter): Implementation of the `MessageBusAdapter` interface using a local Node/JS event emitter. To be used in manual or unit tests. diff --git a/packages/event-bridge-s3-message-bus-adapter/README.md b/packages/event-bridge-s3-message-bus-adapter/README.md index f27d1053..6a5d42e9 100644 --- a/packages/event-bridge-s3-message-bus-adapter/README.md +++ b/packages/event-bridge-s3-message-bus-adapter/README.md @@ -80,16 +80,36 @@ If the event is a notification or state-carrying event, the `version` is also ad const key = 'temporary-storage/POKEMONS/pikachu1/2020-01-01T00:00:00.000Z#3'; ``` -On the listeners side, you can use the `EventBridgeS3MessageBusMessage` TS type to type your argument: +On the listeners side, you can use the `EventBridgeS3MessageBusMessage` TS type to type your argument, and the `parseMessage` util to fetch the message if it has been uploaded to S3 (it passes it through otherwise): ```ts -import type { EventBridgeS3MessageBusMessage } from '@castore/event-bridge-s3-message-bus-adapter'; +import { + EventBridgeS3MessageBusMessage, + parseMessage, +} from '@castore/event-bridge-s3-message-bus-adapter'; const listener = async ( message: EventBridgeS3MessageBusMessage, ) => { // 🙌 Correctly typed! - const { eventStoreId, event } = message.detail; + const { eventStoreId, event } = await parseMessage(message); +}; +``` + +Note that `parseMessage` uses `fetch` under the hood, so you will have to provide it if your version of node doesn't: + +```ts +import fetch from 'node-fetch'; + +import { + EventBridgeS3MessageBusMessage, + parseMessage, +} from '@castore/event-bridge-s3-message-bus-adapter'; + +const listener = async ( + message: EventBridgeS3MessageBusMessage, +) => { + const { eventStoreId, event } = await parseMessage(message, { fetch }); }; ``` @@ -106,10 +126,12 @@ const listener = async ( >, ) => { // 🙌 Correctly typed! - const { eventStoreId, event } = message.detail; + const { eventStoreId, event } = await parseMessage(message); }; ``` ## 🔑 IAM -The `publishMessage` method requires the `events:PutEvents` IAM permission on the provided event bus, as well as the `s3:putObject` and `s3:getObject` IAM permissions on the provided s3 bucket for the desired keys (e.g. `my-bucket-name/temporary-storage/*`). +The `publishMessage` method requires the `events:PutEvents` IAM permission on the provided event bus, as well as the `s3:putObject` and `s3:getObject` IAM permissions on the provided s3 bucket at the desired keys (e.g. `my-bucket-name/temporary-storage/*`). + +The `parseMessage` doesn't require any permission as the messageURL is pre-signed. diff --git a/packages/event-bridge-s3-message-bus-adapter/package.json b/packages/event-bridge-s3-message-bus-adapter/package.json index 77f955e6..131a5958 100644 --- a/packages/event-bridge-s3-message-bus-adapter/package.json +++ b/packages/event-bridge-s3-message-bus-adapter/package.json @@ -41,7 +41,8 @@ "@castore/event-bridge-message-bus-adapter": "workspace:", "@types/aws-lambda": "^8.10.111", "lodash.chunk": "^4.2.0", - "ts-toolbelt": "^9.6.0" + "ts-toolbelt": "^9.6.0", + "undici": "^5.23.0" }, "devDependencies": { "@aws-sdk/client-eventbridge": "^3.2.0", diff --git a/packages/event-bridge-s3-message-bus-adapter/src/adapter.ts b/packages/event-bridge-s3-message-bus-adapter/src/adapter.ts index 8c88b0ea..31ecb1b7 100644 --- a/packages/event-bridge-s3-message-bus-adapter/src/adapter.ts +++ b/packages/event-bridge-s3-message-bus-adapter/src/adapter.ts @@ -179,4 +179,20 @@ export class EventBridgeS3MessageBusAdapter implements MessageChannelAdapter { await this.eventBridgeMessageBusAdapter.publishEntries(entries); }; } + + set eventBusName(eventBusName: string | (() => string)) { + this.eventBridgeMessageBusAdapter.eventBusName = eventBusName; + } + + get eventBusName(): string { + return this.eventBridgeMessageBusAdapter.getEventBusName(); + } + + set eventBridgeClient(eventBridgeClient: EventBridgeClient) { + this.eventBridgeMessageBusAdapter.eventBridgeClient = eventBridgeClient; + } + + get eventBridgeClient(): EventBridgeClient { + return this.eventBridgeMessageBusAdapter.eventBridgeClient; + } } diff --git a/packages/event-bridge-s3-message-bus-adapter/src/fetch.d.ts b/packages/event-bridge-s3-message-bus-adapter/src/fetch.d.ts new file mode 100644 index 00000000..9b47eb24 --- /dev/null +++ b/packages/event-bridge-s3-message-bus-adapter/src/fetch.d.ts @@ -0,0 +1,43 @@ +import type { + FormData as _FormData, + Headers as _Headers, + HeadersInit as _HeadersInit, + BodyInit as _BodyInit, + Request as _Request, + RequestInit as _RequestInit, + RequestInfo as _RequestInfo, + RequestMode as _RequestMode, + RequestRedirect as _RequestRedirect, + RequestCredentials as _RequestCredentials, + RequestDestination as _RequestDestination, + ReferrerPolicy as _ReferrerPolicy, + Response as _Response, + ResponseInit as _ResponseInit, + ResponseType as _ResponseType, +} from 'undici'; + +declare global { + export const { + fetch, + FormData, + Headers, + Request, + Response, + }: typeof import('undici'); + + type FormData = _FormData; + type Headers = _Headers; + type HeadersInit = _HeadersInit; + type BodyInit = _BodyInit; + type Request = _Request; + type RequestInit = _RequestInit; + type RequestInfo = _RequestInfo; + type RequestMode = _RequestMode; + type RequestRedirect = _RequestRedirect; + type RequestCredentials = _RequestCredentials; + type RequestDestination = _RequestDestination; + type ReferrerPolicy = _ReferrerPolicy; + type Response = _Response; + type ResponseInit = _ResponseInit; + type ResponseType = _ResponseType; +} diff --git a/packages/event-bridge-s3-message-bus-adapter/src/index.ts b/packages/event-bridge-s3-message-bus-adapter/src/index.ts index 49bddae7..c2fe89ab 100644 --- a/packages/event-bridge-s3-message-bus-adapter/src/index.ts +++ b/packages/event-bridge-s3-message-bus-adapter/src/index.ts @@ -1,2 +1,3 @@ export { EventBridgeS3MessageBusAdapter } from './adapter'; export type { EventBridgeS3MessageBusMessage } from './message'; +export { parseMessage } from './parseMessage'; diff --git a/packages/event-bridge-s3-message-bus-adapter/src/message.ts b/packages/event-bridge-s3-message-bus-adapter/src/message.ts index 74bb6749..1e28f294 100644 --- a/packages/event-bridge-s3-message-bus-adapter/src/message.ts +++ b/packages/event-bridge-s3-message-bus-adapter/src/message.ts @@ -1,4 +1,4 @@ -import type { A, O } from 'ts-toolbelt'; +import type { EventBridgeEvent } from 'aws-lambda'; import type { AggregateExistsMessageBus, @@ -31,7 +31,11 @@ export type EventBridgeS3MessageBusMessage< EVENT_STORE_IDS, EVENT_TYPES > extends infer MESSAGE - ? MESSAGE extends object - ? O.Update + ? MESSAGE extends EventBridgeEvent + ? { + [KEY in keyof MESSAGE]: KEY extends 'detail' + ? MESSAGE[KEY] | OversizedEntryDetail + : MESSAGE[KEY]; + } : never : never; diff --git a/packages/event-bridge-s3-message-bus-adapter/src/parseMessage.ts b/packages/event-bridge-s3-message-bus-adapter/src/parseMessage.ts new file mode 100644 index 00000000..e0022300 --- /dev/null +++ b/packages/event-bridge-s3-message-bus-adapter/src/parseMessage.ts @@ -0,0 +1,39 @@ +import type { EventBridgeEvent } from 'aws-lambda'; + +import type { OversizedEntryDetail } from './message'; + +type ParsedMessage> = + MESSAGES extends infer MESSAGE + ? MESSAGE extends EventBridgeEvent + ? { + [KEY in keyof MESSAGE]: KEY extends 'detail' + ? Exclude + : MESSAGE[KEY]; + } + : never + : never; + +export const parseMessage = async < + MESSAGES extends EventBridgeEvent, +>( + _message: MESSAGES, + { fetch: _fetch = fetch }: { fetch?: typeof fetch } = { fetch }, +): Promise> => { + const message = _message as EventBridgeEvent; + const { detail: entry } = message; + + if (typeof entry === 'object' && entry !== null && 'messageUrl' in entry) { + const response = await _fetch((entry as OversizedEntryDetail).messageUrl); + + if (!response.ok) { + throw new Error(response.statusText); + } + + return { + ...message, + detail: await response.json(), + } as ParsedMessage; + } + + return message as ParsedMessage; +}; diff --git a/yarn.lock b/yarn.lock index 386279c0..012cba63 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6276,6 +6276,7 @@ __metadata: ts-toolbelt: ^9.6.0 ttypescript: ^1.5.13 typescript: ^4.6.3 + undici: ^5.23.0 vitest: ^0.26.2 peerDependencies: "@aws-sdk/client-eventbridge": ^3.0.0 @@ -12483,6 +12484,15 @@ __metadata: languageName: node linkType: hard +"busboy@npm:^1.6.0": + version: 1.6.0 + resolution: "busboy@npm:1.6.0" + dependencies: + streamsearch: ^1.1.0 + checksum: 32801e2c0164e12106bf236291a00795c3c4e4b709ae02132883fe8478ba2ae23743b11c5735a0aae8afe65ac4b6ca4568b91f0d9fed1fdbc32ede824a73746e + languageName: node + linkType: hard + "bytes@npm:3.0.0": version: 3.0.0 resolution: "bytes@npm:3.0.0" @@ -23922,6 +23932,13 @@ __metadata: languageName: node linkType: hard +"streamsearch@npm:^1.1.0": + version: 1.1.0 + resolution: "streamsearch@npm:1.1.0" + checksum: 1cce16cea8405d7a233d32ca5e00a00169cc0e19fbc02aa839959985f267335d435c07f96e5e0edd0eadc6d39c98d5435fb5bbbdefc62c41834eadc5622ad942 + languageName: node + linkType: hard + "string-argv@npm:^0.3.1": version: 0.3.1 resolution: "string-argv@npm:0.3.1" @@ -25057,6 +25074,15 @@ __metadata: languageName: node linkType: hard +"undici@npm:^5.23.0": + version: 5.23.0 + resolution: "undici@npm:5.23.0" + dependencies: + busboy: ^1.6.0 + checksum: 906ca4fb1d47163d2cee2ecbbc664a1d92508a2cdf1558146621109f525c983a83597910b36e6ba468240e95259be5939cea6babc99fc0c36360b16630f66784 + languageName: node + linkType: hard + "unherit@npm:^1.0.4": version: 1.1.3 resolution: "unherit@npm:1.1.3"