diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a5b86084a..c082a1bbaa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,8 @@
 
 - "Cardinality limit" outcomes now report which limit was exceeded. ([#3825](https://github.com/getsentry/relay/pull/3825))
 - Derive span browser name from user agent. ([#3834](https://github.com/getsentry/relay/pull/3834))
+- Redis pools for `project_configs`, `cardinality`, `quotas`, and `misc` use cases
+  can now be configured individually. ([#3843](https://github.com/getsentry/relay/pull/3843))
 
 **Internal**:
diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs
index e9a6864952..292d318399 100644
--- a/relay-config/src/config.rs
+++ b/relay-config/src/config.rs
@@ -16,7 +16,6 @@ use relay_kafka::{
 };
 use relay_metrics::aggregator::{AggregatorConfig, FlushBatching};
 use relay_metrics::MetricNamespace;
-use relay_redis::RedisConfigOptions;
 use serde::de::{DeserializeOwned, Unexpected, Visitor};
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use uuid::Uuid;
@@ -24,7 +23,7 @@ use uuid::Uuid;
 use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
 use crate::byte_size::ByteSize;
 use crate::upstream::UpstreamDescriptor;
-use crate::{RedisConfig, RedisConnection};
+use crate::{create_redis_pools, RedisConfig, RedisConfigs, RedisPoolConfigs};
 
 const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;
 
@@ -1024,7 +1023,7 @@ pub struct Processing {
     pub kafka_validate_topics: bool,
     /// Redis hosts to connect to for storing state for rate limits.
     #[serde(default)]
-    pub redis: Option<RedisConfig>,
+    pub redis: Option<RedisConfigs>,
     /// Maximum chunk size of attachments for Kafka.
     #[serde(default = "default_chunk_size")]
     pub attachment_chunk_size: ByteSize,
@@ -1585,7 +1584,7 @@ impl Config {
         }
 
         if let Some(redis) = overrides.redis_url {
-            processing.redis = Some(RedisConfig::single(redis))
+            processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis)))
         }
 
         if let Some(kafka_url) = overrides.kafka_url {
@@ -2279,26 +2278,15 @@ impl Config {
         &self.values.processing.topics.unused
     }
 
-    /// Redis servers to connect to, for rate limiting.
-    pub fn redis(&self) -> Option<(&RedisConnection, RedisConfigOptions)> {
-        let cpu_concurrency = self.cpu_concurrency();
+    /// Redis servers to connect to for project configs, cardinality limits,
+    /// rate limiting, and metrics metadata.
+    pub fn redis(&self) -> Option<RedisPoolConfigs> {
+        let redis_configs = self.values.processing.redis.as_ref()?;
 
-        let redis = self.values.processing.redis.as_ref()?;
-
-        let options = RedisConfigOptions {
-            max_connections: redis
-                .options
-                .max_connections
-                .unwrap_or(cpu_concurrency as u32 * 2)
-                .max(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS),
-            connection_timeout: redis.options.connection_timeout,
-            max_lifetime: redis.options.max_lifetime,
-            idle_timeout: redis.options.idle_timeout,
-            read_timeout: redis.options.read_timeout,
-            write_timeout: redis.options.write_timeout,
-        };
-
-        Some((&redis.connection, options))
+        Some(create_redis_pools(
+            redis_configs,
+            self.cpu_concurrency() as u32,
+        ))
     }
 
     /// Chunk size of attachments in bytes.
diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs
index 437d1e5b15..29fba9d0cf 100644
--- a/relay-config/src/redis.rs
+++ b/relay-config/src/redis.rs
@@ -1,3 +1,4 @@
+use relay_redis::RedisConfigOptions;
 use serde::{Deserialize, Serialize};
 
 /// For small setups, `2 x limits.max_thread_count` does not leave enough headroom.
@@ -86,7 +87,8 @@ pub enum RedisConnection {
 }
 
 /// Configuration for connecting a redis client.
-#[derive(Clone, Debug, Serialize, Eq, PartialEq)]
+#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
+#[serde(from = "RedisConfigFromFile")]
 pub struct RedisConfig {
     /// Redis connection info.
     #[serde(flatten)]
@@ -128,12 +130,92 @@ impl From<RedisConfigFromFile> for RedisConfig {
     }
 }
 
-impl<'de> Deserialize<'de> for RedisConfig {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-    where
-        D: serde::Deserializer<'de>,
-    {
-        RedisConfigFromFile::deserialize(deserializer).map(Into::into)
+/// Configurations for the various Redis pools used by Relay.
+#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
+#[serde(untagged)]
+pub enum RedisConfigs {
+    /// All pools should be configured the same way.
+    Unified(RedisConfig),
+    /// Individual configurations for each pool.
+    Individual {
+        /// Configuration for the `project_configs` pool.
+        project_configs: Box<RedisConfig>,
+        /// Configuration for the `cardinality` pool.
+        cardinality: Box<RedisConfig>,
+        /// Configuration for the `quotas` pool.
+        quotas: Box<RedisConfig>,
+        /// Configuration for the `misc` pool.
+        misc: Box<RedisConfig>,
+    },
+}
+
+/// Helper struct bundling connections and options for the various Redis pools.
+#[derive(Clone, Debug)]
+pub struct RedisPoolConfigs<'a> {
+    /// Configuration for the `project_configs` pool.
+    pub project_configs: (&'a RedisConnection, RedisConfigOptions),
+    /// Configuration for the `cardinality` pool.
+    pub cardinality: (&'a RedisConnection, RedisConfigOptions),
+    /// Configuration for the `quotas` pool.
+    pub quotas: (&'a RedisConnection, RedisConfigOptions),
+    /// Configuration for the `misc` pool.
+    pub misc: (&'a RedisConnection, RedisConfigOptions),
+}
+
+pub(super) fn create_redis_pool(
+    config: &RedisConfig,
+    default_connections: u32,
+) -> (&RedisConnection, RedisConfigOptions) {
+    let options = RedisConfigOptions {
+        max_connections: config
+            .options
+            .max_connections
+            .unwrap_or(default_connections),
+        connection_timeout: config.options.connection_timeout,
+        max_lifetime: config.options.max_lifetime,
+        idle_timeout: config.options.idle_timeout,
+        read_timeout: config.options.read_timeout,
+        write_timeout: config.options.write_timeout,
+    };
+
+    (&config.connection, options)
+}
+
+pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) -> RedisPoolConfigs {
+    // Default `max_connections` for the `project_configs` pool.
+    // In a unified config, this is used for all pools.
+    let project_configs_default_connections = std::cmp::max(
+        cpu_concurrency * 2,
+        crate::redis::DEFAULT_MIN_MAX_CONNECTIONS,
+    );
+    match configs {
+        RedisConfigs::Unified(cfg) => {
+            let pool = create_redis_pool(cfg, project_configs_default_connections);
+            RedisPoolConfigs {
+                project_configs: pool.clone(),
+                cardinality: pool.clone(),
+                quotas: pool.clone(),
+                misc: pool,
+            }
+        }
+        RedisConfigs::Individual {
+            project_configs,
+            cardinality,
+            quotas,
+            misc,
+        } => {
+            let project_configs =
+                create_redis_pool(project_configs, project_configs_default_connections);
+            let cardinality = create_redis_pool(cardinality, cpu_concurrency);
+            let quotas = create_redis_pool(quotas, cpu_concurrency);
+            let misc = create_redis_pool(misc, cpu_concurrency);
+            RedisPoolConfigs {
+                project_configs,
+                cardinality,
+                quotas,
+                misc,
+            }
+        }
     }
 }
 
@@ -167,6 +249,90 @@ connection_timeout: 5
         );
     }
 
+    #[test]
+    fn test_redis_single_opts_unified() {
+        let yaml = r#"
+server: "redis://127.0.0.1:6379"
+max_connections: 42
+connection_timeout: 5
+"#;
+
+        let config: RedisConfigs = serde_yaml::from_str(yaml)
+            .expect("Parsed processing redis config: single with options");
+
+        assert_eq!(
+            config,
+            RedisConfigs::Unified(RedisConfig {
+                connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
+                options: PartialRedisConfigOptions {
+                    max_connections: Some(42),
+                    connection_timeout: 5,
+                    ..Default::default()
+                }
+            })
+        );
+    }
+
+    #[test]
+    fn test_redis_individual() {
+        let yaml = r#"
+project_configs:
+  server: "redis://127.0.0.1:6379"
+  max_connections: 42
+  connection_timeout: 5
+cardinality:
+  server: "redis://127.0.0.1:6379"
+quotas:
+  cluster_nodes:
+    - "redis://127.0.0.1:6379"
+    - "redis://127.0.0.2:6379"
+  max_connections: 17
+  connection_timeout: 5
+misc:
+  cluster_nodes:
+    - "redis://127.0.0.1:6379"
+    - "redis://127.0.0.2:6379"
+"#;
+
+        let configs: RedisConfigs = serde_yaml::from_str(yaml)
+            .expect("Parsed processing redis configs: single with options");
+
+        let expected = RedisConfigs::Individual {
+            project_configs: Box::new(RedisConfig {
+                connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
+                options: PartialRedisConfigOptions {
+                    max_connections: Some(42),
+                    connection_timeout: 5,
+                    ..Default::default()
+                },
+            }),
+            cardinality: Box::new(RedisConfig {
+                connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
+                options: Default::default(),
+            }),
+            quotas: Box::new(RedisConfig {
+                connection: RedisConnection::Cluster(vec![
+                    "redis://127.0.0.1:6379".to_owned(),
+                    "redis://127.0.0.2:6379".to_owned(),
+                ]),
+                options: PartialRedisConfigOptions {
+                    max_connections: Some(17),
+                    connection_timeout: 5,
+                    ..Default::default()
+                },
+            }),
+            misc: Box::new(RedisConfig {
+                connection: RedisConnection::Cluster(vec![
+                    "redis://127.0.0.1:6379".to_owned(),
+                    "redis://127.0.0.2:6379".to_owned(),
+                ]),
+                options: Default::default(),
+            }),
+        };
+
+        assert_eq!(configs, expected);
+    }
+
     #[test]
     fn test_redis_single_serialize() {
         let config = RedisConfig {
@@ -189,6 +355,28 @@ connection_timeout: 5
         "###);
     }
 
+    #[test]
+    fn test_redis_single_serialize_unified() {
+        let configs = RedisConfigs::Unified(RedisConfig {
+            connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
+            options: PartialRedisConfigOptions {
+                connection_timeout: 5,
+                ..Default::default()
+            },
+        });
+
+        assert_json_snapshot!(configs, @r###"
+        {
+          "server": "redis://127.0.0.1:6379",
+          "connection_timeout": 5,
+          "max_lifetime": 300,
+          "idle_timeout": 60,
+          "read_timeout": 3,
"write_timeout": 3 + } + "###); + } + #[test] fn test_redis_single_opts_default() { let yaml = r#" @@ -254,6 +442,33 @@ read_timeout: 10 ); } + #[test] + fn test_redis_cluster_nodes_opts_unified() { + let yaml = r#" +cluster_nodes: + - "redis://127.0.0.1:6379" + - "redis://127.0.0.2:6379" +read_timeout: 10 +"#; + + let config: RedisConfigs = serde_yaml::from_str(yaml) + .expect("Parsed processing redis config: single with options"); + + assert_eq!( + config, + RedisConfigs::Unified(RedisConfig { + connection: RedisConnection::Cluster(vec![ + "redis://127.0.0.1:6379".to_owned(), + "redis://127.0.0.2:6379".to_owned() + ]), + options: PartialRedisConfigOptions { + read_timeout: 10, + ..Default::default() + }, + }) + ); + } + #[test] fn test_redis_cluster_serialize() { let config = RedisConfig { @@ -281,4 +496,113 @@ read_timeout: 10 } "###); } + + #[test] + fn test_redis_cluster_serialize_unified() { + let configs = RedisConfigs::Unified(RedisConfig { + connection: RedisConnection::Cluster(vec![ + "redis://127.0.0.1:6379".to_owned(), + "redis://127.0.0.2:6379".to_owned(), + ]), + options: PartialRedisConfigOptions { + read_timeout: 33, + ..Default::default() + }, + }); + + assert_json_snapshot!(configs, @r###" + { + "cluster_nodes": [ + "redis://127.0.0.1:6379", + "redis://127.0.0.2:6379" + ], + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 33, + "write_timeout": 3 + } + "###); + } + + #[test] + fn test_redis_serialize_individual() { + let configs = RedisConfigs::Individual { + project_configs: Box::new(RedisConfig { + connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), + options: PartialRedisConfigOptions { + max_connections: Some(42), + connection_timeout: 5, + ..Default::default() + }, + }), + cardinality: Box::new(RedisConfig { + connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), + options: Default::default(), + }), + quotas: Box::new(RedisConfig { + connection: RedisConnection::Cluster(vec![ + "redis://127.0.0.1:6379".to_owned(), + "redis://127.0.0.2:6379".to_owned(), + ]), + options: PartialRedisConfigOptions { + max_connections: Some(17), + connection_timeout: 5, + ..Default::default() + }, + }), + misc: Box::new(RedisConfig { + connection: RedisConnection::Cluster(vec![ + "redis://127.0.0.1:6379".to_owned(), + "redis://127.0.0.2:6379".to_owned(), + ]), + options: Default::default(), + }), + }; + + assert_json_snapshot!(configs, @r###" + { + "project_configs": { + "server": "redis://127.0.0.1:6379", + "max_connections": 42, + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 3, + "write_timeout": 3 + }, + "cardinality": { + "server": "redis://127.0.0.1:6379", + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 3, + "write_timeout": 3 + }, + "quotas": { + "cluster_nodes": [ + "redis://127.0.0.1:6379", + "redis://127.0.0.2:6379" + ], + "max_connections": 17, + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 3, + "write_timeout": 3 + }, + "misc": { + "cluster_nodes": [ + "redis://127.0.0.1:6379", + "redis://127.0.0.2:6379" + ], + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 3, + "write_timeout": 3 + } + } + "###); + } } diff --git a/relay-redis/src/noop.rs b/relay-redis/src/noop.rs index 0bab77bdcf..db08723155 100644 --- a/relay-redis/src/noop.rs +++ b/relay-redis/src/noop.rs @@ -29,3 +29,16 @@ impl RedisPool { Ok(Self) } } + +/// The various 
+/// The various [`RedisPool`]s used within Relay.
+#[derive(Debug, Clone)]
+pub struct RedisPools {
+    /// The pool used for project configurations.
+    pub project_configs: RedisPool,
+    /// The pool used for cardinality limits.
+    pub cardinality: RedisPool,
+    /// The pool used for rate limiting/quotas.
+    pub quotas: RedisPool,
+    /// The pool used for metrics metadata.
+    pub misc: RedisPool,
+}
diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs
index 6b4f129940..f5514131e7 100644
--- a/relay-redis/src/real.rs
+++ b/relay-redis/src/real.rs
@@ -208,6 +208,19 @@ impl RedisPool {
     }
 }
 
+/// The various [`RedisPool`]s used within Relay.
+#[derive(Debug, Clone)]
+pub struct RedisPools {
+    /// The pool used for project configurations.
+    pub project_configs: RedisPool,
+    /// The pool used for cardinality limits.
+    pub cardinality: RedisPool,
+    /// The pool used for rate limiting/quotas.
+    pub quotas: RedisPool,
+    /// The pool used for metrics metadata.
+    pub misc: RedisPool,
+}
+
 /// Stats about how the [`RedisPool`] is performing.
 pub struct Stats {
     /// The number of connections currently being managed by the pool.
diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index 4f40ce73d0..4b45fcdf67 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -10,8 +10,8 @@ use axum::extract::FromRequestParts;
 use axum::http::request::Parts;
 use rayon::ThreadPool;
 use relay_cogs::Cogs;
-use relay_config::{Config, RedisConnection};
-use relay_redis::RedisPool;
+use relay_config::{Config, RedisConnection, RedisPoolConfigs};
+use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
 use relay_system::{channel, Addr, Service};
 use tokio::runtime::Runtime;
 
@@ -153,15 +153,10 @@ impl ServiceState {
         let upstream_relay = UpstreamRelayService::new(config.clone()).start();
         let test_store = TestStoreService::new(config.clone()).start();
 
-        let redis_pool = config
+        let redis_pools = config
             .redis()
             .filter(|_| config.processing_enabled())
-            .map(|redis| match redis {
-                (RedisConnection::Single(server), options) => RedisPool::single(server, options),
-                (RedisConnection::Cluster(servers), options) => {
-                    RedisPool::cluster(servers.iter().map(|s| s.as_str()), options)
-                }
-            })
+            .map(create_redis_pools)
             .transpose()
             .context(ServiceError::Redis)?;
 
@@ -232,7 +227,7 @@ impl ServiceState {
             global_config_handle,
             cogs,
             #[cfg(feature = "processing")]
-            redis_pool.clone(),
+            redis_pools.clone(),
             processor::Addrs {
                 project_cache: project_cache.clone(),
                 outcome_aggregator: outcome_aggregator.clone(),
@@ -260,7 +255,9 @@ impl ServiceState {
             MemoryChecker::new(memory_stat.clone(), config.clone()),
             project_cache_services,
             metric_outcomes,
-            redis_pool.clone(),
+            redis_pools
+                .as_ref()
+                .map(|pools| pools.project_configs.clone()),
         )
         .spawn_handler(project_cache_rx);
 
@@ -277,7 +274,7 @@ impl ServiceState {
             config.clone(),
             upstream_relay.clone(),
             #[cfg(feature = "processing")]
-            redis_pool,
+            redis_pools.clone(),
         )
         .start();
 
@@ -365,6 +362,31 @@ impl ServiceState {
     }
 }
 
+fn create_redis_pool(
+    (connection, options): (&RedisConnection, RedisConfigOptions),
+) -> Result<RedisPool, RedisError> {
+    match connection {
+        RedisConnection::Cluster(servers) => {
+            RedisPool::cluster(servers.iter().map(|s| s.as_str()), options)
+        }
+        RedisConnection::Single(server) => RedisPool::single(server, options),
+    }
+}
+
+pub fn create_redis_pools(configs: RedisPoolConfigs) -> Result<RedisPools, RedisError> {
+    let project_configs = create_redis_pool(configs.project_configs)?;
+    let cardinality = create_redis_pool(configs.cardinality)?;
+    let quotas = create_redis_pool(configs.quotas)?;
+    let misc = create_redis_pool(configs.misc)?;
+
+    Ok(RedisPools {
+        project_configs,
+        cardinality,
+        quotas,
+        misc,
+    })
+}
+
 #[axum::async_trait]
 impl FromRequestParts<Self> for ServiceState {
     type Rejection = Infallible;
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index 99de389281..5a63f07ec9 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -58,7 +58,7 @@ use {
     relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups},
     relay_metrics::RedisMetricMetaStore,
     relay_quotas::{Quota, RateLimitingError, RateLimits, RedisRateLimiter},
-    relay_redis::RedisPool,
+    relay_redis::{RedisPool, RedisPools},
     std::iter::Chain,
     std::slice::Iter,
     symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
@@ -1118,7 +1118,7 @@ struct InnerProcessor {
     global_config: GlobalConfigHandle,
     cogs: Cogs,
     #[cfg(feature = "processing")]
-    redis_pool: Option<RedisPool>,
+    quotas_pool: Option<RedisPool>,
     addrs: Addrs,
     #[cfg(feature = "processing")]
     rate_limiter: Option<RedisRateLimiter>,
@@ -1137,7 +1137,7 @@ impl EnvelopeProcessorService {
         config: Arc<Config>,
         global_config: GlobalConfigHandle,
         cogs: Cogs,
-        #[cfg(feature = "processing")] redis: Option<RedisPool>,
+        #[cfg(feature = "processing")] redis: Option<RedisPools>,
         addrs: Addrs,
         metric_outcomes: MetricOutcomes,
     ) -> Self {
@@ -1151,32 +1151,41 @@ impl EnvelopeProcessorService {
             }
         });
 
+        #[cfg(feature = "processing")]
+        let (cardinality, quotas, misc) = match redis {
+            Some(RedisPools {
+                cardinality,
+                quotas,
+                misc,
+                ..
+            }) => (Some(cardinality), Some(quotas), Some(misc)),
+            None => (None, None, None),
+        };
+
         let inner = InnerProcessor {
             workers: WorkerGroup::new(pool),
             global_config,
             cogs,
             #[cfg(feature = "processing")]
-            redis_pool: redis.clone(),
+            quotas_pool: quotas.clone(),
             #[cfg(feature = "processing")]
-            rate_limiter: redis
-                .clone()
-                .map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit())),
+            rate_limiter: quotas
+                .map(|quotas| RedisRateLimiter::new(quotas).max_limit(config.max_rate_limit())),
             addrs,
             geoip_lookup,
             #[cfg(feature = "processing")]
-            metric_meta_store: redis.clone().map(|pool| {
-                RedisMetricMetaStore::new(pool, config.metrics_meta_locations_expiry())
+            metric_meta_store: misc.map(|misc| {
+                RedisMetricMetaStore::new(misc, config.metrics_meta_locations_expiry())
             }),
             #[cfg(feature = "processing")]
-            cardinality_limiter: redis
-                .clone()
-                .map(|pool| {
+            cardinality_limiter: cardinality
+                .map(|cardinality| {
                     RedisSetLimiter::new(
                         RedisSetLimiterOptions {
                             cache_vacuum_interval: config
                                 .cardinality_limiter_cache_vacuum_interval(),
                         },
-                        pool,
+                        cardinality,
                     )
                 })
                 .map(CardinalityLimiter::new),
@@ -1250,9 +1259,9 @@ impl EnvelopeProcessorService {
         #[allow(unused_mut)]
         let mut reservoir = ReservoirEvaluator::new(reservoir_counters);
         #[cfg(feature = "processing")]
-        if let Some(redis_pool) = self.inner.redis_pool.as_ref() {
+        if let Some(quotas_pool) = self.inner.quotas_pool.as_ref() {
             let org_id = managed_envelope.scoping().organization_id;
-            reservoir.set_redis(org_id, redis_pool);
+            reservoir.set_redis(org_id, quotas_pool);
         }
 
         let extracted_metrics = ProcessingExtractedMetrics::new(
diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs
index 910606b265..ddc4b1734c 100644
--- a/relay-server/src/services/project_cache.rs
+++ b/relay-server/src/services/project_cache.rs
@@ -442,7 +442,9 @@ impl ProjectSource {
             UpstreamProjectSourceService::new(config.clone(), upstream_relay).start();
 
         #[cfg(feature = "processing")]
"processing")] - let redis_maxconns = config.redis().map(|(_, config)| config.max_connections); + let redis_maxconns = config + .redis() + .map(|configs| configs.project_configs.1.max_connections); #[cfg(feature = "processing")] let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool)); diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index a2a1a960d7..f233998b09 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use relay_config::{Config, RelayMode}; #[cfg(feature = "processing")] -use relay_redis::RedisPool; +use relay_redis::{RedisPool, RedisPools}; use relay_statsd::metric; use relay_system::{Addr, Service}; use tokio::time::interval; @@ -17,20 +17,20 @@ pub struct RelayStats { config: Arc, upstream_relay: Addr, #[cfg(feature = "processing")] - redis_pool: Option, + redis_pools: Option, } impl RelayStats { pub fn new( config: Arc, upstream_relay: Addr, - #[cfg(feature = "processing")] redis_pool: Option, + #[cfg(feature = "processing")] redis_pools: Option, ) -> Self { Self { config, upstream_relay, #[cfg(feature = "processing")] - redis_pool, + redis_pools, } } @@ -100,18 +100,36 @@ impl RelayStats { } } + #[cfg(feature = "processing")] + fn redis_pool(redis_pool: &RedisPool, name: &str) { + let state = redis_pool.stats(); + metric!( + gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections), + pool = name + ); + metric!( + gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections), + pool = name + ); + } + #[cfg(not(feature = "processing"))] - async fn redis_pool(&self) {} + async fn redis_pools(&self) {} #[cfg(feature = "processing")] - async fn redis_pool(&self) { - let Some(ref redis_pool) = self.redis_pool else { - return; - }; - - let state = redis_pool.stats(); - metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); - metric!(gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections)); + async fn redis_pools(&self) { + if let Some(RedisPools { + project_configs, + cardinality, + quotas, + misc, + }) = self.redis_pools.as_ref() + { + Self::redis_pool(project_configs, "project_configs"); + Self::redis_pool(cardinality, "cardinality"); + Self::redis_pool(quotas, "quotas"); + Self::redis_pool(misc, "misc"); + } } } @@ -128,7 +146,7 @@ impl Service for RelayStats { let _ = tokio::join!( self.upstream_status(), self.tokio_metrics(), - self.redis_pool(), + self.redis_pools(), ); ticker.tick().await; } diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index d14e5e16ac..f595b30096 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -12,12 +12,11 @@ use relay_sampling::{DynamicSamplingContext, SamplingConfig}; use relay_system::Addr; use relay_test::mock_service; -#[cfg(feature = "processing")] -use {relay_config::RedisConnection, relay_redis::RedisPool}; - use crate::envelope::{Envelope, Item, ItemType}; use crate::extractors::RequestMeta; use crate::metrics::{MetricOutcomes, MetricStats}; +#[cfg(feature = "processing")] +use crate::service::create_redis_pools; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::TrackOutcome; use crate::services::processor::{self, EnvelopeProcessorService}; @@ -123,17 +122,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); #[cfg(feature = 
"processing")] - let redis = config - .redis() - .filter(|_| config.processing_enabled()) - .map(|redis| match redis { - (RedisConnection::Single(server), options) => { - RedisPool::single(server, options).unwrap() - } - (RedisConnection::Cluster(servers), options) => { - RedisPool::cluster(servers.iter().map(|s| s.as_str()), options).unwrap() - } - }); + let redis_pools = config.redis().map(create_redis_pools).transpose().unwrap(); let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); @@ -144,7 +133,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { GlobalConfigHandle::fixed(Default::default()), Cogs::noop(), #[cfg(feature = "processing")] - redis, + redis_pools, processor::Addrs { outcome_aggregator, project_cache, @@ -162,18 +151,7 @@ pub fn create_test_processor_with_addrs( addrs: processor::Addrs, ) -> EnvelopeProcessorService { #[cfg(feature = "processing")] - let redis = config - .redis() - .filter(|_| config.processing_enabled()) - .map(|redis| match redis { - (RedisConnection::Single(server), options) => { - RedisPool::single(server, options).unwrap() - } - (RedisConnection::Cluster(servers), options) => { - RedisPool::cluster(servers.iter().map(|s| s.as_str()), options).unwrap() - } - }); - + let redis_pools = config.redis().map(create_redis_pools).transpose().unwrap(); let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, addrs.outcome_aggregator.clone()); @@ -184,7 +162,7 @@ pub fn create_test_processor_with_addrs( GlobalConfigHandle::fixed(Default::default()), Cogs::noop(), #[cfg(feature = "processing")] - redis, + redis_pools, addrs, metric_outcomes, )