From db40fe1c54d632b08c5d148bc77df1ec74cf1b0d Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Tue, 17 Sep 2024 11:14:55 +0200 Subject: [PATCH] Introduce `DB#RunInTx()` method --- database/db.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/database/db.go b/database/db.go index 99661cc..d3c7b3c 100644 --- a/database/db.go +++ b/database/db.go @@ -769,6 +769,30 @@ func (db *DB) Delete( return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...) } +// ExecTx allows running a function in a database transaction without requiring manual transaction handling. +// +// A new transaction is started on [DB] which is then passed to fn. After fn returns, the transaction is +// committed unless an error was returned. If fn returns an error, that error is returned or when failing +// to start or/and commit the transaction. +func (db *DB) ExecTx(ctx context.Context, fn func(context.Context, *sqlx.Tx) error) error { + tx, err := db.BeginTxx(ctx, nil) + if err != nil { + return errors.Wrap(err, "cannot start transaction") + } + // We don't expect meaningful errors from rolling back the tx other than the sql.ErrTxDone, so just ignore it. + defer func() { _ = tx.Rollback() }() + + if err := fn(ctx, tx); err != nil { + return errors.WithStack(err) + } + + if err := tx.Commit(); err != nil { + return errors.Wrap(err, "cannot commit transaction") + } + + return nil +} + func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { db.tableSemaphoresMu.Lock() defer db.tableSemaphoresMu.Unlock()