Skip to content

Commit

Permalink
Handle broadcast in pipeline (#5732)
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Bykhov <[email protected]>
  • Loading branch information
BykhovDenis committed Jun 4, 2024
1 parent fd83bf3 commit 223479e
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 9 deletions.
19 changes: 18 additions & 1 deletion server/core/src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,27 @@ export async function createPipeline (
upgrade: boolean,
broadcast: BroadcastFunc
): Promise<Pipeline> {
const broadcastHandlers: BroadcastFunc[] = [broadcast]
const _broadcast: BroadcastFunc = (
tx: Tx[],
targets: string | string[] | undefined,
exclude: string[] | undefined
) => {
for (const handler of broadcastHandlers) handler(tx, targets, exclude)
}
const storage = await ctx.with(
'create-server-storage',
{},
async (ctx) =>
await createServerStorage(ctx, conf, {
upgrade,
broadcast
broadcast: _broadcast
})
)
const pipelineResult = await PipelineImpl.create(ctx.newChild('pipeline-operations', {}), storage, constructors)
broadcastHandlers.push((tx: Tx[], targets: string | string[] | undefined, exclude: string[] | undefined) => {
void pipelineResult.handleBroadcast(tx, targets, exclude)
})
return pipelineResult
}

Expand Down Expand Up @@ -115,6 +126,12 @@ class PipelineImpl implements Pipeline {
}
}

async handleBroadcast (tx: Tx[], targets?: string | string[], exclude?: string[]): Promise<void> {
if (this.head !== undefined) {
await this.head.handleBroadcast(tx, targets, exclude)
}
}

async close (): Promise<void> {
await this.storage.close()
}
Expand Down
4 changes: 3 additions & 1 deletion server/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,19 @@ export interface Middleware {
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<FindResult<T>>
handleBroadcast: HandleBroadcastFunc
searchFulltext: (ctx: SessionContext, query: SearchQuery, options: SearchOptions) => Promise<SearchResult>
}

/**
* @public
*/
export type BroadcastFunc = (tx: Tx[], targets?: string | string[], exclude?: string[]) => void

/**
* @public
*/
export type HandledBroadcastFunc = (tx: Tx[], targets?: string[]) => Tx[]
export type HandleBroadcastFunc = (tx: Tx[], targets?: string | string[], exclude?: string[]) => Promise<void>

/**
* @public
Expand Down
6 changes: 6 additions & 0 deletions server/middleware/src/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ export abstract class BaseMiddleware {
return await this.provideSearchFulltext(ctx, query, options)
}

async handleBroadcast (tx: Tx[], targets?: string | string[], exclude?: string[]): Promise<void> {
if (this.next !== undefined) {
await this.next.handleBroadcast(tx, targets, exclude)
}
}

protected async provideTx (ctx: SessionContext, tx: Tx): Promise<TxMiddlewareResult> {
if (this.next !== undefined) {
return await this.next.tx(ctx, tx)
Expand Down
34 changes: 27 additions & 7 deletions server/middleware/src/spaceSecurity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
return res
}

private resyncDomains (): void {
this.spaceSecurityInit = this.init(this.spaceMeasureCtx)
}

private addMemberSpace (member: Ref<Account>, space: Ref<Space>): void {
const arr = this.allowedSpaces[member] ?? []
arr.push(space)
Expand Down Expand Up @@ -393,6 +397,11 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
await this.brodcastEvent(ctx, [cud.objectId])
}
}
} else if (tx._class === core.class.TxWorkspaceEvent) {
const event = tx as TxWorkspaceEvent
if (event.event === WorkspaceEvent.BulkUpdate) {
this.resyncDomains()
}
}
}

Expand All @@ -414,16 +423,27 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
it.target = mergeTargets(targets, it.target)
})

await this.waitInit()
for (const tt of ctx.derived) {
for (const t of tt.derived) {
if (this.storage.hierarchy.isDerived(t._class, core.class.TxCUD)) {
await this.processTxSpaceDomain(t as TxCUD<Doc>)
return res
}

override async handleBroadcast (
txes: Tx[],
targets?: string | string[] | undefined,
exclude?: string[] | undefined
): Promise<void> {
for (const tx of txes) {
const h = this.storage.hierarchy
if (h.isDerived(tx._class, core.class.TxCUD)) {
const cudTx = tx as TxCUD<Doc>
await this.processTxSpaceDomain(cudTx)
} else if (tx._class === core.class.TxWorkspaceEvent) {
const event = tx as TxWorkspaceEvent
if (event.event === WorkspaceEvent.BulkUpdate) {
this.resyncDomains()
}
}
}

return res
await this.next?.handleBroadcast(txes, targets, exclude)
}

private getAllAllowedSpaces (account: Account, isData: boolean): Ref<Space>[] {
Expand Down

0 comments on commit 223479e

Please sign in to comment.