From 5d6c33249de21e26b3fe16db2befb4738c3d129e Mon Sep 17 00:00:00 2001 From: Denis Bykhov Date: Thu, 4 Apr 2024 11:02:17 +0500 Subject: [PATCH] Change tx notify logic (#5172) Signed-off-by: Denis Bykhov --- packages/core/src/client.ts | 12 ++++++++---- packages/query/src/__tests__/connection.ts | 2 +- server/ws/src/client.ts | 11 +++-------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index 8c0e600a0f..559584a64f 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -156,7 +156,6 @@ class ClientImpl implements AccountClient, BackupClient, MeasureClient { // We need to handle it on server, before performing local live query updates. const result = await this.conn.tx(tx) - this.notify?.(tx) return result } @@ -166,9 +165,14 @@ class ClientImpl implements AccountClient, BackupClient, MeasureClient { async updateFromRemote (...tx: Tx[]): Promise { for (const t of tx) { - if (t.objectSpace === core.space.Model) { - this.hierarchy.tx(t) - await this.model.tx(t) + try { + if (t.objectSpace === core.space.Model) { + this.hierarchy.tx(t) + await this.model.tx(t) + } + } catch (err) { + console.error('failed to apply model transaction, skipping', t) + continue } } this.notify?.(...tx) diff --git a/packages/query/src/__tests__/connection.ts b/packages/query/src/__tests__/connection.ts index d065475ce7..15cf96f98a 100644 --- a/packages/query/src/__tests__/connection.ts +++ b/packages/query/src/__tests__/connection.ts @@ -76,7 +76,7 @@ FulltextStorage & { } await Promise.all([model.tx(tx), transactions.tx(tx)]) // Not required, since handled in client. - // handler(tx) + handler(tx) return {} }, close: async () => {}, diff --git a/server/ws/src/client.ts b/server/ws/src/client.ts index b90b530b43..82b9146c90 100644 --- a/server/ws/src/client.ts +++ b/server/ws/src/client.ts @@ -156,7 +156,7 @@ export class ClientSession implements Session { } if (tx._class !== core.class.TxApplyIf) { - this.broadcast(this, this.token.workspace, { result: tx }, target) + this.broadcast(null, this.token.workspace, { result: tx }, target) } if (shouldBroadcast) { if (this.useBroadcast) { @@ -176,7 +176,7 @@ export class ClientSession implements Session { if (tx._class === core.class.TxApplyIf) { ;(result as TxApplyResult).derived.push(bevent) } - this.broadcast(this, this.token.workspace, { result: bevent }, target) + this.broadcast(null, this.token.workspace, { result: bevent }, target) } else { if (tx._class === core.class.TxApplyIf) { ;(result as TxApplyResult).derived.push(...derived) @@ -184,12 +184,7 @@ export class ClientSession implements Session { while (derived.length > 0) { const part = derived.splice(0, 250) console.log('Broadcasting part', part.length, derived.length) - this.broadcast( - tx._class === core.class.TxApplyIf ? this : null, - this.token.workspace, - { result: part }, - target - ) + this.broadcast(null, this.token.workspace, { result: part }, target) } } } else {