Skip to content

Commit

Permalink
Consolidate and clean up copyPersistenceConfig in functional tests (#…
Browse files Browse the repository at this point in the history
…6554)

## What changed?
Just refactor some repeated code

## Why?
easier to maintain
  • Loading branch information
dnr authored Sep 27, 2024
1 parent 453d3b2 commit 6d8baf9
Showing 1 changed file with 27 additions and 69 deletions.
96 changes: 27 additions & 69 deletions tests/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,30 +432,31 @@ func (c *temporalImpl) GetFrontendNamespaceRegistry() namespace.Registry {
return c.frontendNamespaceRegistry
}

func (c *temporalImpl) startFrontend(
hostsByService map[primitives.ServiceName]static.Hosts,
startWG *sync.WaitGroup,
) {
serviceName := primitives.FrontendService
persistenceConfig, err := copyPersistenceConfig(c.persistenceConfig)
if err != nil {
c.logger.Fatal("Failed to copy persistence config for history", tag.Error(err))
}
func (c *temporalImpl) copyPersistenceConfig() config.Persistence {
persistenceConfig := copyPersistenceConfig(c.persistenceConfig)
if c.esConfig != nil {
esDataStoreName := "es-visibility"
persistenceConfig.VisibilityStore = esDataStoreName
persistenceConfig.DataStores[esDataStoreName] = config.DataStore{
Elasticsearch: c.esConfig,
}
}
return persistenceConfig
}

func (c *temporalImpl) startFrontend(
hostsByService map[primitives.ServiceName]static.Hosts,
startWG *sync.WaitGroup,
) {
serviceName := primitives.FrontendService

var frontendService *frontend.Service
var clientBean client.Bean
var namespaceRegistry namespace.Registry
var rpcFactory common.RPCFactory
feApp := fx.New(
fx.Supply(
persistenceConfig,
c.copyPersistenceConfig(),
serviceName,
c.mockAdminClient,
),
Expand Down Expand Up @@ -501,7 +502,7 @@ func (c *temporalImpl) startFrontend(
temporal.FxLogAdapter,
c.getFxOptionsForService(primitives.FrontendService),
)
err = feApp.Err()
err := feApp.Err()
if err != nil {
c.logger.Fatal("unable to construct frontend service", tag.Error(err))
}
Expand Down Expand Up @@ -535,24 +536,12 @@ func (c *temporalImpl) startHistory(
historyHosts.Self = host
hostMap[serviceName] = historyHosts

persistenceConfig, err := copyPersistenceConfig(c.persistenceConfig)
if err != nil {
c.logger.Fatal("Failed to copy persistence config for history", tag.Error(err))
}
if c.esConfig != nil {
esDataStoreName := "es-visibility"
persistenceConfig.VisibilityStore = esDataStoreName
persistenceConfig.DataStores[esDataStoreName] = config.DataStore{
Elasticsearch: c.esConfig,
}
}

var historyService *history.Service
var clientBean client.Bean
var namespaceRegistry namespace.Registry
app := fx.New(
fx.Supply(
persistenceConfig,
c.copyPersistenceConfig(),
serviceName,
c.mockAdminClient,
),
Expand Down Expand Up @@ -596,7 +585,7 @@ func (c *temporalImpl) startHistory(
temporal.FxLogAdapter,
c.getFxOptionsForService(primitives.HistoryService),
)
err = app.Err()
err := app.Err()
if err != nil {
c.logger.Fatal("unable to construct history service", tag.Error(err))
}
Expand Down Expand Up @@ -631,24 +620,12 @@ func (c *temporalImpl) startMatching(
) {
serviceName := primitives.MatchingService

persistenceConfig, err := copyPersistenceConfig(c.persistenceConfig)
if err != nil {
c.logger.Fatal("Failed to copy persistence config for matching", tag.Error(err))
}
if c.esConfig != nil {
esDataStoreName := "es-visibility"
persistenceConfig.VisibilityStore = esDataStoreName
persistenceConfig.DataStores[esDataStoreName] = config.DataStore{
Elasticsearch: c.esConfig,
}
}

var matchingService *matching.Service
var clientBean client.Bean
var namespaceRegistry namespace.Registry
app := fx.New(
fx.Supply(
persistenceConfig,
c.copyPersistenceConfig(),
serviceName,
c.mockAdminClient,
),
Expand Down Expand Up @@ -684,7 +661,7 @@ func (c *temporalImpl) startMatching(
temporal.FxLogAdapter,
c.getFxOptionsForService(primitives.MatchingService),
)
err = app.Err()
err := app.Err()
if err != nil {
c.logger.Fatal("unable to start matching service", tag.Error(err))
}
Expand Down Expand Up @@ -712,18 +689,6 @@ func (c *temporalImpl) startWorker(
) {
serviceName := primitives.WorkerService

persistenceConfig, err := copyPersistenceConfig(c.persistenceConfig)
if err != nil {
c.logger.Fatal("Failed to copy persistence config for history", tag.Error(err))
}
if c.esConfig != nil {
esDataStoreName := "es-visibility"
persistenceConfig.VisibilityStore = esDataStoreName
persistenceConfig.DataStores[esDataStoreName] = config.DataStore{
Elasticsearch: c.esConfig,
}
}

clusterConfigCopy := cluster.Config{
EnableGlobalNamespace: c.clusterMetadataConfig.EnableGlobalNamespace,
FailoverVersionIncrement: c.clusterMetadataConfig.FailoverVersionIncrement,
Expand All @@ -740,7 +705,7 @@ func (c *temporalImpl) startWorker(
var namespaceRegistry namespace.Registry
app := fx.New(
fx.Supply(
persistenceConfig,
c.copyPersistenceConfig(),
serviceName,
c.mockAdminClient,
),
Expand Down Expand Up @@ -778,7 +743,7 @@ func (c *temporalImpl) startWorker(
temporal.FxLogAdapter,
c.getFxOptionsForService(primitives.WorkerService),
)
err = app.Err()
err := app.Err()
if err != nil {
c.logger.Fatal("unable to start worker service", tag.Error(err))
}
Expand Down Expand Up @@ -1018,26 +983,19 @@ func (c *temporalImpl) Authorize(
return authorization.Result{Decision: authorization.DecisionAllow}, nil
}

// copyPersistenceConfig makes a deepcopy of persistence config.
// copyPersistenceConfig makes a deep copy of persistence config.
// This is just a temp fix for the race condition of persistence config.
// The race condition happens because all the services are using the same datastore map in the config.
// Also all services will retry to modify the maxQPS field in the datastore during start up and use the modified maxQPS value to create a persistence factory.
func copyPersistenceConfig(pConfig config.Persistence) (config.Persistence, error) {
copiedDataStores := make(map[string]config.DataStore)
for name, value := range pConfig.DataStores {
copiedDataStore := config.DataStore{}
encodedDataStore, err := json.Marshal(value)
if err != nil {
return pConfig, err
}

if err = json.Unmarshal(encodedDataStore, &copiedDataStore); err != nil {
return pConfig, err
}
copiedDataStores[name] = copiedDataStore
func copyPersistenceConfig(cfg config.Persistence) config.Persistence {
var newCfg config.Persistence
b, err := json.Marshal(cfg)
if err != nil {
panic("copy persistence config: " + err.Error())
} else if err = json.Unmarshal(b, &newCfg); err != nil {
panic("copy persistence config: " + err.Error())
}
pConfig.DataStores = copiedDataStores
return pConfig, nil
return newCfg
}

func sdkClientFactoryProvider(
Expand Down

0 comments on commit 6d8baf9

Please sign in to comment.