From 14a5bc5bee17b8e269100c329b4eee9c77730240 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 3 Sep 2024 12:58:14 -0700 Subject: [PATCH] State machine standby executor fixes (#6472) (#6474) ## What changed? Cherry picked #6472 on top of 120. --- components/dummy/tasks.go | 29 +++- components/nexusoperations/executors.go | 9 -- components/nexusoperations/tasks.go | 9 ++ service/history/ndc_standby_task_util.go | 14 -- .../outbound_queue_standby_task_executor.go | 138 ++++++++---------- service/history/queues/executable.go | 2 +- .../timer_queue_standby_task_executor.go | 8 +- .../timer_queue_standby_task_executor_test.go | 108 ++++++++++++++ 8 files changed, 201 insertions(+), 116 deletions(-) diff --git a/components/dummy/tasks.go b/components/dummy/tasks.go index b3a7194526a..fe7616aa74b 100644 --- a/components/dummy/tasks.go +++ b/components/dummy/tasks.go @@ -30,8 +30,8 @@ import ( ) const ( - TaskTypeTimer = "dummy.Immediate" - TaskTypeImmediate = "dummy.Timer" + TaskTypeTimer = "dummy.Timer" + TaskTypeImmediate = "dummy.Immediate" ) type ImmediateTask struct { @@ -66,7 +66,8 @@ func (ImmediateTaskSerializer) Serialize(hsm.Task) ([]byte, error) { } type TimerTask struct { - Deadline time.Time + Deadline time.Time + concurrent bool } var _ hsm.Task = TimerTask{} @@ -79,21 +80,33 @@ func (t TimerTask) Kind() hsm.TaskKind { return hsm.TaskKindTimer{Deadline: t.Deadline} } -func (TimerTask) Concurrent() bool { - return false +func (t TimerTask) Concurrent() bool { + return t.concurrent +} + +func (t TimerTask) Validate(*hsm.Node) error { + // In case the task is considered concurrent, consider it valid for now. + return nil } type TimerTaskSerializer struct{} func (TimerTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error) { if kind, ok := kind.(hsm.TaskKindTimer); ok { - return TimerTask{Deadline: kind.Deadline}, nil + return TimerTask{Deadline: kind.Deadline, concurrent: len(data) > 0}, nil } return nil, fmt.Errorf("%w: expected timer", hsm.ErrInvalidTaskKind) } -func (TimerTaskSerializer) Serialize(hsm.Task) ([]byte, error) { - return nil, nil +func (s TimerTaskSerializer) Serialize(task hsm.Task) ([]byte, error) { + if tt, ok := task.(TimerTask); ok { + if tt.concurrent { + // Non empty data marks the task as concurrent. 
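// A minimal round-trip sketch of that convention, assuming some deadline d
// (in-package, since the concurrent field is unexported):
//
//	data, _ := TimerTaskSerializer{}.Serialize(TimerTask{Deadline: d, concurrent: true})
//	task, _ := TimerTaskSerializer{}.Deserialize(data, hsm.TaskKindTimer{Deadline: d})
//	task.(TimerTask).Concurrent() // true, because len(data) > 0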
+ return []byte{1}, nil + } + return nil, nil + } + return nil, fmt.Errorf("incompatible task: %v", task) } func RegisterTaskSerializers(reg *hsm.Registry) error { diff --git a/components/nexusoperations/executors.go b/components/nexusoperations/executors.go index ce3b5fcc849..9e46030c074 100644 --- a/components/nexusoperations/executors.go +++ b/components/nexusoperations/executors.go @@ -293,9 +293,6 @@ func (e taskExecutor) loadOperationArgs( if err := task.Validate(node); err != nil { return err } - if err := node.CheckRunning(); err != nil { - return err - } operation, err := hsm.MachineData[Operation](node) if err != nil { return err @@ -459,9 +456,6 @@ func (e taskExecutor) executeBackoffTask(env hsm.Environment, node *hsm.Node, ta if err := task.Validate(node); err != nil { return err } - if err := node.CheckRunning(); err != nil { - return err - } return hsm.MachineTransition(node, func(op Operation) (hsm.TransitionOutput, error) { return TransitionRescheduled.Apply(op, EventRescheduled{ Node: node, @@ -473,9 +467,6 @@ func (e taskExecutor) executeTimeoutTask(env hsm.Environment, node *hsm.Node, ta if err := task.Validate(node); err != nil { return err } - if err := node.CheckRunning(); err != nil { - return err - } return hsm.MachineTransition(node, func(op Operation) (hsm.TransitionOutput, error) { eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken) if err != nil { diff --git a/components/nexusoperations/tasks.go b/components/nexusoperations/tasks.go index 2a3f761c563..0fb23f754b3 100644 --- a/components/nexusoperations/tasks.go +++ b/components/nexusoperations/tasks.go @@ -59,6 +59,9 @@ func (TimeoutTask) Concurrent() bool { // Validate checks if the timeout task is still valid to execute for the given node state. func (t TimeoutTask) Validate(node *hsm.Node) error { + if err := node.CheckRunning(); err != nil { + return err + } op, err := hsm.MachineData[Operation](node) if err != nil { return err @@ -106,6 +109,9 @@ func (InvocationTask) Concurrent() bool { } func (t InvocationTask) Validate(node *hsm.Node) error { + if err := node.CheckRunning(); err != nil { + return err + } op, err := hsm.MachineData[Operation](node) if err != nil { return err @@ -152,6 +158,9 @@ func (BackoffTask) Concurrent() bool { } func (t BackoffTask) Validate(node *hsm.Node) error { + if err := node.CheckRunning(); err != nil { + return err + } op, err := hsm.MachineData[Operation](node) if err != nil { return err diff --git a/service/history/ndc_standby_task_util.go b/service/history/ndc_standby_task_util.go index 5ac9d8166eb..6e6d03021f1 100644 --- a/service/history/ndc_standby_task_util.go +++ b/service/history/ndc_standby_task_util.go @@ -97,20 +97,6 @@ func standbyTimerTaskPostActionTaskDiscarded( return consts.ErrTaskDiscarded } -func standbyOutboundTaskPostActionTaskDiscarded( - _ context.Context, - taskInfo tasks.Task, - postActionInfo interface{}, - logger log.Logger, -) error { - if postActionInfo == nil { - return nil - } - - logger.Warn("Discarding standby outbound task due to task being pending for too long.", tag.Task(taskInfo)) - return consts.ErrTaskDiscarded -} - type ( historyResendInfo struct { diff --git a/service/history/outbound_queue_standby_task_executor.go b/service/history/outbound_queue_standby_task_executor.go index d5a5cdf7c38..c1f25c47e91 100644 --- a/service/history/outbound_queue_standby_task_executor.go +++ b/service/history/outbound_queue_standby_task_executor.go @@ -27,6 +27,7 @@ import ( "errors" "go.temporal.io/server/common/log" + 
"go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/service/history/configs" @@ -87,90 +88,13 @@ func (e *outboundQueueStandbyTaskExecutor) Execute( } } - nsName, err := e.shardContext.GetNamespaceRegistry().GetNamespaceName( - namespace.ID(task.GetNamespaceID()), - ) - if err != nil { - return respond(err) - } - - ref, smt, err := stateMachineTask(e.shardContext, task) - if err != nil { - return respond(err) - } - - if err := validateTaskByClock(e.shardContext, task); err != nil { - return respond(err) - } - - destination := "" - if dtask, ok := task.(tasks.HasDestination); ok { - destination = dtask.GetDestination() - } - - actionFn := func(ctx context.Context) (any, error) { - err := e.Access(ctx, ref, hsm.AccessRead, func(node *hsm.Node) error { - if smt.Concurrent() { - //nolint:revive // concurrent tasks implements hsm.ConcurrentTask interface - concurrentSmt := smt.(hsm.ConcurrentTask) - return concurrentSmt.Validate(node) - } - return nil - }) - if err != nil { - if errors.Is(err, consts.ErrStaleReference) { - // If the reference is stale, then the task was already executed in - // the active queue, and there is nothing to do here. - return nil, nil - } - return nil, err - } - - // If there was no error from Access nor from the accessor function, then the task - // is still valid for processing based on the current state of the machine. - // The *likely* reasons are: a) delay in the replication stack; b) destination is down. - // In any case, the task needs to be retried. - var postActionInfoErr error = consts.ErrTaskRetry - if e.config.OutboundStandbyTaskMissingEventsDestinationDownErr(nsName.String(), destination) { - // Wrap the retry error with DestinationDownError so it can trigger the circuit breaker on - // the standby side. This won't do any harm, at most some delay processing the standby task. - // Assuming the dynamic config OutboundStandbyTaskMissingEventsDiscardDelay is long enough, - // it should give enough time for the active side to execute the task successfully, and the - // standby side to process it as well without discarding the task. - postActionInfoErr = queues.NewDestinationDownError( - "standby task executor returned retryable error", - postActionInfoErr, - ) - } - return postActionInfoErr, nil - } - - err = e.processTask( - ctx, - task, - actionFn, - getStandbyPostActionFn( - task, - e.Now, - 0, // We don't need resend delay since we don't do fetch history. - e.config.OutboundStandbyTaskMissingEventsDiscardDelay(nsName.String(), destination), - // We don't need to fetch history from remote for state machine to sync. - // So, just use the noop post action which will return a retry error - // if the task didn't succeed. 
- standbyTaskPostActionNoOp, - standbyOutboundTaskPostActionTaskDiscarded, - ), - ) - - return respond(err) + return respond(e.processTask(ctx, task)) } func (e *outboundQueueStandbyTaskExecutor) processTask( ctx context.Context, task tasks.Task, - actionFn func(context.Context) (any, error), - postActionFn standbyPostActionFn, -) (retError error) { +) error { ctx, cancel := context.WithTimeout(ctx, taskTimeout) defer cancel() @@ -180,15 +104,67 @@ func (e *outboundQueueStandbyTaskExecutor) processTask( if err != nil { return err } + if !nsRecord.IsOnCluster(e.clusterName) { // namespace is not replicated to local cluster, ignore corresponding tasks return nil } - historyResendInfo, err := actionFn(ctx) + if err := validateTaskByClock(e.shardContext, task); err != nil { + return err + } + + ref, smt, err := stateMachineTask(e.shardContext, task) + if err != nil { + return err + } + + err = e.Access(ctx, ref, hsm.AccessRead, func(node *hsm.Node) error { + if smt.Concurrent() { + //nolint:revive // concurrent tasks implements hsm.ConcurrentTask interface + concurrentSmt := smt.(hsm.ConcurrentTask) + return concurrentSmt.Validate(node) + } + return nil + }) + if err != nil { + if errors.Is(err, consts.ErrStaleReference) { + // If the reference is stale, then the task was already executed in + // the active queue, and there is nothing to do here. + return nil + } return err } - return postActionFn(ctx, task, historyResendInfo, e.logger) + // If there was no error from Access nor from the accessor function, then the task + // is still valid for processing based on the current state of the machine. + // The *likely* reasons are: a) delay in the replication stack; b) destination is down. + // In any case, the task needs to be retried (or discarded, based on the configured discard delay). + + destination := "" + if dtask, ok := task.(tasks.HasDestination); ok { + destination = dtask.GetDestination() + } + + discardTime := task.GetVisibilityTime().Add(e.config.OutboundStandbyTaskMissingEventsDiscardDelay(nsRecord.Name().String(), destination)) + // now > task start time + discard delay + if e.Now().After(discardTime) { + e.logger.Warn("Discarding standby outbound task due to task being pending for too long.", tag.Task(task)) + return consts.ErrTaskDiscarded + } + + err = consts.ErrTaskRetry + if e.config.OutboundStandbyTaskMissingEventsDestinationDownErr(nsRecord.Name().String(), destination) { + // Wrap the retry error with DestinationDownError so it can trigger the circuit breaker on + // the standby side. This won't do any harm, at most some delay processing the standby task. + // Assuming the dynamic config OutboundStandbyTaskMissingEventsDiscardDelay is long enough, + // it should give enough time for the active side to execute the task successfully, and the + // standby side to process it as well without discarding the task. + err = queues.NewDestinationDownError( + "standby task executor returned retryable error", + err, + ) + } + return err } diff --git a/service/history/queues/executable.go b/service/history/queues/executable.go index d8c4a501e3b..f2cc4224bf9 100644 --- a/service/history/queues/executable.go +++ b/service/history/queues/executable.go @@ -361,7 +361,7 @@ func (e *executableImpl) isSafeToDropError(err error) bool { // Even though ErrStaleReference is castable to serviceerror.NotFound, we give this error special treatment // because we're interested in the metric. 
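// Aside on the standby flow introduced above: a stale reference means the active
// cluster already ran the task, so skipping it loses nothing. A task that is still
// valid on the standby side is retried instead; assuming a hypothetical discard
// delay of 10 minutes and a task made visible at t0, the outbound standby executor
// returns
//
//	consts.ErrTaskRetry     // while e.Now() is at or before t0.Add(10 * time.Minute)
//	consts.ErrTaskDiscarded // once e.Now() is past that discard time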
metrics.TaskSkipped.With(e.taggedMetricsHandler).Record(1) - e.logger.Info("Skipped task due with stale reference", tag.Error(err)) + e.logger.Info("Skipped task due to stale reference", tag.Error(err)) return true } diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index 21382988119..35c37fdb72f 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -523,10 +523,12 @@ func (t *timerQueueStandbyTaskExecutor) executeStateMachineTimerTask( if task.Concurrent() { //nolint:revive // concurrent tasks implements hsm.ConcurrentTask interface concurrentTask := task.(hsm.ConcurrentTask) - return concurrentTask.Validate(node) + if err := concurrentTask.Validate(node); err != nil { + return err + } } - // If the task is expired and still valid in the standby queue, - // then the state machine is stale. + // If the timer fired and the task is still valid in the standby queue, wait for the active cluster to + // transition and invalidate the task. return consts.ErrTaskRetry }, ) diff --git a/service/history/timer_queue_standby_task_executor_test.go b/service/history/timer_queue_standby_task_executor_test.go index 9f03a6ede53..a6853045b65 100644 --- a/service/history/timer_queue_standby_task_executor_test.go +++ b/service/history/timer_queue_standby_task_executor_test.go @@ -1743,6 +1743,114 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteStateMachineTimerTask_Ex s.Equal(futureDeadline, info.StateMachineTimers[0].Deadline.AsTime()) } +func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteStateMachineTimerTask_ValidConcurrentTaskIsKept() { + reg := s.mockShard.StateMachineRegistry() + s.NoError(dummy.RegisterStateMachine(reg)) + s.NoError(dummy.RegisterTaskSerializers(reg)) + + we := &commonpb.WorkflowExecution{ + WorkflowId: tests.WorkflowID, + RunId: tests.RunID, + } + + ms := workflow.NewMockMutableState(s.controller) + info := &persistencespb.WorkflowExecutionInfo{ + VersionHistories: &historypb.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*historypb.VersionHistory{ + { + Items: []*historypb.VersionHistoryItem{ + {EventId: 1, Version: 2}, + }, + }, + }, + }, + } + + root, err := hsm.NewRoot( + reg, + workflow.StateMachineType, + ms, + make(map[string]*persistencespb.StateMachineMap), + ms, + ) + s.NoError(err) + + ms.EXPECT().GetCurrentVersion().Return(int64(2)).AnyTimes() + ms.EXPECT().NextTransitionCount().Return(int64(0)).AnyTimes() // emulate transition history disabled. + ms.EXPECT().GetNextEventID().Return(int64(2)) + ms.EXPECT().GetExecutionInfo().Return(info).AnyTimes() + ms.EXPECT().GetWorkflowKey().Return(tests.WorkflowKey).AnyTimes() + ms.EXPECT().GetExecutionState().Return( + &persistencespb.WorkflowExecutionState{Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING}, + ).AnyTimes() + ms.EXPECT().HSM().Return(root).AnyTimes() + + _, err = dummy.MachineCollection(root).Add("dummy", dummy.NewDummy()) + s.NoError(err) + + dummyRoot, err := root.Child([]hsm.Key{ + {Type: dummy.StateMachineType, ID: "dummy"}, + }) + s.NoError(err) + err = hsm.MachineTransition(dummyRoot, func(sm *dummy.Dummy) (hsm.TransitionOutput, error) { + return dummy.Transition0.Apply(sm, dummy.Event0{}) + }) + s.NoError(err) + err = hsm.MachineTransition(dummyRoot, func(sm *dummy.Dummy) (hsm.TransitionOutput, error) { + return dummy.Transition0.Apply(sm, dummy.Event0{}) + }) + s.NoError(err) + + // Track a task with a past deadline. 
Should get executed. + workflow.TrackStateMachineTimer(ms, s.mockShard.GetTimeSource().Now().Add(-time.Hour), &persistencespb.StateMachineTaskInfo{ + Ref: &persistencespb.StateMachineRef{ + MutableStateVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: 2, + }, + MachineInitialVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: 0, + }, + MachineLastUpdateVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: 2, + }, + }, + Type: dummy.TaskTypeTimer, + Data: []byte{1}, // Mark the task as concurrent + }) + + wfCtx := workflow.NewMockContext(s.controller) + wfCtx.EXPECT().LoadMutableState(gomock.Any(), s.mockShard).Return(ms, nil) + + mockCache := wcache.NewMockCache(s.controller) + mockCache.EXPECT().GetOrCreateWorkflowExecution( + gomock.Any(), s.mockShard, tests.NamespaceID, we, locks.PriorityLow, + ).Return(wfCtx, wcache.NoopReleaseFn, nil) + + task := &tasks.StateMachineTimerTask{ + WorkflowKey: tests.WorkflowKey, + Version: 2, + } + + //nolint:revive // unchecked-type-assertion + timerQueueStandbyTaskExecutor := newTimerQueueStandbyTaskExecutor( + s.mockShard, + mockCache, + s.mockDeleteManager, + s.mockNDCHistoryResender, + s.mockResendHandler, + s.mockMatchingClient, + s.logger, + metrics.NoopMetricsHandler, + s.clusterName, + s.config, + ).(*timerQueueStandbyTaskExecutor) + + err = timerQueueStandbyTaskExecutor.executeStateMachineTimerTask(context.Background(), task) + s.ErrorIs(err, consts.ErrTaskRetry) + s.Equal(1, len(info.StateMachineTimers)) +} + func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteStateMachineTimerTask_StaleStateMachine() { reg := s.mockShard.StateMachineRegistry() s.NoError(dummy.RegisterStateMachine(reg))
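// For context on the pattern the above changes converge on: workflow-liveness and
// state checks now live in each task's Validate, which concurrent tasks also expose
// to the standby executors through hsm.ConcurrentTask. A sketch of a typical Validate,
// with a hypothetical machine type and state name (only node.CheckRunning and
// hsm.MachineData are taken from this patch):
//
//	func (t SomeTask) Validate(node *hsm.Node) error {
//		// Fail fast if the owning workflow is no longer running.
//		if err := node.CheckRunning(); err != nil {
//			return err
//		}
//		sm, err := hsm.MachineData[SomeMachine](node)
//		if err != nil {
//			return err
//		}
//		// Reject tasks created for a state the machine has already left.
//		if sm.State() != stateTaskWasCreatedFor {
//			return fmt.Errorf("%w: machine no longer in expected state", consts.ErrStaleReference)
//		}
//		return nil
//	}
//
// Returning an error that wraps consts.ErrStaleReference lets the queue drop the task
// outright (see the executable.go change above); any other error is surfaced to the
// queue for its usual retry handling.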