Skip to content

Commit

Permalink
refactor: inbound transport to use AgentMessageProcessedEvent
Browse files Browse the repository at this point in the history
Signed-off-by: Sai Ranjit Tummalapalli <[email protected]>
  • Loading branch information
sairanjit committed Sep 13, 2024
1 parent cc6d1ab commit 3281bc5
Show file tree
Hide file tree
Showing 4 changed files with 7,997 additions and 5,492 deletions.
1 change: 1 addition & 0 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"@credo-ts/core": "workspace:*",
"@types/express": "^4.17.15",
"express": "^4.17.1",
"rxjs": "^7.8.0",
"ws": "^8.13.0"
},
"devDependencies": {
Expand Down
12 changes: 12 additions & 0 deletions packages/node/src/transport/HttpInboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import type {
EncryptedMessage,
AgentContext,
AgentMessageReceivedEvent,
AgentMessageProcessedEvent,
} from '@credo-ts/core'
import type { Express, Request, Response } from 'express'
import type { Server } from 'http'

import { DidCommMimeType, CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core'
import express, { text } from 'express'
import { filter, first, firstValueFrom } from 'rxjs'

const supportedContentTypes: string[] = [DidCommMimeType.V0, DidCommMimeType.V1]

Expand Down Expand Up @@ -66,6 +68,16 @@ export class HttpInboundTransport implements InboundTransport {
},
})

// Wait for message to be processed
await firstValueFrom(
agent.events.observable<AgentMessageProcessedEvent>(AgentEventTypes.AgentMessageProcessed).pipe(
filter((e) => e.type === AgentEventTypes.AgentMessageProcessed),
filter((e) => e.payload.message.id === encryptedMessage.id),
filter((e) => e.payload.message.type === encryptedMessage.type),
first()
)
)

// If agent did not use session when processing message we need to send response here.
if (!res.headersSent) {
res.status(200).end()
Expand Down
15 changes: 14 additions & 1 deletion packages/node/src/transport/WsInboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import type {
EncryptedMessage,
AgentContext,
AgentMessageReceivedEvent,
AgentMessageProcessedEvent,
} from '@credo-ts/core'

import { CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core'
import { filter, first, firstValueFrom } from 'rxjs'
// eslint-disable-next-line import/no-named-as-default
import WebSocket, { Server } from 'ws'

Expand Down Expand Up @@ -70,13 +72,24 @@ export class WsInboundTransport implements InboundTransport {
socket.addEventListener('message', async (event: any) => {
this.logger.debug('WebSocket message event received.', { url: event.target.url })
try {
const encryptedMessage = JSON.parse(event.data)
agent.events.emit<AgentMessageReceivedEvent>(agent.context, {
type: AgentEventTypes.AgentMessageReceived,
payload: {
message: JSON.parse(event.data),
message: encryptedMessage,
session: session,
},
})

// Wait for message to be processed
await firstValueFrom(
agent.events.observable<AgentMessageProcessedEvent>(AgentEventTypes.AgentMessageProcessed).pipe(
filter((e) => e.type === AgentEventTypes.AgentMessageProcessed),
filter((e) => e.payload.message.id === encryptedMessage.id),
filter((e) => e.payload.message.type === encryptedMessage.type),
first()
)
)
} catch (error) {
this.logger.error(`Error processing message: ${error}`)
}
Expand Down
Loading

0 comments on commit 3281bc5

Please sign in to comment.