-
Notifications
You must be signed in to change notification settings - Fork 312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: one (*sql.DB) pool for all jobsdb #5170
base: master
Are you sure you want to change the base?
Changes from all commits
3966164
163c085
625f3ee
6b85d6f
dfa116a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -139,13 +139,21 @@ | |
FeaturesRetryMaxAttempts: 10, | ||
}) | ||
|
||
dbPool, err := misc.GetDatabaseConnectionPool(ctx, config, statsFactory, "jobsdb") | ||
if err != nil { | ||
return err | ||
} | ||
// Not deferring close here | ||
// defer dbPool.Close() | ||
Comment on lines
+146
to
+147
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What the reason for not closing here? Shall we inject dbPool to embeddedAppHandler. So we always use on connection pool |
||
|
||
// This separate gateway db is created just to be used with gateway because in case of degraded mode, | ||
// the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it | ||
// will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode. | ||
gatewayDB := jobsdb.NewForWrite( | ||
"gw", | ||
jobsdb.WithClearDB(options.ClearDB), | ||
jobsdb.WithStats(statsFactory), | ||
jobsdb.WithDBHandle(dbPool), | ||
) | ||
if err = gatewayDB.Start(); err != nil { | ||
return fmt.Errorf("could not start gateway: %w", err) | ||
|
@@ -160,24 +168,24 @@ | |
jobsdb.WithDSLimit(a.config.gatewayDSLimit), | ||
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)), | ||
jobsdb.WithStats(statsFactory), | ||
jobsdb.WithDBHandle(dbPool), | ||
) | ||
defer gwDBForProcessor.Close() | ||
routerDB := jobsdb.NewForReadWrite( | ||
"rt", | ||
jobsdb.WithClearDB(options.ClearDB), | ||
jobsdb.WithDSLimit(a.config.routerDSLimit), | ||
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)), | ||
jobsdb.WithStats(statsFactory), | ||
jobsdb.WithDBHandle(dbPool), | ||
) | ||
defer routerDB.Close() | ||
batchRouterDB := jobsdb.NewForReadWrite( | ||
"batch_rt", | ||
jobsdb.WithClearDB(options.ClearDB), | ||
jobsdb.WithDSLimit(a.config.batchRouterDSLimit), | ||
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)), | ||
jobsdb.WithStats(statsFactory), | ||
jobsdb.WithDBHandle(dbPool), | ||
) | ||
defer batchRouterDB.Close() | ||
|
||
// We need two errorDBs, one in read & one in write mode to support separate gateway to store failures | ||
errDBForRead := jobsdb.NewForRead( | ||
|
@@ -186,13 +194,14 @@ | |
jobsdb.WithDSLimit(a.config.processorDSLimit), | ||
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)), | ||
jobsdb.WithStats(statsFactory), | ||
jobsdb.WithDBHandle(dbPool), | ||
) | ||
defer errDBForRead.Close() | ||
errDBForWrite := jobsdb.NewForWrite( | ||
"proc_error", | ||
jobsdb.WithClearDB(options.ClearDB), | ||
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)), | ||
jobsdb.WithStats(statsFactory), | ||
jobsdb.WithDBHandle(dbPool), | ||
) | ||
if err = errDBForWrite.Start(); err != nil { | ||
return fmt.Errorf("could not start errDBForWrite: %w", err) | ||
|
@@ -205,8 +214,8 @@ | |
jobsdb.WithDSLimit(a.config.processorDSLimit), | ||
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)), | ||
jobsdb.WithStats(statsFactory), | ||
jobsdb.WithDBHandle(dbPool), | ||
) | ||
defer schemaDB.Close() | ||
|
||
archivalDB := jobsdb.NewForReadWrite( | ||
"arc", | ||
|
@@ -219,8 +228,8 @@ | |
return config.GetDuration("archival.jobRetention", 24, time.Hour) | ||
}, | ||
), | ||
jobsdb.WithDBHandle(dbPool), | ||
) | ||
defer archivalDB.Close() | ||
|
||
var schemaForwarder schema_forwarder.Forwarder | ||
if config.GetBool("EventSchemas2.enabled", false) { | ||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -1,6 +1,8 @@ | ||||||||||
package misc | ||||||||||
|
||||||||||
import ( | ||||||||||
"context" | ||||||||||
"database/sql" | ||||||||||
"fmt" | ||||||||||
"net/url" | ||||||||||
"os" | ||||||||||
|
@@ -9,6 +11,9 @@ | |||||||||
"github.com/lib/pq" | ||||||||||
|
||||||||||
"github.com/rudderlabs/rudder-go-kit/config" | ||||||||||
"github.com/rudderlabs/rudder-go-kit/stats" | ||||||||||
"github.com/rudderlabs/rudder-go-kit/stats/collectors" | ||||||||||
"github.com/rudderlabs/rudder-server/rruntime" | ||||||||||
) | ||||||||||
|
||||||||||
// GetConnectionString Returns Jobs DB connection configuration | ||||||||||
|
@@ -35,6 +40,74 @@ | |||||||||
) | ||||||||||
} | ||||||||||
|
||||||||||
func GetDatabaseConnectionPool( | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
ctx context.Context, | ||||||||||
conf *config.Config, | ||||||||||
stat stats.Stats, | ||||||||||
componentName string, | ||||||||||
) (*sql.DB, error) { | ||||||||||
connStr := GetConnectionString(conf, componentName) | ||||||||||
db, err := sql.Open("postgres", connStr) | ||||||||||
if err != nil { | ||||||||||
return nil, fmt.Errorf("Error opening connection to database: %w", err) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
} | ||||||||||
if err := db.Ping(); err != nil { | ||||||||||
return nil, fmt.Errorf("Error pinging database: %w", err) | ||||||||||
} | ||||||||||
if err := stat.RegisterCollector( | ||||||||||
collectors.NewDatabaseSQLStats( | ||||||||||
"jobsdb", | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
db, | ||||||||||
), | ||||||||||
); err != nil { | ||||||||||
return nil, fmt.Errorf("Error registering database stats collector: %w", err) | ||||||||||
} | ||||||||||
/* | ||||||||||
TODO: find out a reasonably good value for below pool configurations | ||||||||||
*/ | ||||||||||
Comment on lines
+65
to
+67
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
|
||||||||||
maxConnsVar := conf.GetReloadableIntVar(40, 1, "JobsDB.maxOpenConnections") | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would avoid using JobsDB, as it is an overloaded term.
Suggested change
Or even
Suggested change
|
||||||||||
maxConns := maxConnsVar.Load() | ||||||||||
db.SetMaxOpenConns(maxConns) | ||||||||||
|
||||||||||
maxIdleConnsVar := conf.GetReloadableIntVar(5, 1, "JobsDB.maxIdleConnections") | ||||||||||
maxIdleConns := maxIdleConnsVar.Load() | ||||||||||
db.SetMaxIdleConns(maxIdleConns) | ||||||||||
|
||||||||||
maxIdleTimeVar := conf.GetReloadableDurationVar(15, time.Minute, "JobsDB.maxIdleTime") | ||||||||||
maxIdleTime := maxIdleTimeVar.Load() | ||||||||||
db.SetConnMaxIdleTime(maxIdleTime) | ||||||||||
|
||||||||||
maxConnLifetimeVar := conf.GetReloadableDurationVar(0, 0, "JobsDB.maxConnLifetime") | ||||||||||
maxConnLifetime := maxConnLifetimeVar.Load() | ||||||||||
db.SetConnMaxLifetime(maxConnLifetime) | ||||||||||
|
||||||||||
ticker := time.NewTicker(5 * time.Second) | ||||||||||
defer ticker.Stop() | ||||||||||
rruntime.Go(func() { | ||||||||||
for { | ||||||||||
select { | ||||||||||
case <-ctx.Done(): | ||||||||||
return | ||||||||||
case <-ticker.C: | ||||||||||
} | ||||||||||
updatePoolConfig(db.SetMaxOpenConns, &maxConns, maxConnsVar) | ||||||||||
updatePoolConfig(db.SetConnMaxIdleTime, &maxIdleTime, maxIdleTimeVar) | ||||||||||
updatePoolConfig(db.SetMaxIdleConns, &maxIdleConns, maxIdleConnsVar) | ||||||||||
updatePoolConfig(db.SetConnMaxLifetime, &maxConnLifetime, maxConnLifetimeVar) | ||||||||||
} | ||||||||||
}) | ||||||||||
return db, nil | ||||||||||
} | ||||||||||
|
||||||||||
func updatePoolConfig[T comparable](f func(T), current *T, conf config.ValueLoader[T]) { | ||||||||||
newValue := conf.Load() | ||||||||||
if newValue != *current { | ||||||||||
f(newValue) | ||||||||||
*current = newValue | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
// SetAppNameInDBConnURL sets application name in db connection url | ||||||||||
// if application name is already present in dns it will get override by the appName | ||||||||||
func SetAppNameInDBConnURL(connectionUrl, appName string) (string, error) { | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would avoid using
jobsdb
for anything else other than jobsdb component