diff --git a/service/worker/batcher/fx.go b/service/worker/batcher/fx.go index aa93498549b..6e08278d3de 100644 --- a/service/worker/batcher/fx.go +++ b/service/worker/batcher/fx.go @@ -89,9 +89,10 @@ func (s *workerComponent) DedicatedWorkerOptions(ns *namespace.Namespace) *worke } } -func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Namespace, _ workercommon.RegistrationDetails) { +func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Namespace, _ workercommon.RegistrationDetails) func() { registry.RegisterWorkflowWithOptions(BatchWorkflow, workflow.RegisterOptions{Name: BatchWFTypeName}) registry.RegisterActivity(s.activities(ns.Name(), ns.ID())) + return nil } func (s *workerComponent) activities(name namespace.Name, id namespace.ID) *activities { diff --git a/service/worker/common/interface.go b/service/worker/common/interface.go index 3e89e1af8bf..c2dff07926b 100644 --- a/service/worker/common/interface.go +++ b/service/worker/common/interface.go @@ -61,7 +61,9 @@ type ( PerNSWorkerComponent interface { // Register registers Workflow and Activity types provided by this worker component. // The namespace that this worker is running in is also provided. - Register(sdkworker.Registry, *namespace.Namespace, RegistrationDetails) + // If Register returns a function, that function will be called when the worker is + // stopped. This can be used to clean up any state. + Register(sdkworker.Registry, *namespace.Namespace, RegistrationDetails) func() // DedicatedWorkerOptions returns a PerNSDedicatedWorkerOptions for this worker component. DedicatedWorkerOptions(*namespace.Namespace) *PerNSDedicatedWorkerOptions } diff --git a/service/worker/common/interface_mock.go b/service/worker/common/interface_mock.go index 86f540328f8..e4193901d00 100644 --- a/service/worker/common/interface_mock.go +++ b/service/worker/common/interface_mock.go @@ -154,9 +154,11 @@ func (mr *MockPerNSWorkerComponentMockRecorder) DedicatedWorkerOptions(arg0 any) } // Register mocks base method. -func (m *MockPerNSWorkerComponent) Register(arg0 worker.Registry, arg1 *namespace.Namespace, arg2 RegistrationDetails) { +func (m *MockPerNSWorkerComponent) Register(arg0 worker.Registry, arg1 *namespace.Namespace, arg2 RegistrationDetails) func() { m.ctrl.T.Helper() - m.ctrl.Call(m, "Register", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "Register", arg0, arg1, arg2) + ret0, _ := ret[0].(func()) + return ret0 } // Register indicates an expected call of Register. diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go index 8f188b7ac8c..c65067f8a07 100644 --- a/service/worker/pernamespaceworker.go +++ b/service/worker/pernamespaceworker.go @@ -112,6 +112,7 @@ type ( componentSet string client sdkclient.Client worker sdkworker.Worker + cleanup []func() } workerAllocation struct { @@ -448,6 +449,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error { client, worker, err := w.startWorker(ns, enabledComponents, workerAllocation, dcOptions) if err != nil { // TODO: add metric also + w.stopWorkerLocked() // for calling cleanup return err } @@ -500,7 +502,10 @@ func (w *perNamespaceWorker) startWorker( Multiplicity: allocation.Local, } for _, cmp := range components { - cmp.Register(worker, ns, details) + cleanup := cmp.Register(worker, ns, details) + if cleanup != nil { + w.cleanup = append(w.cleanup, cleanup) + } } // this blocks by calling DescribeNamespace a few times (with a 10s timeout) @@ -554,6 +559,10 @@ func (w *perNamespaceWorker) stopWorkerAndResetTimer() { } func (w *perNamespaceWorker) stopWorkerLocked() { + for _, cleanup := range w.cleanup { + cleanup() + } + w.cleanup = nil if w.worker != nil { w.worker.Stop() w.worker = nil diff --git a/service/worker/scheduler/fx.go b/service/worker/scheduler/fx.go index 9b17bf4edb1..a56d315b390 100644 --- a/service/worker/scheduler/fx.go +++ b/service/worker/scheduler/fx.go @@ -26,6 +26,7 @@ package scheduler import ( "fmt" + "math" "time" enumspb "go.temporal.io/api/enums/v1" @@ -66,7 +67,7 @@ type ( specBuilder *SpecBuilder // workflow dep activityDeps activityDeps enabledForNs dynamicconfig.BoolPropertyFnWithNamespaceFilter - globalNSStartWorkflowRPS dynamicconfig.FloatPropertyFnWithNamespaceFilter + globalNSStartWorkflowRPS dynamicconfig.TypedSubscribableWithNamespaceFilter[float64] maxBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter localActivitySleepLimit dynamicconfig.DurationPropertyFnWithNamespaceFilter } @@ -100,7 +101,7 @@ func NewResult( specBuilder: specBuilder, activityDeps: params, enabledForNs: dynamicconfig.WorkerEnableScheduler.Get(dc), - globalNSStartWorkflowRPS: dynamicconfig.SchedulerNamespaceStartWorkflowRPS.Get(dc), + globalNSStartWorkflowRPS: dynamicconfig.SchedulerNamespaceStartWorkflowRPS.Subscribe(dc), maxBlobSize: dynamicconfig.BlobSizeLimitError.Get(dc), localActivitySleepLimit: dynamicconfig.SchedulerLocalActivitySleepLimit.Get(dc), }, @@ -113,24 +114,35 @@ func (s *workerComponent) DedicatedWorkerOptions(ns *namespace.Namespace) *worke } } -func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Namespace, details workercommon.RegistrationDetails) { +func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Namespace, details workercommon.RegistrationDetails) func() { wfFunc := func(ctx workflow.Context, args *schedspb.StartScheduleArgs) error { return schedulerWorkflowWithSpecBuilder(ctx, args, s.specBuilder) } registry.RegisterWorkflowWithOptions(wfFunc, workflow.RegisterOptions{Name: WorkflowType}) - registry.RegisterActivity(s.activities(ns.Name(), ns.ID(), details)) + + activities, cleanup := s.newActivities(ns.Name(), ns.ID(), details) + registry.RegisterActivity(activities) + return cleanup } -func (s *workerComponent) activities(name namespace.Name, id namespace.ID, details workercommon.RegistrationDetails) *activities { - localRPS := func() float64 { - return float64(details.Multiplicity) * s.globalNSStartWorkflowRPS(name.String()) / float64(details.TotalWorkers) +func (s *workerComponent) newActivities(name namespace.Name, id namespace.ID, details workercommon.RegistrationDetails) (*activities, func()) { + const burstRatio = 1.0 + + lim := quotas.NewRateLimiter(1, 1) + cb := func(rps float64) { + localRPS := rps * float64(details.Multiplicity) / float64(details.TotalWorkers) + burst := max(1, int(math.Ceil(localRPS*burstRatio))) + lim.SetRateBurst(localRPS, burst) } + initialRPS, cancel := s.globalNSStartWorkflowRPS(name.String(), cb) + cb(initialRPS) + return &activities{ activityDeps: s.activityDeps, namespace: name, namespaceID: id, - startWorkflowRateLimiter: quotas.NewDefaultOutgoingRateLimiter(localRPS), + startWorkflowRateLimiter: lim, maxBlobSize: func() int { return s.maxBlobSize(name.String()) }, localActivitySleepLimit: func() time.Duration { return s.localActivitySleepLimit(name.String()) }, - } + }, cancel } diff --git a/tests/schedule.go b/tests/schedule.go index c0d4799664f..6200fec786b 100644 --- a/tests/schedule.go +++ b/tests/schedule.go @@ -1049,31 +1049,9 @@ func (s *ScheduleFunctionalSuite) TestRateLimit() { wid := "sched-test-rate-limit-wf-%d" wt := "sched-test-rate-limit-wt" - // clean up per-ns-worker. note that this will run after the OverrideDynamicConfig below is reverted. - s.T().Cleanup(func() { - for _, w := range s.testCluster.host.workerServices { - w.RefreshPerNSWorkerManager() - } - time.Sleep(2 * time.Second) - }) - - // Set 1/sec rate limit per namespace. To force this to take effect immediately (instead of - // waiting one minute) we have to cause the whole worker to be stopped and started. The - // sleeps are needed because the refresh is asynchronous, and there's no way to get access - // to the actual rate limiter object to refresh it directly. + // Set 1/sec rate limit per namespace. s.OverrideDynamicConfig(dynamicconfig.SchedulerNamespaceStartWorkflowRPS, 1.0) - revertWorkerCount := s.OverrideDynamicConfig(dynamicconfig.WorkerPerNamespaceWorkerCount, 0) - for _, w := range s.testCluster.host.workerServices { - w.RefreshPerNSWorkerManager() - } - time.Sleep(2 * time.Second) - revertWorkerCount() - for _, w := range s.testCluster.host.workerServices { - w.RefreshPerNSWorkerManager() - } - time.Sleep(2 * time.Second) - var runs int32 workflowFn := func(ctx workflow.Context) error { workflow.SideEffect(ctx, func(ctx workflow.Context) any { @@ -1127,13 +1105,6 @@ func (s *ScheduleFunctionalSuite) TestRateLimit() { }) s.NoError(err) } - - // stop workers again so they get started with the default rps - s.OverrideDynamicConfig(dynamicconfig.WorkerPerNamespaceWorkerCount, 0) - for _, w := range s.testCluster.host.workerServices { - w.RefreshPerNSWorkerManager() - } - time.Sleep(2 * time.Second) } func (s *ScheduleFunctionalSuite) TestNextTimeCache() {