Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: one (*sql.DB) pool for all jobsdb #5170

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,21 @@
FeaturesRetryMaxAttempts: 10,
})

dbPool, err := misc.GetDatabaseConnectionPool(ctx, config, statsFactory, "jobsdb")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dbPool, err := misc.GetDatabaseConnectionPool(ctx, config, statsFactory, "jobsdb")
dbPool, err := misc.GetDatabaseConnectionPool(ctx, config, statsFactory, "embedded-app")

I would avoid using jobsdb for anything else other than jobsdb component

if err != nil {
return err

Check warning on line 144 in app/apphandlers/embeddedAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/embeddedAppHandler.go#L144

Added line #L144 was not covered by tests
}
// Not deferring close here
// defer dbPool.Close()
Comment on lines +146 to +147
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the reason for not closing here? Shall we inject dbPool to embeddedAppHandler. So we always use on connection pool


// This separate gateway db is created just to be used with gateway because in case of degraded mode,
// the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it
// will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode.
gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
if err = gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gateway: %w", err)
Expand All @@ -160,24 +168,24 @@
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
"rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.routerDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
"batch_rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.batchRouterDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer batchRouterDB.Close()

// We need two errorDBs, one in read & one in write mode to support separate gateway to store failures
errDBForRead := jobsdb.NewForRead(
Expand All @@ -186,13 +194,14 @@
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer errDBForRead.Close()
errDBForWrite := jobsdb.NewForWrite(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
if err = errDBForWrite.Start(); err != nil {
return fmt.Errorf("could not start errDBForWrite: %w", err)
Expand All @@ -205,8 +214,8 @@
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer schemaDB.Close()

archivalDB := jobsdb.NewForReadWrite(
"arc",
Expand All @@ -219,8 +228,8 @@
return config.GetDuration("archival.jobRetention", 24, time.Hour)
},
),
jobsdb.WithDBHandle(dbPool),
)
defer archivalDB.Close()

var schemaForwarder schema_forwarder.Forwarder
if config.GetBool("EventSchemas2.enabled", false) {
Expand Down
11 changes: 9 additions & 2 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"github.com/rudderlabs/rudder-server/jobsdb"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)

Expand Down Expand Up @@ -68,14 +69,20 @@
}
defer sourceHandle.Stop()

dbPool, err := misc.GetDatabaseConnectionPool(ctx, config, statsFactory, "jobsdb")
if err != nil {
return err

Check warning on line 74 in app/apphandlers/gatewayAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/gatewayAppHandler.go#L74

Added line #L74 was not covered by tests
}
defer dbPool.Close()

gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer gatewayDB.Close()

if err := gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gatewayDB: %w", err)
Expand All @@ -87,8 +94,8 @@
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer errDB.Close()

if err := errDB.Start(); err != nil {
return fmt.Errorf("could not start errDB: %w", err)
Expand Down
21 changes: 15 additions & 6 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,43 +145,52 @@
FeaturesRetryMaxAttempts: 10,
})

dbPool, err := misc.GetDatabaseConnectionPool(ctx, config, statsFactory, "jobsdb")
if err != nil {
return err

Check warning on line 150 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L148-L150

Added lines #L148 - L150 were not covered by tests
}
// This is idompotent, so it's safe to call it multiple times(jobsdb.Close())
// can be cleaned up later
defer dbPool.Close()

Check warning on line 154 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L154

Added line #L154 was not covered by tests

gwDBForProcessor := jobsdb.NewForRead(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 162 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L162

Added line #L162 was not covered by tests
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
"rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.routerDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 170 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L170

Added line #L170 was not covered by tests
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
"batch_rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.batchRouterDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 178 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L178

Added line #L178 was not covered by tests
)
defer batchRouterDB.Close()
errDBForRead := jobsdb.NewForRead(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 186 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L186

Added line #L186 was not covered by tests
)
defer errDBForRead.Close()
errDBForWrite := jobsdb.NewForWrite(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 193 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L193

Added line #L193 was not covered by tests
)
if err = errDBForWrite.Start(); err != nil {
return fmt.Errorf("could not start errDBForWrite: %w", err)
Expand All @@ -192,8 +201,8 @@
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 204 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L204

Added line #L204 was not covered by tests
)
defer schemaDB.Close()

archivalDB := jobsdb.NewForReadWrite(
"arc",
Expand All @@ -206,8 +215,8 @@
return config.GetDuration("archival.jobRetention", 24, time.Hour)
},
),
jobsdb.WithDBHandle(dbPool),
)
defer archivalDB.Close()

var schemaForwarder schema_forwarder.Forwarder
if config.GetBool("EventSchemas2.enabled", false) {
Expand Down
38 changes: 19 additions & 19 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,29 +783,29 @@
),
),
)
}

var maxConns int
if !jd.conf.enableReaderQueue || !jd.conf.enableWriterQueue {
maxConns = jd.conf.maxOpenConnections
} else {
maxConns = 2 // buffer
maxConns += jd.conf.maxReaders + jd.conf.maxWriters
switch jd.ownerType {
case Read:
maxConns += 2 // migrate, refreshDsList
case Write:
maxConns += 1 // addNewDS
case ReadWrite:
maxConns += 3 // migrate, addNewDS, archive
}
if maxConns >= jd.conf.maxOpenConnections {
var maxConns int
if !jd.conf.enableReaderQueue || !jd.conf.enableWriterQueue {
maxConns = jd.conf.maxOpenConnections
} else {
maxConns = 2 // buffer
maxConns += jd.conf.maxReaders + jd.conf.maxWriters
switch jd.ownerType {
case Read:
maxConns += 2 // migrate, refreshDsList
case Write:
maxConns += 1 // addNewDS
case ReadWrite:
maxConns += 3 // migrate, addNewDS, archive
}
if maxConns >= jd.conf.maxOpenConnections {
maxConns = jd.conf.maxOpenConnections

Check warning on line 802 in jobsdb/jobsdb.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/jobsdb.go#L802

Added line #L802 was not covered by tests
}
}
}
jd.dbHandle.SetMaxOpenConns(maxConns)
jd.dbHandle.SetMaxOpenConns(maxConns)

jd.assertError(jd.dbHandle.Ping())
jd.assertError(jd.dbHandle.Ping())
}

jd.workersAndAuxSetup()

Expand Down
73 changes: 73 additions & 0 deletions utils/misc/dbutils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package misc

import (
"context"
"database/sql"
"fmt"
"net/url"
"os"
Expand All @@ -9,6 +11,9 @@
"github.com/lib/pq"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/collectors"
"github.com/rudderlabs/rudder-server/rruntime"
)

// GetConnectionString Returns Jobs DB connection configuration
Expand All @@ -35,6 +40,74 @@
)
}

func GetDatabaseConnectionPool(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func GetDatabaseConnectionPool(
func NewDatabaseConnectionPool(

ctx context.Context,
conf *config.Config,
stat stats.Stats,
componentName string,
) (*sql.DB, error) {
connStr := GetConnectionString(conf, componentName)
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, fmt.Errorf("Error opening connection to database: %w", err)

Check warning on line 52 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L52

Added line #L52 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return nil, fmt.Errorf("Error opening connection to database: %w", err)
return nil, fmt.Errorf("opening connection to database: %w", err)

}
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("Error pinging database: %w", err)

Check warning on line 55 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L55

Added line #L55 was not covered by tests
}
if err := stat.RegisterCollector(
collectors.NewDatabaseSQLStats(
"jobsdb",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"jobsdb",
componentName,

db,
),
); err != nil {
return nil, fmt.Errorf("Error registering database stats collector: %w", err)

Check warning on line 63 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L63

Added line #L63 was not covered by tests
}
/*
TODO: find out a reasonably good value for below pool configurations
*/
Comment on lines +65 to +67
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/*
TODO: find out a reasonably good value for below pool configurations
*/


maxConnsVar := conf.GetReloadableIntVar(40, 1, "JobsDB.maxOpenConnections")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid using JobsDB, as it is an overloaded term.

Suggested change
maxConnsVar := conf.GetReloadableIntVar(40, 1, "JobsDB.maxOpenConnections")
maxConnsVar := conf.GetReloadableIntVar(40, 1, "db.pool.maxOpenConnections")

Or even

Suggested change
maxConnsVar := conf.GetReloadableIntVar(40, 1, "JobsDB.maxOpenConnections")
maxConnsVar := conf.GetReloadableIntVar(40, 1, "db.pool.maxOpenConnections", "db."+componentName+".pool.maxOpenConnections")

maxConns := maxConnsVar.Load()
db.SetMaxOpenConns(maxConns)

maxIdleConnsVar := conf.GetReloadableIntVar(5, 1, "JobsDB.maxIdleConnections")
maxIdleConns := maxIdleConnsVar.Load()
db.SetMaxIdleConns(maxIdleConns)

maxIdleTimeVar := conf.GetReloadableDurationVar(15, time.Minute, "JobsDB.maxIdleTime")
maxIdleTime := maxIdleTimeVar.Load()
db.SetConnMaxIdleTime(maxIdleTime)

maxConnLifetimeVar := conf.GetReloadableDurationVar(0, 0, "JobsDB.maxConnLifetime")
maxConnLifetime := maxConnLifetimeVar.Load()
db.SetConnMaxLifetime(maxConnLifetime)

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
rruntime.Go(func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:

Check warning on line 92 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L92

Added line #L92 was not covered by tests
}
updatePoolConfig(db.SetMaxOpenConns, &maxConns, maxConnsVar)
updatePoolConfig(db.SetConnMaxIdleTime, &maxIdleTime, maxIdleTimeVar)
updatePoolConfig(db.SetMaxIdleConns, &maxIdleConns, maxIdleConnsVar)
updatePoolConfig(db.SetConnMaxLifetime, &maxConnLifetime, maxConnLifetimeVar)

Check warning on line 97 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L94-L97

Added lines #L94 - L97 were not covered by tests
}
})
return db, nil
}

func updatePoolConfig[T comparable](f func(T), current *T, conf config.ValueLoader[T]) {
newValue := conf.Load()
if newValue != *current {
f(newValue)
*current = newValue

Check warning on line 107 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L103-L107

Added lines #L103 - L107 were not covered by tests
}
}

// SetAppNameInDBConnURL sets application name in db connection url
// if application name is already present in dns it will get override by the appName
func SetAppNameInDBConnURL(connectionUrl, appName string) (string, error) {
Expand Down
Loading