Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db extensions #646

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions token/sdk/dig/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger-labs/fabric-token-sdk/token/services/db"
dbdriver "github.com/hyperledger-labs/fabric-token-sdk/token/services/db/driver"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/db/sql/driver/unity"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/db/sql/ext"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/identitydb"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network/driver"
Expand Down Expand Up @@ -86,8 +87,11 @@ type DBDriverResult struct {
IdentityDBDriver db.NamedDriver[dbdriver.IdentityDBDriver] `group:"identitydb-drivers"`
}

func NewDBDrivers() DBDriverResult {
ttxDBDriver, tokenDBDriver, tokenLockDBDriver, auditDBDriver, identityDBDriver := unity.NewDBDrivers()
func NewDBDrivers(in struct {
dig.In
TokenDBExtensions []ext.Factory[ext.TokenDBExtension] `group:"tokendb-extensions"`
}) DBDriverResult {
ttxDBDriver, tokenDBDriver, tokenLockDBDriver, auditDBDriver, identityDBDriver := unity.NewDBDrivers(in.TokenDBExtensions)
return DBDriverResult{
TTXDBDriver: ttxDBDriver,
TokenDBDriver: tokenDBDriver,
Expand Down
2 changes: 2 additions & 0 deletions token/sdk/dig/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
kvs2 "github.com/hyperledger-labs/fabric-token-sdk/token/services/identity/kvs"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/identitydb"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/interop/htlc"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/interop/htlc/db/sql"
logging2 "github.com/hyperledger-labs/fabric-token-sdk/token/services/logging"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network"
_ "github.com/hyperledger-labs/fabric-token-sdk/token/services/network/fabric"
Expand Down Expand Up @@ -118,6 +119,7 @@ func (p *SDK) Install() error {
p.Container().Provide(digutils.Identity[*auditdb.Manager](), dig.As(new(auditor.AuditDBProvider))),
p.Container().Provide(NewIdentityDBManager),
p.Container().Provide(NewTokenLockDBManager),
p.Container().Provide(sql.NewHTLCTokenDBExtensionFactory, dig.Group("tokendb-extensions")),
p.Container().Provide(digutils.Identity[*kvs.KVS](), dig.As(new(kvs2.KVS))),
p.Container().Provide(identity.NewDBStorageProvider),
p.Container().Provide(auditor.NewManager),
Expand Down
2 changes: 2 additions & 0 deletions token/services/db/driver/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package driver

import (
"database/sql"
"errors"
"time"

Expand Down Expand Up @@ -105,6 +106,7 @@ type CertificationDB interface {
}

type TokenDBTransaction interface {
Tx() *sql.Tx
// GetToken returns the owned tokens and their identifier keys for the passed ids.
GetToken(txID string, index uint64, includeDeleted bool) (*token.Token, []string, error)
// Delete marks the passed token as deleted by a given identifier (idempotent)
Expand Down
27 changes: 21 additions & 6 deletions token/services/db/sql/driver/unity/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
package unity

import (
"database/sql"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils"
"github.com/hyperledger-labs/fabric-token-sdk/token"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/db"
dbdriver "github.com/hyperledger-labs/fabric-token-sdk/token/services/db/driver"
sqldb "github.com/hyperledger-labs/fabric-token-sdk/token/services/db/sql"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/db/sql/ext"
"github.com/pkg/errors"
)

Expand All @@ -22,12 +25,14 @@
)

type Driver struct {
DBOpener *sqldb.DBOpener
DBOpener *sqldb.DBOpener
TokenDBExtensions []ext.Factory[ext.TokenDBExtension] `optional=true`

Check failure on line 29 in token/services/db/sql/driver/unity/driver.go

View workflow job for this annotation

GitHub Actions / lint

structtag: struct field tag `optional=true` not compatible with reflect.StructTag.Get: bad syntax for struct tag pair (govet)
}

func NewDriver() *Driver {
func NewDriver(tokenDBExtensions []ext.Factory[ext.TokenDBExtension]) *Driver {
return &Driver{
DBOpener: sqldb.NewSQLDBOpener(optsKey, envVarKey),
DBOpener: sqldb.NewSQLDBOpener(optsKey, envVarKey),
TokenDBExtensions: tokenDBExtensions,
}
}

Expand All @@ -36,7 +41,17 @@
}

func (d *Driver) OpenTokenDB(cp dbdriver.ConfigProvider, tmsID token.TMSID) (dbdriver.TokenDB, error) {
return openDB(d.DBOpener, cp, tmsID, sqldb.NewTokenDB)
return openDB(d.DBOpener, cp, tmsID, func(db *sql.DB, tablePrefix string, createSchema bool) (dbdriver.TokenDB, error) {
extensions := make([]ext.TokenDBExtension, len(d.TokenDBExtensions))
for i, factory := range d.TokenDBExtensions {
ext, err := factory.NewExtension(tablePrefix)
if err != nil {
return nil, errors.WithMessagef(err, "failed creating token DB extension [%d]", i)
}
extensions = append(extensions, ext)
}
return sqldb.NewTokenDB(db, tablePrefix, createSchema, extensions...)
})
}

func (d *Driver) OpenTokenLockDB(cp dbdriver.ConfigProvider, tmsID token.TMSID) (dbdriver.TokenLockDB, error) {
Expand Down Expand Up @@ -107,8 +122,8 @@
return t.Driver.OpenIdentityDB(cp, tmsID)
}

func NewDBDrivers() (db.NamedDriver[dbdriver.TTXDBDriver], db.NamedDriver[dbdriver.TokenDBDriver], db.NamedDriver[dbdriver.TokenLockDBDriver], db.NamedDriver[dbdriver.AuditDBDriver], db.NamedDriver[dbdriver.IdentityDBDriver]) {
root := NewDriver()
func NewDBDrivers(tokenDBExtensions []ext.Factory[ext.TokenDBExtension]) (db.NamedDriver[dbdriver.TTXDBDriver], db.NamedDriver[dbdriver.TokenDBDriver], db.NamedDriver[dbdriver.TokenLockDBDriver], db.NamedDriver[dbdriver.AuditDBDriver], db.NamedDriver[dbdriver.IdentityDBDriver]) {
root := NewDriver(tokenDBExtensions)
return db.NamedDriver[dbdriver.TTXDBDriver]{Name: "unity", Driver: &TtxDBDriver{Driver: root}},
db.NamedDriver[dbdriver.TokenDBDriver]{Name: "unity", Driver: &TokenDBDriver{Driver: root}},
db.NamedDriver[dbdriver.TokenLockDBDriver]{Name: "unity", Driver: &TokenLockDBDriver{Driver: root}},
Expand Down
29 changes: 29 additions & 0 deletions token/services/db/sql/ext/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package ext

import (
"database/sql"

"github.com/hyperledger-labs/fabric-token-sdk/token/services/db/driver"
"github.com/hyperledger-labs/fabric-token-sdk/token/token"
)

type Extension interface {
GetSchema(tablePrefix string) string
}

type TokenDBExtension interface {
Extension
Delete(tx *sql.Tx, txID string, index uint64, deletedBy string) error
StoreToken(tx *sql.Tx, tr driver.TokenRecord, owners []string) error
DeleteTokens(tx *sql.Tx, deletedBy string, ids ...*token.ID) error
}

type Factory[T Extension] interface {
NewExtension(prefix string) (T, error)
}
4 changes: 2 additions & 2 deletions token/services/db/sql/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewIdentityDB(db *sql.DB, tablePrefix string, createSchema bool, signerInfo
Signers: tables.Signers,
}, signerInfoCache)
if createSchema {
if err = initSchema(db, identityDB.GetSchema()); err != nil {
if err = initSchema(db, identityDB.GetSchema(tablePrefix)); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -267,7 +267,7 @@ func (w *IdentityConfigurationIterator) Next() (driver.IdentityConfiguration, er
return c, err
}

func (db *IdentityDB) GetSchema() string {
func (db *IdentityDB) GetSchema(string) string {
return fmt.Sprintf(`
-- IdentityConfigurations
CREATE TABLE IF NOT EXISTS %s (
Expand Down
4 changes: 2 additions & 2 deletions token/services/db/sql/tokenlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewTokenLockDB(db *sql.DB, tablePrefix string, createSchema bool) (driver.T

identityDB := newTokenLockDB(db, tokenLockTables{TokenLocks: tables.TokenLocks})
if createSchema {
if err = initSchema(db, identityDB.GetSchema()); err != nil {
if err = initSchema(db, identityDB.GetSchema(tablePrefix)); err != nil {
return nil, err
}
}
Expand All @@ -64,7 +64,7 @@ func (db *TokenLockDB) UnlockByTxID(consumerTxID transaction.ID) error {
return err
}

func (db *TokenLockDB) GetSchema() string {
func (db *TokenLockDB) GetSchema(string) string {
return fmt.Sprintf(`
-- TokenLocks
CREATE TABLE IF NOT EXISTS %s (
Expand Down
63 changes: 53 additions & 10 deletions token/services/db/sql/tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

tdriver "github.com/hyperledger-labs/fabric-token-sdk/token/driver"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/db/driver"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/db/sql/ext"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network/common/rws/keys"
"github.com/hyperledger-labs/fabric-token-sdk/token/token"
"github.com/pkg/errors"
Expand All @@ -28,18 +29,20 @@ type tokenTables struct {
}

type TokenDB struct {
db *sql.DB
table tokenTables
db *sql.DB
table tokenTables
extensions []ext.TokenDBExtension
}

func newTokenDB(db *sql.DB, tables tokenTables) *TokenDB {
func newTokenDB(db *sql.DB, tables tokenTables, extensions ...ext.TokenDBExtension) *TokenDB {
return &TokenDB{
db: db,
table: tables,
db: db,
table: tables,
extensions: extensions,
}
}

func NewTokenDB(db *sql.DB, tablePrefix string, createSchema bool) (driver.TokenDB, error) {
func NewTokenDB(db *sql.DB, tablePrefix string, createSchema bool, extensions ...ext.TokenDBExtension) (driver.TokenDB, error) {
tables, err := getTableNames(tablePrefix)
if err != nil {
return nil, errors.Wrapf(err, "failed to get table names")
Expand All @@ -50,11 +53,16 @@ func NewTokenDB(db *sql.DB, tablePrefix string, createSchema bool) (driver.Token
Ownership: tables.Ownership,
PublicParams: tables.PublicParams,
Certifications: tables.Certifications,
})
}, extensions...)
if createSchema {
if err = initSchema(db, tokenDB.GetSchema()); err != nil {
if err = initSchema(db, tokenDB.GetSchema(tablePrefix)); err != nil {
return nil, err
}
for _, extension := range tokenDB.extensions {
if err = initSchema(db, extension.GetSchema(tablePrefix)); err != nil {
return nil, err
}
}
}
return tokenDB, nil
}
Expand All @@ -70,6 +78,14 @@ func (db *TokenDB) StoreToken(tr driver.TokenRecord, owners []string) (err error
}
return
}
for _, extension := range db.extensions {
if err := extension.StoreToken(tx.Tx(), tr, owners); err != nil {
if err1 := tx.Rollback(); err1 != nil {
logger.Errorf("error rolling back: %s", err1.Error())
}
return err
}
}
if err = tx.Commit(); err != nil {
return
}
Expand All @@ -85,11 +101,29 @@ func (db *TokenDB) DeleteTokens(deletedBy string, ids ...*token.ID) error {
args := []interface{}{deletedBy, time.Now().UTC()}
where := whereTokenIDs(&args, ids)

tx, err := db.NewTokenDBTransaction()
if err != nil {
return err
}
query := fmt.Sprintf("UPDATE %s SET is_deleted = true, spent_by = $1, spent_at = $2 WHERE %s", db.table.Tokens, where)
logger.Debug(query, args)
if _, err := db.db.Exec(query, args...); err != nil {
if _, err := tx.Tx().Exec(query, args...); err != nil {
if err1 := tx.Rollback(); err1 != nil {
logger.Errorf("error rolling back: %s", err1.Error())
}
return errors.Wrapf(err, "error setting tokens to deleted [%v]", ids)
}
for _, extension := range db.extensions {
if err := extension.DeleteTokens(tx.Tx(), deletedBy, ids...); err != nil {
if err1 := tx.Rollback(); err1 != nil {
logger.Errorf("error rolling back: %s", err1.Error())
}
return errors.Wrapf(err, "error committing extension tokens to deleted [%v]", ids)
}
}
if err = tx.Commit(); err != nil {
return errors.Wrapf(err, "error committing tokens to deleted [%v]", ids)
}
return nil
}

Expand Down Expand Up @@ -721,7 +755,7 @@ func (db *TokenDB) GetCertifications(ids []*token.ID) ([][]byte, error) {
return certifications, nil
}

func (db *TokenDB) GetSchema() string {
func (db *TokenDB) GetSchema(string) string {
return fmt.Sprintf(`
-- Tokens
CREATE TABLE IF NOT EXISTS %s (
Expand Down Expand Up @@ -799,6 +833,10 @@ type TokenTransaction struct {
tx *sql.Tx
}

func (t *TokenTransaction) Tx() *sql.Tx {
return t.tx
}

func (t *TokenTransaction) GetToken(txID string, index uint64, includeDeleted bool) (*token.Token, []string, error) {
where, join, args := tokenQuerySql(driver.QueryTokenDetailsParams{
IDs: []*token.ID{{TxId: txID, Index: index}},
Expand Down Expand Up @@ -850,6 +888,11 @@ func (t *TokenTransaction) Delete(txID string, index uint64, deletedBy string) e
if _, err := t.tx.Exec(query, deletedBy, now, txID, index); err != nil {
return errors.Wrapf(err, "error setting token to deleted [%s]", txID)
}
for _, extension := range t.db.extensions {
if err := extension.Delete(t.tx, txID, index, deletedBy); err != nil {
return errors.Wrapf(err, "error setting token to deleted [%s]", txID)
}
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions token/services/db/sql/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewTransactionDB(db *sql.DB, tablePrefix string, createSchema bool) (driver
TransactionEndorseAck: tables.TransactionEndorseAck,
})
if createSchema {
if err = initSchema(db, transactionsDB.GetSchema()); err != nil {
if err = initSchema(db, transactionsDB.GetSchema(tablePrefix)); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func (db *TransactionDB) SetStatus(txID string, status driver.TxStatus, message
return
}

func (db *TransactionDB) GetSchema() string {
func (db *TransactionDB) GetSchema(string) string {
return fmt.Sprintf(`
-- requests
CREATE TABLE IF NOT EXISTS %s (
Expand Down
4 changes: 2 additions & 2 deletions token/services/db/sql/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewWalletDB(db *sql.DB, tablePrefix string, createSchema bool) (driver.Wall

walletDB := newWalletDB(db, walletTables{Wallets: tables.Wallets})
if createSchema {
if err = initSchema(db, walletDB.GetSchema()); err != nil {
if err = initSchema(db, walletDB.GetSchema(tablePrefix)); err != nil {
return nil, errors.Wrapf(err, "failed to create schema")
}
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func (db *WalletDB) IdentityExists(identity token.Identity, wID driver.WalletID,
return result != ""
}

func (db *WalletDB) GetSchema() string {
func (db *WalletDB) GetSchema(string) string {
return fmt.Sprintf(`
-- Wallets
CREATE TABLE IF NOT EXISTS %s (
Expand Down
Loading
Loading