diff --git a/database/db.go b/database/db.go
index c4502df..3eb2df7 100644
--- a/database/db.go
+++ b/database/db.go
@@ -691,67 +691,25 @@ func (db *DB) CreateIgnoreStreamed(
 	)
 }
 
-func WithOnSuccessUpsert(onSuccess ...OnSuccess[Entity]) ExecOption {
-	return func(options *ExecOptions) {
-		options.onSuccess = onSuccess
-	}
-}
-
-func WithStatement(stmt string, placeholders int) ExecOption {
-	return func(options *ExecOptions) {
-		options.stmt = stmt
-		options.placeholders = placeholders
-	}
-}
-
-type ExecOption func(options *ExecOptions)
-
-type ExecOptions struct {
-	onSuccess    []OnSuccess[Entity]
-	stmt         string
-	placeholders int
-}
-
-func NewExecOptions(execOpts ...ExecOption) *ExecOptions {
-	execOptions := &ExecOptions{}
-
-	for _, option := range execOpts {
-		option(execOptions)
-	}
-
-	return execOptions
-}
-
 // UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
 // The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
 // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
 // concurrency is controlled via Options.MaxConnectionsPerTable.
 // Entities for which the query ran successfully will be passed to onSuccess.
 func (db *DB) UpsertStreamed(
-	ctx context.Context, entities <-chan Entity, execOpts ...ExecOption,
+	ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
 ) error {
-
-	execOptions := NewExecOptions(execOpts...)
-
 	first, forward, err := com.CopyFirst(ctx, entities)
 	if err != nil {
 		return errors.Wrap(err, "can't copy first entity")
 	}
 
 	sem := db.GetSemaphoreForTable(TableName(first))
-	var stmt string
-	var placeholders int
-
-	if execOptions.stmt != "" {
-		stmt = execOptions.stmt
-		placeholders = execOptions.placeholders
-	} else {
-		stmt, placeholders = db.BuildUpsertStmt(first)
-	}
+	stmt, placeholders := db.BuildUpsertStmt(first)
 
 	return db.NamedBulkExec(
 		ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
-		forward, SplitOnDupId[Entity], execOptions.onSuccess...,
+		forward, SplitOnDupId[Entity], onSuccess...,
 	)
 }
 
@@ -770,58 +728,17 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error
 	return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward)
 }
 
-func WithOnSuccessDelete(onSuccess ...OnSuccess[any]) DeleteOption {
-	return func(options *DeleteOptions) {
-		options.onSuccess = onSuccess
-	}
-}
-
-func ByColumn(column string) DeleteOption {
-	return func(options *DeleteOptions) {
-		options.column = column
-	}
-}
-
-type DeleteOption func(options *DeleteOptions)
-
-type DeleteOptions struct {
-	onSuccess []OnSuccess[any]
-	column    string
-}
-
-func NewDeleteOptions(execOpts ...DeleteOption) *DeleteOptions {
-	deleteOptions := &DeleteOptions{}
-
-	for _, option := range execOpts {
-		option(deleteOptions)
-	}
-
-	return deleteOptions
-}
-
 // DeleteStreamed bulk deletes the specified ids via BulkExec.
 // The delete statement is created using BuildDeleteStmt with the passed entityType.
 // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
 // concurrency is controlled via Options.MaxConnectionsPerTable.
 // IDs for which the query ran successfully will be passed to onSuccess.
 func (db *DB) DeleteStreamed(
-	ctx context.Context, entityType Entity, ids <-chan interface{}, deleteOpts ...DeleteOption,
+	ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
 ) error {
-
-	deleteOptions := NewDeleteOptions(deleteOpts...)
-
 	sem := db.GetSemaphoreForTable(TableName(entityType))
-
-	var stmt string
-
-	if deleteOptions.column != "" {
-		stmt = fmt.Sprintf("DELETE FROM %s WHERE %s IN (?)", TableName(entityType), deleteOptions.column)
-	} else {
-		stmt = db.BuildDeleteStmt(entityType)
-	}
-
 	return db.BulkExec(
-		ctx, stmt, db.Options.MaxPlaceholdersPerStatement, sem, ids, deleteOptions.onSuccess...,
+		ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess...,
 	)
 }
 
@@ -837,7 +754,7 @@ func (db *DB) Delete(
 	}
 	close(idsCh)
 
-	return db.DeleteStreamed(ctx, entityType, idsCh, WithOnSuccessDelete(onSuccess...))
+	return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
 }
 
 func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
diff --git a/database/optionally.go b/database/optionally.go
new file mode 100644
index 0000000..73f6a84
--- /dev/null
+++ b/database/optionally.go
@@ -0,0 +1,135 @@
+package database
+
+import (
+	"context"
+	"fmt"
+	"github.com/icinga/icinga-go-library/com"
+	"github.com/pkg/errors"
+)
+
+// Upsert inserts new rows into a table or updates rows of a table if the primary key already exists.
+type Upsert interface {
+	// Stream bulk upserts the specified entities via NamedBulkExec.
+	// If not explicitly specified, the upsert statement is created using
+	// BuildUpsertStmt with the first entity from the entities stream.
+	Stream(ctx context.Context, entities <-chan Entity) error
+}
+
+// UpsertOption is a functional option for NewUpsert.
+type UpsertOption func(u *upsert)
+
+// WithOnUpsert adds callback(s) to bulk upserts. Entities for which the
+// operation was performed successfully are passed to the callbacks.
+func WithOnUpsert(onUpsert ...OnSuccess[Entity]) UpsertOption {
+	return func(u *upsert) {
+		u.onUpsert = onUpsert
+	}
+}
+
+// WithStatement uses the specified statement for bulk upserts instead of automatically creating one.
+func WithStatement(stmt string, placeholders int) UpsertOption {
+	return func(u *upsert) {
+		u.stmt = stmt
+		u.placeholders = placeholders
+	}
+}
+
+// NewUpsert creates a new Upsert initialized with a database.
+func NewUpsert(db *DB, options ...UpsertOption) Upsert {
+	u := &upsert{db: db}
+
+	for _, option := range options {
+		option(u)
+	}
+
+	return u
+}
+
+type upsert struct {
+	db           *DB
+	onUpsert     []OnSuccess[Entity]
+	stmt         string
+	placeholders int
+}
+
+func (u *upsert) Stream(ctx context.Context, entities <-chan Entity) error {
+	first, forward, err := com.CopyFirst(ctx, entities)
+	if err != nil {
+		return errors.Wrap(err, "can't copy first entity")
+	}
+
+	sem := u.db.GetSemaphoreForTable(TableName(first))
+	var stmt string
+	var placeholders int
+
+	if u.stmt != "" {
+		stmt = u.stmt
+		placeholders = u.placeholders
+	} else {
+		stmt, placeholders = u.db.BuildUpsertStmt(first)
+	}
+
+	return u.db.NamedBulkExec(
+		ctx, stmt, u.db.BatchSizeByPlaceholders(placeholders), sem,
+		forward, SplitOnDupId[Entity], u.onUpsert...,
+	)
+}
+
+// Delete deletes rows of a table.
+type Delete interface {
+	// Stream bulk deletes rows from the table specified in from using the given args stream via BulkExec.
+	// Unless explicitly specified, the DELETE statement is created using BuildDeleteStmt.
+	Stream(ctx context.Context, from any, args <-chan any) error
+}
+
+// DeleteOption is a functional option for NewDelete.
+type DeleteOption func(options *delete)
+
+// WithOnDelete adds callback(s) to bulk deletes. Arguments for which the
+// operation was performed successfully are passed to the callbacks.
+func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
+	return func(d *delete) {
+		d.onDelete = onDelete
+	}
+}
+
+// ByColumn uses the given column for the WHERE clause that the rows must
+// satisfy in order to be deleted, instead of automatically using ID.
+func ByColumn(column string) DeleteOption {
+	return func(d *delete) {
+		d.column = column
+	}
+}
+
+// NewDelete creates a new Delete initialized with a database.
+func NewDelete(db *DB, options ...DeleteOption) Delete {
+	d := &delete{db: db}
+
+	for _, option := range options {
+		option(d)
+	}
+
+	return d
+}
+
+type delete struct {
+	db       *DB
+	column   string
+	onDelete []OnSuccess[any]
+}
+
+func (d *delete) Stream(ctx context.Context, from any, args <-chan any) error {
+	var stmt string
+
+	if d.column != "" {
+		stmt = fmt.Sprintf(`DELETE FROM "%s" WHERE %s IN (?)`, TableName(from), d.column)
+	} else {
+		stmt = d.db.BuildDeleteStmt(from)
+	}
+
+	sem := d.db.GetSemaphoreForTable(TableName(from))
+
+	return d.db.BulkExec(
+		ctx, stmt, d.db.Options.MaxPlaceholdersPerStatement, sem, args, d.onDelete...,
+	)
+}
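
Usage sketch (editor's note, not part of the diff): the snippet below shows how the new Upsert constructor and its options might be wired up from a consuming package. The import path is taken from the diff's own module ("github.com/icinga/icinga-go-library"); the example package, the upsertHosts function, and its parameters (ctx, db, hosts, markSynced) are illustrative assumptions, not code from this change.

package example // illustrative consumer of the new API

import (
	"context"

	"github.com/icinga/icinga-go-library/database"
)

// upsertHosts streams entities into the database using the default upsert
// statement (built via BuildUpsertStmt from the first streamed entity) and
// passes successfully upserted entities to the markSynced callback.
func upsertHosts(
	ctx context.Context,
	db *database.DB,
	hosts <-chan database.Entity,
	markSynced database.OnSuccess[database.Entity],
) error {
	// WithStatement(stmt, placeholders) could be added here instead to pin a
	// prebuilt statement rather than deriving one from the first entity.
	return database.NewUpsert(db, database.WithOnUpsert(markSynced)).Stream(ctx, hosts)
}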
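
A matching sketch for the Delete side, in the same illustrative package and under the same assumptions; the entity type, the ID channel, and the environment_id column are placeholders chosen only to show ByColumn and WithOnDelete together.

// deleteByEnvironment removes the rows of entityType's table whose
// environment_id matches one of the streamed values, instead of deleting by ID
// as the default BuildDeleteStmt statement would.
func deleteByEnvironment(
	ctx context.Context,
	db *database.DB,
	entityType database.Entity,
	environmentIDs <-chan any,
	markDeleted database.OnSuccess[any],
) error {
	return database.NewDelete(
		db,
		database.ByColumn("environment_id"),
		database.WithOnDelete(markDeleted),
	).Stream(ctx, entityType, environmentIDs)
}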