Skip to content

Commit

Permalink
Subscription for schedulerNamespaceStartWorkflowRPS (#6574)
Browse files Browse the repository at this point in the history
## What changed?
- Use dynamic config subscription for
`schedulerNamespaceStartWorkflowRPS`.
- Simplify test that uses it.

## Why?
With a subscription, the scheduler reacts faster to rate-limit changes, and the test that depends on the limit can be simplified.

## How did you test it?
Updated the functional test.
  • Loading branch information
dnr authored Oct 1, 2024
1 parent 0d4e202 commit 10ab254
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 44 deletions.
3 changes: 2 additions & 1 deletion service/worker/batcher/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ func (s *workerComponent) DedicatedWorkerOptions(ns *namespace.Namespace) *worke
}
}

// Register registers the batch workflow (under BatchWFTypeName) and the batcher
// activities for the given namespace on the worker registry.
// The func()-returning form returns nil: this component has no cleanup function
// to hand back to the caller.
func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Namespace, _ workercommon.RegistrationDetails) {
func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Namespace, _ workercommon.RegistrationDetails) func() {
registry.RegisterWorkflowWithOptions(BatchWorkflow, workflow.RegisterOptions{Name: BatchWFTypeName})
registry.RegisterActivity(s.activities(ns.Name(), ns.ID()))
return nil
}

func (s *workerComponent) activities(name namespace.Name, id namespace.ID) *activities {
Expand Down
4 changes: 3 additions & 1 deletion service/worker/common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ type (
PerNSWorkerComponent interface {
// Register registers the Workflow and Activity types provided by this worker component
// on the given registry. The namespace that the worker is running in is also provided.
Register(sdkworker.Registry, *namespace.Namespace, RegistrationDetails)
// Register may return a cleanup function, or nil if there is nothing to clean up.
// A non-nil function is called when the per-namespace worker is stopped, and also
// when starting the worker fails after registration. It can be used to release any
// state that Register set up.
Register(sdkworker.Registry, *namespace.Namespace, RegistrationDetails) func()
// DedicatedWorkerOptions returns a PerNSDedicatedWorkerOptions for this worker component.
DedicatedWorkerOptions(*namespace.Namespace) *PerNSDedicatedWorkerOptions
}
Expand Down
6 changes: 4 additions & 2 deletions service/worker/common/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion service/worker/pernamespaceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type (
componentSet string
client sdkclient.Client
worker sdkworker.Worker
cleanup []func()
}

workerAllocation struct {
Expand Down Expand Up @@ -448,6 +449,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
client, worker, err := w.startWorker(ns, enabledComponents, workerAllocation, dcOptions)
if err != nil {
// TODO: add metric also
w.stopWorkerLocked() // for calling cleanup
return err
}

Expand Down Expand Up @@ -500,7 +502,10 @@ func (w *perNamespaceWorker) startWorker(
Multiplicity: allocation.Local,
}
for _, cmp := range components {
cmp.Register(worker, ns, details)
cleanup := cmp.Register(worker, ns, details)
if cleanup != nil {
w.cleanup = append(w.cleanup, cleanup)
}
}

// this blocks by calling DescribeNamespace a few times (with a 10s timeout)
Expand Down Expand Up @@ -554,6 +559,10 @@ func (w *perNamespaceWorker) stopWorkerAndResetTimer() {
}

func (w *perNamespaceWorker) stopWorkerLocked() {
for _, cleanup := range w.cleanup {
cleanup()
}
w.cleanup = nil
if w.worker != nil {
w.worker.Stop()
w.worker = nil
Expand Down
30 changes: 21 additions & 9 deletions service/worker/scheduler/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package scheduler

import (
"fmt"
"math"
"time"

enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -66,7 +67,7 @@ type (
specBuilder *SpecBuilder // workflow dep
activityDeps activityDeps
enabledForNs dynamicconfig.BoolPropertyFnWithNamespaceFilter
globalNSStartWorkflowRPS dynamicconfig.FloatPropertyFnWithNamespaceFilter
globalNSStartWorkflowRPS dynamicconfig.TypedSubscribableWithNamespaceFilter[float64]
maxBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter
localActivitySleepLimit dynamicconfig.DurationPropertyFnWithNamespaceFilter
}
Expand Down Expand Up @@ -100,7 +101,7 @@ func NewResult(
specBuilder: specBuilder,
activityDeps: params,
enabledForNs: dynamicconfig.WorkerEnableScheduler.Get(dc),
globalNSStartWorkflowRPS: dynamicconfig.SchedulerNamespaceStartWorkflowRPS.Get(dc),
globalNSStartWorkflowRPS: dynamicconfig.SchedulerNamespaceStartWorkflowRPS.Subscribe(dc),
maxBlobSize: dynamicconfig.BlobSizeLimitError.Get(dc),
localActivitySleepLimit: dynamicconfig.SchedulerLocalActivitySleepLimit.Get(dc),
},
Expand All @@ -113,24 +114,35 @@ func (s *workerComponent) DedicatedWorkerOptions(ns *namespace.Namespace) *worke
}
}

// Register registers the scheduler workflow (under WorkflowType) and the scheduler
// activities for the given namespace on the worker registry.
// In the func()-returning form, the returned function is the cleanup produced by
// newActivities.
func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Namespace, details workercommon.RegistrationDetails) {
func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Namespace, details workercommon.RegistrationDetails) func() {
// Wrap the workflow so that it runs with this component's spec builder.
wfFunc := func(ctx workflow.Context, args *schedspb.StartScheduleArgs) error {
return schedulerWorkflowWithSpecBuilder(ctx, args, s.specBuilder)
}
registry.RegisterWorkflowWithOptions(wfFunc, workflow.RegisterOptions{Name: WorkflowType})
registry.RegisterActivity(s.activities(ns.Name(), ns.ID(), details))

// Build the activities and pass their cleanup function back to the caller.
activities, cleanup := s.newActivities(ns.Name(), ns.ID(), details)
registry.RegisterActivity(activities)
return cleanup
}

// activities builds the scheduler activities for one namespace. Its localRPS closure
// reads the namespace RPS setting each time it is invoked and scales it by this
// worker's share (Multiplicity / TotalWorkers).
func (s *workerComponent) activities(name namespace.Name, id namespace.ID, details workercommon.RegistrationDetails) *activities {
localRPS := func() float64 {
return float64(details.Multiplicity) * s.globalNSStartWorkflowRPS(name.String()) / float64(details.TotalWorkers)
// newActivities builds the scheduler activities for one namespace and returns them
// together with the second value produced by the RPS subscription (named cancel
// here; presumably it unsubscribes the callback — confirm against dynamicconfig).
//
// The start-workflow rate limiter is created once and then reconfigured by the
// callback cb, which is passed to the subscription and also applied once directly
// with the initial value.
func (s *workerComponent) newActivities(name namespace.Name, id namespace.ID, details workercommon.RegistrationDetails) (*activities, func()) {
// Burst is sized as this multiple of the local rate.
const burstRatio = 1.0

// Placeholder rate and burst; both are overwritten by cb(initialRPS) below.
lim := quotas.NewRateLimiter(1, 1)
cb := func(rps float64) {
// This worker's share of the namespace-wide rate: Multiplicity out of TotalWorkers.
// NOTE(review): if details.TotalWorkers were 0 this division would not be finite —
// confirm callers always pass a positive value.
localRPS := rps * float64(details.Multiplicity) / float64(details.TotalWorkers)
// Round the burst up and never let it drop below 1.
burst := max(1, int(math.Ceil(localRPS*burstRatio)))
lim.SetRateBurst(localRPS, burst)
}
initialRPS, cancel := s.globalNSStartWorkflowRPS(name.String(), cb)
// Apply the value that was current at subscription time.
// NOTE(review): if the subscription can invoke cb before this line runs, the older
// initial value would be applied last — confirm the subscription's ordering guarantees.
cb(initialRPS)

return &activities{
activityDeps: s.activityDeps,
namespace: name,
namespaceID: id,
startWorkflowRateLimiter: quotas.NewDefaultOutgoingRateLimiter(localRPS),
startWorkflowRateLimiter: lim,
// The remaining limits are read from dynamic config on every call.
maxBlobSize: func() int { return s.maxBlobSize(name.String()) },
localActivitySleepLimit: func() time.Duration { return s.localActivitySleepLimit(name.String()) },
}
}, cancel
}
31 changes: 1 addition & 30 deletions tests/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,31 +1049,9 @@ func (s *ScheduleFunctionalSuite) TestRateLimit() {
wid := "sched-test-rate-limit-wf-%d"
wt := "sched-test-rate-limit-wt"

// clean up per-ns-worker. note that this will run after the OverrideDynamicConfig below is reverted.
s.T().Cleanup(func() {
for _, w := range s.testCluster.host.workerServices {
w.RefreshPerNSWorkerManager()
}
time.Sleep(2 * time.Second)
})

// Set 1/sec rate limit per namespace. To force this to take effect immediately (instead of
// waiting one minute) we have to cause the whole worker to be stopped and started. The
// sleeps are needed because the refresh is asynchronous, and there's no way to get access
// to the actual rate limiter object to refresh it directly.
// Set 1/sec rate limit per namespace.
s.OverrideDynamicConfig(dynamicconfig.SchedulerNamespaceStartWorkflowRPS, 1.0)

revertWorkerCount := s.OverrideDynamicConfig(dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
for _, w := range s.testCluster.host.workerServices {
w.RefreshPerNSWorkerManager()
}
time.Sleep(2 * time.Second)
revertWorkerCount()
for _, w := range s.testCluster.host.workerServices {
w.RefreshPerNSWorkerManager()
}
time.Sleep(2 * time.Second)

var runs int32
workflowFn := func(ctx workflow.Context) error {
workflow.SideEffect(ctx, func(ctx workflow.Context) any {
Expand Down Expand Up @@ -1127,13 +1105,6 @@ func (s *ScheduleFunctionalSuite) TestRateLimit() {
})
s.NoError(err)
}

// stop workers again so they get started with the default rps
s.OverrideDynamicConfig(dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
for _, w := range s.testCluster.host.workerServices {
w.RefreshPerNSWorkerManager()
}
time.Sleep(2 * time.Second)
}

func (s *ScheduleFunctionalSuite) TestNextTimeCache() {
Expand Down

0 comments on commit 10ab254

Please sign in to comment.