diff --git a/app/apphandlers/embeddedAppHandler.go b/app/apphandlers/embeddedAppHandler.go index 03876e8081..8962a0563a 100644 --- a/app/apphandlers/embeddedAppHandler.go +++ b/app/apphandlers/embeddedAppHandler.go @@ -139,6 +139,13 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) FeaturesRetryMaxAttempts: 10, }) + dbPool, err := misc.GetDatabaseConnectionPool(ctx, config, statsFactory, "jobsdb") + if err != nil { + return err + } + // Not deferring close here + // defer dbPool.Close() + // This separate gateway db is created just to be used with gateway because in case of degraded mode, // the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it // will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode. @@ -146,6 +153,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) "gw", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) if err = gatewayDB.Start(); err != nil { return fmt.Errorf("could not start gateway: %w", err) @@ -160,24 +168,24 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithDSLimit(a.config.gatewayDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer gwDBForProcessor.Close() routerDB := jobsdb.NewForReadWrite( "rt", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.routerDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer routerDB.Close() batchRouterDB := jobsdb.NewForReadWrite( "batch_rt", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.batchRouterDSLimit), 
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer batchRouterDB.Close() // We need two errorDBs, one in read & one in write mode to support separate gateway to store failures errDBForRead := jobsdb.NewForRead( @@ -186,13 +194,14 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithDSLimit(a.config.processorDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer errDBForRead.Close() errDBForWrite := jobsdb.NewForWrite( "proc_error", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) if err = errDBForWrite.Start(); err != nil { return fmt.Errorf("could not start errDBForWrite: %w", err) @@ -205,8 +214,8 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithDSLimit(a.config.processorDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer schemaDB.Close() archivalDB := jobsdb.NewForReadWrite( "arc", @@ -219,8 +228,8 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) return config.GetDuration("archival.jobRetention", 24, time.Hour) }, ), + jobsdb.WithDBHandle(dbPool), ) - defer archivalDB.Close() var schemaForwarder schema_forwarder.Forwarder if config.GetBool("EventSchemas2.enabled", false) { diff --git a/app/apphandlers/gatewayAppHandler.go b/app/apphandlers/gatewayAppHandler.go index 99bb25bebf..81e9e400a2 100644 --- a/app/apphandlers/gatewayAppHandler.go +++ b/app/apphandlers/gatewayAppHandler.go @@ -23,6 +23,7 @@ import ( 
"github.com/rudderlabs/rudder-server/jobsdb" sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source" "github.com/rudderlabs/rudder-server/services/transformer" + "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/types/deployment" ) @@ -68,14 +69,20 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) } defer sourceHandle.Stop() + dbPool, err := misc.GetDatabaseConnectionPool(ctx, config, statsFactory, "jobsdb") + if err != nil { + return err + } + defer dbPool.Close() + gatewayDB := jobsdb.NewForWrite( "gw", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.gatewayDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer gatewayDB.Close() if err := gatewayDB.Start(); err != nil { return fmt.Errorf("could not start gatewayDB: %w", err) @@ -87,8 +94,8 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithClearDB(options.ClearDB), jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer errDB.Close() if err := errDB.Start(); err != nil { return fmt.Errorf("could not start errDB: %w", err) diff --git a/app/apphandlers/processorAppHandler.go b/app/apphandlers/processorAppHandler.go index c462ec0e2c..4647573ab7 100644 --- a/app/apphandlers/processorAppHandler.go +++ b/app/apphandlers/processorAppHandler.go @@ -145,43 +145,52 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options FeaturesRetryMaxAttempts: 10, }) + dbPool, err := misc.GetDatabaseConnectionPool(ctx, config, statsFactory, "jobsdb") + if err != nil { + return err + } + // This is idompotent, so it's safe to call it multiple times(jobsdb.Close()) + // can be cleaned up later + defer dbPool.Close() + 
gwDBForProcessor := jobsdb.NewForRead( "gw", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.gatewayDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer gwDBForProcessor.Close() routerDB := jobsdb.NewForReadWrite( "rt", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.routerDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer routerDB.Close() batchRouterDB := jobsdb.NewForReadWrite( "batch_rt", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.batchRouterDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer batchRouterDB.Close() errDBForRead := jobsdb.NewForRead( "proc_error", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.processorDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer errDBForRead.Close() errDBForWrite := jobsdb.NewForWrite( "proc_error", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) if err = errDBForWrite.Start(); err != nil { return fmt.Errorf("could not start errDBForWrite: %w", err) @@ -192,8 +201,8 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.processorDSLimit), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) - defer schemaDB.Close() archivalDB := jobsdb.NewForReadWrite( "arc", @@ -206,8 +215,8 @@ func (a *processorApp) 
StartRudderCore(ctx context.Context, options *app.Options return config.GetDuration("archival.jobRetention", 24, time.Hour) }, ), + jobsdb.WithDBHandle(dbPool), ) - defer archivalDB.Close() var schemaForwarder schema_forwarder.Forwarder if config.GetBool("EventSchemas2.enabled", false) { diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index e1a159772c..01e5af49aa 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -783,29 +783,29 @@ func (jd *Handle) init() { ), ), ) - } - var maxConns int - if !jd.conf.enableReaderQueue || !jd.conf.enableWriterQueue { - maxConns = jd.conf.maxOpenConnections - } else { - maxConns = 2 // buffer - maxConns += jd.conf.maxReaders + jd.conf.maxWriters - switch jd.ownerType { - case Read: - maxConns += 2 // migrate, refreshDsList - case Write: - maxConns += 1 // addNewDS - case ReadWrite: - maxConns += 3 // migrate, addNewDS, archive - } - if maxConns >= jd.conf.maxOpenConnections { + var maxConns int + if !jd.conf.enableReaderQueue || !jd.conf.enableWriterQueue { maxConns = jd.conf.maxOpenConnections + } else { + maxConns = 2 // buffer + maxConns += jd.conf.maxReaders + jd.conf.maxWriters + switch jd.ownerType { + case Read: + maxConns += 2 // migrate, refreshDsList + case Write: + maxConns += 1 // addNewDS + case ReadWrite: + maxConns += 3 // migrate, addNewDS, archive + } + if maxConns >= jd.conf.maxOpenConnections { + maxConns = jd.conf.maxOpenConnections + } } - } - jd.dbHandle.SetMaxOpenConns(maxConns) + jd.dbHandle.SetMaxOpenConns(maxConns) - jd.assertError(jd.dbHandle.Ping()) + jd.assertError(jd.dbHandle.Ping()) + } jd.workersAndAuxSetup() diff --git a/utils/misc/dbutils.go b/utils/misc/dbutils.go index b63c78d025..859581afd6 100644 --- a/utils/misc/dbutils.go +++ b/utils/misc/dbutils.go @@ -1,6 +1,8 @@ package misc import ( + "context" + "database/sql" "fmt" "net/url" "os" @@ -9,6 +11,9 @@ import ( "github.com/lib/pq" "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/stats" + 
"github.com/rudderlabs/rudder-go-kit/stats/collectors"
+	"github.com/rudderlabs/rudder-server/rruntime"
 )
 
 // GetConnectionString Returns Jobs DB connection configuration
@@ -35,6 +40,92 @@ func GetConnectionString(c *config.Config, componentName string) string {
 	)
 }
 
+// GetDatabaseConnectionPool opens a postgres connection pool using the
+// connection string that GetConnectionString builds for componentName, pings
+// it, registers a collectors.NewDatabaseSQLStats collector for it on stat, and
+// applies the JobsDB.* pool settings read from conf.
+//
+// A background goroutine re-reads those settings every 5 seconds and applies
+// any changed value to the pool; it exits when ctx is done.
+//
+// The returned pool is not closed by this function on success. On a ping or
+// collector-registration failure the pool is closed before the error returns.
+func GetDatabaseConnectionPool(
+	ctx context.Context,
+	conf *config.Config,
+	stat stats.Stats,
+	componentName string,
+) (*sql.DB, error) {
+	connStr := GetConnectionString(conf, componentName)
+	db, err := sql.Open("postgres", connStr)
+	if err != nil {
+		return nil, fmt.Errorf("Error opening connection to database: %w", err)
+	}
+	if err := db.Ping(); err != nil {
+		_ = db.Close() // best effort: the ping error is the one worth reporting
+		return nil, fmt.Errorf("Error pinging database: %w", err)
+	}
+	if err := stat.RegisterCollector(
+		collectors.NewDatabaseSQLStats(
+			"jobsdb", // fixed collector name; componentName is not used here
+			db,
+		),
+	); err != nil {
+		_ = db.Close() // best effort: do not leak the pool on a failed setup
+		return nil, fmt.Errorf("Error registering database stats collector: %w", err)
+	}
+	/*
+		TODO: find out a reasonably good value for below pool configurations
+	*/
+
+	maxConnsVar := conf.GetReloadableIntVar(40, 1, "JobsDB.maxOpenConnections")
+	maxConns := maxConnsVar.Load()
+	db.SetMaxOpenConns(maxConns)
+
+	maxIdleConnsVar := conf.GetReloadableIntVar(5, 1, "JobsDB.maxIdleConnections")
+	maxIdleConns := maxIdleConnsVar.Load()
+	db.SetMaxIdleConns(maxIdleConns)
+
+	maxIdleTimeVar := conf.GetReloadableDurationVar(15, time.Minute, "JobsDB.maxIdleTime")
+	maxIdleTime := maxIdleTimeVar.Load()
+	db.SetConnMaxIdleTime(maxIdleTime)
+
+	maxConnLifetimeVar := conf.GetReloadableDurationVar(0, 0, "JobsDB.maxConnLifetime")
+	maxConnLifetime := maxConnLifetimeVar.Load()
+	db.SetConnMaxLifetime(maxConnLifetime)
+
+	rruntime.Go(func() {
+		// The ticker must be created and stopped inside the goroutine: a
+		// deferred Stop in the enclosing function fires as soon as that
+		// function returns, after which ticker.C never delivers and reloaded
+		// settings are never applied.
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+			}
+			updatePoolConfig(db.SetMaxOpenConns, &maxConns, maxConnsVar)
+			updatePoolConfig(db.SetConnMaxIdleTime, &maxIdleTime, maxIdleTimeVar)
+			updatePoolConfig(db.SetMaxIdleConns, &maxIdleConns, maxIdleConnsVar)
+			updatePoolConfig(db.SetConnMaxLifetime, &maxConnLifetime, maxConnLifetimeVar)
+		}
+	})
+	return db, nil
+}
+
+// updatePoolConfig loads the latest value from conf and, when it differs
+// from *current, passes it to f and stores it in *current.
+func updatePoolConfig[T comparable](f func(T), current *T, conf config.ValueLoader[T]) {
+	newValue := conf.Load()
+	if newValue != *current {
+		f(newValue)
+		*current = newValue
+	}
+}
+
 // SetAppNameInDBConnURL sets application name in db connection url
 // if application name is already present in dns it will get override by the appName
 func SetAppNameInDBConnURL(connectionUrl, appName string) (string, error) {